compio_fs/pipe/mod.rs
1//! Unix pipe types.
2
3use std::{
4 future::Future,
5 io,
6 os::fd::{FromRawFd, IntoRawFd},
7 path::Path,
8};
9
10use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
11use compio_driver::{
12 AsRawFd, ToSharedFd, impl_raw_fd,
13 op::{
14 BufResultExt, Read, ReadManaged, ReadVectored, ResultTakeBuffer, VecBufResultExt, Write,
15 WriteVectored,
16 },
17 syscall,
18};
19use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite};
20use compio_runtime::{BorrowedBuffer, BufferPool};
21
22use crate::File;
23
24#[cfg(linux_all)]
25mod splice;
26
27#[cfg(linux_all)]
28pub use splice::*;
29
30/// Creates a pair of anonymous pipe.
31///
32/// ```
33/// use compio_fs::pipe::anonymous;
34/// use compio_io::{AsyncReadExt, AsyncWriteExt};
35///
36/// # compio_runtime::Runtime::new().unwrap().block_on(async {
37/// let (mut rx, mut tx) = anonymous().unwrap();
38///
39/// tx.write_all("Hello world!").await.unwrap();
40/// let (_, buf) = rx.read_exact(Vec::with_capacity(12)).await.unwrap();
41/// assert_eq!(&buf, b"Hello world!");
42/// # });
43/// ```
44pub fn anonymous() -> io::Result<(Receiver, Sender)> {
45 let (receiver, sender) = os_pipe::pipe()?;
46 let receiver = Receiver::from_file(File::from_std(unsafe {
47 std::fs::File::from_raw_fd(receiver.into_raw_fd())
48 })?)?;
49 let sender = Sender::from_file(File::from_std(unsafe {
50 std::fs::File::from_raw_fd(sender.into_raw_fd())
51 })?)?;
52 Ok((receiver, sender))
53}
54
/// Options and flags which can be used to configure how a FIFO file is opened.
///
/// This builder allows configuring how to create a pipe end from a FIFO file.
/// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
/// then chain calls to methods to set each option, then call either
/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
/// are trying to open. This will give you an [`io::Result`] with a pipe end
/// inside that you can further operate on.
///
/// [`new`]: OpenOptions::new
/// [`open_receiver`]: OpenOptions::open_receiver
/// [`open_sender`]: OpenOptions::open_sender
///
/// # Examples
///
/// Opening a pair of pipe ends from a FIFO file:
///
/// ```no_run
/// use compio_fs::pipe;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> std::io::Result<()> {
/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME).await?;
/// # Ok(())
/// # }
/// ```
///
/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
///
/// ```ignore
/// use compio_fs::pipe;
/// use nix::{sys::stat::Mode, unistd::mkfifo};
///
/// // Our program has exclusive access to this path.
/// const FIFO_NAME: &str = "path/to/a/new/fifo";
///
/// # async fn dox() -> std::io::Result<()> {
/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
/// let tx = pipe::OpenOptions::new()
///     .read_write(true)
///     .unchecked(true)
///     .open_sender(FIFO_NAME)
///     .await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct OpenOptions {
    // When `true`, the FIFO is opened for both reading and writing. The field
    // only exists on Linux builds.
    #[cfg(target_os = "linux")]
    read_write: bool,
    // When `true`, the "is this file a FIFO?" check after opening is skipped.
    unchecked: bool,
}
108
109impl OpenOptions {
110 /// Creates a blank new set of options ready for configuration.
111 ///
112 /// All options are initially set to `false`.
113 pub fn new() -> OpenOptions {
114 OpenOptions {
115 #[cfg(target_os = "linux")]
116 read_write: false,
117 unchecked: false,
118 }
119 }
120
121 /// Sets the option for read-write access.
122 ///
123 /// This option, when true, will indicate that a FIFO file will be opened
124 /// in read-write access mode. This operation is not defined by the POSIX
125 /// standard and is only guaranteed to work on Linux.
126 ///
127 /// # Examples
128 ///
129 /// Opening a [`Sender`] even if there are no open reading ends:
130 ///
131 /// ```
132 /// use compio_fs::pipe;
133 ///
134 /// # compio_runtime::Runtime::new().unwrap().block_on(async {
135 /// let tx = pipe::OpenOptions::new()
136 /// .read_write(true)
137 /// .open_sender("path/to/a/fifo")
138 /// .await;
139 /// # });
140 /// ```
141 ///
142 /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
143 /// fail with [`UnexpectedEof`] during reading if all writing ends of the
144 /// pipe close the FIFO file.
145 ///
146 /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
147 ///
148 /// ```
149 /// use compio_fs::pipe;
150 ///
151 /// # compio_runtime::Runtime::new().unwrap().block_on(async {
152 /// let tx = pipe::OpenOptions::new()
153 /// .read_write(true)
154 /// .open_receiver("path/to/a/fifo")
155 /// .await;
156 /// # });
157 /// ```
158 #[cfg(target_os = "linux")]
159 #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
160 pub fn read_write(&mut self, value: bool) -> &mut Self {
161 self.read_write = value;
162 self
163 }
164
165 /// Sets the option to skip the check for FIFO file type.
166 ///
167 /// By default, [`open_receiver`] and [`open_sender`] functions will check
168 /// if the opened file is a FIFO file. Set this option to `true` if you are
169 /// sure the file is a FIFO file.
170 ///
171 /// [`open_receiver`]: OpenOptions::open_receiver
172 /// [`open_sender`]: OpenOptions::open_sender
173 ///
174 /// # Examples
175 ///
176 /// ```no_run
177 /// use compio_fs::pipe;
178 /// use nix::{sys::stat::Mode, unistd::mkfifo};
179 ///
180 /// // Our program has exclusive access to this path.
181 /// const FIFO_NAME: &str = "path/to/a/new/fifo";
182 ///
183 /// # async fn dox() -> std::io::Result<()> {
184 /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
185 /// let rx = pipe::OpenOptions::new()
186 /// .unchecked(true)
187 /// .open_receiver(FIFO_NAME)
188 /// .await?;
189 /// # Ok(())
190 /// # }
191 /// ```
192 pub fn unchecked(&mut self, value: bool) -> &mut Self {
193 self.unchecked = value;
194 self
195 }
196
197 /// Creates a [`Receiver`] from a FIFO file with the options specified by
198 /// `self`.
199 ///
200 /// This function will open the FIFO file at the specified path, possibly
201 /// check if it is a pipe, and associate the pipe with the default event
202 /// loop for reading.
203 ///
204 /// # Errors
205 ///
206 /// If the file type check fails, this function will fail with
207 /// `io::ErrorKind::InvalidInput`. This function may also fail with
208 /// other standard OS errors.
209 pub async fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
210 let file = self.open(path.as_ref(), PipeEnd::Receiver).await?;
211 Receiver::from_file(file)
212 }
213
214 /// Creates a [`Sender`] from a FIFO file with the options specified by
215 /// `self`.
216 ///
217 /// This function will open the FIFO file at the specified path, possibly
218 /// check if it is a pipe, and associate the pipe with the default event
219 /// loop for writing.
220 ///
221 /// # Errors
222 ///
223 /// If the file type check fails, this function will fail with
224 /// `io::ErrorKind::InvalidInput`. If the file is not opened in
225 /// read-write access mode and the file is not currently open for
226 /// reading, this function will fail with `ENXIO`. This function may
227 /// also fail with other standard OS errors.
228 pub async fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
229 let file = self.open(path.as_ref(), PipeEnd::Sender).await?;
230 Sender::from_file(file)
231 }
232
233 async fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
234 let mut options = crate::OpenOptions::new();
235 options
236 .read(pipe_end == PipeEnd::Receiver)
237 .write(pipe_end == PipeEnd::Sender);
238
239 #[cfg(target_os = "linux")]
240 if self.read_write {
241 options.read(true).write(true);
242 }
243
244 let file = options.open(path).await?;
245
246 if !self.unchecked && !is_fifo(&file).await? {
247 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
248 }
249
250 Ok(file)
251 }
252}
253
254impl Default for OpenOptions {
255 fn default() -> OpenOptions {
256 OpenOptions::new()
257 }
258}
259
/// Which end of a pipe is being opened.
///
/// Used by `OpenOptions::open` to pick the read/write access flags.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum PipeEnd {
    /// The writing end; the file is opened with write access.
    Sender,
    /// The reading end; the file is opened with read access.
    Receiver,
}
265
/// Writing end of a Unix pipe.
///
/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
///
/// Opening a named pipe for writing involves a few steps.
/// Call to [`OpenOptions::open_sender`] might fail with an error indicating
/// different things:
///
/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
///   Sleep for a while and try again.
/// * Other OS errors not specific to opening FIFO files.
///
/// Opening a `Sender` from a FIFO file should look like this:
///
/// ```no_run
/// use std::time::Duration;
///
/// use compio_fs::pipe;
/// use compio_runtime::time;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> std::io::Result<()> {
/// // Wait for a reader to open the file.
/// let tx = loop {
///     match pipe::OpenOptions::new().open_sender(FIFO_NAME).await {
///         Ok(tx) => break tx,
///         Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {}
///         Err(e) => return Err(e.into()),
///     }
///
///     time::sleep(Duration::from_millis(50)).await;
/// };
/// # Ok(())
/// # }
/// ```
///
/// On Linux, it is possible to create a `Sender` without waiting in a sleeping
/// loop. This is done by opening a named pipe in read-write access mode with
/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
/// both a writing end and a reading end, and the latter allows opening a FIFO
/// without an [`ENXIO`] error since the pipe is open for reading as well.
///
/// `Sender` cannot be used to read from a pipe, so in practice the read access
/// is only used when a FIFO is opened. However, using a `Sender` in read-write
/// mode **may lead to lost data**, because written data will be dropped by the
/// system as soon as all pipe ends are closed. To avoid lost data you have to
/// make sure that a reading end has been opened before dropping a `Sender`.
///
/// Note that using read-write access mode with FIFO files is not defined by
/// the POSIX standard and it is only guaranteed to work on Linux.
///
/// ```ignore
/// use compio_fs::pipe;
/// use compio_io::AsyncWriteExt;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() {
/// let mut tx = pipe::OpenOptions::new()
///     .read_write(true)
///     .open_sender(FIFO_NAME)
///     .await
///     .unwrap();
///
/// // Asynchronously write to the pipe before a reader.
/// tx.write_all("hello world").await.unwrap();
/// # }
/// ```
///
/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
#[derive(Debug, Clone)]
pub struct Sender {
    // The underlying file all writes are submitted against.
    file: File,
}
342
343impl Sender {
344 pub(crate) fn from_file(file: File) -> io::Result<Sender> {
345 set_nonblocking(&file)?;
346 Ok(Sender { file })
347 }
348
349 /// Close the pipe. If the returned future is dropped before polling, the
350 /// pipe won't be closed.
351 pub fn close(self) -> impl Future<Output = io::Result<()>> {
352 self.file.close()
353 }
354}
355
356impl AsyncWrite for Sender {
357 #[inline]
358 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
359 (&*self).write(buf).await
360 }
361
362 #[inline]
363 async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
364 (&*self).write_vectored(buf).await
365 }
366
367 #[inline]
368 async fn flush(&mut self) -> io::Result<()> {
369 (&*self).flush().await
370 }
371
372 #[inline]
373 async fn shutdown(&mut self) -> io::Result<()> {
374 (&*self).shutdown().await
375 }
376}
377
378impl AsyncWrite for &Sender {
379 async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
380 let fd = self.to_shared_fd();
381 let op = Write::new(fd, buffer);
382 compio_runtime::submit(op).await.into_inner()
383 }
384
385 async fn write_vectored<T: IoVectoredBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
386 let fd = self.to_shared_fd();
387 let op = WriteVectored::new(fd, buffer);
388 compio_runtime::submit(op).await.into_inner()
389 }
390
391 #[inline]
392 async fn flush(&mut self) -> io::Result<()> {
393 Ok(())
394 }
395
396 #[inline]
397 async fn shutdown(&mut self) -> io::Result<()> {
398 Ok(())
399 }
400}
401
// NOTE(review): presumably generates the raw-fd related trait impls for
// `Sender` by delegating to its `file` field (the `to_shared_fd()` calls on
// `&Sender` in this file have no other visible impl) — confirm against the
// `impl_raw_fd!` definition in `compio_driver`.
impl_raw_fd!(Sender, std::fs::File, file, file);
403
/// Reading end of a Unix pipe.
///
/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
///
/// # Examples
///
/// Receiving messages from a named pipe in a loop:
///
/// ```no_run
/// use std::io;
///
/// use compio_buf::BufResult;
/// use compio_fs::pipe;
/// use compio_io::AsyncReadExt;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> io::Result<()> {
/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
/// loop {
///     let msg = Vec::with_capacity(256);
///     let BufResult(res, msg) = rx.read_exact(msg).await;
///     match res {
///         Ok(_) => { /* handle the message */ }
///         Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
///             // Writing end has been closed, we should reopen the pipe.
///             rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
///         }
///         Err(e) => return Err(e.into()),
///     }
/// }
/// # }
/// ```
///
/// On Linux, you can use a `Receiver` in read-write access mode to implement
/// resilient reading from a named pipe. Unlike a `Receiver` opened in
/// read-only mode, a read from a pipe in read-write mode will not fail with
/// `UnexpectedEof` when the writing end is closed. This way, a `Receiver` can
/// asynchronously wait for the next writer to open the pipe.
///
/// You should not use functions waiting for EOF such as [`read_to_end`] with
/// a `Receiver` in read-write access mode, since it **may wait forever**.
/// `Receiver` in this mode also holds an open writing end, which prevents
/// receiving EOF.
///
/// To set the read-write access mode you can use `OpenOptions::read_write`.
/// Note that using read-write access mode with FIFO files is not defined by
/// the POSIX standard and it is only guaranteed to work on Linux.
///
/// ```ignore
/// use compio_fs::pipe;
/// use compio_io::AsyncReadExt;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() {
/// let mut rx = pipe::OpenOptions::new()
///     .read_write(true)
///     .open_receiver(FIFO_NAME)
///     .await
///     .unwrap();
/// loop {
///     let msg = Vec::with_capacity(256);
///     rx.read_exact(msg).await.unwrap();
///     // handle the message
/// }
/// # }
/// ```
///
/// [`read_to_end`]: compio_io::AsyncReadExt::read_to_end
#[derive(Debug, Clone)]
pub struct Receiver {
    // The underlying file all reads are submitted against.
    file: File,
}
477
478impl Receiver {
479 pub(crate) fn from_file(file: File) -> io::Result<Receiver> {
480 set_nonblocking(&file)?;
481 Ok(Receiver { file })
482 }
483
484 /// Close the pipe. If the returned future is dropped before polling, the
485 /// pipe won't be closed.
486 pub fn close(self) -> impl Future<Output = io::Result<()>> {
487 self.file.close()
488 }
489}
490
491impl AsyncRead for Receiver {
492 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
493 (&*self).read(buf).await
494 }
495
496 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
497 (&*self).read_vectored(buf).await
498 }
499}
500
501impl AsyncRead for &Receiver {
502 async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
503 let fd = self.to_shared_fd();
504 let op = Read::new(fd, buffer);
505 let res = compio_runtime::submit(op).await.into_inner();
506 unsafe { res.map_advanced() }
507 }
508
509 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buffer: V) -> BufResult<usize, V> {
510 let fd = self.to_shared_fd();
511 let op = ReadVectored::new(fd, buffer);
512 let res = compio_runtime::submit(op).await.into_inner();
513 unsafe { res.map_vec_advanced() }
514 }
515}
516
517impl AsyncReadManaged for Receiver {
518 type Buffer<'a> = BorrowedBuffer<'a>;
519 type BufferPool = BufferPool;
520
521 async fn read_managed<'a>(
522 &mut self,
523 buffer_pool: &'a Self::BufferPool,
524 len: usize,
525 ) -> io::Result<Self::Buffer<'a>> {
526 (&*self).read_managed(buffer_pool, len).await
527 }
528}
529
530impl AsyncReadManaged for &Receiver {
531 type Buffer<'a> = BorrowedBuffer<'a>;
532 type BufferPool = BufferPool;
533
534 async fn read_managed<'a>(
535 &mut self,
536 buffer_pool: &'a Self::BufferPool,
537 len: usize,
538 ) -> io::Result<Self::Buffer<'a>> {
539 let fd = self.to_shared_fd();
540 let buffer_pool = buffer_pool.try_inner()?;
541 let op = ReadManaged::new(fd, buffer_pool, len)?;
542 compio_runtime::submit(op)
543 .with_extra()
544 .await
545 .take_buffer(buffer_pool)
546 }
547}
548
// NOTE(review): presumably generates the raw-fd related trait impls for
// `Receiver` by delegating to its `file` field (the `to_shared_fd()` calls on
// `&Receiver` in this file have no other visible impl) — confirm against the
// `impl_raw_fd!` definition in `compio_driver`.
impl_raw_fd!(Receiver, std::fs::File, file, file);
550
551/// Checks if file is a FIFO
552async fn is_fifo(file: &File) -> io::Result<bool> {
553 use std::os::unix::prelude::FileTypeExt;
554
555 Ok(file.metadata().await?.file_type().is_fifo())
556}
557
558/// Sets file's flags with O_NONBLOCK by fcntl.
559fn set_nonblocking(file: &impl AsRawFd) -> io::Result<()> {
560 if compio_runtime::Runtime::with_current(|r| r.driver_type()).is_polling() {
561 let fd = file.as_raw_fd();
562 let current_flags = syscall!(libc::fcntl(fd, libc::F_GETFL))?;
563 let flags = current_flags | libc::O_NONBLOCK;
564 if flags != current_flags {
565 syscall!(libc::fcntl(fd, libc::F_SETFL, flags))?;
566 }
567 }
568 Ok(())
569}