1 //! Bindings to IOCP, I/O Completion Ports
2 
3 use super::{Handle, Overlapped};
4 use std::cmp;
5 use std::fmt;
6 use std::io;
7 use std::mem;
8 use std::os::windows::io::*;
9 use std::time::Duration;
10 
11 use windows_sys::Win32::Foundation::{HANDLE, INVALID_HANDLE_VALUE};
12 use windows_sys::Win32::System::IO::{
13     CreateIoCompletionPort, GetQueuedCompletionStatusEx, PostQueuedCompletionStatus, OVERLAPPED,
14     OVERLAPPED_ENTRY,
15 };
16 
17 /// A handle to an Windows I/O Completion Port.
18 #[derive(Debug)]
19 pub(crate) struct CompletionPort {
20     handle: Handle,
21 }
22 
23 /// A status message received from an I/O completion port.
24 ///
25 /// These statuses can be created via the `new` or `empty` constructors and then
26 /// provided to a completion port, or they are read out of a completion port.
27 /// The fields of each status are read through its accessor methods.
28 #[derive(Clone, Copy)]
29 #[repr(transparent)]
30 pub struct CompletionStatus(OVERLAPPED_ENTRY);
31 
32 impl fmt::Debug for CompletionStatus {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result33     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34         write!(f, "CompletionStatus(OVERLAPPED_ENTRY)")
35     }
36 }
37 
38 unsafe impl Send for CompletionStatus {}
39 unsafe impl Sync for CompletionStatus {}
40 
41 impl CompletionPort {
42     /// Creates a new I/O completion port with the specified concurrency value.
43     ///
44     /// The number of threads given corresponds to the level of concurrency
45     /// allowed for threads associated with this port. Consult the Windows
46     /// documentation for more information about this value.
new(threads: u32) -> io::Result<CompletionPort>47     pub fn new(threads: u32) -> io::Result<CompletionPort> {
48         let ret = unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, threads) };
49         if ret == 0 {
50             Err(io::Error::last_os_error())
51         } else {
52             Ok(CompletionPort {
53                 handle: Handle::new(ret),
54             })
55         }
56     }
57 
58     /// Associates a new `HANDLE` to this I/O completion port.
59     ///
60     /// This function will associate the given handle to this port with the
61     /// given `token` to be returned in status messages whenever it receives a
62     /// notification.
63     ///
64     /// Any object which is convertible to a `HANDLE` via the `AsRawHandle`
65     /// trait can be provided to this function, such as `std::fs::File` and
66     /// friends.
67     #[cfg(any(feature = "net", feature = "os-ext"))]
add_handle<T: AsRawHandle + ?Sized>(&self, token: usize, t: &T) -> io::Result<()>68     pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> {
69         let ret = unsafe {
70             CreateIoCompletionPort(t.as_raw_handle() as HANDLE, self.handle.raw(), token, 0)
71         };
72         if ret == 0 {
73             Err(io::Error::last_os_error())
74         } else {
75             Ok(())
76         }
77     }
78 
79     /// Dequeues a number of completion statuses from this I/O completion port.
80     ///
81     /// This function is the same as `get` except that it may return more than
82     /// one status. A buffer of "zero" statuses is provided (the contents are
83     /// not read) and then on success this function will return a sub-slice of
84     /// statuses which represent those which were dequeued from this port. This
85     /// function does not wait to fill up the entire list of statuses provided.
86     ///
87     /// Like with `get`, a timeout may be specified for this operation.
get_many<'a>( &self, list: &'a mut [CompletionStatus], timeout: Option<Duration>, ) -> io::Result<&'a mut [CompletionStatus]>88     pub fn get_many<'a>(
89         &self,
90         list: &'a mut [CompletionStatus],
91         timeout: Option<Duration>,
92     ) -> io::Result<&'a mut [CompletionStatus]> {
93         debug_assert_eq!(
94             mem::size_of::<CompletionStatus>(),
95             mem::size_of::<OVERLAPPED_ENTRY>()
96         );
97         let mut removed = 0;
98         let timeout = duration_millis(timeout);
99         let len = cmp::min(list.len(), u32::MAX as usize) as u32;
100         let ret = unsafe {
101             GetQueuedCompletionStatusEx(
102                 self.handle.raw(),
103                 list.as_ptr() as *mut _,
104                 len,
105                 &mut removed,
106                 timeout,
107                 0,
108             )
109         };
110 
111         if ret == 0 {
112             Err(io::Error::last_os_error())
113         } else {
114             Ok(&mut list[..removed as usize])
115         }
116     }
117 
118     /// Posts a new completion status onto this I/O completion port.
119     ///
120     /// This function will post the given status, with custom parameters, to the
121     /// port. Threads blocked in `get` or `get_many` will eventually receive
122     /// this status.
post(&self, status: CompletionStatus) -> io::Result<()>123     pub fn post(&self, status: CompletionStatus) -> io::Result<()> {
124         let ret = unsafe {
125             PostQueuedCompletionStatus(
126                 self.handle.raw(),
127                 status.0.dwNumberOfBytesTransferred,
128                 status.0.lpCompletionKey,
129                 status.0.lpOverlapped,
130             )
131         };
132 
133         if ret == 0 {
134             Err(io::Error::last_os_error())
135         } else {
136             Ok(())
137         }
138     }
139 }
140 
141 impl AsRawHandle for CompletionPort {
as_raw_handle(&self) -> RawHandle142     fn as_raw_handle(&self) -> RawHandle {
143         self.handle.raw() as RawHandle
144     }
145 }
146 
147 impl FromRawHandle for CompletionPort {
from_raw_handle(handle: RawHandle) -> CompletionPort148     unsafe fn from_raw_handle(handle: RawHandle) -> CompletionPort {
149         CompletionPort {
150             handle: Handle::new(handle as HANDLE),
151         }
152     }
153 }
154 
155 impl IntoRawHandle for CompletionPort {
into_raw_handle(self) -> RawHandle156     fn into_raw_handle(self) -> RawHandle {
157         self.handle.into_raw()
158     }
159 }
160 
161 impl CompletionStatus {
162     /// Creates a new completion status with the provided parameters.
163     ///
164     /// This function is useful when creating a status to send to a port with
165     /// the `post` method. The parameters are opaquely passed through and not
166     /// interpreted by the system at all.
new(bytes: u32, token: usize, overlapped: *mut Overlapped) -> Self167     pub(crate) fn new(bytes: u32, token: usize, overlapped: *mut Overlapped) -> Self {
168         CompletionStatus(OVERLAPPED_ENTRY {
169             dwNumberOfBytesTransferred: bytes,
170             lpCompletionKey: token,
171             lpOverlapped: overlapped as *mut _,
172             Internal: 0,
173         })
174     }
175 
176     /// Creates a new borrowed completion status from the borrowed
177     /// `OVERLAPPED_ENTRY` argument provided.
178     ///
179     /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`,
180     /// returning the wrapped structure.
181     #[cfg(feature = "os-ext")]
from_entry(entry: &OVERLAPPED_ENTRY) -> &Self182     pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &Self {
183         // Safety: CompletionStatus is repr(transparent) w/ OVERLAPPED_ENTRY, so
184         // a reference to one is guaranteed to be layout compatible with the
185         // reference to another.
186         unsafe { &*(entry as *const _ as *const _) }
187     }
188 
189     /// Creates a new "zero" completion status.
190     ///
191     /// This function is useful when creating a stack buffer or vector of
192     /// completion statuses to be passed to the `get_many` function.
zero() -> Self193     pub fn zero() -> Self {
194         Self::new(0, 0, std::ptr::null_mut())
195     }
196 
197     /// Returns the number of bytes that were transferred for the I/O operation
198     /// associated with this completion status.
bytes_transferred(&self) -> u32199     pub fn bytes_transferred(&self) -> u32 {
200         self.0.dwNumberOfBytesTransferred
201     }
202 
203     /// Returns the completion key value associated with the file handle whose
204     /// I/O operation has completed.
205     ///
206     /// A completion key is a per-handle key that is specified when it is added
207     /// to an I/O completion port via `add_handle` or `add_socket`.
token(&self) -> usize208     pub fn token(&self) -> usize {
209         self.0.lpCompletionKey as usize
210     }
211 
212     /// Returns a pointer to the `Overlapped` structure that was specified when
213     /// the I/O operation was started.
overlapped(&self) -> *mut OVERLAPPED214     pub fn overlapped(&self) -> *mut OVERLAPPED {
215         self.0.lpOverlapped
216     }
217 
218     /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object.
entry(&self) -> &OVERLAPPED_ENTRY219     pub fn entry(&self) -> &OVERLAPPED_ENTRY {
220         &self.0
221     }
222 }
223 
224 #[inline]
duration_millis(dur: Option<Duration>) -> u32225 fn duration_millis(dur: Option<Duration>) -> u32 {
226     if let Some(dur) = dur {
227         // `Duration::as_millis` truncates, so round up. This avoids
228         // turning sub-millisecond timeouts into a zero timeout, unless
229         // the caller explicitly requests that by specifying a zero
230         // timeout.
231         let dur_ms = dur
232             .checked_add(Duration::from_nanos(999_999))
233             .unwrap_or(dur)
234             .as_millis();
235         cmp::min(dur_ms, u32::MAX as u128) as u32
236     } else {
237         u32::MAX
238     }
239 }
240 
241 #[cfg(test)]
242 mod tests {
243     use super::{CompletionPort, CompletionStatus};
244 
245     #[test]
is_send_sync()246     fn is_send_sync() {
247         fn is_send_sync<T: Send + Sync>() {}
248         is_send_sync::<CompletionPort>();
249     }
250 
251     #[test]
get_many()252     fn get_many() {
253         let c = CompletionPort::new(1).unwrap();
254 
255         c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap();
256         c.post(CompletionStatus::new(4, 5, 6 as *mut _)).unwrap();
257 
258         let mut s = vec![CompletionStatus::zero(); 4];
259         {
260             let s = c.get_many(&mut s, None).unwrap();
261             assert_eq!(s.len(), 2);
262             assert_eq!(s[0].bytes_transferred(), 1);
263             assert_eq!(s[0].token(), 2);
264             assert_eq!(s[0].overlapped(), 3 as *mut _);
265             assert_eq!(s[1].bytes_transferred(), 4);
266             assert_eq!(s[1].token(), 5);
267             assert_eq!(s[1].overlapped(), 6 as *mut _);
268         }
269         assert_eq!(s[2].bytes_transferred(), 0);
270         assert_eq!(s[2].token(), 0);
271         assert_eq!(s[2].overlapped(), 0 as *mut _);
272     }
273 }
274