Skip to main content

compio_driver/
lib.rs

1//! Platform-specific drivers.
2//!
3//! Some types differ by compilation target.
4
5#![cfg_attr(docsrs, feature(doc_cfg))]
6#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
7#![allow(unused_features)]
8#![warn(missing_docs)]
9#![deny(rustdoc::broken_intra_doc_links)]
10#![doc(
11    html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
12)]
13#![doc(
14    html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
15)]
16
17use std::{
18    io,
19    task::{Poll, Waker},
20    time::Duration,
21};
22
23use compio_buf::BufResult;
24use compio_log::instrument;
25
26mod macros;
27
28mod key;
29pub use key::Key;
30
31mod asyncify;
32pub use asyncify::*;
33
34pub mod op;
35
36mod fd;
37pub use fd::*;
38
39mod driver_type;
40pub use driver_type::*;
41
42mod buffer_pool;
43pub use buffer_pool::*;
44
45mod sys;
46pub use sys::{Extra, *};
47
48mod cancel;
49pub use cancel::*;
50
51use crate::{key::ErasedKey, op::OpCodeFlag};
52
53mod sys_slice;
54
/// The return type of [`Proactor::push`].
///
/// It carries either a key of type `K` for an operation that is still
/// pending, or the result of type `R` for an operation that is already done.
#[derive(Debug)]
pub enum PushEntry<K, R> {
    /// The operation is pushed to the submission queue.
    Pending(K),
    /// The operation is ready and returns.
    Ready(R),
}

impl<K, R> PushEntry<K, R> {
    /// Returns `true` when this entry is [`PushEntry::Ready`].
    pub const fn is_ready(&self) -> bool {
        match self {
            Self::Ready(_) => true,
            Self::Pending(_) => false,
        }
    }

    /// Consume the entry, yielding the result of a [`PushEntry::Ready`]
    /// entry and [`None`] for a pending one.
    pub fn take_ready(self) -> Option<R> {
        if let Self::Ready(value) = self {
            Some(value)
        } else {
            None
        }
    }

    /// Transform the payload of the [`PushEntry::Pending`] branch with `f`,
    /// leaving a ready entry untouched.
    pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
        match self {
            Self::Ready(result) => PushEntry::Ready(result),
            Self::Pending(key) => PushEntry::Pending(f(key)),
        }
    }

    /// Transform the payload of the [`PushEntry::Ready`] branch with `f`,
    /// leaving a pending entry untouched.
    pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
        match self {
            Self::Ready(result) => PushEntry::Ready(f(result)),
            Self::Pending(key) => PushEntry::Pending(key),
        }
    }
}
94
95/// Low-level actions of completion-based IO.
96/// It owns the operations to keep the driver safe.
97pub struct Proactor {
98    driver: Driver,
99    cancel: CancelRegistry,
100}
101
102assert_not_impl!(Proactor, Send);
103assert_not_impl!(Proactor, Sync);
104
105impl Proactor {
106    /// Create [`Proactor`] with 1024 entries.
107    pub fn new() -> io::Result<Self> {
108        Self::builder().build()
109    }
110
111    /// Create [`ProactorBuilder`] to config the proactor.
112    pub fn builder() -> ProactorBuilder {
113        ProactorBuilder::new()
114    }
115
116    fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
117        Ok(Self {
118            driver: Driver::new(builder)?,
119            cancel: CancelRegistry::new(),
120        })
121    }
122
123    /// Get a default [`Extra`] for underlying driver.
124    pub fn default_extra(&self) -> Extra {
125        sys::default_extra(&self.driver)
126    }
127
128    /// The current driver type.
129    pub fn driver_type(&self) -> DriverType {
130        self.driver.driver_type()
131    }
132
133    /// Attach an fd to the driver.
134    ///
135    /// ## Platform specific
136    /// * IOCP: it will be attached to the completion port. An fd could only be
137    ///   attached to one driver, and could only be attached once, even if you
138    ///   `try_clone` it.
139    /// * io-uring & polling: it will do nothing but return `Ok(())`.
140    pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
141        self.driver.attach(fd)
142    }
143
144    /// Cancel an operation with the pushed [`Key`].
145    ///
146    /// Returns the result if the key is unique and the operation is completed.
147    ///
148    /// The cancellation is not reliable. The underlying operation may continue,
149    /// but just don't return from [`Proactor::poll`].
150    pub fn cancel<T: OpCode>(&mut self, key: Key<T>) -> Option<BufResult<usize, T>> {
151        instrument!(compio_log::Level::DEBUG, "cancel", ?key);
152        if key.set_cancelled() {
153            return None;
154        }
155        self.cancel.remove(&key);
156        if key.is_unique() && key.has_result() {
157            Some(key.take_result())
158        } else {
159            self.driver.cancel(key.erase());
160            None
161        }
162    }
163
164    /// Cancel an operation with a [`Cancel`] token.
165    ///
166    /// Returns if a cancellation has been issued.
167    ///
168    /// The cancellation is not reliable. The underlying operation may continue,
169    /// but just don't return from [`Proactor::pop`]. This will do nothing if
170    /// the operation has already been completed or cancelled before.
171    pub fn cancel_token(&mut self, token: Cancel) -> bool {
172        let Some(key) = self.cancel.take(token) else {
173            return false;
174        };
175        if key.set_cancelled() || key.has_result() {
176            return false;
177        }
178        self.driver.cancel(key);
179        true
180    }
181
182    /// Create a [`Cancel`] that can be used to cancel the operation even
183    /// without the key.
184    ///
185    /// This acts like a weak reference to the [`Key`], but can only be used to
186    /// cancel the operation with [`Proactor::cancel_token`]. Extra copy of
187    /// [`Key`] may cause [`Proactor::pop`] to panic while keys registered
188    /// as [`Cancel`] will be properly handled. So this is useful in cases
189    /// where you're not sure if the operation will be cancelled.
190    pub fn register_cancel<T: OpCode>(&mut self, key: &Key<T>) -> Cancel {
191        self.cancel.register(key)
192    }
193
194    /// Push an operation into the driver, and return the unique key [`Key`],
195    /// associated with it.
196    pub fn push<T: sys::OpCode + 'static>(
197        &mut self,
198        op: T,
199    ) -> PushEntry<Key<T>, BufResult<usize, T>> {
200        self.push_with_extra(op, self.default_extra())
201    }
202
203    /// Push an operation into the driver with user-defined [`Extra`], and
204    /// return the unique key [`Key`], associated with it.
205    pub fn push_with_extra<T: sys::OpCode + 'static>(
206        &mut self,
207        op: T,
208        extra: Extra,
209    ) -> PushEntry<Key<T>, BufResult<usize, T>> {
210        let key = Key::new(op, extra);
211        match self.driver.push(key.clone().erase()) {
212            Poll::Pending => PushEntry::Pending(key),
213            Poll::Ready(res) => {
214                key.set_result(res);
215                PushEntry::Ready(key.take_result())
216            }
217        }
218    }
219
220    /// Poll the driver and get completed entries.
221    /// You need to call [`Proactor::pop`] to get the pushed
222    /// operations.
223    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
224        self.driver.poll(timeout)
225    }
226
227    /// Get the pushed operations from the completion entries.
228    ///
229    /// # Panics
230    ///
231    /// This function will panic if the [`Key`] is not unique.
232    pub fn pop<T>(&mut self, key: Key<T>) -> PushEntry<Key<T>, BufResult<usize, T>> {
233        instrument!(compio_log::Level::DEBUG, "pop", ?key);
234        if key.has_result() {
235            self.cancel.remove(&key);
236            PushEntry::Ready(key.take_result())
237        } else {
238            PushEntry::Pending(key)
239        }
240    }
241
242    /// Get the pushed operations from the completion entries along the
243    /// [`Extra`] associated.
244    ///
245    /// # Panics
246    ///
247    /// This function will panic if the [`Key`] is not unique.
248    pub fn pop_with_extra<T>(
249        &mut self,
250        key: Key<T>,
251    ) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
252        instrument!(compio_log::Level::DEBUG, "pop", ?key);
253        if key.has_result() {
254            self.cancel.remove(&key);
255            let extra = key.swap_extra(self.default_extra());
256            let res = key.take_result();
257            PushEntry::Ready((res, extra))
258        } else {
259            PushEntry::Pending(key)
260        }
261    }
262
263    /// Get one completion entry for a multishot operation. If it returns
264    /// [`None`], the user should call [`Proactor::pop_with_extra`] to get the
265    /// final result of the operation.
266    pub fn pop_multishot<T>(&mut self, key: &Key<T>) -> Option<BufResult<usize, Extra>> {
267        instrument!(compio_log::Level::DEBUG, "pop_multishot", ?key);
268        self.driver.pop_multishot(key)
269    }
270
271    /// Update the waker of the specified op.
272    pub fn update_waker<T>(&mut self, op: &Key<T>, waker: &Waker) {
273        op.set_waker(waker);
274    }
275
276    /// Create a waker to interrupt the inner driver.
277    pub fn waker(&self) -> Waker {
278        self.driver.waker()
279    }
280
281    /// Create buffer pool with given `buffer_size` and `buffer_len`
282    ///
283    /// # Notes
284    ///
285    /// If `buffer_len` is not a power of 2, it will be rounded up with
286    /// [`u16::next_power_of_two`].
287    pub fn create_buffer_pool(
288        &mut self,
289        buffer_len: u16,
290        buffer_size: usize,
291    ) -> io::Result<BufferPool> {
292        self.driver.create_buffer_pool(buffer_len, buffer_size)
293    }
294
295    /// Release the buffer pool
296    ///
297    /// # Safety
298    ///
299    /// Caller must make sure to release the buffer pool with the correct
300    /// driver, i.e., the one they created the buffer pool with.
301    pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> {
302        unsafe { self.driver.release_buffer_pool(buffer_pool) }
303    }
304
305    /// Register file descriptors for fixed-file operations with io_uring.
306    ///
307    /// This only works on `io_uring` driver. It will return an [`Unsupported`]
308    /// error on other drivers.
309    ///
310    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
311    pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
312        fn unsupported(_: &[RawFd]) -> io::Error {
313            io::Error::new(
314                io::ErrorKind::Unsupported,
315                "Fixed-file registration is only supported on io-uring driver",
316            )
317        }
318
319        #[cfg(io_uring)]
320        match self.driver.as_iour() {
321            Some(iour) => iour.register_files(fds),
322            None => Err(unsupported(fds)),
323        }
324
325        #[cfg(not(io_uring))]
326        Err(unsupported(fds))
327    }
328
329    /// Unregister previously registered file descriptors.
330    ///
331    /// This only works on `io_uring` driver. It will return an [`Unsupported`]
332    /// error on other drivers.
333    ///
334    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
335    pub fn unregister_files(&self) -> io::Result<()> {
336        fn unsupported() -> io::Error {
337            io::Error::new(
338                io::ErrorKind::Unsupported,
339                "Fixed-file unregistration is only supported on io-uring driver",
340            )
341        }
342
343        #[cfg(io_uring)]
344        match self.driver.as_iour() {
345            Some(iour) => iour.unregister_files(),
346            None => Err(unsupported()),
347        }
348
349        #[cfg(not(io_uring))]
350        Err(unsupported())
351    }
352
353    /// Register a new personality in io-uring driver.
354    ///
355    /// Returns the personality id, which can be used with
356    /// [`Extra::set_personality`] to set the personality for an operation.
357    ///
358    /// This only works on `io_uring` driver. It will return an [`Unsupported`]
359    /// error on other drivers. See [`Submitter::register_personality`] for
360    /// more.
361    ///
362    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
363    /// [`Submitter::register_personality`]: https://docs.rs/io-uring/latest/io_uring/struct.Submitter.html#method.register_personality
364    pub fn register_personality(&self) -> io::Result<u16> {
365        fn unsupported() -> io::Error {
366            io::Error::new(
367                io::ErrorKind::Unsupported,
368                "Personality is only supported on io-uring driver",
369            )
370        }
371
372        #[cfg(io_uring)]
373        match self.driver.as_iour() {
374            Some(iour) => iour.register_personality(),
375            None => Err(unsupported()),
376        }
377
378        #[cfg(not(io_uring))]
379        Err(unsupported())
380    }
381
382    /// Unregister the given personality in io-uring driver.
383    ///
384    /// This only works on `io_uring` driver. It will return an [`Unsupported`]
385    /// error on other drivers. See [`Submitter::unregister_personality`] for
386    /// more.
387    ///
388    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
389    /// [`Submitter::unregister_personality`]: https://docs.rs/io-uring/latest/io_uring/struct.Submitter.html#method.unregister_personality
390    pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
391        fn unsupported(_: u16) -> io::Error {
392            io::Error::new(
393                io::ErrorKind::Unsupported,
394                "Personality is only supported on io-uring driver",
395            )
396        }
397
398        #[cfg(io_uring)]
399        match self.driver.as_iour() {
400            Some(iour) => iour.unregister_personality(personality),
401            None => Err(unsupported(personality)),
402        }
403
404        #[cfg(not(io_uring))]
405        Err(unsupported(personality))
406    }
407}
408
// Expose the raw fd/handle of the proactor by forwarding to the inner driver.
impl AsRawFd for Proactor {
    fn as_raw_fd(&self) -> RawFd {
        // The proactor has no descriptor of its own; it reports the driver's.
        self.driver.as_raw_fd()
    }
}
414
415/// An completed entry returned from kernel.
416///
417/// This represents the ownership of [`Key`] passed into the kernel is given
418/// back from it to the driver.
419#[derive(Debug)]
420pub(crate) struct Entry {
421    key: ErasedKey,
422    result: io::Result<usize>,
423
424    #[cfg(io_uring)]
425    flags: u32,
426}
427
428unsafe impl Send for Entry {}
429unsafe impl Sync for Entry {}
430
431impl Entry {
432    pub(crate) fn new(key: ErasedKey, result: io::Result<usize>) -> Self {
433        #[cfg(not(io_uring))]
434        {
435            Self { key, result }
436        }
437        #[cfg(io_uring)]
438        {
439            Self {
440                key,
441                result,
442                flags: 0,
443            }
444        }
445    }
446
447    #[allow(dead_code)]
448    pub fn user_data(&self) -> usize {
449        self.key.as_raw()
450    }
451
452    #[allow(dead_code)]
453    pub fn into_key(self) -> ErasedKey {
454        self.key
455    }
456
457    #[cfg(io_uring)]
458    pub fn flags(&self) -> u32 {
459        self.flags
460    }
461
462    #[cfg(io_uring)]
463    // this method only used by in io-uring driver
464    pub(crate) fn set_flags(&mut self, flags: u32) {
465        self.flags = flags;
466    }
467
468    pub fn notify(self) {
469        #[cfg(io_uring)]
470        self.key.borrow().extra_mut().set_flags(self.flags());
471        self.key.set_result(self.result);
472    }
473}
474
475#[derive(Debug, Clone)]
476enum ThreadPoolBuilder {
477    Create { limit: usize, recv_limit: Duration },
478    Reuse(AsyncifyPool),
479}
480
481impl Default for ThreadPoolBuilder {
482    fn default() -> Self {
483        Self::new()
484    }
485}
486
487impl ThreadPoolBuilder {
488    pub fn new() -> Self {
489        Self::Create {
490            limit: 256,
491            recv_limit: Duration::from_secs(60),
492        }
493    }
494
495    pub fn create_or_reuse(&self) -> AsyncifyPool {
496        match self {
497            Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
498            Self::Reuse(pool) => pool.clone(),
499        }
500    }
501}
502
503/// Builder for [`Proactor`].
504#[derive(Debug, Clone)]
505pub struct ProactorBuilder {
506    capacity: u32,
507    pool_builder: ThreadPoolBuilder,
508    sqpoll_idle: Option<Duration>,
509    coop_taskrun: bool,
510    taskrun_flag: bool,
511    eventfd: Option<RawFd>,
512    driver_type: Option<DriverType>,
513    op_flags: OpCodeFlag,
514}
515
516// SAFETY: `RawFd` is thread safe.
517unsafe impl Send for ProactorBuilder {}
518unsafe impl Sync for ProactorBuilder {}
519
520impl Default for ProactorBuilder {
521    fn default() -> Self {
522        Self::new()
523    }
524}
525
526impl ProactorBuilder {
527    /// Create the builder with default config.
528    pub fn new() -> Self {
529        Self {
530            capacity: 1024,
531            pool_builder: ThreadPoolBuilder::new(),
532            sqpoll_idle: None,
533            coop_taskrun: false,
534            taskrun_flag: false,
535            eventfd: None,
536            driver_type: None,
537            op_flags: OpCodeFlag::empty(),
538        }
539    }
540
541    /// Set the capacity of the inner event queue or submission queue, if
542    /// exists. The default value is 1024.
543    pub fn capacity(&mut self, capacity: u32) -> &mut Self {
544        self.capacity = capacity;
545        self
546    }
547
548    /// Set the thread number limit of the inner thread pool, if exists. The
549    /// default value is 256.
550    ///
551    /// It will be ignored if `reuse_thread_pool` is set.
552    ///
553    /// Warning: some operations don't work if the limit is set to zero:
554    /// * `Asyncify` needs thread pool.
555    /// * Operations except `Recv*`, `Send*`, `Connect`, `Accept` may need
556    ///   thread pool.
557    pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
558        if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
559            *limit = value;
560        }
561        self
562    }
563
564    /// Set the waiting timeout of the inner thread, if exists. The default is
565    /// 60 seconds.
566    ///
567    /// It will be ignored if `reuse_thread_pool` is set.
568    pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
569        if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
570            *recv_limit = timeout;
571        }
572        self
573    }
574
575    /// Set to reuse an existing [`AsyncifyPool`] in this proactor.
576    pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
577        self.pool_builder = ThreadPoolBuilder::Reuse(pool);
578        self
579    }
580
581    /// Force reuse the thread pool for each proactor created by this builder,
582    /// even `reuse_thread_pool` is not set.
583    pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
584        self.reuse_thread_pool(self.create_or_get_thread_pool());
585        self
586    }
587
588    /// Create or reuse the thread pool from the config.
589    pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
590        self.pool_builder.create_or_reuse()
591    }
592
593    /// Set `io-uring` sqpoll idle duration,
594    ///
595    /// This will also enable io-uring's sqpoll feature.
596    ///
597    /// # Notes
598    ///
599    /// - Only effective when the `io-uring` feature is enabled
600    /// - `idle` must be >= 1ms, otherwise sqpoll idle will be set to 0 ms
601    /// - `idle` will be rounded down
602    pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
603        self.sqpoll_idle = Some(idle);
604        self
605    }
606
607    /// Optimize performance for most cases, especially compio is a single
608    /// thread runtime.
609    ///
610    /// However, it can't run with sqpoll feature.
611    ///
612    /// # Notes
613    ///
614    /// - Available since Linux Kernel 5.19.
615    /// - Only effective when the `io-uring` feature is enabled
616    pub fn coop_taskrun(&mut self, enable: bool) -> &mut Self {
617        self.coop_taskrun = enable;
618        self
619    }
620
621    /// Allows io-uring driver to know if any cqe's are available when try to
622    /// push an sqe to the submission queue.
623    ///
624    /// This should be enabled with [`coop_taskrun`](Self::coop_taskrun)
625    ///
626    /// # Notes
627    ///
628    /// - Available since Linux Kernel 5.19.
629    /// - Only effective when the `io-uring` feature is enabled
630    pub fn taskrun_flag(&mut self, enable: bool) -> &mut Self {
631        self.taskrun_flag = enable;
632        self
633    }
634
635    /// Register an eventfd to io-uring.
636    ///
637    /// # Notes
638    ///
639    /// - Only effective when the `io-uring` feature is enabled
640    pub fn register_eventfd(&mut self, fd: RawFd) -> &mut Self {
641        self.eventfd = Some(fd);
642        self
643    }
644
645    /// Set which io-uring [`OpCode`] must be supported by the driver.
646    ///
647    /// Support for io-uring opcodes varies by kernel version. Setting this
648    /// will force the driver to check for support of the specified opcodes, and
649    /// when any of them are not supported:
650    /// - Fallback to `polling` driver if `fusion` driver is enabled,
651    /// - Return an [`Unsupported`] error when building the proactor otherwise.
652    ///
653    /// # Notes
654    ///
655    /// - Only effective when the `io-uring` feature is enabled
656    /// - [`OpCodeFlag`] is a bitflag struct, you can combine multiple opcodes
657    ///   with bitwise OR or use [`OpCodeFlag::all`] to require all opcodes to
658    ///   be supported.
659    ///
660    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
661    pub fn detect_opcode_support(&mut self, flags: OpCodeFlag) -> &mut Self {
662        self.op_flags = flags;
663        self
664    }
665
666    /// Force a driver type to use.
667    ///
668    /// It is ignored if the fusion driver is disabled.
669    pub fn driver_type(&mut self, t: DriverType) -> &mut Self {
670        self.driver_type = Some(t);
671        self
672    }
673
674    /// Build the [`Proactor`].
675    pub fn build(&self) -> io::Result<Proactor> {
676        Proactor::with_builder(self)
677    }
678}