compio_runtime\runtime/
stream.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use compio_buf::BufResult;
7use compio_driver::{Extra, Key, OpCode, PushEntry};
8use futures_util::{Stream, stream::FusedStream};
9
10use crate::{ContextExt, Runtime};
11
12pin_project_lite::pin_project! {
13 pub struct SubmitMulti<T: OpCode> {
18 runtime: Runtime,
19 state: Option<State<T>>,
20 }
21
22 impl<T: OpCode> PinnedDrop for SubmitMulti<T> {
23 fn drop(this: Pin<&mut Self>) {
24 let this = this.project();
25 if let Some(State::Submitted { key }) = this.state.take() {
26 this.runtime.cancel(key);
27 }
28 }
29 }
30}
31
32enum State<T: OpCode> {
33 Idle { op: T },
34 Submitted { key: Key<T> },
35 Finished { op: T },
36}
37
38impl<T: OpCode> State<T> {
39 fn submitted(key: Key<T>) -> Self {
40 State::Submitted { key }
41 }
42}
43
44impl<T: OpCode> SubmitMulti<T> {
45 pub(crate) fn new(runtime: Runtime, op: T) -> Self {
46 SubmitMulti {
47 runtime,
48 state: Some(State::Idle { op }),
49 }
50 }
51
52 pub fn try_take(mut self) -> Result<T, Self> {
61 match self.state.take() {
62 Some(State::Finished { op }) | Some(State::Idle { op }) => Ok(op),
63 state => {
64 debug_assert!(state.is_some());
65 self.state = state;
66 Err(self)
67 }
68 }
69 }
70}
71
72impl<T: OpCode + 'static> Stream for SubmitMulti<T> {
73 type Item = BufResult<usize, Extra>;
74
75 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76 let this = self.project();
77
78 loop {
79 match this.state.take().expect("State error, this is a bug") {
80 State::Idle { op } => {
81 let extra = cx.as_extra(|| this.runtime.default_extra());
82 match this.runtime.submit_raw(op, extra) {
83 PushEntry::Pending(key) => {
84 if let Some(cancel) = cx.as_cancel() {
85 cancel.register(&key);
86 }
87
88 *this.state = Some(State::submitted(key))
89 }
90 PushEntry::Ready(BufResult(res, op)) => {
91 *this.state = Some(State::Finished { op });
92 let extra = this.runtime.default_extra();
93
94 return Poll::Ready(Some(BufResult(res, extra)));
95 }
96 }
97 }
98
99 State::Submitted { key, .. } => {
100 if let Some(res) = this.runtime.poll_multishot(cx.waker(), &key) {
101 *this.state = Some(State::submitted(key));
102
103 return Poll::Ready(Some(res));
104 };
105
106 match this.runtime.poll_task_with_extra(cx.waker(), key) {
107 PushEntry::Pending(key) => {
108 *this.state = Some(State::submitted(key));
109
110 return Poll::Pending;
111 }
112 PushEntry::Ready((BufResult(res, op), extra)) => {
113 *this.state = Some(State::Finished { op });
114
115 return Poll::Ready(Some(BufResult(res, extra)));
116 }
117 }
118 }
119
120 State::Finished { op } => {
121 *this.state = Some(State::Finished { op });
122
123 return Poll::Ready(None);
124 }
125 }
126 }
127 }
128}
129
130impl<T: OpCode + 'static> FusedStream for SubmitMulti<T> {
131 fn is_terminated(&self) -> bool {
132 matches!(self.state, None | Some(State::Finished { .. }))
133 }
134}