compio_runtime\runtime/
future.rs1use std::{
4 future::Future,
5 marker::PhantomData,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use compio_buf::BufResult;
11use compio_driver::{Extra, Key, OpCode, PushEntry};
12use futures_util::future::FusedFuture;
13
14use crate::{CancelToken, runtime::Runtime};
15
16pub(crate) trait ContextExt {
17 fn as_cancel(&mut self) -> Option<&CancelToken> {
18 None
19 }
20
21 fn as_extra(&mut self, default: impl FnOnce() -> Extra) -> Option<Extra> {
22 let _ = default;
23 None
24 }
25}
26
27#[cfg(not(feature = "future-combinator"))]
28impl ContextExt for Context<'_> {}
29
30#[cfg(feature = "future-combinator")]
31impl ContextExt for Context<'_> {
32 fn as_cancel(&mut self) -> Option<&CancelToken> {
33 self.ext()
34 .downcast_ref::<crate::future::Ext>()?
35 .get_cancel()
36 }
37
38 fn as_extra(&mut self, default: impl FnOnce() -> Extra) -> Option<Extra> {
39 let ext = self.ext().downcast_mut::<crate::future::Ext>()?;
40 let mut extra = default();
41 ext.set_extra(&mut extra);
42 Some(extra)
43 }
44}
45pin_project_lite::pin_project! {
46 pub struct Submit<T: OpCode, E = ()> {
57 runtime: Runtime,
58 state: Option<State<T, E>>,
59 }
60
61 impl<T: OpCode, E> PinnedDrop for Submit<T, E> {
62 fn drop(this: Pin<&mut Self>) {
63 let this = this.project();
64 if let Some(State::Submitted { key, .. }) = this.state.take() {
65 this.runtime.cancel(key);
66 }
67 }
68 }
69
70}
71
72enum State<T: OpCode, E> {
73 Idle { op: T },
74 Submitted { key: Key<T>, _p: PhantomData<E> },
75}
76
77impl<T: OpCode, E> State<T, E> {
78 fn submitted(key: Key<T>) -> Self {
79 State::Submitted {
80 key,
81 _p: PhantomData,
82 }
83 }
84}
85
86impl<T: OpCode> Submit<T, ()> {
87 pub(crate) fn new(runtime: Runtime, op: T) -> Self {
88 Submit {
89 runtime,
90 state: Some(State::Idle { op }),
91 }
92 }
93
94 pub fn with_extra(mut self) -> Submit<T, Extra> {
99 let runtime = self.runtime.clone();
100 let Some(state) = self.state.take() else {
101 return Submit {
102 runtime,
103 state: None,
104 };
105 };
106 let state = match state {
107 State::Submitted { key, .. } => State::Submitted {
108 key,
109 _p: PhantomData,
110 },
111 State::Idle { op } => State::Idle { op },
112 };
113 Submit {
114 runtime,
115 state: Some(state),
116 }
117 }
118}
119
120impl<T: OpCode + 'static> Future for Submit<T, ()> {
121 type Output = BufResult<usize, T>;
122
123 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
124 let this = self.project();
125
126 loop {
127 match this.state.take().expect("Cannot poll after ready") {
128 State::Submitted { key, .. } => match this.runtime.poll_task(cx.waker(), key) {
129 PushEntry::Pending(key) => {
130 *this.state = Some(State::submitted(key));
131 return Poll::Pending;
132 }
133 PushEntry::Ready(res) => return Poll::Ready(res),
134 },
135 State::Idle { op } => {
136 let extra = cx.as_extra(|| this.runtime.default_extra());
137 match this.runtime.submit_raw(op, extra) {
138 PushEntry::Pending(key) => {
139 if let Some(cancel) = cx.as_cancel() {
142 cancel.register(&key);
143 };
144
145 *this.state = Some(State::submitted(key))
146 }
147 PushEntry::Ready(res) => {
148 return Poll::Ready(res);
149 }
150 }
151 }
152 }
153 }
154 }
155}
156
157impl<T: OpCode + 'static> Future for Submit<T, Extra> {
158 type Output = (BufResult<usize, T>, Extra);
159
160 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161 let this = self.project();
162
163 loop {
164 match this.state.take().expect("Cannot poll after ready") {
165 State::Submitted { key, .. } => {
166 match this.runtime.poll_task_with_extra(cx.waker(), key) {
167 PushEntry::Pending(key) => {
168 *this.state = Some(State::submitted(key));
169 return Poll::Pending;
170 }
171 PushEntry::Ready(res) => return Poll::Ready(res),
172 }
173 }
174 State::Idle { op } => {
175 let extra = cx.as_extra(|| this.runtime.default_extra());
176 match this.runtime.submit_raw(op, extra) {
177 PushEntry::Pending(key) => {
178 if let Some(cancel) = cx.as_cancel() {
179 cancel.register(&key);
180 }
181
182 *this.state = Some(State::submitted(key))
183 }
184 PushEntry::Ready(res) => {
185 return Poll::Ready((res, this.runtime.default_extra()));
186 }
187 }
188 }
189 }
190 }
191 }
192}
193
194impl<T: OpCode, E> FusedFuture for Submit<T, E>
195where
196 Submit<T, E>: Future,
197{
198 fn is_terminated(&self) -> bool {
199 self.state.is_none()
200 }
201}