1#![cfg_attr(docsrs, feature(doc_cfg))]
6#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
7#![allow(unused_features)]
8#![warn(missing_docs)]
9#![deny(rustdoc::broken_intra_doc_links)]
10#![doc(
11 html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
12)]
13#![doc(
14 html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
15)]
16
17use std::{
18 io,
19 task::{Poll, Waker},
20 time::Duration,
21};
22
23use compio_buf::BufResult;
24use compio_log::instrument;
25
26mod macros;
27
28mod key;
29pub use key::Key;
30
31mod asyncify;
32pub use asyncify::*;
33
34pub mod op;
35
36mod fd;
37pub use fd::*;
38
39mod driver_type;
40pub use driver_type::*;
41
42mod buffer_pool;
43pub use buffer_pool::*;
44
45mod sys;
46pub use sys::{Extra, *};
47
48mod cancel;
49pub use cancel::*;
50
51use crate::{key::ErasedKey, op::OpCodeFlag};
52
53mod sys_slice;
54
/// Two-state outcome: either still pending (carrying a `K`) or already
/// ready (carrying an `R`).
#[derive(Debug)]
pub enum PushEntry<K, R> {
    /// The entry is not complete yet; holds the pending value.
    Pending(K),
    /// The entry is complete; holds the ready value.
    Ready(R),
}

impl<K, R> PushEntry<K, R> {
    /// Returns `true` when this is the [`PushEntry::Ready`] variant.
    pub const fn is_ready(&self) -> bool {
        match self {
            Self::Ready(_) => true,
            Self::Pending(_) => false,
        }
    }

    /// Consumes the entry and returns the ready value, or `None` if it was
    /// still pending (the pending value is dropped in that case).
    pub fn take_ready(self) -> Option<R> {
        if let Self::Ready(value) = self {
            Some(value)
        } else {
            None
        }
    }

    /// Transforms the pending value with `f`, leaving a ready value untouched.
    ///
    /// `f` is only invoked for the [`PushEntry::Pending`] variant.
    pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
        match self {
            Self::Ready(done) => PushEntry::Ready(done),
            Self::Pending(waiting) => PushEntry::Pending(f(waiting)),
        }
    }

    /// Transforms the ready value with `f`, leaving a pending value untouched.
    ///
    /// `f` is only invoked for the [`PushEntry::Ready`] variant.
    pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
        match self {
            Self::Ready(done) => PushEntry::Ready(f(done)),
            Self::Pending(waiting) => PushEntry::Pending(waiting),
        }
    }
}
94
95pub struct Proactor {
98 driver: Driver,
99 cancel: CancelRegistry,
100}
101
102assert_not_impl!(Proactor, Send);
103assert_not_impl!(Proactor, Sync);
104
105impl Proactor {
106 pub fn new() -> io::Result<Self> {
108 Self::builder().build()
109 }
110
111 pub fn builder() -> ProactorBuilder {
113 ProactorBuilder::new()
114 }
115
116 fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
117 Ok(Self {
118 driver: Driver::new(builder)?,
119 cancel: CancelRegistry::new(),
120 })
121 }
122
123 pub fn default_extra(&self) -> Extra {
125 sys::default_extra(&self.driver)
126 }
127
128 pub fn driver_type(&self) -> DriverType {
130 self.driver.driver_type()
131 }
132
133 pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
141 self.driver.attach(fd)
142 }
143
144 pub fn cancel<T: OpCode>(&mut self, key: Key<T>) -> Option<BufResult<usize, T>> {
151 instrument!(compio_log::Level::DEBUG, "cancel", ?key);
152 if key.set_cancelled() {
153 return None;
154 }
155 self.cancel.remove(&key);
156 if key.is_unique() && key.has_result() {
157 Some(key.take_result())
158 } else {
159 self.driver.cancel(key.erase());
160 None
161 }
162 }
163
164 pub fn cancel_token(&mut self, token: Cancel) -> bool {
172 let Some(key) = self.cancel.take(token) else {
173 return false;
174 };
175 if key.set_cancelled() || key.has_result() {
176 return false;
177 }
178 self.driver.cancel(key);
179 true
180 }
181
182 pub fn register_cancel<T: OpCode>(&mut self, key: &Key<T>) -> Cancel {
191 self.cancel.register(key)
192 }
193
194 pub fn push<T: sys::OpCode + 'static>(
197 &mut self,
198 op: T,
199 ) -> PushEntry<Key<T>, BufResult<usize, T>> {
200 self.push_with_extra(op, self.default_extra())
201 }
202
203 pub fn push_with_extra<T: sys::OpCode + 'static>(
206 &mut self,
207 op: T,
208 extra: Extra,
209 ) -> PushEntry<Key<T>, BufResult<usize, T>> {
210 let key = Key::new(op, extra);
211 match self.driver.push(key.clone().erase()) {
212 Poll::Pending => PushEntry::Pending(key),
213 Poll::Ready(res) => {
214 key.set_result(res);
215 PushEntry::Ready(key.take_result())
216 }
217 }
218 }
219
220 pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
224 self.driver.poll(timeout)
225 }
226
227 pub fn pop<T>(&mut self, key: Key<T>) -> PushEntry<Key<T>, BufResult<usize, T>> {
233 instrument!(compio_log::Level::DEBUG, "pop", ?key);
234 if key.has_result() {
235 self.cancel.remove(&key);
236 PushEntry::Ready(key.take_result())
237 } else {
238 PushEntry::Pending(key)
239 }
240 }
241
242 pub fn pop_with_extra<T>(
249 &mut self,
250 key: Key<T>,
251 ) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
252 instrument!(compio_log::Level::DEBUG, "pop", ?key);
253 if key.has_result() {
254 self.cancel.remove(&key);
255 let extra = key.swap_extra(self.default_extra());
256 let res = key.take_result();
257 PushEntry::Ready((res, extra))
258 } else {
259 PushEntry::Pending(key)
260 }
261 }
262
263 pub fn pop_multishot<T>(&mut self, key: &Key<T>) -> Option<BufResult<usize, Extra>> {
267 instrument!(compio_log::Level::DEBUG, "pop_multishot", ?key);
268 self.driver.pop_multishot(key)
269 }
270
271 pub fn update_waker<T>(&mut self, op: &Key<T>, waker: &Waker) {
273 op.set_waker(waker);
274 }
275
276 pub fn waker(&self) -> Waker {
278 self.driver.waker()
279 }
280
281 pub fn create_buffer_pool(
288 &mut self,
289 buffer_len: u16,
290 buffer_size: usize,
291 ) -> io::Result<BufferPool> {
292 self.driver.create_buffer_pool(buffer_len, buffer_size)
293 }
294
295 pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> {
302 unsafe { self.driver.release_buffer_pool(buffer_pool) }
303 }
304
305 pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
312 fn unsupported(_: &[RawFd]) -> io::Error {
313 io::Error::new(
314 io::ErrorKind::Unsupported,
315 "Fixed-file registration is only supported on io-uring driver",
316 )
317 }
318
319 #[cfg(io_uring)]
320 match self.driver.as_iour() {
321 Some(iour) => iour.register_files(fds),
322 None => Err(unsupported(fds)),
323 }
324
325 #[cfg(not(io_uring))]
326 Err(unsupported(fds))
327 }
328
329 pub fn unregister_files(&self) -> io::Result<()> {
336 fn unsupported() -> io::Error {
337 io::Error::new(
338 io::ErrorKind::Unsupported,
339 "Fixed-file unregistration is only supported on io-uring driver",
340 )
341 }
342
343 #[cfg(io_uring)]
344 match self.driver.as_iour() {
345 Some(iour) => iour.unregister_files(),
346 None => Err(unsupported()),
347 }
348
349 #[cfg(not(io_uring))]
350 Err(unsupported())
351 }
352
353 pub fn register_personality(&self) -> io::Result<u16> {
365 fn unsupported() -> io::Error {
366 io::Error::new(
367 io::ErrorKind::Unsupported,
368 "Personality is only supported on io-uring driver",
369 )
370 }
371
372 #[cfg(io_uring)]
373 match self.driver.as_iour() {
374 Some(iour) => iour.register_personality(),
375 None => Err(unsupported()),
376 }
377
378 #[cfg(not(io_uring))]
379 Err(unsupported())
380 }
381
382 pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
391 fn unsupported(_: u16) -> io::Error {
392 io::Error::new(
393 io::ErrorKind::Unsupported,
394 "Personality is only supported on io-uring driver",
395 )
396 }
397
398 #[cfg(io_uring)]
399 match self.driver.as_iour() {
400 Some(iour) => iour.unregister_personality(personality),
401 None => Err(unsupported(personality)),
402 }
403
404 #[cfg(not(io_uring))]
405 Err(unsupported(personality))
406 }
407}
408
409impl AsRawFd for Proactor {
410 fn as_raw_fd(&self) -> RawFd {
411 self.driver.as_raw_fd()
412 }
413}
414
415#[derive(Debug)]
420pub(crate) struct Entry {
421 key: ErasedKey,
422 result: io::Result<usize>,
423
424 #[cfg(io_uring)]
425 flags: u32,
426}
427
428unsafe impl Send for Entry {}
429unsafe impl Sync for Entry {}
430
431impl Entry {
432 pub(crate) fn new(key: ErasedKey, result: io::Result<usize>) -> Self {
433 #[cfg(not(io_uring))]
434 {
435 Self { key, result }
436 }
437 #[cfg(io_uring)]
438 {
439 Self {
440 key,
441 result,
442 flags: 0,
443 }
444 }
445 }
446
447 #[allow(dead_code)]
448 pub fn user_data(&self) -> usize {
449 self.key.as_raw()
450 }
451
452 #[allow(dead_code)]
453 pub fn into_key(self) -> ErasedKey {
454 self.key
455 }
456
457 #[cfg(io_uring)]
458 pub fn flags(&self) -> u32 {
459 self.flags
460 }
461
462 #[cfg(io_uring)]
463 pub(crate) fn set_flags(&mut self, flags: u32) {
465 self.flags = flags;
466 }
467
468 pub fn notify(self) {
469 #[cfg(io_uring)]
470 self.key.borrow().extra_mut().set_flags(self.flags());
471 self.key.set_result(self.result);
472 }
473}
474
475#[derive(Debug, Clone)]
476enum ThreadPoolBuilder {
477 Create { limit: usize, recv_limit: Duration },
478 Reuse(AsyncifyPool),
479}
480
481impl Default for ThreadPoolBuilder {
482 fn default() -> Self {
483 Self::new()
484 }
485}
486
487impl ThreadPoolBuilder {
488 pub fn new() -> Self {
489 Self::Create {
490 limit: 256,
491 recv_limit: Duration::from_secs(60),
492 }
493 }
494
495 pub fn create_or_reuse(&self) -> AsyncifyPool {
496 match self {
497 Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
498 Self::Reuse(pool) => pool.clone(),
499 }
500 }
501}
502
503#[derive(Debug, Clone)]
505pub struct ProactorBuilder {
506 capacity: u32,
507 pool_builder: ThreadPoolBuilder,
508 sqpoll_idle: Option<Duration>,
509 coop_taskrun: bool,
510 taskrun_flag: bool,
511 eventfd: Option<RawFd>,
512 driver_type: Option<DriverType>,
513 op_flags: OpCodeFlag,
514}
515
516unsafe impl Send for ProactorBuilder {}
518unsafe impl Sync for ProactorBuilder {}
519
520impl Default for ProactorBuilder {
521 fn default() -> Self {
522 Self::new()
523 }
524}
525
526impl ProactorBuilder {
527 pub fn new() -> Self {
529 Self {
530 capacity: 1024,
531 pool_builder: ThreadPoolBuilder::new(),
532 sqpoll_idle: None,
533 coop_taskrun: false,
534 taskrun_flag: false,
535 eventfd: None,
536 driver_type: None,
537 op_flags: OpCodeFlag::empty(),
538 }
539 }
540
541 pub fn capacity(&mut self, capacity: u32) -> &mut Self {
544 self.capacity = capacity;
545 self
546 }
547
548 pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
558 if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
559 *limit = value;
560 }
561 self
562 }
563
564 pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
569 if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
570 *recv_limit = timeout;
571 }
572 self
573 }
574
575 pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
577 self.pool_builder = ThreadPoolBuilder::Reuse(pool);
578 self
579 }
580
581 pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
584 self.reuse_thread_pool(self.create_or_get_thread_pool());
585 self
586 }
587
588 pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
590 self.pool_builder.create_or_reuse()
591 }
592
593 pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
603 self.sqpoll_idle = Some(idle);
604 self
605 }
606
607 pub fn coop_taskrun(&mut self, enable: bool) -> &mut Self {
617 self.coop_taskrun = enable;
618 self
619 }
620
621 pub fn taskrun_flag(&mut self, enable: bool) -> &mut Self {
631 self.taskrun_flag = enable;
632 self
633 }
634
635 pub fn register_eventfd(&mut self, fd: RawFd) -> &mut Self {
641 self.eventfd = Some(fd);
642 self
643 }
644
645 pub fn detect_opcode_support(&mut self, flags: OpCodeFlag) -> &mut Self {
662 self.op_flags = flags;
663 self
664 }
665
666 pub fn driver_type(&mut self, t: DriverType) -> &mut Self {
670 self.driver_type = Some(t);
671 self
672 }
673
674 pub fn build(&self) -> io::Result<Proactor> {
676 Proactor::with_builder(self)
677 }
678}