1 // Copyright 2020 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::collections::HashMap; 6 use std::future::Future; 7 use std::io; 8 use std::mem; 9 use std::pin::Pin; 10 use std::sync::mpsc; 11 use std::sync::Arc; 12 use std::sync::Weak; 13 use std::task::Waker; 14 15 use base::named_pipes::BoxedOverlapped; 16 use base::warn; 17 use base::AsRawDescriptor; 18 use base::Error as SysError; 19 use base::RawDescriptor; 20 use futures::task::Context; 21 use futures::task::Poll; 22 use sync::Mutex; 23 use thiserror::Error as ThisError; 24 use winapi::um::handleapi::INVALID_HANDLE_VALUE; 25 use winapi::um::minwinbase::OVERLAPPED; 26 27 use crate::common_executor; 28 use crate::common_executor::RawExecutor; 29 use crate::common_executor::RawTaskHandle; 30 use crate::sys::windows::io_completion_port::CompletionPacket; 31 use crate::sys::windows::io_completion_port::IoCompletionPort; 32 use crate::waker::WakerToken; 33 use crate::waker::WeakWake; 34 use crate::AsyncError; 35 use crate::AsyncResult; 36 use crate::IoSource; 37 use crate::TaskHandle; 38 39 const DEFAULT_IO_CONCURRENCY: u32 = 1; 40 41 #[derive(Debug, ThisError)] 42 pub enum Error { 43 #[error("IO completion port operation failed: {0}")] 44 IocpOperationFailed(SysError), 45 #[error("Failed to get future from executor run.")] 46 FailedToReadFutureFromWakerChannel(mpsc::RecvError), 47 #[error("executor gone before future was dropped.")] 48 ExecutorGone, 49 #[error("tried to remove overlapped operation but it didn't exist.")] 50 RemoveNonExistentOperation, 51 } 52 53 impl From<Error> for io::Error { from(e: Error) -> Self54 fn from(e: Error) -> Self { 55 use Error::*; 56 match e { 57 FailedToReadFutureFromWakerChannel(e) => io::Error::new(io::ErrorKind::Other, e), 58 IocpOperationFailed(e) => io::Error::new(io::ErrorKind::Other, e), 59 ExecutorGone => io::Error::new(io::ErrorKind::Other, e), 60 RemoveNonExistentOperation => io::Error::new(io::ErrorKind::Other, e), 61 } 62 } 63 } 64 65 impl From<Error> for AsyncError { from(e: Error) -> Self66 fn from(e: Error) -> Self { 67 AsyncError::SysVariants(e.into()) 68 } 69 } 70 71 pub type Result<T> = std::result::Result<T, Error>; 72 73 /// Represents an overlapped operation that running (or has completed but not yet woken). 74 struct OpData { 75 waker: Option<Waker>, 76 } 77 78 /// The current status of a future that is running or has completed on HandleReactor. 79 enum OpStatus { 80 Pending(OpData), 81 Completed(CompletionPacket), 82 } 83 84 pub struct HandleReactor { 85 iocp: IoCompletionPort, 86 overlapped_ops: Mutex<HashMap<WakerToken, OpStatus>>, 87 } 88 89 impl HandleReactor { new_with(concurrency: u32) -> Result<Self>90 pub fn new_with(concurrency: u32) -> Result<Self> { 91 let iocp = IoCompletionPort::new(concurrency)?; 92 Ok(Self { 93 iocp, 94 overlapped_ops: Mutex::new(HashMap::with_capacity(64)), 95 }) 96 } 97 new() -> Result<Self>98 pub fn new() -> Result<Self> { 99 Self::new_with(DEFAULT_IO_CONCURRENCY) 100 } 101 102 /// All descriptors must be first registered with IOCP before any completion packets can be 103 /// received for them. register_descriptor(&self, rd: &dyn AsRawDescriptor) -> Result<()>104 pub(crate) fn register_descriptor(&self, rd: &dyn AsRawDescriptor) -> Result<()> { 105 self.iocp.register_descriptor(rd) 106 } 107 108 /// When an overlapped operation is created, it is registered with the executor here. This way, 109 /// when the executor's run thread picks up the completion events, it can associate them back 110 /// to the correct overlapped operation. Notice that here, no waker is registered. This is 111 /// because the await hasn't happened yet, so there is no waker. Once the await is triggered, 112 /// we'll invoke get_overlapped_op_if_ready which will register the waker. register_overlapped_op(&self, token: &WakerToken)113 pub(crate) fn register_overlapped_op(&self, token: &WakerToken) { 114 let mut ops = self.overlapped_ops.lock(); 115 ops.insert(*token, OpStatus::Pending(OpData { waker: None })); 116 } 117 118 /// Called to register an overlapped IO source with the executor. From here, the source can 119 /// register overlapped operations that will be managed by the executor. 120 #[allow(dead_code)] register_overlapped_source( &self, raw: &Arc<RawExecutor<HandleReactor>>, rd: &dyn AsRawDescriptor, ) -> Result<RegisteredOverlappedSource>121 pub(crate) fn register_overlapped_source( 122 &self, 123 raw: &Arc<RawExecutor<HandleReactor>>, 124 rd: &dyn AsRawDescriptor, 125 ) -> Result<RegisteredOverlappedSource> { 126 RegisteredOverlappedSource::new(rd, raw) 127 } 128 129 /// Every time an `OverlappedOperation` is polled, this method will be called. It's a trick to 130 /// register the waker so that completion events can trigger it from the executor's main thread. get_overlapped_op_if_ready( &self, token: &WakerToken, cx: &mut Context, ) -> Option<CompletionPacket>131 fn get_overlapped_op_if_ready( 132 &self, 133 token: &WakerToken, 134 cx: &mut Context, 135 ) -> Option<CompletionPacket> { 136 let mut ops = self.overlapped_ops.lock(); 137 138 if let OpStatus::Pending(data) = ops 139 .get_mut(token) 140 .expect("`get_overlapped_op_if_ready` called on unknown operation") 141 { 142 data.waker = Some(cx.waker().clone()); 143 return None; 144 } 145 if let OpStatus::Completed(pkt) = ops.remove(token).unwrap() { 146 return Some(pkt); 147 } 148 unreachable!("OpStatus didn't match any known variant."); 149 } 150 151 /// When an `OverlappedOperation` is dropped, this is called to so we don't leak registered 152 /// operations. It's possible the operation was already removed (e.g. via polling), in which 153 /// case this has no effect. remove_overlapped_op(&self, token: &WakerToken)154 fn remove_overlapped_op(&self, token: &WakerToken) { 155 let mut ops = self.overlapped_ops.lock(); 156 if ops.remove(token).is_none() { 157 warn!("Tried to remove non-existent overlapped operation from HandleReactor."); 158 } 159 } 160 } 161 162 impl common_executor::Reactor for HandleReactor { new() -> std::io::Result<Self>163 fn new() -> std::io::Result<Self> { 164 Ok(HandleReactor::new()?) 165 } 166 wake(&self)167 fn wake(&self) { 168 self.iocp.wake().expect("wakeup failed on HandleReactor."); 169 } 170 on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>>171 fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> { 172 // TODO: Cancel overlapped ops and/or wait for everything to complete like the linux 173 // reactors? 174 Box::pin(async {}) 175 } 176 wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()>177 fn wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()> { 178 let completion_packets = self.iocp.poll()?; 179 180 set_processing(); 181 182 for pkt in completion_packets { 183 if pkt.completion_key as RawDescriptor == INVALID_HANDLE_VALUE { 184 // These completion packets aren't from overlapped operations. They're from 185 // something calling HandleReactor::wake, so they've already enqueued whatever 186 // they think is runnable into the queue. All the packet does is wake up the 187 // executor loop. 188 continue; 189 } 190 191 let mut overlapped_ops = self.overlapped_ops.lock(); 192 if let Some(op) = overlapped_ops.get_mut(&WakerToken(pkt.overlapped_ptr)) { 193 let waker = match mem::replace(op, OpStatus::Completed(pkt)) { 194 OpStatus::Pending(OpData { waker }) => waker, 195 OpStatus::Completed(_) => panic!("operation completed more than once"), 196 }; 197 drop(overlapped_ops); 198 if let Some(waker) = waker { 199 waker.wake(); 200 } else { 201 // We shouldn't ever get a completion packet for an IO operation that hasn't 202 // registered its waker. 203 warn!( 204 "got a completion packet for an IO operation that had no waker.\ 205 future may be stalled." 206 ) 207 } 208 } 209 } 210 Ok(()) 211 } 212 new_source<F: AsRawDescriptor>( &self, _ex: &Arc<RawExecutor<Self>>, f: F, ) -> AsyncResult<IoSource<F>>213 fn new_source<F: AsRawDescriptor>( 214 &self, 215 _ex: &Arc<RawExecutor<Self>>, 216 f: F, 217 ) -> AsyncResult<IoSource<F>> { 218 Ok(IoSource::Handle(super::HandleSource::new(f)?)) 219 } 220 wrap_task_handle<R>(task: RawTaskHandle<HandleReactor, R>) -> TaskHandle<R>221 fn wrap_task_handle<R>(task: RawTaskHandle<HandleReactor, R>) -> TaskHandle<R> { 222 TaskHandle::Handle(task) 223 } 224 } 225 226 /// Represents a handle that has been registered for overlapped operations with a specific executor. 227 /// From here, the OverlappedSource can register overlapped operations with the executor. 228 pub(crate) struct RegisteredOverlappedSource { 229 ex: Weak<RawExecutor<HandleReactor>>, 230 } 231 232 impl RegisteredOverlappedSource { new( rd: &dyn AsRawDescriptor, ex: &Arc<RawExecutor<HandleReactor>>, ) -> Result<RegisteredOverlappedSource>233 fn new( 234 rd: &dyn AsRawDescriptor, 235 ex: &Arc<RawExecutor<HandleReactor>>, 236 ) -> Result<RegisteredOverlappedSource> { 237 ex.reactor.register_descriptor(rd)?; 238 Ok(Self { 239 ex: Arc::downgrade(ex), 240 }) 241 } 242 243 /// Registers an overlapped IO operation with this executor. Call this function with the 244 /// overlapped struct that represents the operation **before** making the overlapped IO call. 245 /// 246 /// NOTE: you MUST pass OverlappedOperation::get_overlapped_ptr() as the overlapped IO pointer 247 /// in the IO call. register_overlapped_operation( &self, offset: Option<u64>, ) -> Result<OverlappedOperation>248 pub fn register_overlapped_operation( 249 &self, 250 offset: Option<u64>, 251 ) -> Result<OverlappedOperation> { 252 OverlappedOperation::new(offset, self.ex.clone()) 253 } 254 } 255 256 impl WeakWake for HandleReactor { wake_by_ref(weak_self: &Weak<Self>)257 fn wake_by_ref(weak_self: &Weak<Self>) { 258 if let Some(arc_self) = weak_self.upgrade() { 259 common_executor::Reactor::wake(&*arc_self); 260 } 261 } 262 } 263 264 /// Represents a pending overlapped IO operation. This must be used in the following manner or 265 /// undefined behavior will result: 266 /// 1. The reactor in use is a HandleReactor. 267 /// 2. Immediately after the IO syscall, this future MUST be awaited. We rely on the fact that 268 /// the executor cannot poll the IOCP before this future is polled for the first time to 269 /// ensure the waker has been registered. (If the executor polls the IOCP before the waker is 270 /// registered, the future will stall.) 271 pub(crate) struct OverlappedOperation { 272 overlapped: BoxedOverlapped, 273 ex: Weak<RawExecutor<HandleReactor>>, 274 completed: bool, 275 } 276 277 impl OverlappedOperation { new(offset: Option<u64>, ex: Weak<RawExecutor<HandleReactor>>) -> Result<Self>278 fn new(offset: Option<u64>, ex: Weak<RawExecutor<HandleReactor>>) -> Result<Self> { 279 let ret = Self { 280 overlapped: BoxedOverlapped(Box::new(base::create_overlapped(offset, None))), 281 ex, 282 completed: false, 283 }; 284 ret.register_op()?; 285 Ok(ret) 286 } 287 register_op(&self) -> Result<()>288 fn register_op(&self) -> Result<()> { 289 self.ex 290 .upgrade() 291 .ok_or(Error::ExecutorGone)? 292 .reactor 293 .register_overlapped_op(&self.get_token()); 294 Ok(()) 295 } 296 297 /// Returns a pointer to the overlapped struct representing the operation. This MUST be used 298 /// when making the overlapped IO call or the executor will not be able to wake the right 299 /// future. get_overlapped(&mut self) -> &mut OVERLAPPED300 pub fn get_overlapped(&mut self) -> &mut OVERLAPPED { 301 &mut self.overlapped.0 302 } 303 304 #[inline] get_token(&self) -> WakerToken305 pub fn get_token(&self) -> WakerToken { 306 WakerToken((&*self.overlapped.0) as *const _ as usize) 307 } 308 } 309 310 impl Future for OverlappedOperation { 311 type Output = Result<CompletionPacket>; 312 poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>313 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { 314 if self.completed { 315 panic!("OverlappedOperation polled after returning Poll::Ready"); 316 } 317 if let Some(ex) = self.ex.upgrade() { 318 if let Some(completion_pkt) = 319 ex.reactor.get_overlapped_op_if_ready(&self.get_token(), cx) 320 { 321 self.completed = true; 322 Poll::Ready(Ok(completion_pkt)) 323 } else { 324 Poll::Pending 325 } 326 } else { 327 Poll::Ready(Err(Error::ExecutorGone)) 328 } 329 } 330 } 331 332 impl Drop for OverlappedOperation { drop(&mut self)333 fn drop(&mut self) { 334 if !self.completed { 335 if let Some(ex) = self.ex.upgrade() { 336 ex.reactor.remove_overlapped_op(&self.get_token()); 337 } 338 } 339 } 340 } 341 342 #[cfg(test)] 343 mod test { 344 use super::*; 345 const FUT_MSG: i32 = 5; 346 use std::rc::Rc; 347 348 use futures::channel::mpsc as fut_mpsc; 349 use futures::SinkExt; 350 use futures::StreamExt; 351 352 use crate::BlockingPool; 353 use crate::ExecutorTrait; 354 355 #[test] run_future()356 fn run_future() { 357 let (send, recv) = mpsc::channel(); 358 async fn this_test(send: mpsc::Sender<i32>) { 359 send.send(FUT_MSG).unwrap(); 360 } 361 362 let ex = RawExecutor::<HandleReactor>::new().unwrap(); 363 ex.run_until(this_test(send)).unwrap(); 364 assert_eq!(recv.recv().unwrap(), FUT_MSG); 365 } 366 367 #[test] spawn_future()368 fn spawn_future() { 369 let (send, recv) = fut_mpsc::channel(1); 370 let (send_done_signal, receive_done_signal) = mpsc::channel(); 371 372 async fn message_sender(mut send: fut_mpsc::Sender<i32>) { 373 send.send(FUT_MSG).await.unwrap(); 374 } 375 376 async fn message_receiver(mut recv: fut_mpsc::Receiver<i32>, done: mpsc::Sender<bool>) { 377 assert_eq!(recv.next().await.unwrap(), FUT_MSG); 378 done.send(true).unwrap(); 379 } 380 381 let ex = RawExecutor::<HandleReactor>::new().unwrap(); 382 ex.spawn(message_sender(send)).detach(); 383 ex.run_until(message_receiver(recv, send_done_signal)) 384 .unwrap(); 385 assert_eq!(receive_done_signal.recv().unwrap(), true); 386 } 387 388 // Dropping a task that owns a BlockingPool shouldn't leak the pool. 389 #[test] drop_detached_blocking_pool()390 fn drop_detached_blocking_pool() { 391 struct Cleanup(BlockingPool); 392 393 impl Drop for Cleanup { 394 fn drop(&mut self) { 395 // Make sure we shutdown cleanly (BlockingPool::drop just prints a warning). 396 self.0 397 .shutdown(Some( 398 std::time::Instant::now() + std::time::Duration::from_secs(1), 399 )) 400 .unwrap(); 401 } 402 } 403 404 let rc = Rc::new(std::cell::Cell::new(0)); 405 { 406 let ex = RawExecutor::<HandleReactor>::new().unwrap(); 407 let rc_clone = rc.clone(); 408 ex.spawn_local(async move { 409 rc_clone.set(1); 410 let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0))); 411 let (send, recv) = std::sync::mpsc::sync_channel::<()>(0); 412 // Spawn a blocking task. 413 let blocking_task = pool.0.spawn(move || { 414 // Rendezvous. 415 assert_eq!(recv.recv(), Ok(())); 416 // Wait for drop. 417 assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError)); 418 }); 419 // Make sure it has actually started (using a "rendezvous channel" send). 420 // 421 // Without this step, we'll have a race where we can shutdown the blocking pool 422 // before the worker thread pops off the task. 423 send.send(()).unwrap(); 424 // Wait for it to finish 425 blocking_task.await; 426 rc_clone.set(2); 427 }) 428 .detach(); 429 ex.run_until(async {}).unwrap(); 430 // `ex` is dropped here. If everything is working as expected, it should drop all of 431 // its tasks, including `send` and `pool` (in that order, which is important). `pool`'s 432 // `Drop` impl will try to join all the worker threads, which should work because send 433 // half of the channel closed. 434 } 435 assert_eq!(rc.get(), 1); 436 Rc::try_unwrap(rc).expect("Rc had too many refs"); 437 } 438 } 439