1#[cfg(feature = "allocator_api")]
2use std::alloc::Allocator;
3use std::{io, io::ErrorKind};
4
5use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBufMut, Uninit, t_alloc};
6
7use crate::{
8 AsyncRead, AsyncReadAt, IoResult, framed,
9 util::{Splittable, Take},
10};
11
// Generates a pair of trait methods, `read_<ty>` and `read_<ty>_le`, that read
// exactly `size_of::<ty>()` bytes through `read_exact` and decode them with the
// supplied big-endian (`$be`) and little-endian (`$le`) constructors.
macro_rules! read_scalar {
    ($t:ty, $be:ident, $le:ident) => {
        ::paste::paste! {
            #[doc = concat!("Read a big endian `", stringify!($t), "` from the underlying reader.")]
            async fn [< read_ $t >](&mut self) -> IoResult<$t> {
                use ::compio_buf::{arrayvec::ArrayVec, BufResult};

                const LEN: usize = ::std::mem::size_of::<$t>();
                let scratch = ArrayVec::<u8, LEN>::new();
                let BufResult(outcome, filled) = self.read_exact(scratch).await;
                outcome?;
                // SAFETY: relies on `read_exact` reporting success only after the
                // buffer's whole capacity (`LEN` bytes for this `ArrayVec`) has been
                // filled, so the vector is at full length here.
                let raw = unsafe { filled.into_inner_unchecked() };
                Ok($t::$be(raw))
            }

            #[doc = concat!("Read a little endian `", stringify!($t), "` from the underlying reader.")]
            async fn [< read_ $t _le >](&mut self) -> IoResult<$t> {
                use ::compio_buf::{arrayvec::ArrayVec, BufResult};

                const LEN: usize = ::std::mem::size_of::<$t>();
                let scratch = ArrayVec::<u8, LEN>::new();
                let BufResult(outcome, filled) = self.read_exact(scratch).await;
                outcome?;
                // SAFETY: relies on `read_exact` reporting success only after the
                // buffer's whole capacity (`LEN` bytes for this `ArrayVec`) has been
                // filled, so the vector is at full length here.
                let raw = unsafe { filled.into_inner_unchecked() };
                Ok($t::$le(raw))
            }
        }
    };
}
40
// Expands to the body of an "exact read" function: it keeps evaluating
// `$read_expr` until `$tracker` (bytes read so far) reaches `$len`, then returns
// `Ok(())` together with the buffer.
//
// * A read of zero bytes before the target is reached returns `UnexpectedEof`.
// * `Interrupted` errors are retried; any other error is returned as is.
//
// The expansion always leaves the enclosing function through `return`.
macro_rules! loop_read_exact {
    ($buf:ident, $len:expr, $tracker:ident,loop $read_expr:expr) => {
        let mut $tracker = 0;
        let target = $len;

        loop {
            if $tracker >= target {
                return BufResult(Ok(()), $buf);
            }

            match $read_expr.await.into_inner() {
                // Interrupted: take the buffer back and try again.
                BufResult(Err(ref e), buf) if e.kind() == ::std::io::ErrorKind::Interrupted => {
                    $buf = buf;
                }
                BufResult(Err(e), buf) => return BufResult(Err(e), buf),
                // The source ran dry before the target length was reached.
                BufResult(Ok(0), buf) => {
                    return BufResult(
                        Err(::std::io::Error::new(
                            ::std::io::ErrorKind::UnexpectedEof,
                            "failed to fill whole buffer",
                        )),
                        buf,
                    );
                }
                BufResult(Ok(n), buf) => {
                    $tracker += n;
                    $buf = buf;
                }
            }
        }
    };
}
71
// Expands to the body of a vectored read built from single-buffer reads: it
// walks the owned iterator over `$buf` and performs `$read_expr` on the first
// buffer whose `buf_capacity()` is non-zero.
//
// Returns `Ok(0)` with the original buffer when `owned_iter()` fails or when
// the iterator is exhausted without finding such a buffer.
macro_rules! loop_read_vectored {
    ($buf:ident, $iter:ident, $read_expr:expr) => {{
        let mut $iter = match $buf.owned_iter() {
            Ok(first) => first,
            Err(original) => return BufResult(Ok(0), original),
        };

        // Skip over buffers that have no room at all.
        while $iter.buf_capacity() == 0 {
            $iter = match $iter.next() {
                Ok(following) => following,
                Err(original) => return BufResult(Ok(0), original),
            };
        }

        return $read_expr.await.into_inner()
    }};
}
92
// Expands to an expression that keeps evaluating `$read_expr` until a read
// yields zero bytes, and evaluates to `BufResult(Ok(total), buf)` where `total`
// is the number of bytes read by this loop (tracked in `$tracker` as
// `$tracker_ty`).
//
// * Before each read, a full buffer (`len == capacity`) gets 32 more bytes reserved.
// * `Interrupted` errors are retried; any other error returns from the
//   enclosing function immediately.
macro_rules! loop_read_to_end {
    ($buf:ident, $tracker:ident : $tracker_ty:ty,loop $read_expr:expr) => {{
        let mut $tracker: $tracker_ty = 0;
        loop {
            if $buf.len() == $buf.capacity() {
                $buf.reserve(32);
            }
            match $read_expr.await.into_inner() {
                // A zero-length read ends the loop with the final result.
                BufResult(Ok(0), buf) => break BufResult(Ok($tracker as usize), buf),
                BufResult(Ok(n), buf) => {
                    $tracker += n as $tracker_ty;
                    $buf = buf;
                }
                BufResult(Err(ref e), buf) if e.kind() == ::std::io::ErrorKind::Interrupted => {
                    $buf = buf;
                }
                failed => return failed,
            }
        }
    }};
}
118
119#[inline]
120fn after_read_to_string(res: io::Result<usize>, buf: Vec<u8>) -> BufResult<usize, String> {
121 match res {
122 Err(err) => {
123 let buf = String::from_utf8(buf).unwrap_or_else(|err| {
125 let mut buf = err.into_bytes();
126 buf.clear();
127
128 unsafe { String::from_utf8_unchecked(buf) }
130 });
131
132 BufResult(Err(err), buf)
133 }
134 Ok(n) => match String::from_utf8(buf) {
135 Err(err) => BufResult(
136 Err(std::io::Error::new(ErrorKind::InvalidData, err)),
137 String::new(),
138 ),
139 Ok(data) => BufResult(Ok(n), data),
140 },
141 }
142}
143
/// Convenience methods layered on top of [`AsyncRead`].
///
/// All methods have default bodies; a blanket implementation in this file makes
/// them available on every type that implements [`AsyncRead`].
pub trait AsyncReadExt: AsyncRead {
    /// Borrow this reader mutably, returning `self` unchanged.
    fn by_ref(&mut self) -> &mut Self
    where
        Self: Sized,
    {
        self
    }

    /// Perform a single `read` into the [`Uninit`] view of `buf`
    /// (`buf.uninit()`), then convert the view back into `buf`.
    ///
    /// NOTE(review): presumably this writes after the bytes `buf` already
    /// holds rather than over them — confirm against `Uninit`.
    async fn append<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
        self.read(buf.uninit()).await.map_buffer(Uninit::into_inner)
    }

    /// Keep reading until `buf.buf_capacity()` bytes have been read.
    ///
    /// # Errors
    ///
    /// Returns [`ErrorKind::UnexpectedEof`] if a read yields zero bytes before
    /// that many bytes were read. `Interrupted` errors are retried; any other
    /// error is returned as is, together with the buffer.
    async fn read_exact<T: IoBufMut>(&mut self, mut buf: T) -> BufResult<(), T> {
        // The macro expansion returns from this function on every path.
        loop_read_exact!(buf, buf.buf_capacity(), read, loop self.read(buf.slice(read..)));
    }

    /// Read until a read yields zero bytes, using the bytes of `buf` as the
    /// destination, and hand the result back as a `String`.
    ///
    /// # Errors
    ///
    /// If the read succeeds but the bytes are not valid UTF-8, an
    /// [`ErrorKind::InvalidData`] error is returned with an empty `String`.
    /// If the read itself fails, the I/O error is returned with the bytes as a
    /// `String` when they are valid UTF-8, or an emptied buffer otherwise.
    async fn read_to_string(&mut self, buf: String) -> BufResult<usize, String> {
        let BufResult(res, buf) = self.read_to_end(buf.into_bytes()).await;
        after_read_to_string(res, buf)
    }

    /// Read until a read yields zero bytes and return the number of bytes
    /// read by this call along with the buffer.
    ///
    /// Whenever `buf.len() == buf.capacity()` before a read, 32 more bytes are
    /// reserved. `Interrupted` errors are retried; other errors are returned
    /// immediately.
    ///
    /// NOTE(review): each read targets `buf.slice(total..)`, where `total`
    /// counts bytes read by this call only — confirm how that interacts with a
    /// `buf` that already contains data.
    async fn read_to_end<#[cfg(feature = "allocator_api")] A: Allocator + 'static>(
        &mut self,
        mut buf: t_alloc!(Vec, u8, A),
    ) -> BufResult<usize, t_alloc!(Vec, u8, A)> {
        loop_read_to_end!(buf, total: usize, loop self.read(buf.slice(total..)))
    }

    /// Vectored counterpart of [`read_exact`](Self::read_exact): keep calling
    /// `read_vectored` until `buf.total_capacity()` bytes have been read.
    ///
    /// # Errors
    ///
    /// Same policy as [`read_exact`](Self::read_exact): zero-length read before
    /// completion is [`ErrorKind::UnexpectedEof`], `Interrupted` is retried,
    /// other errors are returned as is.
    async fn read_vectored_exact<T: IoVectoredBufMut>(&mut self, mut buf: T) -> BufResult<(), T> {
        // Captured up front because `buf` is moved into each read.
        let len = buf.total_capacity();
        loop_read_exact!(buf, len, read, loop self.read_vectored(buf.slice_mut(read)));
    }

    /// Build a [`framed::Framed`] from `codec` and `framer` and attach `self`
    /// to it through `with_duplex`.
    ///
    /// `T` is used for both of the last two type arguments of the returned
    /// `Framed`; the read and write halves come from [`Splittable`].
    fn framed<T, C, F>(
        self,
        codec: C,
        framer: F,
    ) -> framed::Framed<Self::ReadHalf, Self::WriteHalf, C, F, T, T>
    where
        Self: Splittable + Sized,
    {
        framed::Framed::new(codec, framer).with_duplex(self)
    }

    /// Build a [`framed::BytesFramed`] via `new_bytes()` and attach `self` to
    /// it through `with_duplex`. Only available with the `bytes` feature.
    #[cfg(feature = "bytes")]
    fn bytes(self) -> framed::BytesFramed<Self::ReadHalf, Self::WriteHalf>
    where
        Self: Splittable + Sized,
    {
        framed::BytesFramed::new_bytes().with_duplex(self)
    }

    /// Wrap this reader in [`ReadOnly`], which implements [`Splittable`] with
    /// `()` as its write half.
    fn read_only(self) -> ReadOnly<Self>
    where
        Self: Sized,
    {
        ReadOnly(self)
    }

    /// Wrap this reader in a [`Take`] constructed with `limit`; see [`Take`]
    /// for how the limit is applied.
    fn take(self, limit: u64) -> Take<Self>
    where
        Self: Sized,
    {
        Take::new(self, limit)
    }

    // Each invocation below generates `read_<ty>` (big endian) and
    // `read_<ty>_le` (little endian) for the listed scalar type.
    read_scalar!(u8, from_be_bytes, from_le_bytes);
    read_scalar!(u16, from_be_bytes, from_le_bytes);
    read_scalar!(u32, from_be_bytes, from_le_bytes);
    read_scalar!(u64, from_be_bytes, from_le_bytes);
    read_scalar!(u128, from_be_bytes, from_le_bytes);
    read_scalar!(i8, from_be_bytes, from_le_bytes);
    read_scalar!(i16, from_be_bytes, from_le_bytes);
    read_scalar!(i32, from_be_bytes, from_le_bytes);
    read_scalar!(i64, from_be_bytes, from_le_bytes);
    read_scalar!(i128, from_be_bytes, from_le_bytes);
    read_scalar!(f32, from_be_bytes, from_le_bytes);
    read_scalar!(f64, from_be_bytes, from_le_bytes);
}
269
// Blanket implementation: every `AsyncRead` type, sized or not, gets the
// `AsyncReadExt` methods through their default bodies.
impl<A: AsyncRead + ?Sized> AsyncReadExt for A {}
271
272pub trait AsyncReadAtExt: AsyncReadAt {
276 async fn read_exact_at<T: IoBufMut>(&self, mut buf: T, pos: u64) -> BufResult<(), T> {
297 loop_read_exact!(
298 buf,
299 buf.buf_capacity(),
300 read,
301 loop self.read_at(buf.slice(read..), pos + read as u64)
302 );
303 }
304
305 async fn read_to_string_at(&mut self, buf: String, pos: u64) -> BufResult<usize, String> {
308 let BufResult(res, buf) = self.read_to_end_at(buf.into_bytes(), pos).await;
309 after_read_to_string(res, buf)
310 }
311
312 async fn read_to_end_at<#[cfg(feature = "allocator_api")] A: Allocator + 'static>(
323 &self,
324 mut buffer: t_alloc!(Vec, u8, A),
325 pos: u64,
326 ) -> BufResult<usize, t_alloc!(Vec, u8, A)> {
327 loop_read_to_end!(buffer, total: u64, loop self.read_at(buffer.slice(total as usize..), pos + total))
328 }
329
330 async fn read_vectored_exact_at<T: IoVectoredBufMut>(
333 &self,
334 mut buf: T,
335 pos: u64,
336 ) -> BufResult<(), T> {
337 let len = buf.total_capacity();
338 loop_read_exact!(buf, len, read, loop self.read_vectored_at(buf.slice_mut(read), pos + read as u64));
339 }
340}
341
// Blanket implementation: every `AsyncReadAt` type, sized or not, gets the
// `AsyncReadAtExt` methods through their default bodies.
impl<A: AsyncReadAt + ?Sized> AsyncReadAtExt for A {}
343
/// Newtype wrapper around a reader `R`.
///
/// It forwards [`AsyncRead`] to the wrapped value and implements
/// [`Splittable`] with `R` as the read half and `()` as the write half.
/// It is the type returned by [`AsyncReadExt::read_only`].
pub struct ReadOnly<R>(pub R);
350
351impl<R: AsyncRead> AsyncRead for ReadOnly<R> {
352 async fn read<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
353 self.0.read(buf).await
354 }
355
356 async fn read_vectored<T: IoVectoredBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
357 self.0.read_vectored(buf).await
358 }
359}
360
361impl<R> Splittable for ReadOnly<R> {
362 type ReadHalf = R;
363 type WriteHalf = ();
364
365 fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
366 (self.0, ())
367 }
368}