1*e1997b9aSAndroid Build Coastguard Worker // Copyright 2020, The Android Open Source Project 2*e1997b9aSAndroid Build Coastguard Worker // 3*e1997b9aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); 4*e1997b9aSAndroid Build Coastguard Worker // you may not use this file except in compliance with the License. 5*e1997b9aSAndroid Build Coastguard Worker // You may obtain a copy of the License at 6*e1997b9aSAndroid Build Coastguard Worker // 7*e1997b9aSAndroid Build Coastguard Worker // http://www.apache.org/licenses/LICENSE-2.0 8*e1997b9aSAndroid Build Coastguard Worker // 9*e1997b9aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software 10*e1997b9aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, 11*e1997b9aSAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12*e1997b9aSAndroid Build Coastguard Worker // See the License for the specific language governing permissions and 13*e1997b9aSAndroid Build Coastguard Worker // limitations under the License. 14*e1997b9aSAndroid Build Coastguard Worker 15*e1997b9aSAndroid Build Coastguard Worker //! This module implements the handling of async tasks. 16*e1997b9aSAndroid Build Coastguard Worker //! The worker thread has a high priority and a low priority queue. Adding a job to either 17*e1997b9aSAndroid Build Coastguard Worker //! will cause one thread to be spawned if none exists. As a compromise between performance 18*e1997b9aSAndroid Build Coastguard Worker //! and resource consumption, the thread will linger for about 30 seconds after it has 19*e1997b9aSAndroid Build Coastguard Worker //! processed all tasks before it terminates. 20*e1997b9aSAndroid Build Coastguard Worker //! Note that low priority tasks are processed only when the high priority queue is empty. 21*e1997b9aSAndroid Build Coastguard Worker 22*e1997b9aSAndroid Build Coastguard Worker use std::{any::Any, any::TypeId, time::Duration}; 23*e1997b9aSAndroid Build Coastguard Worker use std::{ 24*e1997b9aSAndroid Build Coastguard Worker collections::{HashMap, VecDeque}, 25*e1997b9aSAndroid Build Coastguard Worker sync::Arc, 26*e1997b9aSAndroid Build Coastguard Worker sync::{Condvar, Mutex, MutexGuard}, 27*e1997b9aSAndroid Build Coastguard Worker thread, 28*e1997b9aSAndroid Build Coastguard Worker }; 29*e1997b9aSAndroid Build Coastguard Worker 30*e1997b9aSAndroid Build Coastguard Worker #[cfg(test)] 31*e1997b9aSAndroid Build Coastguard Worker mod tests; 32*e1997b9aSAndroid Build Coastguard Worker 33*e1997b9aSAndroid Build Coastguard Worker #[derive(Debug, PartialEq, Eq)] 34*e1997b9aSAndroid Build Coastguard Worker enum State { 35*e1997b9aSAndroid Build Coastguard Worker Exiting, 36*e1997b9aSAndroid Build Coastguard Worker Running, 37*e1997b9aSAndroid Build Coastguard Worker } 38*e1997b9aSAndroid Build Coastguard Worker 39*e1997b9aSAndroid Build Coastguard Worker /// The Shelf allows async tasks to store state across invocations. 40*e1997b9aSAndroid Build Coastguard Worker /// Note: Store elves at your own peril ;-). 41*e1997b9aSAndroid Build Coastguard Worker #[derive(Debug, Default)] 42*e1997b9aSAndroid Build Coastguard Worker pub struct Shelf(HashMap<TypeId, Box<dyn Any + Send>>); 43*e1997b9aSAndroid Build Coastguard Worker 44*e1997b9aSAndroid Build Coastguard Worker impl Shelf { 45*e1997b9aSAndroid Build Coastguard Worker /// Get a reference to the shelved data of type T. Returns Some if the data exists. get_downcast_ref<T: Any + Send>(&self) -> Option<&T>46*e1997b9aSAndroid Build Coastguard Worker pub fn get_downcast_ref<T: Any + Send>(&self) -> Option<&T> { 47*e1997b9aSAndroid Build Coastguard Worker self.0.get(&TypeId::of::<T>()).and_then(|v| v.downcast_ref::<T>()) 48*e1997b9aSAndroid Build Coastguard Worker } 49*e1997b9aSAndroid Build Coastguard Worker 50*e1997b9aSAndroid Build Coastguard Worker /// Get a mutable reference to the shelved data of type T. If a T was inserted using put, 51*e1997b9aSAndroid Build Coastguard Worker /// get_mut, or get_or_put_with. get_downcast_mut<T: Any + Send>(&mut self) -> Option<&mut T>52*e1997b9aSAndroid Build Coastguard Worker pub fn get_downcast_mut<T: Any + Send>(&mut self) -> Option<&mut T> { 53*e1997b9aSAndroid Build Coastguard Worker self.0.get_mut(&TypeId::of::<T>()).and_then(|v| v.downcast_mut::<T>()) 54*e1997b9aSAndroid Build Coastguard Worker } 55*e1997b9aSAndroid Build Coastguard Worker 56*e1997b9aSAndroid Build Coastguard Worker /// Remove the entry of the given type and returns the stored data if it existed. remove_downcast_ref<T: Any + Send>(&mut self) -> Option<T>57*e1997b9aSAndroid Build Coastguard Worker pub fn remove_downcast_ref<T: Any + Send>(&mut self) -> Option<T> { 58*e1997b9aSAndroid Build Coastguard Worker self.0.remove(&TypeId::of::<T>()).and_then(|v| v.downcast::<T>().ok().map(|b| *b)) 59*e1997b9aSAndroid Build Coastguard Worker } 60*e1997b9aSAndroid Build Coastguard Worker 61*e1997b9aSAndroid Build Coastguard Worker /// Puts data `v` on the shelf. If there already was an entry of type T it is returned. put<T: Any + Send>(&mut self, v: T) -> Option<T>62*e1997b9aSAndroid Build Coastguard Worker pub fn put<T: Any + Send>(&mut self, v: T) -> Option<T> { 63*e1997b9aSAndroid Build Coastguard Worker self.0 64*e1997b9aSAndroid Build Coastguard Worker .insert(TypeId::of::<T>(), Box::new(v) as Box<dyn Any + Send>) 65*e1997b9aSAndroid Build Coastguard Worker .and_then(|v| v.downcast::<T>().ok().map(|b| *b)) 66*e1997b9aSAndroid Build Coastguard Worker } 67*e1997b9aSAndroid Build Coastguard Worker 68*e1997b9aSAndroid Build Coastguard Worker /// Gets a mutable reference to the entry of the given type and default creates it if necessary. 69*e1997b9aSAndroid Build Coastguard Worker /// The type must implement Default. get_mut<T: Any + Send + Default>(&mut self) -> &mut T70*e1997b9aSAndroid Build Coastguard Worker pub fn get_mut<T: Any + Send + Default>(&mut self) -> &mut T { 71*e1997b9aSAndroid Build Coastguard Worker self.0 72*e1997b9aSAndroid Build Coastguard Worker .entry(TypeId::of::<T>()) 73*e1997b9aSAndroid Build Coastguard Worker .or_insert_with(|| Box::<T>::default() as Box<dyn Any + Send>) 74*e1997b9aSAndroid Build Coastguard Worker .downcast_mut::<T>() 75*e1997b9aSAndroid Build Coastguard Worker .unwrap() 76*e1997b9aSAndroid Build Coastguard Worker } 77*e1997b9aSAndroid Build Coastguard Worker 78*e1997b9aSAndroid Build Coastguard Worker /// Gets a mutable reference to the entry of the given type or creates it using the init 79*e1997b9aSAndroid Build Coastguard Worker /// function. Init is not executed if the entry already existed. get_or_put_with<T: Any + Send, F>(&mut self, init: F) -> &mut T where F: FnOnce() -> T,80*e1997b9aSAndroid Build Coastguard Worker pub fn get_or_put_with<T: Any + Send, F>(&mut self, init: F) -> &mut T 81*e1997b9aSAndroid Build Coastguard Worker where 82*e1997b9aSAndroid Build Coastguard Worker F: FnOnce() -> T, 83*e1997b9aSAndroid Build Coastguard Worker { 84*e1997b9aSAndroid Build Coastguard Worker self.0 85*e1997b9aSAndroid Build Coastguard Worker .entry(TypeId::of::<T>()) 86*e1997b9aSAndroid Build Coastguard Worker .or_insert_with(|| Box::new(init()) as Box<dyn Any + Send>) 87*e1997b9aSAndroid Build Coastguard Worker .downcast_mut::<T>() 88*e1997b9aSAndroid Build Coastguard Worker .unwrap() 89*e1997b9aSAndroid Build Coastguard Worker } 90*e1997b9aSAndroid Build Coastguard Worker } 91*e1997b9aSAndroid Build Coastguard Worker 92*e1997b9aSAndroid Build Coastguard Worker struct AsyncTaskState { 93*e1997b9aSAndroid Build Coastguard Worker state: State, 94*e1997b9aSAndroid Build Coastguard Worker thread: Option<thread::JoinHandle<()>>, 95*e1997b9aSAndroid Build Coastguard Worker timeout: Duration, 96*e1997b9aSAndroid Build Coastguard Worker hi_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>, 97*e1997b9aSAndroid Build Coastguard Worker lo_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>, 98*e1997b9aSAndroid Build Coastguard Worker idle_fns: Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>, 99*e1997b9aSAndroid Build Coastguard Worker /// The store allows tasks to store state across invocations. It is passed to each invocation 100*e1997b9aSAndroid Build Coastguard Worker /// of each task. Tasks need to cooperate on the ids they use for storing state. 101*e1997b9aSAndroid Build Coastguard Worker shelf: Option<Shelf>, 102*e1997b9aSAndroid Build Coastguard Worker } 103*e1997b9aSAndroid Build Coastguard Worker 104*e1997b9aSAndroid Build Coastguard Worker /// AsyncTask spawns one worker thread on demand to process jobs inserted into 105*e1997b9aSAndroid Build Coastguard Worker /// a low and a high priority work queue. The queues are processed FIFO, and low 106*e1997b9aSAndroid Build Coastguard Worker /// priority queue is processed if the high priority queue is empty. 107*e1997b9aSAndroid Build Coastguard Worker /// Note: Because there is only one worker thread at a time for a given AsyncTask instance, 108*e1997b9aSAndroid Build Coastguard Worker /// all scheduled requests are guaranteed to be serialized with respect to one another. 109*e1997b9aSAndroid Build Coastguard Worker pub struct AsyncTask { 110*e1997b9aSAndroid Build Coastguard Worker state: Arc<(Condvar, Mutex<AsyncTaskState>)>, 111*e1997b9aSAndroid Build Coastguard Worker } 112*e1997b9aSAndroid Build Coastguard Worker 113*e1997b9aSAndroid Build Coastguard Worker impl Default for AsyncTask { default() -> Self114*e1997b9aSAndroid Build Coastguard Worker fn default() -> Self { 115*e1997b9aSAndroid Build Coastguard Worker Self::new(Duration::from_secs(30)) 116*e1997b9aSAndroid Build Coastguard Worker } 117*e1997b9aSAndroid Build Coastguard Worker } 118*e1997b9aSAndroid Build Coastguard Worker 119*e1997b9aSAndroid Build Coastguard Worker impl AsyncTask { 120*e1997b9aSAndroid Build Coastguard Worker /// Construct an [`AsyncTask`] with a specific timeout value. new(timeout: Duration) -> Self121*e1997b9aSAndroid Build Coastguard Worker pub fn new(timeout: Duration) -> Self { 122*e1997b9aSAndroid Build Coastguard Worker Self { 123*e1997b9aSAndroid Build Coastguard Worker state: Arc::new(( 124*e1997b9aSAndroid Build Coastguard Worker Condvar::new(), 125*e1997b9aSAndroid Build Coastguard Worker Mutex::new(AsyncTaskState { 126*e1997b9aSAndroid Build Coastguard Worker state: State::Exiting, 127*e1997b9aSAndroid Build Coastguard Worker thread: None, 128*e1997b9aSAndroid Build Coastguard Worker timeout, 129*e1997b9aSAndroid Build Coastguard Worker hi_prio_req: VecDeque::new(), 130*e1997b9aSAndroid Build Coastguard Worker lo_prio_req: VecDeque::new(), 131*e1997b9aSAndroid Build Coastguard Worker idle_fns: Vec::new(), 132*e1997b9aSAndroid Build Coastguard Worker shelf: None, 133*e1997b9aSAndroid Build Coastguard Worker }), 134*e1997b9aSAndroid Build Coastguard Worker )), 135*e1997b9aSAndroid Build Coastguard Worker } 136*e1997b9aSAndroid Build Coastguard Worker } 137*e1997b9aSAndroid Build Coastguard Worker 138*e1997b9aSAndroid Build Coastguard Worker /// Adds a one-off job to the high priority queue. High priority jobs are 139*e1997b9aSAndroid Build Coastguard Worker /// completed before low priority jobs and can also overtake low priority 140*e1997b9aSAndroid Build Coastguard Worker /// jobs. But they cannot preempt them. queue_hi<F>(&self, f: F) where F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,141*e1997b9aSAndroid Build Coastguard Worker pub fn queue_hi<F>(&self, f: F) 142*e1997b9aSAndroid Build Coastguard Worker where 143*e1997b9aSAndroid Build Coastguard Worker F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static, 144*e1997b9aSAndroid Build Coastguard Worker { 145*e1997b9aSAndroid Build Coastguard Worker self.queue(f, true) 146*e1997b9aSAndroid Build Coastguard Worker } 147*e1997b9aSAndroid Build Coastguard Worker 148*e1997b9aSAndroid Build Coastguard Worker /// Adds a one-off job to the low priority queue. Low priority jobs are 149*e1997b9aSAndroid Build Coastguard Worker /// completed after high priority. And they are not executed as long as high 150*e1997b9aSAndroid Build Coastguard Worker /// priority jobs are present. Jobs always run to completion and are never 151*e1997b9aSAndroid Build Coastguard Worker /// preempted by high priority jobs. queue_lo<F>(&self, f: F) where F: FnOnce(&mut Shelf) + Send + 'static,152*e1997b9aSAndroid Build Coastguard Worker pub fn queue_lo<F>(&self, f: F) 153*e1997b9aSAndroid Build Coastguard Worker where 154*e1997b9aSAndroid Build Coastguard Worker F: FnOnce(&mut Shelf) + Send + 'static, 155*e1997b9aSAndroid Build Coastguard Worker { 156*e1997b9aSAndroid Build Coastguard Worker self.queue(f, false) 157*e1997b9aSAndroid Build Coastguard Worker } 158*e1997b9aSAndroid Build Coastguard Worker 159*e1997b9aSAndroid Build Coastguard Worker /// Adds an idle callback. This will be invoked whenever the worker becomes 160*e1997b9aSAndroid Build Coastguard Worker /// idle (all high and low priority jobs have been performed). add_idle<F>(&self, f: F) where F: Fn(&mut Shelf) + Send + Sync + 'static,161*e1997b9aSAndroid Build Coastguard Worker pub fn add_idle<F>(&self, f: F) 162*e1997b9aSAndroid Build Coastguard Worker where 163*e1997b9aSAndroid Build Coastguard Worker F: Fn(&mut Shelf) + Send + Sync + 'static, 164*e1997b9aSAndroid Build Coastguard Worker { 165*e1997b9aSAndroid Build Coastguard Worker let (ref _condvar, ref state) = *self.state; 166*e1997b9aSAndroid Build Coastguard Worker let mut state = state.lock().unwrap(); 167*e1997b9aSAndroid Build Coastguard Worker state.idle_fns.push(Arc::new(f)); 168*e1997b9aSAndroid Build Coastguard Worker } 169*e1997b9aSAndroid Build Coastguard Worker queue<F>(&self, f: F, hi_prio: bool) where F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,170*e1997b9aSAndroid Build Coastguard Worker fn queue<F>(&self, f: F, hi_prio: bool) 171*e1997b9aSAndroid Build Coastguard Worker where 172*e1997b9aSAndroid Build Coastguard Worker F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static, 173*e1997b9aSAndroid Build Coastguard Worker { 174*e1997b9aSAndroid Build Coastguard Worker let (ref condvar, ref state) = *self.state; 175*e1997b9aSAndroid Build Coastguard Worker let mut state = state.lock().unwrap(); 176*e1997b9aSAndroid Build Coastguard Worker 177*e1997b9aSAndroid Build Coastguard Worker if hi_prio { 178*e1997b9aSAndroid Build Coastguard Worker state.hi_prio_req.push_back(Box::new(f)); 179*e1997b9aSAndroid Build Coastguard Worker } else { 180*e1997b9aSAndroid Build Coastguard Worker state.lo_prio_req.push_back(Box::new(f)); 181*e1997b9aSAndroid Build Coastguard Worker } 182*e1997b9aSAndroid Build Coastguard Worker 183*e1997b9aSAndroid Build Coastguard Worker if state.state != State::Running { 184*e1997b9aSAndroid Build Coastguard Worker self.spawn_thread(&mut state); 185*e1997b9aSAndroid Build Coastguard Worker } 186*e1997b9aSAndroid Build Coastguard Worker drop(state); 187*e1997b9aSAndroid Build Coastguard Worker condvar.notify_all(); 188*e1997b9aSAndroid Build Coastguard Worker } 189*e1997b9aSAndroid Build Coastguard Worker spawn_thread(&self, state: &mut MutexGuard<AsyncTaskState>)190*e1997b9aSAndroid Build Coastguard Worker fn spawn_thread(&self, state: &mut MutexGuard<AsyncTaskState>) { 191*e1997b9aSAndroid Build Coastguard Worker if let Some(t) = state.thread.take() { 192*e1997b9aSAndroid Build Coastguard Worker t.join().expect("AsyncTask panicked."); 193*e1997b9aSAndroid Build Coastguard Worker } 194*e1997b9aSAndroid Build Coastguard Worker 195*e1997b9aSAndroid Build Coastguard Worker let cloned_state = self.state.clone(); 196*e1997b9aSAndroid Build Coastguard Worker let timeout_period = state.timeout; 197*e1997b9aSAndroid Build Coastguard Worker 198*e1997b9aSAndroid Build Coastguard Worker state.thread = Some(thread::spawn(move || { 199*e1997b9aSAndroid Build Coastguard Worker let (ref condvar, ref state) = *cloned_state; 200*e1997b9aSAndroid Build Coastguard Worker 201*e1997b9aSAndroid Build Coastguard Worker enum Action { 202*e1997b9aSAndroid Build Coastguard Worker QueuedFn(Box<dyn FnOnce(&mut Shelf) + Send>), 203*e1997b9aSAndroid Build Coastguard Worker IdleFns(Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>), 204*e1997b9aSAndroid Build Coastguard Worker } 205*e1997b9aSAndroid Build Coastguard Worker let mut done_idle = false; 206*e1997b9aSAndroid Build Coastguard Worker 207*e1997b9aSAndroid Build Coastguard Worker // When the worker starts, it takes the shelf and puts it on the stack. 208*e1997b9aSAndroid Build Coastguard Worker let mut shelf = state.lock().unwrap().shelf.take().unwrap_or_default(); 209*e1997b9aSAndroid Build Coastguard Worker loop { 210*e1997b9aSAndroid Build Coastguard Worker if let Some(action) = { 211*e1997b9aSAndroid Build Coastguard Worker let state = state.lock().unwrap(); 212*e1997b9aSAndroid Build Coastguard Worker if !done_idle && state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty() { 213*e1997b9aSAndroid Build Coastguard Worker // No jobs queued so invoke the idle callbacks. 214*e1997b9aSAndroid Build Coastguard Worker Some(Action::IdleFns(state.idle_fns.clone())) 215*e1997b9aSAndroid Build Coastguard Worker } else { 216*e1997b9aSAndroid Build Coastguard Worker // Wait for either a queued job to arrive or a timeout. 217*e1997b9aSAndroid Build Coastguard Worker let (mut state, timeout) = condvar 218*e1997b9aSAndroid Build Coastguard Worker .wait_timeout_while(state, timeout_period, |state| { 219*e1997b9aSAndroid Build Coastguard Worker state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty() 220*e1997b9aSAndroid Build Coastguard Worker }) 221*e1997b9aSAndroid Build Coastguard Worker .unwrap(); 222*e1997b9aSAndroid Build Coastguard Worker match ( 223*e1997b9aSAndroid Build Coastguard Worker state.hi_prio_req.pop_front(), 224*e1997b9aSAndroid Build Coastguard Worker state.lo_prio_req.is_empty(), 225*e1997b9aSAndroid Build Coastguard Worker timeout.timed_out(), 226*e1997b9aSAndroid Build Coastguard Worker ) { 227*e1997b9aSAndroid Build Coastguard Worker (Some(f), _, _) => Some(Action::QueuedFn(f)), 228*e1997b9aSAndroid Build Coastguard Worker (None, false, _) => { 229*e1997b9aSAndroid Build Coastguard Worker state.lo_prio_req.pop_front().map(|f| Action::QueuedFn(f)) 230*e1997b9aSAndroid Build Coastguard Worker } 231*e1997b9aSAndroid Build Coastguard Worker (None, true, true) => { 232*e1997b9aSAndroid Build Coastguard Worker // When the worker exits it puts the shelf back into the shared 233*e1997b9aSAndroid Build Coastguard Worker // state for the next worker to use. So state is preserved not 234*e1997b9aSAndroid Build Coastguard Worker // only across invocations but also across worker thread shut down. 235*e1997b9aSAndroid Build Coastguard Worker state.shelf = Some(shelf); 236*e1997b9aSAndroid Build Coastguard Worker state.state = State::Exiting; 237*e1997b9aSAndroid Build Coastguard Worker break; 238*e1997b9aSAndroid Build Coastguard Worker } 239*e1997b9aSAndroid Build Coastguard Worker (None, true, false) => None, 240*e1997b9aSAndroid Build Coastguard Worker } 241*e1997b9aSAndroid Build Coastguard Worker } 242*e1997b9aSAndroid Build Coastguard Worker } { 243*e1997b9aSAndroid Build Coastguard Worker // Now that the lock has been dropped, perform the action. 244*e1997b9aSAndroid Build Coastguard Worker match action { 245*e1997b9aSAndroid Build Coastguard Worker Action::QueuedFn(f) => { 246*e1997b9aSAndroid Build Coastguard Worker f(&mut shelf); 247*e1997b9aSAndroid Build Coastguard Worker done_idle = false; 248*e1997b9aSAndroid Build Coastguard Worker } 249*e1997b9aSAndroid Build Coastguard Worker Action::IdleFns(idle_fns) => { 250*e1997b9aSAndroid Build Coastguard Worker for idle_fn in idle_fns { 251*e1997b9aSAndroid Build Coastguard Worker idle_fn(&mut shelf); 252*e1997b9aSAndroid Build Coastguard Worker } 253*e1997b9aSAndroid Build Coastguard Worker done_idle = true; 254*e1997b9aSAndroid Build Coastguard Worker } 255*e1997b9aSAndroid Build Coastguard Worker } 256*e1997b9aSAndroid Build Coastguard Worker } 257*e1997b9aSAndroid Build Coastguard Worker } 258*e1997b9aSAndroid Build Coastguard Worker })); 259*e1997b9aSAndroid Build Coastguard Worker state.state = State::Running; 260*e1997b9aSAndroid Build Coastguard Worker } 261*e1997b9aSAndroid Build Coastguard Worker } 262