Skip to main content

compio_driver/sys/op/managed/
fusion.rs

1use compio_buf::*;
2use rustix::net::RecvFlags;
3use socket2::SockAddr;
4
5use super::{fallback, iour};
6use crate::{BufferPool, BufferRef, IourOpCode, OpEntry, OpType, PollOpCode, sys::pal::*};
7
// Generates a "fused" managed operation named `$name`.
//
// The generated type wraps either `fallback::$name` or `iour::$name`; which
// one is chosen is decided once, in `new`, by `$pool.is_io_uring()`. The type
// implements both `PollOpCode` and `IourOpCode` by forwarding to the wrapped
// operation, and `TakeBuffer` by converting the wrapped operation's buffer
// into `$inner`.
macro_rules! mop {
    // Short form: the `TakeBuffer::Buffer` type defaults to `crate::BufferRef`.
    (<$($ty:ident: $trait:ident),* $(,)?> $name:ident( $($arg:ident: $arg_t:ty),* $(,)? ) with $pool:ident) => {
        mop!(<$($ty: $trait),*> $name( $($arg: $arg_t),* ) with $pool; crate::BufferRef);
    };
    // Full form: `$inner` is the type handed out by `TakeBuffer::take_buffer`.
    (<$($ty:ident: $trait:ident),* $(,)?> $name:ident( $($arg:ident: $arg_t:ty),* $(,)? ) with $pool:ident; $inner:ty) => {
        ::paste::paste!{
            // Driver-specific operation held by the fused wrapper.
            enum [< $name Inner >] <$($ty: $trait),*> {
                Poll(fallback::$name<$($ty),*>),
                IoUring(iour::$name<$($ty),*>),
            }

            impl<$($ty: $trait),*> [< $name Inner >]<$($ty),*> {
                // Borrow the polling operation; panics if the io-uring one is held.
                fn poll(&mut self) -> &mut fallback::$name<$($ty),*> {
                    let Self::Poll(op) = self else {
                        unreachable!("Current driver is not `io-uring`")
                    };
                    op
                }

                // Borrow the io-uring operation; panics if the polling one is held.
                fn iour(&mut self) -> &mut iour::$name<$($ty),*> {
                    let Self::IoUring(op) = self else {
                        unreachable!("Current driver is not `polling`")
                    };
                    op
                }
            }

            #[doc = concat!("A fused `", stringify!($name), "` operation")]
            pub struct $name <$($ty: $trait),*> {
                inner: [< $name Inner >] <$($ty),*>
            }

            impl<$($ty: $trait),*> $name <$($ty),*> {
                #[doc = concat!("Create a new `", stringify!($name), "`.")]
                pub fn new($($arg: $arg_t),*) -> std::io::Result<Self> {
                    // The pool decides which driver-specific operation is built.
                    let inner = if $pool.is_io_uring()? {
                        [< $name Inner >]::IoUring(iour::$name::new($($arg),*)?)
                    } else {
                        [< $name Inner >]::Poll(fallback::$name::new($($arg),*)?)
                    };
                    Ok(Self { inner })
                }
            }

            impl <$($ty: $trait),*> crate::TakeBuffer for $name <$($ty),*> {
                type Buffer = $inner;

                fn take_buffer(self) -> Option<$inner> {
                    match self.inner {
                        [< $name Inner >]::Poll(wrapped) => wrapped.take_buffer().map(Into::into),
                        [< $name Inner >]::IoUring(wrapped) => wrapped.take_buffer().map(Into::into),
                    }
                }
            }

            // Every method forwards to the wrapped polling operation.
            unsafe impl<$($ty: $trait),*> PollOpCode for $name<$($ty),*> {
                type Control = <fallback::$name<$($ty),*> as PollOpCode>::Control;

                unsafe fn init(&mut self, ctl: &mut Self::Control) {
                    // SAFETY: the caller's obligations for `init` are passed on
                    // unchanged to the wrapped operation.
                    unsafe { self.inner.poll().init(ctl) }
                }

                fn pre_submit(
                    &mut self,
                    ctl: &mut Self::Control,
                ) -> std::io::Result<crate::Decision> {
                    self.inner.poll().pre_submit(ctl)
                }

                fn op_type(&mut self, ctl: &mut Self::Control) -> Option<OpType> {
                    self.inner.poll().op_type(ctl)
                }

                fn operate(
                    &mut self,
                    ctl: &mut Self::Control,
                ) -> std::task::Poll<std::io::Result<usize>> {
                    self.inner.poll().operate(ctl)
                }
            }

            // Every method forwards to the wrapped io-uring operation.
            unsafe impl<$($ty: $trait),*> IourOpCode for $name<$($ty),*> {
                type Control = <iour::$name<$($ty),*> as IourOpCode>::Control;

                unsafe fn init(&mut self, ctl: &mut Self::Control) {
                    // SAFETY: the caller's obligations for `init` are passed on
                    // unchanged to the wrapped operation.
                    unsafe { self.inner.iour().init(ctl) }
                }

                fn create_entry(&mut self, ctl: &mut Self::Control) -> OpEntry {
                    self.inner.iour().create_entry(ctl)
                }

                fn create_entry_fallback(&mut self, ctl: &mut Self::Control) -> OpEntry {
                    self.inner.iour().create_entry_fallback(ctl)
                }

                fn call_blocking(&mut self, ctl: &mut Self::Control) -> std::io::Result<usize> {
                    self.inner.iour().call_blocking(ctl)
                }

                unsafe fn set_result(
                    &mut self,
                    ctl: &mut Self::Control,
                    result: &std::io::Result<usize>,
                    extra: &crate::Extra,
                ) {
                    // SAFETY: the caller's obligations for `set_result` are
                    // passed on unchanged to the wrapped operation.
                    unsafe { self.inner.iour().set_result(ctl, result, extra) }
                }

                unsafe fn push_multishot(
                    &mut self,
                    ctl: &mut Self::Control,
                    result: std::io::Result<usize>,
                    extra: crate::Extra,
                ) {
                    // SAFETY: the caller's obligations for `push_multishot` are
                    // passed on unchanged to the wrapped operation.
                    unsafe { self.inner.iour().push_multishot(ctl, result, extra) }
                }

                fn pop_multishot(
                    &mut self,
                    ctl: &mut Self::Control,
                ) -> Option<BufResult<usize, crate::Extra>> {
                    self.inner.iour().pop_multishot(ctl)
                }
            }
        }
    };
}
122
123mop!(<S: AsFd> ReadManagedAt(fd: S, offset: u64, pool: &BufferPool, len: usize) with pool);
124mop!(<S: AsFd> ReadManaged(fd: S, pool: &BufferPool, len: usize) with pool);
125mop!(<S: AsFd> RecvManaged(fd: S, pool: &BufferPool, len: usize, flags: RecvFlags) with pool);
126mop!(<S: AsFd> RecvFromManaged(fd: S, pool: &BufferPool, len: usize, flags: RecvFlags) with pool; (BufferRef, Option<SockAddr>));
127mop!(<C: IoBufMut, S: AsFd> RecvMsgManaged(fd: S, pool: &BufferPool, len: usize, control: C, flags: RecvFlags) with pool; ((BufferRef, C), Option<SockAddr>, usize));
128mop!(<S: AsFd> ReadMultiAt(fd: S, offset: u64, pool: &BufferPool, len: usize) with pool);
129mop!(<S: AsFd> ReadMulti(fd: S, pool: &BufferPool, len: usize) with pool);
130mop!(<S: AsFd> RecvMulti(fd: S, pool: &BufferPool, len: usize, flags: RecvFlags) with pool);
131mop!(<S: AsFd> RecvFromMulti(fd: S, pool: &BufferPool, flags: RecvFlags) with pool; RecvFromMultiResult);
132mop!(<S: AsFd> RecvMsgMulti(fd: S, pool: &BufferPool, control_len: usize, flags: RecvFlags) with pool; RecvMsgMultiResult);
133
134impl<S: AsFd> RecvManaged<S> {
135    /// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
136    /// of the SQE on the IO_URING driver.
137    pub fn poll_first(&mut self) {
138        match self.inner {
139            RecvManagedInner::Poll(ref mut i) => i.poll_first(),
140            RecvManagedInner::IoUring(ref mut i) => i.poll_first(),
141        }
142    }
143}
144
145impl<S: AsFd> RecvFromManaged<S> {
146    /// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
147    /// of the SQE on the IO_URING driver.
148    pub fn poll_first(&mut self) {
149        match self.inner {
150            RecvFromManagedInner::Poll(ref mut i) => i.poll_first(),
151            RecvFromManagedInner::IoUring(ref mut i) => i.poll_first(),
152        }
153    }
154}
155
156impl<C: IoBufMut, S: AsFd> RecvMsgManaged<C, S> {
157    /// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
158    /// of the SQE on the IO_URING driver.
159    pub fn poll_first(&mut self) {
160        match self.inner {
161            RecvMsgManagedInner::Poll(ref mut i) => i.poll_first(),
162            RecvMsgManagedInner::IoUring(ref mut i) => i.poll_first(),
163        }
164    }
165}
166
167enum RecvFromMultiResultInner {
168    Poll(fallback::RecvFromMultiResult),
169    IoUring(iour::RecvFromMultiResult),
170}
171
172/// Result of [`RecvFromMulti`].
173pub struct RecvFromMultiResult {
174    inner: RecvFromMultiResultInner,
175}
176
177impl From<fallback::RecvFromMultiResult> for RecvFromMultiResult {
178    fn from(result: fallback::RecvFromMultiResult) -> Self {
179        Self {
180            inner: RecvFromMultiResultInner::Poll(result),
181        }
182    }
183}
184
185impl From<iour::RecvFromMultiResult> for RecvFromMultiResult {
186    fn from(result: iour::RecvFromMultiResult) -> Self {
187        Self {
188            inner: RecvFromMultiResultInner::IoUring(result),
189        }
190    }
191}
192
193impl RecvFromMultiResult {
194    /// Create [`RecvFromMultiResult`] from a buffer received from
195    /// [`RecvFromMulti`]. It should be used for io-uring only.
196    ///
197    /// # Safety
198    ///
199    /// The buffer must be received from [`RecvFromMulti`] or have the same
200    /// format as the buffer received from [`RecvFromMulti`].
201    pub unsafe fn new(buffer: BufferRef) -> Self {
202        Self {
203            inner: RecvFromMultiResultInner::IoUring(unsafe {
204                iour::RecvFromMultiResult::new(buffer)
205            }),
206        }
207    }
208
209    /// Get the payload data.
210    pub fn data(&self) -> &[u8] {
211        match &self.inner {
212            RecvFromMultiResultInner::Poll(result) => result.data(),
213            RecvFromMultiResultInner::IoUring(result) => result.data(),
214        }
215    }
216
217    /// Get the source address if applicable.
218    pub fn addr(&self) -> Option<SockAddr> {
219        match &self.inner {
220            RecvFromMultiResultInner::Poll(result) => result.addr(),
221            RecvFromMultiResultInner::IoUring(result) => result.addr(),
222        }
223    }
224}
225
226impl IntoInner for RecvFromMultiResult {
227    type Inner = BufferRef;
228
229    fn into_inner(self) -> Self::Inner {
230        match self.inner {
231            RecvFromMultiResultInner::Poll(result) => result.into_inner(),
232            RecvFromMultiResultInner::IoUring(result) => result.into_inner(),
233        }
234    }
235}
236
237enum RecvMsgMultiResultInner {
238    Poll(fallback::RecvMsgMultiResult),
239    IoUring(iour::RecvMsgMultiResult),
240}
241
242/// Result of [`RecvMsgMulti`].
243pub struct RecvMsgMultiResult {
244    inner: RecvMsgMultiResultInner,
245}
246
247impl From<fallback::RecvMsgMultiResult> for RecvMsgMultiResult {
248    fn from(result: fallback::RecvMsgMultiResult) -> Self {
249        Self {
250            inner: RecvMsgMultiResultInner::Poll(result),
251        }
252    }
253}
254
255impl From<iour::RecvMsgMultiResult> for RecvMsgMultiResult {
256    fn from(result: iour::RecvMsgMultiResult) -> Self {
257        Self {
258            inner: RecvMsgMultiResultInner::IoUring(result),
259        }
260    }
261}
262
263impl RecvMsgMultiResult {
264    /// Create [`RecvMsgMultiResult`] from a buffer received from
265    /// [`RecvMsgMulti`]. It should be used for io-uring only.
266    ///
267    /// # Safety
268    ///
269    /// The buffer must be received from [`RecvMsgMulti`] or have the same
270    /// format as the buffer received from [`RecvMsgMulti`].
271    pub unsafe fn new(buffer: BufferRef, clen: usize) -> Self {
272        Self {
273            inner: RecvMsgMultiResultInner::IoUring(unsafe {
274                iour::RecvMsgMultiResult::new(buffer, clen)
275            }),
276        }
277    }
278
279    /// Get the payload data.
280    pub fn data(&self) -> &[u8] {
281        match &self.inner {
282            RecvMsgMultiResultInner::Poll(result) => result.data(),
283            RecvMsgMultiResultInner::IoUring(result) => result.data(),
284        }
285    }
286
287    /// Get the ancillary data.
288    pub fn ancillary(&self) -> &[u8] {
289        match &self.inner {
290            RecvMsgMultiResultInner::Poll(result) => result.ancillary(),
291            RecvMsgMultiResultInner::IoUring(result) => result.ancillary(),
292        }
293    }
294
295    /// Get the source address if applicable.
296    pub fn addr(&self) -> Option<SockAddr> {
297        match &self.inner {
298            RecvMsgMultiResultInner::Poll(result) => result.addr(),
299            RecvMsgMultiResultInner::IoUring(result) => result.addr(),
300        }
301    }
302}
303
304impl IntoInner for RecvMsgMultiResult {
305    type Inner = BufferRef;
306
307    fn into_inner(self) -> Self::Inner {
308        match self.inner {
309            RecvMsgMultiResultInner::Poll(result) => result.into_inner(),
310            RecvMsgMultiResultInner::IoUring(result) => result.into_inner(),
311        }
312    }
313}