1 use std::{
2     io::{Read, Seek, Write},
3     ops::Deref,
4     os::unix::io::AsRawFd,
5     pin::Pin,
6     sync::atomic::{AtomicBool, Ordering},
7     thread, time,
8 };
9 
10 use libc::c_int;
11 use nix::{
12     errno::*,
13     sys::{
14         aio::*,
15         signal::{
16             sigaction, SaFlags, SigAction, SigHandler, SigSet, SigevNotify,
17             Signal,
18         },
19         time::{TimeSpec, TimeValLike},
20     },
21 };
22 use tempfile::tempfile;
23 
/// Flag raised by [`sigfunc`].  A test clears it before arming the handler
/// and then watches for it to become `true`.
pub static SIGNALED: AtomicBool = AtomicBool::new(false);

/// Signal handler that records the arrival of a signal by setting
/// [`SIGNALED`].  The signal number itself is ignored.
extern "C" fn sigfunc(_signum: c_int) {
    SIGNALED.store(true, Ordering::Relaxed);
}
29 
// Helper that polls an AioCb until its status is anything other than
// EINPROGRESS, sleeping 10ms between checks.  The macro evaluates to that
// final status, whether it is a success or an error.
macro_rules! poll_aio {
    ($aiocb: expr) => {
        loop {
            match $aiocb.as_mut().error() {
                Err(Errno::EINPROGRESS) => {
                    thread::sleep(time::Duration::from_millis(10));
                }
                status => break status,
            }
        }
    };
}
42 
mod aio_fsync {
    use super::*;

    /// Every accessor must echo back the value given to `AioFsync::new`.
    #[test]
    fn test_accessors() {
        let notify = SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 99,
        };
        // The control block is never submitted, so the fd need not be open.
        let cb = AioFsync::new(1001, AioFsyncMode::O_SYNC, 42, notify);

        assert_eq!(1001, cb.fd());
        assert_eq!(AioFsyncMode::O_SYNC, cb.mode());
        assert_eq!(42, cb.priority());

        let raw_sev = cb.sigevent().sigevent();
        assert_eq!(Signal::SIGUSR2 as i32, raw_sev.sigev_signo);
        assert_eq!(99, raw_sev.sigev_value.sival_ptr as i64);
    }

    /// `AioFsync::submit` should not modify the `AioCb` object if
    /// `libc::aio_fsync` returns an error
    // Skip on Linux, because Linux's AIO implementation can't detect errors
    // synchronously
    #[test]
    #[cfg(any(target_os = "freebsd", apple_targets))]
    fn error() {
        use std::mem;

        const INITIAL: &[u8] = b"abcdef123456";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        // Forge a mode from an arbitrary integer so that submission is
        // rejected.  NOTE(review): 666 is presumably not a valid
        // AioFsyncMode discriminant -- confirm this transmute is acceptable.
        let bogus_mode = unsafe { mem::transmute(666) };
        let mut aiof = Box::pin(AioFsync::new(
            file.as_raw_fd(),
            bogus_mode,
            0,
            SigevNotify::SigevNone,
        ));

        aiof.as_mut().submit().expect_err("assertion failed");
    }

    /// Submit an fsync with no completion notification, poll until it is
    /// finished, and check that both the status and the return value are Ok.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn ok() {
        const INITIAL: &[u8] = b"abcdef123456";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut aiof = Box::pin(AioFsync::new(
            file.as_raw_fd(),
            AioFsyncMode::O_SYNC,
            0,
            SigevNotify::SigevNone,
        ));
        aiof.as_mut().submit().unwrap();

        let status = poll_aio!(&mut aiof);
        status.unwrap();
        aiof.as_mut().aio_return().unwrap();
    }
}
107 
mod aio_read {
    use super::*;

    /// Every accessor must echo back the value given to `AioRead::new`, and
    /// `nbytes` must equal the length of the supplied buffer.
    #[test]
    fn test_accessors() {
        let notify = SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 99,
        };
        let mut rbuf = vec![0; 4];
        // Arguments: fd, offset, buffer, priority, notification
        let cb = AioRead::new(1001, 2, &mut rbuf, 42, notify);

        assert_eq!(1001, cb.fd());
        assert_eq!(4, cb.nbytes());
        assert_eq!(2, cb.offset());
        assert_eq!(42, cb.priority());

        let raw_sev = cb.sigevent().sigevent();
        assert_eq!(Signal::SIGUSR2 as i32, raw_sev.sigev_signo);
        assert_eq!(99, raw_sev.sigev_value.sival_ptr as i64);
    }

    // Tests AioRead.cancel.  We aren't trying to test the OS's implementation,
    // only our bindings.  So it's sufficient to check that cancel
    // returned any AioCancelStat value.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn cancel() {
        const INITIAL: &[u8] = b"abcdef123456";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut rbuf = vec![0; 4];
        let mut aior = Box::pin(AioRead::new(
            file.as_raw_fd(),
            2,
            &mut rbuf,
            0,
            SigevNotify::SigevNone,
        ));
        aior.as_mut().submit().unwrap();

        aior.as_mut().cancel().unwrap();

        // Wait for aior to complete, but don't care whether it succeeded
        let _ = poll_aio!(&mut aior);
        let _ = aior.as_mut().aio_return();
    }

    /// `AioRead::submit` should not modify the `AioCb` object if
    /// `libc::aio_read` returns an error
    // Skip on Linux, because Linux's AIO implementation can't detect errors
    // synchronously
    #[test]
    #[cfg(any(target_os = "freebsd", apple_targets))]
    fn error() {
        const INITIAL: &[u8] = b"abcdef123456";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut rbuf = vec![0; 4];
        // A negative offset is invalid, so submission is expected to fail
        let bad_offset = -1;
        let mut aior = Box::pin(AioRead::new(
            file.as_raw_fd(),
            bad_offset,
            &mut rbuf,
            0,
            SigevNotify::SigevNone,
        ));

        aior.as_mut().submit().expect_err("assertion failed");
    }

    // Test a simple aio operation with no completion notification.  We must
    // poll for completion
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn ok() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT: &[u8] = b"cdef";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut rbuf = vec![0; 4];
        let mut aior = Box::pin(AioRead::new(
            file.as_raw_fd(),
            2,
            &mut rbuf,
            0,
            SigevNotify::SigevNone,
        ));
        aior.as_mut().submit().unwrap();

        let status = poll_aio!(&mut aior);
        assert_eq!(status, Ok(()));
        assert_eq!(aior.as_mut().aio_return().unwrap(), EXPECT.len());

        // Release the mutable borrow of rbuf before inspecting it
        drop(aior);
        assert_eq!(EXPECT, rbuf.deref());
    }

    // Like ok, but allocates the structure on the stack.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn on_stack() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT: &[u8] = b"cdef";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut rbuf = vec![0; 4];
        {
            // The control block lives in this scope and is pinned in place;
            // it is not moved again before the scope ends.
            let mut storage = AioRead::new(
                file.as_raw_fd(),
                2,
                &mut rbuf,
                0,
                SigevNotify::SigevNone,
            );
            let mut aior = unsafe { Pin::new_unchecked(&mut storage) };
            aior.as_mut().submit().unwrap();

            let status = poll_aio!(&mut aior);
            assert_eq!(status, Ok(()));
            assert_eq!(aior.as_mut().aio_return().unwrap(), EXPECT.len());
        }
        assert_eq!(EXPECT, rbuf.deref());
    }
}
227 
#[cfg(target_os = "freebsd")]
#[cfg(fbsd14)]
mod aio_readv {
    use std::io::IoSliceMut;

    use super::*;

    /// Every accessor must echo back the value given to `AioReadv::new`, and
    /// `iovlen` must equal the number of buffers supplied.
    #[test]
    fn test_accessors() {
        let notify = SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 99,
        };
        let mut first = vec![0; 4];
        let mut second = vec![0; 8];
        let mut rbufs =
            [IoSliceMut::new(&mut first), IoSliceMut::new(&mut second)];
        // Arguments: fd, offset, buffers, priority, notification
        let cb = AioReadv::new(1001, 2, &mut rbufs, 42, notify);

        assert_eq!(1001, cb.fd());
        assert_eq!(2, cb.iovlen());
        assert_eq!(2, cb.offset());
        assert_eq!(42, cb.priority());

        let raw_sev = cb.sigevent().sigevent();
        assert_eq!(Signal::SIGUSR2 as i32, raw_sev.sigev_signo);
        assert_eq!(99, raw_sev.sigev_value.sival_ptr as i64);
    }

    /// Scatter-read six bytes starting at offset 2 into a 4-byte and a
    /// 2-byte buffer, polling for completion, then check both buffers.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn ok() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT0: &[u8] = b"cdef";
        const EXPECT1: &[u8] = b"12";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut rbuf0 = vec![0; 4];
        let mut rbuf1 = vec![0; 2];
        let mut rbufs =
            [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)];

        let mut aior = Box::pin(AioReadv::new(
            file.as_raw_fd(),
            2,
            &mut rbufs,
            0,
            SigevNotify::SigevNone,
        ));
        aior.as_mut().submit().unwrap();

        let status = poll_aio!(&mut aior);
        assert_eq!(status, Ok(()));
        let total = EXPECT0.len() + EXPECT1.len();
        assert_eq!(aior.as_mut().aio_return().unwrap(), total);

        // Release the borrow of the buffers before inspecting them
        drop(aior);
        assert_eq!(&EXPECT0, &rbuf0);
        assert_eq!(&EXPECT1, &rbuf1);
    }
}
294 
mod aio_write {
    use super::*;

    /// Every accessor must echo back the value given to `AioWrite::new`, and
    /// `nbytes` must equal the length of the supplied buffer.
    #[test]
    fn test_accessors() {
        let notify = SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 99,
        };
        let wbuf = vec![0; 4];
        // Arguments: fd, offset, buffer, priority, notification
        let cb = AioWrite::new(1001, 2, &wbuf, 42, notify);

        assert_eq!(1001, cb.fd());
        assert_eq!(4, cb.nbytes());
        assert_eq!(2, cb.offset());
        assert_eq!(42, cb.priority());

        let raw_sev = cb.sigevent().sigevent();
        assert_eq!(Signal::SIGUSR2 as i32, raw_sev.sigev_signo);
        assert_eq!(99, raw_sev.sigev_value.sival_ptr as i64);
    }

    // Tests AioWrite.cancel.  We aren't trying to test the OS's implementation,
    // only our bindings.  So it's sufficient to check that cancel
    // returned any AioCancelStat value.
    #[test]
    #[cfg_attr(target_env = "musl", ignore)]
    fn cancel() {
        let file = tempfile().unwrap();
        let payload: &[u8] = b"CDEF";

        let mut aiow = Box::pin(AioWrite::new(
            file.as_raw_fd(),
            0,
            payload,
            0,
            SigevNotify::SigevNone,
        ));
        aiow.as_mut().submit().unwrap();

        // Right after submission the write may be finished or still running
        let status = aiow.as_mut().error();
        assert!(matches!(status, Ok(()) | Err(Errno::EINPROGRESS)));

        aiow.as_mut().cancel().unwrap();

        // Wait for aiow to complete, but don't care whether it succeeded
        let _ = poll_aio!(&mut aiow);
        let _ = aiow.as_mut().aio_return();
    }

    // Test a simple aio operation with no completion notification.  We must
    // poll for completion.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn ok() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT: &[u8] = b"abCDEF123456";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let wbuf = b"CDEF".to_vec();
        let mut aiow = Box::pin(AioWrite::new(
            file.as_raw_fd(),
            2,
            &wbuf,
            0,
            SigevNotify::SigevNone,
        ));
        aiow.as_mut().submit().unwrap();

        let status = poll_aio!(&mut aiow);
        assert_eq!(status, Ok(()));
        assert_eq!(aiow.as_mut().aio_return().unwrap(), wbuf.len());

        // Read the whole file back and compare it with the expected contents
        let mut contents = Vec::new();
        file.rewind().unwrap();
        let nread = file.read_to_end(&mut contents).unwrap();
        assert_eq!(nread, EXPECT.len());
        assert_eq!(contents, EXPECT);
    }

    // Like ok, but allocates the structure on the stack.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn on_stack() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT: &[u8] = b"abCDEF123456";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let wbuf = b"CDEF".to_vec();
        // The control block is a plain local that is pinned in place; it is
        // not moved again for the rest of the function.
        let mut storage = AioWrite::new(
            file.as_raw_fd(),
            2,
            &wbuf,
            0,
            SigevNotify::SigevNone,
        );
        let mut aiow = unsafe { Pin::new_unchecked(&mut storage) };
        aiow.as_mut().submit().unwrap();

        let status = poll_aio!(&mut aiow);
        assert_eq!(status, Ok(()));
        assert_eq!(aiow.as_mut().aio_return().unwrap(), wbuf.len());

        // Read the whole file back and compare it with the expected contents
        let mut contents = Vec::new();
        file.rewind().unwrap();
        let nread = file.read_to_end(&mut contents).unwrap();
        assert_eq!(nread, EXPECT.len());
        assert_eq!(contents, EXPECT);
    }

    /// `AioWrite::submit` should not modify the `AioCb` object if
    /// `libc::aio_write` returns an error.
    // Skip on Linux, because Linux's AIO implementation can't detect errors
    // synchronously
    #[test]
    #[cfg(any(target_os = "freebsd", apple_targets))]
    fn error() {
        let wbuf = b"CDEF".to_vec();
        // 666 is used as an invalid file descriptor
        let bad_fd = 666;
        let mut aiow = Box::pin(AioWrite::new(
            bad_fd,
            0,
            &wbuf,
            0,
            SigevNotify::SigevNone,
        ));

        aiow.as_mut().submit().expect_err("assertion failed");
        // Dropping the AioWrite at this point should not panic
    }
}
428 
#[cfg(target_os = "freebsd")]
#[cfg(fbsd14)]
mod aio_writev {
    use std::io::IoSlice;

    use super::*;

    /// Every accessor must echo back the value given to `AioWritev::new`,
    /// and `iovlen` must equal the number of buffers supplied.
    #[test]
    fn test_accessors() {
        let notify = SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 99,
        };
        let first = vec![0; 4];
        let second = vec![0; 8];
        let wbufs = [IoSlice::new(&first), IoSlice::new(&second)];
        // Arguments: fd, offset, buffers, priority, notification
        let cb = AioWritev::new(1001, 2, &wbufs, 42, notify);

        assert_eq!(1001, cb.fd());
        assert_eq!(2, cb.iovlen());
        assert_eq!(2, cb.offset());
        assert_eq!(42, cb.priority());

        let raw_sev = cb.sigevent().sigevent();
        assert_eq!(Signal::SIGUSR2 as i32, raw_sev.sigev_signo);
        assert_eq!(99, raw_sev.sigev_value.sival_ptr as i64);
    }

    // Test a simple aio operation with no completion notification.  We must
    // poll for completion.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn ok() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT: &[u8] = b"aBCDEF123456";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let head = b"BC";
        let tail = b"DEF";
        let total = head.len() + tail.len();
        let wbufs = [IoSlice::new(head), IoSlice::new(tail)];

        let mut aiow = Box::pin(AioWritev::new(
            file.as_raw_fd(),
            1,
            &wbufs,
            0,
            SigevNotify::SigevNone,
        ));
        aiow.as_mut().submit().unwrap();

        let status = poll_aio!(&mut aiow);
        assert_eq!(status, Ok(()));
        assert_eq!(aiow.as_mut().aio_return().unwrap(), total);

        // Read the whole file back and compare it with the expected contents
        let mut contents = Vec::new();
        file.rewind().unwrap();
        let nread = file.read_to_end(&mut contents).unwrap();
        assert_eq!(nread, EXPECT.len());
        assert_eq!(contents, EXPECT);
    }
}
494 
// Test an aio operation with completion delivered by a signal.
//
// A SIGUSR2 handler that sets SIGNALED is installed, a write is submitted
// with SigevSignal notification, and the test waits for the flag before
// checking the write's return value and the resulting file contents.  The
// wait is bounded so that a signal which never arrives fails the test with
// a clear message instead of hanging the whole test run.
#[test]
#[cfg_attr(
    any(
        all(target_env = "musl", target_arch = "x86_64"),
        target_arch = "mips",
        target_arch = "mips32r6",
        target_arch = "mips64",
        target_arch = "mips64r6"
    ),
    ignore
)]
fn sigev_signal() {
    // Held for the whole test.  NOTE(review): presumably this serializes
    // tests that touch process-wide signal state -- confirm at its definition.
    let _m = crate::SIGNAL_MTX.lock();
    let sa = SigAction::new(
        SigHandler::Handler(sigfunc),
        SaFlags::SA_RESETHAND,
        SigSet::empty(),
    );
    // Clear the flag before the handler can possibly fire
    SIGNALED.store(false, Ordering::Relaxed);
    unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap();

    const INITIAL: &[u8] = b"abcdef123456";
    const WBUF: &[u8] = b"CDEF";
    let mut rbuf = Vec::new();
    const EXPECT: &[u8] = b"abCDEF123456";

    let mut f = tempfile().unwrap();
    f.write_all(INITIAL).unwrap();
    let mut aiow = Box::pin(AioWrite::new(
        f.as_raw_fd(),
        2, //offset
        WBUF,
        0, //priority
        SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 0, //TODO: validate in sigfunc
        },
    ));
    aiow.as_mut().submit().unwrap();

    // Wait for the handler to run, but give up after ten seconds rather
    // than spinning forever if the signal is never delivered.
    let deadline = time::Instant::now() + time::Duration::from_secs(10);
    while !SIGNALED.load(Ordering::Relaxed) {
        assert!(
            time::Instant::now() < deadline,
            "timed out waiting for the AIO completion signal"
        );
        thread::sleep(time::Duration::from_millis(10));
    }

    assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len());
    f.rewind().unwrap();
    let len = f.read_to_end(&mut rbuf).unwrap();
    assert_eq!(len, EXPECT.len());
    assert_eq!(rbuf, EXPECT);
}
545 
// Tests using aio_cancel_all for all outstanding IOs.  Only the binding is
// exercised: the call must succeed, and the outcome of the write itself is
// ignored.
#[test]
#[cfg_attr(target_env = "musl", ignore)]
fn test_aio_cancel_all() {
    let file = tempfile().unwrap();
    let payload: &[u8] = b"CDEF";

    // Arguments: fd, offset, buffer, priority, notification
    let mut aiocb = Box::pin(AioWrite::new(
        file.as_raw_fd(),
        0,
        payload,
        0,
        SigevNotify::SigevNone,
    ));
    aiocb.as_mut().submit().unwrap();

    // Right after submission the write may be finished or still running
    let status = aiocb.as_mut().error();
    assert!(matches!(status, Ok(()) | Err(Errno::EINPROGRESS)));

    aio_cancel_all(file.as_raw_fd()).unwrap();

    // Wait for aiocb to complete, but don't care whether it succeeded
    let _ = poll_aio!(&mut aiocb);
    let _ = aiocb.as_mut().aio_return();
}
570 
/// Submits one write and one read on the same file, then calls `aio_suspend`
/// on both (retrying on EINTR) until neither reports EINPROGRESS.  Finally
/// checks that each operation transferred its full buffer length.
#[test]
fn test_aio_suspend() {
    const INITIAL: &[u8] = b"abcdef123456";
    const WBUF: &[u8] = b"CDEFG";

    let timeout = TimeSpec::seconds(10);
    let mut rbuf = vec![0; 4];
    let expected_read = rbuf.len();

    let mut file = tempfile().unwrap();
    file.write_all(INITIAL).unwrap();

    // Arguments: fd, offset, buffer, priority, notification
    let mut wcb = Box::pin(AioWrite::new(
        file.as_raw_fd(),
        2,
        WBUF,
        0,
        SigevNotify::SigevNone,
    ));
    let mut rcb = Box::pin(AioRead::new(
        file.as_raw_fd(),
        8,
        &mut rbuf,
        0,
        SigevNotify::SigevNone,
    ));

    wcb.as_mut().submit().unwrap();
    rcb.as_mut().submit().unwrap();

    loop {
        {
            // The shared borrows of both control blocks end with this scope,
            // so they can be borrowed mutably again below.
            let pending: [&dyn AsRef<libc::aiocb>; 2] = [&*wcb, &*rcb];
            match aio_suspend(&pending[..], Some(timeout)) {
                Ok(_) => (),
                Err(Errno::EINTR) => continue,
                Err(e) => panic!("aio_suspend returned {e:?}"),
            };
        }

        let still_running = rcb.as_mut().error() == Err(Errno::EINPROGRESS)
            || wcb.as_mut().error() == Err(Errno::EINPROGRESS);
        if !still_running {
            break;
        }
    }

    assert_eq!(wcb.as_mut().aio_return().unwrap(), WBUF.len());
    assert_eq!(rcb.as_mut().aio_return().unwrap(), expected_read);
}
621 
/// aio_suspend relies on casting Rust Aio* struct pointers to libc::aiocb
/// pointers.  This test ensures that such casts are valid, by checking that
/// the address returned by `as_ref` equals the address of the struct itself.
#[test]
fn casting() {
    let sev = SigevNotify::SigevNone;

    // None of these control blocks is submitted, so the fd need not be open.
    let aiof = AioFsync::new(666, AioFsyncMode::O_SYNC, 0, sev);
    let fsync_inner = aiof.as_ref() as *const libc::aiocb;
    let fsync_outer = &aiof as *const AioFsync as *const libc::aiocb;
    assert_eq!(fsync_inner, fsync_outer);

    let mut rbuf = [];
    let aior = AioRead::new(666, 0, &mut rbuf, 0, sev);
    let read_inner = aior.as_ref() as *const libc::aiocb;
    let read_outer = &aior as *const AioRead as *const libc::aiocb;
    assert_eq!(read_inner, read_outer);

    let wbuf = [];
    let aiow = AioWrite::new(666, 0, &wbuf, 0, sev);
    let write_inner = aiow.as_ref() as *const libc::aiocb;
    let write_outer = &aiow as *const AioWrite as *const libc::aiocb;
    assert_eq!(write_inner, write_outer);
}
647 
/// Vectored counterpart of the pointer-cast check: for `AioReadv` and
/// `AioWritev`, the address returned by `as_ref` must equal the address of
/// the struct itself.
#[cfg(target_os = "freebsd")]
#[test]
fn casting_vectored() {
    use std::io::{IoSlice, IoSliceMut};

    let sev = SigevNotify::SigevNone;

    // Neither control block is submitted, so the fd need not be open.
    let mut rbuf = [];
    let mut rbufs = [IoSliceMut::new(&mut rbuf)];
    let aiorv = AioReadv::new(666, 0, &mut rbufs[..], 0, sev);
    let readv_inner = aiorv.as_ref() as *const libc::aiocb;
    let readv_outer = &aiorv as *const AioReadv as *const libc::aiocb;
    assert_eq!(readv_inner, readv_outer);

    let wbuf = [];
    let wbufs = [IoSlice::new(&wbuf)];
    let aiowv = AioWritev::new(666, 0, &wbufs, 0, sev);
    let writev_inner = aiowv.as_ref() as *const libc::aiocb;
    let writev_outer = &aiowv as *const AioWritev as *const libc::aiocb;
    assert_eq!(writev_inner, writev_outer);
}
671