compio_fs/pipe/mod.rs
1//! Unix pipe types.
2
3use std::{future::Future, io, os::fd::AsFd, path::Path};
4
5use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
6use compio_driver::{
7 BufferRef, impl_raw_fd,
8 op::{OFlags, Pipe},
9};
10use compio_io::{AsyncRead, AsyncReadManaged, AsyncReadMulti, AsyncWrite};
11use futures_util::Stream;
12use rustix::fs::{fcntl_getfl, fcntl_setfl};
13
14use crate::File;
15
16#[cfg(linux_all)]
17mod splice;
18
19#[cfg(linux_all)]
20pub use splice::*;
21
22/// Creates a pair of anonymous pipe.
23///
24/// ```
25/// use compio_fs::pipe::anonymous;
26/// use compio_io::{AsyncReadExt, AsyncWriteExt};
27///
28/// # compio_runtime::Runtime::new().unwrap().block_on(async {
29/// let (mut rx, mut tx) = anonymous().await.unwrap();
30///
31/// tx.write_all("Hello world!").await.unwrap();
32/// let (_, buf) = rx.read_exact(Vec::with_capacity(12)).await.unwrap();
33/// assert_eq!(&buf, b"Hello world!");
34/// # });
35/// ```
36pub async fn anonymous() -> io::Result<(Receiver, Sender)> {
37 let op = Pipe::new();
38 let BufResult(res, op) = compio_runtime::submit(op).await;
39 res?;
40 let (receiver, sender) = op.into_inner();
41 let receiver = Receiver {
42 file: File::from_std(std::fs::File::from(receiver))?,
43 };
44 let sender = Sender {
45 file: File::from_std(std::fs::File::from(sender))?,
46 };
47 Ok((receiver, sender))
48}
49
50/// Options and flags which can be used to configure how a FIFO file is opened.
51///
52/// This builder allows configuring how to create a pipe end from a FIFO file.
53/// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
54/// then chain calls to methods to set each option, then call either
55/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
56/// are trying to open. This will give you a [`io::Result`] with a pipe end
57/// inside that you can further operate on.
58///
59/// [`new`]: OpenOptions::new
60/// [`open_receiver`]: OpenOptions::open_receiver
61/// [`open_sender`]: OpenOptions::open_sender
62///
63/// # Examples
64///
65/// Opening a pair of pipe ends from a FIFO file:
66///
67/// ```no_run
68/// use compio_fs::pipe;
69///
70/// const FIFO_NAME: &str = "path/to/a/fifo";
71///
72/// # async fn dox() -> std::io::Result<()> {
73/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
74/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME).await?;
75/// # Ok(())
76/// # }
77/// ```
78///
79/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
80///
81/// ```ignore
82/// use compio_fs::pipe;
83/// use nix::{sys::stat::Mode, unistd::mkfifo};
84///
85/// // Our program has exclusive access to this path.
86/// const FIFO_NAME: &str = "path/to/a/new/fifo";
87///
88/// # async fn dox() -> std::io::Result<()> {
89/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
90/// let tx = pipe::OpenOptions::new()
91/// .read_write(true)
92/// .unchecked(true)
93/// .open_sender(FIFO_NAME)?;
94/// # Ok(())
95/// # }
96/// ```
97#[derive(Clone, Debug)]
98pub struct OpenOptions {
99 #[cfg(target_os = "linux")]
100 read_write: bool,
101 unchecked: bool,
102}
103
104impl OpenOptions {
105 /// Creates a blank new set of options ready for configuration.
106 ///
107 /// All options are initially set to `false`.
108 pub fn new() -> OpenOptions {
109 OpenOptions {
110 #[cfg(target_os = "linux")]
111 read_write: false,
112 unchecked: false,
113 }
114 }
115
116 /// Sets the option for read-write access.
117 ///
118 /// This option, when true, will indicate that a FIFO file will be opened
119 /// in read-write access mode. This operation is not defined by the POSIX
120 /// standard and is only guaranteed to work on Linux.
121 ///
122 /// # Examples
123 ///
124 /// Opening a [`Sender`] even if there are no open reading ends:
125 ///
126 /// ```
127 /// use compio_fs::pipe;
128 ///
129 /// # compio_runtime::Runtime::new().unwrap().block_on(async {
130 /// let tx = pipe::OpenOptions::new()
131 /// .read_write(true)
132 /// .open_sender("path/to/a/fifo")
133 /// .await;
134 /// # });
135 /// ```
136 ///
137 /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
138 /// fail with [`UnexpectedEof`] during reading if all writing ends of the
139 /// pipe close the FIFO file.
140 ///
141 /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
142 ///
143 /// ```
144 /// use compio_fs::pipe;
145 ///
146 /// # compio_runtime::Runtime::new().unwrap().block_on(async {
147 /// let tx = pipe::OpenOptions::new()
148 /// .read_write(true)
149 /// .open_receiver("path/to/a/fifo")
150 /// .await;
151 /// # });
152 /// ```
153 #[cfg(target_os = "linux")]
154 #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
155 pub fn read_write(&mut self, value: bool) -> &mut Self {
156 self.read_write = value;
157 self
158 }
159
160 /// Sets the option to skip the check for FIFO file type.
161 ///
162 /// By default, [`open_receiver`] and [`open_sender`] functions will check
163 /// if the opened file is a FIFO file. Set this option to `true` if you are
164 /// sure the file is a FIFO file.
165 ///
166 /// [`open_receiver`]: OpenOptions::open_receiver
167 /// [`open_sender`]: OpenOptions::open_sender
168 ///
169 /// # Examples
170 ///
171 /// ```no_run
172 /// use compio_fs::pipe;
173 /// use nix::{sys::stat::Mode, unistd::mkfifo};
174 ///
175 /// // Our program has exclusive access to this path.
176 /// const FIFO_NAME: &str = "path/to/a/new/fifo";
177 ///
178 /// # async fn dox() -> std::io::Result<()> {
179 /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
180 /// let rx = pipe::OpenOptions::new()
181 /// .unchecked(true)
182 /// .open_receiver(FIFO_NAME)
183 /// .await?;
184 /// # Ok(())
185 /// # }
186 /// ```
187 pub fn unchecked(&mut self, value: bool) -> &mut Self {
188 self.unchecked = value;
189 self
190 }
191
192 /// Creates a [`Receiver`] from a FIFO file with the options specified by
193 /// `self`.
194 ///
195 /// This function will open the FIFO file at the specified path, possibly
196 /// check if it is a pipe, and associate the pipe with the default event
197 /// loop for reading.
198 ///
199 /// # Errors
200 ///
201 /// If the file type check fails, this function will fail with
202 /// `io::ErrorKind::InvalidInput`. This function may also fail with
203 /// other standard OS errors.
204 pub async fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
205 let file = self.open(path.as_ref(), PipeEnd::Receiver).await?;
206 Receiver::from_file(file)
207 }
208
209 /// Creates a [`Sender`] from a FIFO file with the options specified by
210 /// `self`.
211 ///
212 /// This function will open the FIFO file at the specified path, possibly
213 /// check if it is a pipe, and associate the pipe with the default event
214 /// loop for writing.
215 ///
216 /// # Errors
217 ///
218 /// If the file type check fails, this function will fail with
219 /// `io::ErrorKind::InvalidInput`. If the file is not opened in
220 /// read-write access mode and the file is not currently open for
221 /// reading, this function will fail with `ENXIO`. This function may
222 /// also fail with other standard OS errors.
223 pub async fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
224 let file = self.open(path.as_ref(), PipeEnd::Sender).await?;
225 Sender::from_file(file)
226 }
227
228 async fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
229 let mut options = crate::OpenOptions::new();
230 options
231 .read(pipe_end == PipeEnd::Receiver)
232 .write(pipe_end == PipeEnd::Sender);
233
234 #[cfg(target_os = "linux")]
235 if self.read_write {
236 options.read(true).write(true);
237 }
238
239 let file = options.open(path).await?;
240
241 if !self.unchecked && !is_fifo(&file).await? {
242 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
243 }
244
245 Ok(file)
246 }
247}
248
249impl Default for OpenOptions {
250 fn default() -> OpenOptions {
251 OpenOptions::new()
252 }
253}
254
255#[derive(Clone, Copy, PartialEq, Eq, Debug)]
256enum PipeEnd {
257 Sender,
258 Receiver,
259}
260
261/// Writing end of a Unix pipe.
262///
263/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
264///
265/// Opening a named pipe for writing involves a few steps.
266/// Call to [`OpenOptions::open_sender`] might fail with an error indicating
267/// different things:
268///
269/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
270/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
271/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
272/// Sleep for a while and try again.
273/// * Other OS errors not specific to opening FIFO files.
274///
275/// Opening a `Sender` from a FIFO file should look like this:
276///
277/// ```no_run
278/// use std::time::Duration;
279///
280/// use compio_fs::pipe;
281/// use compio_runtime::time;
282///
283/// const FIFO_NAME: &str = "path/to/a/fifo";
284///
285/// # async fn dox() -> std::io::Result<()> {
286/// // Wait for a reader to open the file.
287/// let tx = loop {
288/// match pipe::OpenOptions::new().open_sender(FIFO_NAME).await {
289/// Ok(tx) => break tx,
290/// Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {}
291/// Err(e) => return Err(e.into()),
292/// }
293///
294/// time::sleep(Duration::from_millis(50)).await;
295/// };
296/// # Ok(())
297/// # }
298/// ```
299///
300/// On Linux, it is possible to create a `Sender` without waiting in a sleeping
301/// loop. This is done by opening a named pipe in read-write access mode with
302/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
303/// both a writing end and a reading end, and the latter allows to open a FIFO
304/// without [`ENXIO`] error since the pipe is open for reading as well.
305///
306/// `Sender` cannot be used to read from a pipe, so in practice the read access
307/// is only used when a FIFO is opened. However, using a `Sender` in read-write
308/// mode **may lead to lost data**, because written data will be dropped by the
309/// system as soon as all pipe ends are closed. To avoid lost data you have to
310/// make sure that a reading end has been opened before dropping a `Sender`.
311///
312/// Note that using read-write access mode with FIFO files is not defined by
313/// the POSIX standard and it is only guaranteed to work on Linux.
314///
315/// ```ignore
316/// use compio_fs::pipe;
317/// use compio_io::AsyncWriteExt;
318///
319/// const FIFO_NAME: &str = "path/to/a/fifo";
320///
321/// # async fn dox() {
322/// let mut tx = pipe::OpenOptions::new()
323/// .read_write(true)
324/// .open_sender(FIFO_NAME)
325/// .unwrap();
326///
327/// // Asynchronously write to the pipe before a reader.
328/// tx.write_all("hello world").await.unwrap();
329/// # }
330/// ```
331///
332/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
333#[derive(Debug, Clone)]
334pub struct Sender {
335 file: File,
336}
337
338impl Sender {
339 pub(crate) fn from_file(file: File) -> io::Result<Sender> {
340 set_nonblocking(&file)?;
341 Ok(Sender { file })
342 }
343
344 /// Close the pipe. If the returned future is dropped before polling, the
345 /// pipe won't be closed.
346 ///
347 /// See [`File::close`] for more details.
348 pub fn close(self) -> impl Future<Output = io::Result<()>> {
349 self.file.close()
350 }
351}
352
353impl AsyncWrite for Sender {
354 #[inline]
355 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
356 (&*self).write(buf).await
357 }
358
359 #[inline]
360 async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
361 (&*self).write_vectored(buf).await
362 }
363
364 #[inline]
365 async fn flush(&mut self) -> io::Result<()> {
366 (&*self).flush().await
367 }
368
369 #[inline]
370 async fn shutdown(&mut self) -> io::Result<()> {
371 (&*self).shutdown().await
372 }
373}
374
375impl AsyncWrite for &Sender {
376 async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
377 (&self.file.inner).write(buffer).await
378 }
379
380 async fn write_vectored<T: IoVectoredBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
381 (&self.file.inner).write_vectored(buffer).await
382 }
383
384 #[inline]
385 async fn flush(&mut self) -> io::Result<()> {
386 Ok(())
387 }
388
389 #[inline]
390 async fn shutdown(&mut self) -> io::Result<()> {
391 Ok(())
392 }
393}
394
395impl_raw_fd!(Sender, std::fs::File, file, file);
396
397/// Reading end of a Unix pipe.
398///
399/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
400///
401/// # Examples
402///
403/// Receiving messages from a named pipe in a loop:
404///
405/// ```no_run
406/// use std::io;
407///
408/// use compio_buf::BufResult;
409/// use compio_fs::pipe;
410/// use compio_io::AsyncReadExt;
411///
412/// const FIFO_NAME: &str = "path/to/a/fifo";
413///
414/// # async fn dox() -> io::Result<()> {
415/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
416/// loop {
417/// let mut msg = Vec::with_capacity(256);
418/// let BufResult(res, msg) = rx.read_exact(msg).await;
419/// match res {
420/// Ok(_) => { /* handle the message */ }
421/// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
422/// // Writing end has been closed, we should reopen the pipe.
423/// rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
424/// }
425/// Err(e) => return Err(e.into()),
426/// }
427/// }
428/// # }
429/// ```
430///
431/// On Linux, you can use a `Receiver` in read-write access mode to implement
432/// resilient reading from a named pipe. Unlike `Receiver` opened in read-only
433/// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof`
434/// when the writing end is closed. This way, a `Receiver` can asynchronously
435/// wait for the next writer to open the pipe.
436///
437/// You should not use functions waiting for EOF such as [`read_to_end`] with
438/// a `Receiver` in read-write access mode, since it **may wait forever**.
439/// `Receiver` in this mode also holds an open writing end, which prevents
440/// receiving EOF.
441///
442/// To set the read-write access mode you can use `OpenOptions::read_write`.
443/// Note that using read-write access mode with FIFO files is not defined by
444/// the POSIX standard and it is only guaranteed to work on Linux.
445///
446/// ```ignore
447/// use compio_fs::pipe;
448/// use compio_io::AsyncReadExt;
449///
450/// const FIFO_NAME: &str = "path/to/a/fifo";
451///
452/// # async fn dox() {
453/// let mut rx = pipe::OpenOptions::new()
454/// .read_write(true)
455/// .open_receiver(FIFO_NAME)
456/// .unwrap();
457/// loop {
458/// let mut msg = Vec::with_capacity(256);
459/// rx.read_exact(msg).await.unwrap();
460/// // handle the message
461/// }
462/// # }
463/// ```
464///
465/// [`read_to_end`]: compio_io::AsyncReadExt::read_to_end
466#[derive(Debug, Clone)]
467pub struct Receiver {
468 file: File,
469}
470
471impl Receiver {
472 pub(crate) fn from_file(file: File) -> io::Result<Receiver> {
473 set_nonblocking(&file)?;
474 Ok(Receiver { file })
475 }
476
477 /// Close the pipe. If the returned future is dropped before polling, the
478 /// pipe won't be closed.
479 ///
480 /// See [`File::close`] for more details.
481 pub fn close(self) -> impl Future<Output = io::Result<()>> {
482 self.file.close()
483 }
484}
485
486impl AsyncRead for Receiver {
487 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
488 (&*self).read(buf).await
489 }
490
491 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
492 (&*self).read_vectored(buf).await
493 }
494}
495
496impl AsyncRead for &Receiver {
497 async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
498 (&self.file.inner).read(buffer).await
499 }
500
501 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buffer: V) -> BufResult<usize, V> {
502 (&self.file.inner).read_vectored(buffer).await
503 }
504}
505
506impl AsyncReadManaged for Receiver {
507 type Buffer = BufferRef;
508
509 async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
510 (&*self).read_managed(len).await
511 }
512}
513
514impl AsyncReadManaged for &Receiver {
515 type Buffer = BufferRef;
516
517 async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
518 (&self.file.inner).read_managed(len).await
519 }
520}
521
522impl AsyncReadMulti for Receiver {
523 fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
524 self.file.inner.read_multi(len)
525 }
526}
527
528impl AsyncReadMulti for &Receiver {
529 fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
530 self.file.inner.read_multi_shared(len)
531 }
532}
533
534impl_raw_fd!(Receiver, std::fs::File, file, file);
535
536/// Checks if file is a FIFO
537async fn is_fifo(file: &File) -> io::Result<bool> {
538 use std::os::unix::prelude::FileTypeExt;
539
540 Ok(file.metadata().await?.file_type().is_fifo())
541}
542
543/// Sets file's flags with O_NONBLOCK by fcntl.
544fn set_nonblocking(file: &impl AsFd) -> io::Result<()> {
545 if compio_runtime::Runtime::with_current(|r| r.driver_type()).is_polling() {
546 let current_flags = fcntl_getfl(file)?;
547 let flags = current_flags | OFlags::NONBLOCK;
548 if flags != current_flags {
549 fcntl_setfl(file, flags)?;
550 }
551 }
552 Ok(())
553}