1 // This implementation is based on the one in the `polling` crate.
2 // Thanks to https://github.com/Kestrer for the original implementation!
3 // Permission to use this code has been granted by original author:
4 // https://github.com/tokio-rs/mio/pull/1602#issuecomment-1218441031
5
6 use std::collections::HashMap;
7 use std::fmt::{Debug, Formatter};
8 #[cfg(not(target_os = "hermit"))]
9 use std::os::fd::{AsRawFd, RawFd};
10 // TODO: once <https://github.com/rust-lang/rust/issues/126198> is fixed this
11 // can use `std::os::fd` and be merged with the above.
12 #[cfg(target_os = "hermit")]
13 use std::os::hermit::io::{AsRawFd, RawFd};
14 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
15 use std::sync::{Arc, Condvar, Mutex};
16 use std::time::Duration;
17 use std::{cmp, fmt, io};
18
19 use crate::sys::unix::waker::Waker as WakerInternal;
20 use crate::{Interest, Token};
21
/// Unique id for use as `SelectorId`.
///
/// Starts at 1; each `SelectorState::new` takes the current value with a
/// relaxed fetch-add (debug builds only, used for registration sanity checks).
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
25
/// `poll(2)`-based selector.
///
/// All real state lives in [`SelectorState`]; this wrapper is cheaply
/// cloneable because clones share the same `Arc`.
#[derive(Debug)]
pub struct Selector {
    state: Arc<SelectorState>,
}
30
impl Selector {
    /// Creates a new selector with its own [`SelectorState`].
    pub fn new() -> io::Result<Selector> {
        let state = SelectorState::new()?;

        Ok(Selector {
            state: Arc::new(state),
        })
    }

    /// Returns a second handle to the same underlying selector state.
    ///
    /// This never fails; the `io::Result` return type mirrors the other
    /// selector backends' interface.
    pub fn try_clone(&self) -> io::Result<Selector> {
        let state = self.state.clone();

        Ok(Selector { state })
    }

    /// Polls for events, blocking up to `timeout` (forever if `None`).
    /// `events` is cleared first; ready events are then appended to it.
    pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
        self.state.select(events, timeout)
    }

    /// Registers `fd` with the given token and interests.
    pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
        self.state.register(fd, token, interests)
    }

    /// Like `register`, but also returns the shared [`RegistrationRecord`]
    /// used to coordinate internal deregistration with `IoSourceState`.
    #[allow(dead_code)]
    pub(crate) fn register_internal(
        &self,
        fd: RawFd,
        token: Token,
        interests: Interest,
    ) -> io::Result<Arc<RegistrationRecord>> {
        self.state.register_internal(fd, token, interests)
    }

    /// Updates the token and interests of an already registered `fd`.
    pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
        self.state.reregister(fd, token, interests)
    }

    /// Removes `fd` from the selector.
    pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
        self.state.deregister(fd)
    }

    /// Wakes the current (or next) call to `select`, which will emit a
    /// synthetic event carrying `token`.
    pub fn wake(&self, token: Token) -> io::Result<()> {
        self.state.wake(token)
    }

    cfg_io_source! {
        /// Debug-only identifier of the underlying selector state.
        #[cfg(debug_assertions)]
        pub fn id(&self) -> usize {
            self.state.id
        }
    }
}
83
/// Interface to poll.
///
/// Shared (via `Arc`) between every clone of a [`Selector`] and any wakers
/// created from it.
#[derive(Debug)]
struct SelectorState {
    /// File descriptors to poll.
    fds: Mutex<Fds>,

    /// File descriptors which will be removed before the next poll call.
    ///
    /// When a file descriptor is deregistered while a poll is running, we need to filter
    /// out all removed descriptors after that poll is finished running.
    pending_removal: Mutex<Vec<RawFd>>,

    /// Token associated with Waker that have recently asked to wake. This will
    /// cause a synthetic behaviour where on any wakeup we add all pending tokens
    /// to the list of emitted events.
    pending_wake_token: Mutex<Option<Token>>,

    /// Data is written to this to wake up the current instance of `wait`, which can occur when the
    /// user notifies it (in which case `notified` would have been set) or when an operation needs
    /// to occur (in which case `waiting_operations` would have been incremented).
    notify_waker: WakerInternal,

    /// The number of operations (`add`, `modify` or `delete`) that are currently waiting on the
    /// mutex to become free. When this is nonzero, `wait` must be suspended until it reaches zero
    /// again.
    waiting_operations: AtomicUsize,
    /// The condition variable that gets notified when `waiting_operations` reaches zero or
    /// `notified` becomes true.
    ///
    /// This is used with the `fds` mutex.
    operations_complete: Condvar,

    /// This selectors id.
    #[cfg(debug_assertions)]
    #[allow(dead_code)]
    id: usize,
}
121
/// The file descriptors to poll in a `Poller`.
#[derive(Debug, Clone)]
struct Fds {
    /// The list of `pollfds` taken by poll.
    ///
    /// The first file descriptor is always present and is used to notify the poller.
    poll_fds: Vec<PollFd>,
    /// The map of each file descriptor to data associated with it. This does not include the file
    /// descriptors created by the internal notify waker.
    ///
    /// Invariant: `fd_data[fd].poll_fds_index` always indexes the entry in
    /// `poll_fds` whose `fd` field equals `fd`; `deregister_all` fixes the
    /// index up after swap-removals.
    fd_data: HashMap<RawFd, FdData>,
}
133
/// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives without adding the
/// `extra_traits` feature of `libc`.
///
/// `repr(transparent)` guarantees the same layout as `libc::pollfd`, which is
/// what lets `poll` below cast a `*mut PollFd` to `*mut libc::pollfd`.
#[repr(transparent)]
#[derive(Clone)]
struct PollFd(libc::pollfd);
139
140 impl Debug for PollFd {
fmt(&self, f: &mut Formatter<'_>) -> fmt::Result141 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
142 f.debug_struct("pollfd")
143 .field("fd", &self.0.fd)
144 .field("events", &self.0.events)
145 .field("revents", &self.0.revents)
146 .finish()
147 }
148 }
149
/// Data associated with a file descriptor in a poller.
#[derive(Debug, Clone)]
struct FdData {
    /// The index into `poll_fds` this file descriptor is.
    /// Kept in sync by `deregister_all` when swap-removals move entries.
    poll_fds_index: usize,
    /// The key of the `Event` associated with this file descriptor.
    token: Token,
    /// Used to communicate with IoSourceState when we need to internally deregister
    /// based on a closed fd.
    shared_record: Arc<RegistrationRecord>,
}
161
impl SelectorState {
    /// Creates the state with only the internal notify waker registered
    /// (always occupying `poll_fds[0]`).
    pub fn new() -> io::Result<SelectorState> {
        let notify_waker = WakerInternal::new_unregistered()?;

        Ok(Self {
            fds: Mutex::new(Fds {
                // Slot 0 is reserved for the notify waker so `wake` and
                // `modify_fds` can interrupt a running `poll`.
                poll_fds: vec![PollFd(libc::pollfd {
                    fd: notify_waker.as_raw_fd(),
                    events: libc::POLLIN,
                    revents: 0,
                })],
                fd_data: HashMap::new(),
            }),
            pending_removal: Mutex::new(Vec::new()),
            pending_wake_token: Mutex::new(None),
            notify_waker,
            waiting_operations: AtomicUsize::new(0),
            operations_complete: Condvar::new(),
            #[cfg(debug_assertions)]
            id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
        })
    }

    /// Blocks in `poll(2)` until a user-visible event occurs or `timeout`
    /// expires, then fills `events`.
    ///
    /// Loops internally: a wakeup caused purely by `modify_fds` (an fd being
    /// added/changed/removed) produces no user events, so we poll again with
    /// the updated fd set.
    pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
        events.clear();

        let mut fds = self.fds.lock().unwrap();

        // Keep track of fds that receive POLLHUP or POLLERR (i.e. won't receive further
        // events) and internally deregister them before they are externally deregister'd. See
        // IoSourceState below to track how the external deregister call will be handled
        // when this state occurs.
        let mut closed_raw_fds = Vec::new();

        loop {
            // Complete all current operations.
            loop {
                if self.waiting_operations.load(Ordering::SeqCst) == 0 {
                    break;
                }

                // Releases the `fds` lock so `modify_fds` can make progress.
                fds = self.operations_complete.wait(fds).unwrap();
            }

            // Perform the poll.
            trace!("Polling on {:?}", &fds);
            let num_events = poll(&mut fds.poll_fds, timeout)?;
            trace!("Poll finished: {:?}", &fds);

            if num_events == 0 {
                // Timed out with nothing ready.
                return Ok(());
            }

            let waker_events = fds.poll_fds[0].0.revents;
            let notified = waker_events != 0;
            // Readiness of the notify waker itself is not reported to users.
            let mut num_fd_events = if notified { num_events - 1 } else { num_events };

            let pending_wake_token = self.pending_wake_token.lock().unwrap().take();

            if notified {
                self.notify_waker.ack_and_reset();
                // A pending wake token turns the internal notification into
                // one synthetic user-visible event.
                if pending_wake_token.is_some() {
                    num_fd_events += 1;
                }
            }

            // We now check whether this poll was performed with descriptors which were pending
            // for removal and filter out any matching.
            let mut pending_removal_guard = self.pending_removal.lock().unwrap();
            let mut pending_removal = std::mem::replace(pending_removal_guard.as_mut(), Vec::new());
            drop(pending_removal_guard);

            // Store the events if there were any.
            if num_fd_events > 0 {
                let fds = &mut *fds;

                events.reserve(num_fd_events);

                // Add synthetic events we picked up from calls to wake()
                if let Some(pending_wake_token) = pending_wake_token {
                    events.push(Event {
                        token: pending_wake_token,
                        events: waker_events,
                    });
                }

                for fd_data in fds.fd_data.values_mut() {
                    let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index];

                    if pending_removal.contains(&poll_fd.fd) {
                        // Fd was removed while poll was running
                        continue;
                    }

                    if poll_fd.revents != 0 {
                        // Store event
                        events.push(Event {
                            token: fd_data.token,
                            events: poll_fd.revents,
                        });

                        // An fd reporting POLLHUP or POLLERR won't deliver
                        // further events; queue it for internal removal.
                        if poll_fd.revents & (libc::POLLHUP | libc::POLLERR) != 0 {
                            pending_removal.push(poll_fd.fd);
                            closed_raw_fds.push(poll_fd.fd);
                        }

                        // Remove the interest which just got triggered; the
                        // IoSourceState's do_io wrapper used with this selector
                        // will add back the interest using reregister.
                        poll_fd.events &= !poll_fd.revents;

                        // Minor optimization to potentially avoid looping n times where n is the
                        // number of input fds (i.e. we might loop between m and n times where m is
                        // the number of fds with revents != 0).
                        if events.len() == num_fd_events {
                            break;
                        }
                    }
                }

                break; // No more polling.
            }

            // If we didn't break above it means we got woken up internally (for example for adding an fd), so we poll again.
        }

        drop(fds);
        let _ = self.deregister_all(&closed_raw_fds);

        Ok(())
    }

    /// Registers `fd`, discarding the shared registration record.
    pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
        self.register_internal(fd, token, interests).map(|_| ())
    }

    /// Registers `fd` and returns the [`RegistrationRecord`] shared with the
    /// fd's `IoSourceState`.
    ///
    /// Fails with `AlreadyExists` if `fd` is currently registered and (debug
    /// builds only) with `InvalidInput` if `fd` is the internal notify fd.
    pub fn register_internal(
        &self,
        fd: RawFd,
        token: Token,
        interests: Interest,
    ) -> io::Result<Arc<RegistrationRecord>> {
        #[cfg(debug_assertions)]
        if fd == self.notify_waker.as_raw_fd() {
            return Err(io::Error::from(io::ErrorKind::InvalidInput));
        }

        // We must handle the unlikely case that the following order of operations happens:
        //
        // register(1 as RawFd)
        // deregister(1 as RawFd)
        // register(1 as RawFd)
        // <poll happens>
        //
        // Fd's pending removal only get cleared when poll has been run. It is possible that
        // between registering and deregistering and then _again_ registering the file descriptor
        // poll never gets called, thus the fd stays stuck in the pending removal list.
        //
        // To avoid this scenario we remove an fd from pending removals when registering it.
        let mut pending_removal = self.pending_removal.lock().unwrap();
        if let Some(idx) = pending_removal.iter().position(|&pending| pending == fd) {
            pending_removal.swap_remove(idx);
        }
        drop(pending_removal);

        self.modify_fds(|fds| {
            if fds.fd_data.contains_key(&fd) {
                return Err(io::Error::new(
                    io::ErrorKind::AlreadyExists,
                    "I/O source already registered this `Registry` \
                    (an old file descriptor might have been closed without deregistration)",
                ));
            }

            let poll_fds_index = fds.poll_fds.len();
            let record = Arc::new(RegistrationRecord::new());
            fds.fd_data.insert(
                fd,
                FdData {
                    poll_fds_index,
                    token,
                    shared_record: record.clone(),
                },
            );

            fds.poll_fds.push(PollFd(libc::pollfd {
                fd,
                events: interests_to_poll(interests),
                revents: 0,
            }));

            Ok(record)
        })
    }

    /// Replaces the token and requested poll events of an already registered
    /// `fd`; returns `NotFound` if it is not registered.
    pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
        self.modify_fds(|fds| {
            let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?;
            data.token = token;
            let poll_fds_index = data.poll_fds_index;
            fds.poll_fds[poll_fds_index].0.events = interests_to_poll(interests);

            Ok(())
        })
    }

    /// Removes `fd` from the selector; `NotFound` if it was not registered.
    pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
        self.deregister_all(&[fd])
            .map_err(|_| io::ErrorKind::NotFound)?;
        Ok(())
    }

    /// Perform a modification on `fds`, interrupting the current caller of `wait` if it's running.
    fn modify_fds<T>(&self, f: impl FnOnce(&mut Fds) -> T) -> T {
        // Announce the pending operation *before* waking so that `select`
        // parks on `operations_complete` instead of immediately re-polling.
        self.waiting_operations.fetch_add(1, Ordering::SeqCst);

        // Wake up the current caller of `wait` if there is one.
        let sent_notification = self.notify_waker.wake().is_ok();

        let mut fds = self.fds.lock().unwrap();

        // If there was no caller of `wait` our notification was not removed from the pipe.
        if sent_notification {
            self.notify_waker.ack_and_reset();
        }

        let res = f(&mut *fds);

        // The last finished operation lets `select` resume polling.
        if self.waiting_operations.fetch_sub(1, Ordering::SeqCst) == 1 {
            self.operations_complete.notify_one();
        }

        res
    }

    /// Special optimized version of [Self::deregister] which handles multiple removals
    /// at once. Ok result if all removals were performed, Err if any entries
    /// were not found.
    fn deregister_all(&self, targets: &[RawFd]) -> Result<(), ()> {
        if targets.is_empty() {
            return Ok(());
        }

        // Record the removals first so a concurrently running `select` can
        // filter these fds out of its results.
        let mut pending_removal = self.pending_removal.lock().unwrap();
        pending_removal.extend(targets);
        drop(pending_removal);

        self.modify_fds(|fds| {
            let mut all_successful = true;

            for target in targets {
                match fds.fd_data.remove(target).ok_or(()) {
                    Ok(data) => {
                        data.shared_record.mark_unregistered();
                        // swap_remove moves the last pollfd into this slot, so
                        // fix up the moved entry's stored index (if any).
                        fds.poll_fds.swap_remove(data.poll_fds_index);
                        if let Some(swapped_pollfd) = fds.poll_fds.get(data.poll_fds_index) {
                            fds.fd_data
                                .get_mut(&swapped_pollfd.0.fd)
                                .unwrap()
                                .poll_fds_index = data.poll_fds_index;
                        }
                    }
                    Err(_) => all_successful = false,
                }
            }

            if all_successful {
                Ok(())
            } else {
                Err(())
            }
        })
    }

    /// Stores `token` for the next wakeup and signals the notify waker; the
    /// running (or next) `select` call emits a synthetic event for it.
    pub fn wake(&self, token: Token) -> io::Result<()> {
        self.pending_wake_token.lock().unwrap().replace(token);
        self.notify_waker.wake()
    }
}
441
/// Shared record between IoSourceState and SelectorState that allows us to internally
/// deregister partially or fully closed fds (i.e. when we get POLLHUP or POLLERR) without
/// confusing IoSourceState and trying to deregister twice. This isn't strictly
/// required as technically deregister is idempotent but it is confusing
/// when trying to debug behaviour as we get imbalanced calls to register/deregister and
/// superfluous NotFound errors.
#[derive(Debug)]
pub(crate) struct RegistrationRecord {
    /// Set once the fd has been removed from the selector; never cleared.
    is_unregistered: AtomicBool,
}

impl RegistrationRecord {
    /// Creates a record for a freshly registered fd.
    pub fn new() -> Self {
        RegistrationRecord {
            is_unregistered: AtomicBool::from(false),
        }
    }

    /// Flags the fd as having been removed from the selector.
    pub fn mark_unregistered(&self) {
        self.is_unregistered.store(true, Ordering::Relaxed);
    }

    /// Returns `true` while the fd is still registered with the selector.
    #[allow(dead_code)]
    pub fn is_registered(&self) -> bool {
        !self.is_unregistered.load(Ordering::Relaxed)
    }
}
469
// `POLLRDHUP` only exists on Linux; elsewhere we substitute 0 so OR-ing it
// into an event mask is a no-op.
#[cfg(target_os = "linux")]
const POLLRDHUP: libc::c_short = libc::POLLRDHUP;
#[cfg(not(target_os = "linux"))]
const POLLRDHUP: libc::c_short = 0;

/// Poll flags requested for readable interest.
const READ_EVENTS: libc::c_short = libc::POLLIN | POLLRDHUP;

/// Poll flags requested for writable interest.
const WRITE_EVENTS: libc::c_short = libc::POLLOUT;

/// Poll flags requested for priority interest.
const PRIORITY_EVENTS: libc::c_short = libc::POLLPRI;
480
481 /// Get the input poll events for the given event.
interests_to_poll(interest: Interest) -> libc::c_short482 fn interests_to_poll(interest: Interest) -> libc::c_short {
483 let mut kind = 0;
484
485 if interest.is_readable() {
486 kind |= READ_EVENTS;
487 }
488
489 if interest.is_writable() {
490 kind |= WRITE_EVENTS;
491 }
492
493 if interest.is_priority() {
494 kind |= PRIORITY_EVENTS;
495 }
496
497 kind
498 }
499
500 /// Helper function to call poll.
poll(fds: &mut [PollFd], timeout: Option<Duration>) -> io::Result<usize>501 fn poll(fds: &mut [PollFd], timeout: Option<Duration>) -> io::Result<usize> {
502 loop {
503 // A bug in kernels < 2.6.37 makes timeouts larger than LONG_MAX / CONFIG_HZ
504 // (approx. 30 minutes with CONFIG_HZ=1200) effectively infinite on 32 bits
505 // architectures. The magic number is the same constant used by libuv.
506 #[cfg(target_pointer_width = "32")]
507 const MAX_SAFE_TIMEOUT: u128 = 1789569;
508 #[cfg(not(target_pointer_width = "32"))]
509 const MAX_SAFE_TIMEOUT: u128 = libc::c_int::MAX as u128;
510
511 let timeout = timeout
512 .map(|to| {
513 // `Duration::as_millis` truncates, so round up. This avoids
514 // turning sub-millisecond timeouts into a zero timeout, unless
515 // the caller explicitly requests that by specifying a zero
516 // timeout.
517 let to_ms = to
518 .checked_add(Duration::from_nanos(999_999))
519 .unwrap_or(to)
520 .as_millis();
521 cmp::min(MAX_SAFE_TIMEOUT, to_ms) as libc::c_int
522 })
523 .unwrap_or(-1);
524
525 let res = syscall!(poll(
526 fds.as_mut_ptr() as *mut libc::pollfd,
527 fds.len() as libc::nfds_t,
528 timeout,
529 ));
530
531 match res {
532 Ok(num_events) => break Ok(num_events as usize),
533 // poll returns EAGAIN if we can retry it.
534 Err(e) if e.raw_os_error() == Some(libc::EAGAIN) => continue,
535 Err(e) => return Err(e),
536 }
537 }
538 }
539
/// A single readiness event: the registered token plus the raw `revents`
/// bits reported by `poll(2)` (or, for `wake`, the notify fd's `revents`).
#[derive(Debug, Clone)]
pub struct Event {
    token: Token,
    events: libc::c_short,
}

/// Event list handed to `Selector::select`.
pub type Events = Vec<Event>;
547
548 pub mod event {
549 use std::fmt;
550
551 use crate::sys::Event;
552 use crate::Token;
553
554 use super::POLLRDHUP;
555
token(event: &Event) -> Token556 pub fn token(event: &Event) -> Token {
557 event.token
558 }
559
is_readable(event: &Event) -> bool560 pub fn is_readable(event: &Event) -> bool {
561 (event.events & libc::POLLIN) != 0 || (event.events & libc::POLLPRI) != 0
562 }
563
is_writable(event: &Event) -> bool564 pub fn is_writable(event: &Event) -> bool {
565 (event.events & libc::POLLOUT) != 0
566 }
567
is_error(event: &Event) -> bool568 pub fn is_error(event: &Event) -> bool {
569 (event.events & libc::POLLERR) != 0
570 }
571
is_read_closed(event: &Event) -> bool572 pub fn is_read_closed(event: &Event) -> bool {
573 // Both halves of the socket have closed
574 (event.events & libc::POLLHUP) != 0
575 // Socket has received FIN or called shutdown(SHUT_RD)
576 || (event.events & POLLRDHUP) != 0
577 }
578
is_write_closed(event: &Event) -> bool579 pub fn is_write_closed(event: &Event) -> bool {
580 // Both halves of the socket have closed
581 (event.events & libc::POLLHUP) != 0
582 // Unix pipe write end has closed
583 || ((event.events & libc::POLLOUT) != 0 && (event.events & libc::POLLERR) != 0)
584 // The other side (read end) of a Unix pipe has closed.
585 || (event.events == libc::POLLERR)
586 }
587
is_priority(event: &Event) -> bool588 pub fn is_priority(event: &Event) -> bool {
589 (event.events & libc::POLLPRI) != 0
590 }
591
is_aio(_: &Event) -> bool592 pub fn is_aio(_: &Event) -> bool {
593 // Not supported in the kernel, only in libc.
594 false
595 }
596
is_lio(_: &Event) -> bool597 pub fn is_lio(_: &Event) -> bool {
598 // Not supported.
599 false
600 }
601
debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result602 pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
603 #[allow(clippy::trivially_copy_pass_by_ref)]
604 fn check_events(got: &libc::c_short, want: &libc::c_short) -> bool {
605 (*got & want) != 0
606 }
607 debug_detail!(
608 EventsDetails(libc::c_short),
609 check_events,
610 libc::POLLIN,
611 libc::POLLPRI,
612 libc::POLLOUT,
613 libc::POLLRDNORM,
614 libc::POLLRDBAND,
615 libc::POLLWRNORM,
616 libc::POLLWRBAND,
617 libc::POLLERR,
618 libc::POLLHUP,
619 );
620
621 f.debug_struct("poll_event")
622 .field("token", &event.token)
623 .field("events", &EventsDetails(event.events))
624 .finish()
625 }
626 }
627
/// Waker backed by the selector itself.
///
/// Waking stores the token in the selector's `pending_wake_token` slot and
/// signals its internal notify waker, so the wakeup surfaces from `select`
/// as a synthetic event carrying `token`.
#[derive(Debug)]
pub(crate) struct Waker {
    selector: Selector,
    token: Token,
}

impl Waker {
    /// Creates a waker tied to `selector` that emits `token` when woken.
    pub(crate) fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
        Ok(Waker {
            selector: selector.try_clone()?,
            token,
        })
    }

    /// Wakes the selector's current (or next) `select` call.
    pub(crate) fn wake(&self) -> io::Result<()> {
        self.selector.wake(self.token)
    }
}
646
cfg_io_source! {
    use crate::Registry;

    /// State stored by an `IoSourceState` once its fd has been registered.
    struct InternalState {
        selector: Selector,
        token: Token,
        interests: Interest,
        fd: RawFd,
        /// Shared with the selector's `FdData`; tells us whether the fd was
        /// already internally deregistered (e.g. after POLLHUP/POLLERR).
        shared_record: Arc<RegistrationRecord>,
    }

    impl Drop for InternalState {
        fn drop(&mut self) {
            // Only deregister if the selector still holds this fd; if it was
            // already removed internally a second deregister would surface a
            // spurious NotFound.
            if self.shared_record.is_registered() {
                let _ = self.selector.deregister(self.fd);
            }
        }
    }

    /// Per-I/O-source bookkeeping for the poll backend.
    pub(crate) struct IoSourceState {
        // `None` until `register` is called.
        inner: Option<Box<InternalState>>,
    }

    impl IoSourceState {
        /// Creates an unregistered source state.
        pub fn new() -> IoSourceState {
            IoSourceState { inner: None }
        }

        /// Runs the I/O closure `f` on `io`.
        ///
        /// `SelectorState::select` strips a fired interest from the pollfd, so
        /// when the operation returns `WouldBlock` (readiness was consumed) we
        /// re-arm the registered interests via `reregister` before returning.
        pub fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R>
        where
            F: FnOnce(&T) -> io::Result<R>,
        {
            let result = f(io);

            if let Err(err) = &result {
                if err.kind() == io::ErrorKind::WouldBlock {
                    self.inner.as_ref().map_or(Ok(()), |state| {
                        state
                            .selector
                            .reregister(state.fd, state.token, state.interests)
                    })?;
                }
            }

            result
        }

        /// Registers `fd` with the registry's selector and stores the state
        /// needed for later re-arming; `AlreadyExists` if already registered.
        pub fn register(
            &mut self,
            registry: &Registry,
            token: Token,
            interests: Interest,
            fd: RawFd,
        ) -> io::Result<()> {
            if self.inner.is_some() {
                Err(io::ErrorKind::AlreadyExists.into())
            } else {
                let selector = registry.selector().try_clone()?;

                selector.register_internal(fd, token, interests).map(move |shared_record| {
                    let state = InternalState {
                        selector,
                        token,
                        interests,
                        fd,
                        shared_record,
                    };

                    self.inner = Some(Box::new(state));
                })
            }
        }

        /// Updates token/interests on the selector and mirrors them locally so
        /// `do_io` re-arms with the new values; `NotFound` if never registered.
        pub fn reregister(
            &mut self,
            registry: &Registry,
            token: Token,
            interests: Interest,
            fd: RawFd,
        ) -> io::Result<()> {
            match self.inner.as_mut() {
                Some(state) => registry
                    .selector()
                    .reregister(fd, token, interests)
                    .map(|()| {
                        state.token = token;
                        state.interests = interests;
                    }),
                None => Err(io::ErrorKind::NotFound.into()),
            }
        }

        /// Drops the stored state and removes `fd` from the selector.
        pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> {
            if let Some(state) = self.inner.take() {
                // Marking unregistered will short circuit the drop behaviour of calling
                // deregister so the call to deregister below is strictly required.
                state.shared_record.mark_unregistered();
            }

            registry.selector().deregister(fd)
        }
    }
}
750