compio_io\compat/
sync_stream.rs

1use std::{
2    io::{self, BufRead, Read, Write},
3    mem::MaybeUninit,
4};
5
6use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
7
8use crate::{buffer::Buffer, util::DEFAULT_BUF_SIZE};
9
10/// A growable buffered stream adapter that bridges async I/O with sync traits.
11///
12/// # Buffer Growth Strategy
13///
14/// - **Read buffer**: Grows as needed to accommodate incoming data, up to
15///   `max_buffer_size`
16/// - **Write buffer**: Grows as needed for outgoing data, up to
17///   `max_buffer_size`
18/// - Both buffers shrink back to `base_capacity` when fully consumed and
19///   capacity exceeds 4x base
20///
21/// # Usage Pattern
22///
23/// The sync `Read` and `Write` implementations will return `WouldBlock` errors
24/// when buffers need servicing via the async methods:
25///
26/// - Call `fill_read_buf()` when `Read::read()` returns `WouldBlock`
27/// - Call `flush_write_buf()` when `Write::write()` returns `WouldBlock`
28///
29/// # Note on flush()
30///
31/// The `Write::flush()` method intentionally returns `Ok(())` without checking
32/// if there's buffered data. This is for compatibility with libraries like
33/// tungstenite that call `flush()` after every write. Actual flushing happens
34/// via the async `flush_write_buf()` method.
35#[derive(Debug)]
36pub struct SyncStream<S> {
37    inner: S,
38    read_buf: Buffer,
39    write_buf: Buffer,
40    eof: bool,
41    base_capacity: usize,
42    max_buffer_size: usize,
43}
44
45impl<S> SyncStream<S> {
46    // 64MiB max
47    const DEFAULT_MAX_BUFFER: usize = 64 * 1024 * 1024;
48
49    /// Creates a new `SyncStream` with default buffer sizes.
50    ///
51    /// - Base capacity: 8KiB
52    /// - Max buffer size: 64MiB
53    pub fn new(stream: S) -> Self {
54        Self::with_capacity(DEFAULT_BUF_SIZE, stream)
55    }
56
57    /// Creates a new `SyncStream` with a custom base capacity.
58    ///
59    /// The maximum buffer size defaults to 64MiB.
60    pub fn with_capacity(base_capacity: usize, stream: S) -> Self {
61        Self::with_limits(base_capacity, Self::DEFAULT_MAX_BUFFER, stream)
62    }
63
64    /// Creates a new `SyncStream` with custom base capacity and maximum
65    /// buffer size.
66    pub fn with_limits(base_capacity: usize, max_buffer_size: usize, stream: S) -> Self {
67        Self {
68            inner: stream,
69            read_buf: Buffer::with_capacity(base_capacity),
70            write_buf: Buffer::with_capacity(base_capacity),
71            eof: false,
72            base_capacity,
73            max_buffer_size,
74        }
75    }
76
77    /// Returns a reference to the underlying stream.
78    pub fn get_ref(&self) -> &S {
79        &self.inner
80    }
81
82    /// Returns a mutable reference to the underlying stream.
83    pub fn get_mut(&mut self) -> &mut S {
84        &mut self.inner
85    }
86
87    /// Consumes the `SyncStream`, returning the underlying stream.
88    pub fn into_inner(self) -> S {
89        self.inner
90    }
91
92    /// Returns `true` if the stream has reached EOF.
93    pub fn is_eof(&self) -> bool {
94        self.eof
95    }
96
97    /// Returns the available bytes in the read buffer.
98    fn available_read(&self) -> &[u8] {
99        self.read_buf.buffer()
100    }
101
102    /// Marks `amt` bytes as consumed from the read buffer.
103    ///
104    /// Resets the buffer when all data is consumed and shrinks capacity
105    /// if it has grown significantly beyond the base capacity.
106    fn consume_read(&mut self, amt: usize) {
107        let all_done = self.read_buf.advance(amt);
108
109        // Shrink oversized buffers back to base capacity
110        if all_done {
111            self.read_buf
112                .compact_to(self.base_capacity, self.max_buffer_size);
113        }
114    }
115
116    /// Pull some bytes from this source into the specified buffer.
117    pub fn read_buf_uninit(&mut self, buf: &mut [MaybeUninit<u8>]) -> io::Result<usize> {
118        let available = self.fill_buf()?;
119
120        let to_read = available.len().min(buf.len());
121        buf[..to_read].copy_from_slice(unsafe {
122            std::slice::from_raw_parts(available.as_ptr().cast(), to_read)
123        });
124        self.consume(to_read);
125
126        Ok(to_read)
127    }
128}
129
130impl<S> Read for SyncStream<S> {
131    /// Reads data from the internal buffer.
132    ///
133    /// Returns `WouldBlock` if the buffer is empty and not at EOF,
134    /// indicating that `fill_read_buf()` should be called.
135    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
136        let mut slice = self.fill_buf()?;
137        slice.read(buf).inspect(|res| {
138            self.consume(*res);
139        })
140    }
141
142    #[cfg(feature = "read_buf")]
143    fn read_buf(&mut self, mut buf: io::BorrowedCursor<'_>) -> io::Result<()> {
144        let mut slice = self.fill_buf()?;
145        let old_written = buf.written();
146        slice.read_buf(buf.reborrow())?;
147        let len = buf.written() - old_written;
148        self.consume(len);
149        Ok(())
150    }
151}
152
153impl<S> BufRead for SyncStream<S> {
154    fn fill_buf(&mut self) -> io::Result<&[u8]> {
155        let available = self.available_read();
156
157        if available.is_empty() && !self.eof {
158            return Err(would_block("need to fill read buffer"));
159        }
160
161        Ok(available)
162    }
163
164    fn consume(&mut self, amt: usize) {
165        self.consume_read(amt);
166    }
167}
168
169impl<S> Write for SyncStream<S> {
170    /// Writes data to the internal buffer.
171    ///
172    /// Returns `WouldBlock` if the buffer needs flushing or has reached max
173    /// capacity. In the latter case, it may write partial data before
174    /// returning `WouldBlock`.
175    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
176        // Check if we should flush first
177        if self.write_buf.need_flush() && !self.write_buf.is_empty() {
178            return Err(would_block("need to flush write buffer"));
179        }
180
181        let written = self.write_buf.with_sync(|mut inner| {
182            let res = if inner.buf_len() + buf.len() > self.max_buffer_size {
183                let space = self.max_buffer_size - inner.buf_len();
184                if space == 0 {
185                    Err(would_block("write buffer full, need to flush"))
186                } else {
187                    inner.extend_from_slice(&buf[..space]);
188                    Ok(space)
189                }
190            } else {
191                inner.extend_from_slice(buf);
192                Ok(buf.len())
193            };
194            BufResult(res, inner)
195        })?;
196
197        Ok(written)
198    }
199
200    /// Returns `Ok(())` without checking for buffered data.
201    ///
202    /// **Important**: This does NOT actually flush data to the underlying
203    /// stream. This behavior is intentional for compatibility with
204    /// libraries like tungstenite that call `flush()` after every write
205    /// operation. The actual async flush happens when `flush_write_buf()`
206    /// is called.
207    ///
208    /// This prevents spurious errors in sync code that expects `flush()` to
209    /// succeed after successfully buffering data.
210    fn flush(&mut self) -> io::Result<()> {
211        Ok(())
212    }
213}
214
215fn would_block(msg: &str) -> io::Error {
216    io::Error::new(io::ErrorKind::WouldBlock, msg)
217}
218
219impl<S: crate::AsyncRead> SyncStream<S> {
220    /// Fills the read buffer by reading from the underlying async stream.
221    ///
222    /// This method:
223    /// 1. Compacts the buffer if there's unconsumed data
224    /// 2. Ensures there's space for at least `base_capacity` more bytes
225    /// 3. Reads data from the underlying stream
226    /// 4. Returns the number of bytes read (0 indicates EOF)
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if:
231    /// - The read buffer has reached `max_buffer_size`
232    /// - The underlying stream returns an error
233    pub async fn fill_read_buf(&mut self) -> io::Result<usize> {
234        if self.eof {
235            return Ok(0);
236        }
237
238        // Compact buffer, move unconsumed data to the front
239        self.read_buf
240            .compact_to(self.base_capacity, self.max_buffer_size);
241
242        let read = self
243            .read_buf
244            .with(|mut inner| async {
245                let current_len = inner.buf_len();
246
247                if current_len >= self.max_buffer_size {
248                    return BufResult(
249                        Err(io::Error::new(
250                            io::ErrorKind::OutOfMemory,
251                            format!("read buffer size limit ({}) exceeded", self.max_buffer_size),
252                        )),
253                        inner,
254                    );
255                }
256
257                let capacity = inner.buf_capacity();
258                let available_space = capacity - current_len;
259
260                // If target space is less than base capacity, grow the buffer.
261                let target_space = self.base_capacity;
262                if available_space < target_space {
263                    let new_capacity = current_len + target_space;
264                    inner.reserve_exact(new_capacity - capacity);
265                }
266
267                let len = inner.buf_len();
268                let read_slice = inner.slice(len..);
269                self.inner.read(read_slice).await.into_inner()
270            })
271            .await?;
272        if read == 0 {
273            self.eof = true;
274        }
275        Ok(read)
276    }
277}
278
279impl<S: crate::AsyncWrite> SyncStream<S> {
280    /// Flushes the write buffer to the underlying async stream.
281    ///
282    /// This method:
283    /// 1. Writes all buffered data to the underlying stream
284    /// 2. Calls `flush()` on the underlying stream
285    /// 3. Returns the total number of bytes flushed
286    ///
287    /// On error, any unwritten data remains in the buffer and can be retried.
288    ///
289    /// # Errors
290    ///
291    /// Returns an error if the underlying stream returns an error.
292    /// In this case, the buffer retains any data that wasn't successfully
293    /// written.
294    pub async fn flush_write_buf(&mut self) -> io::Result<usize> {
295        let flushed = self.write_buf.flush_to(&mut self.inner).await?;
296        self.write_buf
297            .compact_to(self.base_capacity, self.max_buffer_size);
298        self.inner.flush().await?;
299        Ok(flushed)
300    }
301}