Skip to main content

compio_runtime\future/
cancel.rs

1use std::{
2    pin::Pin,
3    task::{Context, ContextBuilder, Poll},
4};
5
6use futures_util::FutureExt;
7use pin_project_lite::pin_project;
8use synchrony::unsync::event::EventListener;
9
10use crate::{CancelToken, future::Ext};
11
12pin_project! {
13    /// A future with a [`CancelToken`] attached to it.
14    ///
15    /// Created with [`FutureExt::with_cancel`].
16    ///
17    /// When the cancel token is triggered, this future will still be
18    /// polled until completion, only compio operations that registered its [`Key`]
19    /// to the cancel token will be cancelled. If you want a future that completes
20    /// with an error immediately when the cancel token is triggered, see [`WithCancelFailFast`].
21    ///
22    /// [`Key`]: compio_driver::Key
23    /// [`FutureExt::with_cancel`]: crate::future::FutureExt::with_cancel
24    pub struct WithCancel<F: ?Sized> {
25        cancel: CancelToken,
26        #[pin]
27        future: F,
28    }
29}
30
31pin_project! {
32    /// A fail-fast future with a [`CancelToken`] attached to it.
33    ///
34    /// Created with [`WithCancel::fail_fast`].
35    ///
36    /// Similar to [`WithCancel`], with the difference that when the
37    /// cancel token is triggered, this will also be notified and complete
38    /// with an error without further polling the inner future.
39    pub struct WithCancelFailFast<F: ?Sized> {
40        listen: EventListener,
41        #[pin]
42        future: WithCancel<F>,
43    }
44}
45
46/// An [`std::error::Error`] indicating that a future was cancelled.
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
48pub struct Cancelled;
49
50impl<F: ?Sized> WithCancel<F> {
51    /// Create a new [`WithCancel`] future.
52    pub fn new(future: F, cancel: CancelToken) -> Self
53    where
54        F: Sized,
55    {
56        Self { cancel, future }
57    }
58}
59
60impl<F> WithCancel<F> {
61    /// Convert to a fail-fast version.
62    ///
63    /// When the cancel token is triggered, the future will be notified and
64    /// complete with an error without further polling the inner future.
65    pub fn fail_fast(self) -> WithCancelFailFast<F> {
66        let listen = self.cancel.listen();
67
68        WithCancelFailFast {
69            listen,
70            future: self,
71        }
72    }
73}
74
75impl<F> WithCancelFailFast<F> {
76    /// Convert to a fail-slow version.
77    ///
78    /// See [`WithCancel`] for details.
79    pub fn fail_slow(self) -> WithCancel<F> {
80        self.future
81    }
82}
83
84impl std::fmt::Display for Cancelled {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        write!(f, "Cancelled")
87    }
88}
89
90impl std::error::Error for Cancelled {}
91
92impl<F: ?Sized> Future for WithCancel<F>
93where
94    F: Future,
95{
96    type Output = F::Output;
97
98    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
99        let this = self.project();
100
101        if let Some(ext) = cx.ext().downcast_mut::<Ext>() {
102            ext.set_cancel(this.cancel);
103            this.future.poll(cx)
104        } else {
105            let mut ex = Ext::new().with_cancel(this.cancel);
106            let mut cx = ContextBuilder::from(cx).ext(&mut ex).build();
107            this.future.poll(&mut cx)
108        }
109    }
110}
111
112impl<F: ?Sized> Future for WithCancelFailFast<F>
113where
114    F: Future,
115{
116    type Output = Result<F::Output, Cancelled>;
117
118    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
119        let mut this = self.project();
120
121        if this.listen.poll_unpin(cx).is_ready() {
122            return Poll::Ready(Err(Cancelled));
123        }
124
125        this.future.poll_unpin(cx).map(Ok)
126    }
127}