Skip to main content

compio_runtime/runtime/
stream.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use compio_buf::BufResult;
7use compio_driver::{Extra, Key, OpCode, PushEntry};
8use futures_util::{Stream, stream::FusedStream};
9
10use crate::{ContextExt, Runtime};
11
12pin_project_lite::pin_project! {
13    /// Returned [`Stream`] for [`Runtime::submit_multi`].
14    ///
15    /// When this is dropped and the operation hasn't finished yet, it will try to
16    /// cancel the operation.
17    pub struct SubmitMulti<T: OpCode> {
18        runtime: Runtime,
19        state: Option<State<T>>,
20    }
21
22    impl<T: OpCode> PinnedDrop for SubmitMulti<T> {
23        fn drop(this: Pin<&mut Self>) {
24            let this = this.project();
25            if let Some(State::Submitted { key }) = this.state.take() {
26                this.runtime.cancel(key);
27            }
28        }
29    }
30}
31
32enum State<T: OpCode> {
33    Idle { op: T },
34    Submitted { key: Key<T> },
35    Finished { op: T },
36}
37
38impl<T: OpCode> State<T> {
39    fn submitted(key: Key<T>) -> Self {
40        State::Submitted { key }
41    }
42}
43
44impl<T: OpCode> SubmitMulti<T> {
45    pub(crate) fn new(runtime: Runtime, op: T) -> Self {
46        SubmitMulti {
47            runtime,
48            state: Some(State::Idle { op }),
49        }
50    }
51
52    /// Try to take the inner op from the stream.
53    ///
54    /// Returns `Ok(T)` if the stream:
55    ///
56    /// - has not been polled yet, or
57    /// - is finished and the op is returned by the driver
58    ///
59    /// Returns `Err(Self)` if it's still running.
60    pub fn try_take(mut self) -> Result<T, Self> {
61        match self.state.take() {
62            Some(State::Finished { op }) | Some(State::Idle { op }) => Ok(op),
63            state => {
64                debug_assert!(state.is_some());
65                self.state = state;
66                Err(self)
67            }
68        }
69    }
70}
71
72impl<T: OpCode + 'static> Stream for SubmitMulti<T> {
73    type Item = BufResult<usize, Extra>;
74
75    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76        let this = self.project();
77
78        loop {
79            match this.state.take().expect("State error, this is a bug") {
80                State::Idle { op } => {
81                    let extra = cx.as_extra(|| this.runtime.default_extra());
82                    match this.runtime.submit_raw(op, extra) {
83                        PushEntry::Pending(key) => {
84                            if let Some(cancel) = cx.as_cancel() {
85                                cancel.register(&key);
86                            }
87
88                            *this.state = Some(State::submitted(key))
89                        }
90                        PushEntry::Ready(BufResult(res, op)) => {
91                            *this.state = Some(State::Finished { op });
92                            let extra = this.runtime.default_extra();
93
94                            return Poll::Ready(Some(BufResult(res, extra)));
95                        }
96                    }
97                }
98
99                State::Submitted { key, .. } => {
100                    if let Some(res) = this.runtime.poll_multishot(cx.waker(), &key) {
101                        *this.state = Some(State::submitted(key));
102
103                        return Poll::Ready(Some(res));
104                    };
105
106                    match this.runtime.poll_task_with_extra(cx.waker(), key) {
107                        PushEntry::Pending(key) => {
108                            *this.state = Some(State::submitted(key));
109
110                            return Poll::Pending;
111                        }
112                        PushEntry::Ready((BufResult(res, op), extra)) => {
113                            *this.state = Some(State::Finished { op });
114
115                            return Poll::Ready(Some(BufResult(res, extra)));
116                        }
117                    }
118                }
119
120                State::Finished { op } => {
121                    *this.state = Some(State::Finished { op });
122
123                    return Poll::Ready(None);
124                }
125            }
126        }
127    }
128}
129
130impl<T: OpCode + 'static> FusedStream for SubmitMulti<T> {
131    fn is_terminated(&self) -> bool {
132        matches!(self.state, None | Some(State::Finished { .. }))
133    }
134}