// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::collections::HashMap;
use std::future::Future;
use std::io;
use std::mem;
use std::pin::Pin;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Weak;
use std::task::Waker;

use base::named_pipes::BoxedOverlapped;
use base::warn;
use base::AsRawDescriptor;
use base::Error as SysError;
use base::RawDescriptor;
use futures::task::Context;
use futures::task::Poll;
use sync::Mutex;
use thiserror::Error as ThisError;
use winapi::um::handleapi::INVALID_HANDLE_VALUE;
use winapi::um::minwinbase::OVERLAPPED;

use crate::common_executor;
use crate::common_executor::RawExecutor;
use crate::common_executor::RawTaskHandle;
use crate::sys::windows::io_completion_port::CompletionPacket;
use crate::sys::windows::io_completion_port::IoCompletionPort;
use crate::waker::WakerToken;
use crate::waker::WeakWake;
use crate::AsyncError;
use crate::AsyncResult;
use crate::IoSource;
use crate::TaskHandle;

const DEFAULT_IO_CONCURRENCY: u32 = 1;

#[derive(Debug, ThisError)]
pub enum Error {
    #[error("IO completion port operation failed: {0}")]
    IocpOperationFailed(SysError),
    #[error("Failed to get future from executor run.")]
    FailedToReadFutureFromWakerChannel(mpsc::RecvError),
    #[error("executor gone before future was dropped.")]
    ExecutorGone,
    #[error("tried to remove overlapped operation but it didn't exist.")]
    RemoveNonExistentOperation,
}

impl From<Error> for io::Error {
    fn from(e: Error) -> Self {
        use Error::*;
        match e {
            FailedToReadFutureFromWakerChannel(e) => io::Error::new(io::ErrorKind::Other, e),
            IocpOperationFailed(e) => io::Error::new(io::ErrorKind::Other, e),
            ExecutorGone => io::Error::new(io::ErrorKind::Other, e),
            RemoveNonExistentOperation => io::Error::new(io::ErrorKind::Other, e),
        }
    }
}

impl From<Error> for AsyncError {
    fn from(e: Error) -> Self {
        AsyncError::SysVariants(e.into())
    }
}

pub type Result<T> = std::result::Result<T, Error>;

/// Represents an overlapped operation that is running (or has completed but not yet woken).
struct OpData {
    waker: Option<Waker>,
}

/// The current status of a future that is running or has completed on HandleReactor.
enum OpStatus {
    Pending(OpData),
    Completed(CompletionPacket),
}

pub struct HandleReactor {
    iocp: IoCompletionPort,
    overlapped_ops: Mutex<HashMap<WakerToken, OpStatus>>,
}

impl HandleReactor {
    pub fn new_with(concurrency: u32) -> Result<Self> {
        let iocp = IoCompletionPort::new(concurrency)?;
        Ok(Self {
            iocp,
            overlapped_ops: Mutex::new(HashMap::with_capacity(64)),
        })
    }

    pub fn new() -> Result<Self> {
        Self::new_with(DEFAULT_IO_CONCURRENCY)
    }

    /// All descriptors must first be registered with the IOCP before any completion packets can
    /// be received for them.
    pub(crate) fn register_descriptor(&self, rd: &dyn AsRawDescriptor) -> Result<()> {
        self.iocp.register_descriptor(rd)
    }

    /// When an overlapped operation is created, it is registered with the executor here. This way,
    /// when the executor's run thread picks up the completion events, it can associate them back
    /// to the correct overlapped operation. Note that no waker is registered at this point: the
    /// future hasn't been awaited yet, so there is no waker. Once the first poll happens,
    /// `get_overlapped_op_if_ready` registers the waker.
    pub(crate) fn register_overlapped_op(&self, token: &WakerToken) {
        let mut ops = self.overlapped_ops.lock();
        ops.insert(*token, OpStatus::Pending(OpData { waker: None }));
    }

    /// Called to register an overlapped IO source with the executor. From here, the source can
    /// register overlapped operations that will be managed by the executor.
    #[allow(dead_code)]
    pub(crate) fn register_overlapped_source(
        &self,
        raw: &Arc<RawExecutor<HandleReactor>>,
        rd: &dyn AsRawDescriptor,
    ) -> Result<RegisteredOverlappedSource> {
        RegisteredOverlappedSource::new(rd, raw)
    }

    /// Every time an `OverlappedOperation` is polled, this method is called. This is how the waker
    /// gets registered, so that completion events processed on the executor's main thread can wake
    /// the future.
    fn get_overlapped_op_if_ready(
        &self,
        token: &WakerToken,
        cx: &mut Context,
    ) -> Option<CompletionPacket> {
        let mut ops = self.overlapped_ops.lock();

        if let OpStatus::Pending(data) = ops
            .get_mut(token)
            .expect("`get_overlapped_op_if_ready` called on unknown operation")
        {
            data.waker = Some(cx.waker().clone());
            return None;
        }
        if let OpStatus::Completed(pkt) = ops.remove(token).unwrap() {
            return Some(pkt);
        }
        unreachable!("OpStatus didn't match any known variant.");
    }

    /// When an `OverlappedOperation` is dropped, this is called so we don't leak registered
    /// operations. It's possible the operation was already removed (e.g. via polling), in which
    /// case this has no effect.
    fn remove_overlapped_op(&self, token: &WakerToken) {
        let mut ops = self.overlapped_ops.lock();
        if ops.remove(token).is_none() {
            warn!("Tried to remove non-existent overlapped operation from HandleReactor.");
        }
    }
}

impl common_executor::Reactor for HandleReactor {
    fn new() -> std::io::Result<Self> {
        Ok(HandleReactor::new()?)
    }

    fn wake(&self) {
        self.iocp.wake().expect("wakeup failed on HandleReactor.");
    }

    fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
        // TODO: Cancel overlapped ops and/or wait for everything to complete like the linux
        // reactors?
        Box::pin(async {})
    }

    fn wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()> {
        let completion_packets = self.iocp.poll()?;

        set_processing();

        for pkt in completion_packets {
            if pkt.completion_key as RawDescriptor == INVALID_HANDLE_VALUE {
                // These completion packets aren't from overlapped operations. They're from
                // something calling HandleReactor::wake, so they've already enqueued whatever
                // they think is runnable into the queue. All the packet does is wake up the
                // executor loop.
                continue;
            }

            let mut overlapped_ops = self.overlapped_ops.lock();
            if let Some(op) = overlapped_ops.get_mut(&WakerToken(pkt.overlapped_ptr)) {
                let waker = match mem::replace(op, OpStatus::Completed(pkt)) {
                    OpStatus::Pending(OpData { waker }) => waker,
                    OpStatus::Completed(_) => panic!("operation completed more than once"),
                };
                drop(overlapped_ops);
                if let Some(waker) = waker {
                    waker.wake();
                } else {
                    // We shouldn't ever get a completion packet for an IO operation that hasn't
                    // registered its waker.
                    warn!(
                        "got a completion packet for an IO operation that had no waker. \
                         future may be stalled."
                    )
                }
            }
        }
        Ok(())
    }

    fn new_source<F: AsRawDescriptor>(
        &self,
        _ex: &Arc<RawExecutor<Self>>,
        f: F,
    ) -> AsyncResult<IoSource<F>> {
        Ok(IoSource::Handle(super::HandleSource::new(f)?))
    }

    fn wrap_task_handle<R>(task: RawTaskHandle<HandleReactor, R>) -> TaskHandle<R> {
        TaskHandle::Handle(task)
    }
}

/// Represents a handle that has been registered for overlapped operations with a specific executor.
/// From here, the OverlappedSource can register overlapped operations with the executor.
pub(crate) struct RegisteredOverlappedSource {
    ex: Weak<RawExecutor<HandleReactor>>,
}

impl RegisteredOverlappedSource {
    fn new(
        rd: &dyn AsRawDescriptor,
        ex: &Arc<RawExecutor<HandleReactor>>,
    ) -> Result<RegisteredOverlappedSource> {
        ex.reactor.register_descriptor(rd)?;
        Ok(Self {
            ex: Arc::downgrade(ex),
        })
    }

    /// Registers an overlapped IO operation with this executor. Call this function to obtain the
    /// `OverlappedOperation` **before** making the overlapped IO call.
    ///
    /// NOTE: you MUST pass `OverlappedOperation::get_overlapped()` as the overlapped IO pointer
    /// in the IO call.
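    ///
    /// A usage sketch (with hypothetical helper names) is included on [`OverlappedOperation`]
    /// below.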
    pub fn register_overlapped_operation(
        &self,
        offset: Option<u64>,
    ) -> Result<OverlappedOperation> {
        OverlappedOperation::new(offset, self.ex.clone())
    }
}

impl WeakWake for HandleReactor {
    fn wake_by_ref(weak_self: &Weak<Self>) {
        if let Some(arc_self) = weak_self.upgrade() {
            common_executor::Reactor::wake(&*arc_self);
        }
    }
}

/// Represents a pending overlapped IO operation. This must be used in the following manner or
/// undefined behavior will result:
///     1. The reactor in use is a HandleReactor.
///     2. Immediately after the IO syscall, this future MUST be awaited. We rely on the fact that
///        the executor cannot poll the IOCP before this future is polled for the first time to
///        ensure the waker has been registered. (If the executor polls the IOCP before the waker is
///        registered, the future will stall.)
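///
/// A minimal sketch of the intended call sequence, assuming a `RegisteredOverlappedSource` named
/// `source` and a hypothetical overlapped IO helper (`issue_overlapped_read`, `handle`, and `buf`
/// are illustrative only, not real APIs in this crate):
///
/// ```ignore
/// // Register the operation with the reactor before issuing the IO.
/// let mut op = source.register_overlapped_operation(None)?;
/// // Pass the operation's OVERLAPPED pointer to the (hypothetical) overlapped IO call...
/// issue_overlapped_read(&handle, &mut buf, op.get_overlapped())?;
/// // ...and await immediately so the waker is registered before the IOCP is polled.
/// let completion_packet = op.await?;
/// ```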
pub(crate) struct OverlappedOperation {
    overlapped: BoxedOverlapped,
    ex: Weak<RawExecutor<HandleReactor>>,
    completed: bool,
}

impl OverlappedOperation {
    fn new(offset: Option<u64>, ex: Weak<RawExecutor<HandleReactor>>) -> Result<Self> {
        let ret = Self {
            overlapped: BoxedOverlapped(Box::new(base::create_overlapped(offset, None))),
            ex,
            completed: false,
        };
        ret.register_op()?;
        Ok(ret)
    }

    fn register_op(&self) -> Result<()> {
        self.ex
            .upgrade()
            .ok_or(Error::ExecutorGone)?
            .reactor
            .register_overlapped_op(&self.get_token());
        Ok(())
    }

    /// Returns a mutable reference to the `OVERLAPPED` struct representing the operation. This
    /// MUST be used when making the overlapped IO call or the executor will not be able to wake
    /// the right future.
    pub fn get_overlapped(&mut self) -> &mut OVERLAPPED {
        &mut self.overlapped.0
    }

    #[inline]
    pub fn get_token(&self) -> WakerToken {
        WakerToken((&*self.overlapped.0) as *const _ as usize)
    }
}

impl Future for OverlappedOperation {
    type Output = Result<CompletionPacket>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if self.completed {
            panic!("OverlappedOperation polled after returning Poll::Ready");
        }
        if let Some(ex) = self.ex.upgrade() {
            if let Some(completion_pkt) =
                ex.reactor.get_overlapped_op_if_ready(&self.get_token(), cx)
            {
                self.completed = true;
                Poll::Ready(Ok(completion_pkt))
            } else {
                Poll::Pending
            }
        } else {
            Poll::Ready(Err(Error::ExecutorGone))
        }
    }
}

impl Drop for OverlappedOperation {
    fn drop(&mut self) {
        if !self.completed {
            if let Some(ex) = self.ex.upgrade() {
                ex.reactor.remove_overlapped_op(&self.get_token());
            }
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    const FUT_MSG: i32 = 5;
    use std::rc::Rc;

    use futures::channel::mpsc as fut_mpsc;
    use futures::SinkExt;
    use futures::StreamExt;

    use crate::BlockingPool;
    use crate::ExecutorTrait;

    #[test]
    fn run_future() {
        let (send, recv) = mpsc::channel();
        async fn this_test(send: mpsc::Sender<i32>) {
            send.send(FUT_MSG).unwrap();
        }

        let ex = RawExecutor::<HandleReactor>::new().unwrap();
        ex.run_until(this_test(send)).unwrap();
        assert_eq!(recv.recv().unwrap(), FUT_MSG);
    }

    #[test]
    fn spawn_future() {
        let (send, recv) = fut_mpsc::channel(1);
        let (send_done_signal, receive_done_signal) = mpsc::channel();

        async fn message_sender(mut send: fut_mpsc::Sender<i32>) {
            send.send(FUT_MSG).await.unwrap();
        }

        async fn message_receiver(mut recv: fut_mpsc::Receiver<i32>, done: mpsc::Sender<bool>) {
            assert_eq!(recv.next().await.unwrap(), FUT_MSG);
            done.send(true).unwrap();
        }

        let ex = RawExecutor::<HandleReactor>::new().unwrap();
        ex.spawn(message_sender(send)).detach();
        ex.run_until(message_receiver(recv, send_done_signal))
            .unwrap();
        assert_eq!(receive_done_signal.recv().unwrap(), true);
    }

    // Dropping a task that owns a BlockingPool shouldn't leak the pool.
    #[test]
    fn drop_detached_blocking_pool() {
        struct Cleanup(BlockingPool);

        impl Drop for Cleanup {
            fn drop(&mut self) {
                // Make sure we shut down cleanly (BlockingPool::drop just prints a warning).
                self.0
                    .shutdown(Some(
                        std::time::Instant::now() + std::time::Duration::from_secs(1),
                    ))
                    .unwrap();
            }
        }

        let rc = Rc::new(std::cell::Cell::new(0));
        {
            let ex = RawExecutor::<HandleReactor>::new().unwrap();
            let rc_clone = rc.clone();
            ex.spawn_local(async move {
                rc_clone.set(1);
                let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0)));
                let (send, recv) = std::sync::mpsc::sync_channel::<()>(0);
                // Spawn a blocking task.
                let blocking_task = pool.0.spawn(move || {
                    // Rendezvous.
                    assert_eq!(recv.recv(), Ok(()));
                    // Wait for drop.
                    assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError));
                });
                // Make sure it has actually started (using a "rendezvous channel" send).
                //
                // Without this step, we'll have a race where we can shut down the blocking pool
                // before the worker thread pops off the task.
                send.send(()).unwrap();
                // Wait for it to finish.
                blocking_task.await;
                rc_clone.set(2);
            })
            .detach();
            ex.run_until(async {}).unwrap();
            // `ex` is dropped here. If everything is working as expected, it should drop all of
            // its tasks, including `send` and `pool` (in that order, which is important). `pool`'s
            // `Drop` impl will try to join all the worker threads, which should work because the
            // send half of the channel has been closed.
        }
        assert_eq!(rc.get(), 1);
        Rc::try_unwrap(rc).expect("Rc had too many refs");
    }
}