1#[cfg(aio)]
2use std::ptr::NonNull;
3use std::{
4 ffi::CString,
5 io,
6 marker::PhantomPinned,
7 os::fd::{AsRawFd, FromRawFd, OwnedFd},
8 pin::Pin,
9 task::Poll,
10};
11
12use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
13#[cfg(not(any(target_os = "linux", target_os = "android", target_os = "hurd")))]
14use libc::{pread, preadv, pwrite, pwritev};
15#[cfg(any(target_os = "linux", target_os = "android", target_os = "hurd"))]
16use libc::{pread64 as pread, preadv64 as preadv, pwrite64 as pwrite, pwritev64 as pwritev};
17use pin_project_lite::pin_project;
18use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t};
19
20pub use self::{
21 Send as SendZc, SendMsg as SendMsgZc, SendTo as SendToZc, SendToVectored as SendToVectoredZc,
22 SendVectored as SendVectoredZc,
23};
24use super::{AsFd, Decision, OpCode, OpType, syscall};
25pub use crate::sys::unix_op::*;
26use crate::{op::*, sys_slice::*};
27
28unsafe impl<
29 D: std::marker::Send + 'static,
30 F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
31> OpCode for Asyncify<F, D>
32{
33 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
34 Ok(Decision::Blocking)
35 }
36
37 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
38 let this = self.project();
39 let f = this
40 .f
41 .take()
42 .expect("the operate method could only be called once");
43 let BufResult(res, data) = f();
44 *this.data = Some(data);
45 Poll::Ready(res)
46 }
47}
48
49unsafe impl<
50 S,
51 D: std::marker::Send + 'static,
52 F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
53> OpCode for AsyncifyFd<S, F, D>
54{
55 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
56 Ok(Decision::Blocking)
57 }
58
59 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
60 let this = self.project();
61 let f = this
62 .f
63 .take()
64 .expect("the operate method could only be called once");
65 let BufResult(res, data) = f(this.fd);
66 *this.data = Some(data);
67 Poll::Ready(res)
68 }
69}
70
71unsafe impl<
72 S1,
73 S2,
74 D: std::marker::Send + 'static,
75 F: (FnOnce(&S1, &S2) -> BufResult<usize, D>) + std::marker::Send + 'static,
76> OpCode for AsyncifyFd2<S1, S2, F, D>
77{
78 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
79 Ok(Decision::Blocking)
80 }
81
82 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
83 let this = self.project();
84 let f = this
85 .f
86 .take()
87 .expect("the operate method could only be called once");
88 let BufResult(res, data) = f(this.fd1, this.fd2);
89 *this.data = Some(data);
90 Poll::Ready(res)
91 }
92}
93
94unsafe impl<S: AsFd> OpCode for OpenFile<S> {
95 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
96 Ok(Decision::Blocking)
97 }
98
99 fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
100 let fd = self.as_mut().call()?;
101 *self.project().opened_fd = Some(unsafe { OwnedFd::from_raw_fd(fd as _) });
102 Poll::Ready(Ok(fd))
103 }
104}
105
106unsafe impl OpCode for CloseFile {
107 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
108 Ok(Decision::Blocking)
109 }
110
111 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
112 Poll::Ready(self.call())
113 }
114}
115
116unsafe impl<S: AsFd> OpCode for TruncateFile<S> {
117 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
118 Ok(Decision::Blocking)
119 }
120
121 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
122 Poll::Ready(self.call())
123 }
124}
125
126pin_project! {
127 pub struct FileStat<S> {
129 pub(crate) fd: S,
130 pub(crate) stat: Stat,
131 }
132}
133
134impl<S> FileStat<S> {
135 pub fn new(fd: S) -> Self {
137 Self {
138 fd,
139 stat: unsafe { std::mem::zeroed() },
140 }
141 }
142}
143
144unsafe impl<S: AsFd> OpCode for FileStat<S> {
145 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
146 Ok(Decision::Blocking)
147 }
148
149 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
150 let this = self.project();
151 #[cfg(gnulinux)]
152 {
153 let mut s: libc::statx = unsafe { std::mem::zeroed() };
154 static EMPTY_NAME: &[u8] = b"\0";
155 syscall!(libc::statx(
156 this.fd.as_fd().as_raw_fd(),
157 EMPTY_NAME.as_ptr().cast(),
158 libc::AT_EMPTY_PATH,
159 statx_mask(),
160 &mut s
161 ))?;
162 *this.stat = statx_to_stat(s);
163 Poll::Ready(Ok(0))
164 }
165 #[cfg(not(gnulinux))]
166 {
167 Poll::Ready(Ok(
168 syscall!(libc::fstat(this.fd.as_fd().as_raw_fd(), this.stat))? as _,
169 ))
170 }
171 }
172}
173
174impl<S> IntoInner for FileStat<S> {
175 type Inner = Stat;
176
177 fn into_inner(self) -> Self::Inner {
178 self.stat
179 }
180}
181
182pin_project! {
183 pub struct PathStat<S: AsFd> {
185 pub(crate) dirfd: S,
186 pub(crate) path: CString,
187 pub(crate) stat: Stat,
188 pub(crate) follow_symlink: bool,
189 }
190}
191
192impl<S: AsFd> PathStat<S> {
193 pub fn new(dirfd: S, path: CString, follow_symlink: bool) -> Self {
195 Self {
196 dirfd,
197 path,
198 stat: unsafe { std::mem::zeroed() },
199 follow_symlink,
200 }
201 }
202}
203
204unsafe impl<S: AsFd> OpCode for PathStat<S> {
205 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
206 Ok(Decision::Blocking)
207 }
208
209 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
210 let this = self.project();
211 #[cfg(gnulinux)]
212 let res = {
213 let mut flags = libc::AT_EMPTY_PATH;
214 if !*this.follow_symlink {
215 flags |= libc::AT_SYMLINK_NOFOLLOW;
216 }
217 let mut s: libc::statx = unsafe { std::mem::zeroed() };
218 let res = syscall!(libc::statx(
219 this.dirfd.as_fd().as_raw_fd(),
220 this.path.as_ptr(),
221 flags,
222 statx_mask(),
223 &mut s
224 ))?;
225 *this.stat = statx_to_stat(s);
226 res
227 };
228 #[cfg(not(gnulinux))]
231 let res = if this.path.is_empty() {
232 syscall!(libc::fstat(this.dirfd.as_fd().as_raw_fd(), this.stat))?
233 } else {
234 syscall!(libc::fstatat(
235 this.dirfd.as_fd().as_raw_fd(),
236 this.path.as_ptr(),
237 this.stat,
238 if !*this.follow_symlink {
239 libc::AT_SYMLINK_NOFOLLOW
240 } else {
241 0
242 }
243 ))?
244 };
245 Poll::Ready(Ok(res as _))
246 }
247}
248
249impl<S: AsFd> IntoInner for PathStat<S> {
250 type Inner = Stat;
251
252 fn into_inner(self) -> Self::Inner {
253 self.stat
254 }
255}
256
257unsafe impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
258 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
259 #[cfg(aio)]
260 {
261 let this = self.project();
262 let slice = this.buffer.sys_slice_mut();
263
264 this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
265 this.aiocb.aio_offset = *this.offset as _;
266 this.aiocb.aio_buf = slice.ptr().cast();
267 this.aiocb.aio_nbytes = slice.len();
268
269 Ok(Decision::aio(this.aiocb, libc::aio_read))
270 }
271 #[cfg(not(aio))]
272 {
273 Ok(Decision::Blocking)
274 }
275 }
276
277 #[cfg(aio)]
278 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
279 Some(OpType::Aio(NonNull::from(self.project().aiocb)))
280 }
281
282 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
283 let fd = self.fd.as_fd().as_raw_fd();
284 let offset = self.offset;
285 let slice = self.project().buffer.sys_slice_mut();
286 syscall!(break pread(fd, slice.ptr() as _, slice.len() as _, offset as _,))
287 }
288}
289
290unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
291 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
292 #[cfg(freebsd)]
293 {
294 let this = self.project();
295 *this.slices = this.buffer.sys_slices_mut();
296
297 this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
298 this.aiocb.aio_offset = *this.offset as _;
299 this.aiocb.aio_buf = this.slices.as_mut_ptr().cast();
300 this.aiocb.aio_nbytes = this.slices.len();
301
302 Ok(Decision::aio(this.aiocb, libc::aio_readv))
303 }
304 #[cfg(not(freebsd))]
305 {
306 Ok(Decision::Blocking)
307 }
308 }
309
310 #[cfg(freebsd)]
311 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
312 Some(OpType::Aio(NonNull::from(self.project().aiocb)))
313 }
314
315 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
316 let this = self.project();
317 *this.slices = this.buffer.sys_slices_mut();
318 syscall!(
319 break preadv(
320 this.fd.as_fd().as_raw_fd(),
321 this.slices.as_ptr() as _,
322 this.slices.len() as _,
323 *this.offset as _,
324 )
325 )
326 }
327}
328
329unsafe impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
330 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
331 #[cfg(aio)]
332 {
333 let this = self.project();
334 let slice = this.buffer.as_ref().sys_slice();
335
336 this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
337 this.aiocb.aio_offset = *this.offset as _;
338 this.aiocb.aio_buf = slice.ptr().cast();
339 this.aiocb.aio_nbytes = slice.len();
340
341 Ok(Decision::aio(this.aiocb, libc::aio_write))
342 }
343 #[cfg(not(aio))]
344 {
345 Ok(Decision::Blocking)
346 }
347 }
348
349 #[cfg(aio)]
350 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
351 Some(OpType::Aio(NonNull::from(self.project().aiocb)))
352 }
353
354 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
355 let slice = self.buffer.as_init();
356 syscall!(
357 break pwrite(
358 self.fd.as_fd().as_raw_fd(),
359 slice.as_ptr() as _,
360 slice.len() as _,
361 self.offset as _,
362 )
363 )
364 }
365}
366
367unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
368 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
369 #[cfg(freebsd)]
370 {
371 let this = self.project();
372 *this.slices = this.buffer.as_ref().sys_slices();
373
374 this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
375 this.aiocb.aio_offset = *this.offset as _;
376 this.aiocb.aio_buf = this.slices.as_ptr().cast_mut().cast();
377 this.aiocb.aio_nbytes = this.slices.len();
378
379 Ok(Decision::aio(this.aiocb, libc::aio_writev))
380 }
381 #[cfg(not(freebsd))]
382 {
383 Ok(Decision::Blocking)
384 }
385 }
386
387 #[cfg(freebsd)]
388 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
389 Some(OpType::Aio(NonNull::from(self.project().aiocb)))
390 }
391
392 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
393 let this = self.project();
394 *this.slices = this.buffer.as_ref().sys_slices();
395 syscall!(
396 break pwritev(
397 this.fd.as_fd().as_raw_fd(),
398 this.slices.as_ptr() as _,
399 this.slices.len() as _,
400 *this.offset as _,
401 )
402 )
403 }
404}
405
406unsafe impl<S: AsFd> OpCode for crate::op::managed::ReadManagedAt<S> {
407 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
408 self.project().op.pre_submit()
409 }
410
411 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
412 self.project().op.op_type()
413 }
414
415 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
416 self.project().op.operate()
417 }
418}
419
420unsafe impl<T: IoBufMut, S: AsFd> OpCode for Read<T, S> {
421 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
422 Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
423 }
424
425 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
426 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
427 }
428
429 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
430 let fd = self.fd.as_fd().as_raw_fd();
431 let slice = self.project().buffer.sys_slice_mut();
432 syscall!(break libc::read(fd, slice.ptr() as _, slice.len()))
433 }
434}
435
436unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectored<T, S> {
437 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
438 Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
439 }
440
441 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
442 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
443 }
444
445 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
446 let this = self.project();
447 *this.slices = this.buffer.sys_slices_mut();
448 syscall!(
449 break libc::readv(
450 this.fd.as_fd().as_raw_fd(),
451 this.slices.as_ptr() as _,
452 this.slices.len() as _
453 )
454 )
455 }
456}
457
458unsafe impl<T: IoBuf, S: AsFd> OpCode for Write<T, S> {
459 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
460 Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
461 }
462
463 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
464 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
465 }
466
467 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
468 let slice = self.buffer.as_init();
469 syscall!(
470 break libc::write(
471 self.fd.as_fd().as_raw_fd(),
472 slice.as_ptr() as _,
473 slice.len()
474 )
475 )
476 }
477}
478
479unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectored<T, S> {
480 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
481 Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
482 }
483
484 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
485 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
486 }
487
488 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
489 let this = self.project();
490 *this.slices = this.buffer.as_ref().sys_slices();
491 syscall!(
492 break libc::writev(
493 this.fd.as_fd().as_raw_fd(),
494 this.slices.as_ptr() as _,
495 this.slices.len() as _
496 )
497 )
498 }
499}
500
501unsafe impl<S: AsFd> OpCode for crate::op::managed::ReadManaged<S> {
502 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
503 self.project().op.pre_submit()
504 }
505
506 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
507 self.project().op.op_type()
508 }
509
510 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
511 self.project().op.operate()
512 }
513}
514
515unsafe impl<S: AsFd> OpCode for Sync<S> {
516 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
517 #[cfg(aio)]
518 {
519 unsafe extern "C" fn aio_fsync(aiocbp: *mut libc::aiocb) -> i32 {
520 unsafe { libc::aio_fsync(libc::O_SYNC, aiocbp) }
521 }
522 unsafe extern "C" fn aio_fdatasync(aiocbp: *mut libc::aiocb) -> i32 {
523 unsafe { libc::aio_fsync(libc::O_DSYNC, aiocbp) }
524 }
525
526 let this = self.project();
527 this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
528
529 let f = if *this.datasync {
530 aio_fdatasync
531 } else {
532 aio_fsync
533 };
534
535 Ok(Decision::aio(this.aiocb, f))
536 }
537 #[cfg(not(aio))]
538 {
539 Ok(Decision::Blocking)
540 }
541 }
542
543 #[cfg(aio)]
544 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
545 Some(OpType::Aio(NonNull::from(self.project().aiocb)))
546 }
547
548 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
549 #[cfg(datasync)]
550 {
551 Poll::Ready(Ok(syscall!(if self.datasync {
552 libc::fdatasync(self.fd.as_fd().as_raw_fd())
553 } else {
554 libc::fsync(self.fd.as_fd().as_raw_fd())
555 })? as _))
556 }
557 #[cfg(not(datasync))]
558 {
559 Poll::Ready(Ok(syscall!(libc::fsync(self.fd.as_fd().as_raw_fd()))? as _))
560 }
561 }
562}
563
564unsafe impl<S: AsFd> OpCode for Unlink<S> {
565 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
566 Ok(Decision::Blocking)
567 }
568
569 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
570 Poll::Ready(self.call())
571 }
572}
573
574unsafe impl<S: AsFd> OpCode for CreateDir<S> {
575 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
576 Ok(Decision::Blocking)
577 }
578
579 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
580 Poll::Ready(self.call())
581 }
582}
583
584unsafe impl<S1: AsFd, S2: AsFd> OpCode for Rename<S1, S2> {
585 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
586 Ok(Decision::Blocking)
587 }
588
589 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
590 Poll::Ready(self.call())
591 }
592}
593
594unsafe impl<S: AsFd> OpCode for Symlink<S> {
595 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
596 Ok(Decision::Blocking)
597 }
598
599 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
600 Poll::Ready(self.call())
601 }
602}
603
604unsafe impl<S1: AsFd, S2: AsFd> OpCode for HardLink<S1, S2> {
605 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
606 Ok(Decision::Blocking)
607 }
608
609 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
610 Poll::Ready(self.call())
611 }
612}
613
614impl CreateSocket {
615 unsafe fn call(self: Pin<&mut Self>) -> io::Result<libc::c_int> {
616 #[allow(unused_mut)]
617 let mut ty: i32 = self.socket_type;
618 #[cfg(any(
619 target_os = "android",
620 target_os = "dragonfly",
621 target_os = "freebsd",
622 target_os = "fuchsia",
623 target_os = "hurd",
624 target_os = "illumos",
625 target_os = "linux",
626 target_os = "netbsd",
627 target_os = "openbsd",
628 target_os = "cygwin",
629 ))]
630 {
631 ty |= libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
632 }
633 let fd = syscall!(libc::socket(self.domain, ty, self.protocol))?;
634 let socket = unsafe { Socket2::from_raw_fd(fd) };
635 #[cfg(not(any(
636 target_os = "android",
637 target_os = "dragonfly",
638 target_os = "freebsd",
639 target_os = "fuchsia",
640 target_os = "hurd",
641 target_os = "illumos",
642 target_os = "linux",
643 target_os = "netbsd",
644 target_os = "openbsd",
645 target_os = "espidf",
646 target_os = "vita",
647 target_os = "cygwin",
648 )))]
649 socket.set_cloexec(true)?;
650 #[cfg(any(
651 target_os = "ios",
652 target_os = "macos",
653 target_os = "tvos",
654 target_os = "watchos",
655 ))]
656 socket.set_nosigpipe(true)?;
657 #[cfg(not(any(
658 target_os = "android",
659 target_os = "dragonfly",
660 target_os = "freebsd",
661 target_os = "fuchsia",
662 target_os = "hurd",
663 target_os = "illumos",
664 target_os = "linux",
665 target_os = "netbsd",
666 target_os = "openbsd",
667 target_os = "cygwin",
668 )))]
669 socket.set_nonblocking(true)?;
670 *self.project().opened_fd = Some(socket);
671 Ok(fd)
672 }
673}
674
675unsafe impl OpCode for CreateSocket {
676 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
677 Ok(Decision::Blocking)
678 }
679
680 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
681 Poll::Ready(Ok(unsafe { self.call()? } as _))
682 }
683}
684
685unsafe impl<S: AsFd> OpCode for ShutdownSocket<S> {
686 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
687 Ok(Decision::Blocking)
688 }
689
690 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
691 Poll::Ready(self.call())
692 }
693}
694
695unsafe impl OpCode for CloseSocket {
696 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
697 Ok(Decision::Blocking)
698 }
699
700 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
701 Poll::Ready(self.call())
702 }
703}
704
705impl<S: AsFd> Accept<S> {
706 unsafe fn call(self: Pin<&mut Self>) -> libc::c_int {
708 let this = self.project();
709 || -> io::Result<libc::c_int> {
710 #[cfg(any(
711 target_os = "android",
712 target_os = "dragonfly",
713 target_os = "freebsd",
714 target_os = "fuchsia",
715 target_os = "illumos",
716 target_os = "linux",
717 target_os = "netbsd",
718 target_os = "openbsd",
719 target_os = "cygwin",
720 ))]
721 {
722 let fd = syscall!(libc::accept4(
723 this.fd.as_fd().as_raw_fd(),
724 this.buffer as *mut _ as *mut _,
725 this.addr_len,
726 libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC,
727 ))?;
728 let socket = unsafe { Socket2::from_raw_fd(fd) };
729 *this.accepted_fd = Some(socket);
730 Ok(fd)
731 }
732 #[cfg(not(any(
733 target_os = "android",
734 target_os = "dragonfly",
735 target_os = "freebsd",
736 target_os = "fuchsia",
737 target_os = "illumos",
738 target_os = "linux",
739 target_os = "netbsd",
740 target_os = "openbsd",
741 target_os = "cygwin",
742 )))]
743 {
744 let fd = syscall!(libc::accept(
745 this.fd.as_fd().as_raw_fd(),
746 this.buffer as *mut _ as *mut _,
747 this.addr_len,
748 ))?;
749 let socket = unsafe { Socket2::from_raw_fd(fd) };
750 socket.set_cloexec(true)?;
751 socket.set_nonblocking(true)?;
752 *this.accepted_fd = Some(socket);
753 Ok(fd)
754 }
755 }()
756 .unwrap_or(-1)
757 }
758}
759
760unsafe impl<S: AsFd> OpCode for Accept<S> {
761 fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
762 let fd = self.fd.as_fd().as_raw_fd();
763 syscall!(self.as_mut().call(), wait_readable(fd))
764 }
765
766 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
767 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
768 }
769
770 fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
771 syscall!(break self.as_mut().call())
772 }
773}
774
775pin_project! {
776 pub struct AcceptMulti<S> {
778 #[pin]
779 pub(crate) op: Accept<S>,
780 }
781}
782
783impl<S> AcceptMulti<S> {
784 pub fn new(fd: S) -> Self {
786 Self {
787 op: Accept::new(fd),
788 }
789 }
790}
791
792unsafe impl<S: AsFd> OpCode for AcceptMulti<S> {
793 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
794 self.project().op.pre_submit()
795 }
796
797 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
798 self.project().op.op_type()
799 }
800
801 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
802 self.project().op.operate()
803 }
804}
805
806impl<S> IntoInner for AcceptMulti<S> {
807 type Inner = Socket2;
808
809 fn into_inner(self) -> Self::Inner {
810 self.op.into_inner().0
811 }
812}
813unsafe impl<S: AsFd> OpCode for Connect<S> {
814 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
815 syscall!(
816 libc::connect(
817 self.fd.as_fd().as_raw_fd(),
818 self.addr.as_ptr().cast(),
819 self.addr.len()
820 ),
821 wait_writable(self.fd.as_fd().as_raw_fd())
822 )
823 }
824
825 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
826 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
827 }
828
829 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
830 let mut err: libc::c_int = 0;
831 let mut err_len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
832
833 syscall!(libc::getsockopt(
834 self.fd.as_fd().as_raw_fd(),
835 libc::SOL_SOCKET,
836 libc::SO_ERROR,
837 &mut err as *mut _ as *mut _,
838 &mut err_len
839 ))?;
840
841 let res = if err == 0 {
842 Ok(0)
843 } else {
844 Err(io::Error::from_raw_os_error(err))
845 };
846 Poll::Ready(res)
847 }
848}
849
850unsafe impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
851 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
852 Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
853 }
854
855 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
856 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
857 }
858
859 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
860 let fd = self.fd.as_fd().as_raw_fd();
861 let flags = self.flags;
862 let slice = self.project().buffer.sys_slice_mut();
863 syscall!(break libc::recv(fd, slice.ptr() as _, slice.len(), flags))
864 }
865}
866
867unsafe impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
868 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
869 Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
870 }
871
872 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
873 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
874 }
875
876 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
877 let slice = self.buffer.as_init();
878 syscall!(
879 break libc::send(
880 self.fd.as_fd().as_raw_fd(),
881 slice.as_ptr() as _,
882 slice.len(),
883 self.flags,
884 )
885 )
886 }
887}
888
889unsafe impl<S: AsFd> OpCode for crate::op::managed::RecvManaged<S> {
890 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
891 self.project().op.pre_submit()
892 }
893
894 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
895 self.project().op.op_type()
896 }
897
898 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
899 self.project().op.operate()
900 }
901}
902
903unsafe impl<S: AsFd> OpCode for crate::op::managed::RecvFromManaged<S> {
904 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
905 self.project().op.pre_submit()
906 }
907
908 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
909 self.project().op.op_type()
910 }
911
912 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
913 self.project().op.operate()
914 }
915}
916
917impl<T: IoVectoredBufMut, S: AsFd> RecvVectored<T, S> {
918 unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
919 let this = self.project();
920 unsafe { libc::recvmsg(this.fd.as_fd().as_raw_fd(), this.msg, *this.flags) }
921 }
922}
923
924unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
925 fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
926 self.as_mut().set_msg();
927 let fd = self.fd.as_fd().as_raw_fd();
928 syscall!(self.as_mut().call(), wait_readable(fd))
929 }
930
931 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
932 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
933 }
934
935 fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
936 syscall!(break self.as_mut().call())
937 }
938}
939
940impl<T: IoVectoredBuf, S: AsFd> SendVectored<T, S> {
941 unsafe fn call(&self) -> libc::ssize_t {
942 unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, self.flags) }
943 }
944}
945
946unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
947 fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
948 self.as_mut().set_msg();
949 let fd = self.as_mut().project().fd.as_fd().as_raw_fd();
950 syscall!(self.as_mut().call(), wait_writable(fd))
951 }
952
953 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
954 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
955 }
956
957 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
958 syscall!(break self.call())
959 }
960}
961
962pin_project! {
963 pub struct RecvFrom<T: IoBufMut, S> {
965 pub(crate) fd: S,
966 #[pin]
967 pub(crate) buffer: T,
968 pub(crate) addr: SockAddrStorage,
969 pub(crate) addr_len: socklen_t,
970 pub(crate) flags: i32,
971 _p: PhantomPinned,
972 }
973}
974
975impl<T: IoBufMut, S> RecvFrom<T, S> {
976 pub fn new(fd: S, buffer: T, flags: i32) -> Self {
978 let addr = SockAddrStorage::zeroed();
979 let addr_len = addr.size_of();
980 Self {
981 fd,
982 buffer,
983 addr,
984 addr_len,
985 flags,
986 _p: PhantomPinned,
987 }
988 }
989}
990
991impl<T: IoBufMut, S: AsFd> RecvFrom<T, S> {
992 unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
993 let fd = self.fd.as_fd().as_raw_fd();
994 let this = self.project();
995 let slice = this.buffer.sys_slice_mut();
996 unsafe {
997 libc::recvfrom(
998 fd,
999 slice.ptr() as _,
1000 slice.len(),
1001 *this.flags,
1002 this.addr as *mut _ as _,
1003 this.addr_len,
1004 )
1005 }
1006 }
1007}
1008
1009unsafe impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
1010 fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1011 let fd = self.fd.as_fd().as_raw_fd();
1012 syscall!(self.as_mut().call(), wait_readable(fd))
1013 }
1014
1015 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1016 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1017 }
1018
1019 fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1020 syscall!(break self.as_mut().call())
1021 }
1022}
1023
1024impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
1025 type Inner = (T, SockAddrStorage, socklen_t);
1026
1027 fn into_inner(self) -> Self::Inner {
1028 (self.buffer, self.addr, self.addr_len)
1029 }
1030}
1031
1032pin_project! {
1033 pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
1035 pub(crate) fd: S,
1036 #[pin]
1037 pub(crate) buffer: T,
1038 pub(crate) slices: Vec<SysSlice>,
1039 pub(crate) addr: SockAddrStorage,
1040 pub(crate) msg: libc::msghdr,
1041 pub(crate) flags: i32,
1042 _p: PhantomPinned,
1043 }
1044}
1045
1046impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
1047 pub fn new(fd: S, buffer: T, flags: i32) -> Self {
1049 Self {
1050 fd,
1051 buffer,
1052 slices: vec![],
1053 addr: SockAddrStorage::zeroed(),
1054 msg: unsafe { std::mem::zeroed() },
1055 flags,
1056 _p: PhantomPinned,
1057 }
1058 }
1059}
1060
1061impl<T: IoVectoredBufMut, S: AsFd> RecvFromVectored<T, S> {
1062 fn set_msg(self: Pin<&mut Self>) {
1063 let this = self.project();
1064 *this.slices = this.buffer.sys_slices_mut();
1065 this.msg.msg_name = this.addr as *mut _ as _;
1066 this.msg.msg_namelen = this.addr.size_of() as _;
1067 this.msg.msg_iov = this.slices.as_mut_ptr() as _;
1068 this.msg.msg_iovlen = this.slices.len() as _;
1069 }
1070
1071 unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1072 let this = self.project();
1073 unsafe { libc::recvmsg(this.fd.as_fd().as_raw_fd(), this.msg, *this.flags) }
1074 }
1075}
1076
1077unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
1078 fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1079 self.as_mut().set_msg();
1080 let fd = self.as_mut().project().fd.as_fd().as_raw_fd();
1081 syscall!(self.as_mut().call(), wait_readable(fd))
1082 }
1083
1084 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1085 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1086 }
1087
1088 fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1089 syscall!(break self.as_mut().call())
1090 }
1091}
1092
1093impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
1094 type Inner = (T, SockAddrStorage, socklen_t);
1095
1096 fn into_inner(self) -> Self::Inner {
1097 (self.buffer, self.addr, self.msg.msg_namelen)
1098 }
1099}
1100
1101pin_project! {
1102 pub struct SendTo<T: IoBuf, S> {
1104 pub(crate) fd: S,
1105 pub(crate) buffer: T,
1106 pub(crate) addr: SockAddr,
1107 flags: i32,
1108 _p: PhantomPinned,
1109 }
1110}
1111
1112impl<T: IoBuf, S> SendTo<T, S> {
1113 pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1115 Self {
1116 fd,
1117 buffer,
1118 addr,
1119 flags,
1120 _p: PhantomPinned,
1121 }
1122 }
1123}
1124
1125impl<T: IoBuf, S: AsFd> SendTo<T, S> {
1126 unsafe fn call(&self) -> libc::ssize_t {
1127 let slice = self.buffer.as_init();
1128 unsafe {
1129 libc::sendto(
1130 self.fd.as_fd().as_raw_fd(),
1131 slice.as_ptr() as _,
1132 slice.len(),
1133 self.flags,
1134 self.addr.as_ptr().cast(),
1135 self.addr.len(),
1136 )
1137 }
1138 }
1139}
1140
1141unsafe impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
1142 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1143 syscall!(self.call(), wait_writable(self.fd.as_fd().as_raw_fd()))
1144 }
1145
1146 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1147 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1148 }
1149
1150 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1151 syscall!(break self.call())
1152 }
1153}
1154
1155impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
1156 type Inner = T;
1157
1158 fn into_inner(self) -> Self::Inner {
1159 self.buffer
1160 }
1161}
1162
1163pin_project! {
1164 pub struct SendToVectored<T: IoVectoredBuf, S> {
1166 pub(crate) fd: S,
1167 #[pin]
1168 pub(crate) buffer: T,
1169 pub(crate) addr: SockAddr,
1170 pub(crate) slices: Vec<SysSlice>,
1171 pub(crate) msg: libc::msghdr,
1172 pub(crate) flags: i32,
1173 _p: PhantomPinned,
1174 }
1175}
1176
1177impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
1178 pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1180 Self {
1181 fd,
1182 buffer,
1183 addr,
1184 slices: vec![],
1185 msg: unsafe { std::mem::zeroed() },
1186 flags,
1187 _p: PhantomPinned,
1188 }
1189 }
1190}
1191
1192impl<T: IoVectoredBuf, S: AsFd> SendToVectored<T, S> {
1193 fn set_msg(self: Pin<&mut Self>) {
1194 let this = self.project();
1195 *this.slices = this.buffer.as_ref().sys_slices();
1196 this.msg.msg_name = this.addr as *mut _ as _;
1197 this.msg.msg_namelen = this.addr.len() as _;
1198 this.msg.msg_iov = this.slices.as_mut_ptr() as _;
1199 this.msg.msg_iovlen = this.slices.len() as _;
1200 }
1201
1202 unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1203 unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, self.flags) }
1204 }
1205}
1206
1207unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
1208 fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1209 self.as_mut().set_msg();
1210 let fd = self.fd.as_fd().as_raw_fd();
1211 syscall!(self.as_mut().call(), wait_writable(fd))
1212 }
1213
1214 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1215 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1216 }
1217
1218 fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1219 syscall!(break self.as_mut().call())
1220 }
1221}
1222
1223impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
1224 type Inner = T;
1225
1226 fn into_inner(self) -> Self::Inner {
1227 self.buffer
1228 }
1229}
1230
1231impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> RecvMsg<T, C, S> {
1232 unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1233 let this = self.project();
1234 unsafe { libc::recvmsg(this.fd.as_fd().as_raw_fd(), this.msg, *this.flags) }
1235 }
1236}
1237
1238unsafe impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
1239 fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1240 self.as_mut().set_msg();
1241 let fd = self.fd.as_fd().as_raw_fd();
1242 syscall!(self.as_mut().call(), wait_readable(fd))
1243 }
1244
1245 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1246 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1247 }
1248
1249 fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1250 syscall!(break self.as_mut().call())
1251 }
1252}
1253
1254impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> SendMsg<T, C, S> {
1255 unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1256 unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, self.flags) }
1257 }
1258}
1259
1260unsafe impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
1261 fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1262 self.as_mut().set_msg();
1263 let fd = self.fd.as_fd().as_raw_fd();
1264 syscall!(self.as_mut().call(), wait_writable(fd))
1265 }
1266
1267 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1268 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1269 }
1270
1271 fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1272 syscall!(break self.as_mut().call())
1273 }
1274}
1275
1276unsafe impl<S: AsFd> OpCode for PollOnce<S> {
1277 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1278 Ok(Decision::wait_for(
1279 self.fd.as_fd().as_raw_fd(),
1280 self.interest,
1281 ))
1282 }
1283
1284 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1285 Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1286 }
1287
1288 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1289 Poll::Ready(Ok(0))
1290 }
1291}
1292
1293#[cfg(linux_all)]
1294unsafe impl<S1: AsFd, S2: AsFd> OpCode for Splice<S1, S2> {
1295 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1296 use super::WaitArg;
1297
1298 Ok(Decision::wait_for_many([
1299 WaitArg::readable(self.fd_in.as_fd().as_raw_fd()),
1300 WaitArg::writable(self.fd_out.as_fd().as_raw_fd()),
1301 ]))
1302 }
1303
1304 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1305 Some(OpType::multi_fd([
1306 self.fd_in.as_fd().as_raw_fd(),
1307 self.fd_out.as_fd().as_raw_fd(),
1308 ]))
1309 }
1310
1311 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1312 let mut offset_in = self.offset_in;
1313 let mut offset_out = self.offset_out;
1314 let offset_in_ptr = if offset_in < 0 {
1315 std::ptr::null_mut()
1316 } else {
1317 &mut offset_in
1318 };
1319 let offset_out_ptr = if offset_out < 0 {
1320 std::ptr::null_mut()
1321 } else {
1322 &mut offset_out
1323 };
1324 syscall!(
1325 break libc::splice(
1326 self.fd_in.as_fd().as_raw_fd(),
1327 offset_in_ptr,
1328 self.fd_out.as_fd().as_raw_fd(),
1329 offset_out_ptr,
1330 self.len,
1331 self.flags | libc::SPLICE_F_NONBLOCK,
1332 )
1333 )
1334 }
1335}