Skip to main content

compio_driver/sys/iour/
op.rs

1use std::{
2    collections::VecDeque,
3    ffi::CString,
4    io,
5    marker::PhantomPinned,
6    os::fd::{AsFd, AsRawFd, FromRawFd, IntoRawFd, OwnedFd},
7    pin::Pin,
8};
9
10use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
11use io_uring::{
12    opcode,
13    types::{Fd, FsyncFlags},
14};
15use pin_project_lite::pin_project;
16use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t};
17
18use super::OpCode;
19pub use crate::sys::unix_op::*;
20use crate::{Extra, OpEntry, op::*, sys_slice::*, syscall};
21
22unsafe impl<
23    D: std::marker::Send + 'static,
24    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
25> OpCode for Asyncify<F, D>
26{
27    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
28        OpEntry::Blocking
29    }
30
31    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
32        let this = self.project();
33        let f = this
34            .f
35            .take()
36            .expect("the operate method could only be called once");
37        let BufResult(res, data) = f();
38        *this.data = Some(data);
39        res
40    }
41}
42
43unsafe impl<
44    S,
45    D: std::marker::Send + 'static,
46    F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
47> OpCode for AsyncifyFd<S, F, D>
48{
49    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
50        OpEntry::Blocking
51    }
52
53    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
54        let this = self.project();
55        let f = this
56            .f
57            .take()
58            .expect("the operate method could only be called once");
59        let BufResult(res, data) = f(this.fd);
60        *this.data = Some(data);
61        res
62    }
63}
64
65unsafe impl<
66    S1,
67    S2,
68    D: std::marker::Send + 'static,
69    F: (FnOnce(&S1, &S2) -> BufResult<usize, D>) + std::marker::Send + 'static,
70> OpCode for AsyncifyFd2<S1, S2, F, D>
71{
72    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
73        OpEntry::Blocking
74    }
75
76    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
77        let this = self.project();
78        let f = this
79            .f
80            .take()
81            .expect("the operate method could only be called once");
82        let BufResult(res, data) = f(this.fd1, this.fd2);
83        *this.data = Some(data);
84        res
85    }
86}
87
88unsafe impl<S: AsFd> OpCode for OpenFile<S> {
89    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
90        opcode::OpenAt::new(Fd(self.dirfd.as_fd().as_raw_fd()), self.path.as_ptr())
91            .flags(self.flags | libc::O_CLOEXEC)
92            .mode(self.mode)
93            .build()
94            .into()
95    }
96
97    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
98        self.call()
99    }
100
101    unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, _: &Extra) {
102        if let Ok(fd) = res {
103            // SAFETY: fd is a valid fd returned from kernel
104            let fd = unsafe { OwnedFd::from_raw_fd(*fd as _) };
105            *self.project().opened_fd = Some(fd);
106        }
107    }
108}
109
110unsafe impl OpCode for CloseFile {
111    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
112        opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
113            .build()
114            .into()
115    }
116
117    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
118        self.call()
119    }
120}
121
122unsafe impl<S: AsFd> OpCode for TruncateFile<S> {
123    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
124        opcode::Ftruncate::new(Fd(self.fd.as_fd().as_raw_fd()), self.size)
125            .build()
126            .into()
127    }
128
129    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
130        self.call()
131    }
132}
133
134pin_project! {
135    /// Get metadata of an opened file.
136    pub struct FileStat<S> {
137        pub(crate) fd: S,
138        pub(crate) stat: Statx,
139    }
140}
141
142impl<S> FileStat<S> {
143    /// Create [`FileStat`].
144    pub fn new(fd: S) -> Self {
145        Self {
146            fd,
147            stat: unsafe { std::mem::zeroed() },
148        }
149    }
150}
151
152unsafe impl<S: AsFd> OpCode for FileStat<S> {
153    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
154        let this = self.project();
155        static EMPTY_NAME: &[u8] = b"\0";
156        opcode::Statx::new(
157            Fd(this.fd.as_fd().as_fd().as_raw_fd()),
158            EMPTY_NAME.as_ptr().cast(),
159            this.stat as *mut _ as _,
160        )
161        .flags(libc::AT_EMPTY_PATH)
162        .mask(statx_mask())
163        .build()
164        .into()
165    }
166
167    #[cfg(gnulinux)]
168    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
169        let this = self.project();
170        static EMPTY_NAME: &[u8] = b"\0";
171        let res = syscall!(libc::statx(
172            this.fd.as_fd().as_raw_fd(),
173            EMPTY_NAME.as_ptr().cast(),
174            libc::AT_EMPTY_PATH,
175            statx_mask(),
176            this.stat as *mut _ as _
177        ))?;
178        Ok(res as _)
179    }
180
181    #[cfg(not(gnulinux))]
182    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
183        let this = self.project();
184        let mut stat = unsafe { std::mem::zeroed() };
185        let res = syscall!(libc::fstat(this.fd.as_fd().as_raw_fd(), &mut stat))?;
186        *this.stat = stat_to_statx(stat);
187        Ok(res as _)
188    }
189}
190
191impl<S> IntoInner for FileStat<S> {
192    type Inner = Stat;
193
194    fn into_inner(self) -> Self::Inner {
195        statx_to_stat(self.stat)
196    }
197}
198
199pin_project! {
200    /// Get metadata from path.
201    pub struct PathStat<S: AsFd> {
202        pub(crate) dirfd: S,
203        pub(crate) path: CString,
204        pub(crate) stat: Statx,
205        pub(crate) follow_symlink: bool,
206    }
207}
208
209impl<S: AsFd> PathStat<S> {
210    /// Create [`PathStat`].
211    pub fn new(dirfd: S, path: CString, follow_symlink: bool) -> Self {
212        Self {
213            dirfd,
214            path,
215            stat: unsafe { std::mem::zeroed() },
216            follow_symlink,
217        }
218    }
219}
220
221unsafe impl<S: AsFd> OpCode for PathStat<S> {
222    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
223        let this = self.project();
224        let mut flags = libc::AT_EMPTY_PATH;
225        if !*this.follow_symlink {
226            flags |= libc::AT_SYMLINK_NOFOLLOW;
227        }
228        opcode::Statx::new(
229            Fd(this.dirfd.as_fd().as_raw_fd()),
230            this.path.as_ptr(),
231            this.stat as *mut _ as _,
232        )
233        .flags(flags)
234        .mask(statx_mask())
235        .build()
236        .into()
237    }
238
239    #[cfg(gnulinux)]
240    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
241        let this = self.project();
242        let mut flags = libc::AT_EMPTY_PATH;
243        if !*this.follow_symlink {
244            flags |= libc::AT_SYMLINK_NOFOLLOW;
245        }
246        let res = syscall!(libc::statx(
247            this.dirfd.as_fd().as_raw_fd(),
248            this.path.as_ptr(),
249            flags,
250            statx_mask(),
251            this.stat
252        ))?;
253        Ok(res as _)
254    }
255
256    #[cfg(not(gnulinux))]
257    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
258        let this = self.project();
259        let mut flags = libc::AT_EMPTY_PATH;
260        if !*this.follow_symlink {
261            flags |= libc::AT_SYMLINK_NOFOLLOW;
262        }
263        let mut stat = unsafe { std::mem::zeroed() };
264        let res = syscall!(libc::fstatat(
265            this.dirfd.as_fd().as_raw_fd(),
266            this.path.as_ptr(),
267            &mut stat,
268            flags
269        ))?;
270        *this.stat = stat_to_statx(stat);
271        Ok(res as _)
272    }
273}
274
275impl<S: AsFd> IntoInner for PathStat<S> {
276    type Inner = Stat;
277
278    fn into_inner(self) -> Self::Inner {
279        statx_to_stat(self.stat)
280    }
281}
282
283unsafe impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
284    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
285        let this = self.project();
286        let fd = Fd(this.fd.as_fd().as_raw_fd());
287        let slice = this.buffer.sys_slice_mut();
288        opcode::Read::new(
289            fd,
290            slice.ptr() as _,
291            slice.len().try_into().unwrap_or(u32::MAX),
292        )
293        .offset(*this.offset)
294        .build()
295        .into()
296    }
297}
298
299unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
300    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
301        let this = self.project();
302        *this.slices = this.buffer.sys_slices_mut();
303        opcode::Readv::new(
304            Fd(this.fd.as_fd().as_raw_fd()),
305            this.slices.as_ptr() as _,
306            this.slices.len().try_into().unwrap_or(u32::MAX),
307        )
308        .offset(*this.offset)
309        .build()
310        .into()
311    }
312}
313
314unsafe impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
315    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
316        let slice = self.buffer.as_init();
317        opcode::Write::new(
318            Fd(self.fd.as_fd().as_raw_fd()),
319            slice.as_ptr(),
320            slice.len().try_into().unwrap_or(u32::MAX),
321        )
322        .offset(self.offset)
323        .build()
324        .into()
325    }
326}
327
328unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
329    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
330        let this = self.project();
331        *this.slices = this.buffer.as_ref().sys_slices();
332        opcode::Writev::new(
333            Fd(this.fd.as_fd().as_raw_fd()),
334            this.slices.as_ptr() as _,
335            this.slices.len().try_into().unwrap_or(u32::MAX),
336        )
337        .offset(*this.offset)
338        .build()
339        .into()
340    }
341}
342
343unsafe impl<T: IoBufMut, S: AsFd> OpCode for Read<T, S> {
344    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
345        let fd = self.fd.as_fd().as_raw_fd();
346        let slice = self.project().buffer.sys_slice_mut();
347        opcode::Read::new(
348            Fd(fd),
349            slice.ptr() as _,
350            slice.len().try_into().unwrap_or(u32::MAX),
351        )
352        .build()
353        .into()
354    }
355}
356
357unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectored<T, S> {
358    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
359        let this = self.project();
360        *this.slices = this.buffer.sys_slices_mut();
361        opcode::Readv::new(
362            Fd(this.fd.as_fd().as_raw_fd()),
363            this.slices.as_ptr() as _,
364            this.slices.len().try_into().unwrap_or(u32::MAX),
365        )
366        .build()
367        .into()
368    }
369}
370
371unsafe impl<T: IoBuf, S: AsFd> OpCode for Write<T, S> {
372    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
373        let slice = self.buffer.as_init();
374        opcode::Write::new(
375            Fd(self.fd.as_fd().as_raw_fd()),
376            slice.as_ptr(),
377            slice.len().try_into().unwrap_or(u32::MAX),
378        )
379        .build()
380        .into()
381    }
382}
383
384unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectored<T, S> {
385    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
386        let this = self.project();
387        *this.slices = this.buffer.as_ref().sys_slices();
388        opcode::Writev::new(
389            Fd(this.fd.as_fd().as_raw_fd()),
390            this.slices.as_ptr() as _,
391            this.slices.len().try_into().unwrap_or(u32::MAX),
392        )
393        .build()
394        .into()
395    }
396}
397
398unsafe impl<S: AsFd> OpCode for Sync<S> {
399    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
400        opcode::Fsync::new(Fd(self.fd.as_fd().as_raw_fd()))
401            .flags(if self.datasync {
402                FsyncFlags::DATASYNC
403            } else {
404                FsyncFlags::empty()
405            })
406            .build()
407            .into()
408    }
409}
410
411unsafe impl<S: AsFd> OpCode for Unlink<S> {
412    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
413        opcode::UnlinkAt::new(Fd(self.dirfd.as_fd().as_raw_fd()), self.path.as_ptr())
414            .flags(if self.dir { libc::AT_REMOVEDIR } else { 0 })
415            .build()
416            .into()
417    }
418
419    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
420        self.call()
421    }
422}
423
424unsafe impl<S: AsFd> OpCode for CreateDir<S> {
425    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
426        opcode::MkDirAt::new(Fd(self.dirfd.as_fd().as_raw_fd()), self.path.as_ptr())
427            .mode(self.mode)
428            .build()
429            .into()
430    }
431
432    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
433        self.call()
434    }
435}
436
437unsafe impl<S1: AsFd, S2: AsFd> OpCode for Rename<S1, S2> {
438    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
439        opcode::RenameAt::new(
440            Fd(self.old_dirfd.as_fd().as_raw_fd()),
441            self.old_path.as_ptr(),
442            Fd(self.new_dirfd.as_fd().as_raw_fd()),
443            self.new_path.as_ptr(),
444        )
445        .build()
446        .into()
447    }
448
449    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
450        self.call()
451    }
452}
453
454unsafe impl<S: AsFd> OpCode for Symlink<S> {
455    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
456        opcode::SymlinkAt::new(
457            Fd(self.dirfd.as_fd().as_raw_fd()),
458            self.source.as_ptr(),
459            self.target.as_ptr(),
460        )
461        .build()
462        .into()
463    }
464
465    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
466        self.call()
467    }
468}
469
470unsafe impl<S1: AsFd, S2: AsFd> OpCode for HardLink<S1, S2> {
471    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
472        opcode::LinkAt::new(
473            Fd(self.source_dirfd.as_fd().as_raw_fd()),
474            self.source.as_ptr(),
475            Fd(self.target_dirfd.as_fd().as_raw_fd()),
476            self.target.as_ptr(),
477        )
478        .build()
479        .into()
480    }
481
482    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
483        self.call()
484    }
485}
486
487unsafe impl OpCode for CreateSocket {
488    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
489        opcode::Socket::new(
490            self.domain,
491            self.socket_type | libc::SOCK_CLOEXEC,
492            self.protocol,
493        )
494        .build()
495        .into()
496    }
497
498    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
499        Ok(syscall!(libc::socket(
500            self.domain,
501            self.socket_type | libc::SOCK_CLOEXEC,
502            self.protocol
503        ))? as _)
504    }
505
506    unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, _: &Extra) {
507        if let Ok(fd) = res {
508            // SAFETY: fd is a valid fd returned from kernel
509            let fd = unsafe { Socket2::from_raw_fd(*fd as _) };
510            *self.project().opened_fd = Some(fd);
511        }
512    }
513}
514
515unsafe impl<S: AsFd> OpCode for ShutdownSocket<S> {
516    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
517        opcode::Shutdown::new(Fd(self.fd.as_fd().as_raw_fd()), self.how())
518            .build()
519            .into()
520    }
521
522    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
523        self.call()
524    }
525}
526
527unsafe impl OpCode for CloseSocket {
528    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
529        opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
530            .build()
531            .into()
532    }
533
534    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
535        self.call()
536    }
537}
538
539unsafe impl<S: AsFd> OpCode for Accept<S> {
540    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
541        let this = self.project();
542        opcode::Accept::new(
543            Fd(this.fd.as_fd().as_raw_fd()),
544            unsafe { this.buffer.view_as::<libc::sockaddr>() },
545            this.addr_len,
546        )
547        .flags(libc::SOCK_CLOEXEC)
548        .build()
549        .into()
550    }
551
552    unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, _: &Extra) {
553        if let Ok(fd) = res {
554            // SAFETY: fd is a valid fd returned from kernel
555            let fd = unsafe { Socket2::from_raw_fd(*fd as _) };
556            *self.project().accepted_fd = Some(fd);
557        }
558    }
559}
560
561struct AcceptMultishotResult {
562    res: io::Result<Socket2>,
563    extra: crate::Extra,
564}
565
566impl AcceptMultishotResult {
567    pub unsafe fn new(res: io::Result<usize>, extra: crate::Extra) -> Self {
568        Self {
569            res: res.map(|fd| unsafe { Socket2::from_raw_fd(fd as _) }),
570            extra,
571        }
572    }
573
574    pub fn into_result(self) -> BufResult<usize, crate::Extra> {
575        BufResult(self.res.map(|fd| fd.into_raw_fd() as _), self.extra)
576    }
577}
578
579pin_project! {
580    /// Accept multiple connections.
581    pub struct AcceptMulti<S> {
582        #[pin]
583        pub(crate) op: Accept<S>,
584        multishots: VecDeque<AcceptMultishotResult>
585    }
586}
587
588impl<S> AcceptMulti<S> {
589    /// Create [`AcceptMulti`].
590    pub fn new(fd: S) -> Self {
591        Self {
592            op: Accept::new(fd),
593            multishots: VecDeque::new(),
594        }
595    }
596}
597
598unsafe impl<S: AsFd> OpCode for AcceptMulti<S> {
599    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
600        let this = self.project();
601        opcode::AcceptMulti::new(Fd(this.op.fd.as_fd().as_raw_fd()))
602            .flags(libc::SOCK_CLOEXEC)
603            .build()
604            .into()
605    }
606
607    fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
608        self.project().op.create_entry()
609    }
610
611    unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &crate::Extra) {
612        unsafe { self.project().op.set_result(res, extra) }
613    }
614
615    unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result<usize>, extra: crate::Extra) {
616        self.project()
617            .multishots
618            .push_back(unsafe { AcceptMultishotResult::new(res, extra) });
619    }
620
621    fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::sys::Extra>> {
622        self.project()
623            .multishots
624            .pop_front()
625            .map(|res| res.into_result())
626    }
627}
628
629impl<S> IntoInner for AcceptMulti<S> {
630    type Inner = Socket2;
631
632    fn into_inner(self) -> Self::Inner {
633        self.op.into_inner().0
634    }
635}
636
637unsafe impl<S: AsFd> OpCode for Connect<S> {
638    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
639        opcode::Connect::new(
640            Fd(self.fd.as_fd().as_raw_fd()),
641            self.addr.as_ptr().cast(),
642            self.addr.len(),
643        )
644        .build()
645        .into()
646    }
647}
648
649unsafe impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
650    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
651        let fd = self.fd.as_fd().as_raw_fd();
652        let flags = self.flags;
653        let slice = self.project().buffer.sys_slice_mut();
654        opcode::Recv::new(
655            Fd(fd),
656            slice.ptr() as _,
657            slice.len().try_into().unwrap_or(u32::MAX),
658        )
659        .flags(flags)
660        .build()
661        .into()
662    }
663}
664
665unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
666    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
667        self.as_mut().set_msg();
668        let this = self.project();
669        opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
670            .flags(*this.flags as _)
671            .build()
672            .into()
673    }
674}
675
676unsafe impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
677    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
678        let slice = self.buffer.as_init();
679        opcode::Send::new(
680            Fd(self.fd.as_fd().as_raw_fd()),
681            slice.as_ptr(),
682            slice.len().try_into().unwrap_or(u32::MAX),
683        )
684        .flags(self.flags)
685        .build()
686        .into()
687    }
688}
689
690pin_project! {
691    /// Zerocopy [`Send`].
692    pub struct SendZc<T: IoBuf, S> {
693        #[pin]
694        pub(crate) op: Send<T, S>,
695        pub(crate) res: Option<BufResult<usize, crate::Extra>>,
696        _p: PhantomPinned,
697    }
698}
699
700impl<T: IoBuf, S> SendZc<T, S> {
701    /// Create [`SendZc`].
702    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
703        Self {
704            op: Send::new(fd, buffer, flags),
705            res: None,
706            _p: PhantomPinned,
707        }
708    }
709}
710
711unsafe impl<T: IoBuf, S: AsFd> OpCode for SendZc<T, S> {
712    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
713        let this = self.project();
714        let slice = this.op.buffer.as_init();
715        opcode::SendZc::new(
716            Fd(this.op.fd.as_fd().as_raw_fd()),
717            slice.as_ptr(),
718            slice.len().try_into().unwrap_or(u32::MAX),
719        )
720        .flags(this.op.flags)
721        .build()
722        .into()
723    }
724
725    fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
726        self.project().op.create_entry()
727    }
728
729    unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result<usize>, extra: crate::Extra) {
730        self.project().res.replace(BufResult(res, extra));
731    }
732
733    fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::Extra>> {
734        self.project().res.take()
735    }
736}
737
738impl<T: IoBuf, S> IntoInner for SendZc<T, S> {
739    type Inner = T;
740
741    fn into_inner(self) -> Self::Inner {
742        self.op.into_inner()
743    }
744}
745
746unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
747    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
748        self.as_mut().set_msg();
749        let this = self.project();
750        opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
751            .flags(*this.flags as _)
752            .build()
753            .into()
754    }
755}
756
757pin_project! {
758    /// Zerocopy [`SendVectored`].
759    pub struct SendVectoredZc<T: IoVectoredBuf, S> {
760        #[pin]
761        pub(crate) op: SendVectored<T, S>,
762        pub(crate) res: Option<BufResult<usize, crate::Extra>>,
763        _p: PhantomPinned,
764    }
765}
766
767impl<T: IoVectoredBuf, S> SendVectoredZc<T, S> {
768    /// Create [`SendVectoredZc`].
769    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
770        Self {
771            op: SendVectored::new(fd, buffer, flags),
772            res: None,
773            _p: PhantomPinned,
774        }
775    }
776}
777
778unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectoredZc<T, S> {
779    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
780        let mut this = self.project();
781        this.op.as_mut().set_msg();
782        let op = this.op.project();
783        opcode::SendMsgZc::new(Fd(op.fd.as_fd().as_raw_fd()), op.msg)
784            .flags(*op.flags as _)
785            .build()
786            .into()
787    }
788
789    fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
790        self.project().op.create_entry()
791    }
792
793    unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result<usize>, extra: crate::Extra) {
794        self.project().res.replace(BufResult(res, extra));
795    }
796
797    fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::Extra>> {
798        self.project().res.take()
799    }
800}
801
802impl<T: IoVectoredBuf, S> IntoInner for SendVectoredZc<T, S> {
803    type Inner = T;
804
805    fn into_inner(self) -> Self::Inner {
806        self.op.into_inner()
807    }
808}
809
810struct RecvFromHeader<S> {
811    pub(crate) fd: S,
812    pub(crate) addr: SockAddrStorage,
813    pub(crate) msg: libc::msghdr,
814    pub(crate) flags: i32,
815    _p: PhantomPinned,
816}
817
818impl<S> RecvFromHeader<S> {
819    pub fn new(fd: S, flags: i32) -> Self {
820        Self {
821            fd,
822            addr: SockAddrStorage::zeroed(),
823            msg: unsafe { std::mem::zeroed() },
824            flags,
825            _p: PhantomPinned,
826        }
827    }
828}
829
830impl<S: AsFd> RecvFromHeader<S> {
831    pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
832        self.msg.msg_name = &mut self.addr as *mut _ as _;
833        self.msg.msg_namelen = self.addr.size_of() as _;
834        self.msg.msg_iov = slices.as_mut_ptr() as _;
835        self.msg.msg_iovlen = slices.len() as _;
836        opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut self.msg)
837            .flags(self.flags as _)
838            .build()
839            .into()
840    }
841
842    pub fn into_addr(self) -> (SockAddrStorage, socklen_t) {
843        (self.addr, self.msg.msg_namelen)
844    }
845}
846
847pin_project! {
848    /// Receive data and source address.
849    pub struct RecvFrom<T: IoBufMut, S> {
850        header: RecvFromHeader<S>,
851        #[pin]
852        buffer: T,
853        slice: Option<SysSlice>,
854    }
855}
856
857impl<T: IoBufMut, S> RecvFrom<T, S> {
858    /// Create [`RecvFrom`].
859    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
860        Self {
861            header: RecvFromHeader::new(fd, flags),
862            buffer,
863            slice: None,
864        }
865    }
866}
867
868unsafe impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
869    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
870        let this = self.project();
871        let slice = this.slice.insert(this.buffer.sys_slice_mut());
872        this.header.create_entry(std::slice::from_mut(slice))
873    }
874}
875
876impl<T: IoBufMut, S: AsFd> IntoInner for RecvFrom<T, S> {
877    type Inner = (T, SockAddrStorage, socklen_t);
878
879    fn into_inner(self) -> Self::Inner {
880        let (addr, addr_len) = self.header.into_addr();
881        (self.buffer, addr, addr_len)
882    }
883}
884
885pin_project! {
886    /// Receive data and source address into vectored buffer.
887    pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
888        header: RecvFromHeader<S>,
889        #[pin]
890        buffer: T,
891        slice: Vec<SysSlice>,
892    }
893}
894
895impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
896    /// Create [`RecvFromVectored`].
897    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
898        Self {
899            header: RecvFromHeader::new(fd, flags),
900            buffer,
901            slice: vec![],
902        }
903    }
904}
905
906unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
907    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
908        let this = self.project();
909        *this.slice = this.buffer.sys_slices_mut();
910        this.header.create_entry(this.slice)
911    }
912}
913
914impl<T: IoVectoredBufMut, S: AsFd> IntoInner for RecvFromVectored<T, S> {
915    type Inner = (T, SockAddrStorage, socklen_t);
916
917    fn into_inner(self) -> Self::Inner {
918        let (addr, addr_len) = self.header.into_addr();
919        (self.buffer, addr, addr_len)
920    }
921}
922
923struct SendToHeader<S> {
924    pub(crate) fd: S,
925    pub(crate) addr: SockAddr,
926    pub(crate) msg: libc::msghdr,
927    pub(crate) flags: i32,
928    _p: PhantomPinned,
929}
930
931impl<S> SendToHeader<S> {
932    pub fn new(fd: S, addr: SockAddr, flags: i32) -> Self {
933        Self {
934            fd,
935            addr,
936            msg: unsafe { std::mem::zeroed() },
937            flags,
938            _p: PhantomPinned,
939        }
940    }
941}
942
943impl<S: AsFd> SendToHeader<S> {
944    pub fn set_msg(&mut self, slices: &mut [SysSlice]) {
945        self.msg.msg_name = self.addr.as_ptr() as _;
946        self.msg.msg_namelen = self.addr.len();
947        self.msg.msg_iov = slices.as_mut_ptr() as _;
948        self.msg.msg_iovlen = slices.len() as _;
949    }
950}
951
952pin_project! {
953    /// Send data to specified address.
954    pub struct SendTo<T: IoBuf, S> {
955        header: SendToHeader<S>,
956        #[pin]
957        buffer: T,
958        slice: Option<SysSlice>,
959    }
960}
961
962impl<T: IoBuf, S> SendTo<T, S> {
963    /// Create [`SendTo`].
964    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
965        Self {
966            header: SendToHeader::new(fd, addr, flags),
967            buffer,
968            slice: None,
969        }
970    }
971}
972
973unsafe impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
974    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
975        let this = self.project();
976        let slice = this.slice.insert(this.buffer.as_ref().sys_slice());
977        this.header.set_msg(std::slice::from_mut(slice));
978        opcode::SendMsg::new(Fd(this.header.fd.as_fd().as_raw_fd()), &this.header.msg)
979            .flags(this.header.flags as _)
980            .build()
981            .into()
982    }
983}
984
985impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
986    type Inner = T;
987
988    fn into_inner(self) -> Self::Inner {
989        self.buffer
990    }
991}
992
993pin_project! {
994    /// Zerocopy [`SendTo`].
995    pub struct SendToZc<T: IoBuf, S: AsFd> {
996        #[pin]
997        pub(crate) op: SendTo<T, S>,
998        pub(crate) res: Option<BufResult<usize, crate::Extra>>,
999        _p: PhantomPinned,
1000    }
1001}
1002
1003impl<T: IoBuf, S: AsFd> SendToZc<T, S> {
1004    /// Create [`SendToZc`].
1005    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1006        Self {
1007            op: SendTo::new(fd, buffer, addr, flags),
1008            res: None,
1009            _p: PhantomPinned,
1010        }
1011    }
1012}
1013
1014unsafe impl<T: IoBuf, S: AsFd> OpCode for SendToZc<T, S> {
1015    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1016        let this = self.project().op.project();
1017        let slice = this.slice.insert(this.buffer.as_ref().sys_slice());
1018        this.header.set_msg(std::slice::from_mut(slice));
1019        opcode::SendMsgZc::new(Fd(this.header.fd.as_fd().as_raw_fd()), &this.header.msg)
1020            .flags(this.header.flags as _)
1021            .build()
1022            .into()
1023    }
1024
1025    fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
1026        self.project().op.create_entry()
1027    }
1028
1029    unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result<usize>, extra: crate::Extra) {
1030        self.project().res.replace(BufResult(res, extra));
1031    }
1032
1033    fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::Extra>> {
1034        self.project().res.take()
1035    }
1036}
1037
1038impl<T: IoBuf, S: AsFd> IntoInner for SendToZc<T, S> {
1039    type Inner = T;
1040
1041    fn into_inner(self) -> Self::Inner {
1042        self.op.into_inner()
1043    }
1044}
1045
pin_project! {
    /// Send data to specified address from vectored buffer.
    pub struct SendToVectored<T: IoVectoredBuf, S> {
        // Holds the fd, the flags and the message header used for the submission.
        header: SendToHeader<S>,
        // The caller's vectored buffer; returned by `into_inner`.
        #[pin]
        buffer: T,
        // System slices for `buffer`, rebuilt each time an entry is created.
        slice: Vec<SysSlice>,
    }
}
1055
1056impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
1057    /// Create [`SendToVectored`].
1058    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1059        Self {
1060            header: SendToHeader::new(fd, addr, flags),
1061            buffer,
1062            slice: vec![],
1063        }
1064    }
1065}
1066
1067unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
1068    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1069        let this = self.project();
1070        *this.slice = this.buffer.as_ref().sys_slices();
1071        this.header.set_msg(this.slice);
1072        opcode::SendMsg::new(Fd(this.header.fd.as_fd().as_raw_fd()), &this.header.msg)
1073            .flags(this.header.flags as _)
1074            .build()
1075            .into()
1076    }
1077}
1078
1079impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
1080    type Inner = T;
1081
1082    fn into_inner(self) -> Self::Inner {
1083        self.buffer
1084    }
1085}
1086
pin_project! {
    /// Zerocopy [`SendToVectored`].
    ///
    /// Wraps a plain [`SendToVectored`] operation together with a slot for one
    /// completion result.
    pub struct SendToVectoredZc<T: IoVectoredBuf, S: AsFd> {
        // The wrapped vectored send operation.
        #[pin]
        pub(crate) op: SendToVectored<T, S>,
        // Single result slot: written by `push_multishot`, emptied by `pop_multishot`.
        pub(crate) res: Option<BufResult<usize, crate::Extra>>,
        // Marker that opts this type out of `Unpin`.
        _p: PhantomPinned,
    }
}
1096
1097impl<T: IoVectoredBuf, S: AsFd> SendToVectoredZc<T, S> {
1098    /// Create [`SendToVectoredZc`].
1099    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1100        Self {
1101            op: SendToVectored::new(fd, buffer, addr, flags),
1102            res: None,
1103            _p: PhantomPinned,
1104        }
1105    }
1106}
1107
1108unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectoredZc<T, S> {
1109    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1110        let this = self.project().op.project();
1111        *this.slice = this.buffer.as_ref().sys_slices();
1112        this.header.set_msg(this.slice);
1113        opcode::SendMsgZc::new(Fd(this.header.fd.as_fd().as_raw_fd()), &this.header.msg)
1114            .flags(this.header.flags as _)
1115            .build()
1116            .into()
1117    }
1118
1119    fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
1120        self.project().op.create_entry()
1121    }
1122
1123    unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result<usize>, extra: crate::Extra) {
1124        self.project().res.replace(BufResult(res, extra));
1125    }
1126
1127    fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::Extra>> {
1128        self.project().res.take()
1129    }
1130}
1131
1132impl<T: IoVectoredBuf, S: AsFd> IntoInner for SendToVectoredZc<T, S> {
1133    type Inner = T;
1134
1135    fn into_inner(self) -> Self::Inner {
1136        self.op.into_inner()
1137    }
1138}
1139
1140unsafe impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
1141    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
1142        self.as_mut().set_msg();
1143        let this = self.project();
1144        opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
1145            .flags(*this.flags as _)
1146            .build()
1147            .into()
1148    }
1149}
1150
1151unsafe impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
1152    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
1153        self.as_mut().set_msg();
1154        let this = self.project();
1155        opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
1156            .flags(*this.flags as _)
1157            .build()
1158            .into()
1159    }
1160}
1161
pin_project! {
    /// Zerocopy [`SendMsg`].
    ///
    /// Wraps a plain [`SendMsg`] operation together with a slot for one
    /// completion result.
    pub struct SendMsgZc<T: IoVectoredBuf, C: IoBuf, S> {
        // The wrapped send-message operation.
        #[pin]
        pub(crate) op: SendMsg<T, C, S>,
        // Single result slot: written by `push_multishot`, emptied by `pop_multishot`.
        pub(crate) res: Option<BufResult<usize, crate::Extra>>,
        // Marker that opts this type out of `Unpin`.
        _p: PhantomPinned,
    }
}
1171
1172impl<T: IoVectoredBuf, C: IoBuf, S> SendMsgZc<T, C, S> {
1173    /// Create [`SendMsgZc`].
1174    pub fn new(fd: S, buffer: T, control: C, addr: Option<SockAddr>, flags: i32) -> Self {
1175        Self {
1176            op: SendMsg::new(fd, buffer, control, addr, flags),
1177            res: None,
1178            _p: PhantomPinned,
1179        }
1180    }
1181}
1182
1183unsafe impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsgZc<T, C, S> {
1184    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1185        let mut this = self.project();
1186        this.op.as_mut().set_msg();
1187        let op = this.op.project();
1188        opcode::SendMsgZc::new(Fd(op.fd.as_fd().as_raw_fd()), op.msg)
1189            .flags(*op.flags as _)
1190            .build()
1191            .into()
1192    }
1193
1194    fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
1195        self.project().op.create_entry()
1196    }
1197
1198    unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result<usize>, extra: crate::Extra) {
1199        self.project().res.replace(BufResult(res, extra));
1200    }
1201
1202    fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::Extra>> {
1203        self.project().res.take()
1204    }
1205}
1206impl<T: IoVectoredBuf, C: IoBuf, S> IntoInner for SendMsgZc<T, C, S> {
1207    type Inner = (T, C);
1208
1209    fn into_inner(self) -> Self::Inner {
1210        self.op.into_inner()
1211    }
1212}
1213
1214unsafe impl<S: AsFd> OpCode for PollOnce<S> {
1215    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1216        let flags = match self.interest {
1217            Interest::Readable => libc::POLLIN,
1218            Interest::Writable => libc::POLLOUT,
1219        };
1220        opcode::PollAdd::new(Fd(self.fd.as_fd().as_raw_fd()), flags as _)
1221            .build()
1222            .into()
1223    }
1224}
1225
1226unsafe impl<S1: AsFd, S2: AsFd> OpCode for Splice<S1, S2> {
1227    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1228        opcode::Splice::new(
1229            Fd(self.fd_in.as_fd().as_raw_fd()),
1230            self.offset_in,
1231            Fd(self.fd_out.as_fd().as_raw_fd()),
1232            self.offset_out,
1233            self.len.try_into().unwrap_or(u32::MAX),
1234        )
1235        .flags(self.flags)
1236        .build()
1237        .into()
1238    }
1239
1240    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
1241        let mut offset_in = self.offset_in;
1242        let mut offset_out = self.offset_out;
1243        let offset_in_ptr = if offset_in < 0 {
1244            std::ptr::null_mut()
1245        } else {
1246            &mut offset_in
1247        };
1248        let offset_out_ptr = if offset_out < 0 {
1249            std::ptr::null_mut()
1250        } else {
1251            &mut offset_out
1252        };
1253        Ok(syscall!(libc::splice(
1254            self.fd_in.as_fd().as_raw_fd(),
1255            offset_in_ptr,
1256            self.fd_out.as_fd().as_raw_fd(),
1257            offset_out_ptr,
1258            self.len,
1259            self.flags,
1260        ))? as _)
1261    }
1262}
1263
1264mod buf_ring {
1265    use std::{
1266        collections::VecDeque,
1267        io,
1268        marker::PhantomPinned,
1269        os::fd::{AsFd, AsRawFd},
1270        pin::Pin,
1271        ptr,
1272    };
1273
1274    use compio_buf::BufResult;
1275    use io_uring::{opcode, squeue::Flags, types::Fd};
1276    use pin_project_lite::pin_project;
1277    use socket2::{SockAddr, SockAddrStorage, socklen_t};
1278
1279    use super::{Extra, OpCode};
1280    use crate::{
1281        BorrowedBuffer, BufferPool, IoUringBufferPool, IoUringOwnedBuffer, OpEntry, TakeBuffer,
1282    };
1283
1284    pub(crate) fn take_buffer(
1285        buffer_pool: &BufferPool,
1286        result: io::Result<usize>,
1287        buffer_id: u16,
1288    ) -> io::Result<BorrowedBuffer<'_>> {
1289        #[cfg(fusion)]
1290        let buffer_pool = buffer_pool.as_io_uring();
1291        let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
1292        // SAFETY: result is valid
1293        let buffer = unsafe { buffer_pool.get_buffer(buffer_id, result) };
1294        let res = unsafe { buffer_pool.create_proxy(buffer, result) }?;
1295        #[cfg(fusion)]
1296        let res = BorrowedBuffer::new_io_uring(res);
1297        Ok(res)
1298    }
1299
    pin_project! {
        /// Read a file at specified position into a buffer selected from a
        /// buffer pool.
        pub struct ReadManagedAt<S> {
            pub(crate) fd: S,
            pub(crate) offset: u64,
            // Buffer group id taken from the pool at construction time.
            buffer_group: u16,
            // Requested length, already checked to fit in a `u32`.
            len: u32,
            // Clone of the io_uring pool, used by `set_result` to look up the buffer.
            pool: IoUringBufferPool,
            // Buffer recorded by `set_result` when the completion carries a buffer id.
            buffer: Option<IoUringOwnedBuffer>,
            // Marker that opts this type out of `Unpin`.
            _p: PhantomPinned,
        }
    }
1312
1313    impl<S> ReadManagedAt<S> {
1314        /// Create [`ReadManagedAt`].
1315        pub fn new(fd: S, offset: u64, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
1316            #[cfg(fusion)]
1317            let buffer_pool = buffer_pool.as_io_uring();
1318            Ok(Self {
1319                fd,
1320                offset,
1321                buffer_group: buffer_pool.buffer_group(),
1322                len: len.try_into().map_err(|_| {
1323                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
1324                })?,
1325                pool: buffer_pool.clone(),
1326                buffer: None,
1327                _p: PhantomPinned,
1328            })
1329        }
1330    }
1331
1332    unsafe impl<S: AsFd> OpCode for ReadManagedAt<S> {
1333        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1334            let fd = Fd(self.fd.as_fd().as_raw_fd());
1335            let offset = self.offset;
1336            opcode::Read::new(fd, ptr::null_mut(), self.len)
1337                .offset(offset)
1338                .buf_group(self.buffer_group)
1339                .build()
1340                .flags(Flags::BUFFER_SELECT)
1341                .into()
1342        }
1343
1344        unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &Extra) {
1345            if let Ok(buffer_id) = extra.buffer_id() {
1346                let this = self.project();
1347                this.buffer.replace(unsafe {
1348                    this.pool.get_buffer(buffer_id, *res.as_ref().unwrap_or(&0))
1349                });
1350            }
1351        }
1352    }
1353
1354    impl<S> TakeBuffer for ReadManagedAt<S> {
1355        type Buffer<'a> = BorrowedBuffer<'a>;
1356        type BufferPool = BufferPool;
1357
1358        fn take_buffer(
1359            mut self,
1360            buffer_pool: &Self::BufferPool,
1361            result: io::Result<usize>,
1362            buffer_id: u16,
1363        ) -> io::Result<Self::Buffer<'_>> {
1364            self.buffer.take().map(|buf| buf.leak());
1365            take_buffer(buffer_pool, result, buffer_id)
1366        }
1367    }
1368
    pin_project! {
        /// Read a file into a buffer selected from a buffer pool.
        pub struct ReadManaged<S> {
            fd: S,
            // Buffer group id taken from the pool at construction time.
            buffer_group: u16,
            // Requested length, already checked to fit in a `u32`.
            len: u32,
            // Clone of the io_uring pool, used by `set_result` to look up the buffer.
            pool: IoUringBufferPool,
            // Buffer recorded by `set_result` when the completion carries a buffer id.
            buffer: Option<IoUringOwnedBuffer>,
            // Marker that opts this type out of `Unpin`.
            _p: PhantomPinned,
        }
    }
1380
1381    impl<S> ReadManaged<S> {
1382        /// Create [`ReadManaged`].
1383        pub fn new(fd: S, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
1384            #[cfg(fusion)]
1385            let buffer_pool = buffer_pool.as_io_uring();
1386            Ok(Self {
1387                fd,
1388                buffer_group: buffer_pool.buffer_group(),
1389                len: len.try_into().map_err(|_| {
1390                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
1391                })?,
1392                pool: buffer_pool.clone(),
1393                buffer: None,
1394                _p: PhantomPinned,
1395            })
1396        }
1397    }
1398
1399    unsafe impl<S: AsFd> OpCode for ReadManaged<S> {
1400        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1401            let fd = self.fd.as_fd().as_raw_fd();
1402            opcode::Read::new(Fd(fd), ptr::null_mut(), self.len)
1403                .buf_group(self.buffer_group)
1404                .offset(u64::MAX)
1405                .build()
1406                .flags(Flags::BUFFER_SELECT)
1407                .into()
1408        }
1409
1410        unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &Extra) {
1411            if let Ok(buffer_id) = extra.buffer_id() {
1412                let this = self.project();
1413                this.buffer.replace(unsafe {
1414                    this.pool.get_buffer(buffer_id, *res.as_ref().unwrap_or(&0))
1415                });
1416            }
1417        }
1418    }
1419
1420    impl<S> TakeBuffer for ReadManaged<S> {
1421        type Buffer<'a> = BorrowedBuffer<'a>;
1422        type BufferPool = BufferPool;
1423
1424        fn take_buffer(
1425            mut self,
1426            buffer_pool: &Self::BufferPool,
1427            result: io::Result<usize>,
1428            buffer_id: u16,
1429        ) -> io::Result<Self::Buffer<'_>> {
1430            self.buffer.take().map(|buf| buf.leak());
1431            take_buffer(buffer_pool, result, buffer_id)
1432        }
1433    }
1434
    pin_project! {
        /// Receive data from remote into a buffer selected from a buffer pool.
        pub struct RecvManaged<S> {
            fd: S,
            // Buffer group id taken from the pool at construction time.
            buffer_group: u16,
            // Requested length, already checked to fit in a `u32`.
            len: u32,
            // Receive flags passed through to the `Recv` opcode.
            flags: i32,
            // Clone of the io_uring pool, used by `set_result` to look up the buffer.
            pool: IoUringBufferPool,
            // Buffer recorded by `set_result` when the completion carries a buffer id.
            buffer: Option<IoUringOwnedBuffer>,
            // Marker that opts this type out of `Unpin`.
            _p: PhantomPinned,
        }
    }
1447
1448    impl<S> RecvManaged<S> {
1449        /// Create [`RecvManaged`].
1450        pub fn new(fd: S, buffer_pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
1451            #[cfg(fusion)]
1452            let buffer_pool = buffer_pool.as_io_uring();
1453            Ok(Self {
1454                fd,
1455                buffer_group: buffer_pool.buffer_group(),
1456                len: len.try_into().map_err(|_| {
1457                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
1458                })?,
1459                flags,
1460                pool: buffer_pool.clone(),
1461                buffer: None,
1462                _p: PhantomPinned,
1463            })
1464        }
1465    }
1466
1467    unsafe impl<S: AsFd> OpCode for RecvManaged<S> {
1468        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1469            let fd = self.fd.as_fd().as_raw_fd();
1470            opcode::Recv::new(Fd(fd), ptr::null_mut(), self.len)
1471                .flags(self.flags)
1472                .buf_group(self.buffer_group)
1473                .build()
1474                .flags(Flags::BUFFER_SELECT)
1475                .into()
1476        }
1477
1478        unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &Extra) {
1479            if let Ok(buffer_id) = extra.buffer_id() {
1480                let this = self.project();
1481                this.buffer.replace(unsafe {
1482                    this.pool.get_buffer(buffer_id, *res.as_ref().unwrap_or(&0))
1483                });
1484            }
1485        }
1486    }
1487
1488    impl<S> TakeBuffer for RecvManaged<S> {
1489        type Buffer<'a> = BorrowedBuffer<'a>;
1490        type BufferPool = BufferPool;
1491
1492        fn take_buffer(
1493            mut self,
1494            buffer_pool: &Self::BufferPool,
1495            result: io::Result<usize>,
1496            buffer_id: u16,
1497        ) -> io::Result<Self::Buffer<'_>> {
1498            self.buffer.take().map(|buf| buf.leak());
1499            take_buffer(buffer_pool, result, buffer_id)
1500        }
1501    }
1502
    pin_project! {
        /// Receive data and source address into managed buffer.
        pub struct RecvFromManaged<S> {
            fd: S,
            // Buffer group id taken from the pool at construction time.
            buffer_group: u16,
            // Receive flags passed through to the `RecvMsg` opcode.
            flags: i32,
            // Storage that `msg.msg_name` points at once the entry is created.
            addr: SockAddrStorage,
            // Size of `addr`, computed once in `new` and copied into `msg.msg_namelen`.
            addr_len: socklen_t,
            // Single iovec with a null base and the requested length.
            iovec: libc::iovec,
            // Message header handed to the kernel; filled in by `create_entry`.
            msg: libc::msghdr,
            // Clone of the io_uring pool, used by `set_result` to look up the buffer.
            pool: IoUringBufferPool,
            // Buffer recorded by `set_result` when the completion carries a buffer id.
            buffer: Option<IoUringOwnedBuffer>,
            // Marker that opts this type out of `Unpin`.
            _p: PhantomPinned,
        }
    }
1518
1519    impl<S> RecvFromManaged<S> {
1520        /// Create [`RecvFromManaged`].
1521        pub fn new(fd: S, buffer_pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
1522            #[cfg(fusion)]
1523            let buffer_pool = buffer_pool.as_io_uring();
1524            let len: u32 = len.try_into().map_err(|_| {
1525                io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
1526            })?;
1527            let addr = SockAddrStorage::zeroed();
1528            Ok(Self {
1529                fd,
1530                buffer_group: buffer_pool.buffer_group(),
1531                flags,
1532                addr_len: addr.size_of() as _,
1533                addr,
1534                iovec: libc::iovec {
1535                    iov_base: ptr::null_mut(),
1536                    iov_len: len as _,
1537                },
1538                msg: unsafe { std::mem::zeroed() },
1539                pool: buffer_pool.clone(),
1540                buffer: None,
1541                _p: PhantomPinned,
1542            })
1543        }
1544    }
1545
1546    unsafe impl<S: AsFd> OpCode for RecvFromManaged<S> {
1547        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1548            let this = self.project();
1549            this.msg.msg_name = this.addr as *mut _ as _;
1550            this.msg.msg_namelen = *this.addr_len;
1551            this.msg.msg_iov = this.iovec as *const _ as *mut _;
1552            this.msg.msg_iovlen = 1;
1553            opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
1554                .flags(*this.flags as _)
1555                .buf_group(*this.buffer_group)
1556                .build()
1557                .flags(Flags::BUFFER_SELECT)
1558                .into()
1559        }
1560
1561        unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &Extra) {
1562            if let Ok(buffer_id) = extra.buffer_id() {
1563                let this = self.project();
1564                this.buffer.replace(unsafe {
1565                    this.pool.get_buffer(buffer_id, *res.as_ref().unwrap_or(&0))
1566                });
1567            }
1568        }
1569    }
1570
1571    impl<S> TakeBuffer for RecvFromManaged<S> {
1572        type Buffer<'a> = (BorrowedBuffer<'a>, Option<SockAddr>);
1573        type BufferPool = BufferPool;
1574
1575        fn take_buffer(
1576            mut self,
1577            buffer_pool: &Self::BufferPool,
1578            result: io::Result<usize>,
1579            buffer_id: u16,
1580        ) -> io::Result<Self::Buffer<'_>> {
1581            #[cfg(fusion)]
1582            let buffer_pool = buffer_pool.as_io_uring();
1583            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
1584            let addr =
1585                (self.addr_len > 0).then(|| unsafe { SockAddr::new(self.addr, self.addr_len) });
1586            // SAFETY: result is valid
1587            let buffer = self
1588                .buffer
1589                .take()
1590                .unwrap_or_else(|| unsafe { buffer_pool.get_buffer(buffer_id, result) });
1591            let buffer = unsafe { buffer_pool.create_proxy(buffer, result) }?;
1592            #[cfg(fusion)]
1593            let buffer = BorrowedBuffer::new_io_uring(buffer);
1594            Ok((buffer, addr))
1595        }
1596    }
1597
    /// One queued completion of a multishot operation.
    struct MultishotResult {
        // The completion's result as passed to `push_multishot`.
        result: io::Result<usize>,
        // The completion's extra data as passed to `push_multishot`.
        extra: Extra,
        // Buffer looked up from the pool when `extra` carries a buffer id.
        buffer: Option<IoUringOwnedBuffer>,
    }
1603
1604    impl MultishotResult {
1605        pub fn new(result: io::Result<usize>, extra: Extra, pool: &IoUringBufferPool) -> Self {
1606            let buffer = extra
1607                .buffer_id()
1608                .map(|buffer_id| unsafe {
1609                    pool.get_buffer(buffer_id, *result.as_ref().unwrap_or(&0))
1610                })
1611                .ok();
1612            Self {
1613                result,
1614                extra,
1615                buffer,
1616            }
1617        }
1618
1619        pub fn leak(&mut self) {
1620            self.buffer.take().map(|buf| buf.leak());
1621        }
1622    }
1623
    pin_project! {
        /// Read a file at specified position into multiple managed buffers.
        pub struct ReadMultiAt<S> {
            // Single-shot variant; supplies fd, offset, length, buffer group and pool,
            // and builds the fallback entry.
            #[pin]
            inner: ReadManagedAt<S>,
            // Completions queued by `push_multishot`, drained front-first by `pop_multishot`.
            multishots: VecDeque<MultishotResult>
        }
    }
1632
1633    impl<S> ReadMultiAt<S> {
1634        /// Create [`ReadMultiAt`].
1635        pub fn new(fd: S, offset: u64, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
1636            Ok(Self {
1637                inner: ReadManagedAt::new(fd, offset, buffer_pool, len)?,
1638                multishots: VecDeque::new(),
1639            })
1640        }
1641    }
1642
1643    unsafe impl<S: AsFd> OpCode for ReadMultiAt<S> {
1644        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1645            let fd = self.inner.fd.as_fd().as_raw_fd();
1646            opcode::ReadMulti::new(Fd(fd), self.inner.len, self.inner.buffer_group)
1647                .offset(self.inner.offset)
1648                .build()
1649                .into()
1650        }
1651
1652        fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
1653            self.project().inner.create_entry()
1654        }
1655
1656        unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &crate::Extra) {
1657            unsafe { self.project().inner.set_result(res, extra) }
1658        }
1659
1660        unsafe fn push_multishot(
1661            self: Pin<&mut Self>,
1662            res: io::Result<usize>,
1663            extra: crate::Extra,
1664        ) {
1665            let this = self.project();
1666            this.multishots
1667                .push_back(MultishotResult::new(res, extra, &this.inner.pool));
1668        }
1669
1670        fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::sys::Extra>> {
1671            self.project().multishots.pop_front().map(|mut proxy| {
1672                proxy.leak();
1673                BufResult(proxy.result, proxy.extra)
1674            })
1675        }
1676    }
1677
1678    impl<S> TakeBuffer for ReadMultiAt<S> {
1679        type Buffer<'a> = BorrowedBuffer<'a>;
1680        type BufferPool = BufferPool;
1681
1682        fn take_buffer(
1683            self,
1684            buffer_pool: &Self::BufferPool,
1685            result: io::Result<usize>,
1686            buffer_id: u16,
1687        ) -> io::Result<Self::Buffer<'_>> {
1688            self.inner.take_buffer(buffer_pool, result, buffer_id)
1689        }
1690    }
1691
    pin_project! {
        /// Read a file into multiple managed buffers.
        pub struct ReadMulti<S> {
            // Single-shot variant; supplies fd, length, buffer group and pool,
            // and builds the fallback entry.
            #[pin]
            inner: ReadManaged<S>,
            // Completions queued by `push_multishot`, drained front-first by `pop_multishot`.
            multishots: VecDeque<MultishotResult>
        }
    }
1700
1701    impl<S> ReadMulti<S> {
1702        /// Create [`ReadMulti`].
1703        pub fn new(fd: S, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
1704            Ok(Self {
1705                inner: ReadManaged::new(fd, buffer_pool, len)?,
1706                multishots: VecDeque::new(),
1707            })
1708        }
1709    }
1710
1711    unsafe impl<S: AsFd> OpCode for ReadMulti<S> {
1712        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1713            let fd = self.inner.fd.as_fd().as_raw_fd();
1714            opcode::ReadMulti::new(Fd(fd), self.inner.len, self.inner.buffer_group)
1715                .offset(u64::MAX)
1716                .build()
1717                .into()
1718        }
1719
1720        fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
1721            self.project().inner.create_entry()
1722        }
1723
1724        unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &crate::Extra) {
1725            unsafe { self.project().inner.set_result(res, extra) }
1726        }
1727
1728        unsafe fn push_multishot(
1729            self: Pin<&mut Self>,
1730            res: io::Result<usize>,
1731            extra: crate::Extra,
1732        ) {
1733            let this = self.project();
1734            this.multishots
1735                .push_back(MultishotResult::new(res, extra, &this.inner.pool));
1736        }
1737
1738        fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::sys::Extra>> {
1739            self.project().multishots.pop_front().map(|mut proxy| {
1740                proxy.leak();
1741                BufResult(proxy.result, proxy.extra)
1742            })
1743        }
1744    }
1745
1746    impl<S> TakeBuffer for ReadMulti<S> {
1747        type Buffer<'a> = BorrowedBuffer<'a>;
1748        type BufferPool = BufferPool;
1749
1750        fn take_buffer(
1751            self,
1752            buffer_pool: &Self::BufferPool,
1753            result: io::Result<usize>,
1754            buffer_id: u16,
1755        ) -> io::Result<Self::Buffer<'_>> {
1756            self.inner.take_buffer(buffer_pool, result, buffer_id)
1757        }
1758    }
1759
    pin_project! {
        /// Receive data from remote into multiple managed buffers.
        pub struct RecvMulti<S> {
            // Single-shot variant; supplies fd, flags, buffer group and pool,
            // and builds the fallback entry.
            #[pin]
            inner: RecvManaged<S>,
            // Completions queued by `push_multishot`, drained front-first by `pop_multishot`.
            multishots: VecDeque<MultishotResult>
        }
    }
1768
1769    impl<S> RecvMulti<S> {
1770        /// Create [`RecvMulti`].
1771        pub fn new(fd: S, buffer_pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
1772            Ok(Self {
1773                inner: RecvManaged::new(fd, buffer_pool, len, flags)?,
1774                multishots: VecDeque::new(),
1775            })
1776        }
1777    }
1778
1779    unsafe impl<S: AsFd> OpCode for RecvMulti<S> {
1780        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1781            let fd = self.inner.fd.as_fd().as_raw_fd();
1782            opcode::RecvMulti::new(Fd(fd), self.inner.buffer_group)
1783                .flags(self.inner.flags)
1784                .build()
1785                .into()
1786        }
1787
1788        fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
1789            self.project().inner.create_entry()
1790        }
1791
1792        unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &crate::Extra) {
1793            unsafe { self.project().inner.set_result(res, extra) }
1794        }
1795
1796        unsafe fn push_multishot(
1797            self: Pin<&mut Self>,
1798            res: io::Result<usize>,
1799            extra: crate::Extra,
1800        ) {
1801            let this = self.project();
1802            this.multishots
1803                .push_back(MultishotResult::new(res, extra, &this.inner.pool));
1804        }
1805
1806        fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::sys::Extra>> {
1807            self.project().multishots.pop_front().map(|mut proxy| {
1808                proxy.leak();
1809                BufResult(proxy.result, proxy.extra)
1810            })
1811        }
1812    }
1813
1814    impl<S> TakeBuffer for RecvMulti<S> {
1815        type Buffer<'a> = BorrowedBuffer<'a>;
1816        type BufferPool = BufferPool;
1817
1818        fn take_buffer(
1819            self,
1820            buffer_pool: &Self::BufferPool,
1821            result: io::Result<usize>,
1822            buffer_id: u16,
1823        ) -> io::Result<Self::Buffer<'_>> {
1824            self.inner.take_buffer(buffer_pool, result, buffer_id)
1825        }
1826    }
1827}
1828
1829pub(crate) use buf_ring::take_buffer;
1830pub use buf_ring::{
1831    ReadManaged, ReadManagedAt, ReadMulti, ReadMultiAt, RecvFromManaged, RecvManaged, RecvMulti,
1832};