1#![cfg_attr(docsrs, feature(doc_cfg))]
12#![cfg_attr(feature = "current_thread_id", feature(current_thread_id))]
13#![allow(unused_features)]
14#![warn(missing_docs)]
15#![deny(rustdoc::broken_intra_doc_links)]
16#![doc(
17 html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
18)]
19#![doc(
20 html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
21)]
22
23mod affinity;
24mod attacher;
25mod cancel;
26mod future;
27mod waker;
28
29pub mod fd;
30
31#[cfg(feature = "time")]
32pub mod time;
33
34use std::{
35 cell::RefCell,
36 collections::HashSet,
37 fmt::Debug,
38 future::Future,
39 io,
40 ops::Deref,
41 rc::Rc,
42 task::{Context, Poll, Waker},
43 time::Duration,
44};
45
46use compio_buf::{BufResult, IntoInner};
47use compio_driver::{
48 AsRawFd, Cancel, DriverType, Extra, Key, OpCode, Proactor, ProactorBuilder, PushEntry, RawFd,
49 op::Asyncify,
50};
51pub use compio_driver::{BufferPool, ErrorExt};
52use compio_executor::{Executor, ExecutorConfig};
53pub use compio_executor::{JoinHandle, ResumeUnwind};
54use compio_log::{debug, instrument};
55
56use crate::affinity::bind_to_cpu_set;
57#[cfg(feature = "time")]
58use crate::time::{TimerFuture, TimerKey, TimerRuntime};
59pub use crate::{attacher::*, cancel::CancelToken, future::*};
60
61scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
62
63#[cold]
64fn not_in_compio_runtime() -> ! {
65 panic!("not in a compio runtime")
66}
67
68pub struct RuntimeInner {
70 executor: Executor,
71 driver: RefCell<Proactor>,
72 #[cfg(feature = "time")]
73 timer_runtime: RefCell<TimerRuntime>,
74}
75
76#[derive(Clone)]
80pub struct Runtime(Rc<RuntimeInner>);
81
82impl Debug for Runtime {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 let mut s = f.debug_struct("Runtime");
85 s.field("executor", &self.0.executor)
86 .field("driver", &"...")
87 .field("scheduler", &"...");
88 #[cfg(feature = "time")]
89 s.field("timer_runtime", &"...");
90 s.finish()
91 }
92}
93
94impl Deref for Runtime {
95 type Target = RuntimeInner;
96
97 fn deref(&self) -> &Self::Target {
98 &self.0
99 }
100}
101
102impl Runtime {
103 pub fn new() -> io::Result<Self> {
105 Self::builder().build()
106 }
107
108 pub fn builder() -> RuntimeBuilder {
110 RuntimeBuilder::new()
111 }
112
113 pub fn driver_type(&self) -> DriverType {
115 self.driver.borrow().driver_type()
116 }
117
118 pub fn try_with_current<T, F: FnOnce(&Self) -> T>(f: F) -> Result<T, F> {
121 if CURRENT_RUNTIME.is_set() {
122 Ok(CURRENT_RUNTIME.with(f))
123 } else {
124 Err(f)
125 }
126 }
127
128 pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
134 if CURRENT_RUNTIME.is_set() {
135 CURRENT_RUNTIME.with(f)
136 } else {
137 not_in_compio_runtime()
138 }
139 }
140
141 pub fn try_current() -> Option<Self> {
144 if CURRENT_RUNTIME.is_set() {
145 Some(CURRENT_RUNTIME.with(|r| r.clone()))
146 } else {
147 None
148 }
149 }
150
151 pub fn current() -> Self {
157 if CURRENT_RUNTIME.is_set() {
158 CURRENT_RUNTIME.with(|r| r.clone())
159 } else {
160 not_in_compio_runtime()
161 }
162 }
163
164 pub fn enter<T, F: FnOnce() -> T>(&self, f: F) -> T {
167 CURRENT_RUNTIME.set(self, f)
168 }
169
170 pub fn run(&self) -> bool {
176 self.executor.tick()
177 }
178
179 pub fn waker(&self) -> Waker {
183 self.driver.borrow().waker()
184 }
185
186 pub fn block_on<F: Future>(&self, future: F) -> F::Output {
188 self.enter(|| {
189 let waker = self.waker();
190 let mut context = Context::from_waker(&waker);
191 let mut future = std::pin::pin!(future);
192 loop {
193 if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
194 self.run();
195 return result;
196 }
197 let remaining_tasks = self.run();
198 if remaining_tasks {
199 self.poll_with(Some(Duration::ZERO));
200 } else {
201 self.poll();
202 }
203 }
204 })
205 }
206
207 pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
212 self.0.executor.spawn(future)
213 }
214
215 pub fn spawn_blocking<T: Send + 'static>(
219 &self,
220 f: impl (FnOnce() -> T) + Send + 'static,
221 ) -> JoinHandle<T> {
222 use futures_util::FutureExt;
223
224 let op = Asyncify::new(move || {
225 let res = f();
228 BufResult(Ok(0), res)
229 });
230 let submit = self.submit(op);
231 self.spawn(submit.map(|res| res.1.into_inner()))
232 }
233
234 pub fn attach(&self, fd: RawFd) -> io::Result<()> {
239 self.driver.borrow_mut().attach(fd)
240 }
241
242 fn submit_raw<T: OpCode + 'static>(
243 &self,
244 op: T,
245 extra: Option<Extra>,
246 ) -> PushEntry<Key<T>, BufResult<usize, T>> {
247 let mut this = self.driver.borrow_mut();
248 match extra {
249 Some(e) => this.push_with_extra(op, e),
250 None => this.push(op),
251 }
252 }
253
254 fn default_extra(&self) -> Extra {
255 self.driver.borrow().default_extra()
256 }
257
258 pub fn submit<T: OpCode + 'static>(&self, op: T) -> Submit<T> {
262 Submit::new(self.clone(), op)
263 }
264
265 pub fn submit_multi<T: OpCode + 'static>(&self, op: T) -> SubmitMulti<T> {
269 SubmitMulti::new(self.clone(), op)
270 }
271
272 pub fn flush(&self) -> bool {
276 self.driver.borrow_mut().flush()
277 }
278
279 pub(crate) fn cancel<T: OpCode>(&self, key: Key<T>) {
280 self.driver.borrow_mut().cancel(key);
281 }
282
283 pub(crate) fn register_cancel<T: OpCode>(&self, key: &Key<T>) -> Cancel {
284 self.driver.borrow_mut().register_cancel(key)
285 }
286
287 pub(crate) fn cancel_token(&self, token: Cancel) -> bool {
288 self.driver.borrow_mut().cancel_token(token)
289 }
290
291 #[cfg(feature = "time")]
292 pub(crate) fn cancel_timer(&self, key: &TimerKey) {
293 self.timer_runtime.borrow_mut().cancel(key);
294 }
295
296 pub(crate) fn poll_task<T: OpCode>(
297 &self,
298 waker: &Waker,
299 key: Key<T>,
300 ) -> PushEntry<Key<T>, BufResult<usize, T>> {
301 instrument!(compio_log::Level::DEBUG, "poll_task", ?key);
302 let mut driver = self.driver.borrow_mut();
303 driver.pop(key).map_pending(|k| {
304 driver.update_waker(&k, waker);
305 k
306 })
307 }
308
309 pub(crate) fn poll_task_with_extra<T: OpCode>(
310 &self,
311 waker: &Waker,
312 key: Key<T>,
313 ) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
314 instrument!(compio_log::Level::DEBUG, "poll_task_with_extra", ?key);
315 let mut driver = self.driver.borrow_mut();
316 driver.pop_with_extra(key).map_pending(|k| {
317 driver.update_waker(&k, waker);
318 k
319 })
320 }
321
322 pub(crate) fn poll_multishot<T: OpCode>(
323 &self,
324 waker: &Waker,
325 key: &Key<T>,
326 ) -> Option<BufResult<usize, Extra>> {
327 instrument!(compio_log::Level::DEBUG, "poll_multishot", ?key);
328 let mut driver = self.driver.borrow_mut();
329 if let Some(res) = driver.pop_multishot(key) {
330 return Some(res);
331 }
332 driver.update_waker(key, waker);
333 None
334 }
335
336 #[cfg(feature = "time")]
337 pub(crate) fn poll_timer(&self, cx: &mut Context, key: &TimerKey) -> Poll<()> {
338 instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key);
339 let mut timer_runtime = self.timer_runtime.borrow_mut();
340 if timer_runtime.is_completed(key) {
341 debug!("ready");
342 Poll::Ready(())
343 } else {
344 debug!("pending");
345 timer_runtime.update_waker(key, cx.waker());
346 Poll::Pending
347 }
348 }
349
350 pub fn current_timeout(&self) -> Option<Duration> {
354 #[cfg(not(feature = "time"))]
355 let timeout = None;
356 #[cfg(feature = "time")]
357 let timeout = self.timer_runtime.borrow().min_timeout();
358 timeout
359 }
360
361 pub fn poll(&self) {
366 instrument!(compio_log::Level::DEBUG, "poll");
367 let timeout = self.current_timeout();
368 debug!("timeout: {:?}", timeout);
369 self.poll_with(timeout)
370 }
371
372 pub fn poll_with(&self, timeout: Option<Duration>) {
376 instrument!(compio_log::Level::DEBUG, "poll_with");
377
378 let mut driver = self.driver.borrow_mut();
379 match driver.poll(timeout) {
380 Ok(()) => {}
381 Err(e) => match e.kind() {
382 io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => {
383 debug!("expected error: {e}");
384 }
385 _ => panic!("{e:?}"),
386 },
387 }
388 #[cfg(feature = "time")]
389 self.timer_runtime.borrow_mut().wake();
390 }
391
392 pub fn buffer_pool(&self) -> io::Result<BufferPool> {
397 self.driver.borrow_mut().buffer_pool()
398 }
399
400 pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
407 self.driver.borrow_mut().register_files(fds)
408 }
409
410 pub fn unregister_files(&self) -> io::Result<()> {
417 self.driver.borrow_mut().unregister_files()
418 }
419
420 pub fn register_personality(&self) -> io::Result<u16> {
430 self.driver.borrow_mut().register_personality()
431 }
432
433 pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
440 self.driver.borrow_mut().unregister_personality(personality)
441 }
442}
443
444impl Drop for Runtime {
445 fn drop(&mut self) {
446 if Rc::strong_count(&self.0) > 1 {
448 return;
449 }
450
451 self.enter(|| {
452 self.executor.clear();
453 })
454 }
455}
456
457impl AsRawFd for Runtime {
458 fn as_raw_fd(&self) -> RawFd {
459 self.driver.borrow().as_raw_fd()
460 }
461}
462
463#[cfg(feature = "criterion")]
464impl criterion::async_executor::AsyncExecutor for Runtime {
465 fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
466 self.block_on(future)
467 }
468}
469
470#[cfg(feature = "criterion")]
471impl criterion::async_executor::AsyncExecutor for &Runtime {
472 fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
473 (**self).block_on(future)
474 }
475}
476
477#[derive(Debug, Clone)]
479pub struct RuntimeBuilder {
480 proactor_builder: ProactorBuilder,
481 thread_affinity: HashSet<usize>,
482 sync_queue_size: usize,
483 local_queue_size: usize,
484 event_interval: u32,
485}
486
487impl Default for RuntimeBuilder {
488 fn default() -> Self {
489 Self::new()
490 }
491}
492
493impl RuntimeBuilder {
494 pub fn new() -> Self {
496 Self {
497 proactor_builder: ProactorBuilder::new(),
498 event_interval: 61,
499 sync_queue_size: 64,
500 local_queue_size: 64,
501 thread_affinity: HashSet::new(),
502 }
503 }
504
505 pub fn with_proactor(&mut self, builder: ProactorBuilder) -> &mut Self {
507 self.proactor_builder = builder;
508 self
509 }
510
511 pub fn thread_affinity(&mut self, cpus: HashSet<usize>) -> &mut Self {
513 self.thread_affinity = cpus;
514 self
515 }
516
517 pub fn event_interval(&mut self, val: usize) -> &mut Self {
522 self.event_interval = val as _;
523 self
524 }
525
526 pub fn sync_queue_size(&mut self, val: usize) -> &mut Self {
532 self.sync_queue_size = val;
533 self
534 }
535
536 pub fn local_queue_size(&mut self, val: usize) -> &mut Self {
541 self.local_queue_size = val;
542 self
543 }
544
545 pub fn build(&self) -> io::Result<Runtime> {
547 let RuntimeBuilder {
548 proactor_builder,
549 thread_affinity,
550 sync_queue_size,
551 local_queue_size,
552 event_interval,
553 } = self;
554
555 if !thread_affinity.is_empty() {
556 bind_to_cpu_set(thread_affinity);
557 }
558 let driver = proactor_builder.build()?;
559 let executor = Executor::with_config(ExecutorConfig {
560 max_interval: *event_interval,
561 sync_queue_size: *sync_queue_size,
562 local_queue_size: *local_queue_size,
563 waker: Some(driver.waker()),
564 });
565 let inner = RuntimeInner {
566 executor,
567 driver: RefCell::new(driver),
568 #[cfg(feature = "time")]
569 timer_runtime: RefCell::new(TimerRuntime::new()),
570 };
571 Ok(Runtime(Rc::new(inner)))
572 }
573}
574
575pub fn spawn<F: Future + 'static>(future: F) -> JoinHandle<F::Output> {
601 Runtime::with_current(|r| r.spawn(future))
602}
603
604pub fn spawn_blocking<T: Send + 'static>(
613 f: impl (FnOnce() -> T) + Send + 'static,
614) -> JoinHandle<T> {
615 Runtime::with_current(|r| r.spawn_blocking(f))
616}
617
618pub fn submit<T: OpCode + 'static>(op: T) -> Submit<T> {
626 Runtime::with_current(|r| r.submit(op))
627}
628
629pub fn submit_multi<T: OpCode + 'static>(op: T) -> SubmitMulti<T> {
638 Runtime::with_current(|r| r.submit_multi(op))
639}
640
641pub fn register_files(fds: &[RawFd]) -> io::Result<()> {
654 Runtime::with_current(|r| r.register_files(fds))
655}
656
657pub fn unregister_files() -> io::Result<()> {
670 Runtime::with_current(|r| r.unregister_files())
671}
672
673#[cfg(feature = "time")]
674pub(crate) async fn create_timer(instant: std::time::Instant) {
675 let key = Runtime::with_current(|r| r.timer_runtime.borrow_mut().insert(instant));
676 if let Some(key) = key {
677 TimerFuture::new(key).await
678 }
679}