1 use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
2 #[cfg(debug_assertions)]
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::time::Duration;
5 use std::{io, ptr};
6 
7 use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLPRI, EPOLLRDHUP};
8 
9 use crate::{Interest, Token};
10 
11 /// Unique id for use as `SelectorId`.
12 #[cfg(debug_assertions)]
13 static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
14 
15 #[derive(Debug)]
16 pub struct Selector {
17     #[cfg(debug_assertions)]
18     id: usize,
19     ep: OwnedFd,
20 }
21 
22 impl Selector {
new() -> io::Result<Selector>23     pub fn new() -> io::Result<Selector> {
24         // SAFETY: `epoll_create1(2)` ensures the fd is valid.
25         let ep = unsafe { OwnedFd::from_raw_fd(syscall!(epoll_create1(libc::EPOLL_CLOEXEC))?) };
26         Ok(Selector {
27             #[cfg(debug_assertions)]
28             id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
29             ep,
30         })
31     }
32 
try_clone(&self) -> io::Result<Selector>33     pub fn try_clone(&self) -> io::Result<Selector> {
34         self.ep.try_clone().map(|ep| Selector {
35             // It's the same selector, so we use the same id.
36             #[cfg(debug_assertions)]
37             id: self.id,
38             ep,
39         })
40     }
41 
select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>42     pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
43         let timeout = timeout
44             .map(|to| {
45                 // `Duration::as_millis` truncates, so round up. This avoids
46                 // turning sub-millisecond timeouts into a zero timeout, unless
47                 // the caller explicitly requests that by specifying a zero
48                 // timeout.
49                 to.checked_add(Duration::from_nanos(999_999))
50                     .unwrap_or(to)
51                     .as_millis() as libc::c_int
52             })
53             .unwrap_or(-1);
54 
55         events.clear();
56         syscall!(epoll_wait(
57             self.ep.as_raw_fd(),
58             events.as_mut_ptr(),
59             events.capacity() as i32,
60             timeout,
61         ))
62         .map(|n_events| {
63             // This is safe because `epoll_wait` ensures that `n_events` are
64             // assigned.
65             unsafe { events.set_len(n_events as usize) };
66         })
67     }
68 
register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>69     pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
70         let mut event = libc::epoll_event {
71             events: interests_to_epoll(interests),
72             u64: usize::from(token) as u64,
73             #[cfg(target_os = "redox")]
74             _pad: 0,
75         };
76 
77         let ep = self.ep.as_raw_fd();
78         syscall!(epoll_ctl(ep, libc::EPOLL_CTL_ADD, fd, &mut event)).map(|_| ())
79     }
80 
reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>81     pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
82         let mut event = libc::epoll_event {
83             events: interests_to_epoll(interests),
84             u64: usize::from(token) as u64,
85             #[cfg(target_os = "redox")]
86             _pad: 0,
87         };
88 
89         let ep = self.ep.as_raw_fd();
90         syscall!(epoll_ctl(ep, libc::EPOLL_CTL_MOD, fd, &mut event)).map(|_| ())
91     }
92 
deregister(&self, fd: RawFd) -> io::Result<()>93     pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
94         let ep = self.ep.as_raw_fd();
95         syscall!(epoll_ctl(ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ())
96     }
97 }
98 
99 cfg_io_source! {
100     impl Selector {
101         #[cfg(debug_assertions)]
102         pub fn id(&self) -> usize {
103             self.id
104         }
105     }
106 }
107 
108 impl AsRawFd for Selector {
as_raw_fd(&self) -> RawFd109     fn as_raw_fd(&self) -> RawFd {
110         self.ep.as_raw_fd()
111     }
112 }
113 
interests_to_epoll(interests: Interest) -> u32114 fn interests_to_epoll(interests: Interest) -> u32 {
115     let mut kind = EPOLLET;
116 
117     if interests.is_readable() {
118         kind = kind | EPOLLIN | EPOLLRDHUP;
119     }
120 
121     if interests.is_writable() {
122         kind |= EPOLLOUT;
123     }
124 
125     if interests.is_priority() {
126         kind |= EPOLLPRI;
127     }
128 
129     kind as u32
130 }
131 
132 pub type Event = libc::epoll_event;
133 pub type Events = Vec<Event>;
134 
135 pub mod event {
136     use std::fmt;
137 
138     use crate::sys::Event;
139     use crate::Token;
140 
token(event: &Event) -> Token141     pub fn token(event: &Event) -> Token {
142         Token(event.u64 as usize)
143     }
144 
is_readable(event: &Event) -> bool145     pub fn is_readable(event: &Event) -> bool {
146         (event.events as libc::c_int & libc::EPOLLIN) != 0
147             || (event.events as libc::c_int & libc::EPOLLPRI) != 0
148     }
149 
is_writable(event: &Event) -> bool150     pub fn is_writable(event: &Event) -> bool {
151         (event.events as libc::c_int & libc::EPOLLOUT) != 0
152     }
153 
is_error(event: &Event) -> bool154     pub fn is_error(event: &Event) -> bool {
155         (event.events as libc::c_int & libc::EPOLLERR) != 0
156     }
157 
is_read_closed(event: &Event) -> bool158     pub fn is_read_closed(event: &Event) -> bool {
159         // Both halves of the socket have closed
160         event.events as libc::c_int & libc::EPOLLHUP != 0
161             // Socket has received FIN or called shutdown(SHUT_RD)
162             || (event.events as libc::c_int & libc::EPOLLIN != 0
163                 && event.events as libc::c_int & libc::EPOLLRDHUP != 0)
164     }
165 
is_write_closed(event: &Event) -> bool166     pub fn is_write_closed(event: &Event) -> bool {
167         // Both halves of the socket have closed
168         event.events as libc::c_int & libc::EPOLLHUP != 0
169             // Unix pipe write end has closed
170             || (event.events as libc::c_int & libc::EPOLLOUT != 0
171                 && event.events as libc::c_int & libc::EPOLLERR != 0)
172             // The other side (read end) of a Unix pipe has closed.
173             || event.events as libc::c_int == libc::EPOLLERR
174     }
175 
is_priority(event: &Event) -> bool176     pub fn is_priority(event: &Event) -> bool {
177         (event.events as libc::c_int & libc::EPOLLPRI) != 0
178     }
179 
is_aio(_: &Event) -> bool180     pub fn is_aio(_: &Event) -> bool {
181         // Not supported in the kernel, only in libc.
182         false
183     }
184 
is_lio(_: &Event) -> bool185     pub fn is_lio(_: &Event) -> bool {
186         // Not supported.
187         false
188     }
189 
debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result190     pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
191         #[allow(clippy::trivially_copy_pass_by_ref)]
192         fn check_events(got: &u32, want: &libc::c_int) -> bool {
193             (*got as libc::c_int & want) != 0
194         }
195         debug_detail!(
196             EventsDetails(u32),
197             check_events,
198             libc::EPOLLIN,
199             libc::EPOLLPRI,
200             libc::EPOLLOUT,
201             libc::EPOLLRDNORM,
202             libc::EPOLLRDBAND,
203             libc::EPOLLWRNORM,
204             libc::EPOLLWRBAND,
205             libc::EPOLLMSG,
206             libc::EPOLLERR,
207             libc::EPOLLHUP,
208             libc::EPOLLET,
209             libc::EPOLLRDHUP,
210             libc::EPOLLONESHOT,
211             libc::EPOLLEXCLUSIVE,
212             libc::EPOLLWAKEUP,
213             libc::EPOLL_CLOEXEC,
214         );
215 
216         // Can't reference fields in packed structures.
217         let e_u64 = event.u64;
218         f.debug_struct("epoll_event")
219             .field("events", &EventsDetails(event.events))
220             .field("u64", &e_u64)
221             .finish()
222     }
223 }
224 
225 // No special requirement from the implementation around waking.
226 pub(crate) use crate::sys::unix::waker::Waker;
227 
228 cfg_io_source! {
229     mod stateless_io_source;
230     pub(crate) use stateless_io_source::IoSourceState;
231 }
232