Skip to main content

compio_runtime/
cancel.rs

1use std::{
2    cell::{Cell, RefCell},
3    collections::HashSet,
4    future::poll_fn,
5    mem,
6    ops::DerefMut,
7    pin::Pin,
8    rc::Rc,
9    task::{Context, Poll},
10};
11
12use compio_driver::{Cancel, Key, OpCode};
13use futures_util::{FutureExt, ready};
14use synchrony::unsync::event::{Event, EventListener};
15
16use crate::Runtime;
17
/// Shared state behind every clone of a [`CancelToken`].
#[derive(Debug)]
struct Inner {
    /// Cancel handles obtained from the runtime for operations registered
    /// while the token was not yet cancelled. Drained when the token is
    /// cancelled.
    tokens: RefCell<HashSet<Cancel>>,
    /// Set to `true` the first time the token is cancelled; never reset.
    is_cancelled: Cell<bool>,
    /// Runtime captured when the token was created; used to register and
    /// cancel operations.
    runtime: Runtime,
    /// Event that is notified on every call to `CancelToken::cancel`.
    notify: Event,
}
25
/// A token that can be used to cancel multiple operations at once.
///
/// When [`CancelToken::cancel`] is called, all operations that have been
/// registered with this token will be cancelled.
///
/// It is also possible to use [`CancelToken::wait`] to wait until the token is
/// cancelled, which can be useful for implementing timeouts or other
/// cancellation-based logic.
///
/// Cloning a token is cheap: every clone shares the same reference-counted
/// state, so cancelling one clone cancels all of them. Two tokens compare
/// equal only when they share that state.
///
/// To associate a future with this cancel token, use the `with_cancel`
/// combinator from the [`FutureExt`] trait, which requires nightly feature
/// `future-combinator`.
#[cfg_attr(
    feature = "future-combinator",
    doc = "\n\n [`FutureExt`]: crate::future::FutureExt"
)]
#[cfg_attr(
    not(feature = "future-combinator"),
    doc = "\n\n [`FutureExt`]: https://docs.rs/compio/latest/compio/runtime/future/trait.FutureExt.html"
)]
#[derive(Clone, Debug)]
pub struct CancelToken(Rc<Inner>);
48
49impl PartialEq for CancelToken {
50    fn eq(&self, other: &Self) -> bool {
51        Rc::ptr_eq(&self.0, &other.0)
52    }
53}
54
55impl Eq for CancelToken {}
56
57impl CancelToken {
58    /// Create a new cancel token.
59    pub fn new() -> Self {
60        Self(Rc::new(Inner {
61            tokens: RefCell::new(HashSet::new()),
62            is_cancelled: Cell::new(false),
63            runtime: Runtime::current(),
64            notify: Event::new(),
65        }))
66    }
67
68    pub(crate) fn listen(&self) -> EventListener {
69        self.0.notify.listen()
70    }
71
72    /// Cancel all operations registered with this token.
73    pub fn cancel(self) {
74        self.0.notify.notify_all();
75        if self.0.is_cancelled.replace(true) {
76            return;
77        }
78        let tokens = mem::take(self.0.tokens.borrow_mut().deref_mut());
79        for t in tokens {
80            self.0.runtime.cancel_token(t);
81        }
82    }
83
84    /// Check if this token has been cancelled.
85    pub fn is_cancelled(&self) -> bool {
86        self.0.is_cancelled.get()
87    }
88
89    /// Register an operation with this token.
90    ///
91    /// If the token has already been cancelled, the operation will be cancelled
92    /// immediately. Usually this method should not be used directly, but rather
93    /// through the `with_cancel` combinator, which requires nightly feature
94    /// `future-combinator`.
95    ///
96    /// Multiple registrations of the same key does nothing, and the key will
97    /// only be cancelled once.
98    pub fn register<T: OpCode>(&self, key: &Key<T>) {
99        if self.0.is_cancelled.get() {
100            self.0.runtime.cancel(key.clone());
101        } else {
102            let token = self.0.runtime.register_cancel(key);
103            self.0.tokens.borrow_mut().insert(token);
104        }
105    }
106
107    /// Wait until this token is cancelled.
108    pub fn wait(self) -> WaitFuture {
109        WaitFuture::new(self)
110    }
111
112    /// Try to get the current cancel token associated with the future.
113    ///
114    /// This is done by checking if the current context has a cancel token
115    /// associated with it. This will only work with nightly feature
116    /// `future-combinator` turned on; otherwise, `None` is always returned.
117    pub async fn current() -> Option<Self> {
118        #[cfg(not(feature = "future-combinator"))]
119        return None;
120
121        #[cfg(feature = "future-combinator")]
122        poll_fn(|cx| {
123            use crate::runtime::ContextExt;
124            Poll::Ready(cx.as_cancel().cloned())
125        })
126        .await
127    }
128}
129
130impl Default for CancelToken {
131    fn default() -> Self {
132        Self::new()
133    }
134}
135
/// Future returned by [`CancelToken::wait`].
///
/// Resolves to `()` once the wrapped token reports that it has been
/// cancelled.
pub struct WaitFuture {
    /// Listener on the token's notification event, created when the future is
    /// constructed.
    listen: EventListener,
    /// The token being waited on; its cancelled flag is checked on every poll.
    token: CancelToken,
}
141
142impl WaitFuture {
143    fn new(token: CancelToken) -> WaitFuture {
144        WaitFuture {
145            listen: token.0.notify.listen(),
146            token,
147        }
148    }
149}
150
151impl Future for WaitFuture {
152    type Output = ();
153
154    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
155        loop {
156            if self.token.is_cancelled() {
157                return Poll::Ready(());
158            } else {
159                ready!(self.listen.poll_unpin(cx))
160            }
161        }
162    }
163}