1 // Copyright 2020, The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 //! This module implements the handling of async tasks. 16 //! The worker thread has a high priority and a low priority queue. Adding a job to either 17 //! will cause one thread to be spawned if none exists. As a compromise between performance 18 //! and resource consumption, the thread will linger for about 30 seconds after it has 19 //! processed all tasks before it terminates. 20 //! Note that low priority tasks are processed only when the high priority queue is empty. 21 22 use std::{any::Any, any::TypeId, time::Duration}; 23 use std::{ 24 collections::{HashMap, VecDeque}, 25 sync::Arc, 26 sync::{Condvar, Mutex, MutexGuard}, 27 thread, 28 }; 29 30 #[cfg(test)] 31 mod tests; 32 33 #[derive(Debug, PartialEq, Eq)] 34 enum State { 35 Exiting, 36 Running, 37 } 38 39 /// The Shelf allows async tasks to store state across invocations. 40 /// Note: Store elves at your own peril ;-). 41 #[derive(Debug, Default)] 42 pub struct Shelf(HashMap<TypeId, Box<dyn Any + Send>>); 43 44 impl Shelf { 45 /// Get a reference to the shelved data of type T. Returns Some if the data exists. get_downcast_ref<T: Any + Send>(&self) -> Option<&T>46 pub fn get_downcast_ref<T: Any + Send>(&self) -> Option<&T> { 47 self.0.get(&TypeId::of::<T>()).and_then(|v| v.downcast_ref::<T>()) 48 } 49 50 /// Get a mutable reference to the shelved data of type T. If a T was inserted using put, 51 /// get_mut, or get_or_put_with. get_downcast_mut<T: Any + Send>(&mut self) -> Option<&mut T>52 pub fn get_downcast_mut<T: Any + Send>(&mut self) -> Option<&mut T> { 53 self.0.get_mut(&TypeId::of::<T>()).and_then(|v| v.downcast_mut::<T>()) 54 } 55 56 /// Remove the entry of the given type and returns the stored data if it existed. remove_downcast_ref<T: Any + Send>(&mut self) -> Option<T>57 pub fn remove_downcast_ref<T: Any + Send>(&mut self) -> Option<T> { 58 self.0.remove(&TypeId::of::<T>()).and_then(|v| v.downcast::<T>().ok().map(|b| *b)) 59 } 60 61 /// Puts data `v` on the shelf. If there already was an entry of type T it is returned. put<T: Any + Send>(&mut self, v: T) -> Option<T>62 pub fn put<T: Any + Send>(&mut self, v: T) -> Option<T> { 63 self.0 64 .insert(TypeId::of::<T>(), Box::new(v) as Box<dyn Any + Send>) 65 .and_then(|v| v.downcast::<T>().ok().map(|b| *b)) 66 } 67 68 /// Gets a mutable reference to the entry of the given type and default creates it if necessary. 69 /// The type must implement Default. get_mut<T: Any + Send + Default>(&mut self) -> &mut T70 pub fn get_mut<T: Any + Send + Default>(&mut self) -> &mut T { 71 self.0 72 .entry(TypeId::of::<T>()) 73 .or_insert_with(|| Box::<T>::default() as Box<dyn Any + Send>) 74 .downcast_mut::<T>() 75 .unwrap() 76 } 77 78 /// Gets a mutable reference to the entry of the given type or creates it using the init 79 /// function. Init is not executed if the entry already existed. get_or_put_with<T: Any + Send, F>(&mut self, init: F) -> &mut T where F: FnOnce() -> T,80 pub fn get_or_put_with<T: Any + Send, F>(&mut self, init: F) -> &mut T 81 where 82 F: FnOnce() -> T, 83 { 84 self.0 85 .entry(TypeId::of::<T>()) 86 .or_insert_with(|| Box::new(init()) as Box<dyn Any + Send>) 87 .downcast_mut::<T>() 88 .unwrap() 89 } 90 } 91 92 struct AsyncTaskState { 93 state: State, 94 thread: Option<thread::JoinHandle<()>>, 95 timeout: Duration, 96 hi_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>, 97 lo_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>, 98 idle_fns: Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>, 99 /// The store allows tasks to store state across invocations. It is passed to each invocation 100 /// of each task. Tasks need to cooperate on the ids they use for storing state. 101 shelf: Option<Shelf>, 102 } 103 104 /// AsyncTask spawns one worker thread on demand to process jobs inserted into 105 /// a low and a high priority work queue. The queues are processed FIFO, and low 106 /// priority queue is processed if the high priority queue is empty. 107 /// Note: Because there is only one worker thread at a time for a given AsyncTask instance, 108 /// all scheduled requests are guaranteed to be serialized with respect to one another. 109 pub struct AsyncTask { 110 state: Arc<(Condvar, Mutex<AsyncTaskState>)>, 111 } 112 113 impl Default for AsyncTask { default() -> Self114 fn default() -> Self { 115 Self::new(Duration::from_secs(30)) 116 } 117 } 118 119 impl AsyncTask { 120 /// Construct an [`AsyncTask`] with a specific timeout value. new(timeout: Duration) -> Self121 pub fn new(timeout: Duration) -> Self { 122 Self { 123 state: Arc::new(( 124 Condvar::new(), 125 Mutex::new(AsyncTaskState { 126 state: State::Exiting, 127 thread: None, 128 timeout, 129 hi_prio_req: VecDeque::new(), 130 lo_prio_req: VecDeque::new(), 131 idle_fns: Vec::new(), 132 shelf: None, 133 }), 134 )), 135 } 136 } 137 138 /// Adds a one-off job to the high priority queue. High priority jobs are 139 /// completed before low priority jobs and can also overtake low priority 140 /// jobs. But they cannot preempt them. queue_hi<F>(&self, f: F) where F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,141 pub fn queue_hi<F>(&self, f: F) 142 where 143 F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static, 144 { 145 self.queue(f, true) 146 } 147 148 /// Adds a one-off job to the low priority queue. Low priority jobs are 149 /// completed after high priority. And they are not executed as long as high 150 /// priority jobs are present. Jobs always run to completion and are never 151 /// preempted by high priority jobs. queue_lo<F>(&self, f: F) where F: FnOnce(&mut Shelf) + Send + 'static,152 pub fn queue_lo<F>(&self, f: F) 153 where 154 F: FnOnce(&mut Shelf) + Send + 'static, 155 { 156 self.queue(f, false) 157 } 158 159 /// Adds an idle callback. This will be invoked whenever the worker becomes 160 /// idle (all high and low priority jobs have been performed). add_idle<F>(&self, f: F) where F: Fn(&mut Shelf) + Send + Sync + 'static,161 pub fn add_idle<F>(&self, f: F) 162 where 163 F: Fn(&mut Shelf) + Send + Sync + 'static, 164 { 165 let (ref _condvar, ref state) = *self.state; 166 let mut state = state.lock().unwrap(); 167 state.idle_fns.push(Arc::new(f)); 168 } 169 queue<F>(&self, f: F, hi_prio: bool) where F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,170 fn queue<F>(&self, f: F, hi_prio: bool) 171 where 172 F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static, 173 { 174 let (ref condvar, ref state) = *self.state; 175 let mut state = state.lock().unwrap(); 176 177 if hi_prio { 178 state.hi_prio_req.push_back(Box::new(f)); 179 } else { 180 state.lo_prio_req.push_back(Box::new(f)); 181 } 182 183 if state.state != State::Running { 184 self.spawn_thread(&mut state); 185 } 186 drop(state); 187 condvar.notify_all(); 188 } 189 spawn_thread(&self, state: &mut MutexGuard<AsyncTaskState>)190 fn spawn_thread(&self, state: &mut MutexGuard<AsyncTaskState>) { 191 if let Some(t) = state.thread.take() { 192 t.join().expect("AsyncTask panicked."); 193 } 194 195 let cloned_state = self.state.clone(); 196 let timeout_period = state.timeout; 197 198 state.thread = Some(thread::spawn(move || { 199 let (ref condvar, ref state) = *cloned_state; 200 201 enum Action { 202 QueuedFn(Box<dyn FnOnce(&mut Shelf) + Send>), 203 IdleFns(Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>), 204 } 205 let mut done_idle = false; 206 207 // When the worker starts, it takes the shelf and puts it on the stack. 208 let mut shelf = state.lock().unwrap().shelf.take().unwrap_or_default(); 209 loop { 210 if let Some(action) = { 211 let state = state.lock().unwrap(); 212 if !done_idle && state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty() { 213 // No jobs queued so invoke the idle callbacks. 214 Some(Action::IdleFns(state.idle_fns.clone())) 215 } else { 216 // Wait for either a queued job to arrive or a timeout. 217 let (mut state, timeout) = condvar 218 .wait_timeout_while(state, timeout_period, |state| { 219 state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty() 220 }) 221 .unwrap(); 222 match ( 223 state.hi_prio_req.pop_front(), 224 state.lo_prio_req.is_empty(), 225 timeout.timed_out(), 226 ) { 227 (Some(f), _, _) => Some(Action::QueuedFn(f)), 228 (None, false, _) => { 229 state.lo_prio_req.pop_front().map(|f| Action::QueuedFn(f)) 230 } 231 (None, true, true) => { 232 // When the worker exits it puts the shelf back into the shared 233 // state for the next worker to use. So state is preserved not 234 // only across invocations but also across worker thread shut down. 235 state.shelf = Some(shelf); 236 state.state = State::Exiting; 237 break; 238 } 239 (None, true, false) => None, 240 } 241 } 242 } { 243 // Now that the lock has been dropped, perform the action. 244 match action { 245 Action::QueuedFn(f) => { 246 f(&mut shelf); 247 done_idle = false; 248 } 249 Action::IdleFns(idle_fns) => { 250 for idle_fn in idle_fns { 251 idle_fn(&mut shelf); 252 } 253 done_idle = true; 254 } 255 } 256 } 257 } 258 })); 259 state.state = State::Running; 260 } 261 } 262