Skip to main content

compio_runtime/
lib.rs

1//! The compio runtime.
2//!
3//! ```
4//! let ans = compio_runtime::Runtime::new().unwrap().block_on(async {
5//!     println!("Hello world!");
6//!     42
7//! });
8//! assert_eq!(ans, 42);
9//! ```
10
11#![cfg_attr(docsrs, feature(doc_cfg))]
12#![cfg_attr(feature = "current_thread_id", feature(current_thread_id))]
13#![allow(unused_features)]
14#![warn(missing_docs)]
15#![deny(rustdoc::broken_intra_doc_links)]
16#![doc(
17    html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
18)]
19#![doc(
20    html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
21)]
22
23mod affinity;
24mod attacher;
25mod cancel;
26mod future;
27mod waker;
28
29pub mod fd;
30
31#[cfg(feature = "time")]
32pub mod time;
33
34use std::{
35    cell::RefCell,
36    collections::HashSet,
37    fmt::Debug,
38    future::Future,
39    io,
40    ops::Deref,
41    rc::Rc,
42    task::{Context, Poll, Waker},
43    time::Duration,
44};
45
46use compio_buf::{BufResult, IntoInner};
47use compio_driver::{
48    AsRawFd, Cancel, DriverType, Extra, Key, OpCode, Proactor, ProactorBuilder, PushEntry, RawFd,
49    op::Asyncify,
50};
51pub use compio_driver::{BufferPool, ErrorExt};
52use compio_executor::{Executor, ExecutorConfig};
53pub use compio_executor::{JoinHandle, ResumeUnwind};
54use compio_log::{debug, instrument};
55
56use crate::affinity::bind_to_cpu_set;
57#[cfg(feature = "time")]
58use crate::time::{TimerFuture, TimerKey, TimerRuntime};
59pub use crate::{attacher::*, cancel::CancelToken, future::*};
60
// Scoped thread-local slot for the runtime that is currently entered.
// `Runtime::enter` installs a runtime here for the duration of a closure, and
// the `*current*` accessors on `Runtime` read it back.
scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
62
/// Panics with the message `not in a compio runtime`.
///
/// Shared failure path for the accessors that require a current runtime. It
/// is marked `#[cold]` because it is only reached on the error path.
#[cold]
fn not_in_compio_runtime() -> ! {
    panic!("not in a compio runtime")
}
67
/// Inner structure of [`Runtime`].
///
/// A [`Runtime`] is a reference-counted handle to one value of this type, so
/// every clone of a runtime shares the same executor, driver and timers.
pub struct RuntimeInner {
    // Task executor; ticked by `Runtime::run` and cleared when the last
    // `Runtime` handle is dropped.
    executor: Executor,
    // The I/O driver. Kept in a `RefCell` because the runtime methods only
    // have `&self` but the driver is borrowed mutably for push/poll/cancel.
    driver: RefCell<Proactor>,
    // Timer bookkeeping; only present when the `time` feature is enabled.
    #[cfg(feature = "time")]
    timer_runtime: RefCell<TimerRuntime>,
}
75
/// The async runtime of compio.
///
/// It is a thread-local runtime, meaning it cannot be sent to other threads.
///
/// The handle wraps an `Rc`, so cloning it only bumps a reference count and
/// all clones refer to the same [`RuntimeInner`].
#[derive(Clone)]
pub struct Runtime(Rc<RuntimeInner>);
81
82impl Debug for Runtime {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        let mut s = f.debug_struct("Runtime");
85        s.field("executor", &self.0.executor)
86            .field("driver", &"...")
87            .field("scheduler", &"...");
88        #[cfg(feature = "time")]
89        s.field("timer_runtime", &"...");
90        s.finish()
91    }
92}
93
94impl Deref for Runtime {
95    type Target = RuntimeInner;
96
97    fn deref(&self) -> &Self::Target {
98        &self.0
99    }
100}
101
102impl Runtime {
103    /// Create [`Runtime`] with default config.
104    pub fn new() -> io::Result<Self> {
105        Self::builder().build()
106    }
107
108    /// Create a builder for [`Runtime`].
109    pub fn builder() -> RuntimeBuilder {
110        RuntimeBuilder::new()
111    }
112
113    /// The current driver type.
114    pub fn driver_type(&self) -> DriverType {
115        self.driver.borrow().driver_type()
116    }
117
118    /// Try to perform a function on the current runtime, and if no runtime is
119    /// running, return the function back.
120    pub fn try_with_current<T, F: FnOnce(&Self) -> T>(f: F) -> Result<T, F> {
121        if CURRENT_RUNTIME.is_set() {
122            Ok(CURRENT_RUNTIME.with(f))
123        } else {
124            Err(f)
125        }
126    }
127
128    /// Perform a function on the current runtime.
129    ///
130    /// ## Panics
131    ///
132    /// This method will panic if there is no running [`Runtime`].
133    pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
134        if CURRENT_RUNTIME.is_set() {
135            CURRENT_RUNTIME.with(f)
136        } else {
137            not_in_compio_runtime()
138        }
139    }
140
141    /// Try to get the current runtime, and if no runtime is running, return
142    /// `None`.
143    pub fn try_current() -> Option<Self> {
144        if CURRENT_RUNTIME.is_set() {
145            Some(CURRENT_RUNTIME.with(|r| r.clone()))
146        } else {
147            None
148        }
149    }
150
151    /// Get the current runtime.
152    ///
153    /// # Panics
154    ///
155    /// This method will panic if there is no running [`Runtime`].
156    pub fn current() -> Self {
157        if CURRENT_RUNTIME.is_set() {
158            CURRENT_RUNTIME.with(|r| r.clone())
159        } else {
160            not_in_compio_runtime()
161        }
162    }
163
164    /// Set this runtime as current runtime, and perform a function in the
165    /// current scope.
166    pub fn enter<T, F: FnOnce() -> T>(&self, f: F) -> T {
167        CURRENT_RUNTIME.set(self, f)
168    }
169
170    /// Low level API to control the runtime.
171    ///
172    /// Run the scheduled tasks.
173    ///
174    /// The return value indicates whether there are still tasks in the queue.
175    pub fn run(&self) -> bool {
176        self.executor.tick()
177    }
178
179    /// Low level API to control the runtime.
180    ///
181    /// Create a waker that always notifies the runtime when woken.
182    pub fn waker(&self) -> Waker {
183        self.driver.borrow().waker()
184    }
185
186    /// Block on the future till it completes.
187    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
188        self.enter(|| {
189            let waker = self.waker();
190            let mut context = Context::from_waker(&waker);
191            let mut future = std::pin::pin!(future);
192            loop {
193                if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
194                    self.run();
195                    return result;
196                }
197                let remaining_tasks = self.run();
198                if remaining_tasks {
199                    self.poll_with(Some(Duration::ZERO));
200                } else {
201                    self.poll();
202                }
203            }
204        })
205    }
206
207    /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
208    ///
209    /// Spawning a task enables the task to execute concurrently to other tasks.
210    /// There is no guarantee that a spawned task will execute to completion.
211    pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
212        self.0.executor.spawn(future)
213    }
214
215    /// Spawns a blocking task in a new thread, and wait for it.
216    ///
217    /// The task will not be cancelled even if the future is dropped.
218    pub fn spawn_blocking<T: Send + 'static>(
219        &self,
220        f: impl (FnOnce() -> T) + Send + 'static,
221    ) -> JoinHandle<T> {
222        use futures_util::FutureExt;
223
224        let op = Asyncify::new(move || {
225            // TODO: Refactor blocking pool and handle panic within worker and propagate it
226            // back
227            let res = f();
228            BufResult(Ok(0), res)
229        });
230        let submit = self.submit(op);
231        self.spawn(submit.map(|res| res.1.into_inner()))
232    }
233
234    /// Attach a raw file descriptor/handle/socket to the runtime.
235    ///
236    /// You only need this when authoring your own high-level APIs. High-level
237    /// resources in this crate are attached automatically.
238    pub fn attach(&self, fd: RawFd) -> io::Result<()> {
239        self.driver.borrow_mut().attach(fd)
240    }
241
242    fn submit_raw<T: OpCode + 'static>(
243        &self,
244        op: T,
245        extra: Option<Extra>,
246    ) -> PushEntry<Key<T>, BufResult<usize, T>> {
247        let mut this = self.driver.borrow_mut();
248        match extra {
249            Some(e) => this.push_with_extra(op, e),
250            None => this.push(op),
251        }
252    }
253
254    fn default_extra(&self) -> Extra {
255        self.driver.borrow().default_extra()
256    }
257
258    /// Submit an operation to the runtime.
259    ///
260    /// You only need this when authoring your own [`OpCode`].
261    pub fn submit<T: OpCode + 'static>(&self, op: T) -> Submit<T> {
262        Submit::new(self.clone(), op)
263    }
264
265    /// Submit a multishot operation to the runtime.
266    ///
267    /// You only need this when authoring your own [`OpCode`].
268    pub fn submit_multi<T: OpCode + 'static>(&self, op: T) -> SubmitMulti<T> {
269        SubmitMulti::new(self.clone(), op)
270    }
271
272    /// Flush the driver and return whether the driver has been notified.
273    ///
274    /// See [`Proactor::flush`] for more details.
275    pub fn flush(&self) -> bool {
276        self.driver.borrow_mut().flush()
277    }
278
279    pub(crate) fn cancel<T: OpCode>(&self, key: Key<T>) {
280        self.driver.borrow_mut().cancel(key);
281    }
282
283    pub(crate) fn register_cancel<T: OpCode>(&self, key: &Key<T>) -> Cancel {
284        self.driver.borrow_mut().register_cancel(key)
285    }
286
287    pub(crate) fn cancel_token(&self, token: Cancel) -> bool {
288        self.driver.borrow_mut().cancel_token(token)
289    }
290
291    #[cfg(feature = "time")]
292    pub(crate) fn cancel_timer(&self, key: &TimerKey) {
293        self.timer_runtime.borrow_mut().cancel(key);
294    }
295
296    pub(crate) fn poll_task<T: OpCode>(
297        &self,
298        waker: &Waker,
299        key: Key<T>,
300    ) -> PushEntry<Key<T>, BufResult<usize, T>> {
301        instrument!(compio_log::Level::DEBUG, "poll_task", ?key);
302        let mut driver = self.driver.borrow_mut();
303        driver.pop(key).map_pending(|k| {
304            driver.update_waker(&k, waker);
305            k
306        })
307    }
308
309    pub(crate) fn poll_task_with_extra<T: OpCode>(
310        &self,
311        waker: &Waker,
312        key: Key<T>,
313    ) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
314        instrument!(compio_log::Level::DEBUG, "poll_task_with_extra", ?key);
315        let mut driver = self.driver.borrow_mut();
316        driver.pop_with_extra(key).map_pending(|k| {
317            driver.update_waker(&k, waker);
318            k
319        })
320    }
321
322    pub(crate) fn poll_multishot<T: OpCode>(
323        &self,
324        waker: &Waker,
325        key: &Key<T>,
326    ) -> Option<BufResult<usize, Extra>> {
327        instrument!(compio_log::Level::DEBUG, "poll_multishot", ?key);
328        let mut driver = self.driver.borrow_mut();
329        if let Some(res) = driver.pop_multishot(key) {
330            return Some(res);
331        }
332        driver.update_waker(key, waker);
333        None
334    }
335
336    #[cfg(feature = "time")]
337    pub(crate) fn poll_timer(&self, cx: &mut Context, key: &TimerKey) -> Poll<()> {
338        instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key);
339        let mut timer_runtime = self.timer_runtime.borrow_mut();
340        if timer_runtime.is_completed(key) {
341            debug!("ready");
342            Poll::Ready(())
343        } else {
344            debug!("pending");
345            timer_runtime.update_waker(key, cx.waker());
346            Poll::Pending
347        }
348    }
349
350    /// Low level API to control the runtime.
351    ///
352    /// Get the timeout value to be passed to [`Proactor::poll`].
353    pub fn current_timeout(&self) -> Option<Duration> {
354        #[cfg(not(feature = "time"))]
355        let timeout = None;
356        #[cfg(feature = "time")]
357        let timeout = self.timer_runtime.borrow().min_timeout();
358        timeout
359    }
360
361    /// Low level API to control the runtime.
362    ///
363    /// Poll the inner proactor. It is equal to calling [`Runtime::poll_with`]
364    /// with [`Runtime::current_timeout`].
365    pub fn poll(&self) {
366        instrument!(compio_log::Level::DEBUG, "poll");
367        let timeout = self.current_timeout();
368        debug!("timeout: {:?}", timeout);
369        self.poll_with(timeout)
370    }
371
372    /// Low level API to control the runtime.
373    ///
374    /// Poll the inner proactor with a custom timeout.
375    pub fn poll_with(&self, timeout: Option<Duration>) {
376        instrument!(compio_log::Level::DEBUG, "poll_with");
377
378        let mut driver = self.driver.borrow_mut();
379        match driver.poll(timeout) {
380            Ok(()) => {}
381            Err(e) => match e.kind() {
382                io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => {
383                    debug!("expected error: {e}");
384                }
385                _ => panic!("{e:?}"),
386            },
387        }
388        #[cfg(feature = "time")]
389        self.timer_runtime.borrow_mut().wake();
390    }
391
392    /// Get buffer pool of the runtime.
393    ///
394    /// This will lazily initialize the pool at the first time it's accessed,
395    /// and future access to the pool will be cheap and infallible.
396    pub fn buffer_pool(&self) -> io::Result<BufferPool> {
397        self.driver.borrow_mut().buffer_pool()
398    }
399
400    /// Register file descriptors for fixed-file operations.
401    ///
402    /// This is only supported on io-uring driver, and will return an
403    /// [`Unsupported`] io error on all other drivers.
404    ///
405    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
406    pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
407        self.driver.borrow_mut().register_files(fds)
408    }
409
410    /// Unregister previously registered file descriptors.
411    ///
412    /// This is only supported on io-uring driver, and will return an
413    /// [`Unsupported`] io error on all other drivers.
414    ///
415    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
416    pub fn unregister_files(&self) -> io::Result<()> {
417        self.driver.borrow_mut().unregister_files()
418    }
419
420    /// Register the personality for the runtime.
421    ///
422    /// This is only supported on io-uring driver, and will return an
423    /// [`Unsupported`] io error on all other drivers.
424    ///
425    /// The returned personality can be used with
426    /// [`FutureExt::with_personality`].
427    ///
428    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
429    pub fn register_personality(&self) -> io::Result<u16> {
430        self.driver.borrow_mut().register_personality()
431    }
432
433    /// Unregister the given personality for the runtime.
434    ///
435    /// This is only supported on io-uring driver, and will return an
436    /// [`Unsupported`] io error on all other drivers.
437    ///
438    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
439    pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
440        self.driver.borrow_mut().unregister_personality(personality)
441    }
442}
443
444impl Drop for Runtime {
445    fn drop(&mut self) {
446        // this is not the last runtime reference, no need to clear
447        if Rc::strong_count(&self.0) > 1 {
448            return;
449        }
450
451        self.enter(|| {
452            self.executor.clear();
453        })
454    }
455}
456
457impl AsRawFd for Runtime {
458    fn as_raw_fd(&self) -> RawFd {
459        self.driver.borrow().as_raw_fd()
460    }
461}
462
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for Runtime {
    /// Runs the benchmark future through the inherent [`Runtime::block_on`].
    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
        Runtime::block_on(self, future)
    }
}
469
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for &Runtime {
    /// Runs the benchmark future through the inherent [`Runtime::block_on`]
    /// of the referenced runtime.
    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
        Runtime::block_on(*self, future)
    }
}
476
/// Builder for [`Runtime`].
///
/// Collects the driver and executor settings that [`RuntimeBuilder::build`]
/// turns into a runtime.
#[derive(Debug, Clone)]
pub struct RuntimeBuilder {
    // Builder used to create the driver in `build`.
    proactor_builder: ProactorBuilder,
    // CPUs to bind the building thread to; an empty set means no binding.
    thread_affinity: HashSet<usize>,
    // Passed to the executor config as `sync_queue_size`.
    sync_queue_size: usize,
    // Passed to the executor config as `local_queue_size`.
    local_queue_size: usize,
    // Passed to the executor config as `max_interval`.
    event_interval: u32,
}
486
487impl Default for RuntimeBuilder {
488    fn default() -> Self {
489        Self::new()
490    }
491}
492
493impl RuntimeBuilder {
494    /// Create the builder with default config.
495    pub fn new() -> Self {
496        Self {
497            proactor_builder: ProactorBuilder::new(),
498            event_interval: 61,
499            sync_queue_size: 64,
500            local_queue_size: 64,
501            thread_affinity: HashSet::new(),
502        }
503    }
504
505    /// Replace proactor builder.
506    pub fn with_proactor(&mut self, builder: ProactorBuilder) -> &mut Self {
507        self.proactor_builder = builder;
508        self
509    }
510
511    /// Sets the thread affinity for the runtime.
512    pub fn thread_affinity(&mut self, cpus: HashSet<usize>) -> &mut Self {
513        self.thread_affinity = cpus;
514        self
515    }
516
517    /// Sets the number of scheduler ticks after which the scheduler will poll
518    /// for external events (timers, I/O, and so on).
519    ///
520    /// A scheduler “tick” roughly corresponds to one poll invocation on a task.
521    pub fn event_interval(&mut self, val: usize) -> &mut Self {
522        self.event_interval = val as _;
523        self
524    }
525
526    /// The size of the sync queue, which is used to wake up tasks from other
527    /// threads (remote).
528    ///
529    /// This is fixed and will create backpressure in other remote threads when
530    /// full.
531    pub fn sync_queue_size(&mut self, val: usize) -> &mut Self {
532        self.sync_queue_size = val;
533        self
534    }
535
536    /// The size of the local queues, which is used to wake up tasks within the
537    /// same thread.
538    ///
539    /// This is dynamically resized to avoid blocking.
540    pub fn local_queue_size(&mut self, val: usize) -> &mut Self {
541        self.local_queue_size = val;
542        self
543    }
544
545    /// Build [`Runtime`].
546    pub fn build(&self) -> io::Result<Runtime> {
547        let RuntimeBuilder {
548            proactor_builder,
549            thread_affinity,
550            sync_queue_size,
551            local_queue_size,
552            event_interval,
553        } = self;
554
555        if !thread_affinity.is_empty() {
556            bind_to_cpu_set(thread_affinity);
557        }
558        let driver = proactor_builder.build()?;
559        let executor = Executor::with_config(ExecutorConfig {
560            max_interval: *event_interval,
561            sync_queue_size: *sync_queue_size,
562            local_queue_size: *local_queue_size,
563            waker: Some(driver.waker()),
564        });
565        let inner = RuntimeInner {
566            executor,
567            driver: RefCell::new(driver),
568            #[cfg(feature = "time")]
569            timer_runtime: RefCell::new(TimerRuntime::new()),
570        };
571        Ok(Runtime(Rc::new(inner)))
572    }
573}
574
575/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
576///
577/// Spawning a task enables the task to execute concurrently to other tasks.
578/// There is no guarantee that a spawned task will execute to completion.
579///
580/// ```
581/// # compio_runtime::Runtime::new().unwrap().block_on(async {
582/// use compio_runtime::ResumeUnwind;
583///
584/// let task = compio_runtime::spawn(async {
585///     println!("Hello from a spawned task!");
586///     42
587/// });
588///
589/// assert_eq!(
590///     task.await.resume_unwind().expect("shouldn't be cancelled"),
591///     42
592/// );
593/// # })
594/// ```
595///
596/// ## Panics
597///
598/// This method doesn't create runtime. It tries to obtain the current runtime
599/// by [`Runtime::with_current`].
600pub fn spawn<F: Future + 'static>(future: F) -> JoinHandle<F::Output> {
601    Runtime::with_current(|r| r.spawn(future))
602}
603
604/// Spawns a blocking task in a new thread, and wait for it.
605///
606/// The task will not be cancelled even if the future is dropped.
607///
608/// ## Panics
609///
610/// This method doesn't create runtime. It tries to obtain the current runtime
611/// by [`Runtime::with_current`].
612pub fn spawn_blocking<T: Send + 'static>(
613    f: impl (FnOnce() -> T) + Send + 'static,
614) -> JoinHandle<T> {
615    Runtime::with_current(|r| r.spawn_blocking(f))
616}
617
618/// Submit an operation to the current runtime, and return a future for it.
619///
620/// ## Panics
621///
622/// This method doesn't create runtime and will panic if it's not within a
623/// runtime. It tries to obtain the current runtime with
624/// [`Runtime::with_current`].
625pub fn submit<T: OpCode + 'static>(op: T) -> Submit<T> {
626    Runtime::with_current(|r| r.submit(op))
627}
628
629/// Submit a multishot operation to the current runtime, and return a stream for
630/// it.
631///
632/// ## Panics
633///
634/// This method doesn't create runtime and will panic if it's not within a
635/// runtime. It tries to obtain the current runtime with
636/// [`Runtime::with_current`].
637pub fn submit_multi<T: OpCode + 'static>(op: T) -> SubmitMulti<T> {
638    Runtime::with_current(|r| r.submit_multi(op))
639}
640
641/// Register file descriptors for fixed-file operations with the current
642/// runtime's io_uring instance.
643///
644/// This only works on `io_uring` driver. It will return an [`Unsupported`]
645/// error on other drivers.
646///
647/// ## Panics
648///
649/// This method doesn't create runtime. It tries to obtain the current runtime
650/// by [`Runtime::with_current`].
651///
652/// [`Unsupported`]: std::io::ErrorKind::Unsupported
653pub fn register_files(fds: &[RawFd]) -> io::Result<()> {
654    Runtime::with_current(|r| r.register_files(fds))
655}
656
657/// Unregister previously registered file descriptors from the current
658/// runtime's io_uring instance.
659///
660/// This only works on `io_uring` driver. It will return an [`Unsupported`]
661/// error on other drivers.
662///
663/// ## Panics
664///
665/// This method doesn't create runtime. It tries to obtain the current runtime
666/// by [`Runtime::with_current`].
667///
668/// [`Unsupported`]: std::io::ErrorKind::Unsupported
669pub fn unregister_files() -> io::Result<()> {
670    Runtime::with_current(|r| r.unregister_files())
671}
672
/// Inserts `instant` into the current runtime's timer runtime and, if a timer
/// key is handed back, waits on the [`TimerFuture`] for that key.
///
/// When no key is returned the function completes immediately.
/// NOTE(review): presumably `insert` returns `None` when no wait is needed
/// (e.g. the instant has already passed) — confirm against `TimerRuntime`.
///
/// Panics if there is no current runtime, because it goes through
/// [`Runtime::with_current`].
#[cfg(feature = "time")]
pub(crate) async fn create_timer(instant: std::time::Instant) {
    let inserted = Runtime::with_current(|rt| rt.timer_runtime.borrow_mut().insert(instant));
    let Some(key) = inserted else {
        return;
    };
    TimerFuture::new(key).await
}