1 use std::fs::File; 2 use std::io::{self, Read, Write}; 3 #[cfg(not(target_os = "hermit"))] 4 use std::os::fd::{AsRawFd, FromRawFd, RawFd}; 5 // TODO: once <https://github.com/rust-lang/rust/issues/126198> is fixed this 6 // can use `std::os::fd` and be merged with the above. 7 #[cfg(target_os = "hermit")] 8 use std::os::hermit::io::{AsRawFd, FromRawFd, RawFd}; 9 10 use crate::sys::unix::pipe; 11 use crate::sys::Selector; 12 use crate::{Interest, Token}; 13 14 /// Waker backed by a unix pipe. 15 /// 16 /// Waker controls both the sending and receiving ends and empties the pipe 17 /// if writing to it (waking) fails. 18 #[derive(Debug)] 19 pub(crate) struct Waker { 20 sender: File, 21 receiver: File, 22 } 23 24 impl Waker { 25 #[allow(dead_code)] // Not used by the `poll(2)` implementation. new(selector: &Selector, token: Token) -> io::Result<Waker>26 pub(crate) fn new(selector: &Selector, token: Token) -> io::Result<Waker> { 27 let waker = Waker::new_unregistered()?; 28 selector.register(waker.receiver.as_raw_fd(), token, Interest::READABLE)?; 29 Ok(waker) 30 } 31 new_unregistered() -> io::Result<Waker>32 pub(crate) fn new_unregistered() -> io::Result<Waker> { 33 let [receiver, sender] = pipe::new_raw()?; 34 let sender = unsafe { File::from_raw_fd(sender) }; 35 let receiver = unsafe { File::from_raw_fd(receiver) }; 36 Ok(Waker { sender, receiver }) 37 } 38 wake(&self) -> io::Result<()>39 pub(crate) fn wake(&self) -> io::Result<()> { 40 // The epoll emulation on some illumos systems currently requires 41 // the pipe buffer to be completely empty for an edge-triggered 42 // wakeup on the pipe read side. 43 // See https://www.illumos.org/issues/13436. 44 #[cfg(target_os = "illumos")] 45 self.empty(); 46 47 match (&self.sender).write(&[1]) { 48 Ok(_) => Ok(()), 49 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { 50 // The reading end is full so we'll empty the buffer and try 51 // again. 52 self.empty(); 53 self.wake() 54 } 55 Err(ref err) if err.kind() == io::ErrorKind::Interrupted => self.wake(), 56 Err(err) => Err(err), 57 } 58 } 59 60 #[allow(dead_code)] // Only used by the `poll(2)` implementation. ack_and_reset(&self)61 pub(crate) fn ack_and_reset(&self) { 62 self.empty(); 63 } 64 65 /// Empty the pipe's buffer, only need to call this if `wake` fails. 66 /// This ignores any errors. empty(&self)67 fn empty(&self) { 68 let mut buf = [0; 4096]; 69 loop { 70 match (&self.receiver).read(&mut buf) { 71 Ok(n) if n > 0 => continue, 72 _ => return, 73 } 74 } 75 } 76 } 77 78 impl AsRawFd for Waker { as_raw_fd(&self) -> RawFd79 fn as_raw_fd(&self) -> RawFd { 80 self.receiver.as_raw_fd() 81 } 82 } 83