Skip to main content

compio_io/read/
buf.rs

1use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut, buf_try};
2use futures_util::FutureExt;
3
4use crate::{AsyncRead, AsyncWrite, IoResult, buffer::Buffer, util::DEFAULT_BUF_SIZE};
5/// # AsyncBufRead
6///
7/// Async read with buffered content.
8pub trait AsyncBufRead: AsyncRead {
9    /// Try fill the internal buffer with data
10    async fn fill_buf(&mut self) -> IoResult<&'_ [u8]>;
11
12    /// Mark how much data is read
13    fn consume(&mut self, amount: usize);
14}
15
16impl<A: AsyncBufRead + ?Sized> AsyncBufRead for &mut A {
17    async fn fill_buf(&mut self) -> IoResult<&'_ [u8]> {
18        (**self).fill_buf().await
19    }
20
21    fn consume(&mut self, amount: usize) {
22        (**self).consume(amount)
23    }
24}
25
26/// Wraps a reader and buffers input from [`AsyncRead`]
27///
28/// It can be excessively inefficient to work directly with a [`AsyncRead`]
29/// instance. A `BufReader<R>` performs large, infrequent reads on the
30/// underlying [`AsyncRead`] and maintains an in-memory buffer of the results.
31///
32/// `BufReader<R>` can improve the speed of programs that make *small* and
33/// *repeated* read calls to the same file or network socket. It does not
34/// help when reading very large amounts at once, or reading just one or a few
35/// times. It also provides no advantage when reading from a source that is
36/// already in memory, like a `Vec<u8>`.
37///
38/// If the underlying reader also implements [`AsyncWrite`], `BufReader<R>`
39/// forwards write operations directly to the inner writer without touching the
40/// read buffer.
41///
42/// When the `BufReader<R>` is dropped, the contents of its buffer will be
43/// discarded. Reading from the underlying reader after unwrapping the
44/// `BufReader<R>` with [`BufReader::into_inner`] can cause data loss.
45///
46/// # Caution
47///
48/// Due to the pass-by-ownership nature of completion-based IO, the buffer is
49/// passed to the inner reader when [`fill_buf`] is called. If the future
50/// returned by [`fill_buf`] is dropped before inner `read` is completed,
51/// `BufReader` will not be able to retrieve the buffer, causing panic on next
52/// [`fill_buf`] call.
53///
54/// [`fill_buf`]: #method.fill_buf
55#[derive(Debug)]
56pub struct BufReader<R> {
57    reader: R,
58    buf: Buffer,
59}
60
61impl<R> BufReader<R> {
62    /// Creates a new `BufReader` with a default buffer capacity. The default is
63    /// currently 8 KiB, but may change in the future.
64    pub fn new(reader: R) -> Self {
65        Self::with_capacity(DEFAULT_BUF_SIZE, reader)
66    }
67
68    /// Creates a new `BufReader` with the specified buffer capacity.
69    pub fn with_capacity(cap: usize, reader: R) -> Self {
70        Self {
71            reader,
72            buf: Buffer::with_capacity(cap),
73        }
74    }
75}
76
77impl<R: AsyncRead> AsyncRead for BufReader<R> {
78    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
79        let (mut slice, buf) = buf_try!(self.fill_buf().await, buf);
80        slice.read(buf).await.map_res(|res| {
81            self.consume(res);
82            res
83        })
84    }
85
86    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
87        let (mut slice, buf) = buf_try!(self.fill_buf().await, buf);
88        slice.read_vectored(buf).await.map_res(|res| {
89            self.consume(res);
90            res
91        })
92    }
93}
94
95impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
96    async fn fill_buf(&mut self) -> IoResult<&'_ [u8]> {
97        let Self { reader, buf } = self;
98
99        if buf.all_done() {
100            buf.reset()
101        }
102
103        if buf.need_fill() {
104            buf.with(|b| {
105                let len = b.buf_len();
106                let b = b.slice(len..);
107                reader.read(b).map(IntoInner::into_inner)
108            })
109            .await?;
110        }
111
112        Ok(buf.buffer())
113    }
114
115    fn consume(&mut self, amount: usize) {
116        self.buf.advance(amount);
117    }
118}
119
120impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
121    async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
122        self.reader.write(buf).await
123    }
124
125    async fn write_vectored<B: IoVectoredBuf>(&mut self, buf: B) -> BufResult<usize, B> {
126        self.reader.write_vectored(buf).await
127    }
128
129    async fn flush(&mut self) -> IoResult<()> {
130        self.reader.flush().await
131    }
132
133    async fn shutdown(&mut self) -> IoResult<()> {
134        self.reader.shutdown().await
135    }
136}
137
138impl<R> IntoInner for BufReader<R> {
139    type Inner = R;
140
141    fn into_inner(self) -> Self::Inner {
142        self.reader
143    }
144}