compio_runtime\future/
cancel.rs1use std::{
2 pin::Pin,
3 task::{Context, ContextBuilder, Poll},
4};
5
6use futures_util::FutureExt;
7use pin_project_lite::pin_project;
8use synchrony::unsync::event::EventListener;
9
10use crate::{CancelToken, future::Ext};
11
12pin_project! {
13 pub struct WithCancel<F: ?Sized> {
25 cancel: CancelToken,
26 #[pin]
27 future: F,
28 }
29}
30
31pin_project! {
32 pub struct WithCancelFailFast<F: ?Sized> {
40 listen: EventListener,
41 #[pin]
42 future: WithCancel<F>,
43 }
44}
45
/// Unit error value used as the `Err` variant of
/// `WithCancelFailFast`'s output when its cancellation listener becomes
/// ready before the wrapped future completes.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Cancelled;
49
50impl<F: ?Sized> WithCancel<F> {
51 pub fn new(future: F, cancel: CancelToken) -> Self
53 where
54 F: Sized,
55 {
56 Self { cancel, future }
57 }
58}
59
60impl<F> WithCancel<F> {
61 pub fn fail_fast(self) -> WithCancelFailFast<F> {
66 let listen = self.cancel.listen();
67
68 WithCancelFailFast {
69 listen,
70 future: self,
71 }
72 }
73}
74
75impl<F> WithCancelFailFast<F> {
76 pub fn fail_slow(self) -> WithCancel<F> {
80 self.future
81 }
82}
83
84impl std::fmt::Display for Cancelled {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 write!(f, "Cancelled")
87 }
88}
89
90impl std::error::Error for Cancelled {}
91
92impl<F: ?Sized> Future for WithCancel<F>
93where
94 F: Future,
95{
96 type Output = F::Output;
97
98 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
99 let this = self.project();
100
101 if let Some(ext) = cx.ext().downcast_mut::<Ext>() {
102 ext.set_cancel(this.cancel);
103 this.future.poll(cx)
104 } else {
105 let mut ex = Ext::new().with_cancel(this.cancel);
106 let mut cx = ContextBuilder::from(cx).ext(&mut ex).build();
107 this.future.poll(&mut cx)
108 }
109 }
110}
111
112impl<F: ?Sized> Future for WithCancelFailFast<F>
113where
114 F: Future,
115{
116 type Output = Result<F::Output, Cancelled>;
117
118 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
119 let mut this = self.project();
120
121 if this.listen.poll_unpin(cx).is_ready() {
122 return Poll::Ready(Err(Cancelled));
123 }
124
125 this.future.poll_unpin(cx).map(Ok)
126 }
127}