1use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut, buf_try};
2
3use crate::{
4 AsyncBufRead, AsyncRead, AsyncWrite, IoResult,
5 buffer::Buffer,
6 util::{DEFAULT_BUF_SIZE, slice_to_buf},
7};
8
9#[derive(Debug)]
32pub struct BufWriter<W> {
33 writer: W,
34 buf: Buffer,
35}
36
37impl<W> BufWriter<W> {
38 pub fn new(writer: W) -> Self {
41 Self::with_capacity(DEFAULT_BUF_SIZE, writer)
42 }
43
44 pub fn with_capacity(cap: usize, writer: W) -> Self {
46 Self {
47 writer,
48 buf: Buffer::with_capacity(cap),
49 }
50 }
51}
52
53impl<W: AsyncWrite> BufWriter<W> {
54 async fn flush_if_needed(&mut self) -> IoResult<()> {
55 if self.buf.need_flush() {
56 self.flush().await?;
57 }
58 Ok(())
59 }
60}
61
62impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
63 async fn write<T: IoBuf>(&mut self, mut buf: T) -> BufResult<usize, T> {
64 (_, buf) = buf_try!(self.flush_if_needed().await, buf);
67
68 let written = self
69 .buf
70 .with_sync(|w| {
71 let len = w.buf_len();
72 let mut w = w.slice(len..);
73 let written = slice_to_buf(buf.as_init(), &mut w);
74 BufResult(Ok(written), w.into_inner())
75 })
76 .expect("Closure always return Ok");
77
78 (_, buf) = buf_try!(self.flush_if_needed().await, buf);
79
80 BufResult(Ok(written), buf)
81 }
82
83 async fn write_vectored<T: IoVectoredBuf>(&mut self, mut buf: T) -> BufResult<usize, T> {
84 (_, buf) = buf_try!(self.flush_if_needed().await, buf);
85
86 let written = self
87 .buf
88 .with_sync(|mut w| {
89 let mut written = 0;
90 for buf in buf.iter_slice() {
91 let len = w.buf_len();
92 let mut slice = w.slice(len..);
93 written += slice_to_buf(buf, &mut slice);
94 w = slice.into_inner();
95
96 if w.buf_len() == w.buf_capacity() {
97 break;
98 }
99 }
100 BufResult(Ok(written), w)
101 })
102 .expect("Closure always return Ok");
103
104 (_, buf) = buf_try!(self.flush_if_needed().await, buf);
105
106 BufResult(Ok(written), buf)
107 }
108
109 async fn flush(&mut self) -> IoResult<()> {
110 let Self { writer, buf } = self;
111
112 buf.flush_to(writer).await?;
113
114 Ok(())
115 }
116
117 async fn shutdown(&mut self) -> IoResult<()> {
118 self.flush().await?;
119 self.writer.shutdown().await
120 }
121}
122
123impl<W: AsyncRead + AsyncWrite> AsyncRead for BufWriter<W> {
124 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
125 self.writer.read(buf).await
126 }
127
128 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
129 self.writer.read_vectored(buf).await
130 }
131}
132
133impl<W: AsyncBufRead + AsyncWrite> AsyncBufRead for BufWriter<W> {
134 async fn fill_buf(&mut self) -> IoResult<&'_ [u8]> {
135 self.writer.fill_buf().await
136 }
137
138 fn consume(&mut self, amount: usize) {
139 self.writer.consume(amount)
140 }
141}
142
143impl<W> IntoInner for BufWriter<W> {
144 type Inner = W;
145
146 fn into_inner(self) -> Self::Inner {
147 self.writer
148 }
149}