Skip to main content

compio_io/read/
buf.rs

1use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut, buf_try};
2
3use crate::{AsyncRead, AsyncWrite, IoResult, buffer::Buffer, util::DEFAULT_BUF_SIZE};
4/// # AsyncBufRead
5///
6/// Async read with buffered content.
7pub trait AsyncBufRead: AsyncRead {
8    /// Try fill the internal buffer with data
9    async fn fill_buf(&mut self) -> IoResult<&'_ [u8]>;
10
11    /// Mark how much data is read
12    fn consume(&mut self, amount: usize);
13}
14
15impl<A: AsyncBufRead + ?Sized> AsyncBufRead for &mut A {
16    async fn fill_buf(&mut self) -> IoResult<&'_ [u8]> {
17        (**self).fill_buf().await
18    }
19
20    fn consume(&mut self, amount: usize) {
21        (**self).consume(amount)
22    }
23}
24
25/// Wraps a reader and buffers input from [`AsyncRead`]
26///
27/// It can be excessively inefficient to work directly with a [`AsyncRead`]
28/// instance. A `BufReader<R>` performs large, infrequent reads on the
29/// underlying [`AsyncRead`] and maintains an in-memory buffer of the results.
30///
31/// `BufReader<R>` can improve the speed of programs that make *small* and
32/// *repeated* read calls to the same file or network socket. It does not
33/// help when reading very large amounts at once, or reading just one or a few
34/// times. It also provides no advantage when reading from a source that is
35/// already in memory, like a `Vec<u8>`.
36///
37/// If the underlying reader also implements [`AsyncWrite`], `BufReader<R>`
38/// forwards write operations directly to the inner writer without touching the
39/// read buffer.
40///
41/// When the `BufReader<R>` is dropped, the contents of its buffer will be
42/// discarded. Reading from the underlying reader after unwrapping the
43/// `BufReader<R>` with [`BufReader::into_inner`] can cause data loss.
44///
45/// # Caution
46///
47/// Due to the pass-by-ownership nature of completion-based IO, the buffer is
48/// passed to the inner reader when [`fill_buf`] is called. If the future
49/// returned by [`fill_buf`] is dropped before inner `read` is completed,
50/// `BufReader` will not be able to retrieve the buffer, causing panic on next
51/// [`fill_buf`] call.
52///
53/// [`fill_buf`]: #method.fill_buf
54#[derive(Debug)]
55pub struct BufReader<R> {
56    reader: R,
57    buf: Buffer,
58}
59
60impl<R> BufReader<R> {
61    /// Creates a new `BufReader` with a default buffer capacity. The default is
62    /// currently 8 KiB, but may change in the future.
63    pub fn new(reader: R) -> Self {
64        Self::with_capacity(DEFAULT_BUF_SIZE, reader)
65    }
66
67    /// Creates a new `BufReader` with the specified buffer capacity.
68    pub fn with_capacity(cap: usize, reader: R) -> Self {
69        Self {
70            reader,
71            buf: Buffer::with_capacity(cap),
72        }
73    }
74}
75
76impl<R: AsyncRead> AsyncRead for BufReader<R> {
77    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
78        let (mut slice, buf) = buf_try!(self.fill_buf().await, buf);
79        slice.read(buf).await.map_res(|res| {
80            self.consume(res);
81            res
82        })
83    }
84
85    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
86        let (mut slice, buf) = buf_try!(self.fill_buf().await, buf);
87        slice.read_vectored(buf).await.map_res(|res| {
88            self.consume(res);
89            res
90        })
91    }
92}
93
94impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
95    async fn fill_buf(&mut self) -> IoResult<&'_ [u8]> {
96        let Self { reader, buf } = self;
97
98        if buf.all_done() {
99            buf.reset()
100        }
101
102        if buf.need_fill() {
103            buf.with(|b| async move {
104                let len = b.buf_len();
105                let b = b.slice(len..);
106                reader.read(b).await.into_inner()
107            })
108            .await?;
109        }
110
111        Ok(buf.buffer())
112    }
113
114    fn consume(&mut self, amount: usize) {
115        self.buf.advance(amount);
116    }
117}
118
119impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
120    async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
121        self.reader.write(buf).await
122    }
123
124    async fn write_vectored<B: IoVectoredBuf>(&mut self, buf: B) -> BufResult<usize, B> {
125        self.reader.write_vectored(buf).await
126    }
127
128    async fn flush(&mut self) -> IoResult<()> {
129        self.reader.flush().await
130    }
131
132    async fn shutdown(&mut self) -> IoResult<()> {
133        self.reader.shutdown().await
134    }
135}
136
137impl<R> IntoInner for BufReader<R> {
138    type Inner = R;
139
140    fn into_inner(self) -> Self::Inner {
141        self.reader
142    }
143}