Skip to main content

compio_runtime\runtime/
future.rs

//! Future for submitting operations to the runtime.
2
3use std::{
4    future::Future,
5    marker::PhantomData,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10use compio_buf::BufResult;
11use compio_driver::{Extra, Key, OpCode, PushEntry};
12use futures_util::future::FusedFuture;
13
14use crate::{CancelToken, runtime::Runtime};
15
16pub(crate) trait ContextExt {
17    fn as_cancel(&mut self) -> Option<&CancelToken> {
18        None
19    }
20
21    fn as_extra(&mut self, default: impl FnOnce() -> Extra) -> Option<Extra> {
22        let _ = default;
23        None
24    }
25}
26
27#[cfg(not(feature = "future-combinator"))]
28impl ContextExt for Context<'_> {}
29
30#[cfg(feature = "future-combinator")]
31impl ContextExt for Context<'_> {
32    fn as_cancel(&mut self) -> Option<&CancelToken> {
33        self.ext()
34            .downcast_ref::<crate::future::Ext>()?
35            .get_cancel()
36    }
37
38    fn as_extra(&mut self, default: impl FnOnce() -> Extra) -> Option<Extra> {
39        let ext = self.ext().downcast_mut::<crate::future::Ext>()?;
40        let mut extra = default();
41        ext.set_extra(&mut extra);
42        Some(extra)
43    }
44}
45pin_project_lite::pin_project! {
46    /// Returned [`Future`] for [`Runtime::submit`].
47    ///
48    /// When this is dropped and the operation hasn't finished yet, it will try to
49    /// cancel the operation.
50    ///
51    /// By default, this implements `Future<Output = BufResult<usize, T>>`. If
52    /// [`Extra`] is needed, call [`.with_extra()`] to get a `Submit<T, Extra>`
53    /// which implements `Future<Output = (BufResult<usize, T>, Extra)>`.
54    ///
55    /// [`.with_extra()`]: Submit::with_extra
56    pub struct Submit<T: OpCode, E = ()> {
57        runtime: Runtime,
58        state: Option<State<T, E>>,
59    }
60
61    impl<T: OpCode, E> PinnedDrop for Submit<T, E> {
62        fn drop(this: Pin<&mut Self>) {
63            let this = this.project();
64            if let Some(State::Submitted { key, .. }) = this.state.take() {
65                this.runtime.cancel(key);
66            }
67        }
68    }
69
70}
71
72enum State<T: OpCode, E> {
73    Idle { op: T },
74    Submitted { key: Key<T>, _p: PhantomData<E> },
75}
76
77impl<T: OpCode, E> State<T, E> {
78    fn submitted(key: Key<T>) -> Self {
79        State::Submitted {
80            key,
81            _p: PhantomData,
82        }
83    }
84}
85
86impl<T: OpCode> Submit<T, ()> {
87    pub(crate) fn new(runtime: Runtime, op: T) -> Self {
88        Submit {
89            runtime,
90            state: Some(State::Idle { op }),
91        }
92    }
93
94    /// Convert this future to one that returns [`Extra`] along with the result.
95    ///
96    /// This is useful if you need to access extra information provided by the
97    /// runtime upon completion of the operation.
98    pub fn with_extra(mut self) -> Submit<T, Extra> {
99        let runtime = self.runtime.clone();
100        let Some(state) = self.state.take() else {
101            return Submit {
102                runtime,
103                state: None,
104            };
105        };
106        let state = match state {
107            State::Submitted { key, .. } => State::Submitted {
108                key,
109                _p: PhantomData,
110            },
111            State::Idle { op } => State::Idle { op },
112        };
113        Submit {
114            runtime,
115            state: Some(state),
116        }
117    }
118}
119
120impl<T: OpCode + 'static> Future for Submit<T, ()> {
121    type Output = BufResult<usize, T>;
122
123    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
124        let this = self.project();
125
126        loop {
127            match this.state.take().expect("Cannot poll after ready") {
128                State::Submitted { key, .. } => match this.runtime.poll_task(cx.waker(), key) {
129                    PushEntry::Pending(key) => {
130                        *this.state = Some(State::submitted(key));
131                        return Poll::Pending;
132                    }
133                    PushEntry::Ready(res) => return Poll::Ready(res),
134                },
135                State::Idle { op } => {
136                    let extra = cx.as_extra(|| this.runtime.default_extra());
137                    match this.runtime.submit_raw(op, extra) {
138                        PushEntry::Pending(key) => {
139                            // TODO: Should we register it only the first time or every time it's
140                            // being polled?
141                            if let Some(cancel) = cx.as_cancel() {
142                                cancel.register(&key);
143                            };
144
145                            *this.state = Some(State::submitted(key))
146                        }
147                        PushEntry::Ready(res) => {
148                            return Poll::Ready(res);
149                        }
150                    }
151                }
152            }
153        }
154    }
155}
156
157impl<T: OpCode + 'static> Future for Submit<T, Extra> {
158    type Output = (BufResult<usize, T>, Extra);
159
160    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161        let this = self.project();
162
163        loop {
164            match this.state.take().expect("Cannot poll after ready") {
165                State::Submitted { key, .. } => {
166                    match this.runtime.poll_task_with_extra(cx.waker(), key) {
167                        PushEntry::Pending(key) => {
168                            *this.state = Some(State::submitted(key));
169                            return Poll::Pending;
170                        }
171                        PushEntry::Ready(res) => return Poll::Ready(res),
172                    }
173                }
174                State::Idle { op } => {
175                    let extra = cx.as_extra(|| this.runtime.default_extra());
176                    match this.runtime.submit_raw(op, extra) {
177                        PushEntry::Pending(key) => {
178                            if let Some(cancel) = cx.as_cancel() {
179                                cancel.register(&key);
180                            }
181
182                            *this.state = Some(State::submitted(key))
183                        }
184                        PushEntry::Ready(res) => {
185                            return Poll::Ready((res, this.runtime.default_extra()));
186                        }
187                    }
188                }
189            }
190        }
191    }
192}
193
194impl<T: OpCode, E> FusedFuture for Submit<T, E>
195where
196    Submit<T, E>: Future,
197{
198    fn is_terminated(&self) -> bool {
199        self.state.is_none()
200    }
201}