1#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
2#[allow(unused_imports)]
3pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
4#[cfg(aio)]
5use std::ptr::NonNull;
6use std::{
7 collections::{HashMap, VecDeque},
8 io,
9 num::NonZeroUsize,
10 pin::Pin,
11 sync::Arc,
12 task::{Poll, Wake, Waker},
13 time::Duration,
14};
15
16use compio_buf::BufResult;
17use compio_log::{instrument, trace};
18use flume::{Receiver, Sender};
19use polling::{Event, Events, Poller};
20use smallvec::SmallVec;
21
22use crate::{
23 AsyncifyPool, BufferPool, DriverType, Entry, ErasedKey, ProactorBuilder,
24 key::{BorrowedKey, RefExt},
25 op::Interest,
26 syscall,
27};
28
29mod extra;
30pub(in crate::sys) use extra::Extra;
31pub(crate) mod op;
32
33struct Track {
34 arg: WaitArg,
35 ready: bool,
36}
37
38impl From<WaitArg> for Track {
39 fn from(arg: WaitArg) -> Self {
40 Self { arg, ready: false }
41 }
42}
43
/// An operation that can be driven by the poll-based driver.
///
/// # Safety
///
/// NOTE(review): the full contract of this `unsafe trait` is not visible in this file.
/// Presumably implementors must keep every fd / `aiocb` pointer they hand to the driver
/// (through [`Decision`] or [`OpType`]) valid for as long as the driver may use it — confirm.
pub unsafe trait OpCode {
    /// Called when the operation is pushed to the driver; the returned [`Decision`] says
    /// whether it completed immediately, must wait for fd readiness, runs on the blocking
    /// thread pool, or (with `aio`) is submitted through POSIX AIO.
    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision>;

    /// Describes what the in-flight operation is waiting on.
    ///
    /// The driver consults this when cancelling and when dispatching events. The default
    /// returns `None`; the driver then treats a received event for the op as already
    /// completed and cancellation as a no-op.
    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
        None
    }

    /// Performs the operation.
    ///
    /// For waiting operations, `Poll::Pending` makes the driver re-register the op's tracked
    /// fds. For blocking operations run on the thread pool, `Pending` is treated as
    /// unreachable.
    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>>;
}
67
68pub use OpCode as PollOpCode;
69
/// Small vector that keeps up to one element inline before spilling to the heap.
type Multi<T> = SmallVec<[T; 1]>;
72
/// Outcome of [`OpCode::pre_submit`]: tells the driver how to proceed with an operation.
#[non_exhaustive]
pub enum Decision {
    /// The operation finished immediately with this result.
    Completed(usize),
    /// The operation must wait for readiness on each of these fds.
    Wait(Multi<WaitArg>),
    /// The operation is blocking and is dispatched to the driver's thread pool.
    Blocking,
    /// The operation is submitted through POSIX AIO; the driver falls back to the thread
    /// pool if submission fails with `EOPNOTSUPP` or `EAGAIN`.
    #[cfg(aio)]
    Aio(AioControl),
}
86
87impl Decision {
88 pub fn wait_for(fd: RawFd, interest: Interest) -> Self {
90 Self::Wait(SmallVec::from_buf([WaitArg { fd, interest }]))
91 }
92
93 pub fn wait_for_many<I: IntoIterator<Item = WaitArg>>(args: I) -> Self {
95 Self::Wait(Multi::from_iter(args))
96 }
97
98 pub fn wait_readable(fd: RawFd) -> Self {
100 Self::wait_for(fd, Interest::Readable)
101 }
102
103 pub fn wait_writable(fd: RawFd) -> Self {
105 Self::wait_for(fd, Interest::Writable)
106 }
107
108 #[cfg(aio)]
110 pub fn aio(
111 cb: &mut libc::aiocb,
112 submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
113 ) -> Self {
114 Self::Aio(AioControl {
115 aiocbp: NonNull::from(cb),
116 submit,
117 })
118 }
119}
120
/// A file descriptor together with the readiness an operation waits for on it.
#[derive(Debug, Clone, Copy)]
pub struct WaitArg {
    /// The file descriptor to watch.
    pub fd: RawFd,
    /// The readiness (readable or writable) to wait for.
    pub interest: Interest,
}
129
130impl WaitArg {
131 pub fn readable(fd: RawFd) -> Self {
133 Self {
134 fd,
135 interest: Interest::Readable,
136 }
137 }
138
139 pub fn writable(fd: RawFd) -> Self {
141 Self {
142 fd,
143 interest: Interest::Writable,
144 }
145 }
146}
147
/// Everything the driver needs to submit an operation through POSIX AIO.
#[cfg(aio)]
#[derive(Debug, Clone, Copy)]
pub struct AioControl {
    /// The control block to submit. On FreeBSD and Solaris-like targets the driver fills in
    /// its `aio_sigevent` notification fields before submitting.
    pub aiocbp: NonNull<libc::aiocb>,
    /// Submission function called with `aiocbp`.
    /// NOTE(review): presumably `aio_read` / `aio_write` / `aio_fsync`-like — confirm.
    pub submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
}
157
/// Per-fd FIFO queues of operations waiting for read and write readiness.
#[derive(Debug, Default)]
struct FdQueue {
    /// Keys of operations waiting for the fd to become readable.
    read_queue: VecDeque<ErasedKey>,
    /// Keys of operations waiting for the fd to become writable.
    write_queue: VecDeque<ErasedKey>,
}
163
/// Locates an entry just pushed into an [`FdQueue`], so that the push can be undone.
struct RemoveToken {
    /// Position of the entry inside its queue.
    idx: usize,
    /// `true` for the read queue, `false` for the write queue.
    is_read: bool,
}

impl RemoveToken {
    /// Token for position `idx` of the read queue.
    fn read(idx: usize) -> Self {
        RemoveToken { is_read: true, idx }
    }

    /// Token for position `idx` of the write queue.
    fn write(idx: usize) -> Self {
        RemoveToken { is_read: false, idx }
    }
}
187
188impl FdQueue {
189 fn is_empty(&self) -> bool {
190 self.read_queue.is_empty() && self.write_queue.is_empty()
191 }
192
193 fn remove_token(&mut self, token: RemoveToken) -> Option<ErasedKey> {
194 if token.is_read {
195 self.read_queue.remove(token.idx)
196 } else {
197 self.write_queue.remove(token.idx)
198 }
199 }
200
201 pub fn push_back_interest(&mut self, key: ErasedKey, interest: Interest) -> RemoveToken {
202 match interest {
203 Interest::Readable => {
204 self.read_queue.push_back(key);
205 RemoveToken::read(self.read_queue.len() - 1)
206 }
207 Interest::Writable => {
208 self.write_queue.push_back(key);
209 RemoveToken::write(self.write_queue.len() - 1)
210 }
211 }
212 }
213
214 pub fn push_front_interest(&mut self, key: ErasedKey, interest: Interest) -> RemoveToken {
215 let is_read = match interest {
216 Interest::Readable => {
217 self.read_queue.push_front(key);
218 true
219 }
220 Interest::Writable => {
221 self.write_queue.push_front(key);
222 false
223 }
224 };
225 RemoveToken { idx: 0, is_read }
226 }
227
228 pub fn remove(&mut self, key: &ErasedKey) {
229 self.read_queue.retain(|k| k != key);
230 self.write_queue.retain(|k| k != key);
231 }
232
233 pub fn event(&self) -> Event {
234 let mut event = Event::none(0);
235 if let Some(key) = self.read_queue.front() {
236 event.readable = true;
237 event.key = key.as_raw();
238 }
239 if let Some(key) = self.write_queue.front() {
240 event.writable = true;
241 event.key = key.as_raw();
242 }
243 event
244 }
245
246 pub fn pop_interest(&mut self, event: &Event) -> Option<(ErasedKey, Interest)> {
247 if event.readable
248 && let Some(key) = self.read_queue.pop_front()
249 {
250 return Some((key, Interest::Readable));
251 }
252 if event.writable
253 && let Some(key) = self.write_queue.pop_front()
254 {
255 return Some((key, Interest::Writable));
256 }
257 None
258 }
259}
260
/// What an in-flight operation is waiting on; returned by [`OpCode::op_type`].
#[non_exhaustive]
pub enum OpType {
    /// Readiness on one or more file descriptors.
    Fd(Multi<RawFd>),
    /// A submitted POSIX AIO control block.
    #[cfg(aio)]
    Aio(NonNull<libc::aiocb>),
}
271
272impl OpType {
273 pub fn fd(fd: RawFd) -> Self {
275 Self::Fd(SmallVec::from_buf([fd]))
276 }
277
278 pub fn multi_fd<I: IntoIterator<Item = RawFd>>(fds: I) -> Self {
280 Self::Fd(Multi::from_iter(fds))
281 }
282}
283
/// Readiness-based driver built on [`polling::Poller`], with a thread-pool fallback for
/// blocking operations.
pub(crate) struct Driver {
    /// Event buffer reused across `Poller::wait` calls.
    events: Events,
    /// Shared handle that owns the poller; also backs the driver's [`Waker`].
    notify: Arc<Notify>,
    /// Per-fd wait queues; `Drop` deletes every fd in here from the poller.
    registry: HashMap<RawFd, FdQueue>,
    /// Thread pool that runs blocking operations.
    pool: AsyncifyPool,
    /// Sender cloned into blocking jobs to report their completions.
    completed_tx: Sender<Entry>,
    /// Receiver drained by the driver to deliver blocking-job completions.
    completed_rx: Receiver<Entry>,
}
293
294impl Driver {
295 pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
296 instrument!(compio_log::Level::TRACE, "new", ?builder);
297 trace!("new poll driver");
298
299 let events = if let Some(cap) = NonZeroUsize::new(builder.capacity as _) {
300 Events::with_capacity(cap)
301 } else {
302 Events::new()
303 };
304 let poll = Poller::new()?;
305 let notify = Arc::new(Notify::new(poll));
306 let (completed_tx, completed_rx) = flume::unbounded();
307
308 Ok(Self {
309 events,
310 notify,
311 registry: HashMap::new(),
312 pool: builder.create_or_get_thread_pool(),
313 completed_tx,
314 completed_rx,
315 })
316 }
317
318 pub fn driver_type(&self) -> DriverType {
319 DriverType::Poll
320 }
321
322 pub(in crate::sys) fn default_extra(&self) -> Extra {
323 Extra::new()
324 }
325
326 fn poller(&self) -> &Poller {
327 &self.notify.poll
328 }
329
330 fn with_events<F, R>(&mut self, f: F) -> R
331 where
332 F: FnOnce(&mut Self, &mut Events) -> R,
333 {
334 let mut events = std::mem::take(&mut self.events);
335 let res = f(self, &mut events);
336 self.events = events;
337 res
338 }
339
340 fn try_get_queue(&mut self, fd: RawFd) -> Option<&mut FdQueue> {
341 self.registry.get_mut(&fd)
342 }
343
344 fn get_queue(&mut self, fd: RawFd) -> &mut FdQueue {
345 self.try_get_queue(fd).expect("the fd should be submitted")
346 }
347
348 unsafe fn submit(&mut self, key: ErasedKey, arg: WaitArg) -> io::Result<()> {
353 let Self {
354 registry, notify, ..
355 } = self;
356 let need_add = !registry.contains_key(&arg.fd);
357 let queue = registry.entry(arg.fd).or_default();
358 let token = queue.push_back_interest(key, arg.interest);
359 let event = queue.event();
360 let res = if need_add {
361 unsafe { notify.poll.add(arg.fd, event) }
363 } else {
364 let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
365 notify.poll.modify(fd, event)
366 };
367 if res.is_err() {
368 queue.remove_token(token);
370 if queue.is_empty() {
371 registry.remove(&arg.fd);
372 }
373 }
374
375 res
376 }
377
378 unsafe fn submit_front(&mut self, key: ErasedKey, arg: WaitArg) -> io::Result<()> {
383 let need_add = !self.registry.contains_key(&arg.fd);
384 let queue = self.registry.entry(arg.fd).or_default();
385 queue.push_front_interest(key, arg.interest);
386 let event = queue.event();
387 if need_add {
388 unsafe { self.poller().add(arg.fd, event)? }
390 } else {
391 let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
392 self.poller().modify(fd, event)?;
393 }
394 Ok(())
395 }
396
397 fn renew(&mut self, fd: BorrowedFd, renew_event: Event) -> io::Result<()> {
398 if !renew_event.readable && !renew_event.writable {
399 self.poller().delete(fd)?;
400 self.registry.remove(&fd.as_raw_fd());
401 } else {
402 self.poller().modify(fd, renew_event)?;
403 }
404 Ok(())
405 }
406
407 fn remove_one(&mut self, key: &ErasedKey, fd: RawFd) -> io::Result<()> {
409 let Some(queue) = self.try_get_queue(fd) else {
410 return Ok(());
411 };
412 queue.remove(key);
413 let renew_event = queue.event();
414 if queue.is_empty() {
415 self.registry.remove(&fd);
416 }
417 self.renew(unsafe { BorrowedFd::borrow_raw(fd) }, renew_event)
418 }
419
420 fn cancel_one(&mut self, key: ErasedKey, fd: RawFd) -> Option<Entry> {
422 self.remove_one(&key, fd)
423 .map_or(None, |_| Some(Entry::new_cancelled(key)))
424 }
425
426 pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
427 Ok(())
428 }
429
430 pub fn cancel(&mut self, key: ErasedKey) {
431 let op_type = key.borrow().pinned_op().op_type();
432 match op_type {
433 None => {}
434 Some(OpType::Fd(fds)) => {
435 let mut pushed = false;
436 for fd in fds {
437 let entry = self.cancel_one(key.clone(), fd);
438 if !pushed && let Some(entry) = entry {
439 _ = self.completed_tx.send(entry);
440 pushed = true;
441 }
442 }
443 }
444 #[cfg(aio)]
445 Some(OpType::Aio(aiocbp)) => {
446 let aiocb = unsafe { aiocbp.as_ref() };
447 let fd = aiocb.aio_fildes;
448 syscall!(libc::aio_cancel(fd, aiocbp.as_ptr())).ok();
449 }
450 }
451 }
452
453 pub fn push(&mut self, key: ErasedKey) -> Poll<io::Result<usize>> {
454 instrument!(compio_log::Level::TRACE, "push", ?key);
455 match { key.borrow().pinned_op().pre_submit()? } {
456 Decision::Wait(args) => {
457 key.borrow()
458 .extra_mut()
459 .as_poll_mut()
460 .set_args(args.clone());
461 for arg in args.iter().copied() {
462 let res = unsafe { self.submit(key.clone(), arg) };
464 if let Err(e) = res {
466 args.into_iter().for_each(|arg| {
467 let _ = self.remove_one(&key, arg.fd);
469 });
470 return Poll::Ready(Err(e));
471 }
472 trace!("register {:?}", arg);
473 }
474 Poll::Pending
475 }
476 Decision::Completed(res) => Poll::Ready(Ok(res)),
477 Decision::Blocking => {
478 self.push_blocking(key);
479 Poll::Pending
480 }
481 #[cfg(aio)]
482 Decision::Aio(AioControl { mut aiocbp, submit }) => {
483 let aiocb = unsafe { aiocbp.as_mut() };
484 let user_data = key.as_raw();
485 #[cfg(freebsd)]
486 {
487 aiocb.aio_sigevent.sigev_signo = self.as_raw_fd();
489 aiocb.aio_sigevent.sigev_notify = libc::SIGEV_KEVENT;
490 aiocb.aio_sigevent.sigev_value.sival_ptr = user_data as _;
491 }
492 #[cfg(solarish)]
493 let mut notify = libc::port_notify {
494 portnfy_port: self.as_raw_fd(),
495 portnfy_user: user_data as _,
496 };
497 #[cfg(solarish)]
498 {
499 aiocb.aio_sigevent.sigev_notify = libc::SIGEV_PORT;
500 aiocb.aio_sigevent.sigev_value.sival_ptr = &mut notify as *mut _ as _;
501 }
502 match syscall!(submit(aiocbp.as_ptr())) {
503 Ok(_) => {
504 key.into_raw();
506 Poll::Pending
507 }
508 Err(e)
516 if matches!(
517 e.raw_os_error(),
518 Some(libc::EOPNOTSUPP) | Some(libc::EAGAIN)
519 ) =>
520 {
521 self.push_blocking(key);
522 Poll::Pending
523 }
524 Err(e) => Poll::Ready(Err(e)),
525 }
526 }
527 }
528 }
529
530 fn push_blocking(&mut self, key: ErasedKey) {
531 let waker = self.waker();
532 let completed = self.completed_tx.clone();
533 let mut key = unsafe { key.freeze() };
535
536 let mut closure = move || {
537 let poll = key.pinned_op().operate();
538 let res = match poll {
539 Poll::Pending => unreachable!("this operation is not non-blocking"),
540 Poll::Ready(res) => res,
541 };
542 let _ = completed.send(Entry::new(key.into_inner(), res));
543 waker.wake();
544 };
545
546 while let Err(e) = self.pool.dispatch(closure) {
547 closure = e.0;
548 self.poll_completed();
549 }
550 }
551
552 fn poll_completed(&mut self) -> bool {
553 let mut ret = false;
554 while let Ok(entry) = self.completed_rx.try_recv() {
555 entry.notify();
556 ret = true;
557 }
558 ret
559 }
560
561 #[allow(clippy::blocks_in_conditions)]
562 fn poll_one(&mut self, event: Event, fd: RawFd) -> io::Result<()> {
563 let queue = self.get_queue(fd);
564
565 if let Some((key, _)) = queue.pop_interest(&event)
566 && let mut op = key.borrow()
567 && op.extra_mut().as_poll_mut().handle_event(fd)
568 {
569 match { op.pinned_op().operate() } {
571 Poll::Pending => {
573 let extra = op.extra_mut().as_poll_mut();
574 extra.reset();
575 for t in extra.track.iter() {
577 let res = unsafe { self.submit_front(key.clone(), t.arg) };
578 if let Err(e) = res {
579 for t in extra.track.iter() {
581 let _ = self.remove_one(&key, t.arg.fd);
582 }
583 return Err(e);
584 }
585 }
586 }
587 Poll::Ready(res) => {
588 drop(op);
589 Entry::new(key, res).notify()
590 }
591 };
592 }
593
594 let renew_event = self.get_queue(fd).event();
595 let fd = unsafe { BorrowedFd::borrow_raw(fd) };
596 self.renew(fd, renew_event)
597 }
598
599 pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
600 instrument!(compio_log::Level::TRACE, "poll", ?timeout);
601 if self.poll_completed() {
602 return Ok(());
603 }
604 self.events.clear();
605 self.notify.poll.wait(&mut self.events, timeout)?;
606 if self.events.is_empty() && timeout.is_some() {
607 return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
608 }
609 self.with_events(|this, events| {
610 for event in events.iter() {
611 trace!("receive {} for {:?}", event.key, event);
612 let key = unsafe { BorrowedKey::from_raw(event.key) };
614 let mut op = key.borrow();
615 let op_type = op.pinned_op().op_type();
616 match op_type {
617 None => {
618 trace!("op {} is completed", event.key);
621 }
622 Some(OpType::Fd(_)) => {
623 let Some(fd) = op.extra().as_poll().next_fd() else {
625 return Ok(());
626 };
627 drop(op);
628 this.poll_one(event, fd)?;
629 }
630 #[cfg(aio)]
631 Some(OpType::Aio(aiocbp)) => {
632 drop(op);
633 let err = unsafe { libc::aio_error(aiocbp.as_ptr()) };
634 let res = match err {
635 libc::EINPROGRESS => {
640 trace!("op {} is not completed", key.as_raw());
641 continue;
642 }
643 libc::ECANCELED => {
644 unsafe { libc::aio_return(aiocbp.as_ptr()) };
646 Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))
647 }
648 _ => {
649 syscall!(libc::aio_return(aiocbp.as_ptr())).map(|res| res as usize)
650 }
651 };
652 let key = unsafe { ErasedKey::from_raw(event.key) };
653 Entry::new(key, res).notify()
654 }
655 }
656 }
657
658 Ok(())
659 })
660 }
661
662 pub fn waker(&self) -> Waker {
663 Waker::from(self.notify.clone())
664 }
665
666 pub fn create_buffer_pool(
667 &mut self,
668 buffer_len: u16,
669 buffer_size: usize,
670 ) -> io::Result<BufferPool> {
671 #[cfg(fusion)]
672 {
673 Ok(BufferPool::new_poll(crate::FallbackBufferPool::new(
674 buffer_len,
675 buffer_size,
676 )))
677 }
678 #[cfg(not(fusion))]
679 {
680 Ok(BufferPool::new(buffer_len, buffer_size))
681 }
682 }
683
684 pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> {
688 Ok(())
689 }
690
691 pub fn pop_multishot(&mut self, _: &ErasedKey) -> Option<BufResult<usize, crate::sys::Extra>> {
692 None
693 }
694}
695
696impl AsRawFd for Driver {
697 fn as_raw_fd(&self) -> RawFd {
698 self.poller().as_raw_fd()
699 }
700}
701
702impl Drop for Driver {
703 fn drop(&mut self) {
704 for fd in self.registry.keys() {
705 unsafe {
706 let fd = BorrowedFd::borrow_raw(*fd);
707 self.poller().delete(fd).ok();
708 }
709 }
710 }
711}
712
713impl Entry {
714 pub(crate) fn new_cancelled(key: ErasedKey) -> Self {
715 Entry::new(key, Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)))
716 }
717}
718
/// Owns the driver's [`Poller`] and lets other threads wake it.
///
/// It is shared through `Arc` and converted into the driver's [`Waker`].
pub(crate) struct Notify {
    /// The OS poller the driver waits on.
    poll: Poller,
}
723
724impl Notify {
725 fn new(poll: Poller) -> Self {
726 Self { poll }
727 }
728
729 pub fn notify(&self) -> io::Result<()> {
731 self.poll.notify()
732 }
733}
734
735impl Wake for Notify {
736 fn wake(self: Arc<Self>) {
737 self.wake_by_ref();
738 }
739
740 fn wake_by_ref(self: &Arc<Self>) {
741 self.notify().ok();
742 }
743}