compio_fs/pipe/
mod.rs

1//! Unix pipe types.
2
3use std::{
4    future::Future,
5    io,
6    os::fd::{FromRawFd, IntoRawFd},
7    path::Path,
8};
9
10use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
11use compio_driver::{
12    AsRawFd, ToSharedFd, impl_raw_fd,
13    op::{
14        BufResultExt, Read, ReadManaged, ReadVectored, ResultTakeBuffer, VecBufResultExt, Write,
15        WriteVectored,
16    },
17    syscall,
18};
19use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite};
20use compio_runtime::{BorrowedBuffer, BufferPool};
21
22use crate::File;
23
24#[cfg(linux_all)]
25mod splice;
26
27#[cfg(linux_all)]
28pub use splice::*;
29
30/// Creates a pair of anonymous pipe.
31///
32/// ```
33/// use compio_fs::pipe::anonymous;
34/// use compio_io::{AsyncReadExt, AsyncWriteExt};
35///
36/// # compio_runtime::Runtime::new().unwrap().block_on(async {
37/// let (mut rx, mut tx) = anonymous().unwrap();
38///
39/// tx.write_all("Hello world!").await.unwrap();
40/// let (_, buf) = rx.read_exact(Vec::with_capacity(12)).await.unwrap();
41/// assert_eq!(&buf, b"Hello world!");
42/// # });
43/// ```
44pub fn anonymous() -> io::Result<(Receiver, Sender)> {
45    let (receiver, sender) = os_pipe::pipe()?;
46    let receiver = Receiver::from_file(File::from_std(unsafe {
47        std::fs::File::from_raw_fd(receiver.into_raw_fd())
48    })?)?;
49    let sender = Sender::from_file(File::from_std(unsafe {
50        std::fs::File::from_raw_fd(sender.into_raw_fd())
51    })?)?;
52    Ok((receiver, sender))
53}
54
/// Options and flags which can be used to configure how a FIFO file is opened.
///
/// This builder allows configuring how to create a pipe end from a FIFO file.
/// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
/// then chain calls to methods to set each option, then call either
/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
/// are trying to open. Awaiting that call gives you an [`io::Result`] with a
/// pipe end inside that you can further operate on.
///
/// [`new`]: OpenOptions::new
/// [`open_receiver`]: OpenOptions::open_receiver
/// [`open_sender`]: OpenOptions::open_sender
///
/// # Examples
///
/// Opening a pair of pipe ends from a FIFO file:
///
/// ```no_run
/// use compio_fs::pipe;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> std::io::Result<()> {
/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME).await?;
/// # Ok(())
/// # }
/// ```
///
/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
///
/// ```ignore
/// use compio_fs::pipe;
/// use nix::{sys::stat::Mode, unistd::mkfifo};
///
/// // Our program has exclusive access to this path.
/// const FIFO_NAME: &str = "path/to/a/new/fifo";
///
/// # async fn dox() -> std::io::Result<()> {
/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
/// let tx = pipe::OpenOptions::new()
///     .read_write(true)
///     .unchecked(true)
///     .open_sender(FIFO_NAME)
///     .await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct OpenOptions {
    /// Open the FIFO with both read and write access, whichever pipe end is
    /// requested. Only available on Linux.
    #[cfg(target_os = "linux")]
    read_write: bool,
    /// Skip the check that the opened file really is a FIFO.
    unchecked: bool,
}
108
109impl OpenOptions {
110    /// Creates a blank new set of options ready for configuration.
111    ///
112    /// All options are initially set to `false`.
113    pub fn new() -> OpenOptions {
114        OpenOptions {
115            #[cfg(target_os = "linux")]
116            read_write: false,
117            unchecked: false,
118        }
119    }
120
121    /// Sets the option for read-write access.
122    ///
123    /// This option, when true, will indicate that a FIFO file will be opened
124    /// in read-write access mode. This operation is not defined by the POSIX
125    /// standard and is only guaranteed to work on Linux.
126    ///
127    /// # Examples
128    ///
129    /// Opening a [`Sender`] even if there are no open reading ends:
130    ///
131    /// ```
132    /// use compio_fs::pipe;
133    ///
134    /// # compio_runtime::Runtime::new().unwrap().block_on(async {
135    /// let tx = pipe::OpenOptions::new()
136    ///     .read_write(true)
137    ///     .open_sender("path/to/a/fifo")
138    ///     .await;
139    /// # });
140    /// ```
141    ///
142    /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
143    /// fail with [`UnexpectedEof`] during reading if all writing ends of the
144    /// pipe close the FIFO file.
145    ///
146    /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
147    ///
148    /// ```
149    /// use compio_fs::pipe;
150    ///
151    /// # compio_runtime::Runtime::new().unwrap().block_on(async {
152    /// let tx = pipe::OpenOptions::new()
153    ///     .read_write(true)
154    ///     .open_receiver("path/to/a/fifo")
155    ///     .await;
156    /// # });
157    /// ```
158    #[cfg(target_os = "linux")]
159    #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
160    pub fn read_write(&mut self, value: bool) -> &mut Self {
161        self.read_write = value;
162        self
163    }
164
165    /// Sets the option to skip the check for FIFO file type.
166    ///
167    /// By default, [`open_receiver`] and [`open_sender`] functions will check
168    /// if the opened file is a FIFO file. Set this option to `true` if you are
169    /// sure the file is a FIFO file.
170    ///
171    /// [`open_receiver`]: OpenOptions::open_receiver
172    /// [`open_sender`]: OpenOptions::open_sender
173    ///
174    /// # Examples
175    ///
176    /// ```no_run
177    /// use compio_fs::pipe;
178    /// use nix::{sys::stat::Mode, unistd::mkfifo};
179    ///
180    /// // Our program has exclusive access to this path.
181    /// const FIFO_NAME: &str = "path/to/a/new/fifo";
182    ///
183    /// # async fn dox() -> std::io::Result<()> {
184    /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
185    /// let rx = pipe::OpenOptions::new()
186    ///     .unchecked(true)
187    ///     .open_receiver(FIFO_NAME)
188    ///     .await?;
189    /// # Ok(())
190    /// # }
191    /// ```
192    pub fn unchecked(&mut self, value: bool) -> &mut Self {
193        self.unchecked = value;
194        self
195    }
196
197    /// Creates a [`Receiver`] from a FIFO file with the options specified by
198    /// `self`.
199    ///
200    /// This function will open the FIFO file at the specified path, possibly
201    /// check if it is a pipe, and associate the pipe with the default event
202    /// loop for reading.
203    ///
204    /// # Errors
205    ///
206    /// If the file type check fails, this function will fail with
207    /// `io::ErrorKind::InvalidInput`. This function may also fail with
208    /// other standard OS errors.
209    pub async fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
210        let file = self.open(path.as_ref(), PipeEnd::Receiver).await?;
211        Receiver::from_file(file)
212    }
213
214    /// Creates a [`Sender`] from a FIFO file with the options specified by
215    /// `self`.
216    ///
217    /// This function will open the FIFO file at the specified path, possibly
218    /// check if it is a pipe, and associate the pipe with the default event
219    /// loop for writing.
220    ///
221    /// # Errors
222    ///
223    /// If the file type check fails, this function will fail with
224    /// `io::ErrorKind::InvalidInput`. If the file is not opened in
225    /// read-write access mode and the file is not currently open for
226    /// reading, this function will fail with `ENXIO`. This function may
227    /// also fail with other standard OS errors.
228    pub async fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
229        let file = self.open(path.as_ref(), PipeEnd::Sender).await?;
230        Sender::from_file(file)
231    }
232
233    async fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
234        let mut options = crate::OpenOptions::new();
235        options
236            .read(pipe_end == PipeEnd::Receiver)
237            .write(pipe_end == PipeEnd::Sender);
238
239        #[cfg(target_os = "linux")]
240        if self.read_write {
241            options.read(true).write(true);
242        }
243
244        let file = options.open(path).await?;
245
246        if !self.unchecked && !is_fifo(&file).await? {
247            return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
248        }
249
250        Ok(file)
251    }
252}
253
254impl Default for OpenOptions {
255    fn default() -> OpenOptions {
256        OpenOptions::new()
257    }
258}
259
/// Selects which end of a pipe a FIFO file is being opened as.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum PipeEnd {
    /// The writing end; the file is opened with write access.
    Sender,
    /// The reading end; the file is opened with read access.
    Receiver,
}
265
/// Writing end of a Unix pipe.
///
/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
///
/// Opening a named pipe for writing involves a few steps.
/// A call to [`OpenOptions::open_sender`] might fail with an error indicating
/// different things:
///
/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
///   Sleep for a while and try again.
/// * Other OS errors not specific to opening FIFO files.
///
/// Opening a `Sender` from a FIFO file should look like this:
///
/// ```no_run
/// use std::time::Duration;
///
/// use compio_fs::pipe;
/// use compio_runtime::time;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> std::io::Result<()> {
/// // Wait for a reader to open the file.
/// let tx = loop {
///     match pipe::OpenOptions::new().open_sender(FIFO_NAME).await {
///         Ok(tx) => break tx,
///         Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {}
///         Err(e) => return Err(e.into()),
///     }
///
///     time::sleep(Duration::from_millis(50)).await;
/// };
/// # Ok(())
/// # }
/// ```
///
/// On Linux, it is possible to create a `Sender` without waiting in a sleeping
/// loop. This is done by opening a named pipe in read-write access mode with
/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
/// both a writing end and a reading end, and the latter allows opening a FIFO
/// without an [`ENXIO`] error since the pipe is open for reading as well.
///
/// `Sender` cannot be used to read from a pipe, so in practice the read access
/// is only used when a FIFO is opened. However, using a `Sender` in read-write
/// mode **may lead to lost data**, because written data will be dropped by the
/// system as soon as all pipe ends are closed. To avoid lost data you have to
/// make sure that a reading end has been opened before dropping a `Sender`.
///
/// Note that using read-write access mode with FIFO files is not defined by
/// the POSIX standard and it is only guaranteed to work on Linux.
///
/// ```ignore
/// use compio_fs::pipe;
/// use compio_io::AsyncWriteExt;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() {
/// let mut tx = pipe::OpenOptions::new()
///     .read_write(true)
///     .open_sender(FIFO_NAME)
///     .await
///     .unwrap();
///
/// // Asynchronously write to the pipe before a reader.
/// tx.write_all("hello world").await.unwrap();
/// # }
/// ```
///
/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
#[derive(Debug, Clone)]
pub struct Sender {
    /// The opened pipe (or FIFO file) that writes are submitted against.
    file: File,
}
342
343impl Sender {
344    pub(crate) fn from_file(file: File) -> io::Result<Sender> {
345        set_nonblocking(&file)?;
346        Ok(Sender { file })
347    }
348
349    /// Close the pipe. If the returned future is dropped before polling, the
350    /// pipe won't be closed.
351    pub fn close(self) -> impl Future<Output = io::Result<()>> {
352        self.file.close()
353    }
354}
355
356impl AsyncWrite for Sender {
357    #[inline]
358    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
359        (&*self).write(buf).await
360    }
361
362    #[inline]
363    async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
364        (&*self).write_vectored(buf).await
365    }
366
367    #[inline]
368    async fn flush(&mut self) -> io::Result<()> {
369        (&*self).flush().await
370    }
371
372    #[inline]
373    async fn shutdown(&mut self) -> io::Result<()> {
374        (&*self).shutdown().await
375    }
376}
377
378impl AsyncWrite for &Sender {
379    async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
380        let fd = self.to_shared_fd();
381        let op = Write::new(fd, buffer);
382        compio_runtime::submit(op).await.into_inner()
383    }
384
385    async fn write_vectored<T: IoVectoredBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
386        let fd = self.to_shared_fd();
387        let op = WriteVectored::new(fd, buffer);
388        compio_runtime::submit(op).await.into_inner()
389    }
390
391    #[inline]
392    async fn flush(&mut self) -> io::Result<()> {
393        Ok(())
394    }
395
396    #[inline]
397    async fn shutdown(&mut self) -> io::Result<()> {
398        Ok(())
399    }
400}
401
// Expands compio_driver's `impl_raw_fd!` for `Sender`, pointing it at the
// inner `file` field. NOTE(review): presumably this is what supplies the
// raw-fd / shared-fd accessors (e.g. `to_shared_fd`) used by the I/O impls
// above — confirm against the macro definition.
impl_raw_fd!(Sender, std::fs::File, file, file);
403
/// Reading end of a Unix pipe.
///
/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
///
/// # Examples
///
/// Receiving messages from a named pipe in a loop:
///
/// ```no_run
/// use std::io;
///
/// use compio_buf::BufResult;
/// use compio_fs::pipe;
/// use compio_io::AsyncReadExt;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> io::Result<()> {
/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
/// loop {
///     let msg = Vec::with_capacity(256);
///     let BufResult(res, msg) = rx.read_exact(msg).await;
///     match res {
///         Ok(_) => { /* handle the message */ }
///         Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
///             // Writing end has been closed, we should reopen the pipe.
///             rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
///         }
///         Err(e) => return Err(e.into()),
///     }
/// }
/// # }
/// ```
///
/// On Linux, you can use a `Receiver` in read-write access mode to implement
/// resilient reading from a named pipe. Unlike a `Receiver` opened in
/// read-only mode, a read from a pipe in read-write mode will not fail with
/// `UnexpectedEof` when the writing end is closed. This way, a `Receiver` can
/// asynchronously wait for the next writer to open the pipe.
///
/// You should not use functions waiting for EOF such as [`read_to_end`] with
/// a `Receiver` in read-write access mode, since it **may wait forever**.
/// `Receiver` in this mode also holds an open writing end, which prevents
/// receiving EOF.
///
/// To set the read-write access mode you can use `OpenOptions::read_write`.
/// Note that using read-write access mode with FIFO files is not defined by
/// the POSIX standard and it is only guaranteed to work on Linux.
///
/// ```ignore
/// use compio_fs::pipe;
/// use compio_io::AsyncReadExt;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() {
/// let mut rx = pipe::OpenOptions::new()
///     .read_write(true)
///     .open_receiver(FIFO_NAME)
///     .await
///     .unwrap();
/// loop {
///     let msg = Vec::with_capacity(256);
///     rx.read_exact(msg).await.unwrap();
///     // handle the message
/// }
/// # }
/// ```
///
/// [`read_to_end`]: compio_io::AsyncReadExt::read_to_end
#[derive(Debug, Clone)]
pub struct Receiver {
    /// The opened pipe (or FIFO file) that reads are submitted against.
    file: File,
}
477
478impl Receiver {
479    pub(crate) fn from_file(file: File) -> io::Result<Receiver> {
480        set_nonblocking(&file)?;
481        Ok(Receiver { file })
482    }
483
484    /// Close the pipe. If the returned future is dropped before polling, the
485    /// pipe won't be closed.
486    pub fn close(self) -> impl Future<Output = io::Result<()>> {
487        self.file.close()
488    }
489}
490
491impl AsyncRead for Receiver {
492    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
493        (&*self).read(buf).await
494    }
495
496    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
497        (&*self).read_vectored(buf).await
498    }
499}
500
501impl AsyncRead for &Receiver {
502    async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
503        let fd = self.to_shared_fd();
504        let op = Read::new(fd, buffer);
505        let res = compio_runtime::submit(op).await.into_inner();
506        unsafe { res.map_advanced() }
507    }
508
509    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buffer: V) -> BufResult<usize, V> {
510        let fd = self.to_shared_fd();
511        let op = ReadVectored::new(fd, buffer);
512        let res = compio_runtime::submit(op).await.into_inner();
513        unsafe { res.map_vec_advanced() }
514    }
515}
516
517impl AsyncReadManaged for Receiver {
518    type Buffer<'a> = BorrowedBuffer<'a>;
519    type BufferPool = BufferPool;
520
521    async fn read_managed<'a>(
522        &mut self,
523        buffer_pool: &'a Self::BufferPool,
524        len: usize,
525    ) -> io::Result<Self::Buffer<'a>> {
526        (&*self).read_managed(buffer_pool, len).await
527    }
528}
529
530impl AsyncReadManaged for &Receiver {
531    type Buffer<'a> = BorrowedBuffer<'a>;
532    type BufferPool = BufferPool;
533
534    async fn read_managed<'a>(
535        &mut self,
536        buffer_pool: &'a Self::BufferPool,
537        len: usize,
538    ) -> io::Result<Self::Buffer<'a>> {
539        let fd = self.to_shared_fd();
540        let buffer_pool = buffer_pool.try_inner()?;
541        let op = ReadManaged::new(fd, buffer_pool, len)?;
542        compio_runtime::submit(op)
543            .with_extra()
544            .await
545            .take_buffer(buffer_pool)
546    }
547}
548
// Expands compio_driver's `impl_raw_fd!` for `Receiver`, pointing it at the
// inner `file` field. NOTE(review): presumably this is what supplies the
// raw-fd / shared-fd accessors (e.g. `to_shared_fd`) used by the I/O impls
// above — confirm against the macro definition.
impl_raw_fd!(Receiver, std::fs::File, file, file);
550
551/// Checks if file is a FIFO
552async fn is_fifo(file: &File) -> io::Result<bool> {
553    use std::os::unix::prelude::FileTypeExt;
554
555    Ok(file.metadata().await?.file_type().is_fifo())
556}
557
558/// Sets file's flags with O_NONBLOCK by fcntl.
559fn set_nonblocking(file: &impl AsRawFd) -> io::Result<()> {
560    if compio_runtime::Runtime::with_current(|r| r.driver_type()).is_polling() {
561        let fd = file.as_raw_fd();
562        let current_flags = syscall!(libc::fcntl(fd, libc::F_GETFL))?;
563        let flags = current_flags | libc::O_NONBLOCK;
564        if flags != current_flags {
565            syscall!(libc::fcntl(fd, libc::F_SETFL, flags))?;
566        }
567    }
568    Ok(())
569}