1use std::fmt::Debug;
4
5use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
6
7use crate::{AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt, IoResult, sync::bilock::BiLock};
8
9pub fn split<T: AsyncRead + AsyncWrite>(stream: T) -> (ReadHalf<T>, WriteHalf<T>) {
13 Split::new(stream).split()
14}
15
/// A value that can be taken apart into a read half and a write half.
///
/// Implementors choose the concrete type of each half through the associated
/// types; the trait itself places no bounds on them.
pub trait Splittable {
    /// Type of the half returned in the first position of [`split`](Self::split).
    type ReadHalf;

    /// Type of the half returned in the second position of [`split`](Self::split).
    type WriteHalf;

    /// Consumes `self` and returns the pair `(read half, write half)`.
    fn split(self) -> (Self::ReadHalf, Self::WriteHalf);
}
52
53impl<R, W> Splittable for (R, W) {
54 type ReadHalf = R;
55 type WriteHalf = W;
56
57 fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
58 self
59 }
60}
61
62#[derive(Debug)]
64pub struct Split<T>(BiLock<T>, BiLock<T>);
65
66impl<T> Split<T> {
67 pub fn new(stream: T) -> Self {
69 let (l, r) = BiLock::new(stream);
70 Split(l, r)
71 }
72}
73
74impl<T: AsyncRead + AsyncWrite> Splittable for Split<T> {
75 type ReadHalf = ReadHalf<T>;
76 type WriteHalf = WriteHalf<T>;
77
78 fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
79 (ReadHalf(self.0), WriteHalf(self.1))
80 }
81}
82
83#[derive(Debug)]
85pub struct ReadHalf<T>(BiLock<T>);
86
87impl<T: Unpin> ReadHalf<T> {
88 #[track_caller]
97 pub fn unsplit(self, w: WriteHalf<T>) -> T {
98 self.0.try_join(w.0).expect("Not the same pair")
99 }
100
101 #[track_caller]
103 pub fn try_unsplit(self, w: WriteHalf<T>) -> Option<T> {
104 self.0.try_join(w.0)
105 }
106}
107
108impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
109 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
110 self.0.lock().await.read(buf).await
111 }
112
113 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
114 self.0.lock().await.read_vectored(buf).await
115 }
116}
117
118impl<T: AsyncReadAt> AsyncReadAt for ReadHalf<T> {
119 async fn read_at<B: IoBufMut>(&self, buf: B, pos: u64) -> BufResult<usize, B> {
120 self.0.lock().await.read_at(buf, pos).await
121 }
122}
123
124#[derive(Debug)]
126pub struct WriteHalf<T>(BiLock<T>);
127
128impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
129 async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
130 self.0.lock().await.write(buf).await
131 }
132
133 async fn write_vectored<B: IoVectoredBuf>(&mut self, buf: B) -> BufResult<usize, B> {
134 self.0.lock().await.write_vectored(buf).await
135 }
136
137 async fn flush(&mut self) -> IoResult<()> {
138 self.0.lock().await.flush().await
139 }
140
141 async fn shutdown(&mut self) -> IoResult<()> {
142 self.0.lock().await.shutdown().await
143 }
144}
145
146impl<T: AsyncWriteAt> AsyncWriteAt for WriteHalf<T> {
147 async fn write_at<B: IoBuf>(&mut self, buf: B, pos: u64) -> BufResult<usize, B> {
148 self.0.lock().await.write_at(buf, pos).await
149 }
150
151 async fn write_vectored_at<B: IoVectoredBuf>(
152 &mut self,
153 buf: B,
154 pos: u64,
155 ) -> BufResult<usize, B> {
156 self.0.lock().await.write_vectored_at(buf, pos).await
157 }
158}