1 #![cfg(feature = "full")]
2 #![cfg(unix)]
3
4 use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest};
5 use tokio::net::unix::pipe;
6 use tokio_test::task;
7 use tokio_test::{assert_err, assert_ok, assert_pending, assert_ready_ok};
8
9 use std::fs::File;
10 use std::io;
11 use std::os::unix::fs::OpenOptionsExt;
12 use std::os::unix::io::AsRawFd;
13 use std::path::{Path, PathBuf};
14
15 /// Helper struct which will clean up temporary files once dropped.
16 struct TempFifo {
17 path: PathBuf,
18 _dir: tempfile::TempDir,
19 }
20
21 impl TempFifo {
new(name: &str) -> io::Result<TempFifo>22 fn new(name: &str) -> io::Result<TempFifo> {
23 let dir = tempfile::Builder::new()
24 .prefix("tokio-fifo-tests")
25 .tempdir()?;
26 let path = dir.path().join(name);
27 nix::unistd::mkfifo(&path, nix::sys::stat::Mode::S_IRWXU)?;
28
29 Ok(TempFifo { path, _dir: dir })
30 }
31 }
32
33 impl AsRef<Path> for TempFifo {
as_ref(&self) -> &Path34 fn as_ref(&self) -> &Path {
35 self.path.as_ref()
36 }
37 }
38
39 #[tokio::test]
40 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
fifo_simple_send() -> io::Result<()>41 async fn fifo_simple_send() -> io::Result<()> {
42 const DATA: &[u8] = b"this is some data to write to the fifo";
43
44 let fifo = TempFifo::new("simple_send")?;
45
46 // Create a reading task which should wait for data from the pipe.
47 let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
48 let mut read_fut = task::spawn(async move {
49 let mut buf = vec![0; DATA.len()];
50 reader.read_exact(&mut buf).await?;
51 Ok::<_, io::Error>(buf)
52 });
53 assert_pending!(read_fut.poll());
54
55 let mut writer = pipe::OpenOptions::new().open_sender(&fifo)?;
56 writer.write_all(DATA).await?;
57
58 // Let the IO driver poll events for the reader.
59 while !read_fut.is_woken() {
60 tokio::task::yield_now().await;
61 }
62
63 // Reading task should be ready now.
64 let read_data = assert_ready_ok!(read_fut.poll());
65 assert_eq!(&read_data, DATA);
66
67 Ok(())
68 }
69
70 #[tokio::test]
71 #[cfg(target_os = "linux")]
72 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
fifo_simple_send_sender_first() -> io::Result<()>73 async fn fifo_simple_send_sender_first() -> io::Result<()> {
74 const DATA: &[u8] = b"this is some data to write to the fifo";
75
76 // Create a new fifo file with *no reading ends open*.
77 let fifo = TempFifo::new("simple_send_sender_first")?;
78
79 // Simple `open_sender` should fail with ENXIO (no such device or address).
80 let err = assert_err!(pipe::OpenOptions::new().open_sender(&fifo));
81 assert_eq!(err.raw_os_error(), Some(libc::ENXIO));
82
83 // `open_sender` in read-write mode should succeed and the pipe should be ready to write.
84 let mut writer = pipe::OpenOptions::new()
85 .read_write(true)
86 .open_sender(&fifo)?;
87 writer.write_all(DATA).await?;
88
89 // Read the written data and validate.
90 let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
91 let mut read_data = vec![0; DATA.len()];
92 reader.read_exact(&mut read_data).await?;
93 assert_eq!(&read_data, DATA);
94
95 Ok(())
96 }
97
98 // Opens a FIFO file, write and *close the writer*.
write_and_close(path: impl AsRef<Path>, msg: &[u8]) -> io::Result<()>99 async fn write_and_close(path: impl AsRef<Path>, msg: &[u8]) -> io::Result<()> {
100 let mut writer = pipe::OpenOptions::new().open_sender(path)?;
101 writer.write_all(msg).await?;
102 drop(writer); // Explicit drop.
103 Ok(())
104 }
105
106 /// Checks EOF behavior with single reader and writers sequentially opening
107 /// and closing a FIFO.
108 #[tokio::test]
109 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
fifo_multiple_writes() -> io::Result<()>110 async fn fifo_multiple_writes() -> io::Result<()> {
111 const DATA: &[u8] = b"this is some data to write to the fifo";
112
113 let fifo = TempFifo::new("fifo_multiple_writes")?;
114
115 let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
116
117 write_and_close(&fifo, DATA).await?;
118 let ev = reader.ready(Interest::READABLE).await?;
119 assert!(ev.is_readable());
120 let mut read_data = vec![0; DATA.len()];
121 assert_ok!(reader.read_exact(&mut read_data).await);
122
123 // Check that reader hits EOF.
124 let err = assert_err!(reader.read_exact(&mut read_data).await);
125 assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
126
127 // Write more data and read again.
128 write_and_close(&fifo, DATA).await?;
129 assert_ok!(reader.read_exact(&mut read_data).await);
130
131 Ok(())
132 }
133
134 /// Checks behavior of a resilient reader (Receiver in O_RDWR access mode)
135 /// with writers sequentially opening and closing a FIFO.
136 #[tokio::test]
137 #[cfg(target_os = "linux")]
138 #[cfg_attr(miri, ignore)] // No `socket` in miri.
fifo_resilient_reader() -> io::Result<()>139 async fn fifo_resilient_reader() -> io::Result<()> {
140 const DATA: &[u8] = b"this is some data to write to the fifo";
141
142 let fifo = TempFifo::new("fifo_resilient_reader")?;
143
144 // Open reader in read-write access mode.
145 let mut reader = pipe::OpenOptions::new()
146 .read_write(true)
147 .open_receiver(&fifo)?;
148
149 write_and_close(&fifo, DATA).await?;
150 let ev = reader.ready(Interest::READABLE).await?;
151 let mut read_data = vec![0; DATA.len()];
152 reader.read_exact(&mut read_data).await?;
153
154 // Check that reader didn't hit EOF.
155 assert!(!ev.is_read_closed());
156
157 // Resilient reader can asynchronously wait for the next writer.
158 let mut second_read_fut = task::spawn(reader.read_exact(&mut read_data));
159 assert_pending!(second_read_fut.poll());
160
161 // Write more data and read again.
162 write_and_close(&fifo, DATA).await?;
163 assert_ok!(second_read_fut.await);
164
165 Ok(())
166 }
167
168 #[tokio::test]
169 #[cfg_attr(miri, ignore)] // No `O_NONBLOCK` for open64 in miri.
open_detects_not_a_fifo() -> io::Result<()>170 async fn open_detects_not_a_fifo() -> io::Result<()> {
171 let dir = tempfile::Builder::new()
172 .prefix("tokio-fifo-tests")
173 .tempdir()
174 .unwrap();
175 let path = dir.path().join("not_a_fifo");
176
177 // Create an ordinary file.
178 File::create(&path)?;
179
180 // Check if Sender detects invalid file type.
181 let err = assert_err!(pipe::OpenOptions::new().open_sender(&path));
182 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
183
184 // Check if Receiver detects invalid file type.
185 let err = assert_err!(pipe::OpenOptions::new().open_sender(&path));
186 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
187
188 Ok(())
189 }
190
191 #[tokio::test]
192 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
from_file() -> io::Result<()>193 async fn from_file() -> io::Result<()> {
194 const DATA: &[u8] = b"this is some data to write to the fifo";
195
196 let fifo = TempFifo::new("from_file")?;
197
198 // Construct a Receiver from a File.
199 let file = std::fs::OpenOptions::new()
200 .read(true)
201 .custom_flags(libc::O_NONBLOCK)
202 .open(&fifo)?;
203 let mut reader = pipe::Receiver::from_file(file)?;
204
205 // Construct a Sender from a File.
206 let file = std::fs::OpenOptions::new()
207 .write(true)
208 .custom_flags(libc::O_NONBLOCK)
209 .open(&fifo)?;
210 let mut writer = pipe::Sender::from_file(file)?;
211
212 // Write and read some data to test async.
213 let mut read_fut = task::spawn(async move {
214 let mut buf = vec![0; DATA.len()];
215 reader.read_exact(&mut buf).await?;
216 Ok::<_, io::Error>(buf)
217 });
218 assert_pending!(read_fut.poll());
219
220 writer.write_all(DATA).await?;
221
222 let read_data = assert_ok!(read_fut.await);
223 assert_eq!(&read_data, DATA);
224
225 Ok(())
226 }
227
228 #[tokio::test]
229 #[cfg_attr(miri, ignore)] // No `fstat` in miri.
from_file_detects_not_a_fifo() -> io::Result<()>230 async fn from_file_detects_not_a_fifo() -> io::Result<()> {
231 let dir = tempfile::Builder::new()
232 .prefix("tokio-fifo-tests")
233 .tempdir()
234 .unwrap();
235 let path = dir.path().join("not_a_fifo");
236
237 // Create an ordinary file.
238 File::create(&path)?;
239
240 // Check if Sender detects invalid file type.
241 let file = std::fs::OpenOptions::new().write(true).open(&path)?;
242 let err = assert_err!(pipe::Sender::from_file(file));
243 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
244
245 // Check if Receiver detects invalid file type.
246 let file = std::fs::OpenOptions::new().read(true).open(&path)?;
247 let err = assert_err!(pipe::Receiver::from_file(file));
248 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
249
250 Ok(())
251 }
252
253 #[tokio::test]
254 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
from_file_detects_wrong_access_mode() -> io::Result<()>255 async fn from_file_detects_wrong_access_mode() -> io::Result<()> {
256 let fifo = TempFifo::new("wrong_access_mode")?;
257
258 // Open a read end to open the fifo for writing.
259 let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
260
261 // Check if Receiver detects write-only access mode.
262 let wronly = std::fs::OpenOptions::new()
263 .write(true)
264 .custom_flags(libc::O_NONBLOCK)
265 .open(&fifo)?;
266 let err = assert_err!(pipe::Receiver::from_file(wronly));
267 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
268
269 // Check if Sender detects read-only access mode.
270 let rdonly = std::fs::OpenOptions::new()
271 .read(true)
272 .custom_flags(libc::O_NONBLOCK)
273 .open(&fifo)?;
274 let err = assert_err!(pipe::Sender::from_file(rdonly));
275 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
276
277 Ok(())
278 }
279
is_nonblocking<T: AsRawFd>(fd: &T) -> io::Result<bool>280 fn is_nonblocking<T: AsRawFd>(fd: &T) -> io::Result<bool> {
281 let flags = nix::fcntl::fcntl(fd.as_raw_fd(), nix::fcntl::F_GETFL)?;
282 Ok((flags & libc::O_NONBLOCK) != 0)
283 }
284
285 #[tokio::test]
286 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
from_file_sets_nonblock() -> io::Result<()>287 async fn from_file_sets_nonblock() -> io::Result<()> {
288 let fifo = TempFifo::new("sets_nonblock")?;
289
290 // Open read and write ends to let blocking files open.
291 let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
292 let _writer = pipe::OpenOptions::new().open_sender(&fifo)?;
293
294 // Check if Receiver sets the pipe in non-blocking mode.
295 let rdonly = std::fs::OpenOptions::new().read(true).open(&fifo)?;
296 assert!(!is_nonblocking(&rdonly)?);
297 let reader = pipe::Receiver::from_file(rdonly)?;
298 assert!(is_nonblocking(&reader)?);
299
300 // Check if Sender sets the pipe in non-blocking mode.
301 let wronly = std::fs::OpenOptions::new().write(true).open(&fifo)?;
302 assert!(!is_nonblocking(&wronly)?);
303 let writer = pipe::Sender::from_file(wronly)?;
304 assert!(is_nonblocking(&writer)?);
305
306 Ok(())
307 }
308
writable_by_poll(writer: &pipe::Sender) -> bool309 fn writable_by_poll(writer: &pipe::Sender) -> bool {
310 task::spawn(writer.writable()).poll().is_ready()
311 }
312
313 #[tokio::test]
314 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
try_read_write() -> io::Result<()>315 async fn try_read_write() -> io::Result<()> {
316 const DATA: &[u8] = b"this is some data to write to the fifo";
317
318 // Create a pipe pair over a fifo file.
319 let fifo = TempFifo::new("try_read_write")?;
320 let reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
321 let writer = pipe::OpenOptions::new().open_sender(&fifo)?;
322
323 // Fill the pipe buffer with `try_write`.
324 let mut write_data = Vec::new();
325 while writable_by_poll(&writer) {
326 match writer.try_write(DATA) {
327 Ok(n) => write_data.extend(&DATA[..n]),
328 Err(e) => {
329 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
330 break;
331 }
332 }
333 }
334
335 // Drain the pipe buffer with `try_read`.
336 let mut read_data = vec![0; write_data.len()];
337 let mut i = 0;
338 while i < write_data.len() {
339 reader.readable().await?;
340 match reader.try_read(&mut read_data[i..]) {
341 Ok(n) => i += n,
342 Err(e) => {
343 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
344 continue;
345 }
346 }
347 }
348
349 assert_eq!(read_data, write_data);
350
351 Ok(())
352 }
353
354 #[tokio::test]
355 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
try_read_write_vectored() -> io::Result<()>356 async fn try_read_write_vectored() -> io::Result<()> {
357 const DATA: &[u8] = b"this is some data to write to the fifo";
358
359 // Create a pipe pair over a fifo file.
360 let fifo = TempFifo::new("try_read_write_vectored")?;
361 let reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
362 let writer = pipe::OpenOptions::new().open_sender(&fifo)?;
363
364 let write_bufs: Vec<_> = DATA.chunks(3).map(io::IoSlice::new).collect();
365
366 // Fill the pipe buffer with `try_write_vectored`.
367 let mut write_data = Vec::new();
368 while writable_by_poll(&writer) {
369 match writer.try_write_vectored(&write_bufs) {
370 Ok(n) => write_data.extend(&DATA[..n]),
371 Err(e) => {
372 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
373 break;
374 }
375 }
376 }
377
378 // Drain the pipe buffer with `try_read_vectored`.
379 let mut read_data = vec![0; write_data.len()];
380 let mut i = 0;
381 while i < write_data.len() {
382 reader.readable().await?;
383
384 let mut read_bufs: Vec<_> = read_data[i..]
385 .chunks_mut(0x10000)
386 .map(io::IoSliceMut::new)
387 .collect();
388 match reader.try_read_vectored(&mut read_bufs) {
389 Ok(n) => i += n,
390 Err(e) => {
391 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
392 continue;
393 }
394 }
395 }
396
397 assert_eq!(read_data, write_data);
398
399 Ok(())
400 }
401
402 #[tokio::test]
403 #[cfg_attr(miri, ignore)] // No `mkfifo` in miri.
try_read_buf() -> std::io::Result<()>404 async fn try_read_buf() -> std::io::Result<()> {
405 const DATA: &[u8] = b"this is some data to write to the fifo";
406
407 // Create a pipe pair over a fifo file.
408 let fifo = TempFifo::new("try_read_write_vectored")?;
409 let reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
410 let writer = pipe::OpenOptions::new().open_sender(&fifo)?;
411
412 // Fill the pipe buffer with `try_write`.
413 let mut write_data = Vec::new();
414 while writable_by_poll(&writer) {
415 match writer.try_write(DATA) {
416 Ok(n) => write_data.extend(&DATA[..n]),
417 Err(e) => {
418 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
419 break;
420 }
421 }
422 }
423
424 // Drain the pipe buffer with `try_read_buf`.
425 let mut read_data = vec![0; write_data.len()];
426 let mut i = 0;
427 while i < write_data.len() {
428 reader.readable().await?;
429 match reader.try_read_buf(&mut read_data) {
430 Ok(n) => i += n,
431 Err(e) => {
432 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
433 continue;
434 }
435 }
436 }
437
438 assert_eq!(read_data, write_data);
439
440 Ok(())
441 }
442
443 #[tokio::test]
anon_pipe_simple_send() -> io::Result<()>444 async fn anon_pipe_simple_send() -> io::Result<()> {
445 const DATA: &[u8] = b"this is some data to write to the pipe";
446
447 let (mut writer, mut reader) = pipe::pipe()?;
448
449 // Create a reading task which should wait for data from the pipe.
450 let mut read_fut = task::spawn(async move {
451 let mut buf = vec![0; DATA.len()];
452 reader.read_exact(&mut buf).await?;
453 Ok::<_, io::Error>(buf)
454 });
455 assert_pending!(read_fut.poll());
456
457 writer.write_all(DATA).await?;
458
459 // Let the IO driver poll events for the reader.
460 while !read_fut.is_woken() {
461 tokio::task::yield_now().await;
462 }
463
464 // Reading task should be ready now.
465 let read_data = assert_ready_ok!(read_fut.poll());
466 assert_eq!(&read_data, DATA);
467
468 Ok(())
469 }
470
471 #[tokio::test]
472 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
anon_pipe_spawn_echo() -> std::io::Result<()>473 async fn anon_pipe_spawn_echo() -> std::io::Result<()> {
474 use tokio::process::Command;
475
476 const DATA: &str = "this is some data to write to the pipe";
477
478 let (tx, mut rx) = pipe::pipe()?;
479
480 let status = Command::new("echo")
481 .arg("-n")
482 .arg(DATA)
483 .stdout(tx.into_blocking_fd()?)
484 .status();
485
486 let mut buf = vec![0; DATA.len()];
487 rx.read_exact(&mut buf).await?;
488 assert_eq!(String::from_utf8(buf).unwrap(), DATA);
489
490 let exit_code = status.await?;
491 assert!(exit_code.success());
492
493 // Check if the pipe is closed.
494 buf = Vec::new();
495 let total = assert_ok!(rx.try_read(&mut buf));
496 assert_eq!(total, 0);
497
498 Ok(())
499 }
500
501 #[tokio::test]
502 #[cfg(target_os = "linux")]
503 #[cfg_attr(miri, ignore)] // No `fstat` in miri.
anon_pipe_from_owned_fd() -> std::io::Result<()>504 async fn anon_pipe_from_owned_fd() -> std::io::Result<()> {
505 use nix::fcntl::OFlag;
506
507 const DATA: &[u8] = b"this is some data to write to the pipe";
508
509 let (rx_fd, tx_fd) = nix::unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK)?;
510
511 let mut rx = pipe::Receiver::from_owned_fd(rx_fd)?;
512 let mut tx = pipe::Sender::from_owned_fd(tx_fd)?;
513
514 let mut buf = vec![0; DATA.len()];
515 tx.write_all(DATA).await?;
516 rx.read_exact(&mut buf).await?;
517 assert_eq!(buf, DATA);
518
519 Ok(())
520 }
521
522 #[tokio::test]
523 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
anon_pipe_into_nonblocking_fd() -> std::io::Result<()>524 async fn anon_pipe_into_nonblocking_fd() -> std::io::Result<()> {
525 let (tx, rx) = pipe::pipe()?;
526
527 let tx_fd = tx.into_nonblocking_fd()?;
528 let rx_fd = rx.into_nonblocking_fd()?;
529
530 assert!(is_nonblocking(&tx_fd)?);
531 assert!(is_nonblocking(&rx_fd)?);
532
533 Ok(())
534 }
535
536 #[tokio::test]
537 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
anon_pipe_into_blocking_fd() -> std::io::Result<()>538 async fn anon_pipe_into_blocking_fd() -> std::io::Result<()> {
539 let (tx, rx) = pipe::pipe()?;
540
541 let tx_fd = tx.into_blocking_fd()?;
542 let rx_fd = rx.into_blocking_fd()?;
543
544 assert!(!is_nonblocking(&tx_fd)?);
545 assert!(!is_nonblocking(&rx_fd)?);
546
547 Ok(())
548 }
549