compio_io\util/
split.rs

1//! Functionality to split an I/O type into separate read and write halves.
2
3use std::fmt::Debug;
4
5use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
6
7use crate::{AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt, IoResult, sync::bilock::BiLock};
8
9/// Splits a single value implementing `AsyncRead + AsyncWrite` into separate
10/// [`AsyncRead`] and [`AsyncWrite`] handles with or without internal
11/// synchronization dependes on whether `sync` feature is turned on.
12pub fn split<T: AsyncRead + AsyncWrite>(stream: T) -> (ReadHalf<T>, WriteHalf<T>) {
13    Split::new(stream).split()
14}
15
/// A trait for types that can be split into separate read and write halves.
///
/// This trait enables an I/O type to be divided into two separate components:
/// one for reading and one for writing. This is particularly useful in async
/// contexts where you might want to perform concurrent read and write
/// operations from different tasks.
///
/// # Implementors
///
/// - Any `(R, W)` tuple implements this trait; splitting it simply yields its
///   two elements.
/// - `TcpStream`, `UnixStream` and references to them in `compio::net`
///   implement this trait without any lock thanks to the underlying sockets'
///   duplex nature.
/// - `File` and named pipes in `compio::fs` implement this trait with
///   [`ReadHalf`] and [`WriteHalf`] being the file itself since it's
///   reference-counted under the hood.
/// - For other types to be compatible with this trait, they must be wrapped
///   with [`Split`], which wraps the type in an unsynced or synced lock
///   depending on whether the `sync` feature is turned on.
pub trait Splittable {
    /// The type of the read half, which normally implements [`AsyncRead`] or
    /// [`AsyncReadAt`].
    type ReadHalf;

    /// The type of the write half, which normally implements [`AsyncWrite`] or
    /// [`AsyncWriteAt`].
    type WriteHalf;

    /// Consumes `self` and returns a tuple containing separate read and write
    /// halves, in that order.
    ///
    /// The returned halves can be used independently to perform read and write
    /// operations respectively, potentially from different tasks
    /// concurrently.
    fn split(self) -> (Self::ReadHalf, Self::WriteHalf);
}
52
53impl<R, W> Splittable for (R, W) {
54    type ReadHalf = R;
55    type WriteHalf = W;
56
57    fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
58        self
59    }
60}
61
/// Wrapper that splits an I/O type into separate read and write halves.
///
/// It stores the two `BiLock` handles produced for the wrapped value: the
/// first becomes the [`ReadHalf`] and the second the [`WriteHalf`] when
/// [`Splittable::split`] is called.
#[derive(Debug)]
pub struct Split<T>(BiLock<T>, BiLock<T>);
65
66impl<T> Split<T> {
67    /// Creates a new `Split` from the given stream.
68    pub fn new(stream: T) -> Self {
69        let (l, r) = BiLock::new(stream);
70        Split(l, r)
71    }
72}
73
74impl<T: AsyncRead + AsyncWrite> Splittable for Split<T> {
75    type ReadHalf = ReadHalf<T>;
76    type WriteHalf = WriteHalf<T>;
77
78    fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
79        (ReadHalf(self.0), WriteHalf(self.1))
80    }
81}
82
/// The readable half of a value returned from [`split`](super::split).
///
/// It holds one `BiLock` handle to the shared value and acquires that lock
/// for the duration of every read operation it forwards.
#[derive(Debug)]
pub struct ReadHalf<T>(BiLock<T>);
86
87impl<T: Unpin> ReadHalf<T> {
88    /// Reunites with a previously split [`WriteHalf`].
89    ///
90    /// # Panics
91    ///
92    /// If this [`ReadHalf`] and the given [`WriteHalf`] do not originate from
93    /// the same [`split`](super::split) operation this method will panic.
94    /// This can be checked ahead of time by comparing the stored pointer
95    /// of the two halves.
96    #[track_caller]
97    pub fn unsplit(self, w: WriteHalf<T>) -> T {
98        self.0.try_join(w.0).expect("Not the same pair")
99    }
100
101    /// Try to reunites with a previously split [`WriteHalf`].
102    #[track_caller]
103    pub fn try_unsplit(self, w: WriteHalf<T>) -> Option<T> {
104        self.0.try_join(w.0)
105    }
106}
107
108impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
109    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
110        self.0.lock().await.read(buf).await
111    }
112
113    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
114        self.0.lock().await.read_vectored(buf).await
115    }
116}
117
118impl<T: AsyncReadAt> AsyncReadAt for ReadHalf<T> {
119    async fn read_at<B: IoBufMut>(&self, buf: B, pos: u64) -> BufResult<usize, B> {
120        self.0.lock().await.read_at(buf, pos).await
121    }
122}
123
/// The writable half of a value returned from [`split`](super::split).
///
/// It holds one `BiLock` handle to the shared value and acquires that lock
/// for the duration of every write, flush or shutdown operation it forwards.
#[derive(Debug)]
pub struct WriteHalf<T>(BiLock<T>);
127
128impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
129    async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
130        self.0.lock().await.write(buf).await
131    }
132
133    async fn write_vectored<B: IoVectoredBuf>(&mut self, buf: B) -> BufResult<usize, B> {
134        self.0.lock().await.write_vectored(buf).await
135    }
136
137    async fn flush(&mut self) -> IoResult<()> {
138        self.0.lock().await.flush().await
139    }
140
141    async fn shutdown(&mut self) -> IoResult<()> {
142        self.0.lock().await.shutdown().await
143    }
144}
145
146impl<T: AsyncWriteAt> AsyncWriteAt for WriteHalf<T> {
147    async fn write_at<B: IoBuf>(&mut self, buf: B, pos: u64) -> BufResult<usize, B> {
148        self.0.lock().await.write_at(buf, pos).await
149    }
150
151    async fn write_vectored_at<B: IoVectoredBuf>(
152        &mut self,
153        buf: B,
154        pos: u64,
155    ) -> BufResult<usize, B> {
156        self.0.lock().await.write_vectored_at(buf, pos).await
157    }
158}