// compio_driver/sys/poll/mod.rs — poll-based (epoll/kqueue/event ports) driver backend.
1#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
2#[allow(unused_imports)]
3pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
4#[cfg(aio)]
5use std::ptr::NonNull;
6use std::{
7    collections::{HashMap, VecDeque},
8    io,
9    num::NonZeroUsize,
10    pin::Pin,
11    sync::Arc,
12    task::{Poll, Wake, Waker},
13    time::Duration,
14};
15
16use compio_buf::BufResult;
17use compio_log::{instrument, trace};
18use flume::{Receiver, Sender};
19use polling::{Event, Events, Poller};
20use smallvec::SmallVec;
21
22use crate::{
23    AsyncifyPool, BufferPool, DriverType, Entry, ErasedKey, ProactorBuilder,
24    key::{BorrowedKey, RefExt},
25    op::Interest,
26    syscall,
27};
28
29mod extra;
30pub(in crate::sys) use extra::Extra;
31pub(crate) mod op;
32
/// One registered wait argument plus its readiness flag.
struct Track {
    // The fd/interest pair this entry was registered with.
    arg: WaitArg,
    // Whether the corresponding event has fired — presumably flipped by the
    // event-handling code in the `extra` module (not shown here); confirm there.
    ready: bool,
}

impl From<WaitArg> for Track {
    /// A freshly tracked argument starts out not ready.
    fn from(arg: WaitArg) -> Self {
        Self { arg, ready: false }
    }
}
43
/// Abstraction of operations.
///
/// # Safety
///
/// If `pre_submit` returns `Decision::Wait`, `op_type` must also return
/// `Some(OpType::Fd)` with same fds as the `WaitArg`s. Similarly, if
/// `pre_submit` returns `Decision::Aio`, `op_type` must return
/// `Some(OpType::Aio)` with the correct `aiocb` pointer.
pub unsafe trait OpCode {
    /// Perform the operation before submit, and return [`Decision`] to
    /// indicate whether submitting the operation to polling is required.
    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision>;

    /// Get the operation type when an event occurs.
    ///
    /// The default `None` indicates an operation that is never registered
    /// with the poller (e.g. instantly-completed or blocking operations).
    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
        None
    }

    /// Perform the operation after receiving the corresponding
    /// event. If this operation is blocking, the return value should be
    /// [`Poll::Ready`].
    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>>;
}
67
68pub use OpCode as PollOpCode;
69
/// One item in local or more items on heap.
///
/// Backed by a `SmallVec` with inline capacity 1, so the common
/// single-fd case avoids a heap allocation.
type Multi<T> = SmallVec<[T; 1]>;
72
/// Result of [`OpCode::pre_submit`].
#[non_exhaustive]
pub enum Decision {
    /// Instant operation, no need to submit; carries the result value.
    Completed(usize),
    /// Async operation, needs to submit; one or more fd/interest pairs to
    /// register with the poller.
    Wait(Multi<WaitArg>),
    /// Blocking operation, needs to be spawned in another thread
    Blocking,
    /// AIO operation, needs to be spawned to the kernel.
    #[cfg(aio)]
    Aio(AioControl),
}
86
87impl Decision {
88    /// Decide to wait for the given fd with the given interest.
89    pub fn wait_for(fd: RawFd, interest: Interest) -> Self {
90        Self::Wait(SmallVec::from_buf([WaitArg { fd, interest }]))
91    }
92
93    /// Decide to wait for many fds.
94    pub fn wait_for_many<I: IntoIterator<Item = WaitArg>>(args: I) -> Self {
95        Self::Wait(Multi::from_iter(args))
96    }
97
98    /// Decide to wait for the given fd to be readable.
99    pub fn wait_readable(fd: RawFd) -> Self {
100        Self::wait_for(fd, Interest::Readable)
101    }
102
103    /// Decide to wait for the given fd to be writable.
104    pub fn wait_writable(fd: RawFd) -> Self {
105        Self::wait_for(fd, Interest::Writable)
106    }
107
108    /// Decide to spawn an AIO operation. `submit` is a method like `aio_read`.
109    #[cfg(aio)]
110    pub fn aio(
111        cb: &mut libc::aiocb,
112        submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
113    ) -> Self {
114        Self::Aio(AioControl {
115            aiocbp: NonNull::from(cb),
116            submit,
117        })
118    }
119}
120
/// Meta of polling operations: which fd to watch and for what readiness.
#[derive(Debug, Clone, Copy)]
pub struct WaitArg {
    /// The raw fd of the operation.
    pub fd: RawFd,
    /// The interest to be registered.
    pub interest: Interest,
}
129
130impl WaitArg {
131    /// Create a new readable `WaitArg`.
132    pub fn readable(fd: RawFd) -> Self {
133        Self {
134            fd,
135            interest: Interest::Readable,
136        }
137    }
138
139    /// Create a new writable `WaitArg`.
140    pub fn writable(fd: RawFd) -> Self {
141        Self {
142            fd,
143            interest: Interest::Writable,
144        }
145    }
146}
147
/// Meta of AIO operations.
#[cfg(aio)]
#[derive(Debug, Clone, Copy)]
pub struct AioControl {
    /// Pointer of the control block. Must stay valid while the AIO request
    /// is in flight — the kernel writes through it.
    pub aiocbp: NonNull<libc::aiocb>,
    /// The aio_* submit function (e.g. `aio_read`/`aio_write`).
    pub submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
}
157
/// Per-fd FIFO queues of operations waiting for readability / writability.
#[derive(Debug, Default)]
struct FdQueue {
    // Operations waiting for the fd to become readable, in submission order.
    read_queue: VecDeque<ErasedKey>,
    // Operations waiting for the fd to become writable, in submission order.
    write_queue: VecDeque<ErasedKey>,
}
163
/// A token to remove an interest from `FdQueue`.
///
/// Returned when an interest is pushed; it can later be used to undo that
/// push. Beware that the stored index is not stable: if other interests are
/// inserted or removed first, it may be out of range or point at a different
/// entry (toctou).
struct RemoveToken {
    idx: usize,
    is_read: bool,
}

impl RemoveToken {
    /// Token for an entry at `idx` in the read queue.
    fn read(idx: usize) -> Self {
        RemoveToken { is_read: true, idx }
    }

    /// Token for an entry at `idx` in the write queue.
    fn write(idx: usize) -> Self {
        RemoveToken { is_read: false, idx }
    }
}
187
impl FdQueue {
    /// `true` when neither read nor write interests are queued.
    fn is_empty(&self) -> bool {
        self.read_queue.is_empty() && self.write_queue.is_empty()
    }

    /// Remove the interest a [`RemoveToken`] points at, returning its key.
    ///
    /// Returns `None` if the index is out of range. Subject to the toctou
    /// caveat on `RemoveToken`: intervening mutations may make the token
    /// remove a different entry than the one originally pushed.
    fn remove_token(&mut self, token: RemoveToken) -> Option<ErasedKey> {
        if token.is_read {
            self.read_queue.remove(token.idx)
        } else {
            self.write_queue.remove(token.idx)
        }
    }

    /// Append an interest to the back of the matching queue; the returned
    /// token can undo the push.
    pub fn push_back_interest(&mut self, key: ErasedKey, interest: Interest) -> RemoveToken {
        match interest {
            Interest::Readable => {
                self.read_queue.push_back(key);
                RemoveToken::read(self.read_queue.len() - 1)
            }
            Interest::Writable => {
                self.write_queue.push_back(key);
                RemoveToken::write(self.write_queue.len() - 1)
            }
        }
    }

    /// Prepend an interest to the front of the matching queue (used when
    /// re-queuing an operation that is still pending); the token index is
    /// always 0.
    pub fn push_front_interest(&mut self, key: ErasedKey, interest: Interest) -> RemoveToken {
        let is_read = match interest {
            Interest::Readable => {
                self.read_queue.push_front(key);
                true
            }
            Interest::Writable => {
                self.write_queue.push_front(key);
                false
            }
        };
        RemoveToken { idx: 0, is_read }
    }

    /// Drop every queued interest belonging to `key`, from both queues.
    pub fn remove(&mut self, key: &ErasedKey) {
        self.read_queue.retain(|k| k != key);
        self.write_queue.retain(|k| k != key);
    }

    /// Build the poller `Event` describing the interests at the front of the
    /// queues.
    ///
    /// Note: when both queues are non-empty, the event key ends up being the
    /// *write* front's key (the second assignment overwrites the first).
    pub fn event(&self) -> Event {
        let mut event = Event::none(0);
        if let Some(key) = self.read_queue.front() {
            event.readable = true;
            event.key = key.as_raw();
        }
        if let Some(key) = self.write_queue.front() {
            event.writable = true;
            event.key = key.as_raw();
        }
        event
    }

    /// Pop the front interest matching the readiness in `event`, preferring
    /// the read side when the event reports both.
    pub fn pop_interest(&mut self, event: &Event) -> Option<(ErasedKey, Interest)> {
        if event.readable
            && let Some(key) = self.read_queue.pop_front()
        {
            return Some((key, Interest::Readable));
        }
        if event.writable
            && let Some(key) = self.write_queue.pop_front()
        {
            return Some((key, Interest::Writable));
        }
        None
    }
}
260
/// Represents the filter type of kqueue. `polling` crate doesn't expose such
/// API, and we need to know about it when `cancel` is called.
#[non_exhaustive]
pub enum OpType {
    /// The operation polls one or more fds.
    Fd(Multi<RawFd>),
    /// The operation submits an AIO.
    #[cfg(aio)]
    Aio(NonNull<libc::aiocb>),
}
271
272impl OpType {
273    /// Create an [`OpType::Fd`] with one [`RawFd`].
274    pub fn fd(fd: RawFd) -> Self {
275        Self::Fd(SmallVec::from_buf([fd]))
276    }
277
278    /// Create an [`OpType::Fd`] with multiple [`RawFd`]s.
279    pub fn multi_fd<I: IntoIterator<Item = RawFd>>(fds: I) -> Self {
280        Self::Fd(Multi::from_iter(fds))
281    }
282}
283
/// Low-level driver of polling.
pub(crate) struct Driver {
    // Reusable buffer the poller fills with ready events.
    events: Events,
    // Shared poller plus wake handle; also convertible into a `Waker`.
    notify: Arc<Notify>,
    // Per-fd queues of pending interests.
    registry: HashMap<RawFd, FdQueue>,
    // Thread pool used for operations that must run blocking.
    pool: AsyncifyPool,
    // Completion channel carrying results of blocking (and AIO-fallback)
    // operations back to the driver.
    completed_tx: Sender<Entry>,
    completed_rx: Receiver<Entry>,
}
293
impl Driver {
    /// Create a poll-based driver from the builder configuration.
    pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
        instrument!(compio_log::Level::TRACE, "new", ?builder);
        trace!("new poll driver");

        // Pre-size the event buffer when the builder gives a non-zero capacity.
        let events = if let Some(cap) = NonZeroUsize::new(builder.capacity as _) {
            Events::with_capacity(cap)
        } else {
            Events::new()
        };
        let poll = Poller::new()?;
        let notify = Arc::new(Notify::new(poll));
        // Completions from the blocking pool (and AIO fallback) are funnelled
        // back through this unbounded channel.
        let (completed_tx, completed_rx) = flume::unbounded();

        Ok(Self {
            events,
            notify,
            registry: HashMap::new(),
            pool: builder.create_or_get_thread_pool(),
            completed_tx,
            completed_rx,
        })
    }

    /// This driver is the polling flavor.
    pub fn driver_type(&self) -> DriverType {
        DriverType::Poll
    }

    /// Fresh per-operation bookkeeping state for this driver.
    pub(in crate::sys) fn default_extra(&self) -> Extra {
        Extra::new()
    }

    fn poller(&self) -> &Poller {
        &self.notify.poll
    }

    /// Run `f` with the event buffer temporarily moved out of `self`, so the
    /// closure may borrow `self` mutably while iterating the events.
    fn with_events<F, R>(&mut self, f: F) -> R
    where
        F: FnOnce(&mut Self, &mut Events) -> R,
    {
        let mut events = std::mem::take(&mut self.events);
        let res = f(self, &mut events);
        self.events = events;
        res
    }

    fn try_get_queue(&mut self, fd: RawFd) -> Option<&mut FdQueue> {
        self.registry.get_mut(&fd)
    }

    /// Like `try_get_queue`, but the fd must already be registered.
    ///
    /// # Panics
    /// Panics if `fd` has no queue in the registry.
    fn get_queue(&mut self, fd: RawFd) -> &mut FdQueue {
        self.try_get_queue(fd).expect("the fd should be submitted")
    }

    /// Submit a new operation to the end of the queue.
    ///
    ///  # Safety
    /// The input fd should be valid.
    unsafe fn submit(&mut self, key: ErasedKey, arg: WaitArg) -> io::Result<()> {
        let Self {
            registry, notify, ..
        } = self;
        // First interest on this fd means the fd must be added to the poller;
        // otherwise the existing registration is modified in place.
        let need_add = !registry.contains_key(&arg.fd);
        let queue = registry.entry(arg.fd).or_default();
        let token = queue.push_back_interest(key, arg.interest);
        let event = queue.event();
        let res = if need_add {
            // SAFETY: the events are deleted correctly.
            unsafe { notify.poll.add(arg.fd, event) }
        } else {
            let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
            notify.poll.modify(fd, event)
        };
        if res.is_err() {
            // Rollback the push if submission failed.
            queue.remove_token(token);
            if queue.is_empty() {
                registry.remove(&arg.fd);
            }
        }

        res
    }

    /// Submit a new operation to the front of the queue.
    ///
    /// Unlike `submit`, a failed poller call here is not rolled back; the
    /// caller (`poll_one`) performs the cleanup.
    ///
    /// # Safety
    /// The input fd should be valid.
    unsafe fn submit_front(&mut self, key: ErasedKey, arg: WaitArg) -> io::Result<()> {
        let need_add = !self.registry.contains_key(&arg.fd);
        let queue = self.registry.entry(arg.fd).or_default();
        queue.push_front_interest(key, arg.interest);
        let event = queue.event();
        if need_add {
            // SAFETY: the events are deleted correctly.
            unsafe { self.poller().add(arg.fd, event)? }
        } else {
            let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
            self.poller().modify(fd, event)?;
        }
        Ok(())
    }

    /// Re-arm or deregister `fd` according to `renew_event`: an event with no
    /// interests means nothing is waiting, so the fd is removed entirely.
    fn renew(&mut self, fd: BorrowedFd, renew_event: Event) -> io::Result<()> {
        if !renew_event.readable && !renew_event.writable {
            self.poller().delete(fd)?;
            self.registry.remove(&fd.as_raw_fd());
        } else {
            self.poller().modify(fd, renew_event)?;
        }
        Ok(())
    }

    /// Remove one interest from the queue.
    ///
    /// A missing queue is not an error — the fd may already have been fully
    /// deregistered.
    fn remove_one(&mut self, key: &ErasedKey, fd: RawFd) -> io::Result<()> {
        let Some(queue) = self.try_get_queue(fd) else {
            return Ok(());
        };
        queue.remove(key);
        let renew_event = queue.event();
        if queue.is_empty() {
            self.registry.remove(&fd);
        }
        // The fd was valid when it was submitted (see `submit`'s contract).
        self.renew(unsafe { BorrowedFd::borrow_raw(fd) }, renew_event)
    }

    /// Remove one interest from the queue, and emit a cancelled entry.
    ///
    /// Returns `None` when deregistration failed; no entry is emitted then.
    fn cancel_one(&mut self, key: ErasedKey, fd: RawFd) -> Option<Entry> {
        self.remove_one(&key, fd)
            .map_or(None, |_| Some(Entry::new_cancelled(key)))
    }

    /// Attaching is a no-op for the poll driver.
    pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
        Ok(())
    }

    /// Cancel an in-flight operation.
    pub fn cancel(&mut self, key: ErasedKey) {
        let op_type = key.borrow().pinned_op().op_type();
        match op_type {
            None => {}
            Some(OpType::Fd(fds)) => {
                // Deregister every fd, but emit at most one cancelled entry
                // for the whole operation.
                let mut pushed = false;
                for fd in fds {
                    let entry = self.cancel_one(key.clone(), fd);
                    if !pushed && let Some(entry) = entry {
                        _ = self.completed_tx.send(entry);
                        pushed = true;
                    }
                }
            }
            #[cfg(aio)]
            Some(OpType::Aio(aiocbp)) => {
                // Ask the kernel to cancel; the (ECANCELED) completion is
                // delivered through the regular event path in `poll`.
                let aiocb = unsafe { aiocbp.as_ref() };
                let fd = aiocb.aio_fildes;
                syscall!(libc::aio_cancel(fd, aiocbp.as_ptr())).ok();
            }
        }
    }

    /// Push a new operation into the driver.
    ///
    /// Returns `Ready` for instantly-completed operations, `Pending` when
    /// the operation was registered (fd wait, blocking pool, or AIO).
    pub fn push(&mut self, key: ErasedKey) -> Poll<io::Result<usize>> {
        instrument!(compio_log::Level::TRACE, "push", ?key);
        // Braces force the temporary `Ref` to drop before matching.
        match { key.borrow().pinned_op().pre_submit()? } {
            Decision::Wait(args) => {
                // Remember the args so later events can be matched back to
                // the op's fds.
                key.borrow()
                    .extra_mut()
                    .as_poll_mut()
                    .set_args(args.clone());
                for arg in args.iter().copied() {
                    // SAFETY: fd is from the OpCode.
                    let res = unsafe { self.submit(key.clone(), arg) };
                    // if submission fails, remove all previously submitted fds.
                    if let Err(e) = res {
                        // Moving `args` is fine on this path: we return right
                        // after, so the loop iterator borrowing it is dead.
                        args.into_iter().for_each(|arg| {
                            // we don't care about renew errors
                            let _ = self.remove_one(&key, arg.fd);
                        });
                        return Poll::Ready(Err(e));
                    }
                    trace!("register {:?}", arg);
                }
                Poll::Pending
            }
            Decision::Completed(res) => Poll::Ready(Ok(res)),
            Decision::Blocking => {
                self.push_blocking(key);
                Poll::Pending
            }
            #[cfg(aio)]
            Decision::Aio(AioControl { mut aiocbp, submit }) => {
                let aiocb = unsafe { aiocbp.as_mut() };
                let user_data = key.as_raw();
                #[cfg(freebsd)]
                {
                    // sigev_notify_kqueue
                    aiocb.aio_sigevent.sigev_signo = self.as_raw_fd();
                    aiocb.aio_sigevent.sigev_notify = libc::SIGEV_KEVENT;
                    aiocb.aio_sigevent.sigev_value.sival_ptr = user_data as _;
                }
                #[cfg(solarish)]
                let mut notify = libc::port_notify {
                    portnfy_port: self.as_raw_fd(),
                    portnfy_user: user_data as _,
                };
                #[cfg(solarish)]
                {
                    aiocb.aio_sigevent.sigev_notify = libc::SIGEV_PORT;
                    aiocb.aio_sigevent.sigev_value.sival_ptr = &mut notify as *mut _ as _;
                }
                match syscall!(submit(aiocbp.as_ptr())) {
                    Ok(_) => {
                        // Key is successfully submitted, leak it on this side.
                        // It is reclaimed with `ErasedKey::from_raw` in `poll`.
                        key.into_raw();
                        Poll::Pending
                    }
                    // FreeBSD:
                    //   * EOPNOTSUPP: It's on a filesystem without AIO support. Just fallback to
                    //     blocking IO.
                    //   * EAGAIN: The process-wide queue is full. No safe way to remove the (maybe)
                    //     dead entries.
                    // Solarish:
                    //   * EAGAIN: Allocation failed.
                    Err(e)
                        if matches!(
                            e.raw_os_error(),
                            Some(libc::EOPNOTSUPP) | Some(libc::EAGAIN)
                        ) =>
                    {
                        self.push_blocking(key);
                        Poll::Pending
                    }
                    Err(e) => Poll::Ready(Err(e)),
                }
            }
        }
    }

    /// Run a blocking operation on the thread pool, reporting its result via
    /// the completion channel and waking the driver when done.
    fn push_blocking(&mut self, key: ErasedKey) {
        let waker = self.waker();
        let completed = self.completed_tx.clone();
        // SAFETY: we're submitting into the driver, so it's safe to freeze here.
        let mut key = unsafe { key.freeze() };

        let mut closure = move || {
            let poll = key.pinned_op().operate();
            let res = match poll {
                // Blocking ops must complete synchronously in `operate`.
                Poll::Pending => unreachable!("this operation is not non-blocking"),
                Poll::Ready(res) => res,
            };
            let _ = completed.send(Entry::new(key.into_inner(), res));
            waker.wake();
        };

        // If the pool is saturated, drain completions and retry until the
        // closure is accepted.
        while let Err(e) = self.pool.dispatch(closure) {
            closure = e.0;
            self.poll_completed();
        }
    }

    /// Drain the completion channel; returns whether anything was notified.
    fn poll_completed(&mut self) -> bool {
        let mut ret = false;
        while let Ok(entry) = self.completed_rx.try_recv() {
            entry.notify();
            ret = true;
        }
        ret
    }

    /// Handle one readiness event for `fd`: run the op once all of its fds
    /// are ready, then either complete it or re-queue its interests, and
    /// finally re-arm (or drop) the fd registration.
    #[allow(clippy::blocks_in_conditions)]
    fn poll_one(&mut self, event: Event, fd: RawFd) -> io::Result<()> {
        let queue = self.get_queue(fd);

        // `handle_event` records which of the op's fds fired; it presumably
        // returns true only when the op should be driven — defined in the
        // `extra` module (not shown), confirm there.
        if let Some((key, _)) = queue.pop_interest(&event)
            && let mut op = key.borrow()
            && op.extra_mut().as_poll_mut().handle_event(fd)
        {
            // Add brace here to force `Ref` drop within the scrutinee
            match { op.pinned_op().operate() } {
                // Submit all fd's back to the front of the queue
                Poll::Pending => {
                    let extra = op.extra_mut().as_poll_mut();
                    extra.reset();
                    // `FdQueue` may have been removed, need to submit again
                    for t in extra.track.iter() {
                        let res = unsafe { self.submit_front(key.clone(), t.arg) };
                        if let Err(e) = res {
                            // On error, remove all previously submitted fds.
                            for t in extra.track.iter() {
                                let _ = self.remove_one(&key, t.arg.fd);
                            }
                            return Err(e);
                        }
                    }
                }
                Poll::Ready(res) => {
                    drop(op);
                    Entry::new(key, res).notify()
                }
            };
        }

        // Re-arm with whatever interests remain queued for this fd.
        let renew_event = self.get_queue(fd).event();
        let fd = unsafe { BorrowedFd::borrow_raw(fd) };
        self.renew(fd, renew_event)
    }

    /// Wait for events (or already-pending completions) once.
    ///
    /// # Errors
    /// Returns `ETIMEDOUT` when a timeout was given and elapsed without any
    /// event arriving.
    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
        instrument!(compio_log::Level::TRACE, "poll", ?timeout);
        // Completions from the blocking pool already count as progress.
        if self.poll_completed() {
            return Ok(());
        }
        self.events.clear();
        self.notify.poll.wait(&mut self.events, timeout)?;
        if self.events.is_empty() && timeout.is_some() {
            return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
        }
        self.with_events(|this, events| {
            for event in events.iter() {
                trace!("receive {} for {:?}", event.key, event);
                // SAFETY: user_data is promised to be valid.
                let key = unsafe { BorrowedKey::from_raw(event.key) };
                let mut op = key.borrow();
                let op_type = op.pinned_op().op_type();
                match op_type {
                    None => {
                        // On epoll, multiple event may be received even if it is registered as
                        // one-shot. It is safe to ignore it.
                        trace!("op {} is completed", event.key);
                    }
                    Some(OpType::Fd(_)) => {
                        // FIXME: This should not happen
                        let Some(fd) = op.extra().as_poll().next_fd() else {
                            return Ok(());
                        };
                        drop(op);
                        this.poll_one(event, fd)?;
                    }
                    #[cfg(aio)]
                    Some(OpType::Aio(aiocbp)) => {
                        drop(op);
                        let err = unsafe { libc::aio_error(aiocbp.as_ptr()) };
                        let res = match err {
                            // If the user_data is reused but the previously registered event still
                            // emits (for example, HUP in epoll; however it is impossible now
                            // because we only use AIO on FreeBSD), we'd better ignore the current
                            // one and wait for the real event.
                            libc::EINPROGRESS => {
                                trace!("op {} is not completed", key.as_raw());
                                continue;
                            }
                            libc::ECANCELED => {
                                // Remove the aiocb from kqueue.
                                unsafe { libc::aio_return(aiocbp.as_ptr()) };
                                Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))
                            }
                            _ => {
                                syscall!(libc::aio_return(aiocbp.as_ptr())).map(|res| res as usize)
                            }
                        };
                        // Reclaim the key leaked at submission time in `push`.
                        let key = unsafe { ErasedKey::from_raw(event.key) };
                        Entry::new(key, res).notify()
                    }
                }
            }

            Ok(())
        })
    }

    /// A waker that notifies this driver's poller when woken.
    pub fn waker(&self) -> Waker {
        Waker::from(self.notify.clone())
    }

    /// Create a buffer pool. The poll driver has no kernel-registered
    /// buffers, so this is a plain userspace pool.
    pub fn create_buffer_pool(
        &mut self,
        buffer_len: u16,
        buffer_size: usize,
    ) -> io::Result<BufferPool> {
        #[cfg(fusion)]
        {
            Ok(BufferPool::new_poll(crate::FallbackBufferPool::new(
                buffer_len,
                buffer_size,
            )))
        }
        #[cfg(not(fusion))]
        {
            Ok(BufferPool::new(buffer_len, buffer_size))
        }
    }

    /// Releasing is a no-op for the poll driver.
    ///
    /// # Safety
    ///
    /// caller must make sure release the buffer pool with correct driver
    pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> {
        Ok(())
    }

    /// Multishot results are not supported by the poll driver.
    pub fn pop_multishot(&mut self, _: &ErasedKey) -> Option<BufResult<usize, crate::sys::Extra>> {
        None
    }
}
695
696impl AsRawFd for Driver {
697    fn as_raw_fd(&self) -> RawFd {
698        self.poller().as_raw_fd()
699    }
700}
701
702impl Drop for Driver {
703    fn drop(&mut self) {
704        for fd in self.registry.keys() {
705            unsafe {
706                let fd = BorrowedFd::borrow_raw(*fd);
707                self.poller().delete(fd).ok();
708            }
709        }
710    }
711}
712
impl Entry {
    /// Build a completion entry representing a cancelled operation.
    ///
    /// Cancellation is reported as `ETIMEDOUT`, matching the error the AIO
    /// path produces for `ECANCELED` results in `poll`.
    pub(crate) fn new_cancelled(key: ErasedKey) -> Self {
        Entry::new(key, Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)))
    }
}
718
/// A notify handle to the inner driver.
///
/// Wraps the `Poller` so it can be shared behind an `Arc` and used as a
/// `Waker` (see the `Wake` impl below... defined separately in this file).
pub(crate) struct Notify {
    poll: Poller,
}

impl Notify {
    fn new(poll: Poller) -> Self {
        Self { poll }
    }

    /// Notify the inner driver, interrupting a pending `Poller::wait`.
    pub fn notify(&self) -> io::Result<()> {
        self.poll.notify()
    }
}
734
735impl Wake for Notify {
736    fn wake(self: Arc<Self>) {
737        self.wake_by_ref();
738    }
739
740    fn wake_by_ref(self: &Arc<Self>) {
741        self.notify().ok();
742    }
743}