1#![cfg_attr(docsrs, feature(doc_cfg))]
6#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
7#![allow(unused_features)]
8#![warn(missing_docs)]
9#![deny(rustdoc::broken_intra_doc_links)]
10#![doc(
11 html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
12)]
13#![doc(
14 html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
15)]
16
17use std::{
18 io,
19 num::NonZero,
20 task::{Poll, Waker},
21 time::Duration,
22};
23
24use compio_buf::BufResult;
25use compio_log::instrument;
26
27mod control;
28mod macros;
29mod panic;
30
31mod key;
32pub use key::Key;
33
34mod asyncify;
35pub use asyncify::*;
36
37mod fd;
38pub use fd::*;
39
40mod driver_type;
41pub use driver_type::*;
42
43mod sys;
44pub use sys::{op, *};
45
46mod cancel;
47pub use cancel::*;
48
49mod buffer_pool;
50pub use buffer_pool::{BoxAllocator, BufferAllocator, BufferPool, BufferRef};
51
52use crate::{
53 buffer_pool::{BufferAlloc, BufferPoolRoot},
54 key::ErasedKey,
55 panic::resume_unwind_io,
56 sys::op::OpCodeFlag,
57};
58
/// Two-state outcome of handing an operation to the driver.
///
/// `K` is the payload carried while the operation is still outstanding and
/// `R` is the payload carried once it has finished.
#[derive(Debug)]
pub enum PushEntry<K, R> {
    /// The operation has not finished yet.
    Pending(K),
    /// The operation has finished.
    Ready(R),
}

impl<K, R> PushEntry<K, R> {
    /// Returns `true` if this entry is [`PushEntry::Ready`].
    pub const fn is_ready(&self) -> bool {
        match self {
            Self::Ready(_) => true,
            Self::Pending(_) => false,
        }
    }

    /// Consumes the entry, yielding the ready value or `None` when it is
    /// still pending (the pending payload is dropped).
    pub fn take_ready(self) -> Option<R> {
        if let Self::Ready(value) = self {
            Some(value)
        } else {
            None
        }
    }

    /// Transforms the pending payload with `f`; a ready entry passes through
    /// untouched and `f` is never called.
    pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
        let pending = match self {
            Self::Ready(done) => return PushEntry::Ready(done),
            Self::Pending(pending) => pending,
        };
        PushEntry::Pending(f(pending))
    }

    /// Transforms the ready payload with `f`; a pending entry passes through
    /// untouched and `f` is never called.
    pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
        let done = match self {
            Self::Pending(pending) => return PushEntry::Pending(pending),
            Self::Ready(done) => done,
        };
        PushEntry::Ready(f(done))
    }
}
98
/// Pairs the platform `Driver` with a buffer pool that is created on first
/// use.
pub struct Proactor {
    // Platform driver; the methods on `Proactor` forward to it.
    driver: Driver,
    // Buffer pool settings until the pool is first requested, then the live
    // pool root.
    buffer_pool: BufferPoolState,
}
105
106enum BufferPoolState {
107 Uninit {
108 allocator: BufferAlloc,
109 num_of_bufs: u16,
110 buffer_len: usize,
111 flags: u16,
112 },
113 Init(BufferPoolRoot),
114}
115
116impl BufferPoolState {
117 fn get(&mut self, driver: &mut Driver) -> io::Result<BufferPool> {
118 loop {
119 match self {
120 BufferPoolState::Uninit {
121 allocator,
122 num_of_bufs,
123 buffer_len,
124 flags,
125 } => {
126 *self = BufferPoolState::Init(BufferPoolRoot::new(
127 driver,
128 *allocator,
129 *num_of_bufs,
130 *buffer_len,
131 *flags,
132 )?);
133 }
134 BufferPoolState::Init(root) => return Ok(root.get_pool()),
135 }
136 }
137 }
138}
139
140impl Drop for Proactor {
141 fn drop(&mut self) {
142 let BufferPoolState::Init(buffer_pool) = &mut self.buffer_pool else {
143 return;
144 };
145 debug_assert!(buffer_pool.is_unique()); _ = unsafe { buffer_pool.release(&mut self.driver) };
147 }
148}
149
// Compile-time checks that `Proactor` implements neither `Send` nor `Sync`.
// NOTE(review): `assert_not_impl!` is defined outside this view — confirm its
// exact semantics there.
assert_not_impl!(Proactor, Send);
assert_not_impl!(Proactor, Sync);
152
153impl Proactor {
154 pub fn new() -> io::Result<Self> {
156 Self::builder().build()
157 }
158
159 pub fn builder() -> ProactorBuilder {
161 ProactorBuilder::new()
162 }
163
164 fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
165 Ok(Self {
166 driver: Driver::new(builder)?,
167 buffer_pool: BufferPoolState::Uninit {
168 allocator: builder.buffer_pool_allocator,
169 num_of_bufs: builder.buffer_pool_size,
170 buffer_len: builder.buffer_pool_buffer_len,
171 flags: builder.buffer_pool_flag,
172 },
173 })
174 }
175
176 pub fn default_extra(&self) -> Extra {
178 Extra::new(&self.driver)
179 }
180
181 pub fn driver_type(&self) -> DriverType {
183 self.driver.driver_type()
184 }
185
186 pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
194 self.driver.attach(fd)
195 }
196
197 pub fn cancel<T: OpCode>(&mut self, key: Key<T>) -> Option<BufResult<usize, T>> {
204 instrument!(compio_log::Level::DEBUG, "cancel", ?key);
205 if key.set_cancelled() {
206 return None;
207 }
208 if key.is_unique() && key.has_result() {
209 let (res, buf) = key.take_result().into_parts();
210 Some(BufResult(resume_unwind_io(res), buf))
211 } else {
212 self.driver.cancel(key.erase());
213 None
214 }
215 }
216
217 pub fn cancel_token(&mut self, token: Cancel) -> bool {
225 instrument!(compio_log::Level::DEBUG, "cancel_token", ?token);
226
227 let Some(key) = token.upgrade() else {
228 return false;
229 };
230 if key.set_cancelled() || key.has_result() {
231 return false;
232 }
233 self.driver.cancel(key);
234 true
235 }
236
237 pub fn register_cancel<T: OpCode>(&mut self, key: &Key<T>) -> Cancel {
246 Cancel::new(key)
247 }
248
249 pub fn push<T: sys::OpCode + 'static>(
252 &mut self,
253 op: T,
254 ) -> PushEntry<Key<T>, BufResult<usize, T>> {
255 self.push_with_extra(op, self.default_extra())
256 }
257
258 pub fn push_with_extra<T: sys::OpCode + 'static>(
261 &mut self,
262 op: T,
263 extra: Extra,
264 ) -> PushEntry<Key<T>, BufResult<usize, T>> {
265 let key = Key::new(op, extra, self.driver_type());
266 match self.driver.push(key.clone().erase()) {
267 Poll::Pending => PushEntry::Pending(key),
268 Poll::Ready(res) => {
269 key.set_result(res);
270 PushEntry::Ready(key.take_result())
271 }
272 }
273 }
274
275 pub fn flush(&mut self) -> bool {
288 self.driver.flush()
289 }
290
291 pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
295 self.driver.poll(timeout)
296 }
297
298 pub fn pop<T: OpCode>(&mut self, key: Key<T>) -> PushEntry<Key<T>, BufResult<usize, T>> {
305 instrument!(compio_log::Level::DEBUG, "pop", ?key);
306 if key.has_result() {
307 let (res, buf) = key.take_result().into_parts();
308 PushEntry::Ready(BufResult(resume_unwind_io(res), buf))
309 } else {
310 PushEntry::Pending(key)
311 }
312 }
313
314 pub fn pop_with_extra<T: OpCode>(
322 &mut self,
323 key: Key<T>,
324 ) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
325 instrument!(compio_log::Level::DEBUG, "pop", ?key);
326 if key.has_result() {
327 let extra = key.swap_extra(self.default_extra());
328 let (res, buf) = key.take_result().into_parts();
329 PushEntry::Ready((BufResult(resume_unwind_io(res), buf), extra))
330 } else {
331 PushEntry::Pending(key)
332 }
333 }
334
335 pub fn pop_multishot<T: OpCode>(&mut self, key: &Key<T>) -> Option<BufResult<usize, Extra>> {
339 instrument!(compio_log::Level::DEBUG, "pop_multishot", ?key);
340 self.driver.pop_multishot(key)
341 }
342
343 pub fn update_waker<T>(&mut self, op: &Key<T>, waker: &Waker) {
345 op.set_waker(waker);
346 }
347
348 pub fn waker(&self) -> Waker {
350 self.driver.waker()
351 }
352
353 pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
360 fn unsupported(_: &[RawFd]) -> io::Error {
361 io::Error::new(
362 io::ErrorKind::Unsupported,
363 "Fixed-file registration is only supported on io-uring driver",
364 )
365 }
366
367 #[cfg(io_uring)]
368 match self.driver.as_iour() {
369 Some(iour) => iour.register_files(fds),
370 None => Err(unsupported(fds)),
371 }
372
373 #[cfg(not(io_uring))]
374 Err(unsupported(fds))
375 }
376
377 pub fn unregister_files(&self) -> io::Result<()> {
384 fn unsupported() -> io::Error {
385 io::Error::new(
386 io::ErrorKind::Unsupported,
387 "Fixed-file unregistration is only supported on io-uring driver",
388 )
389 }
390
391 #[cfg(io_uring)]
392 match self.driver.as_iour() {
393 Some(iour) => iour.unregister_files(),
394 None => Err(unsupported()),
395 }
396
397 #[cfg(not(io_uring))]
398 Err(unsupported())
399 }
400
401 pub fn register_personality(&self) -> io::Result<u16> {
413 fn unsupported() -> io::Error {
414 io::Error::new(
415 io::ErrorKind::Unsupported,
416 "Personality is only supported on io-uring driver",
417 )
418 }
419
420 #[cfg(io_uring)]
421 match self.driver.as_iour() {
422 Some(iour) => iour.register_personality(),
423 None => Err(unsupported()),
424 }
425
426 #[cfg(not(io_uring))]
427 Err(unsupported())
428 }
429
430 pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
439 fn unsupported(_: u16) -> io::Error {
440 io::Error::new(
441 io::ErrorKind::Unsupported,
442 "Personality is only supported on io-uring driver",
443 )
444 }
445
446 #[cfg(io_uring)]
447 match self.driver.as_iour() {
448 Some(iour) => iour.unregister_personality(personality),
449 None => Err(unsupported(personality)),
450 }
451
452 #[cfg(not(io_uring))]
453 Err(unsupported(personality))
454 }
455
456 pub fn buffer_pool(&mut self) -> io::Result<BufferPool> {
461 self.buffer_pool.get(&mut self.driver)
462 }
463}
464
465impl AsRawFd for Proactor {
466 fn as_raw_fd(&self) -> RawFd {
467 self.driver.as_raw_fd()
468 }
469}
470
471#[derive(Debug)]
476pub(crate) struct Entry {
477 key: ErasedKey,
478 result: io::Result<usize>,
479
480 #[cfg(io_uring)]
481 flags: u32,
482}
483
484unsafe impl Send for Entry {}
485unsafe impl Sync for Entry {}
486
487impl Entry {
488 pub(crate) fn new(key: ErasedKey, result: io::Result<usize>) -> Self {
489 #[cfg(not(io_uring))]
490 {
491 Self { key, result }
492 }
493 #[cfg(io_uring)]
494 {
495 Self {
496 key,
497 result,
498 flags: 0,
499 }
500 }
501 }
502
503 #[allow(dead_code)]
504 pub fn user_data(&self) -> usize {
505 self.key.as_raw()
506 }
507
508 #[allow(dead_code)]
509 pub fn into_key(self) -> ErasedKey {
510 self.key
511 }
512
513 #[cfg(io_uring)]
514 pub fn flags(&self) -> u32 {
515 self.flags
516 }
517
518 #[cfg(io_uring)]
519 pub(crate) fn set_flags(&mut self, flags: u32) {
520 self.flags = flags;
521 }
522
523 pub fn notify(self) {
524 #[cfg(io_uring)]
525 self.key.borrow().extra_mut().set_flags(self.flags());
526 self.key.set_result(self.result);
527 }
528}
529
530#[derive(Debug, Clone)]
531enum ThreadPoolBuilder {
532 Create { limit: usize, recv_limit: Duration },
533 Reuse(AsyncifyPool),
534}
535
536impl Default for ThreadPoolBuilder {
537 fn default() -> Self {
538 Self::new()
539 }
540}
541
542impl ThreadPoolBuilder {
543 pub fn new() -> Self {
544 Self::Create {
545 limit: 256,
546 recv_limit: Duration::from_secs(60),
547 }
548 }
549
550 pub fn create_or_reuse(&self) -> AsyncifyPool {
551 match self {
552 Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
553 Self::Reuse(pool) => pool.clone(),
554 }
555 }
556}
557
558#[derive(Debug, Clone)]
560pub struct ProactorBuilder {
561 capacity: u32,
562 pool_builder: ThreadPoolBuilder,
563 sqpoll_idle: Option<Duration>,
564 sqpoll_cpu: Option<u32>,
565 cqsize: Option<u32>,
566 single_issuer: bool,
567 coop_taskrun: bool,
568 taskrun_flag: bool,
569 defer_taskrun: bool,
570 eventfd: Option<RawFd>,
571 driver_type: Option<DriverType>,
572 op_flags: OpCodeFlag,
573 buffer_pool_size: u16,
574 buffer_pool_flag: u16,
575 buffer_pool_buffer_len: usize,
576 buffer_pool_allocator: BufferAlloc,
577}
578
579unsafe impl Send for ProactorBuilder {}
581unsafe impl Sync for ProactorBuilder {}
582
583impl Default for ProactorBuilder {
584 fn default() -> Self {
585 Self::new()
586 }
587}
588
589impl ProactorBuilder {
590 pub fn new() -> Self {
592 Self {
593 capacity: 1024,
594 pool_builder: ThreadPoolBuilder::new(),
595 sqpoll_idle: None,
596 sqpoll_cpu: None,
597 cqsize: None,
598 single_issuer: false,
599 coop_taskrun: false,
600 taskrun_flag: false,
601 defer_taskrun: false,
602 eventfd: None,
603 driver_type: None,
604 op_flags: OpCodeFlag::empty(),
605 buffer_pool_size: 8,
606 buffer_pool_flag: 0,
607 buffer_pool_buffer_len: 8192,
608 buffer_pool_allocator: BufferAlloc::new::<BoxAllocator>(),
609 }
610 }
611
612 pub fn capacity(&mut self, capacity: u32) -> &mut Self {
615 self.capacity = capacity;
616 self
617 }
618
619 pub fn cqsize(&mut self, cqsize: u32) -> &mut Self {
622 self.cqsize = Some(cqsize);
623 self
624 }
625
626 pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
636 if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
637 *limit = value;
638 }
639 self
640 }
641
642 pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
647 if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
648 *recv_limit = timeout;
649 }
650 self
651 }
652
653 pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
655 self.pool_builder = ThreadPoolBuilder::Reuse(pool);
656 self
657 }
658
659 pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
662 self.reuse_thread_pool(self.create_or_get_thread_pool());
663 self
664 }
665
666 pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
668 self.pool_builder.create_or_reuse()
669 }
670
671 pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
681 self.sqpoll_idle = Some(idle);
682 self
683 }
684
685 pub fn sqpoll_cpu(&mut self, cpu: u32) -> &mut Self {
697 self.sqpoll_cpu = Some(cpu);
698 self
699 }
700
701 pub fn single_issuer(&mut self, enable: bool) -> &mut Self {
708 self.single_issuer = enable;
709 self
710 }
711
712 pub fn coop_taskrun(&mut self, enable: bool) -> &mut Self {
722 self.coop_taskrun = enable;
723 self
724 }
725
726 pub fn taskrun_flag(&mut self, enable: bool) -> &mut Self {
736 self.taskrun_flag = enable;
737 self
738 }
739
740 pub fn defer_taskrun(&mut self, enable: bool) -> &mut Self {
752 self.defer_taskrun = enable;
753 self
754 }
755
756 pub fn register_eventfd(&mut self, fd: RawFd) -> &mut Self {
762 self.eventfd = Some(fd);
763 self
764 }
765
766 pub fn detect_opcode_support(&mut self, flags: OpCodeFlag) -> &mut Self {
784 self.op_flags = flags;
785 self
786 }
787
788 pub fn driver_type(&mut self, t: DriverType) -> &mut Self {
792 self.driver_type = Some(t);
793 self
794 }
795
796 pub fn buffer_pool_size(&mut self, size: NonZero<u16>) -> &mut Self {
802 self.buffer_pool_size = size.get();
803 self
804 }
805
806 pub fn buffer_pool_flag(&mut self, flag: u16) -> &mut Self {
812 self.buffer_pool_flag = flag;
813 self
814 }
815
816 pub fn buffer_pool_buffer_len(&mut self, size: usize) -> &mut Self {
820 self.buffer_pool_buffer_len = size;
821 self
822 }
823
824 pub fn buffer_pool_allocator<A: BufferAllocator>(&mut self) -> &mut Self {
841 self.buffer_pool_allocator = BufferAlloc::new::<A>();
842 self
843 }
844
845 pub fn build(&self) -> io::Result<Proactor> {
847 Proactor::with_builder(self)
848 }
849}
850
851mod seal {
852 use std::io;
853
854 use compio_buf::BufResult;
855
856 pub(crate) trait Seal {}
857
858 impl Seal for io::Error {}
859 impl<T> Seal for io::Result<T> {}
860 impl<T, B> Seal for BufResult<T, B> {}
861}
862
863#[allow(private_bounds)]
865pub trait ErrorExt: seal::Seal {
866 #[doc(hidden)]
867 fn as_io_error(&self) -> Option<&io::Error>;
868
869 fn is_cancelled(&self) -> bool {
871 #[cfg(unix)]
872 const CANCEL_ERROR: i32 = libc::ECANCELED;
873 #[cfg(windows)]
874 const CANCEL_ERROR: i32 = windows_sys::Win32::Foundation::ERROR_OPERATION_ABORTED as _;
875
876 self.as_io_error()
877 .and_then(io::Error::raw_os_error)
878 .is_some_and(|e| e == CANCEL_ERROR)
879 }
880}
881
882impl ErrorExt for io::Error {
883 fn as_io_error(&self) -> Option<&io::Error> {
884 Some(self)
885 }
886}
887
888impl<T> ErrorExt for io::Result<T> {
889 fn as_io_error(&self) -> Option<&io::Error> {
890 self.as_ref().err()
891 }
892}
893
894impl<T, B> ErrorExt for BufResult<T, B> {
895 fn as_io_error(&self) -> Option<&io::Error> {
896 self.0.as_io_error()
897 }
898}