Skip to main content

compio_runtime/fd/async_fd/
mod.rs

1use std::{io, ops::Deref};
2
3use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
4use compio_driver::{
5    AsFd, AsRawFd, BorrowedFd, RawFd, SharedFd, ToSharedFd,
6    op::{BufResultExt, Read, ReadManaged, ResultTakeBuffer, Write},
7};
8use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite, util::Splittable};
9#[cfg(unix)]
10use {
11    compio_buf::{IoVectoredBuf, IoVectoredBufMut},
12    compio_driver::op::{ReadVectored, WriteVectored},
13};
14
15use crate::{Attacher, BorrowedBuffer, BufferPool};
16
17#[cfg(windows)]
18mod windows;
19
20#[cfg(unix)]
21mod unix;
22
23/// Providing implementations for [`AsyncRead`] and [`AsyncWrite`].
24#[derive(Debug)]
25pub struct AsyncFd<T: AsFd> {
26    inner: Attacher<T>,
27}
28
29impl<T: AsFd> AsyncFd<T> {
30    /// Create [`AsyncFd`] and attach the source to the current runtime.
31    pub fn new(source: T) -> io::Result<Self> {
32        Ok(Self {
33            inner: Attacher::new(source)?,
34        })
35    }
36
37    /// Create [`AsyncFd`] without attaching the source.
38    ///
39    /// # Safety
40    ///
41    /// * The user should handle the attachment correctly.
42    /// * `T` should be an owned fd.
43    pub unsafe fn new_unchecked(source: T) -> Self {
44        Self {
45            inner: unsafe { Attacher::new_unchecked(source) },
46        }
47    }
48}
49
50impl<T: AsFd + 'static> AsyncRead for AsyncFd<T> {
51    #[inline]
52    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
53        (&*self).read(buf).await
54    }
55
56    #[cfg(unix)]
57    #[inline]
58    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
59        (&*self).read_vectored(buf).await
60    }
61}
62
63impl<T: AsFd + 'static> AsyncReadManaged for AsyncFd<T> {
64    type Buffer<'a> = BorrowedBuffer<'a>;
65    type BufferPool = BufferPool;
66
67    async fn read_managed<'a>(
68        &mut self,
69        buffer_pool: &'a Self::BufferPool,
70        len: usize,
71    ) -> io::Result<Self::Buffer<'a>> {
72        (&*self).read_managed(buffer_pool, len).await
73    }
74}
75
76impl<T: AsFd + 'static> AsyncReadManaged for &AsyncFd<T> {
77    type Buffer<'a> = BorrowedBuffer<'a>;
78    type BufferPool = BufferPool;
79
80    async fn read_managed<'a>(
81        &mut self,
82        buffer_pool: &'a Self::BufferPool,
83        len: usize,
84    ) -> io::Result<Self::Buffer<'a>> {
85        let fd = self.to_shared_fd();
86        let buffer_pool = buffer_pool.try_inner()?;
87        let op = ReadManaged::new(fd, buffer_pool, len)?;
88        crate::submit(op)
89            .with_extra()
90            .await
91            .take_buffer(buffer_pool)
92    }
93}
94
95impl<T: AsFd + 'static> AsyncRead for &AsyncFd<T> {
96    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
97        let fd = self.inner.to_shared_fd();
98        let op = Read::new(fd, buf);
99        let res = crate::submit(op).await.into_inner();
100        unsafe { res.map_advanced() }
101    }
102
103    #[cfg(unix)]
104    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
105        use compio_driver::op::VecBufResultExt;
106
107        let fd = self.inner.to_shared_fd();
108        let op = ReadVectored::new(fd, buf);
109        let res = crate::submit(op).await.into_inner();
110        unsafe { res.map_vec_advanced() }
111    }
112}
113
114impl<T: AsFd + 'static> AsyncWrite for AsyncFd<T> {
115    #[inline]
116    async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
117        (&*self).write(buf).await
118    }
119
120    #[cfg(unix)]
121    #[inline]
122    async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
123        (&*self).write_vectored(buf).await
124    }
125
126    #[inline]
127    async fn flush(&mut self) -> io::Result<()> {
128        (&*self).flush().await
129    }
130
131    #[inline]
132    async fn shutdown(&mut self) -> io::Result<()> {
133        (&*self).shutdown().await
134    }
135}
136
137impl<T: AsFd + 'static> AsyncWrite for &AsyncFd<T> {
138    async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
139        let fd = self.inner.to_shared_fd();
140        let op = Write::new(fd, buf);
141        crate::submit(op).await.into_inner()
142    }
143
144    #[cfg(unix)]
145    async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
146        let fd = self.inner.to_shared_fd();
147        let op = WriteVectored::new(fd, buf);
148        crate::submit(op).await.into_inner()
149    }
150
151    async fn flush(&mut self) -> io::Result<()> {
152        Ok(())
153    }
154
155    async fn shutdown(&mut self) -> io::Result<()> {
156        Ok(())
157    }
158}
159
160impl<T: AsFd> IntoInner for AsyncFd<T> {
161    type Inner = SharedFd<T>;
162
163    fn into_inner(self) -> Self::Inner {
164        self.inner.into_inner()
165    }
166}
167
168impl<T: AsFd> AsFd for AsyncFd<T> {
169    fn as_fd(&self) -> BorrowedFd<'_> {
170        self.inner.as_fd()
171    }
172}
173
174impl<T: AsFd> AsRawFd for AsyncFd<T> {
175    fn as_raw_fd(&self) -> RawFd {
176        self.inner.as_fd().as_raw_fd()
177    }
178}
179
180impl<T: AsFd> ToSharedFd<T> for AsyncFd<T> {
181    fn to_shared_fd(&self) -> SharedFd<T> {
182        self.inner.to_shared_fd()
183    }
184}
185
186impl<T: AsFd> Clone for AsyncFd<T> {
187    fn clone(&self) -> Self {
188        Self {
189            inner: self.inner.clone(),
190        }
191    }
192}
193
194impl<T: AsFd> Deref for AsyncFd<T> {
195    type Target = T;
196
197    fn deref(&self) -> &Self::Target {
198        &self.inner
199    }
200}
201
202impl<T: AsFd> Splittable for AsyncFd<T> {
203    type ReadHalf = Self;
204    type WriteHalf = Self;
205
206    fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
207        (self.clone(), self)
208    }
209}