1 use super::afd::{self, Afd, AfdPollInfo};
2 use super::io_status_block::IoStatusBlock;
3 use super::Event;
4 use crate::sys::Events;
5 
6 cfg_net! {
7     use crate::sys::event::{
8         ERROR_FLAGS, READABLE_FLAGS, READ_CLOSED_FLAGS, WRITABLE_FLAGS, WRITE_CLOSED_FLAGS,
9     };
10     use crate::Interest;
11 }
12 
13 use super::iocp::{CompletionPort, CompletionStatus};
14 use std::collections::VecDeque;
15 use std::ffi::c_void;
16 use std::io;
17 use std::marker::PhantomPinned;
18 use std::os::windows::io::RawSocket;
19 use std::pin::Pin;
20 #[cfg(debug_assertions)]
21 use std::sync::atomic::AtomicUsize;
22 use std::sync::atomic::{AtomicBool, Ordering};
23 use std::sync::{Arc, Mutex};
24 use std::time::Duration;
25 
26 use windows_sys::Win32::Foundation::{
27     ERROR_INVALID_HANDLE, ERROR_IO_PENDING, HANDLE, STATUS_CANCELLED, WAIT_TIMEOUT,
28 };
29 use windows_sys::Win32::System::IO::OVERLAPPED;
30 
/// A pool of `Afd` handles that are all created against the same completion
/// port.
///
/// Handles are handed out as `Arc<Afd>`; the strong count of each entry is
/// what this file uses to decide whether a handle is full or unused.
#[derive(Debug)]
struct AfdGroup {
    // Completion port passed to `Afd::new` whenever a new handle is allocated.
    #[cfg_attr(not(feature = "net"), allow(dead_code))]
    cp: Arc<CompletionPort>,
    // Handles currently in the pool. The `Vec` itself holds one strong
    // reference to every entry.
    afd_group: Mutex<Vec<Arc<Afd>>>,
}
37 
38 impl AfdGroup {
new(cp: Arc<CompletionPort>) -> AfdGroup39     pub fn new(cp: Arc<CompletionPort>) -> AfdGroup {
40         AfdGroup {
41             afd_group: Mutex::new(Vec::new()),
42             cp,
43         }
44     }
45 
release_unused_afd(&self)46     pub fn release_unused_afd(&self) {
47         let mut afd_group = self.afd_group.lock().unwrap();
48         afd_group.retain(|g| Arc::strong_count(g) > 1);
49     }
50 }
51 
52 cfg_io_source! {
53     const POLL_GROUP_MAX_GROUP_SIZE: usize = 32;
54 
55     impl AfdGroup {
56         pub fn acquire(&self) -> io::Result<Arc<Afd>> {
57             let mut afd_group = self.afd_group.lock().unwrap();
58             if afd_group.len() == 0 {
59                 self._alloc_afd_group(&mut afd_group)?;
60             } else {
61                 // + 1 reference in Vec
62                 if Arc::strong_count(afd_group.last().unwrap()) > POLL_GROUP_MAX_GROUP_SIZE  {
63                     self._alloc_afd_group(&mut afd_group)?;
64                 }
65             }
66 
67             match afd_group.last() {
68                 Some(arc) => Ok(arc.clone()),
69                 None => unreachable!(
70                     "Cannot acquire afd, {:#?}, afd_group: {:#?}",
71                     self, afd_group
72                 ),
73             }
74         }
75 
76         fn _alloc_afd_group(&self, afd_group: &mut Vec<Arc<Afd>>) -> io::Result<()> {
77             let afd = Afd::new(&self.cp)?;
78             let arc = Arc::new(afd);
79             afd_group.push(arc);
80             Ok(())
81         }
82     }
83 }
84 
/// State of the AFD poll operation that belongs to a `SockState`.
#[derive(Debug)]
enum SockPollStatus {
    // No poll operation is outstanding.
    Idle,
    // A poll operation has been submitted and has not completed yet.
    Pending,
    // The pending poll operation has been cancelled, but its completion has
    // not been processed yet.
    Cancelled,
}
91 
/// Per-socket bookkeeping for the AFD poll operation.
///
/// The value is kept behind `Pin<Arc<Mutex<..>>>`; a clone of that `Arc` is
/// handed to the poll call as the overlapped pointer (see `into_overlapped`).
#[derive(Debug)]
pub struct SockState {
    // Status block passed to `Afd::poll` and `Afd::cancel`; its status is
    // inspected in `feed_event`.
    iosb: IoStatusBlock,
    // Poll request buffer; filled in before `Afd::poll` and read back in
    // `feed_event`.
    poll_info: AfdPollInfo,
    // AFD handle used to submit and cancel poll operations for this socket.
    afd: Arc<Afd>,

    // Socket handle obtained from `get_base_socket`; this is what gets polled.
    base_socket: RawSocket,

    // Events the user currently wants; delivered events are cleared from it.
    user_evts: u32,
    // Events the pending poll operation was submitted with (0 when none).
    pending_evts: u32,

    // Opaque value copied into `Event::data` for every produced event.
    user_data: u64,

    poll_status: SockPollStatus,
    // Set once the socket is marked for deletion; no further polls are issued.
    delete_pending: bool,

    // last raw os error
    error: Option<i32>,

    _pinned: PhantomPinned,
}
113 
114 impl SockState {
    /// Brings the kernel-side poll operation in line with `user_evts`.
    ///
    /// Depending on `poll_status` this does nothing, cancels the pending
    /// poll, or submits a new poll through `self.afd`.
    ///
    /// `self_arc` is expected to be the `Arc` that owns `self`; a clone of it
    /// is converted with `into_overlapped` and passed to the poll call.
    ///
    /// # Errors
    ///
    /// Returns the error from `cancel` or from the poll call. In both cases
    /// the raw OS error is also stored in `self.error`. `ERROR_IO_PENDING`
    /// is treated as success and `ERROR_INVALID_HANDLE` marks the socket for
    /// deletion and returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if the socket is already marked for deletion, or if the poll
    /// call fails with an error that carries no raw OS error code.
    fn update(&mut self, self_arc: &Pin<Arc<Mutex<SockState>>>) -> io::Result<()> {
        assert!(!self.delete_pending);

        // make sure to reset previous error before a new update
        self.error = None;

        if let SockPollStatus::Pending = self.poll_status {
            if (self.user_evts & afd::KNOWN_EVENTS & !self.pending_evts) == 0 {
                /* All the events the user is interested in are already being monitored by
                 * the pending poll operation. It might spuriously complete because of an
                 * event that we're no longer interested in; when that happens we'll submit
                 * a new poll operation with the updated event mask. */
            } else {
                /* A poll operation is already pending, but it's not monitoring for all the
                 * events that the user is interested in. Therefore, cancel the pending
                 * poll operation; when we receive its completion package, a new poll
                 * operation will be submitted with the correct event mask. */
                if let Err(e) = self.cancel() {
                    self.error = e.raw_os_error();
                    return Err(e);
                }
                return Ok(());
            }
        } else if let SockPollStatus::Cancelled = self.poll_status {
            /* The poll operation has already been cancelled, we're still waiting for
             * it to return. For now, there's nothing that needs to be done. */
        } else if let SockPollStatus::Idle = self.poll_status {
            /* No poll operation is pending; start one. */
            self.poll_info.exclusive = 0;
            self.poll_info.number_of_handles = 1;
            self.poll_info.timeout = i64::MAX;
            self.poll_info.handles[0].handle = self.base_socket as HANDLE;
            self.poll_info.handles[0].status = 0;
            // POLL_LOCAL_CLOSE is always requested; `feed_event` uses it to
            // detect that the socket was closed.
            self.poll_info.handles[0].events = self.user_evts | afd::POLL_LOCAL_CLOSE;

            // Increase the ref count as the memory will be used by the kernel.
            let overlapped_ptr = into_overlapped(self_arc.clone());

            let result = unsafe {
                self.afd
                    .poll(&mut self.poll_info, &mut *self.iosb, overlapped_ptr)
            };
            if let Err(e) = result {
                let code = e.raw_os_error().unwrap();
                if code == ERROR_IO_PENDING as i32 {
                    /* Overlapped poll operation in progress; this is expected. */
                } else {
                    // Since the operation failed it means the kernel won't be
                    // using the memory any more.
                    drop(from_overlapped(overlapped_ptr as *mut _));
                    if code == ERROR_INVALID_HANDLE as i32 {
                        /* Socket closed; it'll be dropped. */
                        self.mark_delete();
                        return Ok(());
                    } else {
                        self.error = e.raw_os_error();
                        return Err(e);
                    }
                }
            }

            self.poll_status = SockPollStatus::Pending;
            self.pending_evts = self.user_evts;
        } else {
            unreachable!("Invalid poll status during update, {:#?}", self)
        }

        Ok(())
    }
184 
cancel(&mut self) -> io::Result<()>185     fn cancel(&mut self) -> io::Result<()> {
186         match self.poll_status {
187             SockPollStatus::Pending => {}
188             _ => unreachable!("Invalid poll status during cancel, {:#?}", self),
189         };
190         unsafe {
191             self.afd.cancel(&mut *self.iosb)?;
192         }
193         self.poll_status = SockPollStatus::Cancelled;
194         self.pending_evts = 0;
195         Ok(())
196     }
197 
    /// Processes the completion of this socket's poll operation and returns
    /// the event to report to the user, if any.
    ///
    /// This is called with the `Arc<Mutex<SockState>>` recovered from the
    /// overlapped pointer. Watch out for reference counting.
    ///
    /// The poll status is reset to `Idle` unconditionally. Events that are
    /// reported are removed from `user_evts`, so they are not reported again
    /// until the interest is set again.
    fn feed_event(&mut self) -> Option<Event> {
        self.poll_status = SockPollStatus::Idle;
        self.pending_evts = 0;

        let mut afd_events = 0;
        // We use the status info in IO_STATUS_BLOCK to determine the socket poll status. It is unsafe to use a pointer of IO_STATUS_BLOCK.
        unsafe {
            if self.delete_pending {
                return None;
            } else if self.iosb.Anonymous.Status == STATUS_CANCELLED {
                /* The poll request was cancelled by CancelIoEx. */
            } else if self.iosb.Anonymous.Status < 0 {
                /* The overlapped request itself failed in an unexpected way. */
                afd_events = afd::POLL_CONNECT_FAIL;
            } else if self.poll_info.number_of_handles < 1 {
                /* This poll operation succeeded but didn't report any socket events. */
            } else if self.poll_info.handles[0].events & afd::POLL_LOCAL_CLOSE != 0 {
                /* The poll operation reported that the socket was closed. */
                self.mark_delete();
                return None;
            } else {
                afd_events = self.poll_info.handles[0].events;
            }
        }

        // Only report events the user asked for.
        afd_events &= self.user_evts;

        if afd_events == 0 {
            return None;
        }

        // In mio, we have to simulate Edge-triggered behavior to match API usage.
        // The strategy here is to intercept all read/write from user that could cause WouldBlock usage,
        // then reregister the socket to reset the interests.
        self.user_evts &= !afd_events;

        Some(Event {
            data: self.user_data,
            flags: afd_events,
        })
    }
240 
    /// Returns `true` once the socket has been marked for deletion.
    pub fn is_pending_deletion(&self) -> bool {
        self.delete_pending
    }
244 
mark_delete(&mut self)245     pub fn mark_delete(&mut self) {
246         if !self.delete_pending {
247             if let SockPollStatus::Pending = self.poll_status {
248                 drop(self.cancel());
249             }
250 
251             self.delete_pending = true;
252         }
253     }
254 
    /// Returns `true` if the last `update` stored a raw OS error.
    fn has_error(&self) -> bool {
        self.error.is_some()
    }
258 }
259 
260 cfg_io_source! {
261     impl SockState {
262         fn new(raw_socket: RawSocket, afd: Arc<Afd>) -> io::Result<SockState> {
263             Ok(SockState {
264                 iosb: IoStatusBlock::zeroed(),
265                 poll_info: AfdPollInfo::zeroed(),
266                 afd,
267                 base_socket: get_base_socket(raw_socket)?,
268                 user_evts: 0,
269                 pending_evts: 0,
270                 user_data: 0,
271                 poll_status: SockPollStatus::Idle,
272                 delete_pending: false,
273                 error: None,
274                 _pinned: PhantomPinned,
275             })
276         }
277 
278         /// True if need to be added on update queue, false otherwise.
279         fn set_event(&mut self, ev: Event) -> bool {
280             /* afd::POLL_CONNECT_FAIL and afd::POLL_ABORT are always reported, even when not requested by the caller. */
281             let events = ev.flags | afd::POLL_CONNECT_FAIL | afd::POLL_ABORT;
282 
283             self.user_evts = events;
284             self.user_data = ev.data;
285 
286             (events & !self.pending_evts) != 0
287         }
288     }
289 }
290 
/// Dropping a `SockState` marks it for deletion, which cancels a poll
/// operation that is still pending.
impl Drop for SockState {
    fn drop(&mut self) {
        self.mark_delete();
    }
}
296 
297 /// Converts the pointer to a `SockState` into a raw pointer.
298 /// To revert see `from_overlapped`.
into_overlapped(sock_state: Pin<Arc<Mutex<SockState>>>) -> *mut c_void299 fn into_overlapped(sock_state: Pin<Arc<Mutex<SockState>>>) -> *mut c_void {
300     let overlapped_ptr: *const Mutex<SockState> =
301         unsafe { Arc::into_raw(Pin::into_inner_unchecked(sock_state)) };
302     overlapped_ptr as *mut _
303 }
304 
305 /// Convert a raw overlapped pointer into a reference to `SockState`.
306 /// Reverts `into_overlapped`.
from_overlapped(ptr: *mut OVERLAPPED) -> Pin<Arc<Mutex<SockState>>>307 fn from_overlapped(ptr: *mut OVERLAPPED) -> Pin<Arc<Mutex<SockState>>> {
308     let sock_ptr: *const Mutex<SockState> = ptr as *const _;
309     unsafe { Pin::new_unchecked(Arc::from_raw(sock_ptr)) }
310 }
311 
/// Each Selector has a globally unique(ish) ID associated with it. This ID
/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
/// registered with the `Selector`. If a type that is previously associated with
/// a `Selector` attempts to register itself with a different `Selector`, the
/// operation will return with an error. This matches Windows behavior.
///
/// Only present in builds with debug assertions. `Selector::new` uses the
/// incremented value, so the first selector gets ID 1.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
319 
/// Windows implementation of `sys::Selector`.
///
/// Edge-triggered event notification is simulated by clearing the delivered
/// events from the internal event flags of each socket state (`SockState`),
/// and by setting them again when a request that could cause
/// `io::ErrorKind::WouldBlock` is intercepted.
///
/// This selector currently only supports sockets, because the `Afd` driver is
/// winsock2 specific.
#[derive(Debug)]
pub struct Selector {
    // Identifier taken from `NEXT_ID`; clones share the same value.
    #[cfg(debug_assertions)]
    id: usize,
    // Shared state; `try_clone` hands out another reference to the same value.
    pub(super) inner: Arc<SelectorInner>,
}
332 
333 impl Selector {
new() -> io::Result<Selector>334     pub fn new() -> io::Result<Selector> {
335         SelectorInner::new().map(|inner| {
336             #[cfg(debug_assertions)]
337             let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
338             Selector {
339                 #[cfg(debug_assertions)]
340                 id,
341                 inner: Arc::new(inner),
342             }
343         })
344     }
345 
try_clone(&self) -> io::Result<Selector>346     pub fn try_clone(&self) -> io::Result<Selector> {
347         Ok(Selector {
348             #[cfg(debug_assertions)]
349             id: self.id,
350             inner: Arc::clone(&self.inner),
351         })
352     }
353 
354     /// # Safety
355     ///
356     /// This requires a mutable reference to self because only a single thread
357     /// can poll IOCP at a time.
select(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>358     pub fn select(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
359         self.inner.select(events, timeout)
360     }
361 
clone_port(&self) -> Arc<CompletionPort>362     pub(super) fn clone_port(&self) -> Arc<CompletionPort> {
363         self.inner.cp.clone()
364     }
365 
366     #[cfg(feature = "os-ext")]
same_port(&self, other: &Arc<CompletionPort>) -> bool367     pub(super) fn same_port(&self, other: &Arc<CompletionPort>) -> bool {
368         Arc::ptr_eq(&self.inner.cp, other)
369     }
370 }
371 
cfg_io_source! {
    use super::InternalState;
    use crate::Token;

    impl Selector {
        /// Registers `socket` with this selector; delegates to
        /// `SelectorInner::register`.
        pub(super) fn register(
            &self,
            socket: RawSocket,
            token: Token,
            interests: Interest,
        ) -> io::Result<InternalState> {
            SelectorInner::register(&self.inner, socket, token, interests)
        }

        /// Replaces the token and interests of an already registered socket;
        /// delegates to `SelectorInner::reregister`.
        pub(super) fn reregister(
            &self,
            state: Pin<Arc<Mutex<SockState>>>,
            token: Token,
            interests: Interest,
        ) -> io::Result<()> {
            self.inner.reregister(state, token, interests)
        }

        /// Returns the selector's ID (debug builds only).
        #[cfg(debug_assertions)]
        pub fn id(&self) -> usize {
            self.id
        }
    }
}
401 
/// State shared by a `Selector` and all of its clones.
#[derive(Debug)]
pub struct SelectorInner {
    // Completion port that `select2` waits on and that `Drop` drains.
    pub(super) cp: Arc<CompletionPort>,
    // Socket states whose poll operation still has to be (re)submitted.
    update_queue: Mutex<VecDeque<Pin<Arc<Mutex<SockState>>>>>,
    // Pool of AFD handles used by the registered sockets.
    afd_group: AfdGroup,
    // Set by `select2` while it updates the sockets and waits on the port.
    is_polling: AtomicBool,
}
409 
// We have ensured thread safety by introducing locks manually: the update
// queue and the AFD group are behind mutexes and `is_polling` is atomic.
// NOTE(review): whether `CompletionPort` may be shared between threads is not
// visible in this file — confirm against its definition.
unsafe impl Sync for SelectorInner {}
412 
413 impl SelectorInner {
new() -> io::Result<SelectorInner>414     pub fn new() -> io::Result<SelectorInner> {
415         CompletionPort::new(0).map(|cp| {
416             let cp = Arc::new(cp);
417             let cp_afd = Arc::clone(&cp);
418 
419             SelectorInner {
420                 cp,
421                 update_queue: Mutex::new(VecDeque::new()),
422                 afd_group: AfdGroup::new(cp_afd),
423                 is_polling: AtomicBool::new(false),
424             }
425         })
426     }
427 
428     /// # Safety
429     ///
430     /// May only be calling via `Selector::select`.
select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>431     pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
432         events.clear();
433 
434         if timeout.is_none() {
435             loop {
436                 let len = self.select2(&mut events.statuses, &mut events.events, None)?;
437                 if len == 0 {
438                     continue;
439                 }
440                 break Ok(());
441             }
442         } else {
443             self.select2(&mut events.statuses, &mut events.events, timeout)?;
444             Ok(())
445         }
446     }
447 
select2( &self, statuses: &mut [CompletionStatus], events: &mut Vec<Event>, timeout: Option<Duration>, ) -> io::Result<usize>448     pub fn select2(
449         &self,
450         statuses: &mut [CompletionStatus],
451         events: &mut Vec<Event>,
452         timeout: Option<Duration>,
453     ) -> io::Result<usize> {
454         assert!(!self.is_polling.swap(true, Ordering::AcqRel));
455 
456         unsafe { self.update_sockets_events() }?;
457 
458         let result = self.cp.get_many(statuses, timeout);
459 
460         self.is_polling.store(false, Ordering::Relaxed);
461 
462         match result {
463             Ok(iocp_events) => Ok(unsafe { self.feed_events(events, iocp_events) }),
464             Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => Ok(0),
465             Err(e) => Err(e),
466         }
467     }
468 
update_sockets_events(&self) -> io::Result<()>469     unsafe fn update_sockets_events(&self) -> io::Result<()> {
470         let mut update_queue = self.update_queue.lock().unwrap();
471         for sock in update_queue.iter_mut() {
472             let mut sock_internal = sock.lock().unwrap();
473             if !sock_internal.is_pending_deletion() {
474                 sock_internal.update(sock)?;
475             }
476         }
477 
478         // remove all sock which do not have error, they have afd op pending
479         update_queue.retain(|sock| sock.lock().unwrap().has_error());
480 
481         self.afd_group.release_unused_afd();
482         Ok(())
483     }
484 
    /// Converts the completion statuses in `iocp_events` into events appended
    /// to `events`.
    ///
    /// It returns the number of events that were added rather than the events
    /// themselves. Three kinds of status are distinguished:
    ///
    /// * a null overlapped pointer: the event is built directly from the
    ///   completion status;
    /// * an odd token: the overlapped pointer is treated as a
    ///   `super::Overlapped` and its callback produces the events;
    /// * anything else: the overlapped pointer is a `SockState` reference
    ///   created by `into_overlapped`. The socket is queued for another
    ///   update unless it is pending deletion.
    ///
    /// # Safety
    ///
    /// Every non-null overlapped pointer is dereferenced or converted back
    /// into an `Arc`, so each status must come from the completion port and
    /// must be fed exactly once.
    unsafe fn feed_events(
        &self,
        events: &mut Vec<Event>,
        iocp_events: &[CompletionStatus],
    ) -> usize {
        let mut n = 0;
        let mut update_queue = self.update_queue.lock().unwrap();
        for iocp_event in iocp_events.iter() {
            if iocp_event.overlapped().is_null() {
                events.push(Event::from_completion_status(iocp_event));
                n += 1;
                continue;
            } else if iocp_event.token() % 2 == 1 {
                // Handle is a named pipe. This could be extended to be any non-AFD event.
                let callback = (*(iocp_event.overlapped() as *mut super::Overlapped)).callback;

                // The callback may push any number of events; count them.
                let len = events.len();
                callback(iocp_event.entry(), Some(events));
                n += events.len() - len;
                continue;
            }

            // Takes back the reference that was leaked when the poll was
            // submitted.
            let sock_state = from_overlapped(iocp_event.overlapped());
            let mut sock_guard = sock_state.lock().unwrap();
            if let Some(e) = sock_guard.feed_event() {
                events.push(e);
                n += 1;
            }

            if !sock_guard.is_pending_deletion() {
                update_queue.push_back(sock_state.clone());
            }
        }
        self.afd_group.release_unused_afd();
        n
    }
522 }
523 
524 cfg_io_source! {
525     use std::mem::size_of;
526     use std::ptr::null_mut;
527 
528     use windows_sys::Win32::Networking::WinSock::{
529         WSAGetLastError, WSAIoctl, SIO_BASE_HANDLE, SIO_BSP_HANDLE,
530         SIO_BSP_HANDLE_POLL, SIO_BSP_HANDLE_SELECT, SOCKET_ERROR,
531     };
532 
533 
534     impl SelectorInner {
535         fn register(
536             this: &Arc<Self>,
537             socket: RawSocket,
538             token: Token,
539             interests: Interest,
540         ) -> io::Result<InternalState> {
541             let flags = interests_to_afd_flags(interests);
542 
543             let sock = {
544                 let sock = this._alloc_sock_for_rawsocket(socket)?;
545                 let event = Event {
546                     flags,
547                     data: token.0 as u64,
548                 };
549                 sock.lock().unwrap().set_event(event);
550                 sock
551             };
552 
553             let state = InternalState {
554                 selector: this.clone(),
555                 token,
556                 interests,
557                 sock_state: sock.clone(),
558             };
559 
560             this.queue_state(sock);
561             unsafe { this.update_sockets_events_if_polling()? };
562 
563             Ok(state)
564         }
565 
566         // Directly accessed in `IoSourceState::do_io`.
567         pub(super) fn reregister(
568             &self,
569             state: Pin<Arc<Mutex<SockState>>>,
570             token: Token,
571             interests: Interest,
572         ) -> io::Result<()> {
573             {
574                 let event = Event {
575                     flags: interests_to_afd_flags(interests),
576                     data: token.0 as u64,
577                 };
578 
579                 state.lock().unwrap().set_event(event);
580             }
581 
582             // FIXME: a sock which has_error true should not be re-added to
583             // the update queue because it's already there.
584             self.queue_state(state);
585             unsafe { self.update_sockets_events_if_polling() }
586         }
587 
588         /// This function is called by register() and reregister() to start an
589         /// IOCTL_AFD_POLL operation corresponding to the registered events, but
590         /// only if necessary.
591         ///
592         /// Since it is not possible to modify or synchronously cancel an AFD_POLL
593         /// operation, and there can be only one active AFD_POLL operation per
594         /// (socket, completion port) pair at any time, it is expensive to change
595         /// a socket's event registration after it has been submitted to the kernel.
596         ///
597         /// Therefore, if no other threads are polling when interest in a socket
598         /// event is (re)registered, the socket is added to the 'update queue', but
599         /// the actual syscall to start the IOCTL_AFD_POLL operation is deferred
600         /// until just before the GetQueuedCompletionStatusEx() syscall is made.
601         ///
602         /// However, when another thread is already blocked on
603         /// GetQueuedCompletionStatusEx() we tell the kernel about the registered
604         /// socket event(s) immediately.
605         unsafe fn update_sockets_events_if_polling(&self) -> io::Result<()> {
606             if self.is_polling.load(Ordering::Acquire) {
607                 self.update_sockets_events()
608             } else {
609                 Ok(())
610             }
611         }
612 
613         fn queue_state(&self, sock_state: Pin<Arc<Mutex<SockState>>>) {
614             let mut update_queue = self.update_queue.lock().unwrap();
615             update_queue.push_back(sock_state);
616         }
617 
618         fn _alloc_sock_for_rawsocket(
619             &self,
620             raw_socket: RawSocket,
621         ) -> io::Result<Pin<Arc<Mutex<SockState>>>> {
622             let afd = self.afd_group.acquire()?;
623             Ok(Arc::pin(Mutex::new(SockState::new(raw_socket, afd)?)))
624         }
625     }
626 
627     fn try_get_base_socket(raw_socket: RawSocket, ioctl: u32) -> Result<RawSocket, i32> {
628         let mut base_socket: RawSocket = 0;
629         let mut bytes: u32 = 0;
630         unsafe {
631             if WSAIoctl(
632                 raw_socket as usize,
633                 ioctl,
634                 null_mut(),
635                 0,
636                 &mut base_socket as *mut _ as *mut c_void,
637                 size_of::<RawSocket>() as u32,
638                 &mut bytes,
639                 null_mut(),
640                 None,
641             ) != SOCKET_ERROR
642             {
643                 Ok(base_socket)
644             } else {
645                 Err(WSAGetLastError())
646             }
647         }
648     }
649 
650     fn get_base_socket(raw_socket: RawSocket) -> io::Result<RawSocket> {
651         let res = try_get_base_socket(raw_socket, SIO_BASE_HANDLE);
652         if let Ok(base_socket) = res {
653             return Ok(base_socket);
654         }
655 
656         // The `SIO_BASE_HANDLE` should not be intercepted by LSPs, therefore
657         // it should not fail as long as `raw_socket` is a valid socket. See
658         // https://docs.microsoft.com/en-us/windows/win32/winsock/winsock-ioctls.
659         // However, at least one known LSP deliberately breaks it, so we try
660         // some alternative IOCTLs, starting with the most appropriate one.
661         for &ioctl in &[
662             SIO_BSP_HANDLE_SELECT,
663             SIO_BSP_HANDLE_POLL,
664             SIO_BSP_HANDLE,
665         ] {
666             if let Ok(base_socket) = try_get_base_socket(raw_socket, ioctl) {
667                 // Since we know now that we're dealing with an LSP (otherwise
668                 // SIO_BASE_HANDLE would't have failed), only return any result
669                 // when it is different from the original `raw_socket`.
670                 if base_socket != raw_socket {
671                     return Ok(base_socket);
672                 }
673             }
674         }
675 
676         // If the alternative IOCTLs also failed, return the original error.
677         let os_error = res.unwrap_err();
678         let err = io::Error::from_raw_os_error(os_error);
679         Err(err)
680     }
681 }
682 
683 impl Drop for SelectorInner {
drop(&mut self)684     fn drop(&mut self) {
685         loop {
686             let events_num: usize;
687             let mut statuses: [CompletionStatus; 1024] = [CompletionStatus::zero(); 1024];
688 
689             let result = self
690                 .cp
691                 .get_many(&mut statuses, Some(std::time::Duration::from_millis(0)));
692             match result {
693                 Ok(iocp_events) => {
694                     events_num = iocp_events.iter().len();
695                     for iocp_event in iocp_events.iter() {
696                         if iocp_event.overlapped().is_null() {
697                             // Custom event
698                         } else if iocp_event.token() % 2 == 1 {
699                             // Named pipe, dispatch the event so it can release resources
700                             let callback = unsafe {
701                                 (*(iocp_event.overlapped() as *mut super::Overlapped)).callback
702                             };
703 
704                             callback(iocp_event.entry(), None);
705                         } else {
706                             // drain sock state to release memory of Arc reference
707                             let _sock_state = from_overlapped(iocp_event.overlapped());
708                         }
709                     }
710                 }
711 
712                 Err(_) => {
713                     break;
714                 }
715             }
716 
717             if events_num == 0 {
718                 // continue looping until all completion statuses have been drained
719                 break;
720             }
721         }
722 
723         self.afd_group.release_unused_afd();
724     }
725 }
726 
727 cfg_net! {
728     fn interests_to_afd_flags(interests: Interest) -> u32 {
729         let mut flags = 0;
730 
731         if interests.is_readable() {
732             flags |= READABLE_FLAGS | READ_CLOSED_FLAGS | ERROR_FLAGS;
733         }
734 
735         if interests.is_writable() {
736             flags |= WRITABLE_FLAGS | WRITE_CLOSED_FLAGS | ERROR_FLAGS;
737         }
738 
739         flags
740     }
741 }
742