1 // Copyright 2020 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::future::Future; 6 use std::io; 7 use std::mem; 8 use std::os::fd::AsRawFd; 9 use std::pin::Pin; 10 use std::sync::Arc; 11 use std::sync::Weak; 12 use std::task::Context; 13 use std::task::Poll; 14 use std::task::Waker; 15 16 use base::add_fd_flags; 17 use base::warn; 18 use base::AsRawDescriptor; 19 use base::AsRawDescriptors; 20 use base::Event; 21 use base::EventType; 22 use base::RawDescriptor; 23 use base::WaitContext; 24 use remain::sorted; 25 use slab::Slab; 26 use sync::Mutex; 27 use thiserror::Error as ThisError; 28 29 use crate::common_executor::RawExecutor; 30 use crate::common_executor::RawTaskHandle; 31 use crate::common_executor::Reactor; 32 use crate::waker::WakerToken; 33 use crate::AsyncResult; 34 use crate::IoSource; 35 use crate::TaskHandle; 36 37 #[sorted] 38 #[derive(Debug, ThisError)] 39 pub enum Error { 40 #[error("Couldn't clear the wake eventfd")] 41 CantClearWakeEvent(base::Error), 42 /// Failed to clone the Event for waking the executor. 43 #[error("Failed to clone the Event for waking the executor: {0}")] 44 CloneEvent(base::Error), 45 /// Failed to create the Event for waking the executor. 46 #[error("Failed to create the Event for waking the executor: {0}")] 47 CreateEvent(base::Error), 48 /// Creating a context to wait on FDs failed. 49 #[error("An error creating the fd waiting context: {0}")] 50 CreatingContext(base::Error), 51 /// Failed to copy the FD for the polling context. 52 #[error("Failed to copy the FD for the polling context: {0}")] 53 DuplicatingFd(std::io::Error), 54 #[error("Executor failed")] 55 ExecutorError(anyhow::Error), 56 /// The Executor is gone. 57 #[error("The FDExecutor is gone")] 58 ExecutorGone, 59 /// An error occurred when setting the FD non-blocking. 60 #[error("An error occurred setting the FD non-blocking: {0}.")] 61 SettingNonBlocking(base::Error), 62 /// Failed to submit the waker to the polling context. 63 #[error("An error adding to the Aio context: {0}")] 64 SubmittingWaker(base::Error), 65 /// A Waker was canceled, but the operation isn't running. 66 #[error("Unknown waker")] 67 UnknownWaker, 68 /// WaitContext failure. 69 #[error("WaitContext failure: {0}")] 70 WaitContextError(base::Error), 71 } 72 pub type Result<T> = std::result::Result<T, Error>; 73 74 impl From<Error> for io::Error { from(e: Error) -> Self75 fn from(e: Error) -> Self { 76 use Error::*; 77 match e { 78 CantClearWakeEvent(e) => e.into(), 79 CloneEvent(e) => e.into(), 80 CreateEvent(e) => e.into(), 81 DuplicatingFd(e) => e, 82 ExecutorError(e) => io::Error::new(io::ErrorKind::Other, e), 83 ExecutorGone => io::Error::new(io::ErrorKind::Other, e), 84 CreatingContext(e) => e.into(), 85 SettingNonBlocking(e) => e.into(), 86 SubmittingWaker(e) => e.into(), 87 UnknownWaker => io::Error::new(io::ErrorKind::Other, e), 88 WaitContextError(e) => e.into(), 89 } 90 } 91 } 92 93 // A poll operation that has been submitted and is potentially being waited on. 94 struct OpData { 95 file: Arc<std::os::fd::OwnedFd>, 96 waker: Option<Waker>, 97 } 98 99 // The current status of a submitted operation. 100 enum OpStatus { 101 Pending(OpData), 102 Completed, 103 // Special status that identifies the "wake up" eventfd, which is essentially always pending. 104 WakeEvent, 105 } 106 107 // An IO source previously registered with an EpollReactor. Used to initiate asynchronous IO with 108 // the associated executor. 109 pub struct RegisteredSource<F> { 110 pub(crate) source: F, 111 ex: Weak<RawExecutor<EpollReactor>>, 112 /// A clone of `source`'s underlying FD. Allows us to ensure that the FD isn't closed during 113 /// the epoll wait call. There are well defined sematics for closing an FD in an epoll context 114 /// so it might be possible to eliminate this dup if someone thinks hard about it. 115 pub(crate) duped_fd: Arc<std::os::fd::OwnedFd>, 116 } 117 118 impl<F: AsRawDescriptor> RegisteredSource<F> { new(raw: &Arc<RawExecutor<EpollReactor>>, f: F) -> Result<Self>119 pub(crate) fn new(raw: &Arc<RawExecutor<EpollReactor>>, f: F) -> Result<Self> { 120 let raw_fd = f.as_raw_descriptor(); 121 assert_ne!(raw_fd, -1); 122 123 add_fd_flags(raw_fd, libc::O_NONBLOCK).map_err(Error::SettingNonBlocking)?; 124 125 // SAFETY: The FD is open for the duration of the BorrowedFd lifetime (this line) and not 126 // -1 (checked above). 127 let duped_fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(raw_fd) } 128 .try_clone_to_owned() 129 .map_err(Error::DuplicatingFd)?; 130 Ok(RegisteredSource { 131 source: f, 132 ex: Arc::downgrade(raw), 133 duped_fd: Arc::new(duped_fd), 134 }) 135 } 136 137 // Start an asynchronous operation to wait for this source to become readable. The returned 138 // future will not be ready until the source is readable. wait_readable(&self) -> Result<PendingOperation>139 pub fn wait_readable(&self) -> Result<PendingOperation> { 140 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?; 141 142 let token = ex 143 .reactor 144 .add_operation(Arc::clone(&self.duped_fd), EventType::Read)?; 145 146 Ok(PendingOperation { 147 token: Some(token), 148 ex: self.ex.clone(), 149 }) 150 } 151 152 // Start an asynchronous operation to wait for this source to become writable. The returned 153 // future will not be ready until the source is writable. wait_writable(&self) -> Result<PendingOperation>154 pub fn wait_writable(&self) -> Result<PendingOperation> { 155 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?; 156 157 let token = ex 158 .reactor 159 .add_operation(Arc::clone(&self.duped_fd), EventType::Write)?; 160 161 Ok(PendingOperation { 162 token: Some(token), 163 ex: self.ex.clone(), 164 }) 165 } 166 } 167 168 /// A token returned from `add_operation` that can be used to cancel the waker before it completes. 169 /// Used to manage getting the result from the underlying executor for a completed operation. 170 /// Dropping a `PendingOperation` will get the result from the executor. 171 pub struct PendingOperation { 172 token: Option<WakerToken>, 173 ex: Weak<RawExecutor<EpollReactor>>, 174 } 175 176 impl Future for PendingOperation { 177 type Output = Result<()>; 178 poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>179 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { 180 let token = self 181 .token 182 .as_ref() 183 .expect("PendingOperation polled after returning Poll::Ready"); 184 if let Some(ex) = self.ex.upgrade() { 185 if ex.reactor.is_ready(token, cx) { 186 self.token = None; 187 Poll::Ready(Ok(())) 188 } else { 189 Poll::Pending 190 } 191 } else { 192 Poll::Ready(Err(Error::ExecutorGone)) 193 } 194 } 195 } 196 197 impl Drop for PendingOperation { drop(&mut self)198 fn drop(&mut self) { 199 if let Some(token) = self.token.take() { 200 if let Some(ex) = self.ex.upgrade() { 201 let _ = ex.reactor.cancel_operation(token); 202 } 203 } 204 } 205 } 206 207 /// `Reactor` that manages async IO work using epoll. 208 pub struct EpollReactor { 209 poll_ctx: WaitContext<usize>, 210 ops: Mutex<Slab<OpStatus>>, 211 // This event is always present in `poll_ctx` with the special op status `WakeEvent`. It is 212 // used by `RawExecutor::wake` to break other threads out of `poll_ctx.wait()` calls (usually 213 // to notify them that `queue` has new work). 214 wake_event: Event, 215 } 216 217 impl EpollReactor { new() -> Result<Self>218 fn new() -> Result<Self> { 219 let reactor = EpollReactor { 220 poll_ctx: WaitContext::new().map_err(Error::CreatingContext)?, 221 ops: Mutex::new(Slab::with_capacity(64)), 222 wake_event: { 223 let wake_event = Event::new().map_err(Error::CreateEvent)?; 224 add_fd_flags(wake_event.as_raw_descriptor(), libc::O_NONBLOCK) 225 .map_err(Error::SettingNonBlocking)?; 226 wake_event 227 }, 228 }; 229 230 // Add the special "wake up" op. 231 { 232 let mut ops = reactor.ops.lock(); 233 let entry = ops.vacant_entry(); 234 let next_token = entry.key(); 235 reactor 236 .poll_ctx 237 .add_for_event(&reactor.wake_event, EventType::Read, next_token) 238 .map_err(Error::SubmittingWaker)?; 239 entry.insert(OpStatus::WakeEvent); 240 } 241 242 Ok(reactor) 243 } 244 add_operation( &self, file: Arc<std::os::fd::OwnedFd>, event_type: EventType, ) -> Result<WakerToken>245 fn add_operation( 246 &self, 247 file: Arc<std::os::fd::OwnedFd>, 248 event_type: EventType, 249 ) -> Result<WakerToken> { 250 let mut ops = self.ops.lock(); 251 let entry = ops.vacant_entry(); 252 let next_token = entry.key(); 253 self.poll_ctx 254 .add_for_event(&base::Descriptor(file.as_raw_fd()), event_type, next_token) 255 .map_err(Error::SubmittingWaker)?; 256 entry.insert(OpStatus::Pending(OpData { file, waker: None })); 257 Ok(WakerToken(next_token)) 258 } 259 is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool260 fn is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool { 261 let mut ops = self.ops.lock(); 262 263 let op = ops 264 .get_mut(token.0) 265 .expect("`is_ready` called on unknown operation"); 266 match op { 267 OpStatus::Pending(data) => { 268 data.waker = Some(cx.waker().clone()); 269 false 270 } 271 OpStatus::Completed => { 272 ops.remove(token.0); 273 true 274 } 275 // unreachable because we never create a WakerToken for `wake_event`. 276 OpStatus::WakeEvent => unreachable!(), 277 } 278 } 279 280 // Remove the waker for the given token if it hasn't fired yet. cancel_operation(&self, token: WakerToken) -> Result<()>281 fn cancel_operation(&self, token: WakerToken) -> Result<()> { 282 match self.ops.lock().remove(token.0) { 283 OpStatus::Pending(data) => self 284 .poll_ctx 285 .delete(&base::Descriptor(data.file.as_raw_fd())) 286 .map_err(Error::WaitContextError), 287 OpStatus::Completed => Ok(()), 288 // unreachable because we never create a WakerToken for `wake_event`. 289 OpStatus::WakeEvent => unreachable!(), 290 } 291 } 292 } 293 294 impl Reactor for EpollReactor { new() -> std::io::Result<Self>295 fn new() -> std::io::Result<Self> { 296 Ok(EpollReactor::new()?) 297 } 298 wake(&self)299 fn wake(&self) { 300 if let Err(e) = self.wake_event.signal() { 301 warn!("Failed to notify executor that a future is ready: {}", e); 302 } 303 } 304 on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>>305 fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> { 306 // At this point, there are no strong references to the executor (see `on_executor_drop` 307 // docs). That means all the `RegisteredSource::ex` will fail to upgrade and so no more IO 308 // work can be submitted. 309 310 // Wake up any futures still waiting on poll operations as they are just going to get an 311 // ExecutorGone error now. 312 for op in self.ops.lock().drain() { 313 match op { 314 OpStatus::Pending(mut data) => { 315 if let Some(waker) = data.waker.take() { 316 waker.wake(); 317 } 318 319 if let Err(e) = self 320 .poll_ctx 321 .delete(&base::Descriptor(data.file.as_raw_fd())) 322 { 323 warn!("Failed to remove file from EpollCtx: {}", e); 324 } 325 } 326 OpStatus::Completed => {} 327 OpStatus::WakeEvent => {} 328 } 329 } 330 331 // Now run the executor one more time to drive any remaining futures to completion. 332 Box::pin(async {}) 333 } 334 wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()>335 fn wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()> { 336 let events = self.poll_ctx.wait().map_err(Error::WaitContextError)?; 337 338 // Set the state back to PROCESSING to prevent any tasks woken up by the loop below from 339 // writing to the eventfd. 340 set_processing(); 341 for e in events.iter() { 342 let token = e.token; 343 let mut ops = self.ops.lock(); 344 345 // The op could have been canceled and removed by another thread so ignore it if it 346 // doesn't exist. 347 if let Some(op) = ops.get_mut(token) { 348 let (file, waker) = match mem::replace(op, OpStatus::Completed) { 349 OpStatus::Pending(OpData { file, waker }) => (file, waker), 350 OpStatus::Completed => panic!("poll operation completed more than once"), 351 OpStatus::WakeEvent => { 352 *op = OpStatus::WakeEvent; 353 match self.wake_event.wait() { 354 Ok(_) => {} 355 Err(e) if e.errno() == libc::EWOULDBLOCK => {} 356 Err(e) => return Err(e.into()), 357 } 358 continue; 359 } 360 }; 361 362 mem::drop(ops); 363 364 self.poll_ctx 365 .delete(&base::Descriptor(file.as_raw_fd())) 366 .map_err(Error::WaitContextError)?; 367 368 if let Some(waker) = waker { 369 waker.wake(); 370 } 371 } 372 } 373 Ok(()) 374 } 375 new_source<F: AsRawDescriptor>( &self, ex: &Arc<RawExecutor<Self>>, f: F, ) -> AsyncResult<IoSource<F>>376 fn new_source<F: AsRawDescriptor>( 377 &self, 378 ex: &Arc<RawExecutor<Self>>, 379 f: F, 380 ) -> AsyncResult<IoSource<F>> { 381 Ok(IoSource::Epoll(super::PollSource::new(f, ex)?)) 382 } 383 wrap_task_handle<R>(task: RawTaskHandle<EpollReactor, R>) -> TaskHandle<R>384 fn wrap_task_handle<R>(task: RawTaskHandle<EpollReactor, R>) -> TaskHandle<R> { 385 TaskHandle::Fd(task) 386 } 387 } 388 389 impl AsRawDescriptors for EpollReactor { as_raw_descriptors(&self) -> Vec<RawDescriptor>390 fn as_raw_descriptors(&self) -> Vec<RawDescriptor> { 391 vec![ 392 self.poll_ctx.as_raw_descriptor(), 393 self.wake_event.as_raw_descriptor(), 394 ] 395 } 396 } 397 398 #[cfg(test)] 399 mod test { 400 use std::cell::RefCell; 401 use std::fs::File; 402 use std::io::Read; 403 use std::io::Write; 404 use std::rc::Rc; 405 406 use futures::future::Either; 407 408 use super::*; 409 use crate::BlockingPool; 410 use crate::ExecutorTrait; 411 412 #[test] test_it()413 fn test_it() { 414 async fn do_test(ex: &Arc<RawExecutor<EpollReactor>>) { 415 let (r, _w) = base::pipe().unwrap(); 416 let done = Box::pin(async { 5usize }); 417 let source = RegisteredSource::new(ex, r).unwrap(); 418 let pending = source.wait_readable().unwrap(); 419 match futures::future::select(pending, done).await { 420 Either::Right((5, pending)) => std::mem::drop(pending), 421 _ => panic!("unexpected select result"), 422 } 423 } 424 425 let ex = RawExecutor::<EpollReactor>::new().unwrap(); 426 ex.run_until(do_test(&ex)).unwrap(); 427 428 // Example of starting the framework and running a future: 429 async fn my_async(x: Rc<RefCell<u64>>) { 430 x.replace(4); 431 } 432 433 let x = Rc::new(RefCell::new(0)); 434 { 435 let ex = RawExecutor::<EpollReactor>::new().unwrap(); 436 ex.run_until(my_async(x.clone())).unwrap(); 437 } 438 assert_eq!(*x.borrow(), 4); 439 } 440 441 #[test] drop_before_completion()442 fn drop_before_completion() { 443 const VALUE: u64 = 0x66ae_cb65_12fb_d260; 444 445 async fn write_value(mut tx: File) { 446 let buf = VALUE.to_ne_bytes(); 447 tx.write_all(&buf[..]).expect("Failed to write to pipe"); 448 } 449 450 async fn check_op(op: PendingOperation) { 451 let err = op.await.expect_err("Task completed successfully"); 452 match err { 453 Error::ExecutorGone => {} 454 e => panic!("Unexpected error from task: {}", e), 455 } 456 } 457 458 let (mut rx, tx) = base::pipe().expect("Pipe failed"); 459 460 let ex = RawExecutor::<EpollReactor>::new().unwrap(); 461 462 let source = RegisteredSource::new(&ex, tx.try_clone().unwrap()).unwrap(); 463 let op = source.wait_writable().unwrap(); 464 465 ex.spawn_local(write_value(tx)).detach(); 466 ex.spawn_local(check_op(op)).detach(); 467 468 // Now drop the executor. It should still run until the write to the pipe is complete. 469 mem::drop(ex); 470 471 let mut buf = 0u64.to_ne_bytes(); 472 rx.read_exact(&mut buf[..]) 473 .expect("Failed to read from pipe"); 474 475 assert_eq!(u64::from_ne_bytes(buf), VALUE); 476 } 477 478 // Dropping a task that owns a BlockingPool shouldn't leak the pool. 479 #[test] drop_detached_blocking_pool()480 fn drop_detached_blocking_pool() { 481 struct Cleanup(BlockingPool); 482 483 impl Drop for Cleanup { 484 fn drop(&mut self) { 485 // Make sure we shutdown cleanly (BlockingPool::drop just prints a warning). 486 self.0 487 .shutdown(Some( 488 std::time::Instant::now() + std::time::Duration::from_secs(1), 489 )) 490 .unwrap(); 491 } 492 } 493 494 let rc = Rc::new(std::cell::Cell::new(0)); 495 { 496 let ex = RawExecutor::<EpollReactor>::new().unwrap(); 497 let rc_clone = rc.clone(); 498 ex.spawn_local(async move { 499 rc_clone.set(1); 500 let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0))); 501 let (send, recv) = std::sync::mpsc::sync_channel::<()>(0); 502 // Spawn a blocking task. 503 let blocking_task = pool.0.spawn(move || { 504 // Rendezvous. 505 assert_eq!(recv.recv(), Ok(())); 506 // Wait for drop. 507 assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError)); 508 }); 509 // Make sure it has actually started (using a "rendezvous channel" send). 510 // 511 // Without this step, we'll have a race where we can shutdown the blocking pool 512 // before the worker thread pops off the task. 513 send.send(()).unwrap(); 514 // Wait for it to finish 515 blocking_task.await; 516 rc_clone.set(2); 517 }) 518 .detach(); 519 ex.run_until(async {}).unwrap(); 520 // `ex` is dropped here. If everything is working as expected, it should drop all of 521 // its tasks, including `send` and `pool` (in that order, which is important). `pool`'s 522 // `Drop` impl will try to join all the worker threads, which should work because send 523 // half of the channel closed. 524 } 525 assert_eq!(rc.get(), 1); 526 Rc::try_unwrap(rc).expect("Rc had too many refs"); 527 } 528 529 // Test the waker implementation. This code path doesn't get hit by `IoSource`, only by backend 530 // agnostic libraries, like `BlockingPool` and `futures::channel`. 531 #[test] test_non_io_waker()532 fn test_non_io_waker() { 533 use std::task::Poll; 534 535 struct Sleep(Option<u64>); 536 537 impl Future for Sleep { 538 type Output = (); 539 540 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 541 if let Some(ms) = self.0.take() { 542 let waker = cx.waker().clone(); 543 std::thread::spawn(move || { 544 std::thread::sleep(std::time::Duration::from_millis(ms)); 545 waker.wake(); 546 }); 547 Poll::Pending 548 } else { 549 Poll::Ready(()) 550 } 551 } 552 } 553 554 let ex = RawExecutor::<EpollReactor>::new().unwrap(); 555 ex.run_until(async move { 556 // Test twice because there was once a bug where the second time panic'd. 557 Sleep(Some(1)).await; 558 Sleep(Some(1)).await; 559 }) 560 .unwrap(); 561 } 562 } 563