1 //! Unix pipe types.
2 
3 use crate::io::interest::Interest;
4 use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready};
5 
6 use mio::unix::pipe as mio_pipe;
7 use std::fs::File;
8 use std::io::{self, Read, Write};
9 use std::os::unix::fs::OpenOptionsExt;
10 use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
11 use std::path::Path;
12 use std::pin::Pin;
13 use std::task::{Context, Poll};
14 
15 cfg_io_util! {
16     use bytes::BufMut;
17 }
18 
19 /// Creates a new anonymous Unix pipe.
20 ///
21 /// This function will open a new pipe and associate both pipe ends with the default
22 /// event loop.
23 ///
24 /// If you need to create a pipe for communication with a spawned process, you can
25 /// use [`Stdio::piped()`] instead.
26 ///
27 /// [`Stdio::piped()`]: std::process::Stdio::piped
28 ///
29 /// # Errors
30 ///
31 /// If creating a pipe fails, this function will return with the related OS error.
32 ///
33 /// # Examples
34 ///
35 /// Create a pipe and pass the writing end to a spawned process.
36 ///
37 /// ```no_run
38 /// use tokio::net::unix::pipe;
39 /// use tokio::process::Command;
40 /// # use tokio::io::AsyncReadExt;
41 /// # use std::error::Error;
42 ///
43 /// # async fn dox() -> Result<(), Box<dyn Error>> {
44 /// let (tx, mut rx) = pipe::pipe()?;
45 /// let mut buffer = String::new();
46 ///
47 /// let status = Command::new("echo")
48 ///     .arg("Hello, world!")
49 ///     .stdout(tx.into_blocking_fd()?)
50 ///     .status();
51 /// rx.read_to_string(&mut buffer).await?;
52 ///
53 /// assert!(status.await?.success());
54 /// assert_eq!(buffer, "Hello, world!\n");
55 /// # Ok(())
56 /// # }
57 /// ```
58 ///
59 /// # Panics
60 ///
61 /// This function panics if it is not called from within a runtime with
62 /// IO enabled.
63 ///
64 /// The runtime is usually set implicitly when this function is called
65 /// from a future driven by a tokio runtime, otherwise runtime can be set
66 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pipe() -> io::Result<(Sender, Receiver)>67 pub fn pipe() -> io::Result<(Sender, Receiver)> {
68     let (tx, rx) = mio_pipe::new()?;
69     Ok((Sender::from_mio(tx)?, Receiver::from_mio(rx)?))
70 }
71 
72 /// Options and flags which can be used to configure how a FIFO file is opened.
73 ///
74 /// This builder allows configuring how to create a pipe end from a FIFO file.
75 /// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
76 /// then chain calls to methods to set each option, then call either
77 /// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
78 /// are trying to open. This will give you a [`io::Result`] with a pipe end
79 /// inside that you can further operate on.
80 ///
81 /// [`new`]: OpenOptions::new
82 /// [`open_receiver`]: OpenOptions::open_receiver
83 /// [`open_sender`]: OpenOptions::open_sender
84 ///
85 /// # Examples
86 ///
87 /// Opening a pair of pipe ends from a FIFO file:
88 ///
89 /// ```no_run
90 /// use tokio::net::unix::pipe;
91 /// # use std::error::Error;
92 ///
93 /// const FIFO_NAME: &str = "path/to/a/fifo";
94 ///
95 /// # async fn dox() -> Result<(), Box<dyn Error>> {
96 /// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
97 /// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME)?;
98 /// # Ok(())
99 /// # }
100 /// ```
101 ///
102 /// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
103 ///
104 /// ```ignore
105 /// use tokio::net::unix::pipe;
106 /// use nix::{unistd::mkfifo, sys::stat::Mode};
107 /// # use std::error::Error;
108 ///
109 /// // Our program has exclusive access to this path.
110 /// const FIFO_NAME: &str = "path/to/a/new/fifo";
111 ///
112 /// # async fn dox() -> Result<(), Box<dyn Error>> {
113 /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
114 /// let tx = pipe::OpenOptions::new()
115 ///     .read_write(true)
116 ///     .unchecked(true)
117 ///     .open_sender(FIFO_NAME)?;
118 /// # Ok(())
119 /// # }
120 /// ```
121 #[derive(Clone, Debug)]
122 pub struct OpenOptions {
123     #[cfg(target_os = "linux")]
124     read_write: bool,
125     unchecked: bool,
126 }
127 
128 impl OpenOptions {
129     /// Creates a blank new set of options ready for configuration.
130     ///
131     /// All options are initially set to `false`.
new() -> OpenOptions132     pub fn new() -> OpenOptions {
133         OpenOptions {
134             #[cfg(target_os = "linux")]
135             read_write: false,
136             unchecked: false,
137         }
138     }
139 
140     /// Sets the option for read-write access.
141     ///
142     /// This option, when true, will indicate that a FIFO file will be opened
143     /// in read-write access mode. This operation is not defined by the POSIX
144     /// standard and is only guaranteed to work on Linux.
145     ///
146     /// # Examples
147     ///
148     /// Opening a [`Sender`] even if there are no open reading ends:
149     ///
150     /// ```ignore
151     /// use tokio::net::unix::pipe;
152     ///
153     /// let tx = pipe::OpenOptions::new()
154     ///     .read_write(true)
155     ///     .open_sender("path/to/a/fifo");
156     /// ```
157     ///
158     /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
159     /// fail with [`UnexpectedEof`] during reading if all writing ends of the
160     /// pipe close the FIFO file.
161     ///
162     /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
163     ///
164     /// ```ignore
165     /// use tokio::net::unix::pipe;
166     ///
167     /// let tx = pipe::OpenOptions::new()
168     ///     .read_write(true)
169     ///     .open_receiver("path/to/a/fifo");
170     /// ```
171     #[cfg(target_os = "linux")]
172     #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
read_write(&mut self, value: bool) -> &mut Self173     pub fn read_write(&mut self, value: bool) -> &mut Self {
174         self.read_write = value;
175         self
176     }
177 
178     /// Sets the option to skip the check for FIFO file type.
179     ///
180     /// By default, [`open_receiver`] and [`open_sender`] functions will check
181     /// if the opened file is a FIFO file. Set this option to `true` if you are
182     /// sure the file is a FIFO file.
183     ///
184     /// [`open_receiver`]: OpenOptions::open_receiver
185     /// [`open_sender`]: OpenOptions::open_sender
186     ///
187     /// # Examples
188     ///
189     /// ```no_run
190     /// use tokio::net::unix::pipe;
191     /// use nix::{unistd::mkfifo, sys::stat::Mode};
192     /// # use std::error::Error;
193     ///
194     /// // Our program has exclusive access to this path.
195     /// const FIFO_NAME: &str = "path/to/a/new/fifo";
196     ///
197     /// # async fn dox() -> Result<(), Box<dyn Error>> {
198     /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
199     /// let rx = pipe::OpenOptions::new()
200     ///     .unchecked(true)
201     ///     .open_receiver(FIFO_NAME)?;
202     /// # Ok(())
203     /// # }
204     /// ```
unchecked(&mut self, value: bool) -> &mut Self205     pub fn unchecked(&mut self, value: bool) -> &mut Self {
206         self.unchecked = value;
207         self
208     }
209 
210     /// Creates a [`Receiver`] from a FIFO file with the options specified by `self`.
211     ///
212     /// This function will open the FIFO file at the specified path, possibly
213     /// check if it is a pipe, and associate the pipe with the default event
214     /// loop for reading.
215     ///
216     /// # Errors
217     ///
218     /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
219     /// This function may also fail with other standard OS errors.
220     ///
221     /// # Panics
222     ///
223     /// This function panics if it is not called from within a runtime with
224     /// IO enabled.
225     ///
226     /// The runtime is usually set implicitly when this function is called
227     /// from a future driven by a tokio runtime, otherwise runtime can be set
228     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver>229     pub fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
230         let file = self.open(path.as_ref(), PipeEnd::Receiver)?;
231         Receiver::from_file_unchecked(file)
232     }
233 
234     /// Creates a [`Sender`] from a FIFO file with the options specified by `self`.
235     ///
236     /// This function will open the FIFO file at the specified path, possibly
237     /// check if it is a pipe, and associate the pipe with the default event
238     /// loop for writing.
239     ///
240     /// # Errors
241     ///
242     /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
243     /// If the file is not opened in read-write access mode and the file is not
244     /// currently open for reading, this function will fail with `ENXIO`.
245     /// This function may also fail with other standard OS errors.
246     ///
247     /// # Panics
248     ///
249     /// This function panics if it is not called from within a runtime with
250     /// IO enabled.
251     ///
252     /// The runtime is usually set implicitly when this function is called
253     /// from a future driven by a tokio runtime, otherwise runtime can be set
254     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender>255     pub fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
256         let file = self.open(path.as_ref(), PipeEnd::Sender)?;
257         Sender::from_file_unchecked(file)
258     }
259 
open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File>260     fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
261         let mut options = std::fs::OpenOptions::new();
262         options
263             .read(pipe_end == PipeEnd::Receiver)
264             .write(pipe_end == PipeEnd::Sender)
265             .custom_flags(libc::O_NONBLOCK);
266 
267         #[cfg(target_os = "linux")]
268         if self.read_write {
269             options.read(true).write(true);
270         }
271 
272         let file = options.open(path)?;
273 
274         if !self.unchecked && !is_pipe(file.as_fd())? {
275             return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
276         }
277 
278         Ok(file)
279     }
280 }
281 
282 impl Default for OpenOptions {
default() -> OpenOptions283     fn default() -> OpenOptions {
284         OpenOptions::new()
285     }
286 }
287 
288 #[derive(Clone, Copy, PartialEq, Eq, Debug)]
289 enum PipeEnd {
290     Sender,
291     Receiver,
292 }
293 
294 /// Writing end of a Unix pipe.
295 ///
296 /// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
297 ///
298 /// Opening a named pipe for writing involves a few steps.
299 /// Call to [`OpenOptions::open_sender`] might fail with an error indicating
300 /// different things:
301 ///
302 /// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
303 /// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
304 /// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
305 ///   Sleep for a while and try again.
306 /// * Other OS errors not specific to opening FIFO files.
307 ///
308 /// Opening a `Sender` from a FIFO file should look like this:
309 ///
310 /// ```no_run
311 /// use tokio::net::unix::pipe;
312 /// use tokio::time::{self, Duration};
313 ///
314 /// const FIFO_NAME: &str = "path/to/a/fifo";
315 ///
316 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
317 /// // Wait for a reader to open the file.
318 /// let tx = loop {
319 ///     match pipe::OpenOptions::new().open_sender(FIFO_NAME) {
320 ///         Ok(tx) => break tx,
321 ///         Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {},
322 ///         Err(e) => return Err(e.into()),
323 ///     }
324 ///
325 ///     time::sleep(Duration::from_millis(50)).await;
326 /// };
327 /// # Ok(())
328 /// # }
329 /// ```
330 ///
331 /// On Linux, it is possible to create a `Sender` without waiting in a sleeping
332 /// loop. This is done by opening a named pipe in read-write access mode with
333 /// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
334 /// both a writing end and a reading end, and the latter allows to open a FIFO
335 /// without [`ENXIO`] error since the pipe is open for reading as well.
336 ///
337 /// `Sender` cannot be used to read from a pipe, so in practice the read access
338 /// is only used when a FIFO is opened. However, using a `Sender` in read-write
339 /// mode **may lead to lost data**, because written data will be dropped by the
340 /// system as soon as all pipe ends are closed. To avoid lost data you have to
341 /// make sure that a reading end has been opened before dropping a `Sender`.
342 ///
343 /// Note that using read-write access mode with FIFO files is not defined by
344 /// the POSIX standard and it is only guaranteed to work on Linux.
345 ///
346 /// ```ignore
347 /// use tokio::io::AsyncWriteExt;
348 /// use tokio::net::unix::pipe;
349 ///
350 /// const FIFO_NAME: &str = "path/to/a/fifo";
351 ///
352 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
353 /// let mut tx = pipe::OpenOptions::new()
354 ///     .read_write(true)
355 ///     .open_sender(FIFO_NAME)?;
356 ///
357 /// // Asynchronously write to the pipe before a reader.
358 /// tx.write_all(b"hello world").await?;
359 /// # Ok(())
360 /// # }
361 /// ```
362 ///
363 /// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
364 #[derive(Debug)]
365 pub struct Sender {
366     io: PollEvented<mio_pipe::Sender>,
367 }
368 
369 impl Sender {
from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender>370     fn from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender> {
371         let io = PollEvented::new_with_interest(mio_tx, Interest::WRITABLE)?;
372         Ok(Sender { io })
373     }
374 
375     /// Creates a new `Sender` from a [`File`].
376     ///
377     /// This function is intended to construct a pipe from a [`File`] representing
378     /// a special FIFO file. It will check if the file is a pipe and has write access,
379     /// set it in non-blocking mode and perform the conversion.
380     ///
381     /// # Errors
382     ///
383     /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
384     /// does not have write access. Also fails with any standard OS error if it occurs.
385     ///
386     /// # Panics
387     ///
388     /// This function panics if it is not called from within a runtime with
389     /// IO enabled.
390     ///
391     /// The runtime is usually set implicitly when this function is called
392     /// from a future driven by a tokio runtime, otherwise runtime can be set
393     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_file(file: File) -> io::Result<Sender>394     pub fn from_file(file: File) -> io::Result<Sender> {
395         Sender::from_owned_fd(file.into())
396     }
397 
398     /// Creates a new `Sender` from an [`OwnedFd`].
399     ///
400     /// This function is intended to construct a pipe from an [`OwnedFd`] representing
401     /// an anonymous pipe or a special FIFO file. It will check if the file descriptor
402     /// is a pipe and has write access, set it in non-blocking mode and perform the
403     /// conversion.
404     ///
405     /// # Errors
406     ///
407     /// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe
408     /// or it does not have write access. Also fails with any standard OS error if it
409     /// occurs.
410     ///
411     /// # Panics
412     ///
413     /// This function panics if it is not called from within a runtime with
414     /// IO enabled.
415     ///
416     /// The runtime is usually set implicitly when this function is called
417     /// from a future driven by a tokio runtime, otherwise runtime can be set
418     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_owned_fd(owned_fd: OwnedFd) -> io::Result<Sender>419     pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Sender> {
420         if !is_pipe(owned_fd.as_fd())? {
421             return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
422         }
423 
424         let flags = get_file_flags(owned_fd.as_fd())?;
425         if has_write_access(flags) {
426             set_nonblocking(owned_fd.as_fd(), flags)?;
427             Sender::from_owned_fd_unchecked(owned_fd)
428         } else {
429             Err(io::Error::new(
430                 io::ErrorKind::InvalidInput,
431                 "not in O_WRONLY or O_RDWR access mode",
432             ))
433         }
434     }
435 
436     /// Creates a new `Sender` from a [`File`] without checking pipe properties.
437     ///
438     /// This function is intended to construct a pipe from a File representing
439     /// a special FIFO file. The conversion assumes nothing about the underlying
440     /// file; it is left up to the user to make sure it is opened with write access,
441     /// represents a pipe and is set in non-blocking mode.
442     ///
443     /// # Examples
444     ///
445     /// ```no_run
446     /// use tokio::net::unix::pipe;
447     /// use std::fs::OpenOptions;
448     /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
449     /// # use std::error::Error;
450     ///
451     /// const FIFO_NAME: &str = "path/to/a/fifo";
452     ///
453     /// # async fn dox() -> Result<(), Box<dyn Error>> {
454     /// let file = OpenOptions::new()
455     ///     .write(true)
456     ///     .custom_flags(libc::O_NONBLOCK)
457     ///     .open(FIFO_NAME)?;
458     /// if file.metadata()?.file_type().is_fifo() {
459     ///     let tx = pipe::Sender::from_file_unchecked(file)?;
460     ///     /* use the Sender */
461     /// }
462     /// # Ok(())
463     /// # }
464     /// ```
465     ///
466     /// # Panics
467     ///
468     /// This function panics if it is not called from within a runtime with
469     /// IO enabled.
470     ///
471     /// The runtime is usually set implicitly when this function is called
472     /// from a future driven by a tokio runtime, otherwise runtime can be set
473     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_file_unchecked(file: File) -> io::Result<Sender>474     pub fn from_file_unchecked(file: File) -> io::Result<Sender> {
475         Sender::from_owned_fd_unchecked(file.into())
476     }
477 
478     /// Creates a new `Sender` from an [`OwnedFd`] without checking pipe properties.
479     ///
480     /// This function is intended to construct a pipe from an [`OwnedFd`] representing
481     /// an anonymous pipe or a special FIFO file. The conversion assumes nothing about
482     /// the underlying pipe; it is left up to the user to make sure that the file
483     /// descriptor represents the writing end of a pipe and the pipe is set in
484     /// non-blocking mode.
485     ///
486     /// # Panics
487     ///
488     /// This function panics if it is not called from within a runtime with
489     /// IO enabled.
490     ///
491     /// The runtime is usually set implicitly when this function is called
492     /// from a future driven by a tokio runtime, otherwise runtime can be set
493     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Sender>494     pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Sender> {
495         // Safety: OwnedFd represents a valid, open file descriptor.
496         let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(owned_fd.into_raw_fd()) };
497         Sender::from_mio(mio_tx)
498     }
499 
500     /// Waits for any of the requested ready states.
501     ///
502     /// This function can be used instead of [`writable()`] to check the returned
503     /// ready set for [`Ready::WRITABLE`] and [`Ready::WRITE_CLOSED`] events.
504     ///
505     /// The function may complete without the pipe being ready. This is a
506     /// false-positive and attempting an operation will return with
507     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
508     /// [`Ready`] set, so you should always check the returned value and possibly
509     /// wait again if the requested states are not set.
510     ///
511     /// [`writable()`]: Self::writable
512     ///
513     /// # Cancel safety
514     ///
515     /// This method is cancel safe. Once a readiness event occurs, the method
516     /// will continue to return immediately until the readiness event is
517     /// consumed by an attempt to write that fails with `WouldBlock` or
518     /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>519     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
520         let event = self.io.registration().readiness(interest).await?;
521         Ok(event.ready)
522     }
523 
524     /// Waits for the pipe to become writable.
525     ///
526     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
527     /// paired with [`try_write()`].
528     ///
529     /// [`try_write()`]: Self::try_write
530     ///
531     /// # Examples
532     ///
533     /// ```no_run
534     /// use tokio::net::unix::pipe;
535     /// use std::io;
536     ///
537     /// #[tokio::main]
538     /// async fn main() -> io::Result<()> {
539     ///     // Open a writing end of a fifo
540     ///     let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
541     ///
542     ///     loop {
543     ///         // Wait for the pipe to be writable
544     ///         tx.writable().await?;
545     ///
546     ///         // Try to write data, this may still fail with `WouldBlock`
547     ///         // if the readiness event is a false positive.
548     ///         match tx.try_write(b"hello world") {
549     ///             Ok(n) => {
550     ///                 break;
551     ///             }
552     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
553     ///                 continue;
554     ///             }
555     ///             Err(e) => {
556     ///                 return Err(e.into());
557     ///             }
558     ///         }
559     ///     }
560     ///
561     ///     Ok(())
562     /// }
563     /// ```
writable(&self) -> io::Result<()>564     pub async fn writable(&self) -> io::Result<()> {
565         self.ready(Interest::WRITABLE).await?;
566         Ok(())
567     }
568 
569     /// Polls for write readiness.
570     ///
571     /// If the pipe is not currently ready for writing, this method will
572     /// store a clone of the `Waker` from the provided `Context`. When the pipe
573     /// becomes ready for writing, `Waker::wake` will be called on the waker.
574     ///
575     /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
576     /// the `Waker` from the `Context` passed to the most recent call is
577     /// scheduled to receive a wakeup.
578     ///
579     /// This function is intended for cases where creating and pinning a future
580     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
581     /// preferred, as this supports polling from multiple tasks at once.
582     ///
583     /// [`writable`]: Self::writable
584     ///
585     /// # Return value
586     ///
587     /// The function returns:
588     ///
589     /// * `Poll::Pending` if the pipe is not ready for writing.
590     /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
591     /// * `Poll::Ready(Err(e))` if an error is encountered.
592     ///
593     /// # Errors
594     ///
595     /// This function may encounter any standard I/O error except `WouldBlock`.
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>596     pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
597         self.io.registration().poll_write_ready(cx).map_ok(|_| ())
598     }
599 
600     /// Tries to write a buffer to the pipe, returning how many bytes were
601     /// written.
602     ///
603     /// The function will attempt to write the entire contents of `buf`, but
604     /// only part of the buffer may be written. If the length of `buf` is not
605     /// greater than `PIPE_BUF` (an OS constant, 4096 under Linux), then the
606     /// write is guaranteed to be atomic, i.e. either the entire content of
607     /// `buf` will be written or this method will fail with `WouldBlock`. There
608     /// is no such guarantee if `buf` is larger than `PIPE_BUF`.
609     ///
610     /// This function is usually paired with [`writable`].
611     ///
612     /// [`writable`]: Self::writable
613     ///
614     /// # Return
615     ///
616     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
617     /// number of bytes written. If the pipe is not ready to write data,
618     /// `Err(io::ErrorKind::WouldBlock)` is returned.
619     ///
620     /// # Examples
621     ///
622     /// ```no_run
623     /// use tokio::net::unix::pipe;
624     /// use std::io;
625     ///
626     /// #[tokio::main]
627     /// async fn main() -> io::Result<()> {
628     ///     // Open a writing end of a fifo
629     ///     let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
630     ///
631     ///     loop {
632     ///         // Wait for the pipe to be writable
633     ///         tx.writable().await?;
634     ///
635     ///         // Try to write data, this may still fail with `WouldBlock`
636     ///         // if the readiness event is a false positive.
637     ///         match tx.try_write(b"hello world") {
638     ///             Ok(n) => {
639     ///                 break;
640     ///             }
641     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
642     ///                 continue;
643     ///             }
644     ///             Err(e) => {
645     ///                 return Err(e.into());
646     ///             }
647     ///         }
648     ///     }
649     ///
650     ///     Ok(())
651     /// }
652     /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>653     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
654         self.io
655             .registration()
656             .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
657     }
658 
659     /// Tries to write several buffers to the pipe, returning how many bytes
660     /// were written.
661     ///
662     /// Data is written from each buffer in order, with the final buffer read
663     /// from possible being only partially consumed. This method behaves
664     /// equivalently to a single call to [`try_write()`] with concatenated
665     /// buffers.
666     ///
667     /// If the total length of buffers is not greater than `PIPE_BUF` (an OS
668     /// constant, 4096 under Linux), then the write is guaranteed to be atomic,
669     /// i.e. either the entire contents of buffers will be written or this
670     /// method will fail with `WouldBlock`. There is no such guarantee if the
671     /// total length of buffers is greater than `PIPE_BUF`.
672     ///
673     /// This function is usually paired with [`writable`].
674     ///
675     /// [`try_write()`]: Self::try_write()
676     /// [`writable`]: Self::writable
677     ///
678     /// # Return
679     ///
680     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
681     /// number of bytes written. If the pipe is not ready to write data,
682     /// `Err(io::ErrorKind::WouldBlock)` is returned.
683     ///
684     /// # Examples
685     ///
686     /// ```no_run
687     /// use tokio::net::unix::pipe;
688     /// use std::io;
689     ///
690     /// #[tokio::main]
691     /// async fn main() -> io::Result<()> {
692     ///     // Open a writing end of a fifo
693     ///     let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
694     ///
695     ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
696     ///
697     ///     loop {
698     ///         // Wait for the pipe to be writable
699     ///         tx.writable().await?;
700     ///
701     ///         // Try to write data, this may still fail with `WouldBlock`
702     ///         // if the readiness event is a false positive.
703     ///         match tx.try_write_vectored(&bufs) {
704     ///             Ok(n) => {
705     ///                 break;
706     ///             }
707     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
708     ///                 continue;
709     ///             }
710     ///             Err(e) => {
711     ///                 return Err(e.into());
712     ///             }
713     ///         }
714     ///     }
715     ///
716     ///     Ok(())
717     /// }
718     /// ```
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>719     pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
720         self.io
721             .registration()
722             .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
723     }
724 
725     /// Converts the pipe into an [`OwnedFd`] in blocking mode.
726     ///
727     /// This function will deregister this pipe end from the event loop, set
728     /// it in blocking mode and perform the conversion.
into_blocking_fd(self) -> io::Result<OwnedFd>729     pub fn into_blocking_fd(self) -> io::Result<OwnedFd> {
730         let fd = self.into_nonblocking_fd()?;
731         set_blocking(&fd)?;
732         Ok(fd)
733     }
734 
735     /// Converts the pipe into an [`OwnedFd`] in nonblocking mode.
736     ///
737     /// This function will deregister this pipe end from the event loop and
738     /// perform the conversion. The returned file descriptor will be in nonblocking
739     /// mode.
into_nonblocking_fd(self) -> io::Result<OwnedFd>740     pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> {
741         let mio_pipe = self.io.into_inner()?;
742 
743         // Safety: the pipe is now deregistered from the event loop
744         // and we are the only owner of this pipe end.
745         let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) };
746 
747         Ok(owned_fd)
748     }
749 }
750 
751 impl AsyncWrite for Sender {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>752     fn poll_write(
753         self: Pin<&mut Self>,
754         cx: &mut Context<'_>,
755         buf: &[u8],
756     ) -> Poll<io::Result<usize>> {
757         self.io.poll_write(cx, buf)
758     }
759 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>760     fn poll_write_vectored(
761         self: Pin<&mut Self>,
762         cx: &mut Context<'_>,
763         bufs: &[io::IoSlice<'_>],
764     ) -> Poll<io::Result<usize>> {
765         self.io.poll_write_vectored(cx, bufs)
766     }
767 
is_write_vectored(&self) -> bool768     fn is_write_vectored(&self) -> bool {
769         true
770     }
771 
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>772     fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
773         Poll::Ready(Ok(()))
774     }
775 
poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>776     fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
777         Poll::Ready(Ok(()))
778     }
779 }
780 
781 impl AsRawFd for Sender {
as_raw_fd(&self) -> RawFd782     fn as_raw_fd(&self) -> RawFd {
783         self.io.as_raw_fd()
784     }
785 }
786 
787 impl AsFd for Sender {
as_fd(&self) -> BorrowedFd<'_>788     fn as_fd(&self) -> BorrowedFd<'_> {
789         unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
790     }
791 }
792 
793 /// Reading end of a Unix pipe.
794 ///
795 /// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
796 ///
797 /// # Examples
798 ///
799 /// Receiving messages from a named pipe in a loop:
800 ///
801 /// ```no_run
802 /// use tokio::net::unix::pipe;
803 /// use tokio::io::{self, AsyncReadExt};
804 ///
805 /// const FIFO_NAME: &str = "path/to/a/fifo";
806 ///
807 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
808 /// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
809 /// loop {
810 ///     let mut msg = vec![0; 256];
811 ///     match rx.read_exact(&mut msg).await {
812 ///         Ok(_) => {
813 ///             /* handle the message */
814 ///         }
815 ///         Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
816 ///             // Writing end has been closed, we should reopen the pipe.
817 ///             rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
818 ///         }
819 ///         Err(e) => return Err(e.into()),
820 ///     }
821 /// }
822 /// # }
823 /// ```
824 ///
825 /// On Linux, you can use a `Receiver` in read-write access mode to implement
826 /// resilient reading from a named pipe. Unlike `Receiver` opened in read-only
827 /// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof`
828 /// when the writing end is closed. This way, a `Receiver` can asynchronously
829 /// wait for the next writer to open the pipe.
830 ///
831 /// You should not use functions waiting for EOF such as [`read_to_end`] with
832 /// a `Receiver` in read-write access mode, since it **may wait forever**.
833 /// `Receiver` in this mode also holds an open writing end, which prevents
834 /// receiving EOF.
835 ///
836 /// To set the read-write access mode you can use `OpenOptions::read_write`.
837 /// Note that using read-write access mode with FIFO files is not defined by
838 /// the POSIX standard and it is only guaranteed to work on Linux.
839 ///
840 /// ```ignore
841 /// use tokio::net::unix::pipe;
842 /// use tokio::io::AsyncReadExt;
843 /// # use std::error::Error;
844 ///
845 /// const FIFO_NAME: &str = "path/to/a/fifo";
846 ///
847 /// # async fn dox() -> Result<(), Box<dyn Error>> {
848 /// let mut rx = pipe::OpenOptions::new()
849 ///     .read_write(true)
850 ///     .open_receiver(FIFO_NAME)?;
851 /// loop {
852 ///     let mut msg = vec![0; 256];
853 ///     rx.read_exact(&mut msg).await?;
854 ///     /* handle the message */
855 /// }
856 /// # }
857 /// ```
858 ///
859 /// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end
860 #[derive(Debug)]
861 pub struct Receiver {
862     io: PollEvented<mio_pipe::Receiver>,
863 }
864 
865 impl Receiver {
from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver>866     fn from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver> {
867         let io = PollEvented::new_with_interest(mio_rx, Interest::READABLE)?;
868         Ok(Receiver { io })
869     }
870 
871     /// Creates a new `Receiver` from a [`File`].
872     ///
873     /// This function is intended to construct a pipe from a [`File`] representing
874     /// a special FIFO file. It will check if the file is a pipe and has read access,
875     /// set it in non-blocking mode and perform the conversion.
876     ///
877     /// # Errors
878     ///
879     /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
880     /// does not have read access. Also fails with any standard OS error if it occurs.
881     ///
882     /// # Panics
883     ///
884     /// This function panics if it is not called from within a runtime with
885     /// IO enabled.
886     ///
887     /// The runtime is usually set implicitly when this function is called
888     /// from a future driven by a tokio runtime, otherwise runtime can be set
889     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_file(file: File) -> io::Result<Receiver>890     pub fn from_file(file: File) -> io::Result<Receiver> {
891         Receiver::from_owned_fd(file.into())
892     }
893 
894     /// Creates a new `Receiver` from an [`OwnedFd`].
895     ///
896     /// This function is intended to construct a pipe from an [`OwnedFd`] representing
897     /// an anonymous pipe or a special FIFO file. It will check if the file descriptor
898     /// is a pipe and has read access, set it in non-blocking mode and perform the
899     /// conversion.
900     ///
901     /// # Errors
902     ///
903     /// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe
904     /// or it does not have read access. Also fails with any standard OS error if it
905     /// occurs.
906     ///
907     /// # Panics
908     ///
909     /// This function panics if it is not called from within a runtime with
910     /// IO enabled.
911     ///
912     /// The runtime is usually set implicitly when this function is called
913     /// from a future driven by a tokio runtime, otherwise runtime can be set
914     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_owned_fd(owned_fd: OwnedFd) -> io::Result<Receiver>915     pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Receiver> {
916         if !is_pipe(owned_fd.as_fd())? {
917             return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
918         }
919 
920         let flags = get_file_flags(owned_fd.as_fd())?;
921         if has_read_access(flags) {
922             set_nonblocking(owned_fd.as_fd(), flags)?;
923             Receiver::from_owned_fd_unchecked(owned_fd)
924         } else {
925             Err(io::Error::new(
926                 io::ErrorKind::InvalidInput,
927                 "not in O_RDONLY or O_RDWR access mode",
928             ))
929         }
930     }
931 
932     /// Creates a new `Receiver` from a [`File`] without checking pipe properties.
933     ///
934     /// This function is intended to construct a pipe from a File representing
935     /// a special FIFO file. The conversion assumes nothing about the underlying
936     /// file; it is left up to the user to make sure it is opened with read access,
937     /// represents a pipe and is set in non-blocking mode.
938     ///
939     /// # Examples
940     ///
941     /// ```no_run
942     /// use tokio::net::unix::pipe;
943     /// use std::fs::OpenOptions;
944     /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
945     /// # use std::error::Error;
946     ///
947     /// const FIFO_NAME: &str = "path/to/a/fifo";
948     ///
949     /// # async fn dox() -> Result<(), Box<dyn Error>> {
950     /// let file = OpenOptions::new()
951     ///     .read(true)
952     ///     .custom_flags(libc::O_NONBLOCK)
953     ///     .open(FIFO_NAME)?;
954     /// if file.metadata()?.file_type().is_fifo() {
955     ///     let rx = pipe::Receiver::from_file_unchecked(file)?;
956     ///     /* use the Receiver */
957     /// }
958     /// # Ok(())
959     /// # }
960     /// ```
961     ///
962     /// # Panics
963     ///
964     /// This function panics if it is not called from within a runtime with
965     /// IO enabled.
966     ///
967     /// The runtime is usually set implicitly when this function is called
968     /// from a future driven by a tokio runtime, otherwise runtime can be set
969     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_file_unchecked(file: File) -> io::Result<Receiver>970     pub fn from_file_unchecked(file: File) -> io::Result<Receiver> {
971         Receiver::from_owned_fd_unchecked(file.into())
972     }
973 
974     /// Creates a new `Receiver` from an [`OwnedFd`] without checking pipe properties.
975     ///
976     /// This function is intended to construct a pipe from an [`OwnedFd`] representing
977     /// an anonymous pipe or a special FIFO file. The conversion assumes nothing about
978     /// the underlying pipe; it is left up to the user to make sure that the file
979     /// descriptor represents the reading end of a pipe and the pipe is set in
980     /// non-blocking mode.
981     ///
982     /// # Panics
983     ///
984     /// This function panics if it is not called from within a runtime with
985     /// IO enabled.
986     ///
987     /// The runtime is usually set implicitly when this function is called
988     /// from a future driven by a tokio runtime, otherwise runtime can be set
989     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Receiver>990     pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Receiver> {
991         // Safety: OwnedFd represents a valid, open file descriptor.
992         let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(owned_fd.into_raw_fd()) };
993         Receiver::from_mio(mio_rx)
994     }
995 
996     /// Waits for any of the requested ready states.
997     ///
998     /// This function can be used instead of [`readable()`] to check the returned
999     /// ready set for [`Ready::READABLE`] and [`Ready::READ_CLOSED`] events.
1000     ///
1001     /// The function may complete without the pipe being ready. This is a
1002     /// false-positive and attempting an operation will return with
1003     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
1004     /// [`Ready`] set, so you should always check the returned value and possibly
1005     /// wait again if the requested states are not set.
1006     ///
1007     /// [`readable()`]: Self::readable
1008     ///
1009     /// # Cancel safety
1010     ///
1011     /// This method is cancel safe. Once a readiness event occurs, the method
1012     /// will continue to return immediately until the readiness event is
1013     /// consumed by an attempt to read that fails with `WouldBlock` or
1014     /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>1015     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
1016         let event = self.io.registration().readiness(interest).await?;
1017         Ok(event.ready)
1018     }
1019 
1020     /// Waits for the pipe to become readable.
1021     ///
1022     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
1023     /// paired with [`try_read()`].
1024     ///
1025     /// [`try_read()`]: Self::try_read()
1026     ///
1027     /// # Examples
1028     ///
1029     /// ```no_run
1030     /// use tokio::net::unix::pipe;
1031     /// use std::io;
1032     ///
1033     /// #[tokio::main]
1034     /// async fn main() -> io::Result<()> {
1035     ///     // Open a reading end of a fifo
1036     ///     let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1037     ///
1038     ///     let mut msg = vec![0; 1024];
1039     ///
1040     ///     loop {
1041     ///         // Wait for the pipe to be readable
1042     ///         rx.readable().await?;
1043     ///
1044     ///         // Try to read data, this may still fail with `WouldBlock`
1045     ///         // if the readiness event is a false positive.
1046     ///         match rx.try_read(&mut msg) {
1047     ///             Ok(n) => {
1048     ///                 msg.truncate(n);
1049     ///                 break;
1050     ///             }
1051     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1052     ///                 continue;
1053     ///             }
1054     ///             Err(e) => {
1055     ///                 return Err(e.into());
1056     ///             }
1057     ///         }
1058     ///     }
1059     ///
1060     ///     println!("GOT = {:?}", msg);
1061     ///     Ok(())
1062     /// }
1063     /// ```
readable(&self) -> io::Result<()>1064     pub async fn readable(&self) -> io::Result<()> {
1065         self.ready(Interest::READABLE).await?;
1066         Ok(())
1067     }
1068 
1069     /// Polls for read readiness.
1070     ///
1071     /// If the pipe is not currently ready for reading, this method will
1072     /// store a clone of the `Waker` from the provided `Context`. When the pipe
1073     /// becomes ready for reading, `Waker::wake` will be called on the waker.
1074     ///
1075     /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
1076     /// the `Waker` from the `Context` passed to the most recent call is
1077     /// scheduled to receive a wakeup.
1078     ///
1079     /// This function is intended for cases where creating and pinning a future
1080     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
1081     /// preferred, as this supports polling from multiple tasks at once.
1082     ///
1083     /// [`readable`]: Self::readable
1084     ///
1085     /// # Return value
1086     ///
1087     /// The function returns:
1088     ///
1089     /// * `Poll::Pending` if the pipe is not ready for reading.
1090     /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
1091     /// * `Poll::Ready(Err(e))` if an error is encountered.
1092     ///
1093     /// # Errors
1094     ///
1095     /// This function may encounter any standard I/O error except `WouldBlock`.
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>1096     pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1097         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
1098     }
1099 
1100     /// Tries to read data from the pipe into the provided buffer, returning how
1101     /// many bytes were read.
1102     ///
1103     /// Reads any pending data from the pipe but does not wait for new data
1104     /// to arrive. On success, returns the number of bytes read. Because
1105     /// `try_read()` is non-blocking, the buffer does not have to be stored by
1106     /// the async task and can exist entirely on the stack.
1107     ///
1108     /// Usually [`readable()`] is used with this function.
1109     ///
1110     /// [`readable()`]: Self::readable()
1111     ///
1112     /// # Return
1113     ///
1114     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1115     /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
1116     ///
1117     /// 1. The pipe's writing end is closed and will no longer write data.
1118     /// 2. The specified buffer was 0 bytes in length.
1119     ///
1120     /// If the pipe is not ready to read data,
1121     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1122     ///
1123     /// # Examples
1124     ///
1125     /// ```no_run
1126     /// use tokio::net::unix::pipe;
1127     /// use std::io;
1128     ///
1129     /// #[tokio::main]
1130     /// async fn main() -> io::Result<()> {
1131     ///     // Open a reading end of a fifo
1132     ///     let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1133     ///
1134     ///     let mut msg = vec![0; 1024];
1135     ///
1136     ///     loop {
1137     ///         // Wait for the pipe to be readable
1138     ///         rx.readable().await?;
1139     ///
1140     ///         // Try to read data, this may still fail with `WouldBlock`
1141     ///         // if the readiness event is a false positive.
1142     ///         match rx.try_read(&mut msg) {
1143     ///             Ok(n) => {
1144     ///                 msg.truncate(n);
1145     ///                 break;
1146     ///             }
1147     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1148     ///                 continue;
1149     ///             }
1150     ///             Err(e) => {
1151     ///                 return Err(e.into());
1152     ///             }
1153     ///         }
1154     ///     }
1155     ///
1156     ///     println!("GOT = {:?}", msg);
1157     ///     Ok(())
1158     /// }
1159     /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>1160     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
1161         self.io
1162             .registration()
1163             .try_io(Interest::READABLE, || (&*self.io).read(buf))
1164     }
1165 
1166     /// Tries to read data from the pipe into the provided buffers, returning
1167     /// how many bytes were read.
1168     ///
1169     /// Data is copied to fill each buffer in order, with the final buffer
1170     /// written to possibly being only partially filled. This method behaves
1171     /// equivalently to a single call to [`try_read()`] with concatenated
1172     /// buffers.
1173     ///
1174     /// Reads any pending data from the pipe but does not wait for new data
1175     /// to arrive. On success, returns the number of bytes read. Because
1176     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1177     /// stored by the async task and can exist entirely on the stack.
1178     ///
1179     /// Usually, [`readable()`] is used with this function.
1180     ///
1181     /// [`try_read()`]: Self::try_read()
1182     /// [`readable()`]: Self::readable()
1183     ///
1184     /// # Return
1185     ///
1186     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1187     /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
1188     /// closed and will no longer write data. If the pipe is not ready to read
1189     /// data `Err(io::ErrorKind::WouldBlock)` is returned.
1190     ///
1191     /// # Examples
1192     ///
1193     /// ```no_run
1194     /// use tokio::net::unix::pipe;
1195     /// use std::io;
1196     ///
1197     /// #[tokio::main]
1198     /// async fn main() -> io::Result<()> {
1199     ///     // Open a reading end of a fifo
1200     ///     let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1201     ///
1202     ///     loop {
1203     ///         // Wait for the pipe to be readable
1204     ///         rx.readable().await?;
1205     ///
1206     ///         // Creating the buffer **after** the `await` prevents it from
1207     ///         // being stored in the async task.
1208     ///         let mut buf_a = [0; 512];
1209     ///         let mut buf_b = [0; 1024];
1210     ///         let mut bufs = [
1211     ///             io::IoSliceMut::new(&mut buf_a),
1212     ///             io::IoSliceMut::new(&mut buf_b),
1213     ///         ];
1214     ///
1215     ///         // Try to read data, this may still fail with `WouldBlock`
1216     ///         // if the readiness event is a false positive.
1217     ///         match rx.try_read_vectored(&mut bufs) {
1218     ///             Ok(0) => break,
1219     ///             Ok(n) => {
1220     ///                 println!("read {} bytes", n);
1221     ///             }
1222     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1223     ///                 continue;
1224     ///             }
1225     ///             Err(e) => {
1226     ///                 return Err(e.into());
1227     ///             }
1228     ///         }
1229     ///     }
1230     ///
1231     ///     Ok(())
1232     /// }
1233     /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>1234     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1235         self.io
1236             .registration()
1237             .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1238     }
1239 
1240     cfg_io_util! {
1241         /// Tries to read data from the pipe into the provided buffer, advancing the
1242         /// buffer's internal cursor, returning how many bytes were read.
1243         ///
1244         /// Reads any pending data from the pipe but does not wait for new data
1245         /// to arrive. On success, returns the number of bytes read. Because
1246         /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1247         /// the async task and can exist entirely on the stack.
1248         ///
1249         /// Usually, [`readable()`] or [`ready()`] is used with this function.
1250         ///
1251         /// [`readable()`]: Self::readable
1252         /// [`ready()`]: Self::ready
1253         ///
1254         /// # Return
1255         ///
1256         /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1257         /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
1258         /// closed and will no longer write data. If the pipe is not ready to read
1259         /// data `Err(io::ErrorKind::WouldBlock)` is returned.
1260         ///
1261         /// # Examples
1262         ///
1263         /// ```no_run
1264         /// use tokio::net::unix::pipe;
1265         /// use std::io;
1266         ///
1267         /// #[tokio::main]
1268         /// async fn main() -> io::Result<()> {
1269         ///     // Open a reading end of a fifo
1270         ///     let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1271         ///
1272         ///     loop {
1273         ///         // Wait for the pipe to be readable
1274         ///         rx.readable().await?;
1275         ///
1276         ///         let mut buf = Vec::with_capacity(4096);
1277         ///
1278         ///         // Try to read data, this may still fail with `WouldBlock`
1279         ///         // if the readiness event is a false positive.
1280         ///         match rx.try_read_buf(&mut buf) {
1281         ///             Ok(0) => break,
1282         ///             Ok(n) => {
1283         ///                 println!("read {} bytes", n);
1284         ///             }
1285         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1286         ///                 continue;
1287         ///             }
1288         ///             Err(e) => {
1289         ///                 return Err(e.into());
1290         ///             }
1291         ///         }
1292         ///     }
1293         ///
1294         ///     Ok(())
1295         /// }
1296         /// ```
1297         pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1298             self.io.registration().try_io(Interest::READABLE, || {
1299                 use std::io::Read;
1300 
1301                 let dst = buf.chunk_mut();
1302                 let dst =
1303                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1304 
1305                 // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
1306                 // which correctly handles reads into uninitialized memory.
1307                 let n = (&*self.io).read(dst)?;
1308 
1309                 unsafe {
1310                     buf.advance_mut(n);
1311                 }
1312 
1313                 Ok(n)
1314             })
1315         }
1316     }
1317 
1318     /// Converts the pipe into an [`OwnedFd`] in blocking mode.
1319     ///
1320     /// This function will deregister this pipe end from the event loop, set
1321     /// it in blocking mode and perform the conversion.
into_blocking_fd(self) -> io::Result<OwnedFd>1322     pub fn into_blocking_fd(self) -> io::Result<OwnedFd> {
1323         let fd = self.into_nonblocking_fd()?;
1324         set_blocking(&fd)?;
1325         Ok(fd)
1326     }
1327 
1328     /// Converts the pipe into an [`OwnedFd`] in nonblocking mode.
1329     ///
1330     /// This function will deregister this pipe end from the event loop and
1331     /// perform the conversion. Returned file descriptor will be in nonblocking
1332     /// mode.
into_nonblocking_fd(self) -> io::Result<OwnedFd>1333     pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> {
1334         let mio_pipe = self.io.into_inner()?;
1335 
1336         // Safety: the pipe is now deregistered from the event loop
1337         // and we are the only owner of this pipe end.
1338         let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) };
1339 
1340         Ok(owned_fd)
1341     }
1342 }
1343 
1344 impl AsyncRead for Receiver {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1345     fn poll_read(
1346         self: Pin<&mut Self>,
1347         cx: &mut Context<'_>,
1348         buf: &mut ReadBuf<'_>,
1349     ) -> Poll<io::Result<()>> {
1350         // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
1351         // which correctly handles reads into uninitialized memory.
1352         unsafe { self.io.poll_read(cx, buf) }
1353     }
1354 }
1355 
1356 impl AsRawFd for Receiver {
as_raw_fd(&self) -> RawFd1357     fn as_raw_fd(&self) -> RawFd {
1358         self.io.as_raw_fd()
1359     }
1360 }
1361 
1362 impl AsFd for Receiver {
as_fd(&self) -> BorrowedFd<'_>1363     fn as_fd(&self) -> BorrowedFd<'_> {
1364         unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1365     }
1366 }
1367 
1368 /// Checks if the file descriptor is a pipe or a FIFO.
is_pipe(fd: BorrowedFd<'_>) -> io::Result<bool>1369 fn is_pipe(fd: BorrowedFd<'_>) -> io::Result<bool> {
1370     // Safety: `libc::stat` is C-like struct used for syscalls and all-zero
1371     // byte pattern forms a valid value.
1372     let mut stat: libc::stat = unsafe { std::mem::zeroed() };
1373 
1374     // Safety: it's safe to call `fstat` with a valid, open file descriptor
1375     // and a valid pointer to a `stat` struct.
1376     let r = unsafe { libc::fstat(fd.as_raw_fd(), &mut stat) };
1377 
1378     if r == -1 {
1379         Err(io::Error::last_os_error())
1380     } else {
1381         Ok((stat.st_mode as libc::mode_t & libc::S_IFMT) == libc::S_IFIFO)
1382     }
1383 }
1384 
1385 /// Gets file descriptor's flags by fcntl.
get_file_flags(fd: BorrowedFd<'_>) -> io::Result<libc::c_int>1386 fn get_file_flags(fd: BorrowedFd<'_>) -> io::Result<libc::c_int> {
1387     // Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor.
1388     let flags = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) };
1389     if flags < 0 {
1390         Err(io::Error::last_os_error())
1391     } else {
1392         Ok(flags)
1393     }
1394 }
1395 
1396 /// Checks for `O_RDONLY` or `O_RDWR` access mode.
has_read_access(flags: libc::c_int) -> bool1397 fn has_read_access(flags: libc::c_int) -> bool {
1398     let mode = flags & libc::O_ACCMODE;
1399     mode == libc::O_RDONLY || mode == libc::O_RDWR
1400 }
1401 
1402 /// Checks for `O_WRONLY` or `O_RDWR` access mode.
has_write_access(flags: libc::c_int) -> bool1403 fn has_write_access(flags: libc::c_int) -> bool {
1404     let mode = flags & libc::O_ACCMODE;
1405     mode == libc::O_WRONLY || mode == libc::O_RDWR
1406 }
1407 
1408 /// Sets file descriptor's flags with `O_NONBLOCK` by fcntl.
set_nonblocking(fd: BorrowedFd<'_>, current_flags: libc::c_int) -> io::Result<()>1409 fn set_nonblocking(fd: BorrowedFd<'_>, current_flags: libc::c_int) -> io::Result<()> {
1410     let flags = current_flags | libc::O_NONBLOCK;
1411 
1412     if flags != current_flags {
1413         // Safety: it's safe to use `fcntl` to set the `O_NONBLOCK` flag of a valid,
1414         // open file descriptor.
1415         let ret = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, flags) };
1416         if ret < 0 {
1417             return Err(io::Error::last_os_error());
1418         }
1419     }
1420 
1421     Ok(())
1422 }
1423 
1424 /// Removes `O_NONBLOCK` from fd's flags.
set_blocking<T: AsRawFd>(fd: &T) -> io::Result<()>1425 fn set_blocking<T: AsRawFd>(fd: &T) -> io::Result<()> {
1426     // Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor.
1427     let previous = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) };
1428     if previous == -1 {
1429         return Err(io::Error::last_os_error());
1430     }
1431 
1432     let new = previous & !libc::O_NONBLOCK;
1433 
1434     // Safety: it's safe to use `fcntl` to unset the `O_NONBLOCK` flag of a valid,
1435     // open file descriptor.
1436     let r = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, new) };
1437     if r == -1 {
1438         Err(io::Error::last_os_error())
1439     } else {
1440         Ok(())
1441     }
1442 }
1443