1#![cfg_attr(docsrs, feature(doc_cfg))]
4#![allow(unused_features)]
5#![warn(missing_docs)]
6#![deny(rustdoc::broken_intra_doc_links)]
7#![doc(
8 html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
9)]
10#![doc(
11 html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
12)]
13
14use std::{any::Any, fmt::Debug, ptr::NonNull, task::Waker};
15
16use crate::queue::{TaskId, TaskQueue};
17
18mod join_handle;
19mod queue;
20mod task;
21mod util;
22mod waker;
23
24use compio_log::{instrument, trace};
25use compio_send_wrapper::SendWrapper;
26use crossbeam_queue::ArrayQueue;
27pub use join_handle::{JoinError, JoinHandle, ResumeUnwind};
28use util::panic_guard;
29
30cfg_select! {
31 loom => {
32 use loom::cell::UnsafeCell;
33 use loom::hint;
34 use loom::thread::yield_now;
35 use loom::sync::atomic::*;
36 }
37 _ => {
38 use std::hint;
39 use std::thread::yield_now;
40 use std::sync::atomic::*;
41
42 #[repr(transparent)]
43 struct UnsafeCell<T>(std::cell::UnsafeCell<T>);
44
45 impl<T> UnsafeCell<T> {
46 pub fn new(value: T) -> Self {
47 Self(std::cell::UnsafeCell::new(value))
48 }
49
50 #[inline(always)]
51 pub fn with_mut<F, R>(&self, f: F) -> R
52 where
53 F: FnOnce(*mut T) -> R,
54 {
55 f(self.0.get())
56 }
57
58 #[inline(always)]
59 pub fn with<F, R>(&self, f: F) -> R
60 where
61 F: FnOnce(*const T) -> R,
62 {
63 f(self.0.get())
64 }
65 }
66 }
67}
68
/// A [`Result`] whose error side is a [`Panic`] payload.
pub(crate) type PanicResult<T> = Result<T, Panic>;
/// A boxed, type-erased payload; this is the same type that
/// `std::panic::catch_unwind` returns in its `Err` variant.
pub(crate) type Panic = Box<dyn Any + Send + 'static>;
71
/// A task executor.
///
/// Futures are handed to it with `spawn` and driven forward by calling `tick`.
/// The executor owns a heap-allocated `Shared` state: it is allocated in
/// `with_config` and released when the executor is dropped.
#[derive(Debug)]
pub struct Executor {
    // Pointer to the heap-allocated shared state. A copy of this pointer is
    // passed to the task queue for every spawned task.
    ptr: NonNull<Shared>,
    // Configuration the executor was built with. Its `waker` is `None` here
    // because `with_config` moves the waker into `Shared`.
    config: ExecutorConfig,
}
91
/// Configuration used to build an [`Executor`].
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// Capacity of the bounded queue of task ids that `tick` drains before
    /// running tasks.
    ///
    /// NOTE(review): presumably this queue receives wake-ups coming from
    /// other threads — confirm against the waker implementation.
    pub sync_queue_size: usize,

    /// Size passed to the local task queue when it is created.
    ///
    /// NOTE(review): whether this is a hard limit or an initial capacity is
    /// decided by `TaskQueue::new` — confirm there.
    pub local_queue_size: usize,

    /// Upper bound on the number of hot tasks run by a single `tick`.
    pub max_interval: u32,

    /// Optional waker stored in the executor's shared state.
    ///
    /// NOTE(review): presumably used to notify the driver when a task is
    /// scheduled — its use is not visible in this file section.
    pub waker: Option<Waker>,
}

impl Default for ExecutorConfig {
    /// Returns the default configuration: both queue sizes are 64,
    /// `max_interval` is 61 and no waker is set.
    fn default() -> Self {
        const DEFAULT_QUEUE_SIZE: usize = 64;
        const DEFAULT_MAX_INTERVAL: u32 = 61;

        ExecutorConfig {
            waker: None,
            max_interval: DEFAULT_MAX_INTERVAL,
            local_queue_size: DEFAULT_QUEUE_SIZE,
            sync_queue_size: DEFAULT_QUEUE_SIZE,
        }
    }
}
127
/// Heap-allocated state owned by an [`Executor`].
///
/// The executor passes a pointer to this state to the task queue for every
/// task it spawns.
pub(crate) struct Shared {
    // Waker taken from `ExecutorConfig::waker`.
    // NOTE(review): where it is woken is not visible in this file section.
    waker: Option<Waker>,
    // Bounded queue of task ids; `Executor::tick` pops every id and marks the
    // corresponding task hot.
    sync: ArrayQueue<TaskId>,
    // The task queue, wrapped in `SendWrapper`; the executor reaches it
    // through `get_unchecked`.
    queue: SendWrapper<TaskQueue>,
}
133
134impl Executor {
135 pub fn new() -> Self {
137 Self::with_config(ExecutorConfig::default())
138 }
139
140 pub fn with_config(mut config: ExecutorConfig) -> Self {
142 let ptr = Box::into_raw(Box::new(Shared {
143 waker: config.waker.take(),
144 sync: ArrayQueue::new(config.sync_queue_size),
145 queue: SendWrapper::new(TaskQueue::new(config.local_queue_size)),
146 }));
147
148 Self {
149 config,
150 ptr: unsafe { NonNull::new_unchecked(ptr) },
151 }
152 }
153
154 pub fn spawn<F: Future + 'static>(&self, fut: F) -> JoinHandle<F::Output> {
156 let shared = self.shared();
157 let tracker = shared.queue.tracker();
158 let queue = unsafe { shared.queue.get_unchecked() };
160 let task = queue.insert(self.ptr, tracker, fut);
161
162 JoinHandle::new(task)
163 }
164
165 pub fn tick(&self) -> bool {
175 let queue = self.queue();
176
177 while let Some(id) = self.shared().sync.pop() {
178 queue.make_hot(id);
179 }
180
181 for id in queue.iter_hot().take(self.config.max_interval as _) {
182 queue.make_cold(id);
183 let task = queue.take(id).expect("Task was not reset back");
184 let res = unsafe { task.run() };
185 if res.is_ready() {
186 unsafe { task.drop() };
190 queue.remove(id);
191 } else {
192 queue.reset(id, task);
193 }
194 }
195
196 queue.has_hot()
197 }
198
199 #[doc(hidden)]
201 pub fn has_task(&self) -> bool {
202 self.queue().hot_head().is_some()
203 }
204
205 pub fn clear(&self) {
212 instrument!(compio_log::Level::TRACE, "Executor::drop");
213 trace!("Dropping Executor");
214
215 while self.shared().sync.pop().is_some() {}
216 unsafe { self.queue().clear() };
217 }
218
219 #[inline(always)]
220 fn shared(&self) -> &Shared {
221 unsafe { self.ptr.as_ref() }
222 }
223
224 #[inline(always)]
225 fn queue(&self) -> &TaskQueue {
226 unsafe { self.shared().queue.get_unchecked() }
228 }
229}
230
231impl Drop for Executor {
232 fn drop(&mut self) {
233 self.clear();
234 unsafe { drop(Box::from_raw(self.ptr.as_ptr())) };
235 }
236}
237
238impl Default for Executor {
239 fn default() -> Self {
240 Self::new()
241 }
242}