1 #![cfg(feature = "full")]
2 #![cfg(unix)]
3 
4 use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest};
5 use tokio::net::unix::pipe;
6 use tokio_test::task;
7 use tokio_test::{assert_err, assert_ok, assert_pending, assert_ready_ok};
8 
9 use std::fs::File;
10 use std::io;
11 use std::os::unix::fs::OpenOptionsExt;
12 use std::os::unix::io::AsRawFd;
13 use std::path::{Path, PathBuf};
14 
15 /// Helper struct which will clean up temporary files once dropped.
16 struct TempFifo {
17     path: PathBuf,
18     _dir: tempfile::TempDir,
19 }
20 
21 impl TempFifo {
new(name: &str) -> io::Result<TempFifo>22     fn new(name: &str) -> io::Result<TempFifo> {
23         let dir = tempfile::Builder::new()
24             .prefix("tokio-fifo-tests")
25             .tempdir()?;
26         let path = dir.path().join(name);
27         nix::unistd::mkfifo(&path, nix::sys::stat::Mode::S_IRWXU)?;
28 
29         Ok(TempFifo { path, _dir: dir })
30     }
31 }
32 
33 impl AsRef<Path> for TempFifo {
as_ref(&self) -> &Path34     fn as_ref(&self) -> &Path {
35         self.path.as_ref()
36     }
37 }
38 
39 #[tokio::test]
40 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
fifo_simple_send() -> io::Result<()>41 async fn fifo_simple_send() -> io::Result<()> {
42     const DATA: &[u8] = b"this is some data to write to the fifo";
43 
44     let fifo = TempFifo::new("simple_send")?;
45 
46     // Create a reading task which should wait for data from the pipe.
47     let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
48     let mut read_fut = task::spawn(async move {
49         let mut buf = vec![0; DATA.len()];
50         reader.read_exact(&mut buf).await?;
51         Ok::<_, io::Error>(buf)
52     });
53     assert_pending!(read_fut.poll());
54 
55     let mut writer = pipe::OpenOptions::new().open_sender(&fifo)?;
56     writer.write_all(DATA).await?;
57 
58     // Let the IO driver poll events for the reader.
59     while !read_fut.is_woken() {
60         tokio::task::yield_now().await;
61     }
62 
63     // Reading task should be ready now.
64     let read_data = assert_ready_ok!(read_fut.poll());
65     assert_eq!(&read_data, DATA);
66 
67     Ok(())
68 }
69 
70 #[tokio::test]
71 #[cfg(target_os = "linux")]
72 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
fifo_simple_send_sender_first() -> io::Result<()>73 async fn fifo_simple_send_sender_first() -> io::Result<()> {
74     const DATA: &[u8] = b"this is some data to write to the fifo";
75 
76     // Create a new fifo file with *no reading ends open*.
77     let fifo = TempFifo::new("simple_send_sender_first")?;
78 
79     // Simple `open_sender` should fail with ENXIO (no such device or address).
80     let err = assert_err!(pipe::OpenOptions::new().open_sender(&fifo));
81     assert_eq!(err.raw_os_error(), Some(libc::ENXIO));
82 
83     // `open_sender` in read-write mode should succeed and the pipe should be ready to write.
84     let mut writer = pipe::OpenOptions::new()
85         .read_write(true)
86         .open_sender(&fifo)?;
87     writer.write_all(DATA).await?;
88 
89     // Read the written data and validate.
90     let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
91     let mut read_data = vec![0; DATA.len()];
92     reader.read_exact(&mut read_data).await?;
93     assert_eq!(&read_data, DATA);
94 
95     Ok(())
96 }
97 
98 // Opens a FIFO file, write and *close the writer*.
write_and_close(path: impl AsRef<Path>, msg: &[u8]) -> io::Result<()>99 async fn write_and_close(path: impl AsRef<Path>, msg: &[u8]) -> io::Result<()> {
100     let mut writer = pipe::OpenOptions::new().open_sender(path)?;
101     writer.write_all(msg).await?;
102     drop(writer); // Explicit drop.
103     Ok(())
104 }
105 
106 /// Checks EOF behavior with single reader and writers sequentially opening
107 /// and closing a FIFO.
108 #[tokio::test]
109 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
fifo_multiple_writes() -> io::Result<()>110 async fn fifo_multiple_writes() -> io::Result<()> {
111     const DATA: &[u8] = b"this is some data to write to the fifo";
112 
113     let fifo = TempFifo::new("fifo_multiple_writes")?;
114 
115     let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
116 
117     write_and_close(&fifo, DATA).await?;
118     let ev = reader.ready(Interest::READABLE).await?;
119     assert!(ev.is_readable());
120     let mut read_data = vec![0; DATA.len()];
121     assert_ok!(reader.read_exact(&mut read_data).await);
122 
123     // Check that reader hits EOF.
124     let err = assert_err!(reader.read_exact(&mut read_data).await);
125     assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
126 
127     // Write more data and read again.
128     write_and_close(&fifo, DATA).await?;
129     assert_ok!(reader.read_exact(&mut read_data).await);
130 
131     Ok(())
132 }
133 
134 /// Checks behavior of a resilient reader (Receiver in O_RDWR access mode)
135 /// with writers sequentially opening and closing a FIFO.
136 #[tokio::test]
137 #[cfg(target_os = "linux")]
138 #[cfg_attr(miri, ignore)] // No `socket` in miri.
fifo_resilient_reader() -> io::Result<()>139 async fn fifo_resilient_reader() -> io::Result<()> {
140     const DATA: &[u8] = b"this is some data to write to the fifo";
141 
142     let fifo = TempFifo::new("fifo_resilient_reader")?;
143 
144     // Open reader in read-write access mode.
145     let mut reader = pipe::OpenOptions::new()
146         .read_write(true)
147         .open_receiver(&fifo)?;
148 
149     write_and_close(&fifo, DATA).await?;
150     let ev = reader.ready(Interest::READABLE).await?;
151     let mut read_data = vec![0; DATA.len()];
152     reader.read_exact(&mut read_data).await?;
153 
154     // Check that reader didn't hit EOF.
155     assert!(!ev.is_read_closed());
156 
157     // Resilient reader can asynchronously wait for the next writer.
158     let mut second_read_fut = task::spawn(reader.read_exact(&mut read_data));
159     assert_pending!(second_read_fut.poll());
160 
161     // Write more data and read again.
162     write_and_close(&fifo, DATA).await?;
163     assert_ok!(second_read_fut.await);
164 
165     Ok(())
166 }
167 
168 #[tokio::test]
169 #[cfg_attr(miri, ignore)] // No `O_NONBLOCK` for open64 in miri.
open_detects_not_a_fifo() -> io::Result<()>170 async fn open_detects_not_a_fifo() -> io::Result<()> {
171     let dir = tempfile::Builder::new()
172         .prefix("tokio-fifo-tests")
173         .tempdir()
174         .unwrap();
175     let path = dir.path().join("not_a_fifo");
176 
177     // Create an ordinary file.
178     File::create(&path)?;
179 
180     // Check if Sender detects invalid file type.
181     let err = assert_err!(pipe::OpenOptions::new().open_sender(&path));
182     assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
183 
184     // Check if Receiver detects invalid file type.
185     let err = assert_err!(pipe::OpenOptions::new().open_sender(&path));
186     assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
187 
188     Ok(())
189 }
190 
191 #[tokio::test]
192 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
from_file() -> io::Result<()>193 async fn from_file() -> io::Result<()> {
194     const DATA: &[u8] = b"this is some data to write to the fifo";
195 
196     let fifo = TempFifo::new("from_file")?;
197 
198     // Construct a Receiver from a File.
199     let file = std::fs::OpenOptions::new()
200         .read(true)
201         .custom_flags(libc::O_NONBLOCK)
202         .open(&fifo)?;
203     let mut reader = pipe::Receiver::from_file(file)?;
204 
205     // Construct a Sender from a File.
206     let file = std::fs::OpenOptions::new()
207         .write(true)
208         .custom_flags(libc::O_NONBLOCK)
209         .open(&fifo)?;
210     let mut writer = pipe::Sender::from_file(file)?;
211 
212     // Write and read some data to test async.
213     let mut read_fut = task::spawn(async move {
214         let mut buf = vec![0; DATA.len()];
215         reader.read_exact(&mut buf).await?;
216         Ok::<_, io::Error>(buf)
217     });
218     assert_pending!(read_fut.poll());
219 
220     writer.write_all(DATA).await?;
221 
222     let read_data = assert_ok!(read_fut.await);
223     assert_eq!(&read_data, DATA);
224 
225     Ok(())
226 }
227 
228 #[tokio::test]
229 #[cfg_attr(miri, ignore)] // No `fstat` in miri.
from_file_detects_not_a_fifo() -> io::Result<()>230 async fn from_file_detects_not_a_fifo() -> io::Result<()> {
231     let dir = tempfile::Builder::new()
232         .prefix("tokio-fifo-tests")
233         .tempdir()
234         .unwrap();
235     let path = dir.path().join("not_a_fifo");
236 
237     // Create an ordinary file.
238     File::create(&path)?;
239 
240     // Check if Sender detects invalid file type.
241     let file = std::fs::OpenOptions::new().write(true).open(&path)?;
242     let err = assert_err!(pipe::Sender::from_file(file));
243     assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
244 
245     // Check if Receiver detects invalid file type.
246     let file = std::fs::OpenOptions::new().read(true).open(&path)?;
247     let err = assert_err!(pipe::Receiver::from_file(file));
248     assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
249 
250     Ok(())
251 }
252 
253 #[tokio::test]
254 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
from_file_detects_wrong_access_mode() -> io::Result<()>255 async fn from_file_detects_wrong_access_mode() -> io::Result<()> {
256     let fifo = TempFifo::new("wrong_access_mode")?;
257 
258     // Open a read end to open the fifo for writing.
259     let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
260 
261     // Check if Receiver detects write-only access mode.
262     let wronly = std::fs::OpenOptions::new()
263         .write(true)
264         .custom_flags(libc::O_NONBLOCK)
265         .open(&fifo)?;
266     let err = assert_err!(pipe::Receiver::from_file(wronly));
267     assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
268 
269     // Check if Sender detects read-only access mode.
270     let rdonly = std::fs::OpenOptions::new()
271         .read(true)
272         .custom_flags(libc::O_NONBLOCK)
273         .open(&fifo)?;
274     let err = assert_err!(pipe::Sender::from_file(rdonly));
275     assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
276 
277     Ok(())
278 }
279 
is_nonblocking<T: AsRawFd>(fd: &T) -> io::Result<bool>280 fn is_nonblocking<T: AsRawFd>(fd: &T) -> io::Result<bool> {
281     let flags = nix::fcntl::fcntl(fd.as_raw_fd(), nix::fcntl::F_GETFL)?;
282     Ok((flags & libc::O_NONBLOCK) != 0)
283 }
284 
285 #[tokio::test]
286 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
from_file_sets_nonblock() -> io::Result<()>287 async fn from_file_sets_nonblock() -> io::Result<()> {
288     let fifo = TempFifo::new("sets_nonblock")?;
289 
290     // Open read and write ends to let blocking files open.
291     let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
292     let _writer = pipe::OpenOptions::new().open_sender(&fifo)?;
293 
294     // Check if Receiver sets the pipe in non-blocking mode.
295     let rdonly = std::fs::OpenOptions::new().read(true).open(&fifo)?;
296     assert!(!is_nonblocking(&rdonly)?);
297     let reader = pipe::Receiver::from_file(rdonly)?;
298     assert!(is_nonblocking(&reader)?);
299 
300     // Check if Sender sets the pipe in non-blocking mode.
301     let wronly = std::fs::OpenOptions::new().write(true).open(&fifo)?;
302     assert!(!is_nonblocking(&wronly)?);
303     let writer = pipe::Sender::from_file(wronly)?;
304     assert!(is_nonblocking(&writer)?);
305 
306     Ok(())
307 }
308 
writable_by_poll(writer: &pipe::Sender) -> bool309 fn writable_by_poll(writer: &pipe::Sender) -> bool {
310     task::spawn(writer.writable()).poll().is_ready()
311 }
312 
313 #[tokio::test]
314 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
try_read_write() -> io::Result<()>315 async fn try_read_write() -> io::Result<()> {
316     const DATA: &[u8] = b"this is some data to write to the fifo";
317 
318     // Create a pipe pair over a fifo file.
319     let fifo = TempFifo::new("try_read_write")?;
320     let reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
321     let writer = pipe::OpenOptions::new().open_sender(&fifo)?;
322 
323     // Fill the pipe buffer with `try_write`.
324     let mut write_data = Vec::new();
325     while writable_by_poll(&writer) {
326         match writer.try_write(DATA) {
327             Ok(n) => write_data.extend(&DATA[..n]),
328             Err(e) => {
329                 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
330                 break;
331             }
332         }
333     }
334 
335     // Drain the pipe buffer with `try_read`.
336     let mut read_data = vec![0; write_data.len()];
337     let mut i = 0;
338     while i < write_data.len() {
339         reader.readable().await?;
340         match reader.try_read(&mut read_data[i..]) {
341             Ok(n) => i += n,
342             Err(e) => {
343                 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
344                 continue;
345             }
346         }
347     }
348 
349     assert_eq!(read_data, write_data);
350 
351     Ok(())
352 }
353 
354 #[tokio::test]
355 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
try_read_write_vectored() -> io::Result<()>356 async fn try_read_write_vectored() -> io::Result<()> {
357     const DATA: &[u8] = b"this is some data to write to the fifo";
358 
359     // Create a pipe pair over a fifo file.
360     let fifo = TempFifo::new("try_read_write_vectored")?;
361     let reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
362     let writer = pipe::OpenOptions::new().open_sender(&fifo)?;
363 
364     let write_bufs: Vec<_> = DATA.chunks(3).map(io::IoSlice::new).collect();
365 
366     // Fill the pipe buffer with `try_write_vectored`.
367     let mut write_data = Vec::new();
368     while writable_by_poll(&writer) {
369         match writer.try_write_vectored(&write_bufs) {
370             Ok(n) => write_data.extend(&DATA[..n]),
371             Err(e) => {
372                 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
373                 break;
374             }
375         }
376     }
377 
378     // Drain the pipe buffer with `try_read_vectored`.
379     let mut read_data = vec![0; write_data.len()];
380     let mut i = 0;
381     while i < write_data.len() {
382         reader.readable().await?;
383 
384         let mut read_bufs: Vec<_> = read_data[i..]
385             .chunks_mut(0x10000)
386             .map(io::IoSliceMut::new)
387             .collect();
388         match reader.try_read_vectored(&mut read_bufs) {
389             Ok(n) => i += n,
390             Err(e) => {
391                 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
392                 continue;
393             }
394         }
395     }
396 
397     assert_eq!(read_data, write_data);
398 
399     Ok(())
400 }
401 
402 #[tokio::test]
403 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
try_read_buf() -> std::io::Result<()>404 async fn try_read_buf() -> std::io::Result<()> {
405     const DATA: &[u8] = b"this is some data to write to the fifo";
406 
407     // Create a pipe pair over a fifo file.
408     let fifo = TempFifo::new("try_read_write_vectored")?;
409     let reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
410     let writer = pipe::OpenOptions::new().open_sender(&fifo)?;
411 
412     // Fill the pipe buffer with `try_write`.
413     let mut write_data = Vec::new();
414     while writable_by_poll(&writer) {
415         match writer.try_write(DATA) {
416             Ok(n) => write_data.extend(&DATA[..n]),
417             Err(e) => {
418                 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
419                 break;
420             }
421         }
422     }
423 
424     // Drain the pipe buffer with `try_read_buf`.
425     let mut read_data = vec![0; write_data.len()];
426     let mut i = 0;
427     while i < write_data.len() {
428         reader.readable().await?;
429         match reader.try_read_buf(&mut read_data) {
430             Ok(n) => i += n,
431             Err(e) => {
432                 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
433                 continue;
434             }
435         }
436     }
437 
438     assert_eq!(read_data, write_data);
439 
440     Ok(())
441 }
442 
443 #[tokio::test]
anon_pipe_simple_send() -> io::Result<()>444 async fn anon_pipe_simple_send() -> io::Result<()> {
445     const DATA: &[u8] = b"this is some data to write to the pipe";
446 
447     let (mut writer, mut reader) = pipe::pipe()?;
448 
449     // Create a reading task which should wait for data from the pipe.
450     let mut read_fut = task::spawn(async move {
451         let mut buf = vec![0; DATA.len()];
452         reader.read_exact(&mut buf).await?;
453         Ok::<_, io::Error>(buf)
454     });
455     assert_pending!(read_fut.poll());
456 
457     writer.write_all(DATA).await?;
458 
459     // Let the IO driver poll events for the reader.
460     while !read_fut.is_woken() {
461         tokio::task::yield_now().await;
462     }
463 
464     // Reading task should be ready now.
465     let read_data = assert_ready_ok!(read_fut.poll());
466     assert_eq!(&read_data, DATA);
467 
468     Ok(())
469 }
470 
471 #[tokio::test]
472 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
anon_pipe_spawn_echo() -> std::io::Result<()>473 async fn anon_pipe_spawn_echo() -> std::io::Result<()> {
474     use tokio::process::Command;
475 
476     const DATA: &str = "this is some data to write to the pipe";
477 
478     let (tx, mut rx) = pipe::pipe()?;
479 
480     let status = Command::new("echo")
481         .arg("-n")
482         .arg(DATA)
483         .stdout(tx.into_blocking_fd()?)
484         .status();
485 
486     let mut buf = vec![0; DATA.len()];
487     rx.read_exact(&mut buf).await?;
488     assert_eq!(String::from_utf8(buf).unwrap(), DATA);
489 
490     let exit_code = status.await?;
491     assert!(exit_code.success());
492 
493     // Check if the pipe is closed.
494     buf = Vec::new();
495     let total = assert_ok!(rx.try_read(&mut buf));
496     assert_eq!(total, 0);
497 
498     Ok(())
499 }
500 
501 #[tokio::test]
502 #[cfg(target_os = "linux")]
503 #[cfg_attr(miri, ignore)] // No `fstat` in miri.
anon_pipe_from_owned_fd() -> std::io::Result<()>504 async fn anon_pipe_from_owned_fd() -> std::io::Result<()> {
505     use nix::fcntl::OFlag;
506 
507     const DATA: &[u8] = b"this is some data to write to the pipe";
508 
509     let (rx_fd, tx_fd) = nix::unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK)?;
510 
511     let mut rx = pipe::Receiver::from_owned_fd(rx_fd)?;
512     let mut tx = pipe::Sender::from_owned_fd(tx_fd)?;
513 
514     let mut buf = vec![0; DATA.len()];
515     tx.write_all(DATA).await?;
516     rx.read_exact(&mut buf).await?;
517     assert_eq!(buf, DATA);
518 
519     Ok(())
520 }
521 
522 #[tokio::test]
523 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
anon_pipe_into_nonblocking_fd() -> std::io::Result<()>524 async fn anon_pipe_into_nonblocking_fd() -> std::io::Result<()> {
525     let (tx, rx) = pipe::pipe()?;
526 
527     let tx_fd = tx.into_nonblocking_fd()?;
528     let rx_fd = rx.into_nonblocking_fd()?;
529 
530     assert!(is_nonblocking(&tx_fd)?);
531     assert!(is_nonblocking(&rx_fd)?);
532 
533     Ok(())
534 }
535 
536 #[tokio::test]
537 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
anon_pipe_into_blocking_fd() -> std::io::Result<()>538 async fn anon_pipe_into_blocking_fd() -> std::io::Result<()> {
539     let (tx, rx) = pipe::pipe()?;
540 
541     let tx_fd = tx.into_blocking_fd()?;
542     let rx_fd = rx.into_blocking_fd()?;
543 
544     assert!(!is_nonblocking(&tx_fd)?);
545     assert!(!is_nonblocking(&rx_fd)?);
546 
547     Ok(())
548 }
549