1use std::{
2 io::{self, IsTerminal, Read, Write},
3 os::windows::io::{AsRawHandle, BorrowedHandle, RawHandle},
4 pin::Pin,
5 sync::OnceLock,
6 task::Poll,
7};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
10use compio_driver::{
11 AsFd, AsRawFd, BorrowedFd, OpCode, OpType, RawFd, SharedFd,
12 op::{BufResultExt, Read as OpRead, ReadManaged, ResultTakeBuffer, Write as OpWrite},
13};
14use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite};
15use compio_runtime::{BorrowedBuffer, BufferPool, Runtime};
16use windows_sys::Win32::System::IO::OVERLAPPED;
17
18#[cfg(doc)]
19use super::{stderr, stdin, stdout};
20
21struct StdRead<R: Read, B: IoBufMut> {
22 reader: R,
23 buffer: B,
24}
25
26impl<R: Read, B: IoBufMut> StdRead<R, B> {
27 pub fn new(reader: R, buffer: B) -> Self {
28 Self { reader, buffer }
29 }
30}
31
32impl<R: Read, B: IoBufMut> OpCode for StdRead<R, B> {
33 fn op_type(&self) -> OpType {
34 OpType::Blocking
35 }
36
37 unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
38 let this = unsafe { self.get_unchecked_mut() };
39 let slice = this.buffer.as_uninit();
40 #[cfg(feature = "read_buf")]
41 {
42 let mut buf = io::BorrowedBuf::from(slice);
43 let mut cursor = buf.unfilled();
44 this.reader.read_buf(cursor.reborrow())?;
45 Poll::Ready(Ok(cursor.written()))
46 }
47 #[cfg(not(feature = "read_buf"))]
48 {
49 use std::mem::MaybeUninit;
50
51 slice.fill(MaybeUninit::new(0));
52 this.reader
53 .read(unsafe {
54 std::slice::from_raw_parts_mut(
55 this.buffer.buf_mut_ptr() as _,
56 this.buffer.buf_capacity(),
57 )
58 })
59 .into()
60 }
61 }
62}
63
64impl<R: Read, B: IoBufMut> IntoInner for StdRead<R, B> {
65 type Inner = B;
66
67 fn into_inner(self) -> Self::Inner {
68 self.buffer
69 }
70}
71
72struct StdWrite<W: Write, B: IoBuf> {
73 writer: W,
74 buffer: B,
75}
76
77impl<W: Write, B: IoBuf> StdWrite<W, B> {
78 pub fn new(writer: W, buffer: B) -> Self {
79 Self { writer, buffer }
80 }
81}
82
83impl<W: Write, B: IoBuf> OpCode for StdWrite<W, B> {
84 fn op_type(&self) -> OpType {
85 OpType::Blocking
86 }
87
88 unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
89 let this = unsafe { self.get_unchecked_mut() };
90 let slice = this.buffer.as_init();
91 this.writer.write(slice).into()
92 }
93}
94
95impl<W: Write, B: IoBuf> IntoInner for StdWrite<W, B> {
96 type Inner = B;
97
98 fn into_inner(self) -> Self::Inner {
99 self.buffer
100 }
101}
102
103#[derive(Debug)]
104struct StaticFd(RawHandle);
105
106impl AsFd for StaticFd {
107 fn as_fd(&self) -> BorrowedFd<'_> {
108 BorrowedFd::File(unsafe { BorrowedHandle::borrow_raw(self.0) })
110 }
111}
112
113impl AsRawFd for StaticFd {
114 fn as_raw_fd(&self) -> RawFd {
115 self.0 as _
116 }
117}
118
119static STDIN_ISATTY: OnceLock<bool> = OnceLock::new();
120
121#[derive(Debug, Clone)]
125pub struct Stdin {
126 fd: SharedFd<StaticFd>,
127 isatty: bool,
128}
129
130impl Stdin {
131 pub(crate) fn new() -> Self {
132 let stdin = io::stdin();
133 let isatty = *STDIN_ISATTY.get_or_init(|| {
134 stdin.is_terminal()
135 || Runtime::with_current(|r| r.attach(stdin.as_raw_handle() as _)).is_err()
136 });
137 Self {
138 fd: SharedFd::new(StaticFd(stdin.as_raw_handle())),
139 isatty,
140 }
141 }
142}
143
144impl AsyncRead for Stdin {
145 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
146 let res = if self.isatty {
147 let op = StdRead::new(io::stdin(), buf);
148 compio_runtime::submit(op).await.into_inner()
149 } else {
150 let op = OpRead::new(self.fd.clone(), buf);
151 compio_runtime::submit(op).await.into_inner()
152 };
153 unsafe { res.map_advanced() }
154 }
155}
156
157impl AsyncReadManaged for Stdin {
158 type Buffer<'a> = BorrowedBuffer<'a>;
159 type BufferPool = BufferPool;
160
161 async fn read_managed<'a>(
162 &mut self,
163 buffer_pool: &'a Self::BufferPool,
164 len: usize,
165 ) -> io::Result<Self::Buffer<'a>> {
166 (&*self).read_managed(buffer_pool, len).await
167 }
168}
169
170impl AsyncReadManaged for &Stdin {
171 type Buffer<'a> = BorrowedBuffer<'a>;
172 type BufferPool = BufferPool;
173
174 async fn read_managed<'a>(
175 &mut self,
176 buffer_pool: &'a Self::BufferPool,
177 len: usize,
178 ) -> io::Result<Self::Buffer<'a>> {
179 let buffer_pool = buffer_pool.try_inner()?;
180 if self.isatty {
181 let buf = buffer_pool.get_buffer(len)?;
182 let op = StdRead::new(io::stdin(), buf);
183 let BufResult(res, buf) = compio_runtime::submit(op).await.into_inner();
184 let res = unsafe { buffer_pool.create_proxy(buf, res?) };
185 Ok(res)
186 } else {
187 let op = ReadManaged::new(self.fd.clone(), buffer_pool, len)?;
188 compio_runtime::submit(op)
189 .with_extra()
190 .await
191 .take_buffer(buffer_pool)
192 }
193 }
194}
195
196impl AsRawFd for Stdin {
197 fn as_raw_fd(&self) -> RawFd {
198 self.fd.as_raw_fd()
199 }
200}
201
202static STDOUT_ISATTY: OnceLock<bool> = OnceLock::new();
203
204#[derive(Debug, Clone)]
208pub struct Stdout {
209 fd: SharedFd<StaticFd>,
210 isatty: bool,
211}
212
213impl Stdout {
214 pub(crate) fn new() -> Self {
215 let stdout = io::stdout();
216 let isatty = *STDOUT_ISATTY.get_or_init(|| {
217 stdout.is_terminal()
218 || Runtime::with_current(|r| r.attach(stdout.as_raw_handle() as _)).is_err()
219 });
220 Self {
221 fd: SharedFd::new(StaticFd(stdout.as_raw_handle())),
222 isatty,
223 }
224 }
225}
226
227impl AsyncWrite for Stdout {
228 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
229 if self.isatty {
230 let op = StdWrite::new(io::stdout(), buf);
231 compio_runtime::submit(op).await.into_inner()
232 } else {
233 let op = OpWrite::new(self.fd.clone(), buf);
234 compio_runtime::submit(op).await.into_inner()
235 }
236 }
237
238 async fn flush(&mut self) -> io::Result<()> {
239 Ok(())
240 }
241
242 async fn shutdown(&mut self) -> io::Result<()> {
243 self.flush().await
244 }
245}
246
247impl AsRawFd for Stdout {
248 fn as_raw_fd(&self) -> RawFd {
249 self.fd.as_raw_fd()
250 }
251}
252
253static STDERR_ISATTY: OnceLock<bool> = OnceLock::new();
254
255#[derive(Debug, Clone)]
259pub struct Stderr {
260 fd: SharedFd<StaticFd>,
261 isatty: bool,
262}
263
264impl Stderr {
265 pub(crate) fn new() -> Self {
266 let stderr = io::stderr();
267 let isatty = *STDERR_ISATTY.get_or_init(|| {
268 stderr.is_terminal()
269 || Runtime::with_current(|r| r.attach(stderr.as_raw_handle() as _)).is_err()
270 });
271 Self {
272 fd: SharedFd::new(StaticFd(stderr.as_raw_handle())),
273 isatty,
274 }
275 }
276}
277
278impl AsyncWrite for Stderr {
279 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
280 if self.isatty {
281 let op = StdWrite::new(io::stderr(), buf);
282 compio_runtime::submit(op).await.into_inner()
283 } else {
284 let op = OpWrite::new(self.fd.clone(), buf);
285 compio_runtime::submit(op).await.into_inner()
286 }
287 }
288
289 async fn flush(&mut self) -> io::Result<()> {
290 Ok(())
291 }
292
293 async fn shutdown(&mut self) -> io::Result<()> {
294 self.flush().await
295 }
296}
297
298impl AsRawFd for Stderr {
299 fn as_raw_fd(&self) -> RawFd {
300 self.fd.as_raw_fd()
301 }
302}