1use std::{io, marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, SetLen};
10use pin_project_lite::pin_project;
11use socket2::{SockAddr, SockAddrStorage, socklen_t};
12
13#[cfg(linux_all)]
14pub use crate::sys::op::Splice;
15pub use crate::sys::op::{
16 Accept, Recv, RecvFrom, RecvFromVectored, RecvMsg, RecvVectored, Send, SendMsg, SendMsgZc,
17 SendTo, SendToVectored, SendToVectoredZc, SendToZc, SendVectored, SendVectoredZc, SendZc,
18};
19#[cfg(unix)]
20pub use crate::sys::op::{
21 AcceptMulti, CreateDir, CreateSocket, CurrentDir, FileStat, HardLink, Interest, OpenFile,
22 PathStat, PollOnce, ReadVectored, ReadVectoredAt, Rename, Stat, Symlink, TruncateFile, Unlink,
23 WriteVectored, WriteVectoredAt,
24};
25#[cfg(windows)]
26pub use crate::sys::op::{ConnectNamedPipe, DeviceIoControl};
27#[cfg(io_uring)]
28pub use crate::sys::op::{
29 ReadManaged, ReadManagedAt, ReadMulti, ReadMultiAt, RecvFromManaged, RecvManaged, RecvMulti,
30};
31use crate::{Extra, OwnedFd, SharedFd, TakeBuffer, sys::aio::*};
32
/// Extension trait for [`BufResult`] values whose buffer length should be
/// advanced to the count reported by the operation.
pub trait BufResultExt {
    /// Advances the contained buffer(s) to the reported count and returns the
    /// updated result.
    ///
    /// # Safety
    ///
    /// NOTE(review): the implementations in this file call `advance_to`
    /// inside `unsafe`, so the caller presumably must guarantee that the
    /// reported count is a valid length for the buffer — confirm against the
    /// `SetLen` contract.
    unsafe fn map_advanced(self) -> Self;
}
42
/// Extension trait for [`BufResult`] values holding a vectored buffer whose
/// length should be advanced to the count reported by the operation.
pub trait VecBufResultExt {
    /// Advances the contained vectored buffer to the reported count and
    /// returns the updated result.
    ///
    /// # Safety
    ///
    /// NOTE(review): the implementations in this file call `advance_vec_to`
    /// inside `unsafe`, so the caller presumably must guarantee that the
    /// reported count is a valid length for the buffer — confirm against the
    /// `SetLen` contract.
    unsafe fn map_vec_advanced(self) -> Self;
}
52
53impl<T: SetLen + IoBuf> BufResultExt for BufResult<usize, T> {
54 unsafe fn map_advanced(self) -> Self {
55 unsafe {
56 self.map_res(|res| (res, ()))
57 .map_advanced()
58 .map_res(|(res, _)| res)
59 }
60 }
61}
62
63impl<T: SetLen + IoVectoredBuf> VecBufResultExt for BufResult<usize, T> {
64 unsafe fn map_vec_advanced(self) -> Self {
65 unsafe {
66 self.map_res(|res| (res, ()))
67 .map_vec_advanced()
68 .map_res(|(res, _)| res)
69 }
70 }
71}
72
73impl<T: SetLen + IoBuf, O> BufResultExt for BufResult<(usize, O), T> {
74 unsafe fn map_advanced(self) -> Self {
75 self.map(|(init, obj), mut buffer| {
76 unsafe {
77 buffer.advance_to(init);
78 }
79 ((init, obj), buffer)
80 })
81 }
82}
83
84impl<T: SetLen + IoVectoredBuf, O> VecBufResultExt for BufResult<(usize, O), T> {
85 unsafe fn map_vec_advanced(self) -> Self {
86 self.map(|(init, obj), mut buffer| {
87 unsafe {
88 buffer.advance_vec_to(init);
89 }
90 ((init, obj), buffer)
91 })
92 }
93}
94
95impl<T: SetLen + IoBuf, C: SetLen + IoBuf, O> BufResultExt
96 for BufResult<(usize, usize, O), (T, C)>
97{
98 unsafe fn map_advanced(self) -> Self {
99 self.map(
100 |(init_buffer, init_control, obj), (mut buffer, mut control)| {
101 unsafe {
102 buffer.advance_to(init_buffer);
103 control.advance_to(init_control);
104 }
105 ((init_buffer, init_control, obj), (buffer, control))
106 },
107 )
108 }
109}
110
111impl<T: SetLen + IoVectoredBuf, C: SetLen + IoBuf, O> VecBufResultExt
112 for BufResult<(usize, usize, O), (T, C)>
113{
114 unsafe fn map_vec_advanced(self) -> Self {
115 self.map(
116 |(init_buffer, init_control, obj), (mut buffer, mut control)| {
117 unsafe {
118 buffer.advance_vec_to(init_buffer);
119 control.advance_to(init_control);
120 }
121 ((init_buffer, init_control, obj), (buffer, control))
122 },
123 )
124 }
125}
126
/// Extension trait that converts a raw receive result (buffer plus address
/// storage) into a result carrying an optional [`SockAddr`].
pub trait RecvResultExt {
    /// The result type produced by [`RecvResultExt::map_addr`].
    type RecvResult;

    /// Converts the address storage carried next to the buffer into an
    /// optional socket address and moves it into the result position.
    fn map_addr(self) -> Self::RecvResult;
}
135
136impl<T> RecvResultExt for BufResult<usize, (T, SockAddrStorage, socklen_t)> {
137 type RecvResult = BufResult<(usize, Option<SockAddr>), T>;
138
139 fn map_addr(self) -> Self::RecvResult {
140 self.map_buffer(|(buffer, addr_buffer, addr_size)| (buffer, addr_buffer, addr_size, 0))
141 .map_addr()
142 .map_res(|(res, _, addr)| (res, addr))
143 }
144}
145
146impl<T> RecvResultExt for BufResult<usize, (T, SockAddrStorage, socklen_t, usize)> {
147 type RecvResult = BufResult<(usize, usize, Option<SockAddr>), T>;
148
149 fn map_addr(self) -> Self::RecvResult {
150 self.map2(
151 |res, (buffer, addr_buffer, addr_size, len)| {
152 let addr =
153 (addr_size > 0).then(|| unsafe { SockAddr::new(addr_buffer, addr_size) });
154 ((res, len, addr), buffer)
155 },
156 |(buffer, ..)| buffer,
157 )
158 }
159}
160
/// Extension trait for turning a completed operation result into a buffer
/// obtained from a buffer pool.
pub trait ResultTakeBuffer {
    /// The pool type the buffer is taken from.
    type BufferPool;
    /// The buffer type handed back; it borrows from the pool for `'a`.
    type Buffer<'a>;

    /// Consumes the result and returns the buffer associated with it, or the
    /// I/O error produced while doing so.
    fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>>;
}
171
172impl<T: TakeBuffer> ResultTakeBuffer for (BufResult<usize, T>, Extra) {
173 type Buffer<'a> = T::Buffer<'a>;
174 type BufferPool = T::BufferPool;
175
176 fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>> {
177 let (BufResult(result, op), extra) = self;
178 op.take_buffer(pool, result, extra.buffer_id()?)
179 }
180}
181
// Buffer retrieval for a result that carries only an `Extra` and no
// operation object.
impl ResultTakeBuffer for BufResult<usize, Extra> {
    type Buffer<'a> = crate::BorrowedBuffer<'a>;
    type BufferPool = crate::BufferPool;

    /// With `cfg(io_uring)`, forwards the result and the buffer id from
    /// `Extra` to `crate::sys::take_buffer`.
    ///
    /// # Panics
    ///
    /// Without `cfg(io_uring)` this always panics via `unreachable!`.
    fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>> {
        #[cfg(io_uring)]
        {
            let BufResult(result, extra) = self;
            crate::sys::take_buffer(pool, result, extra.buffer_id()?)
        }
        #[cfg(not(io_uring))]
        {
            // Bind `pool` so it counts as used in this configuration.
            let _pool = pool;
            unreachable!("take_buffer should not be called for non-io-uring ops")
        }
    }
}
199
pin_project! {
    /// Operation wrapping a closure `f` together with a slot for the data it
    /// yields.
    ///
    /// NOTE(review): presumably the driver runs `f` off the async path and
    /// stores its output in `data` — confirm against the driver code.
    pub struct Asyncify<F, D> {
        // The closure; `new` stores it as `Some`.
        pub(crate) f: Option<F>,
        // Output slot; `None` after `new`, taken by `into_inner`.
        pub(crate) data: Option<D>,
        _p: PhantomPinned,
    }
}
208
209impl<F, D> Asyncify<F, D> {
210 pub fn new(f: F) -> Self {
212 Self {
213 f: Some(f),
214 data: None,
215 _p: PhantomPinned,
216 }
217 }
218}
219
220impl<F, D> IntoInner for Asyncify<F, D> {
221 type Inner = D;
222
223 fn into_inner(mut self) -> Self::Inner {
224 self.data.take().expect("the data should not be None")
225 }
226}
227
pin_project! {
    /// Operation wrapping a closure `f`, a shared file descriptor, and a slot
    /// for the data the closure yields.
    ///
    /// NOTE(review): presumably the driver runs `f` with access to `fd` and
    /// stores its output in `data` — confirm against the driver code.
    pub struct AsyncifyFd<S, F, D> {
        // The shared descriptor held by the operation.
        pub(crate) fd: SharedFd<S>,
        // The closure; `new` stores it as `Some`.
        pub(crate) f: Option<F>,
        // Output slot; `None` after `new`, taken by `into_inner`.
        pub(crate) data: Option<D>,
        _p: PhantomPinned,
    }
}
237
238impl<S, F, D> AsyncifyFd<S, F, D> {
239 pub fn new(fd: SharedFd<S>, f: F) -> Self {
241 Self {
242 fd,
243 f: Some(f),
244 data: None,
245 _p: PhantomPinned,
246 }
247 }
248}
249
250impl<S, F, D> IntoInner for AsyncifyFd<S, F, D> {
251 type Inner = D;
252
253 fn into_inner(mut self) -> Self::Inner {
254 self.data.take().expect("the data should not be None")
255 }
256}
257
pin_project! {
    /// Operation wrapping a closure `f`, two shared file descriptors, and a
    /// slot for the data the closure yields.
    ///
    /// NOTE(review): presumably the driver runs `f` with access to both
    /// descriptors and stores its output in `data` — confirm against the
    /// driver code.
    pub struct AsyncifyFd2<S1, S2, F, D> {
        // The first shared descriptor.
        pub(crate) fd1: SharedFd<S1>,
        // The second shared descriptor.
        pub(crate) fd2: SharedFd<S2>,
        // The closure; `new` stores it as `Some`.
        pub(crate) f: Option<F>,
        // Output slot; `None` after `new`, taken by `into_inner`.
        pub(crate) data: Option<D>,
        _p: PhantomPinned,
    }
}
268
269impl<S1, S2, F, D> AsyncifyFd2<S1, S2, F, D> {
270 pub fn new(fd1: SharedFd<S1>, fd2: SharedFd<S2>, f: F) -> Self {
272 Self {
273 fd1,
274 fd2,
275 f: Some(f),
276 data: None,
277 _p: PhantomPinned,
278 }
279 }
280}
281
282impl<S1, S2, F, D> IntoInner for AsyncifyFd2<S1, S2, F, D> {
283 type Inner = D;
284
285 fn into_inner(mut self) -> Self::Inner {
286 self.data.take().expect("the data should not be None")
287 }
288}
289
/// Operation that closes a file descriptor.
pub struct CloseFile {
    /// The descriptor to close. Wrapped in [`ManuallyDrop`] so that dropping
    /// this struct does not run the descriptor's own destructor.
    pub(crate) fd: ManuallyDrop<OwnedFd>,
}
294
295impl CloseFile {
296 pub fn new(fd: OwnedFd) -> Self {
298 Self {
299 fd: ManuallyDrop::new(fd),
300 }
301 }
302}
303
pin_project! {
    /// Operation that reads from `fd` at a given `offset` into `buffer`.
    #[derive(Debug)]
    pub struct ReadAt<T: IoBufMut, S> {
        // The descriptor to read from.
        pub(crate) fd: S,
        // The position to read at.
        pub(crate) offset: u64,
        // Destination buffer; returned by `into_inner`.
        #[pin]
        pub(crate) buffer: T,
        // Control block created by `new_aiocb()`; presumably only meaningful
        // on AIO-based backends — confirm in `sys::aio`.
        pub(crate) aiocb: aiocb,
        _p: PhantomPinned,
    }
}
316
317impl<T: IoBufMut, S> ReadAt<T, S> {
318 pub fn new(fd: S, offset: u64, buffer: T) -> Self {
320 Self {
321 fd,
322 offset,
323 buffer,
324 aiocb: new_aiocb(),
325 _p: PhantomPinned,
326 }
327 }
328}
329
330impl<T: IoBufMut, S> IntoInner for ReadAt<T, S> {
331 type Inner = T;
332
333 fn into_inner(self) -> Self::Inner {
334 self.buffer
335 }
336}
337
pin_project! {
    /// Operation that writes `buffer` to `fd` at a given `offset`.
    #[derive(Debug)]
    pub struct WriteAt<T: IoBuf, S> {
        // The descriptor to write to.
        pub(crate) fd: S,
        // The position to write at.
        pub(crate) offset: u64,
        // Source buffer; returned by `into_inner`.
        #[pin]
        pub(crate) buffer: T,
        // Control block created by `new_aiocb()`; presumably only meaningful
        // on AIO-based backends — confirm in `sys::aio`.
        pub(crate) aiocb: aiocb,
        _p: PhantomPinned,
    }
}
350
351impl<T: IoBuf, S> WriteAt<T, S> {
352 pub fn new(fd: S, offset: u64, buffer: T) -> Self {
354 Self {
355 fd,
356 offset,
357 buffer,
358 aiocb: new_aiocb(),
359 _p: PhantomPinned,
360 }
361 }
362}
363
364impl<T: IoBuf, S> IntoInner for WriteAt<T, S> {
365 type Inner = T;
366
367 fn into_inner(self) -> Self::Inner {
368 self.buffer
369 }
370}
371
pin_project! {
    /// Operation that reads from `fd` into `buffer` (no explicit offset).
    pub struct Read<T: IoBufMut, S> {
        // The descriptor to read from.
        pub(crate) fd: S,
        // Destination buffer; returned by `into_inner`.
        #[pin]
        pub(crate) buffer: T,
        _p: PhantomPinned,
    }
}
381
382impl<T: IoBufMut, S> Read<T, S> {
383 pub fn new(fd: S, buffer: T) -> Self {
385 Self {
386 fd,
387 buffer,
388 _p: PhantomPinned,
389 }
390 }
391}
392
393impl<T: IoBufMut, S> IntoInner for Read<T, S> {
394 type Inner = T;
395
396 fn into_inner(self) -> Self::Inner {
397 self.buffer
398 }
399}
400
/// Operation that writes `buffer` to `fd` (no explicit offset).
pub struct Write<T: IoBuf, S> {
    /// The descriptor to write to.
    pub(crate) fd: S,
    /// Source buffer; returned by `into_inner`.
    pub(crate) buffer: T,
    _p: PhantomPinned,
}
407
408impl<T: IoBuf, S> Write<T, S> {
409 pub fn new(fd: S, buffer: T) -> Self {
411 Self {
412 fd,
413 buffer,
414 _p: PhantomPinned,
415 }
416 }
417}
418
419impl<T: IoBuf, S> IntoInner for Write<T, S> {
420 type Inner = T;
421
422 fn into_inner(self) -> Self::Inner {
423 self.buffer
424 }
425}
426
pin_project! {
    /// Operation that syncs `fd`.
    pub struct Sync<S> {
        // The descriptor to sync.
        pub(crate) fd: S,
        // NOTE(review): presumably selects a data-only sync over a full
        // sync — confirm against the platform implementation.
        pub(crate) datasync: bool,
        // Control block created by `new_aiocb()`; presumably only meaningful
        // on AIO-based backends — confirm in `sys::aio`.
        pub(crate) aiocb: aiocb,
    }
}
435
436impl<S> Sync<S> {
437 pub fn new(fd: S, datasync: bool) -> Self {
441 Self {
442 fd,
443 datasync,
444 aiocb: new_aiocb(),
445 }
446 }
447}
448
/// Operation that shuts down a socket.
pub struct ShutdownSocket<S> {
    /// The socket to shut down.
    pub(crate) fd: S,
    /// Which direction(s) of the connection to shut down.
    pub(crate) how: Shutdown,
}
454
455impl<S> ShutdownSocket<S> {
456 pub fn new(fd: S, how: Shutdown) -> Self {
458 Self { fd, how }
459 }
460}
461
/// Operation that closes a socket.
pub struct CloseSocket {
    /// The descriptor to close. Wrapped in [`ManuallyDrop`] so that dropping
    /// this struct does not run the descriptor's own destructor.
    pub(crate) fd: ManuallyDrop<OwnedFd>,
}
466
467impl CloseSocket {
468 pub fn new(fd: OwnedFd) -> Self {
470 Self {
471 fd: ManuallyDrop::new(fd),
472 }
473 }
474}
475
/// Operation that connects a socket to an address.
pub struct Connect<S> {
    /// The socket to connect.
    pub(crate) fd: S,
    /// The address to connect to.
    pub(crate) addr: SockAddr,
}
481
482impl<S> Connect<S> {
483 pub fn new(fd: S, addr: SockAddr) -> Self {
485 Self { fd, addr }
486 }
487}
488
489#[cfg(any(not(io_uring), fusion))]
490pub(crate) mod managed {
491 use std::io;
492
493 use compio_buf::IntoInner;
494 use pin_project_lite::pin_project;
495 use socket2::SockAddr;
496
497 use super::{Read, ReadAt, Recv, RecvFrom};
498 use crate::{AsFd, BorrowedBuffer, BufferPool, FallbackOwnedBuffer, TakeBuffer};
499
500 fn take_buffer(
501 slice: FallbackOwnedBuffer,
502 buffer_pool: &BufferPool,
503 result: io::Result<usize>,
504 ) -> io::Result<BorrowedBuffer<'_>> {
505 let result = result?;
506 #[cfg(fusion)]
507 let buffer_pool = buffer_pool.as_poll();
508 let res = unsafe { buffer_pool.create_proxy(slice, result) };
510 #[cfg(fusion)]
511 let res = BorrowedBuffer::new_poll(res);
512 Ok(res)
513 }
514
    pin_project! {
        /// Positional read whose buffer is obtained from a [`BufferPool`];
        /// wraps a [`ReadAt`] over a `FallbackOwnedBuffer`.
        pub struct ReadManagedAt<S> {
            // The underlying positional read operation.
            #[pin]
            pub(crate) op: ReadAt<FallbackOwnedBuffer, S>,
        }
    }
522
523 impl<S> ReadManagedAt<S> {
524 pub fn new(fd: S, offset: u64, pool: &BufferPool, len: usize) -> io::Result<Self> {
526 #[cfg(fusion)]
527 let pool = pool.as_poll();
528 Ok(Self {
529 op: ReadAt::new(fd, offset, pool.get_buffer(len)?),
530 })
531 }
532 }
533
534 impl<S> TakeBuffer for ReadManagedAt<S> {
535 type Buffer<'a> = BorrowedBuffer<'a>;
536 type BufferPool = BufferPool;
537
538 fn take_buffer(
539 self,
540 buffer_pool: &BufferPool,
541 result: io::Result<usize>,
542 _: u16,
543 ) -> io::Result<BorrowedBuffer<'_>> {
544 take_buffer(self.op.into_inner(), buffer_pool, result)
545 }
546 }
547
    pin_project! {
        /// Read whose buffer is obtained from a [`BufferPool`]; wraps a
        /// [`Read`] over a `FallbackOwnedBuffer`.
        pub struct ReadManaged<S> {
            // The underlying read operation.
            #[pin]
            pub(crate) op: Read<FallbackOwnedBuffer, S>,
        }
    }
555
556 impl<S> ReadManaged<S> {
557 pub fn new(fd: S, pool: &BufferPool, len: usize) -> io::Result<Self> {
559 #[cfg(fusion)]
560 let pool = pool.as_poll();
561 Ok(Self {
562 op: Read::new(fd, pool.get_buffer(len)?),
563 })
564 }
565 }
566
567 impl<S> TakeBuffer for ReadManaged<S> {
568 type Buffer<'a> = BorrowedBuffer<'a>;
569 type BufferPool = BufferPool;
570
571 fn take_buffer(
572 self,
573 buffer_pool: &Self::BufferPool,
574 result: io::Result<usize>,
575 _: u16,
576 ) -> io::Result<Self::Buffer<'_>> {
577 take_buffer(self.op.into_inner(), buffer_pool, result)
578 }
579 }
580
    pin_project! {
        /// Receive whose buffer is obtained from a [`BufferPool`]; wraps a
        /// [`Recv`] over a `FallbackOwnedBuffer`.
        pub struct RecvManaged<S> {
            // The underlying receive operation.
            #[pin]
            pub(crate) op: Recv<FallbackOwnedBuffer, S>,
        }
    }
591
592 impl<S> RecvManaged<S> {
593 pub fn new(fd: S, pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
595 #[cfg(fusion)]
596 let pool = pool.as_poll();
597 Ok(Self {
598 op: Recv::new(fd, pool.get_buffer(len)?, flags),
599 })
600 }
601 }
602
603 impl<S> TakeBuffer for RecvManaged<S> {
604 type Buffer<'a> = BorrowedBuffer<'a>;
605 type BufferPool = BufferPool;
606
607 fn take_buffer(
608 self,
609 buffer_pool: &Self::BufferPool,
610 result: io::Result<usize>,
611 _: u16,
612 ) -> io::Result<Self::Buffer<'_>> {
613 take_buffer(self.op.into_inner(), buffer_pool, result)
614 }
615 }
616
    pin_project! {
        /// Receive-with-source-address whose buffer is obtained from a
        /// [`BufferPool`]; wraps a [`RecvFrom`] over a `FallbackOwnedBuffer`.
        pub struct RecvFromManaged<S: AsFd> {
            // The underlying receive-from operation.
            #[pin]
            pub(crate) op: RecvFrom<FallbackOwnedBuffer, S>,
        }
    }
624
625 impl<S: AsFd> RecvFromManaged<S> {
626 pub fn new(fd: S, pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
628 #[cfg(fusion)]
629 let pool = pool.as_poll();
630 Ok(Self {
631 op: RecvFrom::new(fd, pool.get_buffer(len)?, flags),
632 })
633 }
634 }
635
636 impl<S: AsFd> TakeBuffer for RecvFromManaged<S> {
637 type Buffer<'a> = (BorrowedBuffer<'a>, Option<SockAddr>);
638 type BufferPool = BufferPool;
639
640 fn take_buffer(
641 self,
642 buffer_pool: &Self::BufferPool,
643 result: io::Result<usize>,
644 _: u16,
645 ) -> io::Result<Self::Buffer<'_>> {
646 let result = result?;
647 #[cfg(fusion)]
648 let buffer_pool = buffer_pool.as_poll();
649 let (slice, addr_buffer, addr_size) = self.op.into_inner();
650 let addr = (addr_size > 0).then(|| unsafe { SockAddr::new(addr_buffer, addr_size) });
651 let res = unsafe { buffer_pool.create_proxy(slice, result) };
653 #[cfg(fusion)]
654 let res = BorrowedBuffer::new_poll(res);
655 Ok((res, addr))
656 }
657 }
658
    /// In this fallback module the "multi" positional read is simply
    /// [`ReadManagedAt`].
    pub type ReadMultiAt<S> = ReadManagedAt<S>;
    /// In this fallback module the "multi" read is simply [`ReadManaged`].
    pub type ReadMulti<S> = ReadManaged<S>;
    /// In this fallback module the "multi" receive is simply [`RecvManaged`].
    pub type RecvMulti<S> = RecvManaged<S>;
665}
666
667#[cfg(not(io_uring))]
668pub use managed::{
669 ReadManaged, ReadManagedAt, ReadMulti, ReadMultiAt, RecvFromManaged, RecvManaged, RecvMulti,
670};
671
bitflags::bitflags! {
    /// Bit set of operation codes.
    ///
    /// Each flag occupies one bit. Under `cfg(io_uring)`, `get_codes` maps
    /// every flag to the `io_uring::opcode` type of the same name.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct OpCodeFlag: u32 {
        const Read = 1 << 0;
        const Readv = 1 << 1;
        const Write = 1 << 2;
        const Writev = 1 << 3;
        const Fsync = 1 << 4;
        const Accept = 1 << 5;
        const Connect = 1 << 6;
        const Recv = 1 << 7;
        const Send = 1 << 8;
        const RecvMsg = 1 << 9;
        const SendMsg = 1 << 10;
        const AsyncCancel = 1 << 11;
        const OpenAt = 1 << 12;
        const Close = 1 << 13;
        const Splice = 1 << 14;
        const Shutdown = 1 << 15;
        const PollAdd = 1 << 16;
    }
}
712
713impl OpCodeFlag {
714 pub fn basic() -> Self {
717 OpCodeFlag::Read
718 | OpCodeFlag::Readv
719 | OpCodeFlag::Write
720 | OpCodeFlag::Writev
721 | OpCodeFlag::Fsync
722 | OpCodeFlag::Accept
723 | OpCodeFlag::Connect
724 | OpCodeFlag::Recv
725 | OpCodeFlag::Send
726 | OpCodeFlag::RecvMsg
727 | OpCodeFlag::SendMsg
728 | OpCodeFlag::PollAdd
729 }
730}
731
#[cfg(io_uring)]
impl OpCodeFlag {
    /// Yields the `io_uring` opcode number for every flag set in `self`.
    ///
    /// # Panics
    ///
    /// The iterator panics via `unreachable!` on a flag with no mapping.
    pub(crate) fn get_codes(self) -> impl Iterator<Item = u8> {
        use io_uring::opcode;

        // Paths are spelled out through `opcode::` so they cannot be confused
        // with this module's own `Read`, `Write`, `Connect`, ... types.
        self.iter().map(|flag| match flag {
            OpCodeFlag::Read => opcode::Read::CODE,
            OpCodeFlag::Readv => opcode::Readv::CODE,
            OpCodeFlag::Write => opcode::Write::CODE,
            OpCodeFlag::Writev => opcode::Writev::CODE,
            OpCodeFlag::Fsync => opcode::Fsync::CODE,
            OpCodeFlag::Accept => opcode::Accept::CODE,
            OpCodeFlag::Connect => opcode::Connect::CODE,
            OpCodeFlag::Recv => opcode::Recv::CODE,
            OpCodeFlag::Send => opcode::Send::CODE,
            OpCodeFlag::RecvMsg => opcode::RecvMsg::CODE,
            OpCodeFlag::SendMsg => opcode::SendMsg::CODE,
            OpCodeFlag::AsyncCancel => opcode::AsyncCancel::CODE,
            OpCodeFlag::OpenAt => opcode::OpenAt::CODE,
            OpCodeFlag::Close => opcode::Close::CODE,
            OpCodeFlag::Splice => opcode::Splice::CODE,
            OpCodeFlag::Shutdown => opcode::Shutdown::CODE,
            OpCodeFlag::PollAdd => opcode::PollAdd::CODE,
            unknown => unreachable!("Unknown OpCodeFlag specified: {unknown:?}"),
        })
    }
}