xref: /aosp_15_r20/external/crosvm/cros_async/src/sys/linux/fd_executor.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::future::Future;
6 use std::io;
7 use std::mem;
8 use std::os::fd::AsRawFd;
9 use std::pin::Pin;
10 use std::sync::Arc;
11 use std::sync::Weak;
12 use std::task::Context;
13 use std::task::Poll;
14 use std::task::Waker;
15 
16 use base::add_fd_flags;
17 use base::warn;
18 use base::AsRawDescriptor;
19 use base::AsRawDescriptors;
20 use base::Event;
21 use base::EventType;
22 use base::RawDescriptor;
23 use base::WaitContext;
24 use remain::sorted;
25 use slab::Slab;
26 use sync::Mutex;
27 use thiserror::Error as ThisError;
28 
29 use crate::common_executor::RawExecutor;
30 use crate::common_executor::RawTaskHandle;
31 use crate::common_executor::Reactor;
32 use crate::waker::WakerToken;
33 use crate::AsyncResult;
34 use crate::IoSource;
35 use crate::TaskHandle;
36 
37 #[sorted]
38 #[derive(Debug, ThisError)]
39 pub enum Error {
40     #[error("Couldn't clear the wake eventfd")]
41     CantClearWakeEvent(base::Error),
42     /// Failed to clone the Event for waking the executor.
43     #[error("Failed to clone the Event for waking the executor: {0}")]
44     CloneEvent(base::Error),
45     /// Failed to create the Event for waking the executor.
46     #[error("Failed to create the Event for waking the executor: {0}")]
47     CreateEvent(base::Error),
48     /// Creating a context to wait on FDs failed.
49     #[error("An error creating the fd waiting context: {0}")]
50     CreatingContext(base::Error),
51     /// Failed to copy the FD for the polling context.
52     #[error("Failed to copy the FD for the polling context: {0}")]
53     DuplicatingFd(std::io::Error),
54     #[error("Executor failed")]
55     ExecutorError(anyhow::Error),
56     /// The Executor is gone.
57     #[error("The FDExecutor is gone")]
58     ExecutorGone,
59     /// An error occurred when setting the FD non-blocking.
60     #[error("An error occurred setting the FD non-blocking: {0}.")]
61     SettingNonBlocking(base::Error),
62     /// Failed to submit the waker to the polling context.
63     #[error("An error adding to the Aio context: {0}")]
64     SubmittingWaker(base::Error),
65     /// A Waker was canceled, but the operation isn't running.
66     #[error("Unknown waker")]
67     UnknownWaker,
68     /// WaitContext failure.
69     #[error("WaitContext failure: {0}")]
70     WaitContextError(base::Error),
71 }
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
73 
74 impl From<Error> for io::Error {
from(e: Error) -> Self75     fn from(e: Error) -> Self {
76         use Error::*;
77         match e {
78             CantClearWakeEvent(e) => e.into(),
79             CloneEvent(e) => e.into(),
80             CreateEvent(e) => e.into(),
81             DuplicatingFd(e) => e,
82             ExecutorError(e) => io::Error::new(io::ErrorKind::Other, e),
83             ExecutorGone => io::Error::new(io::ErrorKind::Other, e),
84             CreatingContext(e) => e.into(),
85             SettingNonBlocking(e) => e.into(),
86             SubmittingWaker(e) => e.into(),
87             UnknownWaker => io::Error::new(io::ErrorKind::Other, e),
88             WaitContextError(e) => e.into(),
89         }
90     }
91 }
92 
// A poll operation that has been submitted and is potentially being waited on.
struct OpData {
    // The FD that was registered with the reactor's `poll_ctx` for this operation. It is kept
    // here so the same descriptor can be named again when the registration is deleted.
    file: Arc<std::os::fd::OwnedFd>,
    // Waker of the task that most recently polled this operation. Stays `None` until `is_ready`
    // has been called on the operation while it was still pending.
    waker: Option<Waker>,
}
98 
// The current status of a submitted operation.
enum OpStatus {
    // Registered with the wait context and not yet reported as ready.
    Pending(OpData),
    // The wait context reported the FD as ready. The slot stays in the slab until `is_ready` or
    // `cancel_operation` removes it.
    Completed,
    // Special status that identifies the "wake up" eventfd, which is essentially always pending.
    WakeEvent,
}
106 
/// An IO source previously registered with an `EpollReactor`. Used to initiate asynchronous IO
/// with the associated executor.
pub struct RegisteredSource<F> {
    /// The IO object supplied by the caller at registration time.
    pub(crate) source: F,
    // Weak handle to the owning executor. Upgrading it fails once the executor is gone, in which
    // case new wait operations are rejected with `Error::ExecutorGone`.
    ex: Weak<RawExecutor<EpollReactor>>,
    /// A clone of `source`'s underlying FD. Allows us to ensure that the FD isn't closed during
    /// the epoll wait call. There are well defined semantics for closing an FD in an epoll context
    /// so it might be possible to eliminate this dup if someone thinks hard about it.
    pub(crate) duped_fd: Arc<std::os::fd::OwnedFd>,
}
117 
118 impl<F: AsRawDescriptor> RegisteredSource<F> {
new(raw: &Arc<RawExecutor<EpollReactor>>, f: F) -> Result<Self>119     pub(crate) fn new(raw: &Arc<RawExecutor<EpollReactor>>, f: F) -> Result<Self> {
120         let raw_fd = f.as_raw_descriptor();
121         assert_ne!(raw_fd, -1);
122 
123         add_fd_flags(raw_fd, libc::O_NONBLOCK).map_err(Error::SettingNonBlocking)?;
124 
125         // SAFETY: The FD is open for the duration of the BorrowedFd lifetime (this line) and not
126         // -1 (checked above).
127         let duped_fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(raw_fd) }
128             .try_clone_to_owned()
129             .map_err(Error::DuplicatingFd)?;
130         Ok(RegisteredSource {
131             source: f,
132             ex: Arc::downgrade(raw),
133             duped_fd: Arc::new(duped_fd),
134         })
135     }
136 
137     // Start an asynchronous operation to wait for this source to become readable. The returned
138     // future will not be ready until the source is readable.
wait_readable(&self) -> Result<PendingOperation>139     pub fn wait_readable(&self) -> Result<PendingOperation> {
140         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
141 
142         let token = ex
143             .reactor
144             .add_operation(Arc::clone(&self.duped_fd), EventType::Read)?;
145 
146         Ok(PendingOperation {
147             token: Some(token),
148             ex: self.ex.clone(),
149         })
150     }
151 
152     // Start an asynchronous operation to wait for this source to become writable. The returned
153     // future will not be ready until the source is writable.
wait_writable(&self) -> Result<PendingOperation>154     pub fn wait_writable(&self) -> Result<PendingOperation> {
155         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
156 
157         let token = ex
158             .reactor
159             .add_operation(Arc::clone(&self.duped_fd), EventType::Write)?;
160 
161         Ok(PendingOperation {
162             token: Some(token),
163             ex: self.ex.clone(),
164         })
165     }
166 }
167 
/// A handle to an operation registered through `add_operation`. Awaiting it yields `Ok(())` once
/// the reactor has marked the operation as completed, or `Error::ExecutorGone` if the executor no
/// longer exists.
/// Dropping a `PendingOperation` that has not yet yielded `Ok(())` cancels the operation in the
/// reactor (provided the executor is still alive).
pub struct PendingOperation {
    // Identifies the operation inside the reactor. Becomes `None` once the future has returned
    // `Poll::Ready(Ok(()))` or the operation has been handed to `cancel_operation` on drop.
    token: Option<WakerToken>,
    // Weak handle to the executor whose reactor owns the operation.
    ex: Weak<RawExecutor<EpollReactor>>,
}
175 
176 impl Future for PendingOperation {
177     type Output = Result<()>;
178 
poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>179     fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
180         let token = self
181             .token
182             .as_ref()
183             .expect("PendingOperation polled after returning Poll::Ready");
184         if let Some(ex) = self.ex.upgrade() {
185             if ex.reactor.is_ready(token, cx) {
186                 self.token = None;
187                 Poll::Ready(Ok(()))
188             } else {
189                 Poll::Pending
190             }
191         } else {
192             Poll::Ready(Err(Error::ExecutorGone))
193         }
194     }
195 }
196 
197 impl Drop for PendingOperation {
drop(&mut self)198     fn drop(&mut self) {
199         if let Some(token) = self.token.take() {
200             if let Some(ex) = self.ex.upgrade() {
201                 let _ = ex.reactor.cancel_operation(token);
202             }
203         }
204     }
205 }
206 
/// `Reactor` that manages async IO work using epoll.
pub struct EpollReactor {
    // Wait context that the FDs are registered with. The token attached to each registration is
    // the key of the matching entry in `ops`.
    poll_ctx: WaitContext<usize>,
    // Status of every registered operation, keyed by the token given to `poll_ctx`.
    ops: Mutex<Slab<OpStatus>>,
    // This event is always present in `poll_ctx` with the special op status `WakeEvent`. It is
    // used by `RawExecutor::wake` to break other threads out of `poll_ctx.wait()` calls (usually
    // to notify them that `queue` has new work).
    wake_event: Event,
}
216 
217 impl EpollReactor {
new() -> Result<Self>218     fn new() -> Result<Self> {
219         let reactor = EpollReactor {
220             poll_ctx: WaitContext::new().map_err(Error::CreatingContext)?,
221             ops: Mutex::new(Slab::with_capacity(64)),
222             wake_event: {
223                 let wake_event = Event::new().map_err(Error::CreateEvent)?;
224                 add_fd_flags(wake_event.as_raw_descriptor(), libc::O_NONBLOCK)
225                     .map_err(Error::SettingNonBlocking)?;
226                 wake_event
227             },
228         };
229 
230         // Add the special "wake up" op.
231         {
232             let mut ops = reactor.ops.lock();
233             let entry = ops.vacant_entry();
234             let next_token = entry.key();
235             reactor
236                 .poll_ctx
237                 .add_for_event(&reactor.wake_event, EventType::Read, next_token)
238                 .map_err(Error::SubmittingWaker)?;
239             entry.insert(OpStatus::WakeEvent);
240         }
241 
242         Ok(reactor)
243     }
244 
add_operation( &self, file: Arc<std::os::fd::OwnedFd>, event_type: EventType, ) -> Result<WakerToken>245     fn add_operation(
246         &self,
247         file: Arc<std::os::fd::OwnedFd>,
248         event_type: EventType,
249     ) -> Result<WakerToken> {
250         let mut ops = self.ops.lock();
251         let entry = ops.vacant_entry();
252         let next_token = entry.key();
253         self.poll_ctx
254             .add_for_event(&base::Descriptor(file.as_raw_fd()), event_type, next_token)
255             .map_err(Error::SubmittingWaker)?;
256         entry.insert(OpStatus::Pending(OpData { file, waker: None }));
257         Ok(WakerToken(next_token))
258     }
259 
is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool260     fn is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool {
261         let mut ops = self.ops.lock();
262 
263         let op = ops
264             .get_mut(token.0)
265             .expect("`is_ready` called on unknown operation");
266         match op {
267             OpStatus::Pending(data) => {
268                 data.waker = Some(cx.waker().clone());
269                 false
270             }
271             OpStatus::Completed => {
272                 ops.remove(token.0);
273                 true
274             }
275             // unreachable because we never create a WakerToken for `wake_event`.
276             OpStatus::WakeEvent => unreachable!(),
277         }
278     }
279 
280     // Remove the waker for the given token if it hasn't fired yet.
cancel_operation(&self, token: WakerToken) -> Result<()>281     fn cancel_operation(&self, token: WakerToken) -> Result<()> {
282         match self.ops.lock().remove(token.0) {
283             OpStatus::Pending(data) => self
284                 .poll_ctx
285                 .delete(&base::Descriptor(data.file.as_raw_fd()))
286                 .map_err(Error::WaitContextError),
287             OpStatus::Completed => Ok(()),
288             // unreachable because we never create a WakerToken for `wake_event`.
289             OpStatus::WakeEvent => unreachable!(),
290         }
291     }
292 }
293 
impl Reactor for EpollReactor {
    // Delegates to the inherent `EpollReactor::new`, converting its error into `io::Error`.
    fn new() -> std::io::Result<Self> {
        Ok(EpollReactor::new()?)
    }

    // Signals `wake_event`. A failure to signal is only logged.
    fn wake(&self) {
        if let Err(e) = self.wake_event.signal() {
            warn!("Failed to notify executor that a future is ready: {}", e);
        }
    }

    // Empties `ops`, waking every pending operation that has a waker and removing its FD from
    // the wait context. Removal failures are only logged.
    fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
        // At this point, there are no strong references to the executor (see `on_executor_drop`
        // docs). That means all the `RegisteredSource::ex` will fail to upgrade and so no more IO
        // work can be submitted.

        // Wake up any futures still waiting on poll operations as they are just going to get an
        // ExecutorGone error now.
        for op in self.ops.lock().drain() {
            match op {
                OpStatus::Pending(mut data) => {
                    if let Some(waker) = data.waker.take() {
                        waker.wake();
                    }

                    if let Err(e) = self
                        .poll_ctx
                        .delete(&base::Descriptor(data.file.as_raw_fd()))
                    {
                        warn!("Failed to remove file from EpollCtx: {}", e);
                    }
                }
                // Nothing to do for these: neither arm touches the wait context.
                OpStatus::Completed => {}
                OpStatus::WakeEvent => {}
            }
        }

        // Now run the executor one more time to drive any remaining futures to completion.
        Box::pin(async {})
    }

    // Waits on the wait context, then for every returned event marks the matching operation as
    // completed, removes its FD from the wait context and wakes its task. `set_processing` is
    // called once, after the wait returns and before any event is handled.
    //
    // Returns an error if the wait fails, if removing an FD from the wait context fails, or if
    // reading the wake event fails with anything other than `EWOULDBLOCK`.
    fn wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()> {
        let events = self.poll_ctx.wait().map_err(Error::WaitContextError)?;

        // Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
        // writing to the eventfd.
        set_processing();
        for e in events.iter() {
            let token = e.token;
            let mut ops = self.ops.lock();

            // The op could have been canceled and removed by another thread so ignore it if it
            // doesn't exist.
            if let Some(op) = ops.get_mut(token) {
                // Optimistically mark the slot as completed while taking ownership of the
                // previous status.
                let (file, waker) = match mem::replace(op, OpStatus::Completed) {
                    OpStatus::Pending(OpData { file, waker }) => (file, waker),
                    OpStatus::Completed => panic!("poll operation completed more than once"),
                    OpStatus::WakeEvent => {
                        // The wake event never completes: restore its status and reset the
                        // event by reading it. `EWOULDBLOCK` is tolerated here.
                        *op = OpStatus::WakeEvent;
                        match self.wake_event.wait() {
                            Ok(_) => {}
                            Err(e) if e.errno() == libc::EWOULDBLOCK => {}
                            Err(e) => return Err(e.into()),
                        }
                        continue;
                    }
                };

                // Release the lock before touching the wait context and running the waker.
                mem::drop(ops);

                self.poll_ctx
                    .delete(&base::Descriptor(file.as_raw_fd()))
                    .map_err(Error::WaitContextError)?;

                if let Some(waker) = waker {
                    waker.wake();
                }
            }
        }
        Ok(())
    }

    // Wraps `f` in a `PollSource` bound to `ex` and returns it as the `Epoll` flavor of
    // `IoSource`.
    fn new_source<F: AsRawDescriptor>(
        &self,
        ex: &Arc<RawExecutor<Self>>,
        f: F,
    ) -> AsyncResult<IoSource<F>> {
        Ok(IoSource::Epoll(super::PollSource::new(f, ex)?))
    }

    // Wraps the raw task handle in the `Fd` variant of `TaskHandle`.
    fn wrap_task_handle<R>(task: RawTaskHandle<EpollReactor, R>) -> TaskHandle<R> {
        TaskHandle::Fd(task)
    }
}
388 
389 impl AsRawDescriptors for EpollReactor {
as_raw_descriptors(&self) -> Vec<RawDescriptor>390     fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
391         vec![
392             self.poll_ctx.as_raw_descriptor(),
393             self.wake_event.as_raw_descriptor(),
394         ]
395     }
396 }
397 
#[cfg(test)]
mod test {
    use std::cell::RefCell;
    use std::fs::File;
    use std::io::Read;
    use std::io::Write;
    use std::rc::Rc;

    use futures::future::Either;

    use super::*;
    use crate::BlockingPool;
    use crate::ExecutorTrait;

    // Checks that a `PendingOperation` can be dropped before it completes, and that a plain
    // future can be driven to completion by the executor.
    #[test]
    fn test_it() {
        async fn do_test(ex: &Arc<RawExecutor<EpollReactor>>) {
            // Nothing is ever written to the pipe; `_w` is only held so the write end stays open.
            let (r, _w) = base::pipe().unwrap();
            let done = Box::pin(async { 5usize });
            let source = RegisteredSource::new(ex, r).unwrap();
            let pending = source.wait_readable().unwrap();
            // The test expects the already-ready `done` future to win the select; the
            // still-pending operation is then dropped explicitly.
            match futures::future::select(pending, done).await {
                Either::Right((5, pending)) => std::mem::drop(pending),
                _ => panic!("unexpected select result"),
            }
        }

        let ex = RawExecutor::<EpollReactor>::new().unwrap();
        ex.run_until(do_test(&ex)).unwrap();

        // Example of starting the framework and running a future:
        async fn my_async(x: Rc<RefCell<u64>>) {
            x.replace(4);
        }

        let x = Rc::new(RefCell::new(0));
        {
            let ex = RawExecutor::<EpollReactor>::new().unwrap();
            ex.run_until(my_async(x.clone())).unwrap();
        }
        assert_eq!(*x.borrow(), 4);
    }

    // Checks that tasks spawned on an executor still run when the executor is dropped, and that
    // an operation pending at that point resolves to `Error::ExecutorGone`.
    #[test]
    fn drop_before_completion() {
        const VALUE: u64 = 0x66ae_cb65_12fb_d260;

        async fn write_value(mut tx: File) {
            let buf = VALUE.to_ne_bytes();
            tx.write_all(&buf[..]).expect("Failed to write to pipe");
        }

        // Awaits `op` and requires it to fail with `Error::ExecutorGone`.
        async fn check_op(op: PendingOperation) {
            let err = op.await.expect_err("Task completed successfully");
            match err {
                Error::ExecutorGone => {}
                e => panic!("Unexpected error from task: {}", e),
            }
        }

        let (mut rx, tx) = base::pipe().expect("Pipe failed");

        let ex = RawExecutor::<EpollReactor>::new().unwrap();

        let source = RegisteredSource::new(&ex, tx.try_clone().unwrap()).unwrap();
        let op = source.wait_writable().unwrap();

        ex.spawn_local(write_value(tx)).detach();
        ex.spawn_local(check_op(op)).detach();

        // Now drop the executor. It should still run until the write to the pipe is complete.
        mem::drop(ex);

        // The value written by the detached task must be readable from the other end.
        let mut buf = 0u64.to_ne_bytes();
        rx.read_exact(&mut buf[..])
            .expect("Failed to read from pipe");

        assert_eq!(u64::from_ne_bytes(buf), VALUE);
    }

    // Dropping a task that owns a BlockingPool shouldn't leak the pool.
    #[test]
    fn drop_detached_blocking_pool() {
        // Wrapper whose `Drop` shuts the pool down with a one second deadline and fails the test
        // if the shutdown does not succeed.
        struct Cleanup(BlockingPool);

        impl Drop for Cleanup {
            fn drop(&mut self) {
                // Make sure we shutdown cleanly (BlockingPool::drop just prints a warning).
                self.0
                    .shutdown(Some(
                        std::time::Instant::now() + std::time::Duration::from_secs(1),
                    ))
                    .unwrap();
            }
        }

        // Progress marker: 1 once the task has started, 2 only if it ran to the end.
        let rc = Rc::new(std::cell::Cell::new(0));
        {
            let ex = RawExecutor::<EpollReactor>::new().unwrap();
            let rc_clone = rc.clone();
            ex.spawn_local(async move {
                rc_clone.set(1);
                let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0)));
                let (send, recv) = std::sync::mpsc::sync_channel::<()>(0);
                // Spawn a blocking task.
                let blocking_task = pool.0.spawn(move || {
                    // Rendezvous.
                    assert_eq!(recv.recv(), Ok(()));
                    // Wait for drop.
                    assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError));
                });
                // Make sure it has actually started (using a "rendezvous channel" send).
                //
                // Without this step, we'll have a race where we can shutdown the blocking pool
                // before the worker thread pops off the task.
                send.send(()).unwrap();
                // Wait for it to finish
                blocking_task.await;
                rc_clone.set(2);
            })
            .detach();
            ex.run_until(async {}).unwrap();
            // `ex` is dropped here. If everything is working as expected, it should drop all of
            // its tasks, including `send` and `pool` (in that order, which is important). `pool`'s
            // `Drop` impl will try to join all the worker threads, which should work because send
            // half of the channel closed.
        }
        // The task must have started but never reached its final statement, and its clone of
        // `rc` must have been released.
        assert_eq!(rc.get(), 1);
        Rc::try_unwrap(rc).expect("Rc had too many refs");
    }

    // Test the waker implementation. This code path doesn't get hit by `IoSource`, only by backend
    // agnostic libraries, like `BlockingPool` and `futures::channel`.
    #[test]
    fn test_non_io_waker() {
        use std::task::Poll;

        // Future that, on its first poll, spawns a thread which sleeps for the given number of
        // milliseconds and then wakes the task; it is ready on the next poll.
        struct Sleep(Option<u64>);

        impl Future for Sleep {
            type Output = ();

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                if let Some(ms) = self.0.take() {
                    let waker = cx.waker().clone();
                    std::thread::spawn(move || {
                        std::thread::sleep(std::time::Duration::from_millis(ms));
                        waker.wake();
                    });
                    Poll::Pending
                } else {
                    Poll::Ready(())
                }
            }
        }

        let ex = RawExecutor::<EpollReactor>::new().unwrap();
        ex.run_until(async move {
            // Test twice because there was once a bug where the second time panic'd.
            Sleep(Some(1)).await;
            Sleep(Some(1)).await;
        })
        .unwrap();
    }
}
563