// compio_io/compat/sync_stream.rs
use std::{
    io::{self, BufRead, Read, Write},
    mem::MaybeUninit,
};

use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};

use crate::{buffer::Buffer, util::DEFAULT_BUF_SIZE};

/// A growable buffered stream adapter that bridges async I/O with sync traits.
///
/// # Buffer Growth Strategy
///
/// - **Read buffer**: Grows as needed to accommodate incoming data, up to
///   `max_buffer_size`
/// - **Write buffer**: Grows as needed for outgoing data, up to
///   `max_buffer_size`
/// - Both buffers shrink back to `base_capacity` when fully consumed and
///   capacity exceeds 4x base
///
/// # Usage Pattern
///
/// The sync `Read` and `Write` implementations will return `WouldBlock` errors
/// when buffers need servicing via the async methods:
///
/// - Call `fill_read_buf()` when `Read::read()` returns `WouldBlock`
/// - Call `flush_write_buf()` when `Write::write()` returns `WouldBlock`
///
/// # Note on flush()
///
/// The `Write::flush()` method intentionally returns `Ok(())` without checking
/// if there's buffered data. This is for compatibility with libraries like
/// tungstenite that call `flush()` after every write. Actual flushing happens
/// via the async `flush_write_buf()` method.
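///
/// # Example
///
/// A minimal sketch of the driving loop; `inner` stands for any stream
/// implementing the crate's `AsyncRead` + `AsyncWrite`, and `do_sync_io` for
/// a synchronous protocol step (both names are illustrative only):
///
/// ```ignore
/// let mut stream = SyncStream::new(inner);
/// let value = loop {
///     match do_sync_io(&mut stream) {
///         Ok(value) => break value,
///         Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
///             // The sync side stalled: push out buffered writes, then pull
///             // in more data before retrying.
///             stream.flush_write_buf().await?;
///             stream.fill_read_buf().await?;
///         }
///         Err(e) => return Err(e),
///     }
/// };
/// ```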
#[derive(Debug)]
pub struct SyncStream<S> {
    inner: S,
    read_buf: Buffer,
    write_buf: Buffer,
    eof: bool,
    base_capacity: usize,
    max_buffer_size: usize,
}

impl<S> SyncStream<S> {
    // 64MiB max
    const DEFAULT_MAX_BUFFER: usize = 64 * 1024 * 1024;

    /// Creates a new `SyncStream` with default buffer sizes.
    ///
    /// - Base capacity: 8KiB
    /// - Max buffer size: 64MiB
    pub fn new(stream: S) -> Self {
        Self::with_capacity(DEFAULT_BUF_SIZE, stream)
    }

    /// Creates a new `SyncStream` with a custom base capacity.
    ///
    /// The maximum buffer size defaults to 64MiB.
    pub fn with_capacity(base_capacity: usize, stream: S) -> Self {
        Self::with_limits(base_capacity, Self::DEFAULT_MAX_BUFFER, stream)
    }

    /// Creates a new `SyncStream` with custom base capacity and maximum
    /// buffer size.
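    ///
    /// # Example
    ///
    /// A sketch with arbitrary illustrative capacities; `inner` is any
    /// underlying stream:
    ///
    /// ```ignore
    /// // 16 KiB base capacity, 1 MiB hard cap on each buffer.
    /// let stream = SyncStream::with_limits(16 * 1024, 1024 * 1024, inner);
    /// ```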
    pub fn with_limits(base_capacity: usize, max_buffer_size: usize, stream: S) -> Self {
        Self {
            inner: stream,
            read_buf: Buffer::with_capacity(base_capacity),
            write_buf: Buffer::with_capacity(base_capacity),
            eof: false,
            base_capacity,
            max_buffer_size,
        }
    }

    /// Returns a reference to the underlying stream.
    pub fn get_ref(&self) -> &S {
        &self.inner
    }

    /// Returns a mutable reference to the underlying stream.
    pub fn get_mut(&mut self) -> &mut S {
        &mut self.inner
    }

    /// Consumes the `SyncStream`, returning the underlying stream.
    pub fn into_inner(self) -> S {
        self.inner
    }

    /// Returns `true` if the stream has reached EOF.
    pub fn is_eof(&self) -> bool {
        self.eof
    }

    /// Returns the available bytes in the read buffer.
    fn available_read(&self) -> &[u8] {
        self.read_buf.buffer()
    }

    /// Marks `amt` bytes as consumed from the read buffer.
    ///
    /// Resets the buffer when all data is consumed and shrinks capacity
    /// if it has grown significantly beyond the base capacity.
    fn consume_read(&mut self, amt: usize) {
        let all_done = self.read_buf.advance(amt);

        // Shrink oversized buffers back to base capacity
        if all_done {
            self.read_buf
                .compact_to(self.base_capacity, self.max_buffer_size);
        }
    }

    /// Pulls some bytes from the internal read buffer into the specified
    /// uninitialized buffer, returning the number of bytes copied.
    ///
    /// Like `Read::read`, this returns `WouldBlock` when the read buffer is
    /// empty and EOF has not been reached.
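    ///
    /// # Example
    ///
    /// A sketch; the buffer size is arbitrary and `stream` is assumed to
    /// already hold buffered data:
    ///
    /// ```ignore
    /// let mut buf = [std::mem::MaybeUninit::<u8>::uninit(); 512];
    /// let n = stream.read_buf_uninit(&mut buf)?;
    /// // The first `n` bytes of `buf` are now initialized.
    /// ```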
    pub fn read_buf_uninit(&mut self, buf: &mut [MaybeUninit<u8>]) -> io::Result<usize> {
        let available = self.fill_buf()?;

        let to_read = available.len().min(buf.len());
        // SAFETY: `MaybeUninit<u8>` has the same layout as `u8`, and the
        // source bytes are initialized, so viewing them as `MaybeUninit<u8>`
        // for a plain copy is sound.
        buf[..to_read].copy_from_slice(unsafe {
            std::slice::from_raw_parts(available.as_ptr().cast(), to_read)
        });
        self.consume(to_read);

        Ok(to_read)
    }
}

impl<S> Read for SyncStream<S> {
    /// Reads data from the internal buffer.
    ///
    /// Returns `WouldBlock` if the buffer is empty and not at EOF,
    /// indicating that `fill_read_buf()` should be called.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut slice = self.fill_buf()?;
        slice.read(buf).inspect(|res| {
            self.consume(*res);
        })
    }

    #[cfg(feature = "read_buf")]
    fn read_buf(&mut self, mut buf: io::BorrowedCursor<'_>) -> io::Result<()> {
        let mut slice = self.fill_buf()?;
        let old_written = buf.written();
        slice.read_buf(buf.reborrow())?;
        let len = buf.written() - old_written;
        self.consume(len);
        Ok(())
    }
}

impl<S> BufRead for SyncStream<S> {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        let available = self.available_read();

        if available.is_empty() && !self.eof {
            return Err(would_block("need to fill read buffer"));
        }

        Ok(available)
    }

    fn consume(&mut self, amt: usize) {
        self.consume_read(amt);
    }
}

impl<S> Write for SyncStream<S> {
    /// Writes data to the internal buffer.
    ///
    /// Returns `WouldBlock` if the buffer needs flushing or is already at max
    /// capacity. If the write would exceed max capacity but some space
    /// remains, a partial write is accepted instead; subsequent calls return
    /// `WouldBlock` until the buffer is flushed.
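    ///
    /// # Example
    ///
    /// A sketch of handling the backpressure signal; `stream` and `payload`
    /// are assumed, and the flush runs in an async context:
    ///
    /// ```ignore
    /// match stream.write(payload) {
    ///     Ok(n) => { /* `n` bytes were buffered (possibly fewer than sent) */ }
    ///     Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
    ///         // Backpressure: drain the write buffer asynchronously, then retry.
    ///         stream.flush_write_buf().await?;
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// ```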
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Check if we should flush first
        if self.write_buf.need_flush() && !self.write_buf.is_empty() {
            return Err(would_block("need to flush write buffer"));
        }

        let written = self.write_buf.with_sync(|mut inner| {
            let res = if inner.buf_len() + buf.len() > self.max_buffer_size {
                let space = self.max_buffer_size - inner.buf_len();
                if space == 0 {
                    Err(would_block("write buffer full, need to flush"))
                } else {
                    inner.extend_from_slice(&buf[..space]);
                    Ok(space)
                }
            } else {
                inner.extend_from_slice(buf);
                Ok(buf.len())
            };
            BufResult(res, inner)
        })?;

        Ok(written)
    }

    /// Returns `Ok(())` without checking for buffered data.
    ///
    /// **Important**: This does NOT actually flush data to the underlying
    /// stream. This behavior is intentional for compatibility with
    /// libraries like tungstenite that call `flush()` after every write
    /// operation. The actual async flush happens when `flush_write_buf()`
    /// is called.
    ///
    /// This prevents spurious errors in sync code that expects `flush()` to
    /// succeed after successfully buffering data.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

fn would_block(msg: &str) -> io::Error {
    io::Error::new(io::ErrorKind::WouldBlock, msg)
}

impl<S: crate::AsyncRead> SyncStream<S> {
    /// Fills the read buffer by reading from the underlying async stream.
    ///
    /// This method:
    /// 1. Compacts the buffer if there's unconsumed data
    /// 2. Ensures there's space for at least `base_capacity` more bytes
    /// 3. Reads data from the underlying stream
    /// 4. Returns the number of bytes read (0 indicates EOF)
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The read buffer has reached `max_buffer_size`
    /// - The underlying stream returns an error
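    ///
    /// # Example
    ///
    /// A sketch of refilling until a sync-side parser succeeds; `try_parse`
    /// is a hypothetical non-blocking parsing step:
    ///
    /// ```ignore
    /// let frame = loop {
    ///     if let Some(frame) = try_parse(&mut stream)? {
    ///         break frame;
    ///     }
    ///     // A return of 0 means EOF: bail out instead of looping forever.
    ///     if stream.fill_read_buf().await? == 0 {
    ///         return Err(std::io::ErrorKind::UnexpectedEof.into());
    ///     }
    /// };
    /// ```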
    pub async fn fill_read_buf(&mut self) -> io::Result<usize> {
        if self.eof {
            return Ok(0);
        }

        // Compact buffer, move unconsumed data to the front
        self.read_buf
            .compact_to(self.base_capacity, self.max_buffer_size);

        let read = self
            .read_buf
            .with(|mut inner| async {
                let current_len = inner.buf_len();

                if current_len >= self.max_buffer_size {
                    return BufResult(
                        Err(io::Error::new(
                            io::ErrorKind::OutOfMemory,
                            format!("read buffer size limit ({}) exceeded", self.max_buffer_size),
                        )),
                        inner,
                    );
                }

                let capacity = inner.buf_capacity();
                let available_space = capacity - current_len;

                // If the available space is less than the base capacity, grow the buffer.
                let target_space = self.base_capacity;
                if available_space < target_space {
                    let new_capacity = current_len + target_space;
                    inner.reserve_exact(new_capacity - capacity);
                }

                let len = inner.buf_len();
                let read_slice = inner.slice(len..);
                self.inner.read(read_slice).await.into_inner()
            })
            .await?;
        if read == 0 {
            self.eof = true;
        }
        Ok(read)
    }
}

impl<S: crate::AsyncWrite> SyncStream<S> {
    /// Flushes the write buffer to the underlying async stream.
    ///
    /// This method:
    /// 1. Writes all buffered data to the underlying stream
    /// 2. Calls `flush()` on the underlying stream
    /// 3. Returns the total number of bytes flushed
    ///
    /// On error, any unwritten data remains in the buffer and can be retried.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying stream returns an error.
    /// In this case, the buffer retains any data that wasn't successfully
    /// written.
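    ///
    /// # Example
    ///
    /// A sketch; `stream` is a `SyncStream` wrapping an async writer:
    ///
    /// ```ignore
    /// use std::io::Write;
    ///
    /// let buffered = stream.write(b"hello")?;        // sync side: into the buffer
    /// let flushed = stream.flush_write_buf().await?; // async side: out to the stream
    /// ```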
    pub async fn flush_write_buf(&mut self) -> io::Result<usize> {
        let flushed = self.write_buf.flush_to(&mut self.inner).await?;
        self.write_buf
            .compact_to(self.base_capacity, self.max_buffer_size);
        self.inner.flush().await?;
        Ok(flushed)
    }
}