1 //! Bindings to IOCP, I/O Completion Ports
2
3 use super::{Handle, Overlapped};
4 use std::cmp;
5 use std::fmt;
6 use std::io;
7 use std::mem;
8 use std::os::windows::io::*;
9 use std::time::Duration;
10
11 use windows_sys::Win32::Foundation::{HANDLE, INVALID_HANDLE_VALUE};
12 use windows_sys::Win32::System::IO::{
13 CreateIoCompletionPort, GetQueuedCompletionStatusEx, PostQueuedCompletionStatus, OVERLAPPED,
14 OVERLAPPED_ENTRY,
15 };
16
/// A handle to a Windows I/O Completion Port.
///
/// The port is held as a single [`Handle`]; every method on this type passes
/// `handle.raw()` to the corresponding Win32 completion-port call.
#[derive(Debug)]
pub(crate) struct CompletionPort {
    // The underlying completion-port handle.
    // NOTE(review): presumably `Handle` closes it on drop — confirm in `Handle`.
    handle: Handle,
}
22
/// A status message received from an I/O completion port.
///
/// These statuses can be created via the `new` or `zero` constructors and then
/// provided to a completion port, or they are read out of a completion port.
/// The fields of each status are read through its accessor methods.
///
/// The type is a `#[repr(transparent)]` wrapper around `OVERLAPPED_ENTRY`, so
/// a slice of statuses can be handed to the OS as a buffer of raw entries.
#[derive(Clone, Copy)]
#[repr(transparent)]
pub struct CompletionStatus(OVERLAPPED_ENTRY);
31
32 impl fmt::Debug for CompletionStatus {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result33 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34 write!(f, "CompletionStatus(OVERLAPPED_ENTRY)")
35 }
36 }
37
// The wrapped `OVERLAPPED_ENTRY` stores a raw `lpOverlapped` pointer, and raw
// pointers are neither `Send` nor `Sync`, so these markers must be asserted
// by hand.
// NOTE(review): soundness presumably relies on this type only storing and
// returning the pointer, never dereferencing it — confirm against the code
// that consumes `overlapped()`.
unsafe impl Send for CompletionStatus {}
unsafe impl Sync for CompletionStatus {}
40
41 impl CompletionPort {
42 /// Creates a new I/O completion port with the specified concurrency value.
43 ///
44 /// The number of threads given corresponds to the level of concurrency
45 /// allowed for threads associated with this port. Consult the Windows
46 /// documentation for more information about this value.
new(threads: u32) -> io::Result<CompletionPort>47 pub fn new(threads: u32) -> io::Result<CompletionPort> {
48 let ret = unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, threads) };
49 if ret == 0 {
50 Err(io::Error::last_os_error())
51 } else {
52 Ok(CompletionPort {
53 handle: Handle::new(ret),
54 })
55 }
56 }
57
58 /// Associates a new `HANDLE` to this I/O completion port.
59 ///
60 /// This function will associate the given handle to this port with the
61 /// given `token` to be returned in status messages whenever it receives a
62 /// notification.
63 ///
64 /// Any object which is convertible to a `HANDLE` via the `AsRawHandle`
65 /// trait can be provided to this function, such as `std::fs::File` and
66 /// friends.
67 #[cfg(any(feature = "net", feature = "os-ext"))]
add_handle<T: AsRawHandle + ?Sized>(&self, token: usize, t: &T) -> io::Result<()>68 pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> {
69 let ret = unsafe {
70 CreateIoCompletionPort(t.as_raw_handle() as HANDLE, self.handle.raw(), token, 0)
71 };
72 if ret == 0 {
73 Err(io::Error::last_os_error())
74 } else {
75 Ok(())
76 }
77 }
78
79 /// Dequeues a number of completion statuses from this I/O completion port.
80 ///
81 /// This function is the same as `get` except that it may return more than
82 /// one status. A buffer of "zero" statuses is provided (the contents are
83 /// not read) and then on success this function will return a sub-slice of
84 /// statuses which represent those which were dequeued from this port. This
85 /// function does not wait to fill up the entire list of statuses provided.
86 ///
87 /// Like with `get`, a timeout may be specified for this operation.
get_many<'a>( &self, list: &'a mut [CompletionStatus], timeout: Option<Duration>, ) -> io::Result<&'a mut [CompletionStatus]>88 pub fn get_many<'a>(
89 &self,
90 list: &'a mut [CompletionStatus],
91 timeout: Option<Duration>,
92 ) -> io::Result<&'a mut [CompletionStatus]> {
93 debug_assert_eq!(
94 mem::size_of::<CompletionStatus>(),
95 mem::size_of::<OVERLAPPED_ENTRY>()
96 );
97 let mut removed = 0;
98 let timeout = duration_millis(timeout);
99 let len = cmp::min(list.len(), u32::MAX as usize) as u32;
100 let ret = unsafe {
101 GetQueuedCompletionStatusEx(
102 self.handle.raw(),
103 list.as_ptr() as *mut _,
104 len,
105 &mut removed,
106 timeout,
107 0,
108 )
109 };
110
111 if ret == 0 {
112 Err(io::Error::last_os_error())
113 } else {
114 Ok(&mut list[..removed as usize])
115 }
116 }
117
118 /// Posts a new completion status onto this I/O completion port.
119 ///
120 /// This function will post the given status, with custom parameters, to the
121 /// port. Threads blocked in `get` or `get_many` will eventually receive
122 /// this status.
post(&self, status: CompletionStatus) -> io::Result<()>123 pub fn post(&self, status: CompletionStatus) -> io::Result<()> {
124 let ret = unsafe {
125 PostQueuedCompletionStatus(
126 self.handle.raw(),
127 status.0.dwNumberOfBytesTransferred,
128 status.0.lpCompletionKey,
129 status.0.lpOverlapped,
130 )
131 };
132
133 if ret == 0 {
134 Err(io::Error::last_os_error())
135 } else {
136 Ok(())
137 }
138 }
139 }
140
141 impl AsRawHandle for CompletionPort {
as_raw_handle(&self) -> RawHandle142 fn as_raw_handle(&self) -> RawHandle {
143 self.handle.raw() as RawHandle
144 }
145 }
146
147 impl FromRawHandle for CompletionPort {
from_raw_handle(handle: RawHandle) -> CompletionPort148 unsafe fn from_raw_handle(handle: RawHandle) -> CompletionPort {
149 CompletionPort {
150 handle: Handle::new(handle as HANDLE),
151 }
152 }
153 }
154
155 impl IntoRawHandle for CompletionPort {
into_raw_handle(self) -> RawHandle156 fn into_raw_handle(self) -> RawHandle {
157 self.handle.into_raw()
158 }
159 }
160
161 impl CompletionStatus {
162 /// Creates a new completion status with the provided parameters.
163 ///
164 /// This function is useful when creating a status to send to a port with
165 /// the `post` method. The parameters are opaquely passed through and not
166 /// interpreted by the system at all.
new(bytes: u32, token: usize, overlapped: *mut Overlapped) -> Self167 pub(crate) fn new(bytes: u32, token: usize, overlapped: *mut Overlapped) -> Self {
168 CompletionStatus(OVERLAPPED_ENTRY {
169 dwNumberOfBytesTransferred: bytes,
170 lpCompletionKey: token,
171 lpOverlapped: overlapped as *mut _,
172 Internal: 0,
173 })
174 }
175
176 /// Creates a new borrowed completion status from the borrowed
177 /// `OVERLAPPED_ENTRY` argument provided.
178 ///
179 /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`,
180 /// returning the wrapped structure.
181 #[cfg(feature = "os-ext")]
from_entry(entry: &OVERLAPPED_ENTRY) -> &Self182 pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &Self {
183 // Safety: CompletionStatus is repr(transparent) w/ OVERLAPPED_ENTRY, so
184 // a reference to one is guaranteed to be layout compatible with the
185 // reference to another.
186 unsafe { &*(entry as *const _ as *const _) }
187 }
188
189 /// Creates a new "zero" completion status.
190 ///
191 /// This function is useful when creating a stack buffer or vector of
192 /// completion statuses to be passed to the `get_many` function.
zero() -> Self193 pub fn zero() -> Self {
194 Self::new(0, 0, std::ptr::null_mut())
195 }
196
197 /// Returns the number of bytes that were transferred for the I/O operation
198 /// associated with this completion status.
bytes_transferred(&self) -> u32199 pub fn bytes_transferred(&self) -> u32 {
200 self.0.dwNumberOfBytesTransferred
201 }
202
203 /// Returns the completion key value associated with the file handle whose
204 /// I/O operation has completed.
205 ///
206 /// A completion key is a per-handle key that is specified when it is added
207 /// to an I/O completion port via `add_handle` or `add_socket`.
token(&self) -> usize208 pub fn token(&self) -> usize {
209 self.0.lpCompletionKey as usize
210 }
211
212 /// Returns a pointer to the `Overlapped` structure that was specified when
213 /// the I/O operation was started.
overlapped(&self) -> *mut OVERLAPPED214 pub fn overlapped(&self) -> *mut OVERLAPPED {
215 self.0.lpOverlapped
216 }
217
218 /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object.
entry(&self) -> &OVERLAPPED_ENTRY219 pub fn entry(&self) -> &OVERLAPPED_ENTRY {
220 &self.0
221 }
222 }
223
/// Converts an optional timeout into the millisecond count handed to the OS.
///
/// * `None` becomes `u32::MAX`.
/// * `Some(d)` is rounded *up* to whole milliseconds, so a non-zero
///   sub-millisecond timeout never collapses to zero; only an exactly-zero
///   duration yields `0`. Results that do not fit are clamped to `u32::MAX`.
#[inline]
fn duration_millis(dur: Option<Duration>) -> u32 {
    let requested = match dur {
        Some(requested) => requested,
        None => return u32::MAX,
    };

    // `Duration::as_millis` truncates, so pad by just under one millisecond
    // first. If the padding itself overflows the duration is already huge and
    // will be clamped below, so fall back to the unpadded value.
    let padded = match requested.checked_add(Duration::from_nanos(999_999)) {
        Some(sum) => sum,
        None => requested,
    };

    let millis = padded.as_millis();
    if millis >= u128::from(u32::MAX) {
        u32::MAX
    } else {
        millis as u32
    }
}
240
#[cfg(test)]
mod tests {
    use super::{CompletionPort, CompletionStatus};

    /// Compile-time check that `CompletionPort` is both `Send` and `Sync`.
    #[test]
    fn is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<CompletionPort>();
    }

    /// Posts two statuses and checks that `get_many` returns exactly those
    /// two, in order, while leaving the rest of the buffer untouched.
    #[test]
    fn get_many() {
        let port = CompletionPort::new(1).unwrap();

        // (bytes transferred, token, overlapped address) for each posted status.
        let posted: [(u32, usize, usize); 2] = [(1, 2, 3), (4, 5, 6)];
        for &(bytes, token, addr) in posted.iter() {
            port.post(CompletionStatus::new(bytes, token, addr as *mut _))
                .unwrap();
        }

        let mut buffer = vec![CompletionStatus::zero(); 4];
        {
            let dequeued = port.get_many(&mut buffer, None).unwrap();
            assert_eq!(dequeued.len(), 2);
            for (status, &(bytes, token, addr)) in dequeued.iter().zip(posted.iter()) {
                assert_eq!(status.bytes_transferred(), bytes);
                assert_eq!(status.token(), token);
                assert_eq!(status.overlapped(), addr as *mut _);
            }
        }

        // Entries past the dequeued prefix must still be zeroed.
        let untouched = &buffer[2];
        assert_eq!(untouched.bytes_transferred(), 0);
        assert_eq!(untouched.token(), 0);
        assert_eq!(untouched.overlapped(), 0 as *mut _);
    }
}
274