Skip to main content

compio_driver/
lib.rs

1//! Platform-specific drivers.
2//!
3//! Some types differ by compilation target.
4
5#![cfg_attr(docsrs, feature(doc_cfg))]
6#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
7#![allow(unused_features)]
8#![warn(missing_docs)]
9#![deny(rustdoc::broken_intra_doc_links)]
10#![doc(
11    html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
12)]
13#![doc(
14    html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
15)]
16
17use std::{
18    io,
19    num::NonZero,
20    task::{Poll, Waker},
21    time::Duration,
22};
23
24use compio_buf::BufResult;
25use compio_log::instrument;
26
27mod control;
28mod macros;
29mod panic;
30
31mod key;
32pub use key::Key;
33
34mod asyncify;
35pub use asyncify::*;
36
37mod fd;
38pub use fd::*;
39
40mod driver_type;
41pub use driver_type::*;
42
43mod sys;
44pub use sys::{op, *};
45
46mod cancel;
47pub use cancel::*;
48
49mod buffer_pool;
50pub use buffer_pool::{BoxAllocator, BufferAllocator, BufferPool, BufferRef};
51
52use crate::{
53    buffer_pool::{BufferAlloc, BufferPoolRoot},
54    key::ErasedKey,
55    panic::resume_unwind_io,
56    sys::op::OpCodeFlag,
57};
58
/// The return type of [`Proactor::push`].
///
/// An operation either has to wait for the driver (`Pending`) or has
/// finished right away (`Ready`).
#[derive(Debug)]
pub enum PushEntry<K, R> {
    /// The operation is pushed to the submission queue.
    Pending(K),
    /// The operation is ready and returns.
    Ready(R),
}

impl<K, R> PushEntry<K, R> {
    /// Returns `true` if this entry is the [`PushEntry::Ready`] variant.
    pub const fn is_ready(&self) -> bool {
        match self {
            Self::Ready(_) => true,
            Self::Pending(_) => false,
        }
    }

    /// Consume the entry, yielding the ready value if there is one.
    ///
    /// A pending entry is dropped and `None` is returned.
    pub fn take_ready(self) -> Option<R> {
        if let Self::Ready(value) = self {
            Some(value)
        } else {
            None
        }
    }

    /// Transform the payload of the [`PushEntry::Pending`] variant with `f`,
    /// leaving a ready value untouched.
    pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
        match self {
            Self::Ready(value) => PushEntry::Ready(value),
            Self::Pending(key) => PushEntry::Pending(f(key)),
        }
    }

    /// Transform the payload of the [`PushEntry::Ready`] variant with `f`,
    /// leaving a pending key untouched.
    pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
        match self {
            Self::Ready(value) => PushEntry::Ready(f(value)),
            Self::Pending(key) => PushEntry::Pending(key),
        }
    }
}
98
99/// Low-level actions of completion-based IO.
100/// It owns the operations to keep the driver safe.
101pub struct Proactor {
102    driver: Driver,
103    buffer_pool: BufferPoolState,
104}
105
106enum BufferPoolState {
107    Uninit {
108        allocator: BufferAlloc,
109        num_of_bufs: u16,
110        buffer_len: usize,
111        flags: u16,
112    },
113    Init(BufferPoolRoot),
114}
115
116impl BufferPoolState {
117    fn get(&mut self, driver: &mut Driver) -> io::Result<BufferPool> {
118        loop {
119            match self {
120                BufferPoolState::Uninit {
121                    allocator,
122                    num_of_bufs,
123                    buffer_len,
124                    flags,
125                } => {
126                    *self = BufferPoolState::Init(BufferPoolRoot::new(
127                        driver,
128                        *allocator,
129                        *num_of_bufs,
130                        *buffer_len,
131                        *flags,
132                    )?);
133                }
134                BufferPoolState::Init(root) => return Ok(root.get_pool()),
135            }
136        }
137    }
138}
139
140impl Drop for Proactor {
141    fn drop(&mut self) {
142        let BufferPoolState::Init(buffer_pool) = &mut self.buffer_pool else {
143            return;
144        };
145        debug_assert!(buffer_pool.is_unique()); // Just in case. Shouldn't happen
146        _ = unsafe { buffer_pool.release(&mut self.driver) };
147    }
148}
149
150assert_not_impl!(Proactor, Send);
151assert_not_impl!(Proactor, Sync);
152
153impl Proactor {
154    /// Create [`Proactor`] with 1024 entries.
155    pub fn new() -> io::Result<Self> {
156        Self::builder().build()
157    }
158
159    /// Create [`ProactorBuilder`] to config the proactor.
160    pub fn builder() -> ProactorBuilder {
161        ProactorBuilder::new()
162    }
163
164    fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
165        Ok(Self {
166            driver: Driver::new(builder)?,
167            buffer_pool: BufferPoolState::Uninit {
168                allocator: builder.buffer_pool_allocator,
169                num_of_bufs: builder.buffer_pool_size,
170                buffer_len: builder.buffer_pool_buffer_len,
171                flags: builder.buffer_pool_flag,
172            },
173        })
174    }
175
176    /// Get a default [`Extra`] for underlying driver.
177    pub fn default_extra(&self) -> Extra {
178        Extra::new(&self.driver)
179    }
180
181    /// The current driver type.
182    pub fn driver_type(&self) -> DriverType {
183        self.driver.driver_type()
184    }
185
186    /// Attach an fd to the driver.
187    ///
188    /// ## Platform specific
189    /// * IOCP: it will be attached to the completion port. An fd could only be
190    ///   attached to one driver, and could only be attached once, even if you
191    ///   `try_clone` it.
192    /// * io-uring & polling: it will do nothing but return `Ok(())`.
193    pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
194        self.driver.attach(fd)
195    }
196
197    /// Cancel an operation with the pushed [`Key`].
198    ///
199    /// Returns the result if the key is unique and the operation is completed.
200    ///
201    /// The cancellation is not reliable. The underlying operation may continue,
202    /// but just don't return from [`Proactor::poll`].
203    pub fn cancel<T: OpCode>(&mut self, key: Key<T>) -> Option<BufResult<usize, T>> {
204        instrument!(compio_log::Level::DEBUG, "cancel", ?key);
205        if key.set_cancelled() {
206            return None;
207        }
208        if key.is_unique() && key.has_result() {
209            let (res, buf) = key.take_result().into_parts();
210            Some(BufResult(resume_unwind_io(res), buf))
211        } else {
212            self.driver.cancel(key.erase());
213            None
214        }
215    }
216
217    /// Cancel an operation with a [`Cancel`] token.
218    ///
219    /// Returns if a cancellation has been issued.
220    ///
221    /// The cancellation is not reliable. The underlying operation may continue,
222    /// but just don't return from [`Proactor::pop`]. This will do nothing if
223    /// the operation has already been completed or cancelled before.
224    pub fn cancel_token(&mut self, token: Cancel) -> bool {
225        instrument!(compio_log::Level::DEBUG, "cancel_token", ?token);
226
227        let Some(key) = token.upgrade() else {
228            return false;
229        };
230        if key.set_cancelled() || key.has_result() {
231            return false;
232        }
233        self.driver.cancel(key);
234        true
235    }
236
237    /// Create a [`Cancel`] that can be used to cancel the operation even
238    /// without the key.
239    ///
240    /// This acts like a weak reference to the [`Key`], but can only be used to
241    /// cancel the operation with [`Proactor::cancel_token`]. Extra copy of
242    /// [`Key`] may cause [`Proactor::pop`] to panic while keys registered
243    /// as [`Cancel`] will not. So this is useful in cases where you're not sure
244    /// if the operation will be cancelled.
245    pub fn register_cancel<T: OpCode>(&mut self, key: &Key<T>) -> Cancel {
246        Cancel::new(key)
247    }
248
249    /// Push an operation into the driver, and return the unique key [`Key`],
250    /// associated with it.
251    pub fn push<T: sys::OpCode + 'static>(
252        &mut self,
253        op: T,
254    ) -> PushEntry<Key<T>, BufResult<usize, T>> {
255        self.push_with_extra(op, self.default_extra())
256    }
257
258    /// Push an operation into the driver with user-defined [`Extra`], and
259    /// return the unique key [`Key`], associated with it.
260    pub fn push_with_extra<T: sys::OpCode + 'static>(
261        &mut self,
262        op: T,
263        extra: Extra,
264    ) -> PushEntry<Key<T>, BufResult<usize, T>> {
265        let key = Key::new(op, extra, self.driver_type());
266        match self.driver.push(key.clone().erase()) {
267            Poll::Pending => PushEntry::Pending(key),
268            Poll::Ready(res) => {
269                key.set_result(res);
270                PushEntry::Ready(key.take_result())
271            }
272        }
273    }
274
275    /// Flush the pushed operations to the kernel. It is roughly equivalent to
276    /// calling [`poll`](Proactor::poll) with `Some(Duration::ZERO)` on
277    /// io-uring, but is only needed if you're waiting on the driver fd with an
278    /// external event loop.
279    ///
280    /// The return value indicates if the driver is in notified state, which
281    /// means the driver is already notified by the user and should not be
282    /// waited infinitely.
283    ///
284    /// This method resets the internal notified state, so that the waker to the
285    /// driver will wake up the driver fd through syscalls after this method is
286    /// called.
287    pub fn flush(&mut self) -> bool {
288        self.driver.flush()
289    }
290
291    /// Poll the driver and get completed entries.
292    /// You need to call [`Proactor::pop`] to get the pushed
293    /// operations.
294    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
295        self.driver.poll(timeout)
296    }
297
298    /// Get the pushed operations from the completion entries.
299    ///
300    /// # Panics
301    ///
302    /// This function will panic if the [`Key`] is not unique or if the
303    /// operation is blocking and it panicked in the thread pool.
304    pub fn pop<T: OpCode>(&mut self, key: Key<T>) -> PushEntry<Key<T>, BufResult<usize, T>> {
305        instrument!(compio_log::Level::DEBUG, "pop", ?key);
306        if key.has_result() {
307            let (res, buf) = key.take_result().into_parts();
308            PushEntry::Ready(BufResult(resume_unwind_io(res), buf))
309        } else {
310            PushEntry::Pending(key)
311        }
312    }
313
314    /// Get the pushed operations from the completion entries along the
315    /// [`Extra`] associated.
316    ///
317    /// # Panics
318    ///
319    /// This function will panic if the [`Key`] is not unique or if the
320    /// operation is blocking and it panicked in the thread pool.
321    pub fn pop_with_extra<T: OpCode>(
322        &mut self,
323        key: Key<T>,
324    ) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
325        instrument!(compio_log::Level::DEBUG, "pop", ?key);
326        if key.has_result() {
327            let extra = key.swap_extra(self.default_extra());
328            let (res, buf) = key.take_result().into_parts();
329            PushEntry::Ready((BufResult(resume_unwind_io(res), buf), extra))
330        } else {
331            PushEntry::Pending(key)
332        }
333    }
334
335    /// Get one completion entry for a multishot operation. If it returns
336    /// [`None`], the user should call [`Proactor::pop_with_extra`] to get the
337    /// final result of the operation.
338    pub fn pop_multishot<T: OpCode>(&mut self, key: &Key<T>) -> Option<BufResult<usize, Extra>> {
339        instrument!(compio_log::Level::DEBUG, "pop_multishot", ?key);
340        self.driver.pop_multishot(key)
341    }
342
343    /// Update the waker of the specified op.
344    pub fn update_waker<T>(&mut self, op: &Key<T>, waker: &Waker) {
345        op.set_waker(waker);
346    }
347
348    /// Create a waker to interrupt the inner driver.
349    pub fn waker(&self) -> Waker {
350        self.driver.waker()
351    }
352
353    /// Register file descriptors for fixed-file operations with io_uring.
354    ///
355    /// This only works on `io_uring` driver. It will return an [`Unsupported`]
356    /// error on other drivers.
357    ///
358    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
359    pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
360        fn unsupported(_: &[RawFd]) -> io::Error {
361            io::Error::new(
362                io::ErrorKind::Unsupported,
363                "Fixed-file registration is only supported on io-uring driver",
364            )
365        }
366
367        #[cfg(io_uring)]
368        match self.driver.as_iour() {
369            Some(iour) => iour.register_files(fds),
370            None => Err(unsupported(fds)),
371        }
372
373        #[cfg(not(io_uring))]
374        Err(unsupported(fds))
375    }
376
377    /// Unregister previously registered file descriptors.
378    ///
379    /// This only works on `io_uring` driver. It will return an [`Unsupported`]
380    /// error on other drivers.
381    ///
382    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
383    pub fn unregister_files(&self) -> io::Result<()> {
384        fn unsupported() -> io::Error {
385            io::Error::new(
386                io::ErrorKind::Unsupported,
387                "Fixed-file unregistration is only supported on io-uring driver",
388            )
389        }
390
391        #[cfg(io_uring)]
392        match self.driver.as_iour() {
393            Some(iour) => iour.unregister_files(),
394            None => Err(unsupported()),
395        }
396
397        #[cfg(not(io_uring))]
398        Err(unsupported())
399    }
400
401    /// Register a new personality in io-uring driver.
402    ///
403    /// Returns the personality id, which can be used with
404    /// [`Extra::set_personality`] to set the personality for an operation.
405    ///
406    /// This only works on `io_uring` driver. It will return an [`Unsupported`]
407    /// error on other drivers. See [`Submitter::register_personality`] for
408    /// more.
409    ///
410    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
411    /// [`Submitter::register_personality`]: https://docs.rs/io-uring/latest/io_uring/struct.Submitter.html#method.register_personality
412    pub fn register_personality(&self) -> io::Result<u16> {
413        fn unsupported() -> io::Error {
414            io::Error::new(
415                io::ErrorKind::Unsupported,
416                "Personality is only supported on io-uring driver",
417            )
418        }
419
420        #[cfg(io_uring)]
421        match self.driver.as_iour() {
422            Some(iour) => iour.register_personality(),
423            None => Err(unsupported()),
424        }
425
426        #[cfg(not(io_uring))]
427        Err(unsupported())
428    }
429
430    /// Unregister the given personality in io-uring driver.
431    ///
432    /// This only works on `io_uring` driver. It will return an [`Unsupported`]
433    /// error on other drivers. See [`Submitter::unregister_personality`] for
434    /// more.
435    ///
436    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
437    /// [`Submitter::unregister_personality`]: https://docs.rs/io-uring/latest/io_uring/struct.Submitter.html#method.unregister_personality
438    pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
439        fn unsupported(_: u16) -> io::Error {
440            io::Error::new(
441                io::ErrorKind::Unsupported,
442                "Personality is only supported on io-uring driver",
443            )
444        }
445
446        #[cfg(io_uring)]
447        match self.driver.as_iour() {
448            Some(iour) => iour.unregister_personality(personality),
449            None => Err(unsupported(personality)),
450        }
451
452        #[cfg(not(io_uring))]
453        Err(unsupported(personality))
454    }
455
456    /// Get the buffer pool of the driver.
457    ///
458    /// This will lazily initialize the pool at the first time it's accessed,
459    /// and future access to the pool will be cheap and infallible.
460    pub fn buffer_pool(&mut self) -> io::Result<BufferPool> {
461        self.buffer_pool.get(&mut self.driver)
462    }
463}
464
465impl AsRawFd for Proactor {
466    fn as_raw_fd(&self) -> RawFd {
467        self.driver.as_raw_fd()
468    }
469}
470
471/// An completed entry returned from kernel.
472///
473/// This represents the ownership of [`Key`] passed into the kernel is given
474/// back from it to the driver.
475#[derive(Debug)]
476pub(crate) struct Entry {
477    key: ErasedKey,
478    result: io::Result<usize>,
479
480    #[cfg(io_uring)]
481    flags: u32,
482}
483
484unsafe impl Send for Entry {}
485unsafe impl Sync for Entry {}
486
487impl Entry {
488    pub(crate) fn new(key: ErasedKey, result: io::Result<usize>) -> Self {
489        #[cfg(not(io_uring))]
490        {
491            Self { key, result }
492        }
493        #[cfg(io_uring)]
494        {
495            Self {
496                key,
497                result,
498                flags: 0,
499            }
500        }
501    }
502
503    #[allow(dead_code)]
504    pub fn user_data(&self) -> usize {
505        self.key.as_raw()
506    }
507
508    #[allow(dead_code)]
509    pub fn into_key(self) -> ErasedKey {
510        self.key
511    }
512
513    #[cfg(io_uring)]
514    pub fn flags(&self) -> u32 {
515        self.flags
516    }
517
518    #[cfg(io_uring)]
519    pub(crate) fn set_flags(&mut self, flags: u32) {
520        self.flags = flags;
521    }
522
523    pub fn notify(self) {
524        #[cfg(io_uring)]
525        self.key.borrow().extra_mut().set_flags(self.flags());
526        self.key.set_result(self.result);
527    }
528}
529
530#[derive(Debug, Clone)]
531enum ThreadPoolBuilder {
532    Create { limit: usize, recv_limit: Duration },
533    Reuse(AsyncifyPool),
534}
535
536impl Default for ThreadPoolBuilder {
537    fn default() -> Self {
538        Self::new()
539    }
540}
541
542impl ThreadPoolBuilder {
543    pub fn new() -> Self {
544        Self::Create {
545            limit: 256,
546            recv_limit: Duration::from_secs(60),
547        }
548    }
549
550    pub fn create_or_reuse(&self) -> AsyncifyPool {
551        match self {
552            Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
553            Self::Reuse(pool) => pool.clone(),
554        }
555    }
556}
557
558/// Builder for [`Proactor`].
559#[derive(Debug, Clone)]
560pub struct ProactorBuilder {
561    capacity: u32,
562    pool_builder: ThreadPoolBuilder,
563    sqpoll_idle: Option<Duration>,
564    sqpoll_cpu: Option<u32>,
565    cqsize: Option<u32>,
566    single_issuer: bool,
567    coop_taskrun: bool,
568    taskrun_flag: bool,
569    defer_taskrun: bool,
570    eventfd: Option<RawFd>,
571    driver_type: Option<DriverType>,
572    op_flags: OpCodeFlag,
573    buffer_pool_size: u16,
574    buffer_pool_flag: u16,
575    buffer_pool_buffer_len: usize,
576    buffer_pool_allocator: BufferAlloc,
577}
578
579// SAFETY: `RawFd` is thread safe.
580unsafe impl Send for ProactorBuilder {}
581unsafe impl Sync for ProactorBuilder {}
582
583impl Default for ProactorBuilder {
584    fn default() -> Self {
585        Self::new()
586    }
587}
588
589impl ProactorBuilder {
590    /// Create the builder with default config.
591    pub fn new() -> Self {
592        Self {
593            capacity: 1024,
594            pool_builder: ThreadPoolBuilder::new(),
595            sqpoll_idle: None,
596            sqpoll_cpu: None,
597            cqsize: None,
598            single_issuer: false,
599            coop_taskrun: false,
600            taskrun_flag: false,
601            defer_taskrun: false,
602            eventfd: None,
603            driver_type: None,
604            op_flags: OpCodeFlag::empty(),
605            buffer_pool_size: 8,
606            buffer_pool_flag: 0,
607            buffer_pool_buffer_len: 8192,
608            buffer_pool_allocator: BufferAlloc::new::<BoxAllocator>(),
609        }
610    }
611
612    /// Set the capacity of the inner event queue or submission queue, if
613    /// exists. The default value is 1024.
614    pub fn capacity(&mut self, capacity: u32) -> &mut Self {
615        self.capacity = capacity;
616        self
617    }
618
619    /// Set the completion queue size of io-uring driver. The value should be
620    /// greater than `capacity`.
621    pub fn cqsize(&mut self, cqsize: u32) -> &mut Self {
622        self.cqsize = Some(cqsize);
623        self
624    }
625
626    /// Set the thread number limit of the inner thread pool, if exists. The
627    /// default value is 256.
628    ///
629    /// It will be ignored if `reuse_thread_pool` is set.
630    ///
631    /// Warning: some operations don't work if the limit is set to zero:
632    /// * `Asyncify` needs thread pool.
633    /// * Operations except `Recv*`, `Send*`, `Connect`, `Accept` may need
634    ///   thread pool.
635    pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
636        if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
637            *limit = value;
638        }
639        self
640    }
641
642    /// Set the waiting timeout of the inner thread, if exists. The default is
643    /// 60 seconds.
644    ///
645    /// It will be ignored if `reuse_thread_pool` is set.
646    pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
647        if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
648            *recv_limit = timeout;
649        }
650        self
651    }
652
653    /// Set to reuse an existing [`AsyncifyPool`] in this proactor.
654    pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
655        self.pool_builder = ThreadPoolBuilder::Reuse(pool);
656        self
657    }
658
659    /// Force reuse the thread pool for each proactor created by this builder,
660    /// even `reuse_thread_pool` is not set.
661    pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
662        self.reuse_thread_pool(self.create_or_get_thread_pool());
663        self
664    }
665
666    /// Create or reuse the thread pool from the config.
667    pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
668        self.pool_builder.create_or_reuse()
669    }
670
671    /// Set `io-uring` sqpoll idle duration,
672    ///
673    /// This will also enable io-uring's sqpoll feature.
674    ///
675    /// # Notes
676    ///
677    /// - Only effective when the `io-uring` feature is enabled
678    /// - `idle` must be >= 1ms, otherwise sqpoll idle will be set to 0 ms
679    /// - `idle` will be rounded down
680    pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
681        self.sqpoll_idle = Some(idle);
682        self
683    }
684
685    /// Set CPU affinity for the `io-uring` SQPOLL thread when SQPOLL is
686    /// enabled.
687    ///
688    /// This is only applied when SQPOLL is enabled with
689    /// [`sqpoll_idle`](Self::sqpoll_idle).
690    ///
691    /// # Notes
692    ///
693    /// - Only effective when the `io-uring` feature is enabled
694    /// - `cpu` must be less than the number of cpus in the system, otherwise it
695    ///   will return an error when building the proactor.
696    pub fn sqpoll_cpu(&mut self, cpu: u32) -> &mut Self {
697        self.sqpoll_cpu = Some(cpu);
698        self
699    }
700
701    /// Set the `io-uring` single issuer hint.
702    ///
703    /// # Notes
704    ///
705    /// - Available since Linux Kernel 6.0.
706    /// - Only effective when the `io-uring` feature is enabled
707    pub fn single_issuer(&mut self, enable: bool) -> &mut Self {
708        self.single_issuer = enable;
709        self
710    }
711
712    /// Optimize performance for most cases, especially compio is a single
713    /// thread runtime.
714    ///
715    /// However, it can't run with sqpoll feature.
716    ///
717    /// # Notes
718    ///
719    /// - Available since Linux Kernel 5.19.
720    /// - Only effective when the `io-uring` feature is enabled
721    pub fn coop_taskrun(&mut self, enable: bool) -> &mut Self {
722        self.coop_taskrun = enable;
723        self
724    }
725
726    /// Allows io-uring driver to know if any cqe's are available when try to
727    /// push an sqe to the submission queue.
728    ///
729    /// This should be enabled with [`coop_taskrun`](Self::coop_taskrun)
730    ///
731    /// # Notes
732    ///
733    /// - Available since Linux Kernel 5.19.
734    /// - Only effective when the `io-uring` feature is enabled
735    pub fn taskrun_flag(&mut self, enable: bool) -> &mut Self {
736        self.taskrun_flag = enable;
737        self
738    }
739
740    /// Defer io-uring task work until the driver enters the kernel to wait for
741    /// completions.
742    ///
743    /// This is only applied when [`single_issuer`](Self::single_issuer) is
744    /// enabled. The kernel requires `IORING_SETUP_SINGLE_ISSUER` for
745    /// `IORING_SETUP_DEFER_TASKRUN`.
746    ///
747    /// # Notes
748    ///
749    /// - Available since Linux Kernel 6.1.
750    /// - Only effective when the `io-uring` feature is enabled
751    pub fn defer_taskrun(&mut self, enable: bool) -> &mut Self {
752        self.defer_taskrun = enable;
753        self
754    }
755
756    /// Register an eventfd to io-uring.
757    ///
758    /// # Notes
759    ///
760    /// - Only effective when the `io-uring` feature is enabled
761    pub fn register_eventfd(&mut self, fd: RawFd) -> &mut Self {
762        self.eventfd = Some(fd);
763        self
764    }
765
766    /// Set which io-uring [`OpCode`] must be supported by the driver.
767    ///
768    /// Support for io-uring opcodes varies by kernel version. Setting this
769    /// will force the driver to check for support of the specified opcodes, and
770    /// when any of them are not supported:
771    ///
772    /// - Fallback to `polling` driver if it is enabled, or
773    /// - Return an [`Unsupported`] error when building the proactor otherwise.
774    ///
775    /// # Notes
776    ///
777    /// - Only effective when the `io-uring` feature is enabled
778    /// - [`OpCodeFlag`] is a bitflag struct, you can combine multiple opcodes
779    ///   with bitwise OR or use [`OpCodeFlag::all`] to require all opcodes to
780    ///   be supported.
781    ///
782    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
783    pub fn detect_opcode_support(&mut self, flags: OpCodeFlag) -> &mut Self {
784        self.op_flags = flags;
785        self
786    }
787
788    /// Force a driver type to use.
789    ///
790    /// It is ignored if the fusion driver is disabled.
791    pub fn driver_type(&mut self, t: DriverType) -> &mut Self {
792        self.driver_type = Some(t);
793        self
794    }
795
796    /// Number of buffers in the buffer pool.
797    ///
798    /// `size` will be rounded up if it's not power of 2.
799    ///
800    /// Default to be `8`.
801    pub fn buffer_pool_size(&mut self, size: NonZero<u16>) -> &mut Self {
802        self.buffer_pool_size = size.get();
803        self
804    }
805
806    /// Flag to be used to initialize buffer pool.
807    ///
808    /// This is only supported on io-uring driver.
809    ///
810    /// Default to be `0`.
811    pub fn buffer_pool_flag(&mut self, flag: u16) -> &mut Self {
812        self.buffer_pool_flag = flag;
813        self
814    }
815
816    /// Length of each buffer pool's buffer.
817    ///
818    /// Default to be `8192`.
819    pub fn buffer_pool_buffer_len(&mut self, size: usize) -> &mut Self {
820        self.buffer_pool_buffer_len = size;
821        self
822    }
823
824    /// Set the allocator for buffer pool.
825    ///
826    /// This is different from the std's unstable `Allocator` trait: it's purely
827    /// static and doesn't take an instance at all. This means implementation
828    /// should be global (e.g., `Global`, `malloc` or `mmap`).
829    ///
830    /// Default to [`BoxAllocator`].
831    ///
832    /// # Note
833    ///
834    /// Default allocator performs [very poor] when using managed i/o on Zen 3,
835    /// possibly due to [a bug related to FSRM]. If you observe such a problem,
836    /// try swap the allocator to a mmap-based one may solve it.
837    ///
838    /// [very poor]: https://github.com/compio-rs/compio/issues/472
839    /// [a bug related to FSRM]: https://bugs.launchpad.net/ubuntu/+source/glibc/+bug/2030515
840    pub fn buffer_pool_allocator<A: BufferAllocator>(&mut self) -> &mut Self {
841        self.buffer_pool_allocator = BufferAlloc::new::<A>();
842        self
843    }
844
845    /// Build the [`Proactor`].
846    pub fn build(&self) -> io::Result<Proactor> {
847        Proactor::with_builder(self)
848    }
849}
850
851mod seal {
852    use std::io;
853
854    use compio_buf::BufResult;
855
856    pub(crate) trait Seal {}
857
858    impl Seal for io::Error {}
859    impl<T> Seal for io::Result<T> {}
860    impl<T, B> Seal for BufResult<T, B> {}
861}
862
863/// Extension trait for [`io::Error`] and results with it.
864#[allow(private_bounds)]
865pub trait ErrorExt: seal::Seal {
866    #[doc(hidden)]
867    fn as_io_error(&self) -> Option<&io::Error>;
868
869    /// Whether the error or result is cancelled.
870    fn is_cancelled(&self) -> bool {
871        #[cfg(unix)]
872        const CANCEL_ERROR: i32 = libc::ECANCELED;
873        #[cfg(windows)]
874        const CANCEL_ERROR: i32 = windows_sys::Win32::Foundation::ERROR_OPERATION_ABORTED as _;
875
876        self.as_io_error()
877            .and_then(io::Error::raw_os_error)
878            .is_some_and(|e| e == CANCEL_ERROR)
879    }
880}
881
882impl ErrorExt for io::Error {
883    fn as_io_error(&self) -> Option<&io::Error> {
884        Some(self)
885    }
886}
887
888impl<T> ErrorExt for io::Result<T> {
889    fn as_io_error(&self) -> Option<&io::Error> {
890        self.as_ref().err()
891    }
892}
893
894impl<T, B> ErrorExt for BufResult<T, B> {
895    fn as_io_error(&self) -> Option<&io::Error> {
896        self.0.as_io_error()
897    }
898}