1use std::{
2 io::{self, IsTerminal, Read, Write},
3 os::windows::io::{AsRawHandle, BorrowedHandle, RawHandle},
4 sync::OnceLock,
5 task::Poll,
6};
7
8use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
9use compio_driver::{
10 AsFd, AsRawFd, BorrowedFd, BufferRef, OpCode, OpType, RawFd, ResultTakeBuffer, SharedFd,
11 op::{BufResultExt, Read as OpRead, ReadManaged, Write as OpWrite},
12};
13use compio_io::{AsyncRead, AsyncReadManaged, AsyncReadMulti, AsyncWrite};
14use compio_runtime::Runtime;
15use futures_util::{Stream, StreamExt};
16use windows_sys::Win32::System::IO::OVERLAPPED;
17
18#[cfg(doc)]
19use super::{stderr, stdin, stdout};
20
21struct StdRead<R: Read, B: IoBufMut> {
22 reader: R,
23 buffer: B,
24}
25
26impl<R: Read, B: IoBufMut> StdRead<R, B> {
27 pub fn new(reader: R, buffer: B) -> Self {
28 Self { reader, buffer }
29 }
30}
31
32unsafe impl<R: Read, B: IoBufMut> OpCode for StdRead<R, B> {
33 type Control = ();
34
35 unsafe fn init(&mut self, _: &mut Self::Control) {}
36
37 fn op_type(&self, _: &Self::Control) -> OpType {
38 OpType::Blocking
39 }
40
41 unsafe fn operate(
42 &mut self,
43 _: &mut Self::Control,
44 _optr: *mut OVERLAPPED,
45 ) -> Poll<io::Result<usize>> {
46 #[cfg(feature = "read_buf")]
47 {
48 let slice = self.buffer.as_uninit();
49 let mut buf = io::BorrowedBuf::from(slice);
50 let mut cursor = buf.unfilled();
51 self.reader.read_buf(cursor.reborrow())?;
52 Poll::Ready(Ok(cursor.written()))
53 }
54 #[cfg(not(feature = "read_buf"))]
55 {
56 let slice = self.buffer.ensure_init();
57 self.reader.read(slice).into()
58 }
59 }
60}
61
62impl<R: Read, B: IoBufMut> IntoInner for StdRead<R, B> {
63 type Inner = B;
64
65 fn into_inner(self) -> Self::Inner {
66 self.buffer
67 }
68}
69
70struct StdWrite<W: Write, B: IoBuf> {
71 writer: W,
72 buffer: B,
73}
74
75impl<W: Write, B: IoBuf> StdWrite<W, B> {
76 pub fn new(writer: W, buffer: B) -> Self {
77 Self { writer, buffer }
78 }
79}
80
81unsafe impl<W: Write, B: IoBuf> OpCode for StdWrite<W, B> {
82 type Control = ();
83
84 unsafe fn init(&mut self, _: &mut Self::Control) {}
85
86 fn op_type(&self, _: &Self::Control) -> OpType {
87 OpType::Blocking
88 }
89
90 unsafe fn operate(
91 &mut self,
92 _: &mut Self::Control,
93 _optr: *mut OVERLAPPED,
94 ) -> Poll<io::Result<usize>> {
95 let slice = self.buffer.as_init();
96 self.writer.write(slice).into()
97 }
98}
99
100impl<W: Write, B: IoBuf> IntoInner for StdWrite<W, B> {
101 type Inner = B;
102
103 fn into_inner(self) -> Self::Inner {
104 self.buffer
105 }
106}
107
108#[derive(Debug)]
109struct StaticFd(RawHandle);
110
111impl AsFd for StaticFd {
112 fn as_fd(&self) -> BorrowedFd<'_> {
113 BorrowedFd::File(unsafe { BorrowedHandle::borrow_raw(self.0) })
115 }
116}
117
118impl AsRawFd for StaticFd {
119 fn as_raw_fd(&self) -> RawFd {
120 self.0 as _
121 }
122}
123
124static STDIN_ISATTY: OnceLock<bool> = OnceLock::new();
125
126#[derive(Debug, Clone)]
130pub struct Stdin {
131 fd: SharedFd<StaticFd>,
132 isatty: bool,
133}
134
135impl Stdin {
136 pub(crate) fn new() -> Self {
137 let stdin = io::stdin();
138 let isatty = *STDIN_ISATTY.get_or_init(|| {
139 stdin.is_terminal()
140 || Runtime::with_current(|r| r.attach(stdin.as_raw_handle() as _)).is_err()
141 });
142 Self {
143 fd: SharedFd::new(StaticFd(stdin.as_raw_handle())),
144 isatty,
145 }
146 }
147}
148
149impl AsyncRead for Stdin {
150 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
151 let res = if self.isatty {
152 let op = StdRead::new(io::stdin(), buf);
153 compio_runtime::submit(op).await.into_inner()
154 } else {
155 let op = OpRead::new(self.fd.clone(), buf);
156 compio_runtime::submit(op).await.into_inner()
157 };
158 unsafe { res.map_advanced() }
159 }
160}
161
162impl AsyncReadManaged for Stdin {
163 type Buffer = BufferRef;
164
165 async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
166 (&*self).read_managed(len).await
167 }
168}
169
170impl AsyncReadManaged for &Stdin {
171 type Buffer = BufferRef;
172
173 async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
174 let runtime = Runtime::current();
175 let buffer_pool = runtime.buffer_pool()?;
176 if self.isatty {
177 let buf = buffer_pool.pop()?;
178 let op = StdRead::new(io::stdin(), buf);
179 unsafe { compio_runtime::submit(op).await.take_buffer() }
180 } else {
181 let op = ReadManaged::new(self.fd.clone(), &buffer_pool, len)?;
182 unsafe { compio_runtime::submit(op).await.take_buffer() }
183 }
184 }
185}
186
187impl AsyncReadMulti for Stdin {
188 fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
189 futures_util::stream::once(self.read_managed(len))
190 .filter_map(|res| std::future::ready(res.transpose()))
191 }
192}
193
194impl AsyncReadMulti for &Stdin {
195 fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
196 futures_util::stream::once(self.read_managed(len))
197 .filter_map(|res| std::future::ready(res.transpose()))
198 }
199}
200
201impl AsRawFd for Stdin {
202 fn as_raw_fd(&self) -> RawFd {
203 self.fd.as_raw_fd()
204 }
205}
206
207static STDOUT_ISATTY: OnceLock<bool> = OnceLock::new();
208
209#[derive(Debug, Clone)]
213pub struct Stdout {
214 fd: SharedFd<StaticFd>,
215 isatty: bool,
216}
217
218impl Stdout {
219 pub(crate) fn new() -> Self {
220 let stdout = io::stdout();
221 let isatty = *STDOUT_ISATTY.get_or_init(|| {
222 stdout.is_terminal()
223 || Runtime::with_current(|r| r.attach(stdout.as_raw_handle() as _)).is_err()
224 });
225 Self {
226 fd: SharedFd::new(StaticFd(stdout.as_raw_handle())),
227 isatty,
228 }
229 }
230}
231
232impl AsyncWrite for Stdout {
233 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
234 if self.isatty {
235 let op = StdWrite::new(io::stdout(), buf);
236 compio_runtime::submit(op).await.into_inner()
237 } else {
238 let op = OpWrite::new(self.fd.clone(), buf);
239 compio_runtime::submit(op).await.into_inner()
240 }
241 }
242
243 async fn flush(&mut self) -> io::Result<()> {
244 Ok(())
245 }
246
247 async fn shutdown(&mut self) -> io::Result<()> {
248 self.flush().await
249 }
250}
251
252impl AsRawFd for Stdout {
253 fn as_raw_fd(&self) -> RawFd {
254 self.fd.as_raw_fd()
255 }
256}
257
258static STDERR_ISATTY: OnceLock<bool> = OnceLock::new();
259
260#[derive(Debug, Clone)]
264pub struct Stderr {
265 fd: SharedFd<StaticFd>,
266 isatty: bool,
267}
268
269impl Stderr {
270 pub(crate) fn new() -> Self {
271 let stderr = io::stderr();
272 let isatty = *STDERR_ISATTY.get_or_init(|| {
273 stderr.is_terminal()
274 || Runtime::with_current(|r| r.attach(stderr.as_raw_handle() as _)).is_err()
275 });
276 Self {
277 fd: SharedFd::new(StaticFd(stderr.as_raw_handle())),
278 isatty,
279 }
280 }
281}
282
283impl AsyncWrite for Stderr {
284 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
285 if self.isatty {
286 let op = StdWrite::new(io::stderr(), buf);
287 compio_runtime::submit(op).await.into_inner()
288 } else {
289 let op = OpWrite::new(self.fd.clone(), buf);
290 compio_runtime::submit(op).await.into_inner()
291 }
292 }
293
294 async fn flush(&mut self) -> io::Result<()> {
295 Ok(())
296 }
297
298 async fn shutdown(&mut self) -> io::Result<()> {
299 self.flush().await
300 }
301}
302
303impl AsRawFd for Stderr {
304 fn as_raw_fd(&self) -> RawFd {
305 self.fd.as_raw_fd()
306 }
307}