Skip to main content

compio_fs\stdio/
windows.rs

1use std::{
2    io::{self, IsTerminal, Read, Write},
3    os::windows::io::{AsRawHandle, BorrowedHandle, RawHandle},
4    sync::OnceLock,
5    task::Poll,
6};
7
8use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
9use compio_driver::{
10    AsFd, AsRawFd, BorrowedFd, BufferRef, OpCode, OpType, RawFd, ResultTakeBuffer, SharedFd,
11    op::{BufResultExt, Read as OpRead, ReadManaged, Write as OpWrite},
12};
13use compio_io::{AsyncRead, AsyncReadManaged, AsyncReadMulti, AsyncWrite};
14use compio_runtime::Runtime;
15use futures_util::{Stream, StreamExt};
16use windows_sys::Win32::System::IO::OVERLAPPED;
17
18#[cfg(doc)]
19use super::{stderr, stdin, stdout};
20
21struct StdRead<R: Read, B: IoBufMut> {
22    reader: R,
23    buffer: B,
24}
25
26impl<R: Read, B: IoBufMut> StdRead<R, B> {
27    pub fn new(reader: R, buffer: B) -> Self {
28        Self { reader, buffer }
29    }
30}
31
32unsafe impl<R: Read, B: IoBufMut> OpCode for StdRead<R, B> {
33    type Control = ();
34
35    unsafe fn init(&mut self, _: &mut Self::Control) {}
36
37    fn op_type(&self, _: &Self::Control) -> OpType {
38        OpType::Blocking
39    }
40
41    unsafe fn operate(
42        &mut self,
43        _: &mut Self::Control,
44        _optr: *mut OVERLAPPED,
45    ) -> Poll<io::Result<usize>> {
46        #[cfg(feature = "read_buf")]
47        {
48            let slice = self.buffer.as_uninit();
49            let mut buf = io::BorrowedBuf::from(slice);
50            let mut cursor = buf.unfilled();
51            self.reader.read_buf(cursor.reborrow())?;
52            Poll::Ready(Ok(cursor.written()))
53        }
54        #[cfg(not(feature = "read_buf"))]
55        {
56            let slice = self.buffer.ensure_init();
57            self.reader.read(slice).into()
58        }
59    }
60}
61
62impl<R: Read, B: IoBufMut> IntoInner for StdRead<R, B> {
63    type Inner = B;
64
65    fn into_inner(self) -> Self::Inner {
66        self.buffer
67    }
68}
69
70struct StdWrite<W: Write, B: IoBuf> {
71    writer: W,
72    buffer: B,
73}
74
75impl<W: Write, B: IoBuf> StdWrite<W, B> {
76    pub fn new(writer: W, buffer: B) -> Self {
77        Self { writer, buffer }
78    }
79}
80
81unsafe impl<W: Write, B: IoBuf> OpCode for StdWrite<W, B> {
82    type Control = ();
83
84    unsafe fn init(&mut self, _: &mut Self::Control) {}
85
86    fn op_type(&self, _: &Self::Control) -> OpType {
87        OpType::Blocking
88    }
89
90    unsafe fn operate(
91        &mut self,
92        _: &mut Self::Control,
93        _optr: *mut OVERLAPPED,
94    ) -> Poll<io::Result<usize>> {
95        let slice = self.buffer.as_init();
96        self.writer.write(slice).into()
97    }
98}
99
100impl<W: Write, B: IoBuf> IntoInner for StdWrite<W, B> {
101    type Inner = B;
102
103    fn into_inner(self) -> Self::Inner {
104        self.buffer
105    }
106}
107
108#[derive(Debug)]
109struct StaticFd(RawHandle);
110
111impl AsFd for StaticFd {
112    fn as_fd(&self) -> BorrowedFd<'_> {
113        // SAFETY: we only use it for console handles.
114        BorrowedFd::File(unsafe { BorrowedHandle::borrow_raw(self.0) })
115    }
116}
117
118impl AsRawFd for StaticFd {
119    fn as_raw_fd(&self) -> RawFd {
120        self.0 as _
121    }
122}
123
124static STDIN_ISATTY: OnceLock<bool> = OnceLock::new();
125
126/// A handle to the standard input stream of a process.
127///
128/// See [`stdin`].
129#[derive(Debug, Clone)]
130pub struct Stdin {
131    fd: SharedFd<StaticFd>,
132    isatty: bool,
133}
134
135impl Stdin {
136    pub(crate) fn new() -> Self {
137        let stdin = io::stdin();
138        let isatty = *STDIN_ISATTY.get_or_init(|| {
139            stdin.is_terminal()
140                || Runtime::with_current(|r| r.attach(stdin.as_raw_handle() as _)).is_err()
141        });
142        Self {
143            fd: SharedFd::new(StaticFd(stdin.as_raw_handle())),
144            isatty,
145        }
146    }
147}
148
149impl AsyncRead for Stdin {
150    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
151        let res = if self.isatty {
152            let op = StdRead::new(io::stdin(), buf);
153            compio_runtime::submit(op).await.into_inner()
154        } else {
155            let op = OpRead::new(self.fd.clone(), buf);
156            compio_runtime::submit(op).await.into_inner()
157        };
158        unsafe { res.map_advanced() }
159    }
160}
161
162impl AsyncReadManaged for Stdin {
163    type Buffer = BufferRef;
164
165    async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
166        (&*self).read_managed(len).await
167    }
168}
169
170impl AsyncReadManaged for &Stdin {
171    type Buffer = BufferRef;
172
173    async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
174        let runtime = Runtime::current();
175        let buffer_pool = runtime.buffer_pool()?;
176        if self.isatty {
177            let buf = buffer_pool.pop()?;
178            let op = StdRead::new(io::stdin(), buf);
179            unsafe { compio_runtime::submit(op).await.take_buffer() }
180        } else {
181            let op = ReadManaged::new(self.fd.clone(), &buffer_pool, len)?;
182            unsafe { compio_runtime::submit(op).await.take_buffer() }
183        }
184    }
185}
186
187impl AsyncReadMulti for Stdin {
188    fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
189        futures_util::stream::once(self.read_managed(len))
190            .filter_map(|res| std::future::ready(res.transpose()))
191    }
192}
193
194impl AsyncReadMulti for &Stdin {
195    fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
196        futures_util::stream::once(self.read_managed(len))
197            .filter_map(|res| std::future::ready(res.transpose()))
198    }
199}
200
201impl AsRawFd for Stdin {
202    fn as_raw_fd(&self) -> RawFd {
203        self.fd.as_raw_fd()
204    }
205}
206
207static STDOUT_ISATTY: OnceLock<bool> = OnceLock::new();
208
209/// A handle to the standard output stream of a process.
210///
211/// See [`stdout`].
212#[derive(Debug, Clone)]
213pub struct Stdout {
214    fd: SharedFd<StaticFd>,
215    isatty: bool,
216}
217
218impl Stdout {
219    pub(crate) fn new() -> Self {
220        let stdout = io::stdout();
221        let isatty = *STDOUT_ISATTY.get_or_init(|| {
222            stdout.is_terminal()
223                || Runtime::with_current(|r| r.attach(stdout.as_raw_handle() as _)).is_err()
224        });
225        Self {
226            fd: SharedFd::new(StaticFd(stdout.as_raw_handle())),
227            isatty,
228        }
229    }
230}
231
232impl AsyncWrite for Stdout {
233    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
234        if self.isatty {
235            let op = StdWrite::new(io::stdout(), buf);
236            compio_runtime::submit(op).await.into_inner()
237        } else {
238            let op = OpWrite::new(self.fd.clone(), buf);
239            compio_runtime::submit(op).await.into_inner()
240        }
241    }
242
243    async fn flush(&mut self) -> io::Result<()> {
244        Ok(())
245    }
246
247    async fn shutdown(&mut self) -> io::Result<()> {
248        self.flush().await
249    }
250}
251
252impl AsRawFd for Stdout {
253    fn as_raw_fd(&self) -> RawFd {
254        self.fd.as_raw_fd()
255    }
256}
257
258static STDERR_ISATTY: OnceLock<bool> = OnceLock::new();
259
260/// A handle to the standard output stream of a process.
261///
262/// See [`stderr`].
263#[derive(Debug, Clone)]
264pub struct Stderr {
265    fd: SharedFd<StaticFd>,
266    isatty: bool,
267}
268
269impl Stderr {
270    pub(crate) fn new() -> Self {
271        let stderr = io::stderr();
272        let isatty = *STDERR_ISATTY.get_or_init(|| {
273            stderr.is_terminal()
274                || Runtime::with_current(|r| r.attach(stderr.as_raw_handle() as _)).is_err()
275        });
276        Self {
277            fd: SharedFd::new(StaticFd(stderr.as_raw_handle())),
278            isatty,
279        }
280    }
281}
282
283impl AsyncWrite for Stderr {
284    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
285        if self.isatty {
286            let op = StdWrite::new(io::stderr(), buf);
287            compio_runtime::submit(op).await.into_inner()
288        } else {
289            let op = OpWrite::new(self.fd.clone(), buf);
290            compio_runtime::submit(op).await.into_inner()
291        }
292    }
293
294    async fn flush(&mut self) -> io::Result<()> {
295        Ok(())
296    }
297
298    async fn shutdown(&mut self) -> io::Result<()> {
299        self.flush().await
300    }
301}
302
303impl AsRawFd for Stderr {
304    fn as_raw_fd(&self) -> RawFd {
305        self.fd.as_raw_fd()
306    }
307}