Skip to main content

compio_driver/sys/poll/
op.rs

1#[cfg(aio)]
2use std::ptr::NonNull;
3use std::{
4    ffi::CString,
5    io,
6    marker::PhantomPinned,
7    os::fd::{AsRawFd, FromRawFd, OwnedFd},
8    pin::Pin,
9    task::Poll,
10};
11
12use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
13#[cfg(not(any(target_os = "linux", target_os = "android", target_os = "hurd")))]
14use libc::{pread, preadv, pwrite, pwritev};
15#[cfg(any(target_os = "linux", target_os = "android", target_os = "hurd"))]
16use libc::{pread64 as pread, preadv64 as preadv, pwrite64 as pwrite, pwritev64 as pwritev};
17use pin_project_lite::pin_project;
18use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t};
19
20pub use self::{
21    Send as SendZc, SendMsg as SendMsgZc, SendTo as SendToZc, SendToVectored as SendToVectoredZc,
22    SendVectored as SendVectoredZc,
23};
24use super::{AsFd, Decision, OpCode, OpType, syscall};
25pub use crate::sys::unix_op::*;
26use crate::{op::*, sys_slice::*};
27
28unsafe impl<
29    D: std::marker::Send + 'static,
30    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
31> OpCode for Asyncify<F, D>
32{
33    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
34        Ok(Decision::Blocking)
35    }
36
37    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
38        let this = self.project();
39        let f = this
40            .f
41            .take()
42            .expect("the operate method could only be called once");
43        let BufResult(res, data) = f();
44        *this.data = Some(data);
45        Poll::Ready(res)
46    }
47}
48
49unsafe impl<
50    S,
51    D: std::marker::Send + 'static,
52    F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
53> OpCode for AsyncifyFd<S, F, D>
54{
55    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
56        Ok(Decision::Blocking)
57    }
58
59    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
60        let this = self.project();
61        let f = this
62            .f
63            .take()
64            .expect("the operate method could only be called once");
65        let BufResult(res, data) = f(this.fd);
66        *this.data = Some(data);
67        Poll::Ready(res)
68    }
69}
70
71unsafe impl<
72    S1,
73    S2,
74    D: std::marker::Send + 'static,
75    F: (FnOnce(&S1, &S2) -> BufResult<usize, D>) + std::marker::Send + 'static,
76> OpCode for AsyncifyFd2<S1, S2, F, D>
77{
78    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
79        Ok(Decision::Blocking)
80    }
81
82    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
83        let this = self.project();
84        let f = this
85            .f
86            .take()
87            .expect("the operate method could only be called once");
88        let BufResult(res, data) = f(this.fd1, this.fd2);
89        *this.data = Some(data);
90        Poll::Ready(res)
91    }
92}
93
94unsafe impl<S: AsFd> OpCode for OpenFile<S> {
95    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
96        Ok(Decision::Blocking)
97    }
98
99    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
100        let fd = self.as_mut().call()?;
101        *self.project().opened_fd = Some(unsafe { OwnedFd::from_raw_fd(fd as _) });
102        Poll::Ready(Ok(fd))
103    }
104}
105
106unsafe impl OpCode for CloseFile {
107    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
108        Ok(Decision::Blocking)
109    }
110
111    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
112        Poll::Ready(self.call())
113    }
114}
115
116unsafe impl<S: AsFd> OpCode for TruncateFile<S> {
117    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
118        Ok(Decision::Blocking)
119    }
120
121    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
122        Poll::Ready(self.call())
123    }
124}
125
126pin_project! {
127    /// Get metadata of an opened file.
128    pub struct FileStat<S> {
129        pub(crate) fd: S,
130        pub(crate) stat: Stat,
131    }
132}
133
134impl<S> FileStat<S> {
135    /// Create [`FileStat`].
136    pub fn new(fd: S) -> Self {
137        Self {
138            fd,
139            stat: unsafe { std::mem::zeroed() },
140        }
141    }
142}
143
144unsafe impl<S: AsFd> OpCode for FileStat<S> {
145    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
146        Ok(Decision::Blocking)
147    }
148
149    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
150        let this = self.project();
151        #[cfg(gnulinux)]
152        {
153            let mut s: libc::statx = unsafe { std::mem::zeroed() };
154            static EMPTY_NAME: &[u8] = b"\0";
155            syscall!(libc::statx(
156                this.fd.as_fd().as_raw_fd(),
157                EMPTY_NAME.as_ptr().cast(),
158                libc::AT_EMPTY_PATH,
159                statx_mask(),
160                &mut s
161            ))?;
162            *this.stat = statx_to_stat(s);
163            Poll::Ready(Ok(0))
164        }
165        #[cfg(not(gnulinux))]
166        {
167            Poll::Ready(Ok(
168                syscall!(libc::fstat(this.fd.as_fd().as_raw_fd(), this.stat))? as _,
169            ))
170        }
171    }
172}
173
174impl<S> IntoInner for FileStat<S> {
175    type Inner = Stat;
176
177    fn into_inner(self) -> Self::Inner {
178        self.stat
179    }
180}
181
182pin_project! {
183    /// Get metadata from path.
184    pub struct PathStat<S: AsFd> {
185        pub(crate) dirfd: S,
186        pub(crate) path: CString,
187        pub(crate) stat: Stat,
188        pub(crate) follow_symlink: bool,
189    }
190}
191
192impl<S: AsFd> PathStat<S> {
193    /// Create [`PathStat`].
194    pub fn new(dirfd: S, path: CString, follow_symlink: bool) -> Self {
195        Self {
196            dirfd,
197            path,
198            stat: unsafe { std::mem::zeroed() },
199            follow_symlink,
200        }
201    }
202}
203
204unsafe impl<S: AsFd> OpCode for PathStat<S> {
205    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
206        Ok(Decision::Blocking)
207    }
208
209    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
210        let this = self.project();
211        #[cfg(gnulinux)]
212        let res = {
213            let mut flags = libc::AT_EMPTY_PATH;
214            if !*this.follow_symlink {
215                flags |= libc::AT_SYMLINK_NOFOLLOW;
216            }
217            let mut s: libc::statx = unsafe { std::mem::zeroed() };
218            let res = syscall!(libc::statx(
219                this.dirfd.as_fd().as_raw_fd(),
220                this.path.as_ptr(),
221                flags,
222                statx_mask(),
223                &mut s
224            ))?;
225            *this.stat = statx_to_stat(s);
226            res
227        };
228        // Some platforms don't support `AT_EMPTY_PATH`, so we have to use `fstat` when
229        // the path is empty.
230        #[cfg(not(gnulinux))]
231        let res = if this.path.is_empty() {
232            syscall!(libc::fstat(this.dirfd.as_fd().as_raw_fd(), this.stat))?
233        } else {
234            syscall!(libc::fstatat(
235                this.dirfd.as_fd().as_raw_fd(),
236                this.path.as_ptr(),
237                this.stat,
238                if !*this.follow_symlink {
239                    libc::AT_SYMLINK_NOFOLLOW
240                } else {
241                    0
242                }
243            ))?
244        };
245        Poll::Ready(Ok(res as _))
246    }
247}
248
249impl<S: AsFd> IntoInner for PathStat<S> {
250    type Inner = Stat;
251
252    fn into_inner(self) -> Self::Inner {
253        self.stat
254    }
255}
256
257unsafe impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
258    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
259        #[cfg(aio)]
260        {
261            let this = self.project();
262            let slice = this.buffer.sys_slice_mut();
263
264            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
265            this.aiocb.aio_offset = *this.offset as _;
266            this.aiocb.aio_buf = slice.ptr().cast();
267            this.aiocb.aio_nbytes = slice.len();
268
269            Ok(Decision::aio(this.aiocb, libc::aio_read))
270        }
271        #[cfg(not(aio))]
272        {
273            Ok(Decision::Blocking)
274        }
275    }
276
277    #[cfg(aio)]
278    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
279        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
280    }
281
282    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
283        let fd = self.fd.as_fd().as_raw_fd();
284        let offset = self.offset;
285        let slice = self.project().buffer.sys_slice_mut();
286        syscall!(break pread(fd, slice.ptr() as _, slice.len() as _, offset as _,))
287    }
288}
289
290unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
291    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
292        #[cfg(freebsd)]
293        {
294            let this = self.project();
295            *this.slices = this.buffer.sys_slices_mut();
296
297            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
298            this.aiocb.aio_offset = *this.offset as _;
299            this.aiocb.aio_buf = this.slices.as_mut_ptr().cast();
300            this.aiocb.aio_nbytes = this.slices.len();
301
302            Ok(Decision::aio(this.aiocb, libc::aio_readv))
303        }
304        #[cfg(not(freebsd))]
305        {
306            Ok(Decision::Blocking)
307        }
308    }
309
310    #[cfg(freebsd)]
311    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
312        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
313    }
314
315    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
316        let this = self.project();
317        *this.slices = this.buffer.sys_slices_mut();
318        syscall!(
319            break preadv(
320                this.fd.as_fd().as_raw_fd(),
321                this.slices.as_ptr() as _,
322                this.slices.len() as _,
323                *this.offset as _,
324            )
325        )
326    }
327}
328
329unsafe impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
330    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
331        #[cfg(aio)]
332        {
333            let this = self.project();
334            let slice = this.buffer.as_ref().sys_slice();
335
336            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
337            this.aiocb.aio_offset = *this.offset as _;
338            this.aiocb.aio_buf = slice.ptr().cast();
339            this.aiocb.aio_nbytes = slice.len();
340
341            Ok(Decision::aio(this.aiocb, libc::aio_write))
342        }
343        #[cfg(not(aio))]
344        {
345            Ok(Decision::Blocking)
346        }
347    }
348
349    #[cfg(aio)]
350    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
351        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
352    }
353
354    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
355        let slice = self.buffer.as_init();
356        syscall!(
357            break pwrite(
358                self.fd.as_fd().as_raw_fd(),
359                slice.as_ptr() as _,
360                slice.len() as _,
361                self.offset as _,
362            )
363        )
364    }
365}
366
367unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
368    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
369        #[cfg(freebsd)]
370        {
371            let this = self.project();
372            *this.slices = this.buffer.as_ref().sys_slices();
373
374            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
375            this.aiocb.aio_offset = *this.offset as _;
376            this.aiocb.aio_buf = this.slices.as_ptr().cast_mut().cast();
377            this.aiocb.aio_nbytes = this.slices.len();
378
379            Ok(Decision::aio(this.aiocb, libc::aio_writev))
380        }
381        #[cfg(not(freebsd))]
382        {
383            Ok(Decision::Blocking)
384        }
385    }
386
387    #[cfg(freebsd)]
388    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
389        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
390    }
391
392    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
393        let this = self.project();
394        *this.slices = this.buffer.as_ref().sys_slices();
395        syscall!(
396            break pwritev(
397                this.fd.as_fd().as_raw_fd(),
398                this.slices.as_ptr() as _,
399                this.slices.len() as _,
400                *this.offset as _,
401            )
402        )
403    }
404}
405
406unsafe impl<S: AsFd> OpCode for crate::op::managed::ReadManagedAt<S> {
407    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
408        self.project().op.pre_submit()
409    }
410
411    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
412        self.project().op.op_type()
413    }
414
415    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
416        self.project().op.operate()
417    }
418}
419
420unsafe impl<T: IoBufMut, S: AsFd> OpCode for Read<T, S> {
421    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
422        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
423    }
424
425    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
426        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
427    }
428
429    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
430        let fd = self.fd.as_fd().as_raw_fd();
431        let slice = self.project().buffer.sys_slice_mut();
432        syscall!(break libc::read(fd, slice.ptr() as _, slice.len()))
433    }
434}
435
436unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectored<T, S> {
437    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
438        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
439    }
440
441    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
442        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
443    }
444
445    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
446        let this = self.project();
447        *this.slices = this.buffer.sys_slices_mut();
448        syscall!(
449            break libc::readv(
450                this.fd.as_fd().as_raw_fd(),
451                this.slices.as_ptr() as _,
452                this.slices.len() as _
453            )
454        )
455    }
456}
457
458unsafe impl<T: IoBuf, S: AsFd> OpCode for Write<T, S> {
459    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
460        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
461    }
462
463    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
464        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
465    }
466
467    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
468        let slice = self.buffer.as_init();
469        syscall!(
470            break libc::write(
471                self.fd.as_fd().as_raw_fd(),
472                slice.as_ptr() as _,
473                slice.len()
474            )
475        )
476    }
477}
478
479unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectored<T, S> {
480    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
481        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
482    }
483
484    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
485        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
486    }
487
488    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
489        let this = self.project();
490        *this.slices = this.buffer.as_ref().sys_slices();
491        syscall!(
492            break libc::writev(
493                this.fd.as_fd().as_raw_fd(),
494                this.slices.as_ptr() as _,
495                this.slices.len() as _
496            )
497        )
498    }
499}
500
501unsafe impl<S: AsFd> OpCode for crate::op::managed::ReadManaged<S> {
502    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
503        self.project().op.pre_submit()
504    }
505
506    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
507        self.project().op.op_type()
508    }
509
510    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
511        self.project().op.operate()
512    }
513}
514
515unsafe impl<S: AsFd> OpCode for Sync<S> {
516    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
517        #[cfg(aio)]
518        {
519            unsafe extern "C" fn aio_fsync(aiocbp: *mut libc::aiocb) -> i32 {
520                unsafe { libc::aio_fsync(libc::O_SYNC, aiocbp) }
521            }
522            unsafe extern "C" fn aio_fdatasync(aiocbp: *mut libc::aiocb) -> i32 {
523                unsafe { libc::aio_fsync(libc::O_DSYNC, aiocbp) }
524            }
525
526            let this = self.project();
527            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
528
529            let f = if *this.datasync {
530                aio_fdatasync
531            } else {
532                aio_fsync
533            };
534
535            Ok(Decision::aio(this.aiocb, f))
536        }
537        #[cfg(not(aio))]
538        {
539            Ok(Decision::Blocking)
540        }
541    }
542
543    #[cfg(aio)]
544    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
545        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
546    }
547
548    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
549        #[cfg(datasync)]
550        {
551            Poll::Ready(Ok(syscall!(if self.datasync {
552                libc::fdatasync(self.fd.as_fd().as_raw_fd())
553            } else {
554                libc::fsync(self.fd.as_fd().as_raw_fd())
555            })? as _))
556        }
557        #[cfg(not(datasync))]
558        {
559            Poll::Ready(Ok(syscall!(libc::fsync(self.fd.as_fd().as_raw_fd()))? as _))
560        }
561    }
562}
563
564unsafe impl<S: AsFd> OpCode for Unlink<S> {
565    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
566        Ok(Decision::Blocking)
567    }
568
569    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
570        Poll::Ready(self.call())
571    }
572}
573
574unsafe impl<S: AsFd> OpCode for CreateDir<S> {
575    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
576        Ok(Decision::Blocking)
577    }
578
579    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
580        Poll::Ready(self.call())
581    }
582}
583
584unsafe impl<S1: AsFd, S2: AsFd> OpCode for Rename<S1, S2> {
585    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
586        Ok(Decision::Blocking)
587    }
588
589    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
590        Poll::Ready(self.call())
591    }
592}
593
594unsafe impl<S: AsFd> OpCode for Symlink<S> {
595    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
596        Ok(Decision::Blocking)
597    }
598
599    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
600        Poll::Ready(self.call())
601    }
602}
603
604unsafe impl<S1: AsFd, S2: AsFd> OpCode for HardLink<S1, S2> {
605    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
606        Ok(Decision::Blocking)
607    }
608
609    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
610        Poll::Ready(self.call())
611    }
612}
613
614impl CreateSocket {
615    unsafe fn call(self: Pin<&mut Self>) -> io::Result<libc::c_int> {
616        #[allow(unused_mut)]
617        let mut ty: i32 = self.socket_type;
618        #[cfg(any(
619            target_os = "android",
620            target_os = "dragonfly",
621            target_os = "freebsd",
622            target_os = "fuchsia",
623            target_os = "hurd",
624            target_os = "illumos",
625            target_os = "linux",
626            target_os = "netbsd",
627            target_os = "openbsd",
628            target_os = "cygwin",
629        ))]
630        {
631            ty |= libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
632        }
633        let fd = syscall!(libc::socket(self.domain, ty, self.protocol))?;
634        let socket = unsafe { Socket2::from_raw_fd(fd) };
635        #[cfg(not(any(
636            target_os = "android",
637            target_os = "dragonfly",
638            target_os = "freebsd",
639            target_os = "fuchsia",
640            target_os = "hurd",
641            target_os = "illumos",
642            target_os = "linux",
643            target_os = "netbsd",
644            target_os = "openbsd",
645            target_os = "espidf",
646            target_os = "vita",
647            target_os = "cygwin",
648        )))]
649        socket.set_cloexec(true)?;
650        #[cfg(any(
651            target_os = "ios",
652            target_os = "macos",
653            target_os = "tvos",
654            target_os = "watchos",
655        ))]
656        socket.set_nosigpipe(true)?;
657        #[cfg(not(any(
658            target_os = "android",
659            target_os = "dragonfly",
660            target_os = "freebsd",
661            target_os = "fuchsia",
662            target_os = "hurd",
663            target_os = "illumos",
664            target_os = "linux",
665            target_os = "netbsd",
666            target_os = "openbsd",
667            target_os = "cygwin",
668        )))]
669        socket.set_nonblocking(true)?;
670        *self.project().opened_fd = Some(socket);
671        Ok(fd)
672    }
673}
674
675unsafe impl OpCode for CreateSocket {
676    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
677        Ok(Decision::Blocking)
678    }
679
680    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
681        Poll::Ready(Ok(unsafe { self.call()? } as _))
682    }
683}
684
685unsafe impl<S: AsFd> OpCode for ShutdownSocket<S> {
686    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
687        Ok(Decision::Blocking)
688    }
689
690    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
691        Poll::Ready(self.call())
692    }
693}
694
695unsafe impl OpCode for CloseSocket {
696    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
697        Ok(Decision::Blocking)
698    }
699
700    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
701        Poll::Ready(self.call())
702    }
703}
704
705impl<S: AsFd> Accept<S> {
706    // If the first call succeeds, there won't be another call.
707    unsafe fn call(self: Pin<&mut Self>) -> libc::c_int {
708        let this = self.project();
709        || -> io::Result<libc::c_int> {
710            #[cfg(any(
711                target_os = "android",
712                target_os = "dragonfly",
713                target_os = "freebsd",
714                target_os = "fuchsia",
715                target_os = "illumos",
716                target_os = "linux",
717                target_os = "netbsd",
718                target_os = "openbsd",
719                target_os = "cygwin",
720            ))]
721            {
722                let fd = syscall!(libc::accept4(
723                    this.fd.as_fd().as_raw_fd(),
724                    this.buffer as *mut _ as *mut _,
725                    this.addr_len,
726                    libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC,
727                ))?;
728                let socket = unsafe { Socket2::from_raw_fd(fd) };
729                *this.accepted_fd = Some(socket);
730                Ok(fd)
731            }
732            #[cfg(not(any(
733                target_os = "android",
734                target_os = "dragonfly",
735                target_os = "freebsd",
736                target_os = "fuchsia",
737                target_os = "illumos",
738                target_os = "linux",
739                target_os = "netbsd",
740                target_os = "openbsd",
741                target_os = "cygwin",
742            )))]
743            {
744                let fd = syscall!(libc::accept(
745                    this.fd.as_fd().as_raw_fd(),
746                    this.buffer as *mut _ as *mut _,
747                    this.addr_len,
748                ))?;
749                let socket = unsafe { Socket2::from_raw_fd(fd) };
750                socket.set_cloexec(true)?;
751                socket.set_nonblocking(true)?;
752                *this.accepted_fd = Some(socket);
753                Ok(fd)
754            }
755        }()
756        .unwrap_or(-1)
757    }
758}
759
760unsafe impl<S: AsFd> OpCode for Accept<S> {
761    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
762        let fd = self.fd.as_fd().as_raw_fd();
763        syscall!(self.as_mut().call(), wait_readable(fd))
764    }
765
766    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
767        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
768    }
769
770    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
771        syscall!(break self.as_mut().call())
772    }
773}
774
775pin_project! {
776    /// Accept multiple connections.
777    pub struct AcceptMulti<S> {
778        #[pin]
779        pub(crate) op: Accept<S>,
780    }
781}
782
783impl<S> AcceptMulti<S> {
784    /// Create [`AcceptMulti`].
785    pub fn new(fd: S) -> Self {
786        Self {
787            op: Accept::new(fd),
788        }
789    }
790}
791
792unsafe impl<S: AsFd> OpCode for AcceptMulti<S> {
793    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
794        self.project().op.pre_submit()
795    }
796
797    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
798        self.project().op.op_type()
799    }
800
801    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
802        self.project().op.operate()
803    }
804}
805
806impl<S> IntoInner for AcceptMulti<S> {
807    type Inner = Socket2;
808
809    fn into_inner(self) -> Self::Inner {
810        self.op.into_inner().0
811    }
812}
813unsafe impl<S: AsFd> OpCode for Connect<S> {
814    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
815        syscall!(
816            libc::connect(
817                self.fd.as_fd().as_raw_fd(),
818                self.addr.as_ptr().cast(),
819                self.addr.len()
820            ),
821            wait_writable(self.fd.as_fd().as_raw_fd())
822        )
823    }
824
825    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
826        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
827    }
828
829    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
830        let mut err: libc::c_int = 0;
831        let mut err_len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
832
833        syscall!(libc::getsockopt(
834            self.fd.as_fd().as_raw_fd(),
835            libc::SOL_SOCKET,
836            libc::SO_ERROR,
837            &mut err as *mut _ as *mut _,
838            &mut err_len
839        ))?;
840
841        let res = if err == 0 {
842            Ok(0)
843        } else {
844            Err(io::Error::from_raw_os_error(err))
845        };
846        Poll::Ready(res)
847    }
848}
849
850unsafe impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
851    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
852        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
853    }
854
855    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
856        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
857    }
858
859    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
860        let fd = self.fd.as_fd().as_raw_fd();
861        let flags = self.flags;
862        let slice = self.project().buffer.sys_slice_mut();
863        syscall!(break libc::recv(fd, slice.ptr() as _, slice.len(), flags))
864    }
865}
866
867unsafe impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
868    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
869        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
870    }
871
872    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
873        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
874    }
875
876    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
877        let slice = self.buffer.as_init();
878        syscall!(
879            break libc::send(
880                self.fd.as_fd().as_raw_fd(),
881                slice.as_ptr() as _,
882                slice.len(),
883                self.flags,
884            )
885        )
886    }
887}
888
889unsafe impl<S: AsFd> OpCode for crate::op::managed::RecvManaged<S> {
890    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
891        self.project().op.pre_submit()
892    }
893
894    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
895        self.project().op.op_type()
896    }
897
898    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
899        self.project().op.operate()
900    }
901}
902
903unsafe impl<S: AsFd> OpCode for crate::op::managed::RecvFromManaged<S> {
904    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
905        self.project().op.pre_submit()
906    }
907
908    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
909        self.project().op.op_type()
910    }
911
912    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
913        self.project().op.operate()
914    }
915}
916
917impl<T: IoVectoredBufMut, S: AsFd> RecvVectored<T, S> {
918    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
919        let this = self.project();
920        unsafe { libc::recvmsg(this.fd.as_fd().as_raw_fd(), this.msg, *this.flags) }
921    }
922}
923
924unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
925    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
926        self.as_mut().set_msg();
927        let fd = self.fd.as_fd().as_raw_fd();
928        syscall!(self.as_mut().call(), wait_readable(fd))
929    }
930
931    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
932        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
933    }
934
935    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
936        syscall!(break self.as_mut().call())
937    }
938}
939
940impl<T: IoVectoredBuf, S: AsFd> SendVectored<T, S> {
941    unsafe fn call(&self) -> libc::ssize_t {
942        unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, self.flags) }
943    }
944}
945
946unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
947    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
948        self.as_mut().set_msg();
949        let fd = self.as_mut().project().fd.as_fd().as_raw_fd();
950        syscall!(self.as_mut().call(), wait_writable(fd))
951    }
952
953    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
954        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
955    }
956
957    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
958        syscall!(break self.call())
959    }
960}
961
962pin_project! {
963    /// Receive data and source address.
964    pub struct RecvFrom<T: IoBufMut, S> {
965        pub(crate) fd: S,
966        #[pin]
967        pub(crate) buffer: T,
968        pub(crate) addr: SockAddrStorage,
969        pub(crate) addr_len: socklen_t,
970        pub(crate) flags: i32,
971        _p: PhantomPinned,
972    }
973}
974
975impl<T: IoBufMut, S> RecvFrom<T, S> {
976    /// Create [`RecvFrom`].
977    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
978        let addr = SockAddrStorage::zeroed();
979        let addr_len = addr.size_of();
980        Self {
981            fd,
982            buffer,
983            addr,
984            addr_len,
985            flags,
986            _p: PhantomPinned,
987        }
988    }
989}
990
991impl<T: IoBufMut, S: AsFd> RecvFrom<T, S> {
992    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
993        let fd = self.fd.as_fd().as_raw_fd();
994        let this = self.project();
995        let slice = this.buffer.sys_slice_mut();
996        unsafe {
997            libc::recvfrom(
998                fd,
999                slice.ptr() as _,
1000                slice.len(),
1001                *this.flags,
1002                this.addr as *mut _ as _,
1003                this.addr_len,
1004            )
1005        }
1006    }
1007}
1008
1009unsafe impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
1010    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1011        let fd = self.fd.as_fd().as_raw_fd();
1012        syscall!(self.as_mut().call(), wait_readable(fd))
1013    }
1014
1015    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1016        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1017    }
1018
1019    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1020        syscall!(break self.as_mut().call())
1021    }
1022}
1023
1024impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
1025    type Inner = (T, SockAddrStorage, socklen_t);
1026
1027    fn into_inner(self) -> Self::Inner {
1028        (self.buffer, self.addr, self.addr_len)
1029    }
1030}
1031
pin_project! {
    /// Receive data and source address into vectored buffer.
    ///
    /// The operation is carried out with `libc::recvmsg` using `msg`, which
    /// `set_msg` fills with pointers to `addr` and `slices`.
    pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
        // Handle whose raw descriptor is passed to `recvmsg`.
        pub(crate) fd: S,
        // Destination buffers (structurally pinned).
        #[pin]
        pub(crate) buffer: T,
        // Slice descriptors produced from `buffer`; `msg.msg_iov` points here.
        pub(crate) slices: Vec<SysSlice>,
        // Storage for the sender's address; `msg.msg_name` points here.
        pub(crate) addr: SockAddrStorage,
        // Message header passed to `recvmsg`; zeroed until `set_msg` runs.
        pub(crate) msg: libc::msghdr,
        // Flags forwarded unchanged to `recvmsg`.
        pub(crate) flags: i32,
        // Makes the type `!Unpin`.
        _p: PhantomPinned,
    }
}
1045
1046impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
1047    /// Create [`RecvFromVectored`].
1048    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
1049        Self {
1050            fd,
1051            buffer,
1052            slices: vec![],
1053            addr: SockAddrStorage::zeroed(),
1054            msg: unsafe { std::mem::zeroed() },
1055            flags,
1056            _p: PhantomPinned,
1057        }
1058    }
1059}
1060
1061impl<T: IoVectoredBufMut, S: AsFd> RecvFromVectored<T, S> {
1062    fn set_msg(self: Pin<&mut Self>) {
1063        let this = self.project();
1064        *this.slices = this.buffer.sys_slices_mut();
1065        this.msg.msg_name = this.addr as *mut _ as _;
1066        this.msg.msg_namelen = this.addr.size_of() as _;
1067        this.msg.msg_iov = this.slices.as_mut_ptr() as _;
1068        this.msg.msg_iovlen = this.slices.len() as _;
1069    }
1070
1071    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1072        let this = self.project();
1073        unsafe { libc::recvmsg(this.fd.as_fd().as_raw_fd(), this.msg, *this.flags) }
1074    }
1075}
1076
1077unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
1078    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1079        self.as_mut().set_msg();
1080        let fd = self.as_mut().project().fd.as_fd().as_raw_fd();
1081        syscall!(self.as_mut().call(), wait_readable(fd))
1082    }
1083
1084    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1085        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1086    }
1087
1088    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1089        syscall!(break self.as_mut().call())
1090    }
1091}
1092
1093impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
1094    type Inner = (T, SockAddrStorage, socklen_t);
1095
1096    fn into_inner(self) -> Self::Inner {
1097        (self.buffer, self.addr, self.msg.msg_namelen)
1098    }
1099}
1100
pin_project! {
    /// Send data to specified address.
    ///
    /// The operation is carried out with `libc::sendto`.
    pub struct SendTo<T: IoBuf, S> {
        // Handle whose raw descriptor is passed to `sendto`.
        pub(crate) fd: S,
        // Payload; its initialised part is what gets sent.
        pub(crate) buffer: T,
        // Destination address.
        pub(crate) addr: SockAddr,
        // Flags forwarded unchanged to `sendto`.
        flags: i32,
        // Makes the type `!Unpin`.
        _p: PhantomPinned,
    }
}
1111
1112impl<T: IoBuf, S> SendTo<T, S> {
1113    /// Create [`SendTo`].
1114    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1115        Self {
1116            fd,
1117            buffer,
1118            addr,
1119            flags,
1120            _p: PhantomPinned,
1121        }
1122    }
1123}
1124
1125impl<T: IoBuf, S: AsFd> SendTo<T, S> {
1126    unsafe fn call(&self) -> libc::ssize_t {
1127        let slice = self.buffer.as_init();
1128        unsafe {
1129            libc::sendto(
1130                self.fd.as_fd().as_raw_fd(),
1131                slice.as_ptr() as _,
1132                slice.len(),
1133                self.flags,
1134                self.addr.as_ptr().cast(),
1135                self.addr.len(),
1136            )
1137        }
1138    }
1139}
1140
1141unsafe impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
1142    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1143        syscall!(self.call(), wait_writable(self.fd.as_fd().as_raw_fd()))
1144    }
1145
1146    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1147        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1148    }
1149
1150    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1151        syscall!(break self.call())
1152    }
1153}
1154
1155impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
1156    type Inner = T;
1157
1158    fn into_inner(self) -> Self::Inner {
1159        self.buffer
1160    }
1161}
1162
pin_project! {
    /// Send data to specified address from vectored buffer.
    ///
    /// The operation is carried out with `libc::sendmsg` using `msg`, which
    /// `set_msg` fills in from `addr` and `slices`.
    pub struct SendToVectored<T: IoVectoredBuf, S> {
        // Handle whose raw descriptor is passed to `sendmsg`.
        pub(crate) fd: S,
        // Source buffers (structurally pinned).
        #[pin]
        pub(crate) buffer: T,
        // Destination address referenced by `msg.msg_name`.
        pub(crate) addr: SockAddr,
        // Slice descriptors produced from `buffer`; `msg.msg_iov` points here.
        pub(crate) slices: Vec<SysSlice>,
        // Message header passed to `sendmsg`; zeroed until `set_msg` runs.
        pub(crate) msg: libc::msghdr,
        // Flags forwarded unchanged to `sendmsg`.
        pub(crate) flags: i32,
        // Makes the type `!Unpin`.
        _p: PhantomPinned,
    }
}
1176
1177impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
1178    /// Create [`SendToVectored`].
1179    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1180        Self {
1181            fd,
1182            buffer,
1183            addr,
1184            slices: vec![],
1185            msg: unsafe { std::mem::zeroed() },
1186            flags,
1187            _p: PhantomPinned,
1188        }
1189    }
1190}
1191
1192impl<T: IoVectoredBuf, S: AsFd> SendToVectored<T, S> {
1193    fn set_msg(self: Pin<&mut Self>) {
1194        let this = self.project();
1195        *this.slices = this.buffer.as_ref().sys_slices();
1196        this.msg.msg_name = this.addr as *mut _ as _;
1197        this.msg.msg_namelen = this.addr.len() as _;
1198        this.msg.msg_iov = this.slices.as_mut_ptr() as _;
1199        this.msg.msg_iovlen = this.slices.len() as _;
1200    }
1201
1202    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1203        unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, self.flags) }
1204    }
1205}
1206
1207unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
1208    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1209        self.as_mut().set_msg();
1210        let fd = self.fd.as_fd().as_raw_fd();
1211        syscall!(self.as_mut().call(), wait_writable(fd))
1212    }
1213
1214    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1215        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1216    }
1217
1218    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1219        syscall!(break self.as_mut().call())
1220    }
1221}
1222
1223impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
1224    type Inner = T;
1225
1226    fn into_inner(self) -> Self::Inner {
1227        self.buffer
1228    }
1229}
1230
1231impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> RecvMsg<T, C, S> {
1232    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1233        let this = self.project();
1234        unsafe { libc::recvmsg(this.fd.as_fd().as_raw_fd(), this.msg, *this.flags) }
1235    }
1236}
1237
1238unsafe impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
1239    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1240        self.as_mut().set_msg();
1241        let fd = self.fd.as_fd().as_raw_fd();
1242        syscall!(self.as_mut().call(), wait_readable(fd))
1243    }
1244
1245    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1246        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1247    }
1248
1249    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1250        syscall!(break self.as_mut().call())
1251    }
1252}
1253
1254impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> SendMsg<T, C, S> {
1255    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1256        unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, self.flags) }
1257    }
1258}
1259
1260unsafe impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
1261    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1262        self.as_mut().set_msg();
1263        let fd = self.fd.as_fd().as_raw_fd();
1264        syscall!(self.as_mut().call(), wait_writable(fd))
1265    }
1266
1267    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1268        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1269    }
1270
1271    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1272        syscall!(break self.as_mut().call())
1273    }
1274}
1275
1276unsafe impl<S: AsFd> OpCode for PollOnce<S> {
1277    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1278        Ok(Decision::wait_for(
1279            self.fd.as_fd().as_raw_fd(),
1280            self.interest,
1281        ))
1282    }
1283
1284    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1285        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1286    }
1287
1288    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1289        Poll::Ready(Ok(0))
1290    }
1291}
1292
#[cfg(linux_all)]
unsafe impl<S1: AsFd, S2: AsFd> OpCode for Splice<S1, S2> {
    /// Ask the driver to wait until the input is readable and the output is
    /// writable; nothing is attempted up front.
    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
        use super::WaitArg;

        let source = WaitArg::readable(self.fd_in.as_fd().as_raw_fd());
        let sink = WaitArg::writable(self.fd_out.as_fd().as_raw_fd());
        Ok(Decision::wait_for_many([source, sink]))
    }

    /// This operation is associated with both descriptors.
    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
        let raw_in = self.fd_in.as_fd().as_raw_fd();
        let raw_out = self.fd_out.as_fd().as_raw_fd();
        Some(OpType::multi_fd([raw_in, raw_out]))
    }

    /// Run `splice` with `SPLICE_F_NONBLOCK` added to the stored flags.
    ///
    /// A negative stored offset is passed to `splice` as a null pointer;
    /// otherwise a pointer to a local copy of the offset is passed.
    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
        // Local copies: `splice` takes mutable pointers to the offsets.
        let mut off_in = self.offset_in;
        let mut off_out = self.offset_out;
        let in_ptr = if off_in < 0 {
            std::ptr::null_mut()
        } else {
            &mut off_in
        };
        let out_ptr = if off_out < 0 {
            std::ptr::null_mut()
        } else {
            &mut off_out
        };

        let raw_in = self.fd_in.as_fd().as_raw_fd();
        let raw_out = self.fd_out.as_fd().as_raw_fd();
        let len = self.len;
        let flags = self.flags | libc::SPLICE_F_NONBLOCK;
        syscall!(break libc::splice(raw_in, in_ptr, raw_out, out_ptr, len, flags))
    }
}