compio_fs\stdio/
windows.rs

1use std::{
2    io::{self, IsTerminal, Read, Write},
3    os::windows::io::{AsRawHandle, BorrowedHandle, RawHandle},
4    pin::Pin,
5    sync::OnceLock,
6    task::Poll,
7};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
10use compio_driver::{
11    AsFd, AsRawFd, BorrowedFd, OpCode, OpType, RawFd, SharedFd,
12    op::{BufResultExt, Read as OpRead, ReadManaged, ResultTakeBuffer, Write as OpWrite},
13};
14use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite};
15use compio_runtime::{BorrowedBuffer, BufferPool, Runtime};
16use windows_sys::Win32::System::IO::OVERLAPPED;
17
18#[cfg(doc)]
19use super::{stderr, stdin, stdout};
20
21struct StdRead<R: Read, B: IoBufMut> {
22    reader: R,
23    buffer: B,
24}
25
26impl<R: Read, B: IoBufMut> StdRead<R, B> {
27    pub fn new(reader: R, buffer: B) -> Self {
28        Self { reader, buffer }
29    }
30}
31
32impl<R: Read, B: IoBufMut> OpCode for StdRead<R, B> {
33    fn op_type(&self) -> OpType {
34        OpType::Blocking
35    }
36
37    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
38        let this = unsafe { self.get_unchecked_mut() };
39        let slice = this.buffer.as_uninit();
40        #[cfg(feature = "read_buf")]
41        {
42            let mut buf = io::BorrowedBuf::from(slice);
43            let mut cursor = buf.unfilled();
44            this.reader.read_buf(cursor.reborrow())?;
45            Poll::Ready(Ok(cursor.written()))
46        }
47        #[cfg(not(feature = "read_buf"))]
48        {
49            use std::mem::MaybeUninit;
50
51            slice.fill(MaybeUninit::new(0));
52            this.reader
53                .read(unsafe {
54                    std::slice::from_raw_parts_mut(
55                        this.buffer.buf_mut_ptr() as _,
56                        this.buffer.buf_capacity(),
57                    )
58                })
59                .into()
60        }
61    }
62}
63
64impl<R: Read, B: IoBufMut> IntoInner for StdRead<R, B> {
65    type Inner = B;
66
67    fn into_inner(self) -> Self::Inner {
68        self.buffer
69    }
70}
71
72struct StdWrite<W: Write, B: IoBuf> {
73    writer: W,
74    buffer: B,
75}
76
77impl<W: Write, B: IoBuf> StdWrite<W, B> {
78    pub fn new(writer: W, buffer: B) -> Self {
79        Self { writer, buffer }
80    }
81}
82
83impl<W: Write, B: IoBuf> OpCode for StdWrite<W, B> {
84    fn op_type(&self) -> OpType {
85        OpType::Blocking
86    }
87
88    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
89        let this = unsafe { self.get_unchecked_mut() };
90        let slice = this.buffer.as_init();
91        this.writer.write(slice).into()
92    }
93}
94
95impl<W: Write, B: IoBuf> IntoInner for StdWrite<W, B> {
96    type Inner = B;
97
98    fn into_inner(self) -> Self::Inner {
99        self.buffer
100    }
101}
102
103#[derive(Debug)]
104struct StaticFd(RawHandle);
105
106impl AsFd for StaticFd {
107    fn as_fd(&self) -> BorrowedFd<'_> {
108        // SAFETY: we only use it for console handles.
109        BorrowedFd::File(unsafe { BorrowedHandle::borrow_raw(self.0) })
110    }
111}
112
113impl AsRawFd for StaticFd {
114    fn as_raw_fd(&self) -> RawFd {
115        self.0 as _
116    }
117}
118
119static STDIN_ISATTY: OnceLock<bool> = OnceLock::new();
120
121/// A handle to the standard input stream of a process.
122///
123/// See [`stdin`].
124#[derive(Debug, Clone)]
125pub struct Stdin {
126    fd: SharedFd<StaticFd>,
127    isatty: bool,
128}
129
130impl Stdin {
131    pub(crate) fn new() -> Self {
132        let stdin = io::stdin();
133        let isatty = *STDIN_ISATTY.get_or_init(|| {
134            stdin.is_terminal()
135                || Runtime::with_current(|r| r.attach(stdin.as_raw_handle() as _)).is_err()
136        });
137        Self {
138            fd: SharedFd::new(StaticFd(stdin.as_raw_handle())),
139            isatty,
140        }
141    }
142}
143
144impl AsyncRead for Stdin {
145    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
146        let res = if self.isatty {
147            let op = StdRead::new(io::stdin(), buf);
148            compio_runtime::submit(op).await.into_inner()
149        } else {
150            let op = OpRead::new(self.fd.clone(), buf);
151            compio_runtime::submit(op).await.into_inner()
152        };
153        unsafe { res.map_advanced() }
154    }
155}
156
157impl AsyncReadManaged for Stdin {
158    type Buffer<'a> = BorrowedBuffer<'a>;
159    type BufferPool = BufferPool;
160
161    async fn read_managed<'a>(
162        &mut self,
163        buffer_pool: &'a Self::BufferPool,
164        len: usize,
165    ) -> io::Result<Self::Buffer<'a>> {
166        (&*self).read_managed(buffer_pool, len).await
167    }
168}
169
170impl AsyncReadManaged for &Stdin {
171    type Buffer<'a> = BorrowedBuffer<'a>;
172    type BufferPool = BufferPool;
173
174    async fn read_managed<'a>(
175        &mut self,
176        buffer_pool: &'a Self::BufferPool,
177        len: usize,
178    ) -> io::Result<Self::Buffer<'a>> {
179        let buffer_pool = buffer_pool.try_inner()?;
180        if self.isatty {
181            let buf = buffer_pool.get_buffer(len)?;
182            let op = StdRead::new(io::stdin(), buf);
183            let BufResult(res, buf) = compio_runtime::submit(op).await.into_inner();
184            let res = unsafe { buffer_pool.create_proxy(buf, res?) };
185            Ok(res)
186        } else {
187            let op = ReadManaged::new(self.fd.clone(), buffer_pool, len)?;
188            compio_runtime::submit(op)
189                .with_extra()
190                .await
191                .take_buffer(buffer_pool)
192        }
193    }
194}
195
196impl AsRawFd for Stdin {
197    fn as_raw_fd(&self) -> RawFd {
198        self.fd.as_raw_fd()
199    }
200}
201
202static STDOUT_ISATTY: OnceLock<bool> = OnceLock::new();
203
204/// A handle to the standard output stream of a process.
205///
206/// See [`stdout`].
207#[derive(Debug, Clone)]
208pub struct Stdout {
209    fd: SharedFd<StaticFd>,
210    isatty: bool,
211}
212
213impl Stdout {
214    pub(crate) fn new() -> Self {
215        let stdout = io::stdout();
216        let isatty = *STDOUT_ISATTY.get_or_init(|| {
217            stdout.is_terminal()
218                || Runtime::with_current(|r| r.attach(stdout.as_raw_handle() as _)).is_err()
219        });
220        Self {
221            fd: SharedFd::new(StaticFd(stdout.as_raw_handle())),
222            isatty,
223        }
224    }
225}
226
227impl AsyncWrite for Stdout {
228    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
229        if self.isatty {
230            let op = StdWrite::new(io::stdout(), buf);
231            compio_runtime::submit(op).await.into_inner()
232        } else {
233            let op = OpWrite::new(self.fd.clone(), buf);
234            compio_runtime::submit(op).await.into_inner()
235        }
236    }
237
238    async fn flush(&mut self) -> io::Result<()> {
239        Ok(())
240    }
241
242    async fn shutdown(&mut self) -> io::Result<()> {
243        self.flush().await
244    }
245}
246
247impl AsRawFd for Stdout {
248    fn as_raw_fd(&self) -> RawFd {
249        self.fd.as_raw_fd()
250    }
251}
252
253static STDERR_ISATTY: OnceLock<bool> = OnceLock::new();
254
255/// A handle to the standard output stream of a process.
256///
257/// See [`stderr`].
258#[derive(Debug, Clone)]
259pub struct Stderr {
260    fd: SharedFd<StaticFd>,
261    isatty: bool,
262}
263
264impl Stderr {
265    pub(crate) fn new() -> Self {
266        let stderr = io::stderr();
267        let isatty = *STDERR_ISATTY.get_or_init(|| {
268            stderr.is_terminal()
269                || Runtime::with_current(|r| r.attach(stderr.as_raw_handle() as _)).is_err()
270        });
271        Self {
272            fd: SharedFd::new(StaticFd(stderr.as_raw_handle())),
273            isatty,
274        }
275    }
276}
277
278impl AsyncWrite for Stderr {
279    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
280        if self.isatty {
281            let op = StdWrite::new(io::stderr(), buf);
282            compio_runtime::submit(op).await.into_inner()
283        } else {
284            let op = OpWrite::new(self.fd.clone(), buf);
285            compio_runtime::submit(op).await.into_inner()
286        }
287    }
288
289    async fn flush(&mut self) -> io::Result<()> {
290        Ok(())
291    }
292
293    async fn shutdown(&mut self) -> io::Result<()> {
294        self.flush().await
295    }
296}
297
298impl AsRawFd for Stderr {
299    fn as_raw_fd(&self) -> RawFd {
300        self.fd.as_raw_fd()
301    }
302}