1 use std::fs::File;
2 use std::io::{self, Read, Write};
3 #[cfg(not(target_os = "hermit"))]
4 use std::os::fd::{AsRawFd, FromRawFd, RawFd};
5 // TODO: once <https://github.com/rust-lang/rust/issues/126198> is fixed this
6 // can use `std::os::fd` and be merged with the above.
7 #[cfg(target_os = "hermit")]
8 use std::os::hermit::io::{AsRawFd, FromRawFd, RawFd};
9 
10 use crate::sys::unix::pipe;
11 use crate::sys::Selector;
12 use crate::{Interest, Token};
13 
14 /// Waker backed by a unix pipe.
15 ///
16 /// Waker controls both the sending and receiving ends and empties the pipe
17 /// if writing to it (waking) fails.
18 #[derive(Debug)]
19 pub(crate) struct Waker {
20     sender: File,
21     receiver: File,
22 }
23 
24 impl Waker {
25     #[allow(dead_code)] // Not used by the `poll(2)` implementation.
new(selector: &Selector, token: Token) -> io::Result<Waker>26     pub(crate) fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
27         let waker = Waker::new_unregistered()?;
28         selector.register(waker.receiver.as_raw_fd(), token, Interest::READABLE)?;
29         Ok(waker)
30     }
31 
new_unregistered() -> io::Result<Waker>32     pub(crate) fn new_unregistered() -> io::Result<Waker> {
33         let [receiver, sender] = pipe::new_raw()?;
34         let sender = unsafe { File::from_raw_fd(sender) };
35         let receiver = unsafe { File::from_raw_fd(receiver) };
36         Ok(Waker { sender, receiver })
37     }
38 
wake(&self) -> io::Result<()>39     pub(crate) fn wake(&self) -> io::Result<()> {
40         // The epoll emulation on some illumos systems currently requires
41         // the pipe buffer to be completely empty for an edge-triggered
42         // wakeup on the pipe read side.
43         // See https://www.illumos.org/issues/13436.
44         #[cfg(target_os = "illumos")]
45         self.empty();
46 
47         match (&self.sender).write(&[1]) {
48             Ok(_) => Ok(()),
49             Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
50                 // The reading end is full so we'll empty the buffer and try
51                 // again.
52                 self.empty();
53                 self.wake()
54             }
55             Err(ref err) if err.kind() == io::ErrorKind::Interrupted => self.wake(),
56             Err(err) => Err(err),
57         }
58     }
59 
60     #[allow(dead_code)] // Only used by the `poll(2)` implementation.
ack_and_reset(&self)61     pub(crate) fn ack_and_reset(&self) {
62         self.empty();
63     }
64 
65     /// Empty the pipe's buffer, only need to call this if `wake` fails.
66     /// This ignores any errors.
empty(&self)67     fn empty(&self) {
68         let mut buf = [0; 4096];
69         loop {
70             match (&self.receiver).read(&mut buf) {
71                 Ok(n) if n > 0 => continue,
72                 _ => return,
73             }
74         }
75     }
76 }
77 
78 impl AsRawFd for Waker {
as_raw_fd(&self) -> RawFd79     fn as_raw_fd(&self) -> RawFd {
80         self.receiver.as_raw_fd()
81     }
82 }
83