xref: /aosp_15_r20/external/crosvm/swap/src/worker.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4*bb4ee6a4SAndroid Build Coastguard Worker 
//! Multi-thread worker.
6*bb4ee6a4SAndroid Build Coastguard Worker 
7*bb4ee6a4SAndroid Build Coastguard Worker #![deny(missing_docs)]
8*bb4ee6a4SAndroid Build Coastguard Worker 
9*bb4ee6a4SAndroid Build Coastguard Worker use std::collections::VecDeque;
10*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::atomic::AtomicBool;
11*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::atomic::Ordering;
12*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::Arc;
13*bb4ee6a4SAndroid Build Coastguard Worker use std::thread;
14*bb4ee6a4SAndroid Build Coastguard Worker use std::time::Duration;
15*bb4ee6a4SAndroid Build Coastguard Worker 
16*bb4ee6a4SAndroid Build Coastguard Worker use anyhow::Context;
17*bb4ee6a4SAndroid Build Coastguard Worker use base::error;
18*bb4ee6a4SAndroid Build Coastguard Worker use base::Event;
19*bb4ee6a4SAndroid Build Coastguard Worker use base::EventWaitResult;
20*bb4ee6a4SAndroid Build Coastguard Worker use sync::Condvar;
21*bb4ee6a4SAndroid Build Coastguard Worker use sync::Mutex;
22*bb4ee6a4SAndroid Build Coastguard Worker 
/// Task to run on the worker threads.
///
/// `execute` takes `self` by value, so a task value is run at most once and is
/// dropped when `execute` returns.
pub trait Task {
    /// Executes the task, consuming it.
    fn execute(self);
}
28*bb4ee6a4SAndroid Build Coastguard Worker 
29*bb4ee6a4SAndroid Build Coastguard Worker /// Multi thread based worker executing a single type [Task].
30*bb4ee6a4SAndroid Build Coastguard Worker ///
31*bb4ee6a4SAndroid Build Coastguard Worker /// See the doc of [Channel] as well for the behaviors of it.
32*bb4ee6a4SAndroid Build Coastguard Worker pub struct Worker<T> {
33*bb4ee6a4SAndroid Build Coastguard Worker     /// Shared [Channel] with the worker threads.
34*bb4ee6a4SAndroid Build Coastguard Worker     pub channel: Arc<Channel<T>>,
35*bb4ee6a4SAndroid Build Coastguard Worker     handles: Vec<thread::JoinHandle<()>>,
36*bb4ee6a4SAndroid Build Coastguard Worker }
37*bb4ee6a4SAndroid Build Coastguard Worker 
38*bb4ee6a4SAndroid Build Coastguard Worker impl<T: Task + Send + 'static> Worker<T> {
39*bb4ee6a4SAndroid Build Coastguard Worker     /// Spawns the numbers of worker threads.
new(len_channel: usize, n_workers: usize) -> Self40*bb4ee6a4SAndroid Build Coastguard Worker     pub fn new(len_channel: usize, n_workers: usize) -> Self {
41*bb4ee6a4SAndroid Build Coastguard Worker         let channel = Arc::new(Channel::<T>::new(len_channel, n_workers));
42*bb4ee6a4SAndroid Build Coastguard Worker         let mut handles = Vec::with_capacity(n_workers);
43*bb4ee6a4SAndroid Build Coastguard Worker         for _ in 0..n_workers {
44*bb4ee6a4SAndroid Build Coastguard Worker             let context = channel.clone();
45*bb4ee6a4SAndroid Build Coastguard Worker             let handle = thread::spawn(move || {
46*bb4ee6a4SAndroid Build Coastguard Worker                 Self::worker_thread(context);
47*bb4ee6a4SAndroid Build Coastguard Worker             });
48*bb4ee6a4SAndroid Build Coastguard Worker             handles.push(handle);
49*bb4ee6a4SAndroid Build Coastguard Worker         }
50*bb4ee6a4SAndroid Build Coastguard Worker         Self { channel, handles }
51*bb4ee6a4SAndroid Build Coastguard Worker     }
52*bb4ee6a4SAndroid Build Coastguard Worker 
worker_thread(context: Arc<Channel<T>>)53*bb4ee6a4SAndroid Build Coastguard Worker     fn worker_thread(context: Arc<Channel<T>>) {
54*bb4ee6a4SAndroid Build Coastguard Worker         while let Some(task) = context.pop() {
55*bb4ee6a4SAndroid Build Coastguard Worker             task.execute();
56*bb4ee6a4SAndroid Build Coastguard Worker         }
57*bb4ee6a4SAndroid Build Coastguard Worker     }
58*bb4ee6a4SAndroid Build Coastguard Worker 
59*bb4ee6a4SAndroid Build Coastguard Worker     /// Closes the channel and wait for worker threads shutdown.
60*bb4ee6a4SAndroid Build Coastguard Worker     ///
61*bb4ee6a4SAndroid Build Coastguard Worker     /// This also waits for all the tasks in the channel to be executed.
close(self)62*bb4ee6a4SAndroid Build Coastguard Worker     pub fn close(self) {
63*bb4ee6a4SAndroid Build Coastguard Worker         self.channel.close();
64*bb4ee6a4SAndroid Build Coastguard Worker         for handle in self.handles {
65*bb4ee6a4SAndroid Build Coastguard Worker             match handle.join() {
66*bb4ee6a4SAndroid Build Coastguard Worker                 Ok(()) => {}
67*bb4ee6a4SAndroid Build Coastguard Worker                 Err(e) => {
68*bb4ee6a4SAndroid Build Coastguard Worker                     error!("failed to wait for worker thread: {:?}", e);
69*bb4ee6a4SAndroid Build Coastguard Worker                 }
70*bb4ee6a4SAndroid Build Coastguard Worker             }
71*bb4ee6a4SAndroid Build Coastguard Worker         }
72*bb4ee6a4SAndroid Build Coastguard Worker     }
73*bb4ee6a4SAndroid Build Coastguard Worker }
74*bb4ee6a4SAndroid Build Coastguard Worker 
75*bb4ee6a4SAndroid Build Coastguard Worker /// MPMC (Multi Producers Multi Consumers) queue integrated with [Worker].
76*bb4ee6a4SAndroid Build Coastguard Worker ///
77*bb4ee6a4SAndroid Build Coastguard Worker /// [Channel] offers [Channel::wait_complete()] to guarantee all the tasks are executed.
78*bb4ee6a4SAndroid Build Coastguard Worker ///
79*bb4ee6a4SAndroid Build Coastguard Worker /// This only exposes methods for producers.
80*bb4ee6a4SAndroid Build Coastguard Worker pub struct Channel<T> {
81*bb4ee6a4SAndroid Build Coastguard Worker     state: Mutex<ChannelState<T>>,
82*bb4ee6a4SAndroid Build Coastguard Worker     consumer_wait: Condvar,
83*bb4ee6a4SAndroid Build Coastguard Worker     producer_wait: Condvar,
84*bb4ee6a4SAndroid Build Coastguard Worker     n_consumers: usize,
85*bb4ee6a4SAndroid Build Coastguard Worker }
86*bb4ee6a4SAndroid Build Coastguard Worker 
87*bb4ee6a4SAndroid Build Coastguard Worker impl<T> Channel<T> {
new(len: usize, n_consumers: usize) -> Self88*bb4ee6a4SAndroid Build Coastguard Worker     fn new(len: usize, n_consumers: usize) -> Self {
89*bb4ee6a4SAndroid Build Coastguard Worker         Self {
90*bb4ee6a4SAndroid Build Coastguard Worker             state: Mutex::new(ChannelState::new(len)),
91*bb4ee6a4SAndroid Build Coastguard Worker             consumer_wait: Condvar::new(),
92*bb4ee6a4SAndroid Build Coastguard Worker             producer_wait: Condvar::new(),
93*bb4ee6a4SAndroid Build Coastguard Worker             n_consumers,
94*bb4ee6a4SAndroid Build Coastguard Worker         }
95*bb4ee6a4SAndroid Build Coastguard Worker     }
96*bb4ee6a4SAndroid Build Coastguard Worker 
close(&self)97*bb4ee6a4SAndroid Build Coastguard Worker     fn close(&self) {
98*bb4ee6a4SAndroid Build Coastguard Worker         let mut state = self.state.lock();
99*bb4ee6a4SAndroid Build Coastguard Worker         state.is_closed = true;
100*bb4ee6a4SAndroid Build Coastguard Worker         self.consumer_wait.notify_all();
101*bb4ee6a4SAndroid Build Coastguard Worker         self.producer_wait.notify_all();
102*bb4ee6a4SAndroid Build Coastguard Worker     }
103*bb4ee6a4SAndroid Build Coastguard Worker 
104*bb4ee6a4SAndroid Build Coastguard Worker     /// Pops a task from the channel.
105*bb4ee6a4SAndroid Build Coastguard Worker     ///
106*bb4ee6a4SAndroid Build Coastguard Worker     /// If the queue is closed and also **empty**, this returns [None]. This returns all the tasks
107*bb4ee6a4SAndroid Build Coastguard Worker     /// in the queue even while this is closed.
108*bb4ee6a4SAndroid Build Coastguard Worker     #[inline]
pop(&self) -> Option<T>109*bb4ee6a4SAndroid Build Coastguard Worker     fn pop(&self) -> Option<T> {
110*bb4ee6a4SAndroid Build Coastguard Worker         let mut state = self.state.lock();
111*bb4ee6a4SAndroid Build Coastguard Worker         loop {
112*bb4ee6a4SAndroid Build Coastguard Worker             let was_full = state.queue.len() == state.capacity;
113*bb4ee6a4SAndroid Build Coastguard Worker             if let Some(item) = state.queue.pop_front() {
114*bb4ee6a4SAndroid Build Coastguard Worker                 if was_full {
115*bb4ee6a4SAndroid Build Coastguard Worker                     // notification for a producer waiting for `push()`.
116*bb4ee6a4SAndroid Build Coastguard Worker                     self.producer_wait.notify_one();
117*bb4ee6a4SAndroid Build Coastguard Worker                 }
118*bb4ee6a4SAndroid Build Coastguard Worker                 return Some(item);
119*bb4ee6a4SAndroid Build Coastguard Worker             } else {
120*bb4ee6a4SAndroid Build Coastguard Worker                 if state.is_closed {
121*bb4ee6a4SAndroid Build Coastguard Worker                     return None;
122*bb4ee6a4SAndroid Build Coastguard Worker                 }
123*bb4ee6a4SAndroid Build Coastguard Worker                 state.n_waiting += 1;
124*bb4ee6a4SAndroid Build Coastguard Worker                 if state.n_waiting == self.n_consumers {
125*bb4ee6a4SAndroid Build Coastguard Worker                     // notification for producers waiting for `wait_complete()`.
126*bb4ee6a4SAndroid Build Coastguard Worker                     self.producer_wait.notify_all();
127*bb4ee6a4SAndroid Build Coastguard Worker                 }
128*bb4ee6a4SAndroid Build Coastguard Worker                 state = self.consumer_wait.wait(state);
129*bb4ee6a4SAndroid Build Coastguard Worker                 state.n_waiting -= 1;
130*bb4ee6a4SAndroid Build Coastguard Worker             }
131*bb4ee6a4SAndroid Build Coastguard Worker         }
132*bb4ee6a4SAndroid Build Coastguard Worker     }
133*bb4ee6a4SAndroid Build Coastguard Worker 
134*bb4ee6a4SAndroid Build Coastguard Worker     /// Push a task.
135*bb4ee6a4SAndroid Build Coastguard Worker     ///
136*bb4ee6a4SAndroid Build Coastguard Worker     /// This blocks if the channel is full.
137*bb4ee6a4SAndroid Build Coastguard Worker     ///
138*bb4ee6a4SAndroid Build Coastguard Worker     /// If the channel is closed, this returns `false`.
push(&self, item: T) -> bool139*bb4ee6a4SAndroid Build Coastguard Worker     pub fn push(&self, item: T) -> bool {
140*bb4ee6a4SAndroid Build Coastguard Worker         let mut state = self.state.lock();
141*bb4ee6a4SAndroid Build Coastguard Worker         // Wait until the queue has room to push a task.
142*bb4ee6a4SAndroid Build Coastguard Worker         while state.queue.len() == state.capacity {
143*bb4ee6a4SAndroid Build Coastguard Worker             if state.is_closed {
144*bb4ee6a4SAndroid Build Coastguard Worker                 return false;
145*bb4ee6a4SAndroid Build Coastguard Worker             }
146*bb4ee6a4SAndroid Build Coastguard Worker             state = self.producer_wait.wait(state);
147*bb4ee6a4SAndroid Build Coastguard Worker         }
148*bb4ee6a4SAndroid Build Coastguard Worker         if state.is_closed {
149*bb4ee6a4SAndroid Build Coastguard Worker             return false;
150*bb4ee6a4SAndroid Build Coastguard Worker         }
151*bb4ee6a4SAndroid Build Coastguard Worker         state.queue.push_back(item);
152*bb4ee6a4SAndroid Build Coastguard Worker         self.consumer_wait.notify_one();
153*bb4ee6a4SAndroid Build Coastguard Worker         true
154*bb4ee6a4SAndroid Build Coastguard Worker     }
155*bb4ee6a4SAndroid Build Coastguard Worker 
156*bb4ee6a4SAndroid Build Coastguard Worker     /// Wait until all the tasks have been executed.
157*bb4ee6a4SAndroid Build Coastguard Worker     ///
158*bb4ee6a4SAndroid Build Coastguard Worker     /// This guarantees that all the tasks in this channel are not only consumed but also executed.
wait_complete(&self)159*bb4ee6a4SAndroid Build Coastguard Worker     pub fn wait_complete(&self) {
160*bb4ee6a4SAndroid Build Coastguard Worker         let mut state = self.state.lock();
161*bb4ee6a4SAndroid Build Coastguard Worker         while !(state.queue.is_empty() && state.n_waiting == self.n_consumers) {
162*bb4ee6a4SAndroid Build Coastguard Worker             state = self.producer_wait.wait(state);
163*bb4ee6a4SAndroid Build Coastguard Worker         }
164*bb4ee6a4SAndroid Build Coastguard Worker     }
165*bb4ee6a4SAndroid Build Coastguard Worker }
166*bb4ee6a4SAndroid Build Coastguard Worker 
/// Mutable state of a [Channel], kept behind the channel's mutex.
struct ChannelState<T> {
    /// Pending tasks; pushed at the back and popped from the front.
    queue: VecDeque<T>,
    /// Queue length at which `Channel::push()` blocks.
    capacity: usize,
    /// Idle-consumer counter maintained by `Channel::pop()`;
    /// `Channel::wait_complete()` compares it with the consumer count.
    n_waiting: usize,
    /// Set by `Channel::close()`.
    is_closed: bool,
}
173*bb4ee6a4SAndroid Build Coastguard Worker 
174*bb4ee6a4SAndroid Build Coastguard Worker impl<T> ChannelState<T> {
new(capacity: usize) -> Self175*bb4ee6a4SAndroid Build Coastguard Worker     fn new(capacity: usize) -> Self {
176*bb4ee6a4SAndroid Build Coastguard Worker         Self {
177*bb4ee6a4SAndroid Build Coastguard Worker             queue: VecDeque::with_capacity(capacity),
178*bb4ee6a4SAndroid Build Coastguard Worker             capacity,
179*bb4ee6a4SAndroid Build Coastguard Worker             n_waiting: 0,
180*bb4ee6a4SAndroid Build Coastguard Worker             is_closed: false,
181*bb4ee6a4SAndroid Build Coastguard Worker         }
182*bb4ee6a4SAndroid Build Coastguard Worker     }
183*bb4ee6a4SAndroid Build Coastguard Worker }
184*bb4ee6a4SAndroid Build Coastguard Worker 
185*bb4ee6a4SAndroid Build Coastguard Worker /// The event channel for background jobs.
186*bb4ee6a4SAndroid Build Coastguard Worker ///
187*bb4ee6a4SAndroid Build Coastguard Worker /// This sends an abort request from the main thread to the job thread via atomic boolean flag.
188*bb4ee6a4SAndroid Build Coastguard Worker ///
189*bb4ee6a4SAndroid Build Coastguard Worker /// This notifies the main thread that the job thread is completed via [Event].
190*bb4ee6a4SAndroid Build Coastguard Worker pub struct BackgroundJobControl {
191*bb4ee6a4SAndroid Build Coastguard Worker     event: Event,
192*bb4ee6a4SAndroid Build Coastguard Worker     abort_flag: AtomicBool,
193*bb4ee6a4SAndroid Build Coastguard Worker }
194*bb4ee6a4SAndroid Build Coastguard Worker 
195*bb4ee6a4SAndroid Build Coastguard Worker impl BackgroundJobControl {
196*bb4ee6a4SAndroid Build Coastguard Worker     /// Creates [BackgroundJobControl].
new() -> anyhow::Result<Self>197*bb4ee6a4SAndroid Build Coastguard Worker     pub fn new() -> anyhow::Result<Self> {
198*bb4ee6a4SAndroid Build Coastguard Worker         Ok(Self {
199*bb4ee6a4SAndroid Build Coastguard Worker             event: Event::new()?,
200*bb4ee6a4SAndroid Build Coastguard Worker             abort_flag: AtomicBool::new(false),
201*bb4ee6a4SAndroid Build Coastguard Worker         })
202*bb4ee6a4SAndroid Build Coastguard Worker     }
203*bb4ee6a4SAndroid Build Coastguard Worker 
204*bb4ee6a4SAndroid Build Coastguard Worker     /// Creates [BackgroundJob].
new_job(&self) -> BackgroundJob<'_>205*bb4ee6a4SAndroid Build Coastguard Worker     pub fn new_job(&self) -> BackgroundJob<'_> {
206*bb4ee6a4SAndroid Build Coastguard Worker         BackgroundJob {
207*bb4ee6a4SAndroid Build Coastguard Worker             event: &self.event,
208*bb4ee6a4SAndroid Build Coastguard Worker             abort_flag: &self.abort_flag,
209*bb4ee6a4SAndroid Build Coastguard Worker         }
210*bb4ee6a4SAndroid Build Coastguard Worker     }
211*bb4ee6a4SAndroid Build Coastguard Worker 
212*bb4ee6a4SAndroid Build Coastguard Worker     /// Abort the background job.
abort(&self)213*bb4ee6a4SAndroid Build Coastguard Worker     pub fn abort(&self) {
214*bb4ee6a4SAndroid Build Coastguard Worker         self.abort_flag.store(true, Ordering::Release);
215*bb4ee6a4SAndroid Build Coastguard Worker     }
216*bb4ee6a4SAndroid Build Coastguard Worker 
217*bb4ee6a4SAndroid Build Coastguard Worker     /// Reset the internal state for a next job.
218*bb4ee6a4SAndroid Build Coastguard Worker     ///
219*bb4ee6a4SAndroid Build Coastguard Worker     /// Returns false, if the event is already reset and no event exists.
reset(&self) -> anyhow::Result<bool>220*bb4ee6a4SAndroid Build Coastguard Worker     pub fn reset(&self) -> anyhow::Result<bool> {
221*bb4ee6a4SAndroid Build Coastguard Worker         self.abort_flag.store(false, Ordering::Release);
222*bb4ee6a4SAndroid Build Coastguard Worker         Ok(matches!(
223*bb4ee6a4SAndroid Build Coastguard Worker             self.event
224*bb4ee6a4SAndroid Build Coastguard Worker                 .wait_timeout(Duration::ZERO)
225*bb4ee6a4SAndroid Build Coastguard Worker                 .context("failed to get job complete event")?,
226*bb4ee6a4SAndroid Build Coastguard Worker             EventWaitResult::Signaled
227*bb4ee6a4SAndroid Build Coastguard Worker         ))
228*bb4ee6a4SAndroid Build Coastguard Worker     }
229*bb4ee6a4SAndroid Build Coastguard Worker 
230*bb4ee6a4SAndroid Build Coastguard Worker     /// Returns the event to notify the completion of background job.
get_completion_event(&self) -> &Event231*bb4ee6a4SAndroid Build Coastguard Worker     pub fn get_completion_event(&self) -> &Event {
232*bb4ee6a4SAndroid Build Coastguard Worker         &self.event
233*bb4ee6a4SAndroid Build Coastguard Worker     }
234*bb4ee6a4SAndroid Build Coastguard Worker }
235*bb4ee6a4SAndroid Build Coastguard Worker 
236*bb4ee6a4SAndroid Build Coastguard Worker /// Background job context.
237*bb4ee6a4SAndroid Build Coastguard Worker ///
238*bb4ee6a4SAndroid Build Coastguard Worker /// When dropped, this sends an event to the main thread via [Event].
239*bb4ee6a4SAndroid Build Coastguard Worker pub struct BackgroundJob<'a> {
240*bb4ee6a4SAndroid Build Coastguard Worker     event: &'a Event,
241*bb4ee6a4SAndroid Build Coastguard Worker     abort_flag: &'a AtomicBool,
242*bb4ee6a4SAndroid Build Coastguard Worker }
243*bb4ee6a4SAndroid Build Coastguard Worker 
244*bb4ee6a4SAndroid Build Coastguard Worker impl BackgroundJob<'_> {
245*bb4ee6a4SAndroid Build Coastguard Worker     /// Returns whether the background job is aborted or not.
is_aborted(&self) -> bool246*bb4ee6a4SAndroid Build Coastguard Worker     pub fn is_aborted(&self) -> bool {
247*bb4ee6a4SAndroid Build Coastguard Worker         self.abort_flag.load(Ordering::Acquire)
248*bb4ee6a4SAndroid Build Coastguard Worker     }
249*bb4ee6a4SAndroid Build Coastguard Worker }
250*bb4ee6a4SAndroid Build Coastguard Worker 
251*bb4ee6a4SAndroid Build Coastguard Worker impl Drop for BackgroundJob<'_> {
drop(&mut self)252*bb4ee6a4SAndroid Build Coastguard Worker     fn drop(&mut self) {
253*bb4ee6a4SAndroid Build Coastguard Worker         self.event.signal().expect("send job complete event");
254*bb4ee6a4SAndroid Build Coastguard Worker     }
255*bb4ee6a4SAndroid Build Coastguard Worker }
256*bb4ee6a4SAndroid Build Coastguard Worker 
257*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(test)]
258*bb4ee6a4SAndroid Build Coastguard Worker mod tests {
259*bb4ee6a4SAndroid Build Coastguard Worker     use std::time::Duration;
260*bb4ee6a4SAndroid Build Coastguard Worker 
261*bb4ee6a4SAndroid Build Coastguard Worker     use super::*;
262*bb4ee6a4SAndroid Build Coastguard Worker 
/// Counters shared between a test and the `FakeTask` executions it drives.
#[derive(Clone, Copy)]
struct Context {
    /// Executions granted by `consume()` that have not run yet.
    n_consume: usize,
    /// Executions that have finished.
    n_executed: usize,
}
268*bb4ee6a4SAndroid Build Coastguard Worker 
269*bb4ee6a4SAndroid Build Coastguard Worker     struct FakeTask {
270*bb4ee6a4SAndroid Build Coastguard Worker         context: Mutex<Context>,
271*bb4ee6a4SAndroid Build Coastguard Worker         waker: Condvar,
272*bb4ee6a4SAndroid Build Coastguard Worker     }
273*bb4ee6a4SAndroid Build Coastguard Worker 
274*bb4ee6a4SAndroid Build Coastguard Worker     impl FakeTask {
new() -> Arc<Self>275*bb4ee6a4SAndroid Build Coastguard Worker         fn new() -> Arc<Self> {
276*bb4ee6a4SAndroid Build Coastguard Worker             Arc::new(Self {
277*bb4ee6a4SAndroid Build Coastguard Worker                 context: Mutex::new(Context {
278*bb4ee6a4SAndroid Build Coastguard Worker                     n_consume: 0,
279*bb4ee6a4SAndroid Build Coastguard Worker                     n_executed: 0,
280*bb4ee6a4SAndroid Build Coastguard Worker                 }),
281*bb4ee6a4SAndroid Build Coastguard Worker                 waker: Condvar::new(),
282*bb4ee6a4SAndroid Build Coastguard Worker             })
283*bb4ee6a4SAndroid Build Coastguard Worker         }
284*bb4ee6a4SAndroid Build Coastguard Worker 
consume(&self, count: usize)285*bb4ee6a4SAndroid Build Coastguard Worker         fn consume(&self, count: usize) {
286*bb4ee6a4SAndroid Build Coastguard Worker             let mut context = self.context.lock();
287*bb4ee6a4SAndroid Build Coastguard Worker             context.n_consume += count;
288*bb4ee6a4SAndroid Build Coastguard Worker             self.waker.notify_all();
289*bb4ee6a4SAndroid Build Coastguard Worker         }
290*bb4ee6a4SAndroid Build Coastguard Worker 
n_executed(&self) -> usize291*bb4ee6a4SAndroid Build Coastguard Worker         fn n_executed(&self) -> usize {
292*bb4ee6a4SAndroid Build Coastguard Worker             self.context.lock().n_executed
293*bb4ee6a4SAndroid Build Coastguard Worker         }
294*bb4ee6a4SAndroid Build Coastguard Worker     }
295*bb4ee6a4SAndroid Build Coastguard Worker 
296*bb4ee6a4SAndroid Build Coastguard Worker     impl Task for Arc<FakeTask> {
execute(self)297*bb4ee6a4SAndroid Build Coastguard Worker         fn execute(self) {
298*bb4ee6a4SAndroid Build Coastguard Worker             let mut context = self.context.lock();
299*bb4ee6a4SAndroid Build Coastguard Worker             while context.n_consume == 0 {
300*bb4ee6a4SAndroid Build Coastguard Worker                 context = self.waker.wait(context);
301*bb4ee6a4SAndroid Build Coastguard Worker             }
302*bb4ee6a4SAndroid Build Coastguard Worker             context.n_consume -= 1;
303*bb4ee6a4SAndroid Build Coastguard Worker             context.n_executed += 1;
304*bb4ee6a4SAndroid Build Coastguard Worker         }
305*bb4ee6a4SAndroid Build Coastguard Worker     }
306*bb4ee6a4SAndroid Build Coastguard Worker 
/// Joins `join_handle`, polling roughly once per millisecond for up to
/// `timeout_millis` polls, and returns the thread's result.
///
/// The handle is checked one final time after the last sleep, so a thread that
/// finishes during that sleep (or is already finished when `timeout_millis`
/// is 0) is still joined instead of being reported as a timeout.
///
/// # Panics
///
/// Panics with "thread join timeout" if the thread has not finished in time,
/// or if the joined thread itself panicked.
fn wait_thread_with_timeout<T>(join_handle: thread::JoinHandle<T>, timeout_millis: u64) -> T {
    let mut remaining = timeout_millis;
    loop {
        if join_handle.is_finished() {
            return join_handle.join().unwrap();
        }
        if remaining == 0 {
            panic!("thread join timeout");
        }
        thread::sleep(Duration::from_millis(1));
        remaining -= 1;
    }
}
316*bb4ee6a4SAndroid Build Coastguard Worker 
/// Calls `f` until it returns `true`, sleeping 1 ms after each `false`, for at
/// most `timeout_millis` calls.
///
/// Returns silently in both cases; callers assert on the condition afterwards.
fn poll_until_with_timeout<F>(f: F, timeout_millis: u64)
where
    F: Fn() -> bool,
{
    let mut attempts = 0;
    while attempts < timeout_millis && !f() {
        thread::sleep(Duration::from_millis(1));
        attempts += 1;
    }
}
328*bb4ee6a4SAndroid Build Coastguard Worker 
/// Tasks pushed to the worker all run once the test grants their executions.
#[test]
fn test_worker() {
    let worker = Worker::new(2, 4);
    let task = FakeTask::new();
    let channel = worker.channel.clone();

    for _ in 0..4 {
        assert!(channel.push(task.clone()));
    }

    // Executions block until granted, so nothing has completed yet.
    assert_eq!(task.n_executed(), 0);
    task.consume(4);
    worker.channel.wait_complete();
    assert_eq!(task.n_executed(), 4);
    worker.close();
}
345*bb4ee6a4SAndroid Build Coastguard Worker 
/// `push()` is rejected once the worker has been closed.
#[test]
fn test_worker_push_after_close() {
    let worker = Worker::new(2, 4);
    let task = FakeTask::new();
    let channel = worker.channel.clone();

    worker.close();

    assert!(!channel.push(task));
}
356*bb4ee6a4SAndroid Build Coastguard Worker 
/// `push()` blocks while the channel is full and resumes once a task is executed.
#[test]
fn test_worker_push_block() {
    let worker = Worker::new(2, 4);
    let task = FakeTask::new();
    let channel = worker.channel.clone();

    let task_cloned = task.clone();
    // push tasks on another thread to avoid blocking forever
    // (6 = 4 workers each holding a blocked task + 2 queue slots).
    wait_thread_with_timeout(
        thread::spawn(move || {
            for _ in 0..6 {
                assert!(channel.push(task_cloned.clone()));
            }
        }),
        100,
    );
    let channel = worker.channel.clone();
    let task_cloned = task.clone();
    let push_thread = thread::spawn(move || {
        assert!(channel.push(task_cloned));
    });
    // The seventh push has nowhere to go and must still be blocked.
    thread::sleep(Duration::from_millis(10));
    assert!(!push_thread.is_finished());

    // Letting one task run frees a slot for the blocked push.
    task.consume(1);
    wait_thread_with_timeout(push_thread, 100);

    task.consume(6);
    #[allow(clippy::redundant_clone)]
    let task_clone = task.clone();
    poll_until_with_timeout(|| task_clone.n_executed() == 7, 100);
    assert_eq!(task.n_executed(), 7);
    worker.close();
}
391*bb4ee6a4SAndroid Build Coastguard Worker 
/// A `push()` blocked on a full channel returns `false` when the worker is closed.
#[test]
fn test_worker_close_on_push_blocked() {
    let worker = Worker::new(2, 4);
    let task = FakeTask::new();
    let channel = worker.channel.clone();

    let task_cloned = task.clone();
    // push tasks on another thread to avoid blocking forever
    wait_thread_with_timeout(
        thread::spawn(move || {
            for _ in 0..6 {
                assert!(channel.push(task_cloned.clone()));
            }
        }),
        100,
    );
    let channel = worker.channel.clone();
    let task_cloned = task.clone();
    let push_thread = thread::spawn(move || channel.push(task_cloned));
    // sleep to run push_thread.
    thread::sleep(Duration::from_millis(10));
    // close blocks until all the task are executed.
    let close_thread = thread::spawn(move || {
        worker.close();
    });
    let push_result = wait_thread_with_timeout(push_thread, 100);
    // push fails.
    assert!(!push_result);

    // cleanup: let the six accepted tasks run so that close() can finish.
    task.consume(6);
    wait_thread_with_timeout(close_thread, 100);
}
425*bb4ee6a4SAndroid Build Coastguard Worker 
/// Constructing a `BackgroundJobControl` succeeds.
#[test]
fn new_background_job_event() {
    assert!(BackgroundJobControl::new().is_ok());
}
430*bb4ee6a4SAndroid Build Coastguard Worker 
/// A job handed out by a freshly created control reports that it is not aborted.
#[test]
fn background_job_is_not_aborted_default() {
    let control = BackgroundJobControl::new().unwrap();
    let fresh_job = control.new_job();

    let aborted = fresh_job.is_aborted();

    assert!(!aborted);
}
439*bb4ee6a4SAndroid Build Coastguard Worker 
/// Calling `abort()` on the control is observed by a job created beforehand.
#[test]
fn abort_background_job() {
    let control = BackgroundJobControl::new().unwrap();
    let running_job = control.new_job();

    control.abort();
    let aborted = running_job.is_aborted();

    assert!(aborted);
}
449*bb4ee6a4SAndroid Build Coastguard Worker 
/// A job created after `abort()` followed by `reset()` reports that it is not aborted.
#[test]
fn reset_background_job() {
    let control = BackgroundJobControl::new().unwrap();

    // Abort first, then reset, and only afterwards create the job under test.
    control.abort();
    control.reset().unwrap();
    let later_job = control.new_job();

    let aborted = later_job.is_aborted();
    assert!(!aborted);
}
460*bb4ee6a4SAndroid Build Coastguard Worker 
/// `reset()` yields `true` after a job has been created and dropped.
#[test]
fn reset_background_job_event() {
    let control = BackgroundJobControl::new().unwrap();

    // Create a job and release it right away.
    drop(control.new_job());

    let was_reset = control.reset().unwrap();
    assert!(was_reset);
}
470*bb4ee6a4SAndroid Build Coastguard Worker 
/// A second consecutive `reset()` yields `false`.
#[test]
fn reset_background_job_event_twice() {
    let control = BackgroundJobControl::new().unwrap();

    // Create a job and release it right away.
    drop(control.new_job());

    // The outcome of the first reset is not inspected; only the second one matters.
    let _ = control.reset().unwrap();
    let second_reset = control.reset().unwrap();

    assert!(!second_reset);
}
481*bb4ee6a4SAndroid Build Coastguard Worker 
/// `reset()` yields `false` when no job was ever created from the control.
#[test]
fn reset_background_job_event_no_jobs() {
    let control = BackgroundJobControl::new().unwrap();

    let was_reset = control.reset().unwrap();

    assert!(!was_reset);
}
488*bb4ee6a4SAndroid Build Coastguard Worker }
489