Skip to main content

compio_executor/
lib.rs

1//! Executor for compio runtime.
2
3#![cfg_attr(docsrs, feature(doc_cfg))]
4#![allow(unused_features)]
5#![warn(missing_docs)]
6#![deny(rustdoc::broken_intra_doc_links)]
7#![doc(
8    html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
9)]
10#![doc(
11    html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
12)]
13
14use std::{any::Any, fmt::Debug, ptr::NonNull, task::Waker};
15
16use crate::queue::{TaskId, TaskQueue};
17
18mod join_handle;
19mod queue;
20mod task;
21mod util;
22mod waker;
23
24use compio_log::{instrument, trace};
25use compio_send_wrapper::SendWrapper;
26use crossbeam_queue::ArrayQueue;
27pub use join_handle::{JoinError, JoinHandle, ResumeUnwind};
28use util::panic_guard;
29
30cfg_select! {
31    loom => {
32        use loom::cell::UnsafeCell;
33        use loom::hint;
34        use loom::thread::yield_now;
35        use loom::sync::atomic::*;
36    }
37    _ => {
38        use std::hint;
39        use std::thread::yield_now;
40        use std::sync::atomic::*;
41
42        #[repr(transparent)]
43        struct UnsafeCell<T>(std::cell::UnsafeCell<T>);
44
45        impl<T> UnsafeCell<T> {
46            pub fn new(value: T) -> Self {
47                Self(std::cell::UnsafeCell::new(value))
48            }
49
50            #[inline(always)]
51            pub fn with_mut<F, R>(&self, f: F) -> R
52            where
53                F: FnOnce(*mut T) -> R,
54            {
55                f(self.0.get())
56            }
57
58            #[inline(always)]
59            pub fn with<F, R>(&self, f: F) -> R
60            where
61                F: FnOnce(*const T) -> R,
62            {
63                f(self.0.get())
64            }
65        }
66    }
67}
68
69pub(crate) type PanicResult<T> = Result<T, Panic>;
70pub(crate) type Panic = Box<dyn Any + Send + 'static>;
71
72/// A dual-queue executor optimized for singlethreaded usecase, with support for
73/// multithreaded wakes.
74///
75/// Same-thread wakes ([`Waker::wake`]) will schedule tasks within the queue
76/// directly; cross-thread wakes will send task id's to a channel, and
77/// piggybacked to singlethreaded wakes or ticks. This ensures maximum
78/// performance for singlethreaded scenario at the trade-off of worse tail
79/// latency for multithreaded wake-ups.
80///
81/// Optionally, all [`Waker`]s generated from this executor can contain an extra
82/// data, parameterized as `E`.
83///
84/// [`Waker`]: std::task::Waker
85/// [`Waker::wake`]: std::task::Waker::wake
86#[derive(Debug)]
87pub struct Executor {
88    ptr: NonNull<Shared>,
89    config: ExecutorConfig,
90}
91
92/// Configuration for [`Executor`].
93#[derive(Debug, Clone)]
94pub struct ExecutorConfig {
95    /// The size of the sync queue, which holds task id's for cross-thread
96    /// wakes.
97    ///
98    /// This is fixed and will create backpressure when full.
99    pub sync_queue_size: usize,
100
101    /// The size of the local queues, which hold tasks for same-thread
102    /// execution.
103    ///
104    /// This is dynamically resized to avoid blocking.
105    pub local_queue_size: usize,
106
107    /// The maximum number of hot tasks to run in each tick.
108    pub max_interval: u32,
109
110    /// A waker to be woken when a task is scheduled.
111    ///
112    /// This is useful for waking up drivers that switch to kernel state when
113    /// idle.
114    pub waker: Option<Waker>,
115}
116
117impl Default for ExecutorConfig {
118    fn default() -> Self {
119        Self {
120            sync_queue_size: 64,
121            local_queue_size: 64,
122            max_interval: 61,
123            waker: None,
124        }
125    }
126}
127
128pub(crate) struct Shared {
129    waker: Option<Waker>,
130    sync: ArrayQueue<TaskId>,
131    queue: SendWrapper<TaskQueue>,
132}
133
134impl Executor {
135    /// Create a new executor.
136    pub fn new() -> Self {
137        Self::with_config(ExecutorConfig::default())
138    }
139
140    /// Create a new executor with config.
141    pub fn with_config(mut config: ExecutorConfig) -> Self {
142        let ptr = Box::into_raw(Box::new(Shared {
143            waker: config.waker.take(),
144            sync: ArrayQueue::new(config.sync_queue_size),
145            queue: SendWrapper::new(TaskQueue::new(config.local_queue_size)),
146        }));
147
148        Self {
149            config,
150            ptr: unsafe { NonNull::new_unchecked(ptr) },
151        }
152    }
153
154    /// Spawn a future onto the executor.
155    pub fn spawn<F: Future + 'static>(&self, fut: F) -> JoinHandle<F::Output> {
156        let shared = self.shared();
157        let tracker = shared.queue.tracker();
158        // SAFETY: Executor cannot be sent to ther thread
159        let queue = unsafe { shared.queue.get_unchecked() };
160        let task = queue.insert(self.ptr, tracker, fut);
161
162        JoinHandle::new(task)
163    }
164
165    /// Retrieve all sync tasks, schedule those to the tail of `hot` queue
166    /// and run at most [`max_interval`] tasks.
167    ///
168    /// Running start with `hot` tasks, then `cold` ones. Finished tasks will
169    /// be pushed back to tail of `cold` queue.
170    ///
171    /// Return whether there are still hot tasks after the tick.
172    ///
173    /// [`max_interval`]: ExecutorConfig::max_interval
174    pub fn tick(&self) -> bool {
175        let queue = self.queue();
176
177        while let Some(id) = self.shared().sync.pop() {
178            queue.make_hot(id);
179        }
180
181        for id in queue.iter_hot().take(self.config.max_interval as _) {
182            queue.make_cold(id);
183            let task = queue.take(id).expect("Task was not reset back");
184            let res = unsafe { task.run() };
185            if res.is_ready() {
186                // SAFETY: We're removing it soon, so drop will only be called once.
187                // The shared pointer is kept valid until the Executor is dropped,
188                // to avoid use-after-free issues with concurrent wakers.
189                unsafe { task.drop() };
190                queue.remove(id);
191            } else {
192                queue.reset(id, task);
193            }
194        }
195
196        queue.has_hot()
197    }
198
199    /// Check if there's still scheduled task that needs to be ran.
200    #[doc(hidden)]
201    pub fn has_task(&self) -> bool {
202        self.queue().hot_head().is_some()
203    }
204
205    /// Clear the executor, drop all tasks.
206    ///
207    /// This should be called only in context of the runtime, if any future may
208    /// use it. Any panic happened during dropping the future will cause the
209    /// process to abort. If this was not called before dropping, all tasks will
210    /// be leakded.
211    pub fn clear(&self) {
212        instrument!(compio_log::Level::TRACE, "Executor::drop");
213        trace!("Dropping Executor");
214
215        while self.shared().sync.pop().is_some() {}
216        unsafe { self.queue().clear() };
217    }
218
219    #[inline(always)]
220    fn shared(&self) -> &Shared {
221        unsafe { self.ptr.as_ref() }
222    }
223
224    #[inline(always)]
225    fn queue(&self) -> &TaskQueue {
226        // SAFETY: Executor is single threaded
227        unsafe { self.shared().queue.get_unchecked() }
228    }
229}
230
231impl Drop for Executor {
232    fn drop(&mut self) {
233        self.clear();
234        unsafe { drop(Box::from_raw(self.ptr.as_ptr())) };
235    }
236}
237
238impl Default for Executor {
239    fn default() -> Self {
240        Self::new()
241    }
242}