Skip to main content

compio_io/write/
buf.rs

1use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut, buf_try};
2
3use crate::{
4    AsyncBufRead, AsyncRead, AsyncWrite, IoResult,
5    buffer::Buffer,
6    util::{DEFAULT_BUF_SIZE, slice_to_buf},
7};
8
9/// Wraps a writer and buffers its output.
10///
11/// It can be excessively inefficient to work directly with something that
12/// implements [`AsyncWrite`].  A `BufWriter<W>` keeps an in-memory buffer of
13/// data and writes it to an underlying writer in large, infrequent batches.
14//
15/// `BufWriter<W>` can improve the speed of programs that make *small* and
16/// *repeated* write calls to the same file or network socket. It does not
17/// help when writing very large amounts at once, or writing just one or a few
18/// times. It also provides no advantage when writing to a destination that is
19/// in memory, like a `Vec<u8>`.
20///
21/// If the underlying writer also implements [`AsyncRead`] or [`AsyncBufRead`],
22/// `BufWriter<W>` forwards read operations directly to the inner reader without
23/// flushing the write buffer.
24///
25/// Dropping `BufWriter<W>` also discards any bytes left in the buffer, so it is
26/// critical to call [`flush`] before `BufWriter<W>` is dropped. Calling
27/// [`flush`] ensures that the buffer is empty and thus no data is lost.
28///
29/// [`flush`]: AsyncWrite::flush
30
31#[derive(Debug)]
32pub struct BufWriter<W> {
33    writer: W,
34    buf: Buffer,
35}
36
37impl<W> BufWriter<W> {
38    /// Creates a new `BufWriter` with a default buffer capacity. The default is
39    /// currently 8 KiB, but may change in the future.
40    pub fn new(writer: W) -> Self {
41        Self::with_capacity(DEFAULT_BUF_SIZE, writer)
42    }
43
44    /// Creates a new `BufWriter` with the specified buffer capacity.
45    pub fn with_capacity(cap: usize, writer: W) -> Self {
46        Self {
47            writer,
48            buf: Buffer::with_capacity(cap),
49        }
50    }
51}
52
53impl<W: AsyncWrite> BufWriter<W> {
54    async fn flush_if_needed(&mut self) -> IoResult<()> {
55        if self.buf.need_flush() {
56            self.flush().await?;
57        }
58        Ok(())
59    }
60}
61
62impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
63    async fn write<T: IoBuf>(&mut self, mut buf: T) -> BufResult<usize, T> {
64        // The previous flush may error because disk full. We need to make the buffer
65        // all-done before writing new data to it.
66        (_, buf) = buf_try!(self.flush_if_needed().await, buf);
67
68        let written = self
69            .buf
70            .with_sync(|w| {
71                let len = w.buf_len();
72                let mut w = w.slice(len..);
73                let written = slice_to_buf(buf.as_init(), &mut w);
74                BufResult(Ok(written), w.into_inner())
75            })
76            .expect("Closure always return Ok");
77
78        (_, buf) = buf_try!(self.flush_if_needed().await, buf);
79
80        BufResult(Ok(written), buf)
81    }
82
83    async fn write_vectored<T: IoVectoredBuf>(&mut self, mut buf: T) -> BufResult<usize, T> {
84        (_, buf) = buf_try!(self.flush_if_needed().await, buf);
85
86        let written = self
87            .buf
88            .with_sync(|mut w| {
89                let mut written = 0;
90                for buf in buf.iter_slice() {
91                    let len = w.buf_len();
92                    let mut slice = w.slice(len..);
93                    written += slice_to_buf(buf, &mut slice);
94                    w = slice.into_inner();
95
96                    if w.buf_len() == w.buf_capacity() {
97                        break;
98                    }
99                }
100                BufResult(Ok(written), w)
101            })
102            .expect("Closure always return Ok");
103
104        (_, buf) = buf_try!(self.flush_if_needed().await, buf);
105
106        BufResult(Ok(written), buf)
107    }
108
109    async fn flush(&mut self) -> IoResult<()> {
110        let Self { writer, buf } = self;
111
112        buf.flush_to(writer).await?;
113
114        Ok(())
115    }
116
117    async fn shutdown(&mut self) -> IoResult<()> {
118        self.flush().await?;
119        self.writer.shutdown().await
120    }
121}
122
123impl<W: AsyncRead + AsyncWrite> AsyncRead for BufWriter<W> {
124    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
125        self.writer.read(buf).await
126    }
127
128    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
129        self.writer.read_vectored(buf).await
130    }
131}
132
133impl<W: AsyncBufRead + AsyncWrite> AsyncBufRead for BufWriter<W> {
134    async fn fill_buf(&mut self) -> IoResult<&'_ [u8]> {
135        self.writer.fill_buf().await
136    }
137
138    fn consume(&mut self, amount: usize) {
139        self.writer.consume(amount)
140    }
141}
142
143impl<W> IntoInner for BufWriter<W> {
144    type Inner = W;
145
146    fn into_inner(self) -> Self::Inner {
147        self.writer
148    }
149}