Skip to main content

compio_fs/pipe/
mod.rs

1//! Unix pipe types.
2
3use std::{future::Future, io, os::fd::AsFd, path::Path};
4
5use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
6use compio_driver::{
7    BufferRef, impl_raw_fd,
8    op::{OFlags, Pipe},
9};
10use compio_io::{AsyncRead, AsyncReadManaged, AsyncReadMulti, AsyncWrite};
11use futures_util::Stream;
12use rustix::fs::{fcntl_getfl, fcntl_setfl};
13
14use crate::File;
15
16#[cfg(linux_all)]
17mod splice;
18
19#[cfg(linux_all)]
20pub use splice::*;
21
22/// Creates a pair of anonymous pipe.
23///
24/// ```
25/// use compio_fs::pipe::anonymous;
26/// use compio_io::{AsyncReadExt, AsyncWriteExt};
27///
28/// # compio_runtime::Runtime::new().unwrap().block_on(async {
29/// let (mut rx, mut tx) = anonymous().await.unwrap();
30///
31/// tx.write_all("Hello world!").await.unwrap();
32/// let (_, buf) = rx.read_exact(Vec::with_capacity(12)).await.unwrap();
33/// assert_eq!(&buf, b"Hello world!");
34/// # });
35/// ```
36pub async fn anonymous() -> io::Result<(Receiver, Sender)> {
37    let op = Pipe::new();
38    let BufResult(res, op) = compio_runtime::submit(op).await;
39    res?;
40    let (receiver, sender) = op.into_inner();
41    let receiver = Receiver {
42        file: File::from_std(std::fs::File::from(receiver))?,
43    };
44    let sender = Sender {
45        file: File::from_std(std::fs::File::from(sender))?,
46    };
47    Ok((receiver, sender))
48}
49
50/// Options and flags which can be used to configure how a FIFO file is opened.
51///
52/// This builder allows configuring how to create a pipe end from a FIFO file.
53/// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
54/// then chain calls to methods to set each option, then call either
55/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
56/// are trying to open. This will give you a [`io::Result`] with a pipe end
57/// inside that you can further operate on.
58///
59/// [`new`]: OpenOptions::new
60/// [`open_receiver`]: OpenOptions::open_receiver
61/// [`open_sender`]: OpenOptions::open_sender
62///
63/// # Examples
64///
65/// Opening a pair of pipe ends from a FIFO file:
66///
67/// ```no_run
68/// use compio_fs::pipe;
69///
70/// const FIFO_NAME: &str = "path/to/a/fifo";
71///
72/// # async fn dox() -> std::io::Result<()> {
73/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
74/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME).await?;
75/// # Ok(())
76/// # }
77/// ```
78///
79/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
80///
81/// ```ignore
82/// use compio_fs::pipe;
83/// use nix::{sys::stat::Mode, unistd::mkfifo};
84///
85/// // Our program has exclusive access to this path.
86/// const FIFO_NAME: &str = "path/to/a/new/fifo";
87///
88/// # async fn dox() -> std::io::Result<()> {
89/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
90/// let tx = pipe::OpenOptions::new()
91///     .read_write(true)
92///     .unchecked(true)
93///     .open_sender(FIFO_NAME)?;
94/// # Ok(())
95/// # }
96/// ```
97#[derive(Clone, Debug)]
98pub struct OpenOptions {
99    #[cfg(target_os = "linux")]
100    read_write: bool,
101    unchecked: bool,
102}
103
104impl OpenOptions {
105    /// Creates a blank new set of options ready for configuration.
106    ///
107    /// All options are initially set to `false`.
108    pub fn new() -> OpenOptions {
109        OpenOptions {
110            #[cfg(target_os = "linux")]
111            read_write: false,
112            unchecked: false,
113        }
114    }
115
116    /// Sets the option for read-write access.
117    ///
118    /// This option, when true, will indicate that a FIFO file will be opened
119    /// in read-write access mode. This operation is not defined by the POSIX
120    /// standard and is only guaranteed to work on Linux.
121    ///
122    /// # Examples
123    ///
124    /// Opening a [`Sender`] even if there are no open reading ends:
125    ///
126    /// ```
127    /// use compio_fs::pipe;
128    ///
129    /// # compio_runtime::Runtime::new().unwrap().block_on(async {
130    /// let tx = pipe::OpenOptions::new()
131    ///     .read_write(true)
132    ///     .open_sender("path/to/a/fifo")
133    ///     .await;
134    /// # });
135    /// ```
136    ///
137    /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
138    /// fail with [`UnexpectedEof`] during reading if all writing ends of the
139    /// pipe close the FIFO file.
140    ///
141    /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
142    ///
143    /// ```
144    /// use compio_fs::pipe;
145    ///
146    /// # compio_runtime::Runtime::new().unwrap().block_on(async {
147    /// let tx = pipe::OpenOptions::new()
148    ///     .read_write(true)
149    ///     .open_receiver("path/to/a/fifo")
150    ///     .await;
151    /// # });
152    /// ```
153    #[cfg(target_os = "linux")]
154    #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
155    pub fn read_write(&mut self, value: bool) -> &mut Self {
156        self.read_write = value;
157        self
158    }
159
160    /// Sets the option to skip the check for FIFO file type.
161    ///
162    /// By default, [`open_receiver`] and [`open_sender`] functions will check
163    /// if the opened file is a FIFO file. Set this option to `true` if you are
164    /// sure the file is a FIFO file.
165    ///
166    /// [`open_receiver`]: OpenOptions::open_receiver
167    /// [`open_sender`]: OpenOptions::open_sender
168    ///
169    /// # Examples
170    ///
171    /// ```no_run
172    /// use compio_fs::pipe;
173    /// use nix::{sys::stat::Mode, unistd::mkfifo};
174    ///
175    /// // Our program has exclusive access to this path.
176    /// const FIFO_NAME: &str = "path/to/a/new/fifo";
177    ///
178    /// # async fn dox() -> std::io::Result<()> {
179    /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
180    /// let rx = pipe::OpenOptions::new()
181    ///     .unchecked(true)
182    ///     .open_receiver(FIFO_NAME)
183    ///     .await?;
184    /// # Ok(())
185    /// # }
186    /// ```
187    pub fn unchecked(&mut self, value: bool) -> &mut Self {
188        self.unchecked = value;
189        self
190    }
191
192    /// Creates a [`Receiver`] from a FIFO file with the options specified by
193    /// `self`.
194    ///
195    /// This function will open the FIFO file at the specified path, possibly
196    /// check if it is a pipe, and associate the pipe with the default event
197    /// loop for reading.
198    ///
199    /// # Errors
200    ///
201    /// If the file type check fails, this function will fail with
202    /// `io::ErrorKind::InvalidInput`. This function may also fail with
203    /// other standard OS errors.
204    pub async fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
205        let file = self.open(path.as_ref(), PipeEnd::Receiver).await?;
206        Receiver::from_file(file)
207    }
208
209    /// Creates a [`Sender`] from a FIFO file with the options specified by
210    /// `self`.
211    ///
212    /// This function will open the FIFO file at the specified path, possibly
213    /// check if it is a pipe, and associate the pipe with the default event
214    /// loop for writing.
215    ///
216    /// # Errors
217    ///
218    /// If the file type check fails, this function will fail with
219    /// `io::ErrorKind::InvalidInput`. If the file is not opened in
220    /// read-write access mode and the file is not currently open for
221    /// reading, this function will fail with `ENXIO`. This function may
222    /// also fail with other standard OS errors.
223    pub async fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
224        let file = self.open(path.as_ref(), PipeEnd::Sender).await?;
225        Sender::from_file(file)
226    }
227
228    async fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
229        let mut options = crate::OpenOptions::new();
230        options
231            .read(pipe_end == PipeEnd::Receiver)
232            .write(pipe_end == PipeEnd::Sender);
233
234        #[cfg(target_os = "linux")]
235        if self.read_write {
236            options.read(true).write(true);
237        }
238
239        let file = options.open(path).await?;
240
241        if !self.unchecked && !is_fifo(&file).await? {
242            return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
243        }
244
245        Ok(file)
246    }
247}
248
249impl Default for OpenOptions {
250    fn default() -> OpenOptions {
251        OpenOptions::new()
252    }
253}
254
255#[derive(Clone, Copy, PartialEq, Eq, Debug)]
256enum PipeEnd {
257    Sender,
258    Receiver,
259}
260
261/// Writing end of a Unix pipe.
262///
263/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
264///
265/// Opening a named pipe for writing involves a few steps.
266/// Call to [`OpenOptions::open_sender`] might fail with an error indicating
267/// different things:
268///
269/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
270/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
271/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
272///   Sleep for a while and try again.
273/// * Other OS errors not specific to opening FIFO files.
274///
275/// Opening a `Sender` from a FIFO file should look like this:
276///
277/// ```no_run
278/// use std::time::Duration;
279///
280/// use compio_fs::pipe;
281/// use compio_runtime::time;
282///
283/// const FIFO_NAME: &str = "path/to/a/fifo";
284///
285/// # async fn dox() -> std::io::Result<()> {
286/// // Wait for a reader to open the file.
287/// let tx = loop {
288///     match pipe::OpenOptions::new().open_sender(FIFO_NAME).await {
289///         Ok(tx) => break tx,
290///         Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {}
291///         Err(e) => return Err(e.into()),
292///     }
293///
294///     time::sleep(Duration::from_millis(50)).await;
295/// };
296/// # Ok(())
297/// # }
298/// ```
299///
300/// On Linux, it is possible to create a `Sender` without waiting in a sleeping
301/// loop. This is done by opening a named pipe in read-write access mode with
302/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
303/// both a writing end and a reading end, and the latter allows to open a FIFO
304/// without [`ENXIO`] error since the pipe is open for reading as well.
305///
306/// `Sender` cannot be used to read from a pipe, so in practice the read access
307/// is only used when a FIFO is opened. However, using a `Sender` in read-write
308/// mode **may lead to lost data**, because written data will be dropped by the
309/// system as soon as all pipe ends are closed. To avoid lost data you have to
310/// make sure that a reading end has been opened before dropping a `Sender`.
311///
312/// Note that using read-write access mode with FIFO files is not defined by
313/// the POSIX standard and it is only guaranteed to work on Linux.
314///
315/// ```ignore
316/// use compio_fs::pipe;
317/// use compio_io::AsyncWriteExt;
318///
319/// const FIFO_NAME: &str = "path/to/a/fifo";
320///
321/// # async fn dox() {
322/// let mut tx = pipe::OpenOptions::new()
323///     .read_write(true)
324///     .open_sender(FIFO_NAME)
325///     .unwrap();
326///
327/// // Asynchronously write to the pipe before a reader.
328/// tx.write_all("hello world").await.unwrap();
329/// # }
330/// ```
331///
332/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
333#[derive(Debug, Clone)]
334pub struct Sender {
335    file: File,
336}
337
338impl Sender {
339    pub(crate) fn from_file(file: File) -> io::Result<Sender> {
340        set_nonblocking(&file)?;
341        Ok(Sender { file })
342    }
343
344    /// Close the pipe. If the returned future is dropped before polling, the
345    /// pipe won't be closed.
346    ///
347    /// See [`File::close`] for more details.
348    pub fn close(self) -> impl Future<Output = io::Result<()>> {
349        self.file.close()
350    }
351}
352
353impl AsyncWrite for Sender {
354    #[inline]
355    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
356        (&*self).write(buf).await
357    }
358
359    #[inline]
360    async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
361        (&*self).write_vectored(buf).await
362    }
363
364    #[inline]
365    async fn flush(&mut self) -> io::Result<()> {
366        (&*self).flush().await
367    }
368
369    #[inline]
370    async fn shutdown(&mut self) -> io::Result<()> {
371        (&*self).shutdown().await
372    }
373}
374
375impl AsyncWrite for &Sender {
376    async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
377        (&self.file.inner).write(buffer).await
378    }
379
380    async fn write_vectored<T: IoVectoredBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
381        (&self.file.inner).write_vectored(buffer).await
382    }
383
384    #[inline]
385    async fn flush(&mut self) -> io::Result<()> {
386        Ok(())
387    }
388
389    #[inline]
390    async fn shutdown(&mut self) -> io::Result<()> {
391        Ok(())
392    }
393}
394
395impl_raw_fd!(Sender, std::fs::File, file, file);
396
397/// Reading end of a Unix pipe.
398///
399/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
400///
401/// # Examples
402///
403/// Receiving messages from a named pipe in a loop:
404///
405/// ```no_run
406/// use std::io;
407///
408/// use compio_buf::BufResult;
409/// use compio_fs::pipe;
410/// use compio_io::AsyncReadExt;
411///
412/// const FIFO_NAME: &str = "path/to/a/fifo";
413///
414/// # async fn dox() -> io::Result<()> {
415/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
416/// loop {
417///     let mut msg = Vec::with_capacity(256);
418///     let BufResult(res, msg) = rx.read_exact(msg).await;
419///     match res {
420///         Ok(_) => { /* handle the message */ }
421///         Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
422///             // Writing end has been closed, we should reopen the pipe.
423///             rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
424///         }
425///         Err(e) => return Err(e.into()),
426///     }
427/// }
428/// # }
429/// ```
430///
431/// On Linux, you can use a `Receiver` in read-write access mode to implement
432/// resilient reading from a named pipe. Unlike `Receiver` opened in read-only
433/// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof`
434/// when the writing end is closed. This way, a `Receiver` can asynchronously
435/// wait for the next writer to open the pipe.
436///
437/// You should not use functions waiting for EOF such as [`read_to_end`] with
438/// a `Receiver` in read-write access mode, since it **may wait forever**.
439/// `Receiver` in this mode also holds an open writing end, which prevents
440/// receiving EOF.
441///
442/// To set the read-write access mode you can use `OpenOptions::read_write`.
443/// Note that using read-write access mode with FIFO files is not defined by
444/// the POSIX standard and it is only guaranteed to work on Linux.
445///
446/// ```ignore
447/// use compio_fs::pipe;
448/// use compio_io::AsyncReadExt;
449///
450/// const FIFO_NAME: &str = "path/to/a/fifo";
451///
452/// # async fn dox() {
453/// let mut rx = pipe::OpenOptions::new()
454///     .read_write(true)
455///     .open_receiver(FIFO_NAME)
456///     .unwrap();
457/// loop {
458///     let mut msg = Vec::with_capacity(256);
459///     rx.read_exact(msg).await.unwrap();
460///     // handle the message
461/// }
462/// # }
463/// ```
464///
465/// [`read_to_end`]: compio_io::AsyncReadExt::read_to_end
466#[derive(Debug, Clone)]
467pub struct Receiver {
468    file: File,
469}
470
471impl Receiver {
472    pub(crate) fn from_file(file: File) -> io::Result<Receiver> {
473        set_nonblocking(&file)?;
474        Ok(Receiver { file })
475    }
476
477    /// Close the pipe. If the returned future is dropped before polling, the
478    /// pipe won't be closed.
479    ///
480    /// See [`File::close`] for more details.
481    pub fn close(self) -> impl Future<Output = io::Result<()>> {
482        self.file.close()
483    }
484}
485
486impl AsyncRead for Receiver {
487    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
488        (&*self).read(buf).await
489    }
490
491    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
492        (&*self).read_vectored(buf).await
493    }
494}
495
496impl AsyncRead for &Receiver {
497    async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
498        (&self.file.inner).read(buffer).await
499    }
500
501    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buffer: V) -> BufResult<usize, V> {
502        (&self.file.inner).read_vectored(buffer).await
503    }
504}
505
506impl AsyncReadManaged for Receiver {
507    type Buffer = BufferRef;
508
509    async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
510        (&*self).read_managed(len).await
511    }
512}
513
514impl AsyncReadManaged for &Receiver {
515    type Buffer = BufferRef;
516
517    async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
518        (&self.file.inner).read_managed(len).await
519    }
520}
521
522impl AsyncReadMulti for Receiver {
523    fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
524        self.file.inner.read_multi(len)
525    }
526}
527
528impl AsyncReadMulti for &Receiver {
529    fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
530        self.file.inner.read_multi_shared(len)
531    }
532}
533
534impl_raw_fd!(Receiver, std::fs::File, file, file);
535
536/// Checks if file is a FIFO
537async fn is_fifo(file: &File) -> io::Result<bool> {
538    use std::os::unix::prelude::FileTypeExt;
539
540    Ok(file.metadata().await?.file_type().is_fifo())
541}
542
543/// Sets file's flags with O_NONBLOCK by fcntl.
544fn set_nonblocking(file: &impl AsFd) -> io::Result<()> {
545    if compio_runtime::Runtime::with_current(|r| r.driver_type()).is_polling() {
546        let current_flags = fcntl_getfl(file)?;
547        let flags = current_flags | OFlags::NONBLOCK;
548        if flags != current_flags {
549            fcntl_setfl(file, flags)?;
550        }
551    }
552    Ok(())
553}