1use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut, buf_try};
2
3use crate::{AsyncRead, AsyncWrite, IoResult, buffer::Buffer, util::DEFAULT_BUF_SIZE};
4pub trait AsyncBufRead: AsyncRead {
8 async fn fill_buf(&mut self) -> IoResult<&'_ [u8]>;
10
11 fn consume(&mut self, amount: usize);
13}
14
15impl<A: AsyncBufRead + ?Sized> AsyncBufRead for &mut A {
16 async fn fill_buf(&mut self) -> IoResult<&'_ [u8]> {
17 (**self).fill_buf().await
18 }
19
20 fn consume(&mut self, amount: usize) {
21 (**self).consume(amount)
22 }
23}
24
25#[derive(Debug)]
55pub struct BufReader<R> {
56 reader: R,
57 buf: Buffer,
58}
59
60impl<R> BufReader<R> {
61 pub fn new(reader: R) -> Self {
64 Self::with_capacity(DEFAULT_BUF_SIZE, reader)
65 }
66
67 pub fn with_capacity(cap: usize, reader: R) -> Self {
69 Self {
70 reader,
71 buf: Buffer::with_capacity(cap),
72 }
73 }
74}
75
76impl<R: AsyncRead> AsyncRead for BufReader<R> {
77 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
78 let (mut slice, buf) = buf_try!(self.fill_buf().await, buf);
79 slice.read(buf).await.map_res(|res| {
80 self.consume(res);
81 res
82 })
83 }
84
85 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
86 let (mut slice, buf) = buf_try!(self.fill_buf().await, buf);
87 slice.read_vectored(buf).await.map_res(|res| {
88 self.consume(res);
89 res
90 })
91 }
92}
93
94impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
95 async fn fill_buf(&mut self) -> IoResult<&'_ [u8]> {
96 let Self { reader, buf } = self;
97
98 if buf.all_done() {
99 buf.reset()
100 }
101
102 if buf.need_fill() {
103 buf.with(|b| async move {
104 let len = b.buf_len();
105 let b = b.slice(len..);
106 reader.read(b).await.into_inner()
107 })
108 .await?;
109 }
110
111 Ok(buf.buffer())
112 }
113
114 fn consume(&mut self, amount: usize) {
115 self.buf.advance(amount);
116 }
117}
118
119impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
120 async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
121 self.reader.write(buf).await
122 }
123
124 async fn write_vectored<B: IoVectoredBuf>(&mut self, buf: B) -> BufResult<usize, B> {
125 self.reader.write_vectored(buf).await
126 }
127
128 async fn flush(&mut self) -> IoResult<()> {
129 self.reader.flush().await
130 }
131
132 async fn shutdown(&mut self) -> IoResult<()> {
133 self.reader.shutdown().await
134 }
135}
136
137impl<R> IntoInner for BufReader<R> {
138 type Inner = R;
139
140 fn into_inner(self) -> Self::Inner {
141 self.reader
142 }
143}