compio_io\framed/mod.rs
1//! Framed I/O operations.
2//!
3//! This module provides functionality for encoding and decoding frames
4//! for network protocols and other stream-based communication.
5
6use std::marker::PhantomData;
7
8use compio_buf::IoBufMut;
9use futures_util::FutureExt;
10
11use crate::{AsyncRead, util::Splittable};
12
13pub mod codec;
14pub mod frame;
15
16mod read;
17mod write;
18
/// Panic message used when a `Framed` is reconfigured after polling began.
const CONFIG_POLLED_ERROR: &str = "`Framed` should not be configured after being polled";
/// Panic message for a `Framed` found in an inconsistent state.
///
/// Not referenced in this file's visible code; presumably used by the `read`
/// and `write` submodules.
const INCONSISTENT_ERROR: &str = "`Framed` is in an inconsistent state";

/// Aborts the current operation by panicking with [`CONFIG_POLLED_ERROR`].
///
/// Marked `#[cold]` so callers keep this diverging path out of the hot path.
#[cold]
fn panic_config_polled() -> ! {
    panic!("{CONFIG_POLLED_ERROR}")
}
26
/// A framed encoder/decoder that handles both [`Sink`] for writing frames and
/// [`Stream`] for reading frames.
///
/// It uses a [`codec`] to encode/decode messages into/from bytes (`T <-->
/// IoBufMut`) and a [`Framer`] to define how frames are laid out in buffer
/// (`&[u8] <--> IoBufMut`).
///
/// # Type parameters
///
/// - `R` / `W`: the reader and writer I/O types.
/// - `C`: the codec.
/// - `F`: the framer.
/// - `In`: the item type accepted on the [`Sink`] side.
/// - `Out`: the item type produced on the [`Stream`] side.
/// - `B`: the buffer type used by both sides; defaults to `Vec<u8>`.
///
/// [`Framer`]: frame::Framer
/// [`Sink`]: futures_util::Sink
/// [`Stream`]: futures_util::Stream
pub struct Framed<R, W, C, F, In, Out, B = Vec<u8>> {
    // Read-side state, parameterized over the reader `R` and the buffer `B`.
    read_state: read::State<R, B>,
    // Write-side state, parameterized over the writer `W` and the buffer `B`.
    write_state: write::State<W, B>,
    // Codec carried along by every `with_*` builder method.
    codec: C,
    // Framer carried along by every `with_*` builder method.
    framer: F,
    // Zero-sized marker that ties `In` and `Out` to the type without storing
    // any value of either.
    types: PhantomData<(In, Out)>,
}
44
/// [`Framed`] with the same `In` ([`Sink`]) and `Out` ([`Stream::Item`]) type.
///
/// The single parameter `T` is used for both the `In` and the `Out` position
/// of [`Framed`]; the buffer type `B` still defaults to `Vec<u8>`.
///
/// [`Sink`]: futures_util::Sink
/// [`Stream::Item`]: futures_util::Stream::Item
pub type SymmetricFramed<R, W, C, F, T, B = Vec<u8>> = Framed<R, W, C, F, T, T, B>;
50
51impl<R, W, C, F, In, Out, B> Framed<R, W, C, F, In, Out, B> {
52 /// Change the reader of the `Framed` object.
53 pub fn with_reader<Io>(self, reader: Io) -> Framed<Io, W, C, F, In, Out, B> {
54 Framed {
55 read_state: self.read_state.with_io(reader),
56 write_state: self.write_state,
57 codec: self.codec,
58 framer: self.framer,
59 types: PhantomData,
60 }
61 }
62
63 /// Change the writer of the `Framed` object.
64 pub fn with_writer<Io>(self, writer: Io) -> Framed<R, Io, C, F, In, Out, B> {
65 Framed {
66 read_state: self.read_state,
67 write_state: self.write_state.with_io(writer),
68 codec: self.codec,
69 framer: self.framer,
70 types: PhantomData,
71 }
72 }
73
74 /// Change the codec of the `Framed` object.
75 ///
76 /// This is useful when you have a duplex I/O type, e.g., a
77 /// `compio::net::TcpStream` or `compio::fs::File`, and you want
78 /// [`Framed`] to implement both [`Sink`](futures_util::Sink) and
79 /// [`Stream`](futures_util::Stream).
80 ///
81 /// Some types like the ones mentioned above are multiplexed by nature, so
82 /// they implement the [`Splittable`] trait by themselves. For other types,
83 /// you may want to wrap them in [`Split`] first, which uses lock or
84 /// `RefCell` under the hood.
85 ///
86 /// [`Split`]: crate::util::split::Split
87 pub fn with_duplex<Io: Splittable>(
88 self,
89 io: Io,
90 ) -> Framed<Io::ReadHalf, Io::WriteHalf, C, F, In, Out, B> {
91 let (read_half, write_half) = io.split();
92
93 Framed {
94 read_state: self.read_state.with_io(read_half),
95 write_state: self.write_state.with_io(write_half),
96 codec: self.codec,
97 framer: self.framer,
98 types: PhantomData,
99 }
100 }
101
102 /// Change both the read and write buffers of the `Framed` object.
103 ///
104 /// This is useful when you want to provide custom buffers for reading and
105 /// writing.
106 pub fn with_buffer<Buf: IoBufMut>(
107 self,
108 read_buffer: Buf,
109 write_buffer: Buf,
110 ) -> Framed<R, W, C, F, In, Out, Buf> {
111 Framed {
112 read_state: self.read_state.with_buf(read_buffer),
113 write_state: self.write_state.with_buf(write_buffer),
114 codec: self.codec,
115 framer: self.framer,
116 types: PhantomData,
117 }
118 }
119}
120
121impl<C, F> Framed<(), (), C, F, (), (), ()> {
122 /// Creates a new `Framed` with the given I/O object, codec, framer and a
123 /// different input and output type.
124 pub fn new<In, Out>(codec: C, framer: F) -> Framed<(), (), C, F, In, Out> {
125 Framed {
126 read_state: read::State::empty(),
127 write_state: write::State::empty(),
128 codec,
129 framer,
130 types: PhantomData,
131 }
132 }
133
134 /// Creates a new `Framed` with the given I/O object, codec, and framer with
135 /// the same input and output type.
136 pub fn symmetric<T>(codec: C, framer: F) -> Framed<(), (), C, F, T, T> {
137 Framed {
138 read_state: read::State::empty(),
139 write_state: write::State::empty(),
140 codec,
141 framer,
142 types: PhantomData,
143 }
144 }
145}
146
/// [`Framed`] that bridges [`AsyncRead`]/[`AsyncWrite`] with [`Bytes`].
///
/// This is useful when you want to read/write raw bytes into/from [`Bytes`]
/// without any additional framing or de/encoding.
///
/// See also: [`ReaderStream`] and [`StreamReader`].
///
/// [`Bytes`]: compio_buf::bytes::Bytes
/// [`AsyncWrite`]: crate::AsyncWrite
/// [`ReaderStream`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.ReaderStream.html
/// [`StreamReader`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.StreamReader.html
#[cfg(feature = "bytes")]
pub type BytesFramed<R, W> = Framed<
    R,
    W,
    codec::bytes::BytesCodec,
    frame::NoopFramer,
    compio_buf::bytes::Bytes,
    compio_buf::bytes::Bytes,
>;
167
#[cfg(feature = "bytes")]
impl BytesFramed<(), ()> {
    /// Builds a [`BytesFramed`] with no I/O attached, bridging
    /// [`AsyncRead`]/[`AsyncWrite`] with [`Bytes`].
    ///
    /// The result pairs a [`BytesCodec`](codec::bytes::BytesCodec) with a
    /// [`NoopFramer`](frame::NoopFramer); its reader and writer types are both
    /// `()`.
    ///
    /// See also: [`ReaderStream`] and [`StreamReader`].
    ///
    /// [`Bytes`]: compio_buf::bytes::Bytes
    /// [`AsyncWrite`]: crate::AsyncWrite
    /// [`ReaderStream`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.ReaderStream.html
    /// [`StreamReader`]: https://docs.rs/tokio-util/latest/tokio_util/io/struct.StreamReader.html
    pub fn new_bytes() -> Self {
        // Built in the same order as the struct fields are declared.
        let read_state = read::State::empty();
        let write_state = write::State::empty();
        let codec = codec::bytes::BytesCodec::new();
        let framer = frame::NoopFramer::new();

        Framed {
            read_state,
            write_state,
            codec,
            framer,
            types: PhantomData,
        }
    }
}