1use std::{
2 cell::{Cell, RefCell},
3 collections::HashSet,
4 future::poll_fn,
5 mem,
6 ops::DerefMut,
7 pin::Pin,
8 rc::Rc,
9 task::{Context, Poll},
10};
11
12use compio_driver::{Cancel, Key, OpCode};
13use futures_util::{FutureExt, ready};
14use synchrony::unsync::event::{Event, EventListener};
15
16use crate::Runtime;
17
/// Shared state behind a [`CancelToken`].
///
/// A `CancelToken` wraps this in an `Rc`, so every clone of a token refers to
/// the same `Inner`.
#[derive(Debug)]
struct Inner {
    /// Cancel handles returned by `Runtime::register_cancel` for operations
    /// that were registered while the token was not yet cancelled.
    tokens: RefCell<HashSet<Cancel>>,
    /// Set to `true` by `CancelToken::cancel`; nothing in this file resets it.
    is_cancelled: Cell<bool>,
    /// Runtime obtained from `Runtime::current()` when the token is created;
    /// used to register and cancel operations.
    runtime: Runtime,
    /// Event notified by `CancelToken::cancel`; listeners are handed out by
    /// `CancelToken::listen` and held by `WaitFuture`.
    notify: Event,
}
25
/// A handle for cancelling a group of registered operations together.
///
/// Cloning a token yields another handle to the same shared state. Two tokens
/// compare equal only when they share that state.
///
/// NOTE(review): the attributes below define a link target for
/// [`FutureExt`]; presumably tokens are meant to be attached to futures
/// through that trait — confirm against the `future` module.
#[cfg_attr(
    feature = "future-combinator",
    doc = "\n\n [`FutureExt`]: crate::future::FutureExt"
)]
#[cfg_attr(
    not(feature = "future-combinator"),
    doc = "\n\n [`FutureExt`]: https://docs.rs/compio/latest/compio/runtime/future/trait.FutureExt.html"
)]
#[derive(Clone, Debug)]
pub struct CancelToken(Rc<Inner>);
48
49impl PartialEq for CancelToken {
50 fn eq(&self, other: &Self) -> bool {
51 Rc::ptr_eq(&self.0, &other.0)
52 }
53}
54
// Equality is pointer identity (`Rc::ptr_eq`), which is reflexive, so the
// stronger `Eq` marker applies.
impl Eq for CancelToken {}
56
57impl CancelToken {
58 pub fn new() -> Self {
60 Self(Rc::new(Inner {
61 tokens: RefCell::new(HashSet::new()),
62 is_cancelled: Cell::new(false),
63 runtime: Runtime::current(),
64 notify: Event::new(),
65 }))
66 }
67
68 pub(crate) fn listen(&self) -> EventListener {
69 self.0.notify.listen()
70 }
71
72 pub fn cancel(self) {
74 self.0.notify.notify_all();
75 if self.0.is_cancelled.replace(true) {
76 return;
77 }
78 let tokens = mem::take(self.0.tokens.borrow_mut().deref_mut());
79 for t in tokens {
80 self.0.runtime.cancel_token(t);
81 }
82 }
83
84 pub fn is_cancelled(&self) -> bool {
86 self.0.is_cancelled.get()
87 }
88
89 pub fn register<T: OpCode>(&self, key: &Key<T>) {
99 if self.0.is_cancelled.get() {
100 self.0.runtime.cancel(key.clone());
101 } else {
102 let token = self.0.runtime.register_cancel(key);
103 self.0.tokens.borrow_mut().insert(token);
104 }
105 }
106
107 pub fn wait(self) -> WaitFuture {
109 WaitFuture::new(self)
110 }
111
112 pub async fn current() -> Option<Self> {
118 #[cfg(not(feature = "future-combinator"))]
119 return None;
120
121 #[cfg(feature = "future-combinator")]
122 poll_fn(|cx| {
123 use crate::runtime::ContextExt;
124 Poll::Ready(cx.as_cancel().cloned())
125 })
126 .await
127 }
128}
129
130impl Default for CancelToken {
131 fn default() -> Self {
132 Self::new()
133 }
134}
135
/// Future returned by `CancelToken::wait`; it resolves once the wrapped token
/// reports that it has been cancelled.
pub struct WaitFuture {
    /// Listener on the token's notification event, polled while the token is
    /// not yet cancelled.
    listen: EventListener,
    /// The token whose cancellation is awaited.
    token: CancelToken,
}
141
142impl WaitFuture {
143 fn new(token: CancelToken) -> WaitFuture {
144 WaitFuture {
145 listen: token.0.notify.listen(),
146 token,
147 }
148 }
149}
150
151impl Future for WaitFuture {
152 type Output = ();
153
154 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
155 loop {
156 if self.token.is_cancelled() {
157 return Poll::Ready(());
158 } else {
159 ready!(self.listen.poll_unpin(cx))
160 }
161 }
162 }
163}