use super::afd::{self, Afd, AfdPollInfo};
use super::io_status_block::IoStatusBlock;
use super::Event;
use crate::sys::Events;

cfg_net! {
    use crate::sys::event::{
        ERROR_FLAGS, READABLE_FLAGS, READ_CLOSED_FLAGS, WRITABLE_FLAGS, WRITE_CLOSED_FLAGS,
    };
    use crate::Interest;
}

use super::iocp::{CompletionPort, CompletionStatus};
use std::collections::VecDeque;
use std::ffi::c_void;
use std::io;
use std::marker::PhantomPinned;
use std::os::windows::io::RawSocket;
use std::pin::Pin;
#[cfg(debug_assertions)]
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use windows_sys::Win32::Foundation::{
    ERROR_INVALID_HANDLE, ERROR_IO_PENDING, HANDLE, STATUS_CANCELLED, WAIT_TIMEOUT,
};
use windows_sys::Win32::System::IO::OVERLAPPED;
#[derive(Debug)]
struct AfdGroup {
    #[cfg_attr(not(feature = "net"), allow(dead_code))]
    cp: Arc<CompletionPort>,
    afd_group: Mutex<Vec<Arc<Afd>>>,
}

impl AfdGroup {
    pub fn new(cp: Arc<CompletionPort>) -> AfdGroup {
        AfdGroup {
            afd_group: Mutex::new(Vec::new()),
            cp,
        }
    }

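    // Drop `Afd` handles that no socket references any more. A strong count of
    // 1 means only this group's `Vec` still holds the handle.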
    pub fn release_unused_afd(&self) {
        let mut afd_group = self.afd_group.lock().unwrap();
        afd_group.retain(|g| Arc::strong_count(g) > 1);
    }
}

cfg_io_source! {
    const POLL_GROUP_MAX_GROUP_SIZE: usize = 32;

    impl AfdGroup {
        pub fn acquire(&self) -> io::Result<Arc<Afd>> {
            let mut afd_group = self.afd_group.lock().unwrap();
            if afd_group.len() == 0 {
                self._alloc_afd_group(&mut afd_group)?;
            } else {
                // + 1 reference in Vec
                if Arc::strong_count(afd_group.last().unwrap()) > POLL_GROUP_MAX_GROUP_SIZE {
                    self._alloc_afd_group(&mut afd_group)?;
                }
            }

            match afd_group.last() {
                Some(arc) => Ok(arc.clone()),
                None => unreachable!(
                    "Cannot acquire afd, {:#?}, afd_group: {:#?}",
                    self, afd_group
                ),
            }
        }

        fn _alloc_afd_group(&self, afd_group: &mut Vec<Arc<Afd>>) -> io::Result<()> {
            let afd = Afd::new(&self.cp)?;
            let arc = Arc::new(afd);
            afd_group.push(arc);
            Ok(())
        }
    }
}

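// Tracks the state of the single AFD poll operation that may be outstanding for
// a socket: `Idle` (no poll submitted), `Pending` (an IOCTL_AFD_POLL is in
// flight), or `Cancelled` (CancelIoEx was issued and we are waiting for the
// completion to arrive).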
#[derive(Debug)]
enum SockPollStatus {
    Idle,
    Pending,
    Cancelled,
}

#[derive(Debug)]
pub struct SockState {
    iosb: IoStatusBlock,
    poll_info: AfdPollInfo,
    afd: Arc<Afd>,

    base_socket: RawSocket,

    user_evts: u32,
    pending_evts: u32,

    user_data: u64,

    poll_status: SockPollStatus,
    delete_pending: bool,

    // last raw os error
    error: Option<i32>,

    _pinned: PhantomPinned,
}

impl SockState {
    fn update(&mut self, self_arc: &Pin<Arc<Mutex<SockState>>>) -> io::Result<()> {
        assert!(!self.delete_pending);

        // Make sure to reset any previous error before a new update.
        self.error = None;

        if let SockPollStatus::Pending = self.poll_status {
            if (self.user_evts & afd::KNOWN_EVENTS & !self.pending_evts) == 0 {
                /* All the events the user is interested in are already being monitored by
                 * the pending poll operation. It might spuriously complete because of an
                 * event that we're no longer interested in; when that happens we'll submit
                 * a new poll operation with the updated event mask. */
            } else {
                /* A poll operation is already pending, but it's not monitoring for all the
                 * events that the user is interested in. Therefore, cancel the pending
                 * poll operation; when we receive its completion packet, a new poll
                 * operation will be submitted with the correct event mask. */
                if let Err(e) = self.cancel() {
                    self.error = e.raw_os_error();
                    return Err(e);
                }
                return Ok(());
            }
        } else if let SockPollStatus::Cancelled = self.poll_status {
            /* The poll operation has already been cancelled, we're still waiting for
             * it to return. For now, there's nothing that needs to be done. */
        } else if let SockPollStatus::Idle = self.poll_status {
            /* No poll operation is pending; start one. */
            self.poll_info.exclusive = 0;
            self.poll_info.number_of_handles = 1;
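            // Effectively infinite timeout: the poll only completes when an event
            // fires or the operation is cancelled.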
            self.poll_info.timeout = i64::MAX;
            self.poll_info.handles[0].handle = self.base_socket as HANDLE;
            self.poll_info.handles[0].status = 0;
            self.poll_info.handles[0].events = self.user_evts | afd::POLL_LOCAL_CLOSE;

            // Increase the ref count as the memory will be used by the kernel.
            let overlapped_ptr = into_overlapped(self_arc.clone());

            let result = unsafe {
                self.afd
                    .poll(&mut self.poll_info, &mut *self.iosb, overlapped_ptr)
            };
            if let Err(e) = result {
                let code = e.raw_os_error().unwrap();
                if code == ERROR_IO_PENDING as i32 {
                    /* Overlapped poll operation in progress; this is expected. */
                } else {
                    // Since the operation failed it means the kernel won't be
                    // using the memory any more.
                    drop(from_overlapped(overlapped_ptr as *mut _));
                    if code == ERROR_INVALID_HANDLE as i32 {
                        /* Socket closed; it'll be dropped. */
                        self.mark_delete();
                        return Ok(());
                    } else {
                        self.error = e.raw_os_error();
                        return Err(e);
                    }
                }
            }

            self.poll_status = SockPollStatus::Pending;
            self.pending_evts = self.user_evts;
        } else {
            unreachable!("Invalid poll status during update, {:#?}", self)
        }

        Ok(())
    }

    fn cancel(&mut self) -> io::Result<()> {
        match self.poll_status {
            SockPollStatus::Pending => {}
            _ => unreachable!("Invalid poll status during cancel, {:#?}", self),
        };
        unsafe {
            self.afd.cancel(&mut *self.iosb)?;
        }
        self.poll_status = SockPollStatus::Cancelled;
        self.pending_evts = 0;
        Ok(())
    }

    // This function is called from the overlapped, which is used as an
    // `Arc<Mutex<SockState>>`. Watch out for reference counting.
    fn feed_event(&mut self) -> Option<Event> {
        self.poll_status = SockPollStatus::Idle;
        self.pending_evts = 0;

        let mut afd_events = 0;
        // We use the status info in IO_STATUS_BLOCK to determine the socket poll status.
        // It is unsafe to use a pointer to IO_STATUS_BLOCK.
        unsafe {
            if self.delete_pending {
                return None;
            } else if self.iosb.Anonymous.Status == STATUS_CANCELLED {
                /* The poll request was cancelled by CancelIoEx. */
            } else if self.iosb.Anonymous.Status < 0 {
                /* The overlapped request itself failed in an unexpected way. */
                afd_events = afd::POLL_CONNECT_FAIL;
            } else if self.poll_info.number_of_handles < 1 {
                /* This poll operation succeeded but didn't report any socket events. */
            } else if self.poll_info.handles[0].events & afd::POLL_LOCAL_CLOSE != 0 {
                /* The poll operation reported that the socket was closed. */
                self.mark_delete();
                return None;
            } else {
                afd_events = self.poll_info.handles[0].events;
            }
        }

        afd_events &= self.user_evts;

        if afd_events == 0 {
            return None;
        }

        // In mio, we have to simulate edge-triggered behavior to match the API usage.
        // The strategy here is to intercept all reads/writes from the user that could
        // return `WouldBlock`, then reregister the socket to reset the interests.
        self.user_evts &= !afd_events;

        Some(Event {
            data: self.user_data,
            flags: afd_events,
        })
    }

    pub fn is_pending_deletion(&self) -> bool {
        self.delete_pending
    }

    pub fn mark_delete(&mut self) {
        if !self.delete_pending {
            if let SockPollStatus::Pending = self.poll_status {
                drop(self.cancel());
            }

            self.delete_pending = true;
        }
    }

    fn has_error(&self) -> bool {
        self.error.is_some()
    }
}

cfg_io_source! {
    impl SockState {
        fn new(raw_socket: RawSocket, afd: Arc<Afd>) -> io::Result<SockState> {
            Ok(SockState {
                iosb: IoStatusBlock::zeroed(),
                poll_info: AfdPollInfo::zeroed(),
                afd,
                base_socket: get_base_socket(raw_socket)?,
                user_evts: 0,
                pending_evts: 0,
                user_data: 0,
                poll_status: SockPollStatus::Idle,
                delete_pending: false,
                error: None,
                _pinned: PhantomPinned,
            })
        }

        /// Returns true if the socket needs to be added to the update queue, false otherwise.
        fn set_event(&mut self, ev: Event) -> bool {
            /* afd::POLL_CONNECT_FAIL and afd::POLL_ABORT are always reported, even when not requested by the caller. */
            let events = ev.flags | afd::POLL_CONNECT_FAIL | afd::POLL_ABORT;

            self.user_evts = events;
            self.user_data = ev.data;

            (events & !self.pending_evts) != 0
        }
    }
}

impl Drop for SockState {
    fn drop(&mut self) {
        self.mark_delete();
    }
}

/// Converts the pointer to a `SockState` into a raw pointer.
/// To revert see `from_overlapped`.
fn into_overlapped(sock_state: Pin<Arc<Mutex<SockState>>>) -> *mut c_void {
    let overlapped_ptr: *const Mutex<SockState> =
        unsafe { Arc::into_raw(Pin::into_inner_unchecked(sock_state)) };
    overlapped_ptr as *mut _
}

/// Converts a raw overlapped pointer back into a `Pin<Arc<Mutex<SockState>>>`.
/// Reverts `into_overlapped`.
fn from_overlapped(ptr: *mut OVERLAPPED) -> Pin<Arc<Mutex<SockState>>> {
    let sock_ptr: *const Mutex<SockState> = ptr as *const _;
    unsafe { Pin::new_unchecked(Arc::from_raw(sock_ptr)) }
}

/// Each Selector has a globally unique(ish) ID associated with it. This ID
/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
/// registered with the `Selector`. If a type that is previously associated with
/// a `Selector` attempts to register itself with a different `Selector`, the
/// operation will return with an error. This matches Windows behavior.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);

/// Windows implementation of `sys::Selector`
///
/// Edge-triggered event notification is simulated by resetting the internal event flags of
/// each socket state (`SockState`) and setting them back by intercepting all requests that
/// could cause `io::ErrorKind::WouldBlock` to happen.
///
/// This selector currently only supports sockets, because the `Afd` driver is Winsock2-specific.
#[derive(Debug)]
pub struct Selector {
    #[cfg(debug_assertions)]
    id: usize,
    pub(super) inner: Arc<SelectorInner>,
}

impl Selector {
    pub fn new() -> io::Result<Selector> {
        SelectorInner::new().map(|inner| {
            #[cfg(debug_assertions)]
            let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
            Selector {
                #[cfg(debug_assertions)]
                id,
                inner: Arc::new(inner),
            }
        })
    }

    pub fn try_clone(&self) -> io::Result<Selector> {
        Ok(Selector {
            #[cfg(debug_assertions)]
            id: self.id,
            inner: Arc::clone(&self.inner),
        })
    }

    /// # Safety
    ///
    /// This requires a mutable reference to self because only a single thread
    /// can poll IOCP at a time.
    pub fn select(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
        self.inner.select(events, timeout)
    }

    pub(super) fn clone_port(&self) -> Arc<CompletionPort> {
        self.inner.cp.clone()
    }

    #[cfg(feature = "os-ext")]
    pub(super) fn same_port(&self, other: &Arc<CompletionPort>) -> bool {
        Arc::ptr_eq(&self.inner.cp, other)
    }
}

cfg_io_source! {
    use super::InternalState;
    use crate::Token;

    impl Selector {
        pub(super) fn register(
            &self,
            socket: RawSocket,
            token: Token,
            interests: Interest,
        ) -> io::Result<InternalState> {
            SelectorInner::register(&self.inner, socket, token, interests)
        }

        pub(super) fn reregister(
            &self,
            state: Pin<Arc<Mutex<SockState>>>,
            token: Token,
            interests: Interest,
        ) -> io::Result<()> {
            self.inner.reregister(state, token, interests)
        }

        #[cfg(debug_assertions)]
        pub fn id(&self) -> usize {
            self.id
        }
    }
}

#[derive(Debug)]
pub struct SelectorInner {
    pub(super) cp: Arc<CompletionPort>,
    update_queue: Mutex<VecDeque<Pin<Arc<Mutex<SockState>>>>>,
    afd_group: AfdGroup,
    is_polling: AtomicBool,
}

// Thread safety is ensured by taking the internal locks manually.
unsafe impl Sync for SelectorInner {}

impl SelectorInner {
    pub fn new() -> io::Result<SelectorInner> {
        CompletionPort::new(0).map(|cp| {
            let cp = Arc::new(cp);
            let cp_afd = Arc::clone(&cp);

            SelectorInner {
                cp,
                update_queue: Mutex::new(VecDeque::new()),
                afd_group: AfdGroup::new(cp_afd),
                is_polling: AtomicBool::new(false),
            }
        })
    }

    /// # Safety
    ///
    /// May only be called via `Selector::select`.
    pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
        events.clear();

        if timeout.is_none() {
            loop {
                let len = self.select2(&mut events.statuses, &mut events.events, None)?;
                if len == 0 {
                    continue;
                }
                break Ok(());
            }
        } else {
            self.select2(&mut events.statuses, &mut events.events, timeout)?;
            Ok(())
        }
    }

    pub fn select2(
        &self,
        statuses: &mut [CompletionStatus],
        events: &mut Vec<Event>,
        timeout: Option<Duration>,
    ) -> io::Result<usize> {
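        // Only one thread may poll the completion port at a time; `is_polling`
        // asserts that invariant and lets `update_sockets_events_if_polling`
        // know whether a poller is currently blocked on the port.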
        assert!(!self.is_polling.swap(true, Ordering::AcqRel));

        unsafe { self.update_sockets_events() }?;

        let result = self.cp.get_many(statuses, timeout);

        self.is_polling.store(false, Ordering::Relaxed);

        match result {
            Ok(iocp_events) => Ok(unsafe { self.feed_events(events, iocp_events) }),
            Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => Ok(0),
            Err(e) => Err(e),
        }
    }

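    // Submits (or resubmits) an AFD poll operation for every socket on the update
    // queue, then drops the queue entries whose poll is now pending in the kernel.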
    unsafe fn update_sockets_events(&self) -> io::Result<()> {
        let mut update_queue = self.update_queue.lock().unwrap();
        for sock in update_queue.iter_mut() {
            let mut sock_internal = sock.lock().unwrap();
            if !sock_internal.is_pending_deletion() {
                sock_internal.update(sock)?;
            }
        }

        // Remove all sockets that don't have an error; they have an AFD poll operation pending.
        update_queue.retain(|sock| sock.lock().unwrap().has_error());

        self.afd_group.release_unused_afd();
        Ok(())
    }

    // Returns the number of processed `iocp_events`, rather than the events themselves.
    unsafe fn feed_events(
        &self,
        events: &mut Vec<Event>,
        iocp_events: &[CompletionStatus],
    ) -> usize {
        let mut n = 0;
        let mut update_queue = self.update_queue.lock().unwrap();
        for iocp_event in iocp_events.iter() {
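            // A null OVERLAPPED pointer means the status was posted directly to the
            // completion port (e.g. by the waker) rather than by an AFD poll.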
            if iocp_event.overlapped().is_null() {
                events.push(Event::from_completion_status(iocp_event));
                n += 1;
                continue;
            } else if iocp_event.token() % 2 == 1 {
                // Handle is a named pipe. This could be extended to be any non-AFD event.
                let callback = (*(iocp_event.overlapped() as *mut super::Overlapped)).callback;

                let len = events.len();
                callback(iocp_event.entry(), Some(events));
                n += events.len() - len;
                continue;
            }

            let sock_state = from_overlapped(iocp_event.overlapped());
            let mut sock_guard = sock_state.lock().unwrap();
            if let Some(e) = sock_guard.feed_event() {
                events.push(e);
                n += 1;
            }

            if !sock_guard.is_pending_deletion() {
                update_queue.push_back(sock_state.clone());
            }
        }
        self.afd_group.release_unused_afd();
        n
    }
}

cfg_io_source! {
    use std::mem::size_of;
    use std::ptr::null_mut;

    use windows_sys::Win32::Networking::WinSock::{
        WSAGetLastError, WSAIoctl, SIO_BASE_HANDLE, SIO_BSP_HANDLE,
        SIO_BSP_HANDLE_POLL, SIO_BSP_HANDLE_SELECT, SOCKET_ERROR,
    };

    impl SelectorInner {
        fn register(
            this: &Arc<Self>,
            socket: RawSocket,
            token: Token,
            interests: Interest,
        ) -> io::Result<InternalState> {
            let flags = interests_to_afd_flags(interests);

            let sock = {
                let sock = this._alloc_sock_for_rawsocket(socket)?;
                let event = Event {
                    flags,
                    data: token.0 as u64,
                };
                sock.lock().unwrap().set_event(event);
                sock
            };

            let state = InternalState {
                selector: this.clone(),
                token,
                interests,
                sock_state: sock.clone(),
            };

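            // Queue the socket so its AFD poll is submitted on the next call to
            // `select`, or right away if another thread is already polling.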
            this.queue_state(sock);
            unsafe { this.update_sockets_events_if_polling()? };

            Ok(state)
        }

        // Directly accessed in `IoSourceState::do_io`.
        pub(super) fn reregister(
            &self,
            state: Pin<Arc<Mutex<SockState>>>,
            token: Token,
            interests: Interest,
        ) -> io::Result<()> {
            {
                let event = Event {
                    flags: interests_to_afd_flags(interests),
                    data: token.0 as u64,
                };

                state.lock().unwrap().set_event(event);
            }

            // FIXME: a sock which has_error true should not be re-added to
            // the update queue because it's already there.
            self.queue_state(state);
            unsafe { self.update_sockets_events_if_polling() }
        }

        /// This function is called by register() and reregister() to start an
        /// IOCTL_AFD_POLL operation corresponding to the registered events, but
        /// only if necessary.
        ///
        /// Since it is not possible to modify or synchronously cancel an AFD_POLL
        /// operation, and there can be only one active AFD_POLL operation per
        /// (socket, completion port) pair at any time, it is expensive to change
        /// a socket's event registration after it has been submitted to the kernel.
        ///
        /// Therefore, if no other threads are polling when interest in a socket
        /// event is (re)registered, the socket is added to the 'update queue', but
        /// the actual syscall to start the IOCTL_AFD_POLL operation is deferred
        /// until just before the GetQueuedCompletionStatusEx() syscall is made.
        ///
        /// However, when another thread is already blocked on
        /// GetQueuedCompletionStatusEx() we tell the kernel about the registered
        /// socket event(s) immediately.
        unsafe fn update_sockets_events_if_polling(&self) -> io::Result<()> {
            if self.is_polling.load(Ordering::Acquire) {
                self.update_sockets_events()
            } else {
                Ok(())
            }
        }

        fn queue_state(&self, sock_state: Pin<Arc<Mutex<SockState>>>) {
            let mut update_queue = self.update_queue.lock().unwrap();
            update_queue.push_back(sock_state);
        }

        fn _alloc_sock_for_rawsocket(
            &self,
            raw_socket: RawSocket,
        ) -> io::Result<Pin<Arc<Mutex<SockState>>>> {
            let afd = self.afd_group.acquire()?;
            Ok(Arc::pin(Mutex::new(SockState::new(raw_socket, afd)?)))
        }
    }

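    // AFD polling must be performed on the base service provider socket. When a
    // layered service provider (LSP) is installed, the user-visible handle may be
    // an LSP wrapper, so we ask Winsock for the underlying base handle.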
    fn try_get_base_socket(raw_socket: RawSocket, ioctl: u32) -> Result<RawSocket, i32> {
        let mut base_socket: RawSocket = 0;
        let mut bytes: u32 = 0;
        unsafe {
            if WSAIoctl(
                raw_socket as usize,
                ioctl,
                null_mut(),
                0,
                &mut base_socket as *mut _ as *mut c_void,
                size_of::<RawSocket>() as u32,
                &mut bytes,
                null_mut(),
                None,
            ) != SOCKET_ERROR
            {
                Ok(base_socket)
            } else {
                Err(WSAGetLastError())
            }
        }
    }

    fn get_base_socket(raw_socket: RawSocket) -> io::Result<RawSocket> {
        let res = try_get_base_socket(raw_socket, SIO_BASE_HANDLE);
        if let Ok(base_socket) = res {
            return Ok(base_socket);
        }

        // The `SIO_BASE_HANDLE` ioctl should not be intercepted by LSPs, therefore
        // it should not fail as long as `raw_socket` is a valid socket. See
        // https://docs.microsoft.com/en-us/windows/win32/winsock/winsock-ioctls.
        // However, at least one known LSP deliberately breaks it, so we try
        // some alternative IOCTLs, starting with the most appropriate one.
        for &ioctl in &[
            SIO_BSP_HANDLE_SELECT,
            SIO_BSP_HANDLE_POLL,
            SIO_BSP_HANDLE,
        ] {
            if let Ok(base_socket) = try_get_base_socket(raw_socket, ioctl) {
                // Since we now know that we're dealing with an LSP (otherwise
                // SIO_BASE_HANDLE wouldn't have failed), only return a result
                // when it is different from the original `raw_socket`.
                if base_socket != raw_socket {
                    return Ok(base_socket);
                }
            }
        }

        // If the alternative IOCTLs also failed, return the original error.
        let os_error = res.unwrap_err();
        let err = io::Error::from_raw_os_error(os_error);
        Err(err)
    }
}

impl Drop for SelectorInner {
    fn drop(&mut self) {
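        // Drain any completions still queued on the port so that the
        // `Arc<Mutex<SockState>>` references leaked into in-flight AFD poll
        // operations (via `into_overlapped`) are reclaimed.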
        loop {
            let events_num: usize;
            let mut statuses: [CompletionStatus; 1024] = [CompletionStatus::zero(); 1024];

            let result = self
                .cp
                .get_many(&mut statuses, Some(std::time::Duration::from_millis(0)));
            match result {
                Ok(iocp_events) => {
                    events_num = iocp_events.iter().len();
                    for iocp_event in iocp_events.iter() {
                        if iocp_event.overlapped().is_null() {
                            // Custom event
                        } else if iocp_event.token() % 2 == 1 {
                            // Named pipe, dispatch the event so it can release resources
                            let callback = unsafe {
                                (*(iocp_event.overlapped() as *mut super::Overlapped)).callback
                            };

                            callback(iocp_event.entry(), None);
                        } else {
                            // Drain the sock state to release the memory held by the Arc reference.
                            let _sock_state = from_overlapped(iocp_event.overlapped());
                        }
                    }
                }

                Err(_) => {
                    break;
                }
            }

            if events_num == 0 {
                // All completion statuses have been drained; stop looping.
                break;
            }
        }

        self.afd_group.release_unused_afd();
    }
}

cfg_net! {
    fn interests_to_afd_flags(interests: Interest) -> u32 {
        let mut flags = 0;

        if interests.is_readable() {
            flags |= READABLE_FLAGS | READ_CLOSED_FLAGS | ERROR_FLAGS;
        }

        if interests.is_writable() {
            flags |= WRITABLE_FLAGS | WRITE_CLOSED_FLAGS | ERROR_FLAGS;
        }

        flags
    }
}