1use std::{
2 collections::VecDeque,
3 ffi::CString,
4 io,
5 marker::PhantomPinned,
6 os::fd::{AsFd, AsRawFd, FromRawFd, IntoRawFd, OwnedFd},
7 pin::Pin,
8};
9
10use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
11use io_uring::{
12 opcode,
13 types::{Fd, FsyncFlags},
14};
15use pin_project_lite::pin_project;
16use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t};
17
18use super::OpCode;
19pub use crate::sys::unix_op::*;
20use crate::{Extra, OpEntry, op::*, sys_slice::*, syscall};
21
22unsafe impl<
23 D: std::marker::Send + 'static,
24 F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
25> OpCode for Asyncify<F, D>
26{
27 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
28 OpEntry::Blocking
29 }
30
31 fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
32 let this = self.project();
33 let f = this
34 .f
35 .take()
36 .expect("the operate method could only be called once");
37 let BufResult(res, data) = f();
38 *this.data = Some(data);
39 res
40 }
41}
42
43unsafe impl<
44 S,
45 D: std::marker::Send + 'static,
46 F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
47> OpCode for AsyncifyFd<S, F, D>
48{
49 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
50 OpEntry::Blocking
51 }
52
53 fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
54 let this = self.project();
55 let f = this
56 .f
57 .take()
58 .expect("the operate method could only be called once");
59 let BufResult(res, data) = f(this.fd);
60 *this.data = Some(data);
61 res
62 }
63}
64
65unsafe impl<
66 S1,
67 S2,
68 D: std::marker::Send + 'static,
69 F: (FnOnce(&S1, &S2) -> BufResult<usize, D>) + std::marker::Send + 'static,
70> OpCode for AsyncifyFd2<S1, S2, F, D>
71{
72 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
73 OpEntry::Blocking
74 }
75
76 fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
77 let this = self.project();
78 let f = this
79 .f
80 .take()
81 .expect("the operate method could only be called once");
82 let BufResult(res, data) = f(this.fd1, this.fd2);
83 *this.data = Some(data);
84 res
85 }
86}
87
88unsafe impl<S: AsFd> OpCode for OpenFile<S> {
89 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
90 opcode::OpenAt::new(Fd(self.dirfd.as_fd().as_raw_fd()), self.path.as_ptr())
91 .flags(self.flags | libc::O_CLOEXEC)
92 .mode(self.mode)
93 .build()
94 .into()
95 }
96
97 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
98 self.call()
99 }
100
101 unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, _: &Extra) {
102 if let Ok(fd) = res {
103 let fd = unsafe { OwnedFd::from_raw_fd(*fd as _) };
105 *self.project().opened_fd = Some(fd);
106 }
107 }
108}
109
110unsafe impl OpCode for CloseFile {
111 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
112 opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
113 .build()
114 .into()
115 }
116
117 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
118 self.call()
119 }
120}
121
122unsafe impl<S: AsFd> OpCode for TruncateFile<S> {
123 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
124 opcode::Ftruncate::new(Fd(self.fd.as_fd().as_raw_fd()), self.size)
125 .build()
126 .into()
127 }
128
129 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
130 self.call()
131 }
132}
133
134pin_project! {
135 pub struct FileStat<S> {
137 pub(crate) fd: S,
138 pub(crate) stat: Statx,
139 }
140}
141
142impl<S> FileStat<S> {
143 pub fn new(fd: S) -> Self {
145 Self {
146 fd,
147 stat: unsafe { std::mem::zeroed() },
148 }
149 }
150}
151
152unsafe impl<S: AsFd> OpCode for FileStat<S> {
153 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
154 let this = self.project();
155 static EMPTY_NAME: &[u8] = b"\0";
156 opcode::Statx::new(
157 Fd(this.fd.as_fd().as_fd().as_raw_fd()),
158 EMPTY_NAME.as_ptr().cast(),
159 this.stat as *mut _ as _,
160 )
161 .flags(libc::AT_EMPTY_PATH)
162 .mask(statx_mask())
163 .build()
164 .into()
165 }
166
167 #[cfg(gnulinux)]
168 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
169 let this = self.project();
170 static EMPTY_NAME: &[u8] = b"\0";
171 let res = syscall!(libc::statx(
172 this.fd.as_fd().as_raw_fd(),
173 EMPTY_NAME.as_ptr().cast(),
174 libc::AT_EMPTY_PATH,
175 statx_mask(),
176 this.stat as *mut _ as _
177 ))?;
178 Ok(res as _)
179 }
180
181 #[cfg(not(gnulinux))]
182 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
183 let this = self.project();
184 let mut stat = unsafe { std::mem::zeroed() };
185 let res = syscall!(libc::fstat(this.fd.as_fd().as_raw_fd(), &mut stat))?;
186 *this.stat = stat_to_statx(stat);
187 Ok(res as _)
188 }
189}
190
191impl<S> IntoInner for FileStat<S> {
192 type Inner = Stat;
193
194 fn into_inner(self) -> Self::Inner {
195 statx_to_stat(self.stat)
196 }
197}
198
199pin_project! {
200 pub struct PathStat<S: AsFd> {
202 pub(crate) dirfd: S,
203 pub(crate) path: CString,
204 pub(crate) stat: Statx,
205 pub(crate) follow_symlink: bool,
206 }
207}
208
209impl<S: AsFd> PathStat<S> {
210 pub fn new(dirfd: S, path: CString, follow_symlink: bool) -> Self {
212 Self {
213 dirfd,
214 path,
215 stat: unsafe { std::mem::zeroed() },
216 follow_symlink,
217 }
218 }
219}
220
221unsafe impl<S: AsFd> OpCode for PathStat<S> {
222 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
223 let this = self.project();
224 let mut flags = libc::AT_EMPTY_PATH;
225 if !*this.follow_symlink {
226 flags |= libc::AT_SYMLINK_NOFOLLOW;
227 }
228 opcode::Statx::new(
229 Fd(this.dirfd.as_fd().as_raw_fd()),
230 this.path.as_ptr(),
231 this.stat as *mut _ as _,
232 )
233 .flags(flags)
234 .mask(statx_mask())
235 .build()
236 .into()
237 }
238
239 #[cfg(gnulinux)]
240 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
241 let this = self.project();
242 let mut flags = libc::AT_EMPTY_PATH;
243 if !*this.follow_symlink {
244 flags |= libc::AT_SYMLINK_NOFOLLOW;
245 }
246 let res = syscall!(libc::statx(
247 this.dirfd.as_fd().as_raw_fd(),
248 this.path.as_ptr(),
249 flags,
250 statx_mask(),
251 this.stat
252 ))?;
253 Ok(res as _)
254 }
255
256 #[cfg(not(gnulinux))]
257 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
258 let this = self.project();
259 let mut flags = libc::AT_EMPTY_PATH;
260 if !*this.follow_symlink {
261 flags |= libc::AT_SYMLINK_NOFOLLOW;
262 }
263 let mut stat = unsafe { std::mem::zeroed() };
264 let res = syscall!(libc::fstatat(
265 this.dirfd.as_fd().as_raw_fd(),
266 this.path.as_ptr(),
267 &mut stat,
268 flags
269 ))?;
270 *this.stat = stat_to_statx(stat);
271 Ok(res as _)
272 }
273}
274
275impl<S: AsFd> IntoInner for PathStat<S> {
276 type Inner = Stat;
277
278 fn into_inner(self) -> Self::Inner {
279 statx_to_stat(self.stat)
280 }
281}
282
283unsafe impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
284 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
285 let this = self.project();
286 let fd = Fd(this.fd.as_fd().as_raw_fd());
287 let slice = this.buffer.sys_slice_mut();
288 opcode::Read::new(
289 fd,
290 slice.ptr() as _,
291 slice.len().try_into().unwrap_or(u32::MAX),
292 )
293 .offset(*this.offset)
294 .build()
295 .into()
296 }
297}
298
299unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
300 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
301 let this = self.project();
302 *this.slices = this.buffer.sys_slices_mut();
303 opcode::Readv::new(
304 Fd(this.fd.as_fd().as_raw_fd()),
305 this.slices.as_ptr() as _,
306 this.slices.len().try_into().unwrap_or(u32::MAX),
307 )
308 .offset(*this.offset)
309 .build()
310 .into()
311 }
312}
313
314unsafe impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
315 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
316 let slice = self.buffer.as_init();
317 opcode::Write::new(
318 Fd(self.fd.as_fd().as_raw_fd()),
319 slice.as_ptr(),
320 slice.len().try_into().unwrap_or(u32::MAX),
321 )
322 .offset(self.offset)
323 .build()
324 .into()
325 }
326}
327
328unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
329 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
330 let this = self.project();
331 *this.slices = this.buffer.as_ref().sys_slices();
332 opcode::Writev::new(
333 Fd(this.fd.as_fd().as_raw_fd()),
334 this.slices.as_ptr() as _,
335 this.slices.len().try_into().unwrap_or(u32::MAX),
336 )
337 .offset(*this.offset)
338 .build()
339 .into()
340 }
341}
342
343unsafe impl<T: IoBufMut, S: AsFd> OpCode for Read<T, S> {
344 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
345 let fd = self.fd.as_fd().as_raw_fd();
346 let slice = self.project().buffer.sys_slice_mut();
347 opcode::Read::new(
348 Fd(fd),
349 slice.ptr() as _,
350 slice.len().try_into().unwrap_or(u32::MAX),
351 )
352 .build()
353 .into()
354 }
355}
356
357unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectored<T, S> {
358 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
359 let this = self.project();
360 *this.slices = this.buffer.sys_slices_mut();
361 opcode::Readv::new(
362 Fd(this.fd.as_fd().as_raw_fd()),
363 this.slices.as_ptr() as _,
364 this.slices.len().try_into().unwrap_or(u32::MAX),
365 )
366 .build()
367 .into()
368 }
369}
370
371unsafe impl<T: IoBuf, S: AsFd> OpCode for Write<T, S> {
372 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
373 let slice = self.buffer.as_init();
374 opcode::Write::new(
375 Fd(self.fd.as_fd().as_raw_fd()),
376 slice.as_ptr(),
377 slice.len().try_into().unwrap_or(u32::MAX),
378 )
379 .build()
380 .into()
381 }
382}
383
384unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectored<T, S> {
385 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
386 let this = self.project();
387 *this.slices = this.buffer.as_ref().sys_slices();
388 opcode::Writev::new(
389 Fd(this.fd.as_fd().as_raw_fd()),
390 this.slices.as_ptr() as _,
391 this.slices.len().try_into().unwrap_or(u32::MAX),
392 )
393 .build()
394 .into()
395 }
396}
397
398unsafe impl<S: AsFd> OpCode for Sync<S> {
399 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
400 opcode::Fsync::new(Fd(self.fd.as_fd().as_raw_fd()))
401 .flags(if self.datasync {
402 FsyncFlags::DATASYNC
403 } else {
404 FsyncFlags::empty()
405 })
406 .build()
407 .into()
408 }
409}
410
411unsafe impl<S: AsFd> OpCode for Unlink<S> {
412 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
413 opcode::UnlinkAt::new(Fd(self.dirfd.as_fd().as_raw_fd()), self.path.as_ptr())
414 .flags(if self.dir { libc::AT_REMOVEDIR } else { 0 })
415 .build()
416 .into()
417 }
418
419 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
420 self.call()
421 }
422}
423
424unsafe impl<S: AsFd> OpCode for CreateDir<S> {
425 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
426 opcode::MkDirAt::new(Fd(self.dirfd.as_fd().as_raw_fd()), self.path.as_ptr())
427 .mode(self.mode)
428 .build()
429 .into()
430 }
431
432 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
433 self.call()
434 }
435}
436
437unsafe impl<S1: AsFd, S2: AsFd> OpCode for Rename<S1, S2> {
438 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
439 opcode::RenameAt::new(
440 Fd(self.old_dirfd.as_fd().as_raw_fd()),
441 self.old_path.as_ptr(),
442 Fd(self.new_dirfd.as_fd().as_raw_fd()),
443 self.new_path.as_ptr(),
444 )
445 .build()
446 .into()
447 }
448
449 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
450 self.call()
451 }
452}
453
454unsafe impl<S: AsFd> OpCode for Symlink<S> {
455 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
456 opcode::SymlinkAt::new(
457 Fd(self.dirfd.as_fd().as_raw_fd()),
458 self.source.as_ptr(),
459 self.target.as_ptr(),
460 )
461 .build()
462 .into()
463 }
464
465 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
466 self.call()
467 }
468}
469
470unsafe impl<S1: AsFd, S2: AsFd> OpCode for HardLink<S1, S2> {
471 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
472 opcode::LinkAt::new(
473 Fd(self.source_dirfd.as_fd().as_raw_fd()),
474 self.source.as_ptr(),
475 Fd(self.target_dirfd.as_fd().as_raw_fd()),
476 self.target.as_ptr(),
477 )
478 .build()
479 .into()
480 }
481
482 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
483 self.call()
484 }
485}
486
487unsafe impl OpCode for CreateSocket {
488 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
489 opcode::Socket::new(
490 self.domain,
491 self.socket_type | libc::SOCK_CLOEXEC,
492 self.protocol,
493 )
494 .build()
495 .into()
496 }
497
498 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
499 Ok(syscall!(libc::socket(
500 self.domain,
501 self.socket_type | libc::SOCK_CLOEXEC,
502 self.protocol
503 ))? as _)
504 }
505
506 unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, _: &Extra) {
507 if let Ok(fd) = res {
508 let fd = unsafe { Socket2::from_raw_fd(*fd as _) };
510 *self.project().opened_fd = Some(fd);
511 }
512 }
513}
514
515unsafe impl<S: AsFd> OpCode for ShutdownSocket<S> {
516 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
517 opcode::Shutdown::new(Fd(self.fd.as_fd().as_raw_fd()), self.how())
518 .build()
519 .into()
520 }
521
522 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
523 self.call()
524 }
525}
526
527unsafe impl OpCode for CloseSocket {
528 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
529 opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
530 .build()
531 .into()
532 }
533
534 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
535 self.call()
536 }
537}
538
539unsafe impl<S: AsFd> OpCode for Accept<S> {
540 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
541 let this = self.project();
542 opcode::Accept::new(
543 Fd(this.fd.as_fd().as_raw_fd()),
544 unsafe { this.buffer.view_as::<libc::sockaddr>() },
545 this.addr_len,
546 )
547 .flags(libc::SOCK_CLOEXEC)
548 .build()
549 .into()
550 }
551
552 unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, _: &Extra) {
553 if let Ok(fd) = res {
554 let fd = unsafe { Socket2::from_raw_fd(*fd as _) };
556 *self.project().accepted_fd = Some(fd);
557 }
558 }
559}
560
561struct AcceptMultishotResult {
562 res: io::Result<Socket2>,
563 extra: crate::Extra,
564}
565
566impl AcceptMultishotResult {
567 pub unsafe fn new(res: io::Result<usize>, extra: crate::Extra) -> Self {
568 Self {
569 res: res.map(|fd| unsafe { Socket2::from_raw_fd(fd as _) }),
570 extra,
571 }
572 }
573
574 pub fn into_result(self) -> BufResult<usize, crate::Extra> {
575 BufResult(self.res.map(|fd| fd.into_raw_fd() as _), self.extra)
576 }
577}
578
579pin_project! {
580 pub struct AcceptMulti<S> {
582 #[pin]
583 pub(crate) op: Accept<S>,
584 multishots: VecDeque<AcceptMultishotResult>
585 }
586}
587
588impl<S> AcceptMulti<S> {
589 pub fn new(fd: S) -> Self {
591 Self {
592 op: Accept::new(fd),
593 multishots: VecDeque::new(),
594 }
595 }
596}
597
598unsafe impl<S: AsFd> OpCode for AcceptMulti<S> {
599 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
600 let this = self.project();
601 opcode::AcceptMulti::new(Fd(this.op.fd.as_fd().as_raw_fd()))
602 .flags(libc::SOCK_CLOEXEC)
603 .build()
604 .into()
605 }
606
607 fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
608 self.project().op.create_entry()
609 }
610
611 unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &crate::Extra) {
612 unsafe { self.project().op.set_result(res, extra) }
613 }
614
615 unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result<usize>, extra: crate::Extra) {
616 self.project()
617 .multishots
618 .push_back(unsafe { AcceptMultishotResult::new(res, extra) });
619 }
620
621 fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::sys::Extra>> {
622 self.project()
623 .multishots
624 .pop_front()
625 .map(|res| res.into_result())
626 }
627}
628
629impl<S> IntoInner for AcceptMulti<S> {
630 type Inner = Socket2;
631
632 fn into_inner(self) -> Self::Inner {
633 self.op.into_inner().0
634 }
635}
636
637unsafe impl<S: AsFd> OpCode for Connect<S> {
638 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
639 opcode::Connect::new(
640 Fd(self.fd.as_fd().as_raw_fd()),
641 self.addr.as_ptr().cast(),
642 self.addr.len(),
643 )
644 .build()
645 .into()
646 }
647}
648
649unsafe impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
650 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
651 let fd = self.fd.as_fd().as_raw_fd();
652 let flags = self.flags;
653 let slice = self.project().buffer.sys_slice_mut();
654 opcode::Recv::new(
655 Fd(fd),
656 slice.ptr() as _,
657 slice.len().try_into().unwrap_or(u32::MAX),
658 )
659 .flags(flags)
660 .build()
661 .into()
662 }
663}
664
665unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
666 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
667 self.as_mut().set_msg();
668 let this = self.project();
669 opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
670 .flags(*this.flags as _)
671 .build()
672 .into()
673 }
674}
675
676unsafe impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
677 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
678 let slice = self.buffer.as_init();
679 opcode::Send::new(
680 Fd(self.fd.as_fd().as_raw_fd()),
681 slice.as_ptr(),
682 slice.len().try_into().unwrap_or(u32::MAX),
683 )
684 .flags(self.flags)
685 .build()
686 .into()
687 }
688}
689
690pin_project! {
691 pub struct SendZc<T: IoBuf, S> {
693 #[pin]
694 pub(crate) op: Send<T, S>,
695 pub(crate) res: Option<BufResult<usize, crate::Extra>>,
696 _p: PhantomPinned,
697 }
698}
699
700impl<T: IoBuf, S> SendZc<T, S> {
701 pub fn new(fd: S, buffer: T, flags: i32) -> Self {
703 Self {
704 op: Send::new(fd, buffer, flags),
705 res: None,
706 _p: PhantomPinned,
707 }
708 }
709}
710
711unsafe impl<T: IoBuf, S: AsFd> OpCode for SendZc<T, S> {
712 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
713 let this = self.project();
714 let slice = this.op.buffer.as_init();
715 opcode::SendZc::new(
716 Fd(this.op.fd.as_fd().as_raw_fd()),
717 slice.as_ptr(),
718 slice.len().try_into().unwrap_or(u32::MAX),
719 )
720 .flags(this.op.flags)
721 .build()
722 .into()
723 }
724
725 fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
726 self.project().op.create_entry()
727 }
728
729 unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result<usize>, extra: crate::Extra) {
730 self.project().res.replace(BufResult(res, extra));
731 }
732
733 fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::Extra>> {
734 self.project().res.take()
735 }
736}
737
738impl<T: IoBuf, S> IntoInner for SendZc<T, S> {
739 type Inner = T;
740
741 fn into_inner(self) -> Self::Inner {
742 self.op.into_inner()
743 }
744}
745
746unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
747 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
748 self.as_mut().set_msg();
749 let this = self.project();
750 opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
751 .flags(*this.flags as _)
752 .build()
753 .into()
754 }
755}
756
757pin_project! {
758 pub struct SendVectoredZc<T: IoVectoredBuf, S> {
760 #[pin]
761 pub(crate) op: SendVectored<T, S>,
762 pub(crate) res: Option<BufResult<usize, crate::Extra>>,
763 _p: PhantomPinned,
764 }
765}
766
767impl<T: IoVectoredBuf, S> SendVectoredZc<T, S> {
768 pub fn new(fd: S, buffer: T, flags: i32) -> Self {
770 Self {
771 op: SendVectored::new(fd, buffer, flags),
772 res: None,
773 _p: PhantomPinned,
774 }
775 }
776}
777
778unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectoredZc<T, S> {
779 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
780 let mut this = self.project();
781 this.op.as_mut().set_msg();
782 let op = this.op.project();
783 opcode::SendMsgZc::new(Fd(op.fd.as_fd().as_raw_fd()), op.msg)
784 .flags(*op.flags as _)
785 .build()
786 .into()
787 }
788
789 fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
790 self.project().op.create_entry()
791 }
792
793 unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result<usize>, extra: crate::Extra) {
794 self.project().res.replace(BufResult(res, extra));
795 }
796
797 fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::Extra>> {
798 self.project().res.take()
799 }
800}
801
802impl<T: IoVectoredBuf, S> IntoInner for SendVectoredZc<T, S> {
803 type Inner = T;
804
805 fn into_inner(self) -> Self::Inner {
806 self.op.into_inner()
807 }
808}
809
810struct RecvFromHeader<S> {
811 pub(crate) fd: S,
812 pub(crate) addr: SockAddrStorage,
813 pub(crate) msg: libc::msghdr,
814 pub(crate) flags: i32,
815 _p: PhantomPinned,
816}
817
818impl<S> RecvFromHeader<S> {
819 pub fn new(fd: S, flags: i32) -> Self {
820 Self {
821 fd,
822 addr: SockAddrStorage::zeroed(),
823 msg: unsafe { std::mem::zeroed() },
824 flags,
825 _p: PhantomPinned,
826 }
827 }
828}
829
830impl<S: AsFd> RecvFromHeader<S> {
831 pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
832 self.msg.msg_name = &mut self.addr as *mut _ as _;
833 self.msg.msg_namelen = self.addr.size_of() as _;
834 self.msg.msg_iov = slices.as_mut_ptr() as _;
835 self.msg.msg_iovlen = slices.len() as _;
836 opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut self.msg)
837 .flags(self.flags as _)
838 .build()
839 .into()
840 }
841
842 pub fn into_addr(self) -> (SockAddrStorage, socklen_t) {
843 (self.addr, self.msg.msg_namelen)
844 }
845}
846
847pin_project! {
848 pub struct RecvFrom<T: IoBufMut, S> {
850 header: RecvFromHeader<S>,
851 #[pin]
852 buffer: T,
853 slice: Option<SysSlice>,
854 }
855}
856
857impl<T: IoBufMut, S> RecvFrom<T, S> {
858 pub fn new(fd: S, buffer: T, flags: i32) -> Self {
860 Self {
861 header: RecvFromHeader::new(fd, flags),
862 buffer,
863 slice: None,
864 }
865 }
866}
867
868unsafe impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
869 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
870 let this = self.project();
871 let slice = this.slice.insert(this.buffer.sys_slice_mut());
872 this.header.create_entry(std::slice::from_mut(slice))
873 }
874}
875
876impl<T: IoBufMut, S: AsFd> IntoInner for RecvFrom<T, S> {
877 type Inner = (T, SockAddrStorage, socklen_t);
878
879 fn into_inner(self) -> Self::Inner {
880 let (addr, addr_len) = self.header.into_addr();
881 (self.buffer, addr, addr_len)
882 }
883}
884
885pin_project! {
886 pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
888 header: RecvFromHeader<S>,
889 #[pin]
890 buffer: T,
891 slice: Vec<SysSlice>,
892 }
893}
894
895impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
896 pub fn new(fd: S, buffer: T, flags: i32) -> Self {
898 Self {
899 header: RecvFromHeader::new(fd, flags),
900 buffer,
901 slice: vec![],
902 }
903 }
904}
905
906unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
907 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
908 let this = self.project();
909 *this.slice = this.buffer.sys_slices_mut();
910 this.header.create_entry(this.slice)
911 }
912}
913
914impl<T: IoVectoredBufMut, S: AsFd> IntoInner for RecvFromVectored<T, S> {
915 type Inner = (T, SockAddrStorage, socklen_t);
916
917 fn into_inner(self) -> Self::Inner {
918 let (addr, addr_len) = self.header.into_addr();
919 (self.buffer, addr, addr_len)
920 }
921}
922
923struct SendToHeader<S> {
924 pub(crate) fd: S,
925 pub(crate) addr: SockAddr,
926 pub(crate) msg: libc::msghdr,
927 pub(crate) flags: i32,
928 _p: PhantomPinned,
929}
930
931impl<S> SendToHeader<S> {
932 pub fn new(fd: S, addr: SockAddr, flags: i32) -> Self {
933 Self {
934 fd,
935 addr,
936 msg: unsafe { std::mem::zeroed() },
937 flags,
938 _p: PhantomPinned,
939 }
940 }
941}
942
943impl<S: AsFd> SendToHeader<S> {
944 pub fn set_msg(&mut self, slices: &mut [SysSlice]) {
945 self.msg.msg_name = self.addr.as_ptr() as _;
946 self.msg.msg_namelen = self.addr.len();
947 self.msg.msg_iov = slices.as_mut_ptr() as _;
948 self.msg.msg_iovlen = slices.len() as _;
949 }
950}
951
952pin_project! {
953 pub struct SendTo<T: IoBuf, S> {
955 header: SendToHeader<S>,
956 #[pin]
957 buffer: T,
958 slice: Option<SysSlice>,
959 }
960}
961
962impl<T: IoBuf, S> SendTo<T, S> {
963 pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
965 Self {
966 header: SendToHeader::new(fd, addr, flags),
967 buffer,
968 slice: None,
969 }
970 }
971}
972
973unsafe impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
974 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
975 let this = self.project();
976 let slice = this.slice.insert(this.buffer.as_ref().sys_slice());
977 this.header.set_msg(std::slice::from_mut(slice));
978 opcode::SendMsg::new(Fd(this.header.fd.as_fd().as_raw_fd()), &this.header.msg)
979 .flags(this.header.flags as _)
980 .build()
981 .into()
982 }
983}
984
985impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
986 type Inner = T;
987
988 fn into_inner(self) -> Self::Inner {
989 self.buffer
990 }
991}
992
993pin_project! {
994 pub struct SendToZc<T: IoBuf, S: AsFd> {
996 #[pin]
997 pub(crate) op: SendTo<T, S>,
998 pub(crate) res: Option<BufResult<usize, crate::Extra>>,
999 _p: PhantomPinned,
1000 }
1001}
1002
1003impl<T: IoBuf, S: AsFd> SendToZc<T, S> {
1004 pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1006 Self {
1007 op: SendTo::new(fd, buffer, addr, flags),
1008 res: None,
1009 _p: PhantomPinned,
1010 }
1011 }
1012}
1013
1014unsafe impl<T: IoBuf, S: AsFd> OpCode for SendToZc<T, S> {
1015 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1016 let this = self.project().op.project();
1017 let slice = this.slice.insert(this.buffer.as_ref().sys_slice());
1018 this.header.set_msg(std::slice::from_mut(slice));
1019 opcode::SendMsgZc::new(Fd(this.header.fd.as_fd().as_raw_fd()), &this.header.msg)
1020 .flags(this.header.flags as _)
1021 .build()
1022 .into()
1023 }
1024
1025 fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
1026 self.project().op.create_entry()
1027 }
1028
1029 unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result<usize>, extra: crate::Extra) {
1030 self.project().res.replace(BufResult(res, extra));
1031 }
1032
1033 fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::Extra>> {
1034 self.project().res.take()
1035 }
1036}
1037
1038impl<T: IoBuf, S: AsFd> IntoInner for SendToZc<T, S> {
1039 type Inner = T;
1040
1041 fn into_inner(self) -> Self::Inner {
1042 self.op.into_inner()
1043 }
1044}
1045
1046pin_project! {
1047 pub struct SendToVectored<T: IoVectoredBuf, S> {
1049 header: SendToHeader<S>,
1050 #[pin]
1051 buffer: T,
1052 slice: Vec<SysSlice>,
1053 }
1054}
1055
1056impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
1057 pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1059 Self {
1060 header: SendToHeader::new(fd, addr, flags),
1061 buffer,
1062 slice: vec![],
1063 }
1064 }
1065}
1066
1067unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
1068 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1069 let this = self.project();
1070 *this.slice = this.buffer.as_ref().sys_slices();
1071 this.header.set_msg(this.slice);
1072 opcode::SendMsg::new(Fd(this.header.fd.as_fd().as_raw_fd()), &this.header.msg)
1073 .flags(this.header.flags as _)
1074 .build()
1075 .into()
1076 }
1077}
1078
1079impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
1080 type Inner = T;
1081
1082 fn into_inner(self) -> Self::Inner {
1083 self.buffer
1084 }
1085}
1086
1087pin_project! {
1088 pub struct SendToVectoredZc<T: IoVectoredBuf, S: AsFd> {
1090 #[pin]
1091 pub(crate) op: SendToVectored<T, S>,
1092 pub(crate) res: Option<BufResult<usize, crate::Extra>>,
1093 _p: PhantomPinned,
1094 }
1095}
1096
1097impl<T: IoVectoredBuf, S: AsFd> SendToVectoredZc<T, S> {
1098 pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1100 Self {
1101 op: SendToVectored::new(fd, buffer, addr, flags),
1102 res: None,
1103 _p: PhantomPinned,
1104 }
1105 }
1106}
1107
1108unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectoredZc<T, S> {
1109 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1110 let this = self.project().op.project();
1111 *this.slice = this.buffer.as_ref().sys_slices();
1112 this.header.set_msg(this.slice);
1113 opcode::SendMsgZc::new(Fd(this.header.fd.as_fd().as_raw_fd()), &this.header.msg)
1114 .flags(this.header.flags as _)
1115 .build()
1116 .into()
1117 }
1118
1119 fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
1120 self.project().op.create_entry()
1121 }
1122
1123 unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result<usize>, extra: crate::Extra) {
1124 self.project().res.replace(BufResult(res, extra));
1125 }
1126
1127 fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::Extra>> {
1128 self.project().res.take()
1129 }
1130}
1131
1132impl<T: IoVectoredBuf, S: AsFd> IntoInner for SendToVectoredZc<T, S> {
1133 type Inner = T;
1134
1135 fn into_inner(self) -> Self::Inner {
1136 self.op.into_inner()
1137 }
1138}
1139
1140unsafe impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
1141 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
1142 self.as_mut().set_msg();
1143 let this = self.project();
1144 opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
1145 .flags(*this.flags as _)
1146 .build()
1147 .into()
1148 }
1149}
1150
1151unsafe impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
1152 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
1153 self.as_mut().set_msg();
1154 let this = self.project();
1155 opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
1156 .flags(*this.flags as _)
1157 .build()
1158 .into()
1159 }
1160}
1161
1162pin_project! {
1163 pub struct SendMsgZc<T: IoVectoredBuf, C: IoBuf, S> {
1165 #[pin]
1166 pub(crate) op: SendMsg<T, C, S>,
1167 pub(crate) res: Option<BufResult<usize, crate::Extra>>,
1168 _p: PhantomPinned,
1169 }
1170}
1171
1172impl<T: IoVectoredBuf, C: IoBuf, S> SendMsgZc<T, C, S> {
1173 pub fn new(fd: S, buffer: T, control: C, addr: Option<SockAddr>, flags: i32) -> Self {
1175 Self {
1176 op: SendMsg::new(fd, buffer, control, addr, flags),
1177 res: None,
1178 _p: PhantomPinned,
1179 }
1180 }
1181}
1182
1183unsafe impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsgZc<T, C, S> {
1184 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1185 let mut this = self.project();
1186 this.op.as_mut().set_msg();
1187 let op = this.op.project();
1188 opcode::SendMsgZc::new(Fd(op.fd.as_fd().as_raw_fd()), op.msg)
1189 .flags(*op.flags as _)
1190 .build()
1191 .into()
1192 }
1193
1194 fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
1195 self.project().op.create_entry()
1196 }
1197
1198 unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result<usize>, extra: crate::Extra) {
1199 self.project().res.replace(BufResult(res, extra));
1200 }
1201
1202 fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::Extra>> {
1203 self.project().res.take()
1204 }
1205}
1206impl<T: IoVectoredBuf, C: IoBuf, S> IntoInner for SendMsgZc<T, C, S> {
1207 type Inner = (T, C);
1208
1209 fn into_inner(self) -> Self::Inner {
1210 self.op.into_inner()
1211 }
1212}
1213
1214unsafe impl<S: AsFd> OpCode for PollOnce<S> {
1215 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1216 let flags = match self.interest {
1217 Interest::Readable => libc::POLLIN,
1218 Interest::Writable => libc::POLLOUT,
1219 };
1220 opcode::PollAdd::new(Fd(self.fd.as_fd().as_raw_fd()), flags as _)
1221 .build()
1222 .into()
1223 }
1224}
1225
1226unsafe impl<S1: AsFd, S2: AsFd> OpCode for Splice<S1, S2> {
1227 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1228 opcode::Splice::new(
1229 Fd(self.fd_in.as_fd().as_raw_fd()),
1230 self.offset_in,
1231 Fd(self.fd_out.as_fd().as_raw_fd()),
1232 self.offset_out,
1233 self.len.try_into().unwrap_or(u32::MAX),
1234 )
1235 .flags(self.flags)
1236 .build()
1237 .into()
1238 }
1239
1240 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
1241 let mut offset_in = self.offset_in;
1242 let mut offset_out = self.offset_out;
1243 let offset_in_ptr = if offset_in < 0 {
1244 std::ptr::null_mut()
1245 } else {
1246 &mut offset_in
1247 };
1248 let offset_out_ptr = if offset_out < 0 {
1249 std::ptr::null_mut()
1250 } else {
1251 &mut offset_out
1252 };
1253 Ok(syscall!(libc::splice(
1254 self.fd_in.as_fd().as_raw_fd(),
1255 offset_in_ptr,
1256 self.fd_out.as_fd().as_raw_fd(),
1257 offset_out_ptr,
1258 self.len,
1259 self.flags,
1260 ))? as _)
1261 }
1262}
1263
1264mod buf_ring {
1265 use std::{
1266 collections::VecDeque,
1267 io,
1268 marker::PhantomPinned,
1269 os::fd::{AsFd, AsRawFd},
1270 pin::Pin,
1271 ptr,
1272 };
1273
1274 use compio_buf::BufResult;
1275 use io_uring::{opcode, squeue::Flags, types::Fd};
1276 use pin_project_lite::pin_project;
1277 use socket2::{SockAddr, SockAddrStorage, socklen_t};
1278
1279 use super::{Extra, OpCode};
1280 use crate::{
1281 BorrowedBuffer, BufferPool, IoUringBufferPool, IoUringOwnedBuffer, OpEntry, TakeBuffer,
1282 };
1283
1284 pub(crate) fn take_buffer(
1285 buffer_pool: &BufferPool,
1286 result: io::Result<usize>,
1287 buffer_id: u16,
1288 ) -> io::Result<BorrowedBuffer<'_>> {
1289 #[cfg(fusion)]
1290 let buffer_pool = buffer_pool.as_io_uring();
1291 let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
1292 let buffer = unsafe { buffer_pool.get_buffer(buffer_id, result) };
1294 let res = unsafe { buffer_pool.create_proxy(buffer, result) }?;
1295 #[cfg(fusion)]
1296 let res = BorrowedBuffer::new_io_uring(res);
1297 Ok(res)
1298 }
1299
1300 pin_project! {
1301 pub struct ReadManagedAt<S> {
1303 pub(crate) fd: S,
1304 pub(crate) offset: u64,
1305 buffer_group: u16,
1306 len: u32,
1307 pool: IoUringBufferPool,
1308 buffer: Option<IoUringOwnedBuffer>,
1309 _p: PhantomPinned,
1310 }
1311 }
1312
1313 impl<S> ReadManagedAt<S> {
1314 pub fn new(fd: S, offset: u64, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
1316 #[cfg(fusion)]
1317 let buffer_pool = buffer_pool.as_io_uring();
1318 Ok(Self {
1319 fd,
1320 offset,
1321 buffer_group: buffer_pool.buffer_group(),
1322 len: len.try_into().map_err(|_| {
1323 io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
1324 })?,
1325 pool: buffer_pool.clone(),
1326 buffer: None,
1327 _p: PhantomPinned,
1328 })
1329 }
1330 }
1331
1332 unsafe impl<S: AsFd> OpCode for ReadManagedAt<S> {
1333 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1334 let fd = Fd(self.fd.as_fd().as_raw_fd());
1335 let offset = self.offset;
1336 opcode::Read::new(fd, ptr::null_mut(), self.len)
1337 .offset(offset)
1338 .buf_group(self.buffer_group)
1339 .build()
1340 .flags(Flags::BUFFER_SELECT)
1341 .into()
1342 }
1343
1344 unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &Extra) {
1345 if let Ok(buffer_id) = extra.buffer_id() {
1346 let this = self.project();
1347 this.buffer.replace(unsafe {
1348 this.pool.get_buffer(buffer_id, *res.as_ref().unwrap_or(&0))
1349 });
1350 }
1351 }
1352 }
1353
1354 impl<S> TakeBuffer for ReadManagedAt<S> {
1355 type Buffer<'a> = BorrowedBuffer<'a>;
1356 type BufferPool = BufferPool;
1357
1358 fn take_buffer(
1359 mut self,
1360 buffer_pool: &Self::BufferPool,
1361 result: io::Result<usize>,
1362 buffer_id: u16,
1363 ) -> io::Result<Self::Buffer<'_>> {
1364 self.buffer.take().map(|buf| buf.leak());
1365 take_buffer(buffer_pool, result, buffer_id)
1366 }
1367 }
1368
1369 pin_project! {
1370 pub struct ReadManaged<S> {
1372 fd: S,
1373 buffer_group: u16,
1374 len: u32,
1375 pool: IoUringBufferPool,
1376 buffer: Option<IoUringOwnedBuffer>,
1377 _p: PhantomPinned,
1378 }
1379 }
1380
1381 impl<S> ReadManaged<S> {
1382 pub fn new(fd: S, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
1384 #[cfg(fusion)]
1385 let buffer_pool = buffer_pool.as_io_uring();
1386 Ok(Self {
1387 fd,
1388 buffer_group: buffer_pool.buffer_group(),
1389 len: len.try_into().map_err(|_| {
1390 io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
1391 })?,
1392 pool: buffer_pool.clone(),
1393 buffer: None,
1394 _p: PhantomPinned,
1395 })
1396 }
1397 }
1398
1399 unsafe impl<S: AsFd> OpCode for ReadManaged<S> {
1400 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1401 let fd = self.fd.as_fd().as_raw_fd();
1402 opcode::Read::new(Fd(fd), ptr::null_mut(), self.len)
1403 .buf_group(self.buffer_group)
1404 .offset(u64::MAX)
1405 .build()
1406 .flags(Flags::BUFFER_SELECT)
1407 .into()
1408 }
1409
1410 unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &Extra) {
1411 if let Ok(buffer_id) = extra.buffer_id() {
1412 let this = self.project();
1413 this.buffer.replace(unsafe {
1414 this.pool.get_buffer(buffer_id, *res.as_ref().unwrap_or(&0))
1415 });
1416 }
1417 }
1418 }
1419
1420 impl<S> TakeBuffer for ReadManaged<S> {
1421 type Buffer<'a> = BorrowedBuffer<'a>;
1422 type BufferPool = BufferPool;
1423
1424 fn take_buffer(
1425 mut self,
1426 buffer_pool: &Self::BufferPool,
1427 result: io::Result<usize>,
1428 buffer_id: u16,
1429 ) -> io::Result<Self::Buffer<'_>> {
1430 self.buffer.take().map(|buf| buf.leak());
1431 take_buffer(buffer_pool, result, buffer_id)
1432 }
1433 }
1434
1435 pin_project! {
1436 pub struct RecvManaged<S> {
1438 fd: S,
1439 buffer_group: u16,
1440 len: u32,
1441 flags: i32,
1442 pool: IoUringBufferPool,
1443 buffer: Option<IoUringOwnedBuffer>,
1444 _p: PhantomPinned,
1445 }
1446 }
1447
1448 impl<S> RecvManaged<S> {
1449 pub fn new(fd: S, buffer_pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
1451 #[cfg(fusion)]
1452 let buffer_pool = buffer_pool.as_io_uring();
1453 Ok(Self {
1454 fd,
1455 buffer_group: buffer_pool.buffer_group(),
1456 len: len.try_into().map_err(|_| {
1457 io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
1458 })?,
1459 flags,
1460 pool: buffer_pool.clone(),
1461 buffer: None,
1462 _p: PhantomPinned,
1463 })
1464 }
1465 }
1466
1467 unsafe impl<S: AsFd> OpCode for RecvManaged<S> {
1468 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1469 let fd = self.fd.as_fd().as_raw_fd();
1470 opcode::Recv::new(Fd(fd), ptr::null_mut(), self.len)
1471 .flags(self.flags)
1472 .buf_group(self.buffer_group)
1473 .build()
1474 .flags(Flags::BUFFER_SELECT)
1475 .into()
1476 }
1477
1478 unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &Extra) {
1479 if let Ok(buffer_id) = extra.buffer_id() {
1480 let this = self.project();
1481 this.buffer.replace(unsafe {
1482 this.pool.get_buffer(buffer_id, *res.as_ref().unwrap_or(&0))
1483 });
1484 }
1485 }
1486 }
1487
1488 impl<S> TakeBuffer for RecvManaged<S> {
1489 type Buffer<'a> = BorrowedBuffer<'a>;
1490 type BufferPool = BufferPool;
1491
1492 fn take_buffer(
1493 mut self,
1494 buffer_pool: &Self::BufferPool,
1495 result: io::Result<usize>,
1496 buffer_id: u16,
1497 ) -> io::Result<Self::Buffer<'_>> {
1498 self.buffer.take().map(|buf| buf.leak());
1499 take_buffer(buffer_pool, result, buffer_id)
1500 }
1501 }
1502
1503 pin_project! {
1504 pub struct RecvFromManaged<S> {
1506 fd: S,
1507 buffer_group: u16,
1508 flags: i32,
1509 addr: SockAddrStorage,
1510 addr_len: socklen_t,
1511 iovec: libc::iovec,
1512 msg: libc::msghdr,
1513 pool: IoUringBufferPool,
1514 buffer: Option<IoUringOwnedBuffer>,
1515 _p: PhantomPinned,
1516 }
1517 }
1518
1519 impl<S> RecvFromManaged<S> {
1520 pub fn new(fd: S, buffer_pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
1522 #[cfg(fusion)]
1523 let buffer_pool = buffer_pool.as_io_uring();
1524 let len: u32 = len.try_into().map_err(|_| {
1525 io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
1526 })?;
1527 let addr = SockAddrStorage::zeroed();
1528 Ok(Self {
1529 fd,
1530 buffer_group: buffer_pool.buffer_group(),
1531 flags,
1532 addr_len: addr.size_of() as _,
1533 addr,
1534 iovec: libc::iovec {
1535 iov_base: ptr::null_mut(),
1536 iov_len: len as _,
1537 },
1538 msg: unsafe { std::mem::zeroed() },
1539 pool: buffer_pool.clone(),
1540 buffer: None,
1541 _p: PhantomPinned,
1542 })
1543 }
1544 }
1545
1546 unsafe impl<S: AsFd> OpCode for RecvFromManaged<S> {
1547 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1548 let this = self.project();
1549 this.msg.msg_name = this.addr as *mut _ as _;
1550 this.msg.msg_namelen = *this.addr_len;
1551 this.msg.msg_iov = this.iovec as *const _ as *mut _;
1552 this.msg.msg_iovlen = 1;
1553 opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
1554 .flags(*this.flags as _)
1555 .buf_group(*this.buffer_group)
1556 .build()
1557 .flags(Flags::BUFFER_SELECT)
1558 .into()
1559 }
1560
1561 unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &Extra) {
1562 if let Ok(buffer_id) = extra.buffer_id() {
1563 let this = self.project();
1564 this.buffer.replace(unsafe {
1565 this.pool.get_buffer(buffer_id, *res.as_ref().unwrap_or(&0))
1566 });
1567 }
1568 }
1569 }
1570
1571 impl<S> TakeBuffer for RecvFromManaged<S> {
1572 type Buffer<'a> = (BorrowedBuffer<'a>, Option<SockAddr>);
1573 type BufferPool = BufferPool;
1574
1575 fn take_buffer(
1576 mut self,
1577 buffer_pool: &Self::BufferPool,
1578 result: io::Result<usize>,
1579 buffer_id: u16,
1580 ) -> io::Result<Self::Buffer<'_>> {
1581 #[cfg(fusion)]
1582 let buffer_pool = buffer_pool.as_io_uring();
1583 let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
1584 let addr =
1585 (self.addr_len > 0).then(|| unsafe { SockAddr::new(self.addr, self.addr_len) });
1586 let buffer = self
1588 .buffer
1589 .take()
1590 .unwrap_or_else(|| unsafe { buffer_pool.get_buffer(buffer_id, result) });
1591 let buffer = unsafe { buffer_pool.create_proxy(buffer, result) }?;
1592 #[cfg(fusion)]
1593 let buffer = BorrowedBuffer::new_io_uring(buffer);
1594 Ok((buffer, addr))
1595 }
1596 }
1597
1598 struct MultishotResult {
1599 result: io::Result<usize>,
1600 extra: Extra,
1601 buffer: Option<IoUringOwnedBuffer>,
1602 }
1603
1604 impl MultishotResult {
1605 pub fn new(result: io::Result<usize>, extra: Extra, pool: &IoUringBufferPool) -> Self {
1606 let buffer = extra
1607 .buffer_id()
1608 .map(|buffer_id| unsafe {
1609 pool.get_buffer(buffer_id, *result.as_ref().unwrap_or(&0))
1610 })
1611 .ok();
1612 Self {
1613 result,
1614 extra,
1615 buffer,
1616 }
1617 }
1618
1619 pub fn leak(&mut self) {
1620 self.buffer.take().map(|buf| buf.leak());
1621 }
1622 }
1623
1624 pin_project! {
1625 pub struct ReadMultiAt<S> {
1627 #[pin]
1628 inner: ReadManagedAt<S>,
1629 multishots: VecDeque<MultishotResult>
1630 }
1631 }
1632
1633 impl<S> ReadMultiAt<S> {
1634 pub fn new(fd: S, offset: u64, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
1636 Ok(Self {
1637 inner: ReadManagedAt::new(fd, offset, buffer_pool, len)?,
1638 multishots: VecDeque::new(),
1639 })
1640 }
1641 }
1642
1643 unsafe impl<S: AsFd> OpCode for ReadMultiAt<S> {
1644 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1645 let fd = self.inner.fd.as_fd().as_raw_fd();
1646 opcode::ReadMulti::new(Fd(fd), self.inner.len, self.inner.buffer_group)
1647 .offset(self.inner.offset)
1648 .build()
1649 .into()
1650 }
1651
1652 fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
1653 self.project().inner.create_entry()
1654 }
1655
1656 unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &crate::Extra) {
1657 unsafe { self.project().inner.set_result(res, extra) }
1658 }
1659
1660 unsafe fn push_multishot(
1661 self: Pin<&mut Self>,
1662 res: io::Result<usize>,
1663 extra: crate::Extra,
1664 ) {
1665 let this = self.project();
1666 this.multishots
1667 .push_back(MultishotResult::new(res, extra, &this.inner.pool));
1668 }
1669
1670 fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::sys::Extra>> {
1671 self.project().multishots.pop_front().map(|mut proxy| {
1672 proxy.leak();
1673 BufResult(proxy.result, proxy.extra)
1674 })
1675 }
1676 }
1677
1678 impl<S> TakeBuffer for ReadMultiAt<S> {
1679 type Buffer<'a> = BorrowedBuffer<'a>;
1680 type BufferPool = BufferPool;
1681
1682 fn take_buffer(
1683 self,
1684 buffer_pool: &Self::BufferPool,
1685 result: io::Result<usize>,
1686 buffer_id: u16,
1687 ) -> io::Result<Self::Buffer<'_>> {
1688 self.inner.take_buffer(buffer_pool, result, buffer_id)
1689 }
1690 }
1691
1692 pin_project! {
1693 pub struct ReadMulti<S> {
1695 #[pin]
1696 inner: ReadManaged<S>,
1697 multishots: VecDeque<MultishotResult>
1698 }
1699 }
1700
1701 impl<S> ReadMulti<S> {
1702 pub fn new(fd: S, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
1704 Ok(Self {
1705 inner: ReadManaged::new(fd, buffer_pool, len)?,
1706 multishots: VecDeque::new(),
1707 })
1708 }
1709 }
1710
1711 unsafe impl<S: AsFd> OpCode for ReadMulti<S> {
1712 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1713 let fd = self.inner.fd.as_fd().as_raw_fd();
1714 opcode::ReadMulti::new(Fd(fd), self.inner.len, self.inner.buffer_group)
1715 .offset(u64::MAX)
1716 .build()
1717 .into()
1718 }
1719
1720 fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
1721 self.project().inner.create_entry()
1722 }
1723
1724 unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &crate::Extra) {
1725 unsafe { self.project().inner.set_result(res, extra) }
1726 }
1727
1728 unsafe fn push_multishot(
1729 self: Pin<&mut Self>,
1730 res: io::Result<usize>,
1731 extra: crate::Extra,
1732 ) {
1733 let this = self.project();
1734 this.multishots
1735 .push_back(MultishotResult::new(res, extra, &this.inner.pool));
1736 }
1737
1738 fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::sys::Extra>> {
1739 self.project().multishots.pop_front().map(|mut proxy| {
1740 proxy.leak();
1741 BufResult(proxy.result, proxy.extra)
1742 })
1743 }
1744 }
1745
1746 impl<S> TakeBuffer for ReadMulti<S> {
1747 type Buffer<'a> = BorrowedBuffer<'a>;
1748 type BufferPool = BufferPool;
1749
1750 fn take_buffer(
1751 self,
1752 buffer_pool: &Self::BufferPool,
1753 result: io::Result<usize>,
1754 buffer_id: u16,
1755 ) -> io::Result<Self::Buffer<'_>> {
1756 self.inner.take_buffer(buffer_pool, result, buffer_id)
1757 }
1758 }
1759
1760 pin_project! {
1761 pub struct RecvMulti<S> {
1763 #[pin]
1764 inner: RecvManaged<S>,
1765 multishots: VecDeque<MultishotResult>
1766 }
1767 }
1768
1769 impl<S> RecvMulti<S> {
1770 pub fn new(fd: S, buffer_pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
1772 Ok(Self {
1773 inner: RecvManaged::new(fd, buffer_pool, len, flags)?,
1774 multishots: VecDeque::new(),
1775 })
1776 }
1777 }
1778
1779 unsafe impl<S: AsFd> OpCode for RecvMulti<S> {
1780 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1781 let fd = self.inner.fd.as_fd().as_raw_fd();
1782 opcode::RecvMulti::new(Fd(fd), self.inner.buffer_group)
1783 .flags(self.inner.flags)
1784 .build()
1785 .into()
1786 }
1787
1788 fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
1789 self.project().inner.create_entry()
1790 }
1791
1792 unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &crate::Extra) {
1793 unsafe { self.project().inner.set_result(res, extra) }
1794 }
1795
1796 unsafe fn push_multishot(
1797 self: Pin<&mut Self>,
1798 res: io::Result<usize>,
1799 extra: crate::Extra,
1800 ) {
1801 let this = self.project();
1802 this.multishots
1803 .push_back(MultishotResult::new(res, extra, &this.inner.pool));
1804 }
1805
1806 fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::sys::Extra>> {
1807 self.project().multishots.pop_front().map(|mut proxy| {
1808 proxy.leak();
1809 BufResult(proxy.result, proxy.extra)
1810 })
1811 }
1812 }
1813
1814 impl<S> TakeBuffer for RecvMulti<S> {
1815 type Buffer<'a> = BorrowedBuffer<'a>;
1816 type BufferPool = BufferPool;
1817
1818 fn take_buffer(
1819 self,
1820 buffer_pool: &Self::BufferPool,
1821 result: io::Result<usize>,
1822 buffer_id: u16,
1823 ) -> io::Result<Self::Buffer<'_>> {
1824 self.inner.take_buffer(buffer_pool, result, buffer_id)
1825 }
1826 }
1827}
1828
1829pub(crate) use buf_ring::take_buffer;
1830pub use buf_ring::{
1831 ReadManaged, ReadManagedAt, ReadMulti, ReadMultiAt, RecvFromManaged, RecvManaged, RecvMulti,
1832};