// Copyright 2020, The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module implements the handling of async tasks.
//! The worker thread has a high priority and a low priority queue. Adding a job to either
//! will cause one thread to be spawned if none exists. As a compromise between performance
//! and resource consumption, the thread will linger for about 30 seconds after it has
//! processed all tasks before it terminates.
//! Note that low priority tasks are processed only when the high priority queue is empty.
21 
use std::{any::Any, any::TypeId, time::Duration};
use std::{
    collections::{HashMap, VecDeque},
    sync::Arc,
    sync::{Condvar, Mutex, MutexGuard},
    thread,
};

#[cfg(test)]
mod tests;
32 
/// Lifecycle of the worker thread as tracked under the state mutex.
#[derive(Debug, PartialEq, Eq)]
enum State {
    /// No live worker (or the worker has decided to terminate); a new thread
    /// must be spawned before queued jobs get processed.
    Exiting,
    /// A worker thread is alive and draining the queues.
    Running,
}
38 
/// The Shelf allows async tasks to store state across invocations.
/// Entries are keyed by `TypeId`, so at most one value of each type is stored.
/// Note: Store elves at your own peril ;-).
#[derive(Debug, Default)]
pub struct Shelf(HashMap<TypeId, Box<dyn Any + Send>>);

impl Shelf {
    /// Get a reference to the shelved data of type T. Returns Some if the data exists.
    pub fn get_downcast_ref<T: Any + Send>(&self) -> Option<&T> {
        let entry = self.0.get(&TypeId::of::<T>())?;
        entry.downcast_ref::<T>()
    }

    /// Get a mutable reference to the shelved data of type T, if a T was previously
    /// inserted using `put`, `get_mut`, or `get_or_put_with`.
    pub fn get_downcast_mut<T: Any + Send>(&mut self) -> Option<&mut T> {
        let entry = self.0.get_mut(&TypeId::of::<T>())?;
        entry.downcast_mut::<T>()
    }

    /// Removes the entry of the given type and returns the stored data if it existed.
    pub fn remove_downcast_ref<T: Any + Send>(&mut self) -> Option<T> {
        let entry = self.0.remove(&TypeId::of::<T>())?;
        entry.downcast::<T>().ok().map(|boxed| *boxed)
    }

    /// Puts data `v` on the shelf. If there already was an entry of type T it is returned.
    pub fn put<T: Any + Send>(&mut self, v: T) -> Option<T> {
        let previous = self.0.insert(TypeId::of::<T>(), Box::new(v))?;
        previous.downcast::<T>().ok().map(|boxed| *boxed)
    }

    /// Gets a mutable reference to the entry of the given type and default creates it
    /// if necessary. The type must implement Default.
    pub fn get_mut<T: Any + Send + Default>(&mut self) -> &mut T {
        self.get_or_put_with(T::default)
    }

    /// Gets a mutable reference to the entry of the given type or creates it using the
    /// init function. Init is not executed if the entry already existed.
    pub fn get_or_put_with<T: Any + Send, F>(&mut self, init: F) -> &mut T
    where
        F: FnOnce() -> T,
    {
        let slot = self
            .0
            .entry(TypeId::of::<T>())
            .or_insert_with(|| Box::new(init()) as Box<dyn Any + Send>);
        // The map is keyed by TypeId::of::<T>, so the stored value must be a T.
        slot.downcast_mut::<T>().expect("Shelf entry keyed by TypeId::of::<T> must hold a T")
    }
}
91 
92 struct AsyncTaskState {
93     state: State,
94     thread: Option<thread::JoinHandle<()>>,
95     timeout: Duration,
96     hi_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
97     lo_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
98     idle_fns: Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>,
99     /// The store allows tasks to store state across invocations. It is passed to each invocation
100     /// of each task. Tasks need to cooperate on the ids they use for storing state.
101     shelf: Option<Shelf>,
102 }
103 
104 /// AsyncTask spawns one worker thread on demand to process jobs inserted into
105 /// a low and a high priority work queue. The queues are processed FIFO, and low
106 /// priority queue is processed if the high priority queue is empty.
107 /// Note: Because there is only one worker thread at a time for a given AsyncTask instance,
108 /// all scheduled requests are guaranteed to be serialized with respect to one another.
109 pub struct AsyncTask {
110     state: Arc<(Condvar, Mutex<AsyncTaskState>)>,
111 }
112 
113 impl Default for AsyncTask {
default() -> Self114     fn default() -> Self {
115         Self::new(Duration::from_secs(30))
116     }
117 }
118 
119 impl AsyncTask {
120     /// Construct an [`AsyncTask`] with a specific timeout value.
new(timeout: Duration) -> Self121     pub fn new(timeout: Duration) -> Self {
122         Self {
123             state: Arc::new((
124                 Condvar::new(),
125                 Mutex::new(AsyncTaskState {
126                     state: State::Exiting,
127                     thread: None,
128                     timeout,
129                     hi_prio_req: VecDeque::new(),
130                     lo_prio_req: VecDeque::new(),
131                     idle_fns: Vec::new(),
132                     shelf: None,
133                 }),
134             )),
135         }
136     }
137 
138     /// Adds a one-off job to the high priority queue. High priority jobs are
139     /// completed before low priority jobs and can also overtake low priority
140     /// jobs. But they cannot preempt them.
queue_hi<F>(&self, f: F) where F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,141     pub fn queue_hi<F>(&self, f: F)
142     where
143         F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
144     {
145         self.queue(f, true)
146     }
147 
148     /// Adds a one-off job to the low priority queue. Low priority jobs are
149     /// completed after high priority. And they are not executed as long as high
150     /// priority jobs are present. Jobs always run to completion and are never
151     /// preempted by high priority jobs.
queue_lo<F>(&self, f: F) where F: FnOnce(&mut Shelf) + Send + 'static,152     pub fn queue_lo<F>(&self, f: F)
153     where
154         F: FnOnce(&mut Shelf) + Send + 'static,
155     {
156         self.queue(f, false)
157     }
158 
159     /// Adds an idle callback. This will be invoked whenever the worker becomes
160     /// idle (all high and low priority jobs have been performed).
add_idle<F>(&self, f: F) where F: Fn(&mut Shelf) + Send + Sync + 'static,161     pub fn add_idle<F>(&self, f: F)
162     where
163         F: Fn(&mut Shelf) + Send + Sync + 'static,
164     {
165         let (ref _condvar, ref state) = *self.state;
166         let mut state = state.lock().unwrap();
167         state.idle_fns.push(Arc::new(f));
168     }
169 
queue<F>(&self, f: F, hi_prio: bool) where F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,170     fn queue<F>(&self, f: F, hi_prio: bool)
171     where
172         F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
173     {
174         let (ref condvar, ref state) = *self.state;
175         let mut state = state.lock().unwrap();
176 
177         if hi_prio {
178             state.hi_prio_req.push_back(Box::new(f));
179         } else {
180             state.lo_prio_req.push_back(Box::new(f));
181         }
182 
183         if state.state != State::Running {
184             self.spawn_thread(&mut state);
185         }
186         drop(state);
187         condvar.notify_all();
188     }
189 
spawn_thread(&self, state: &mut MutexGuard<AsyncTaskState>)190     fn spawn_thread(&self, state: &mut MutexGuard<AsyncTaskState>) {
191         if let Some(t) = state.thread.take() {
192             t.join().expect("AsyncTask panicked.");
193         }
194 
195         let cloned_state = self.state.clone();
196         let timeout_period = state.timeout;
197 
198         state.thread = Some(thread::spawn(move || {
199             let (ref condvar, ref state) = *cloned_state;
200 
201             enum Action {
202                 QueuedFn(Box<dyn FnOnce(&mut Shelf) + Send>),
203                 IdleFns(Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>),
204             }
205             let mut done_idle = false;
206 
207             // When the worker starts, it takes the shelf and puts it on the stack.
208             let mut shelf = state.lock().unwrap().shelf.take().unwrap_or_default();
209             loop {
210                 if let Some(action) = {
211                     let state = state.lock().unwrap();
212                     if !done_idle && state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty() {
213                         // No jobs queued so invoke the idle callbacks.
214                         Some(Action::IdleFns(state.idle_fns.clone()))
215                     } else {
216                         // Wait for either a queued job to arrive or a timeout.
217                         let (mut state, timeout) = condvar
218                             .wait_timeout_while(state, timeout_period, |state| {
219                                 state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty()
220                             })
221                             .unwrap();
222                         match (
223                             state.hi_prio_req.pop_front(),
224                             state.lo_prio_req.is_empty(),
225                             timeout.timed_out(),
226                         ) {
227                             (Some(f), _, _) => Some(Action::QueuedFn(f)),
228                             (None, false, _) => {
229                                 state.lo_prio_req.pop_front().map(|f| Action::QueuedFn(f))
230                             }
231                             (None, true, true) => {
232                                 // When the worker exits it puts the shelf back into the shared
233                                 // state for the next worker to use. So state is preserved not
234                                 // only across invocations but also across worker thread shut down.
235                                 state.shelf = Some(shelf);
236                                 state.state = State::Exiting;
237                                 break;
238                             }
239                             (None, true, false) => None,
240                         }
241                     }
242                 } {
243                     // Now that the lock has been dropped, perform the action.
244                     match action {
245                         Action::QueuedFn(f) => {
246                             f(&mut shelf);
247                             done_idle = false;
248                         }
249                         Action::IdleFns(idle_fns) => {
250                             for idle_fn in idle_fns {
251                                 idle_fn(&mut shelf);
252                             }
253                             done_idle = true;
254                         }
255                     }
256                 }
257             }
258         }));
259         state.state = State::Running;
260     }
261 }
262