1 // This implementation is based on the one in the `polling` crate.
2 // Thanks to https://github.com/Kestrer for the original implementation!
3 // Permission to use this code has been granted by original author:
4 // https://github.com/tokio-rs/mio/pull/1602#issuecomment-1218441031
5 
6 use std::collections::HashMap;
7 use std::fmt::{Debug, Formatter};
8 #[cfg(not(target_os = "hermit"))]
9 use std::os::fd::{AsRawFd, RawFd};
10 // TODO: once <https://github.com/rust-lang/rust/issues/126198> is fixed this
11 // can use `std::os::fd` and be merged with the above.
12 #[cfg(target_os = "hermit")]
13 use std::os::hermit::io::{AsRawFd, RawFd};
14 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
15 use std::sync::{Arc, Condvar, Mutex};
16 use std::time::Duration;
17 use std::{cmp, fmt, io};
18 
19 use crate::sys::unix::waker::Waker as WakerInternal;
20 use crate::{Interest, Token};
21 
22 /// Unique id for use as `SelectorId`.
23 #[cfg(debug_assertions)]
24 static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
25 
26 #[derive(Debug)]
27 pub struct Selector {
28     state: Arc<SelectorState>,
29 }
30 
31 impl Selector {
new() -> io::Result<Selector>32     pub fn new() -> io::Result<Selector> {
33         let state = SelectorState::new()?;
34 
35         Ok(Selector {
36             state: Arc::new(state),
37         })
38     }
39 
try_clone(&self) -> io::Result<Selector>40     pub fn try_clone(&self) -> io::Result<Selector> {
41         let state = self.state.clone();
42 
43         Ok(Selector { state })
44     }
45 
select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>46     pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
47         self.state.select(events, timeout)
48     }
49 
register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>50     pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
51         self.state.register(fd, token, interests)
52     }
53 
54     #[allow(dead_code)]
register_internal( &self, fd: RawFd, token: Token, interests: Interest, ) -> io::Result<Arc<RegistrationRecord>>55     pub(crate) fn register_internal(
56         &self,
57         fd: RawFd,
58         token: Token,
59         interests: Interest,
60     ) -> io::Result<Arc<RegistrationRecord>> {
61         self.state.register_internal(fd, token, interests)
62     }
63 
reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>64     pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
65         self.state.reregister(fd, token, interests)
66     }
67 
deregister(&self, fd: RawFd) -> io::Result<()>68     pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
69         self.state.deregister(fd)
70     }
71 
wake(&self, token: Token) -> io::Result<()>72     pub fn wake(&self, token: Token) -> io::Result<()> {
73         self.state.wake(token)
74     }
75 
76     cfg_io_source! {
77         #[cfg(debug_assertions)]
78         pub fn id(&self) -> usize {
79             self.state.id
80         }
81     }
82 }
83 
84 /// Interface to poll.
85 #[derive(Debug)]
86 struct SelectorState {
87     /// File descriptors to poll.
88     fds: Mutex<Fds>,
89 
90     /// File descriptors which will be removed before the next poll call.
91     ///
92     /// When a file descriptor is deregistered while a poll is running, we need to filter
93     /// out all removed descriptors after that poll is finished running.
94     pending_removal: Mutex<Vec<RawFd>>,
95 
96     /// Token associated with Waker that have recently asked to wake.  This will
97     /// cause a synthetic behaviour where on any wakeup we add all pending tokens
98     /// to the list of emitted events.
99     pending_wake_token: Mutex<Option<Token>>,
100 
101     /// Data is written to this to wake up the current instance of `wait`, which can occur when the
102     /// user notifies it (in which case `notified` would have been set) or when an operation needs
103     /// to occur (in which case `waiting_operations` would have been incremented).
104     notify_waker: WakerInternal,
105 
106     /// The number of operations (`add`, `modify` or `delete`) that are currently waiting on the
107     /// mutex to become free. When this is nonzero, `wait` must be suspended until it reaches zero
108     /// again.
109     waiting_operations: AtomicUsize,
110     /// The condition variable that gets notified when `waiting_operations` reaches zero or
111     /// `notified` becomes true.
112     ///
113     /// This is used with the `fds` mutex.
114     operations_complete: Condvar,
115 
116     /// This selectors id.
117     #[cfg(debug_assertions)]
118     #[allow(dead_code)]
119     id: usize,
120 }
121 
122 /// The file descriptors to poll in a `Poller`.
123 #[derive(Debug, Clone)]
124 struct Fds {
125     /// The list of `pollfds` taken by poll.
126     ///
127     /// The first file descriptor is always present and is used to notify the poller.
128     poll_fds: Vec<PollFd>,
129     /// The map of each file descriptor to data associated with it. This does not include the file
130     /// descriptors created by the internal notify waker.
131     fd_data: HashMap<RawFd, FdData>,
132 }
133 
134 /// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives without adding the
135 /// `extra_traits` feature of `libc`.
136 #[repr(transparent)]
137 #[derive(Clone)]
138 struct PollFd(libc::pollfd);
139 
140 impl Debug for PollFd {
fmt(&self, f: &mut Formatter<'_>) -> fmt::Result141     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
142         f.debug_struct("pollfd")
143             .field("fd", &self.0.fd)
144             .field("events", &self.0.events)
145             .field("revents", &self.0.revents)
146             .finish()
147     }
148 }
149 
150 /// Data associated with a file descriptor in a poller.
151 #[derive(Debug, Clone)]
152 struct FdData {
153     /// The index into `poll_fds` this file descriptor is.
154     poll_fds_index: usize,
155     /// The key of the `Event` associated with this file descriptor.
156     token: Token,
157     /// Used to communicate with IoSourceState when we need to internally deregister
158     /// based on a closed fd.
159     shared_record: Arc<RegistrationRecord>,
160 }
161 
162 impl SelectorState {
new() -> io::Result<SelectorState>163     pub fn new() -> io::Result<SelectorState> {
164         let notify_waker = WakerInternal::new_unregistered()?;
165 
166         Ok(Self {
167             fds: Mutex::new(Fds {
168                 poll_fds: vec![PollFd(libc::pollfd {
169                     fd: notify_waker.as_raw_fd(),
170                     events: libc::POLLIN,
171                     revents: 0,
172                 })],
173                 fd_data: HashMap::new(),
174             }),
175             pending_removal: Mutex::new(Vec::new()),
176             pending_wake_token: Mutex::new(None),
177             notify_waker,
178             waiting_operations: AtomicUsize::new(0),
179             operations_complete: Condvar::new(),
180             #[cfg(debug_assertions)]
181             id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
182         })
183     }
184 
select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>185     pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
186         events.clear();
187 
188         let mut fds = self.fds.lock().unwrap();
189 
190         // Keep track of fds that receive POLLHUP or POLLERR (i.e. won't receive further
191         // events) and internally deregister them before they are externally deregister'd.  See
192         // IoSourceState below to track how the external deregister call will be handled
193         // when this state occurs.
194         let mut closed_raw_fds = Vec::new();
195 
196         loop {
197             // Complete all current operations.
198             loop {
199                 if self.waiting_operations.load(Ordering::SeqCst) == 0 {
200                     break;
201                 }
202 
203                 fds = self.operations_complete.wait(fds).unwrap();
204             }
205 
206             // Perform the poll.
207             trace!("Polling on {:?}", &fds);
208             let num_events = poll(&mut fds.poll_fds, timeout)?;
209             trace!("Poll finished: {:?}", &fds);
210 
211             if num_events == 0 {
212                 return Ok(());
213             }
214 
215             let waker_events = fds.poll_fds[0].0.revents;
216             let notified = waker_events != 0;
217             let mut num_fd_events = if notified { num_events - 1 } else { num_events };
218 
219             let pending_wake_token = self.pending_wake_token.lock().unwrap().take();
220 
221             if notified {
222                 self.notify_waker.ack_and_reset();
223                 if pending_wake_token.is_some() {
224                     num_fd_events += 1;
225                 }
226             }
227 
228             // We now check whether this poll was performed with descriptors which were pending
229             // for removal and filter out any matching.
230             let mut pending_removal_guard = self.pending_removal.lock().unwrap();
231             let mut pending_removal = std::mem::replace(pending_removal_guard.as_mut(), Vec::new());
232             drop(pending_removal_guard);
233 
234             // Store the events if there were any.
235             if num_fd_events > 0 {
236                 let fds = &mut *fds;
237 
238                 events.reserve(num_fd_events);
239 
240                 // Add synthetic events we picked up from calls to wake()
241                 if let Some(pending_wake_token) = pending_wake_token {
242                     events.push(Event {
243                         token: pending_wake_token,
244                         events: waker_events,
245                     });
246                 }
247 
248                 for fd_data in fds.fd_data.values_mut() {
249                     let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index];
250 
251                     if pending_removal.contains(&poll_fd.fd) {
252                         // Fd was removed while poll was running
253                         continue;
254                     }
255 
256                     if poll_fd.revents != 0 {
257                         // Store event
258                         events.push(Event {
259                             token: fd_data.token,
260                             events: poll_fd.revents,
261                         });
262 
263                         if poll_fd.revents & (libc::POLLHUP | libc::POLLERR) != 0 {
264                             pending_removal.push(poll_fd.fd);
265                             closed_raw_fds.push(poll_fd.fd);
266                         }
267 
268                         // Remove the interest which just got triggered the IoSourceState's do_io
269                         // wrapper used with this selector will add back the interest using
270                         // reregister.
271                         poll_fd.events &= !poll_fd.revents;
272 
273                         // Minor optimization to potentially avoid looping n times where n is the
274                         // number of input fds (i.e. we might loop between m and n times where m is
275                         // the number of fds with revents != 0).
276                         if events.len() == num_fd_events {
277                             break;
278                         }
279                     }
280                 }
281 
282                 break; // No more polling.
283             }
284 
285             // If we didn't break above it means we got woken up internally (for example for adding an fd), so we poll again.
286         }
287 
288         drop(fds);
289         let _ = self.deregister_all(&closed_raw_fds);
290 
291         Ok(())
292     }
293 
register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>294     pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
295         self.register_internal(fd, token, interests).map(|_| ())
296     }
297 
register_internal( &self, fd: RawFd, token: Token, interests: Interest, ) -> io::Result<Arc<RegistrationRecord>>298     pub fn register_internal(
299         &self,
300         fd: RawFd,
301         token: Token,
302         interests: Interest,
303     ) -> io::Result<Arc<RegistrationRecord>> {
304         #[cfg(debug_assertions)]
305         if fd == self.notify_waker.as_raw_fd() {
306             return Err(io::Error::from(io::ErrorKind::InvalidInput));
307         }
308 
309         // We must handle the unlikely case that the following order of operations happens:
310         //
311         // register(1 as RawFd)
312         // deregister(1 as RawFd)
313         // register(1 as RawFd)
314         // <poll happens>
315         //
316         // Fd's pending removal only get cleared when poll has been run. It is possible that
317         // between registering and deregistering and then _again_ registering the file descriptor
318         // poll never gets called, thus the fd stays stuck in the pending removal list.
319         //
320         // To avoid this scenario we remove an fd from pending removals when registering it.
321         let mut pending_removal = self.pending_removal.lock().unwrap();
322         if let Some(idx) = pending_removal.iter().position(|&pending| pending == fd) {
323             pending_removal.swap_remove(idx);
324         }
325         drop(pending_removal);
326 
327         self.modify_fds(|fds| {
328             if fds.fd_data.contains_key(&fd) {
329                 return Err(io::Error::new(
330                     io::ErrorKind::AlreadyExists,
331                     "I/O source already registered this `Registry` \
332                     (an old file descriptor might have been closed without deregistration)",
333                 ));
334             }
335 
336             let poll_fds_index = fds.poll_fds.len();
337             let record = Arc::new(RegistrationRecord::new());
338             fds.fd_data.insert(
339                 fd,
340                 FdData {
341                     poll_fds_index,
342                     token,
343                     shared_record: record.clone(),
344                 },
345             );
346 
347             fds.poll_fds.push(PollFd(libc::pollfd {
348                 fd,
349                 events: interests_to_poll(interests),
350                 revents: 0,
351             }));
352 
353             Ok(record)
354         })
355     }
356 
reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>357     pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
358         self.modify_fds(|fds| {
359             let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?;
360             data.token = token;
361             let poll_fds_index = data.poll_fds_index;
362             fds.poll_fds[poll_fds_index].0.events = interests_to_poll(interests);
363 
364             Ok(())
365         })
366     }
367 
deregister(&self, fd: RawFd) -> io::Result<()>368     pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
369         self.deregister_all(&[fd])
370             .map_err(|_| io::ErrorKind::NotFound)?;
371         Ok(())
372     }
373 
374     /// Perform a modification on `fds`, interrupting the current caller of `wait` if it's running.
modify_fds<T>(&self, f: impl FnOnce(&mut Fds) -> T) -> T375     fn modify_fds<T>(&self, f: impl FnOnce(&mut Fds) -> T) -> T {
376         self.waiting_operations.fetch_add(1, Ordering::SeqCst);
377 
378         // Wake up the current caller of `wait` if there is one.
379         let sent_notification = self.notify_waker.wake().is_ok();
380 
381         let mut fds = self.fds.lock().unwrap();
382 
383         // If there was no caller of `wait` our notification was not removed from the pipe.
384         if sent_notification {
385             self.notify_waker.ack_and_reset();
386         }
387 
388         let res = f(&mut *fds);
389 
390         if self.waiting_operations.fetch_sub(1, Ordering::SeqCst) == 1 {
391             self.operations_complete.notify_one();
392         }
393 
394         res
395     }
396 
397     /// Special optimized version of [Self::deregister] which handles multiple removals
398     /// at once.  Ok result if all removals were performed, Err if any entries
399     /// were not found.
deregister_all(&self, targets: &[RawFd]) -> Result<(), ()>400     fn deregister_all(&self, targets: &[RawFd]) -> Result<(), ()> {
401         if targets.is_empty() {
402             return Ok(());
403         }
404 
405         let mut pending_removal = self.pending_removal.lock().unwrap();
406         pending_removal.extend(targets);
407         drop(pending_removal);
408 
409         self.modify_fds(|fds| {
410             let mut all_successful = true;
411 
412             for target in targets {
413                 match fds.fd_data.remove(target).ok_or(()) {
414                     Ok(data) => {
415                         data.shared_record.mark_unregistered();
416                         fds.poll_fds.swap_remove(data.poll_fds_index);
417                         if let Some(swapped_pollfd) = fds.poll_fds.get(data.poll_fds_index) {
418                             fds.fd_data
419                                 .get_mut(&swapped_pollfd.0.fd)
420                                 .unwrap()
421                                 .poll_fds_index = data.poll_fds_index;
422                         }
423                     }
424                     Err(_) => all_successful = false,
425                 }
426             }
427 
428             if all_successful {
429                 Ok(())
430             } else {
431                 Err(())
432             }
433         })
434     }
435 
wake(&self, token: Token) -> io::Result<()>436     pub fn wake(&self, token: Token) -> io::Result<()> {
437         self.pending_wake_token.lock().unwrap().replace(token);
438         self.notify_waker.wake()
439     }
440 }
441 
442 /// Shared record between IoSourceState and SelectorState that allows us to internally
443 /// deregister partially or fully closed fds (i.e. when we get POLLHUP or PULLERR) without
444 /// confusing IoSourceState and trying to deregister twice.  This isn't strictly
445 /// required as technically deregister is idempotent but it is confusing
446 /// when trying to debug behaviour as we get imbalanced calls to register/deregister and
447 /// superfluous NotFound errors.
448 #[derive(Debug)]
449 pub(crate) struct RegistrationRecord {
450     is_unregistered: AtomicBool,
451 }
452 
453 impl RegistrationRecord {
new() -> Self454     pub fn new() -> Self {
455         Self {
456             is_unregistered: AtomicBool::new(false),
457         }
458     }
459 
mark_unregistered(&self)460     pub fn mark_unregistered(&self) {
461         self.is_unregistered.store(true, Ordering::Relaxed);
462     }
463 
464     #[allow(dead_code)]
is_registered(&self) -> bool465     pub fn is_registered(&self) -> bool {
466         !self.is_unregistered.load(Ordering::Relaxed)
467     }
468 }
469 
470 #[cfg(target_os = "linux")]
471 const POLLRDHUP: libc::c_short = libc::POLLRDHUP;
472 #[cfg(not(target_os = "linux"))]
473 const POLLRDHUP: libc::c_short = 0;
474 
475 const READ_EVENTS: libc::c_short = libc::POLLIN | POLLRDHUP;
476 
477 const WRITE_EVENTS: libc::c_short = libc::POLLOUT;
478 
479 const PRIORITY_EVENTS: libc::c_short = libc::POLLPRI;
480 
481 /// Get the input poll events for the given event.
interests_to_poll(interest: Interest) -> libc::c_short482 fn interests_to_poll(interest: Interest) -> libc::c_short {
483     let mut kind = 0;
484 
485     if interest.is_readable() {
486         kind |= READ_EVENTS;
487     }
488 
489     if interest.is_writable() {
490         kind |= WRITE_EVENTS;
491     }
492 
493     if interest.is_priority() {
494         kind |= PRIORITY_EVENTS;
495     }
496 
497     kind
498 }
499 
500 /// Helper function to call poll.
poll(fds: &mut [PollFd], timeout: Option<Duration>) -> io::Result<usize>501 fn poll(fds: &mut [PollFd], timeout: Option<Duration>) -> io::Result<usize> {
502     loop {
503         // A bug in kernels < 2.6.37 makes timeouts larger than LONG_MAX / CONFIG_HZ
504         // (approx. 30 minutes with CONFIG_HZ=1200) effectively infinite on 32 bits
505         // architectures. The magic number is the same constant used by libuv.
506         #[cfg(target_pointer_width = "32")]
507         const MAX_SAFE_TIMEOUT: u128 = 1789569;
508         #[cfg(not(target_pointer_width = "32"))]
509         const MAX_SAFE_TIMEOUT: u128 = libc::c_int::MAX as u128;
510 
511         let timeout = timeout
512             .map(|to| {
513                 // `Duration::as_millis` truncates, so round up. This avoids
514                 // turning sub-millisecond timeouts into a zero timeout, unless
515                 // the caller explicitly requests that by specifying a zero
516                 // timeout.
517                 let to_ms = to
518                     .checked_add(Duration::from_nanos(999_999))
519                     .unwrap_or(to)
520                     .as_millis();
521                 cmp::min(MAX_SAFE_TIMEOUT, to_ms) as libc::c_int
522             })
523             .unwrap_or(-1);
524 
525         let res = syscall!(poll(
526             fds.as_mut_ptr() as *mut libc::pollfd,
527             fds.len() as libc::nfds_t,
528             timeout,
529         ));
530 
531         match res {
532             Ok(num_events) => break Ok(num_events as usize),
533             // poll returns EAGAIN if we can retry it.
534             Err(e) if e.raw_os_error() == Some(libc::EAGAIN) => continue,
535             Err(e) => return Err(e),
536         }
537     }
538 }
539 
540 #[derive(Debug, Clone)]
541 pub struct Event {
542     token: Token,
543     events: libc::c_short,
544 }
545 
546 pub type Events = Vec<Event>;
547 
548 pub mod event {
549     use std::fmt;
550 
551     use crate::sys::Event;
552     use crate::Token;
553 
554     use super::POLLRDHUP;
555 
token(event: &Event) -> Token556     pub fn token(event: &Event) -> Token {
557         event.token
558     }
559 
is_readable(event: &Event) -> bool560     pub fn is_readable(event: &Event) -> bool {
561         (event.events & libc::POLLIN) != 0 || (event.events & libc::POLLPRI) != 0
562     }
563 
is_writable(event: &Event) -> bool564     pub fn is_writable(event: &Event) -> bool {
565         (event.events & libc::POLLOUT) != 0
566     }
567 
is_error(event: &Event) -> bool568     pub fn is_error(event: &Event) -> bool {
569         (event.events & libc::POLLERR) != 0
570     }
571 
is_read_closed(event: &Event) -> bool572     pub fn is_read_closed(event: &Event) -> bool {
573         // Both halves of the socket have closed
574         (event.events & libc::POLLHUP) != 0
575             // Socket has received FIN or called shutdown(SHUT_RD)
576             || (event.events & POLLRDHUP) != 0
577     }
578 
is_write_closed(event: &Event) -> bool579     pub fn is_write_closed(event: &Event) -> bool {
580         // Both halves of the socket have closed
581         (event.events & libc::POLLHUP) != 0
582             // Unix pipe write end has closed
583             || ((event.events & libc::POLLOUT) != 0 && (event.events & libc::POLLERR) != 0)
584             // The other side (read end) of a Unix pipe has closed.
585             || (event.events == libc::POLLERR)
586     }
587 
is_priority(event: &Event) -> bool588     pub fn is_priority(event: &Event) -> bool {
589         (event.events & libc::POLLPRI) != 0
590     }
591 
is_aio(_: &Event) -> bool592     pub fn is_aio(_: &Event) -> bool {
593         // Not supported in the kernel, only in libc.
594         false
595     }
596 
is_lio(_: &Event) -> bool597     pub fn is_lio(_: &Event) -> bool {
598         // Not supported.
599         false
600     }
601 
debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result602     pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
603         #[allow(clippy::trivially_copy_pass_by_ref)]
604         fn check_events(got: &libc::c_short, want: &libc::c_short) -> bool {
605             (*got & want) != 0
606         }
607         debug_detail!(
608             EventsDetails(libc::c_short),
609             check_events,
610             libc::POLLIN,
611             libc::POLLPRI,
612             libc::POLLOUT,
613             libc::POLLRDNORM,
614             libc::POLLRDBAND,
615             libc::POLLWRNORM,
616             libc::POLLWRBAND,
617             libc::POLLERR,
618             libc::POLLHUP,
619         );
620 
621         f.debug_struct("poll_event")
622             .field("token", &event.token)
623             .field("events", &EventsDetails(event.events))
624             .finish()
625     }
626 }
627 
628 #[derive(Debug)]
629 pub(crate) struct Waker {
630     selector: Selector,
631     token: Token,
632 }
633 
634 impl Waker {
new(selector: &Selector, token: Token) -> io::Result<Waker>635     pub(crate) fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
636         Ok(Waker {
637             selector: selector.try_clone()?,
638             token,
639         })
640     }
641 
wake(&self) -> io::Result<()>642     pub(crate) fn wake(&self) -> io::Result<()> {
643         self.selector.wake(self.token)
644     }
645 }
646 
647 cfg_io_source! {
648     use crate::Registry;
649 
650     struct InternalState {
651         selector: Selector,
652         token: Token,
653         interests: Interest,
654         fd: RawFd,
655         shared_record: Arc<RegistrationRecord>,
656     }
657 
658     impl Drop for InternalState {
659         fn drop(&mut self) {
660             if self.shared_record.is_registered() {
661                 let _ = self.selector.deregister(self.fd);
662             }
663         }
664     }
665 
666     pub(crate) struct IoSourceState {
667         inner: Option<Box<InternalState>>,
668     }
669 
670     impl IoSourceState {
671         pub fn new() -> IoSourceState {
672             IoSourceState { inner: None }
673         }
674 
675         pub fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R>
676         where
677         F: FnOnce(&T) -> io::Result<R>,
678         {
679             let result = f(io);
680 
681             if let Err(err) = &result {
682                 if err.kind() == io::ErrorKind::WouldBlock {
683                     self.inner.as_ref().map_or(Ok(()), |state| {
684                         state
685                         .selector
686                         .reregister(state.fd, state.token, state.interests)
687                     })?;
688                 }
689             }
690 
691             result
692         }
693 
694         pub fn register(
695             &mut self,
696             registry: &Registry,
697             token: Token,
698             interests: Interest,
699             fd: RawFd,
700         ) -> io::Result<()> {
701             if self.inner.is_some() {
702                 Err(io::ErrorKind::AlreadyExists.into())
703             } else {
704                 let selector = registry.selector().try_clone()?;
705 
706                 selector.register_internal(fd, token, interests).map(move |shared_record| {
707                     let state = InternalState {
708                         selector,
709                         token,
710                         interests,
711                         fd,
712                         shared_record,
713                     };
714 
715                     self.inner = Some(Box::new(state));
716                 })
717             }
718         }
719 
720         pub fn reregister(
721             &mut self,
722             registry: &Registry,
723             token: Token,
724             interests: Interest,
725             fd: RawFd,
726         ) -> io::Result<()> {
727             match self.inner.as_mut() {
728                 Some(state) => registry
729                 .selector()
730                 .reregister(fd, token, interests)
731                 .map(|()| {
732                     state.token = token;
733                     state.interests = interests;
734                 }),
735                 None => Err(io::ErrorKind::NotFound.into()),
736             }
737         }
738 
739         pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> {
740             if let Some(state) = self.inner.take() {
741                 // Marking unregistered will short circuit the drop behaviour of calling
742                 // deregister so the call to deregister below is strictly required.
743                 state.shared_record.mark_unregistered();
744             }
745 
746             registry.selector().deregister(fd)
747         }
748     }
749 }
750