1use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut, buf_try};
2use futures_util::FutureExt;
3
4use crate::{AsyncRead, AsyncWrite, IoResult, buffer::Buffer, util::DEFAULT_BUF_SIZE};
5pub trait AsyncBufRead: AsyncRead {
9 async fn fill_buf(&mut self) -> IoResult<&'_ [u8]>;
11
12 fn consume(&mut self, amount: usize);
14}
15
16impl<A: AsyncBufRead + ?Sized> AsyncBufRead for &mut A {
17 async fn fill_buf(&mut self) -> IoResult<&'_ [u8]> {
18 (**self).fill_buf().await
19 }
20
21 fn consume(&mut self, amount: usize) {
22 (**self).consume(amount)
23 }
24}
25
26#[derive(Debug)]
56pub struct BufReader<R> {
57 reader: R,
58 buf: Buffer,
59}
60
61impl<R> BufReader<R> {
62 pub fn new(reader: R) -> Self {
65 Self::with_capacity(DEFAULT_BUF_SIZE, reader)
66 }
67
68 pub fn with_capacity(cap: usize, reader: R) -> Self {
70 Self {
71 reader,
72 buf: Buffer::with_capacity(cap),
73 }
74 }
75}
76
77impl<R: AsyncRead> AsyncRead for BufReader<R> {
78 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
79 let (mut slice, buf) = buf_try!(self.fill_buf().await, buf);
80 slice.read(buf).await.map_res(|res| {
81 self.consume(res);
82 res
83 })
84 }
85
86 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
87 let (mut slice, buf) = buf_try!(self.fill_buf().await, buf);
88 slice.read_vectored(buf).await.map_res(|res| {
89 self.consume(res);
90 res
91 })
92 }
93}
94
95impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
96 async fn fill_buf(&mut self) -> IoResult<&'_ [u8]> {
97 let Self { reader, buf } = self;
98
99 if buf.all_done() {
100 buf.reset()
101 }
102
103 if buf.need_fill() {
104 buf.with(|b| {
105 let len = b.buf_len();
106 let b = b.slice(len..);
107 reader.read(b).map(IntoInner::into_inner)
108 })
109 .await?;
110 }
111
112 Ok(buf.buffer())
113 }
114
115 fn consume(&mut self, amount: usize) {
116 self.buf.advance(amount);
117 }
118}
119
120impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
121 async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
122 self.reader.write(buf).await
123 }
124
125 async fn write_vectored<B: IoVectoredBuf>(&mut self, buf: B) -> BufResult<usize, B> {
126 self.reader.write_vectored(buf).await
127 }
128
129 async fn flush(&mut self) -> IoResult<()> {
130 self.reader.flush().await
131 }
132
133 async fn shutdown(&mut self) -> IoResult<()> {
134 self.reader.shutdown().await
135 }
136}
137
138impl<R> IntoInner for BufReader<R> {
139 type Inner = R;
140
141 fn into_inner(self) -> Self::Inner {
142 self.reader
143 }
144}