xref: /aosp_15_r20/external/crosvm/cros_async/src/blocking/pool.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4*bb4ee6a4SAndroid Build Coastguard Worker 
5*bb4ee6a4SAndroid Build Coastguard Worker use std::collections::VecDeque;
6*bb4ee6a4SAndroid Build Coastguard Worker use std::future::Future;
7*bb4ee6a4SAndroid Build Coastguard Worker use std::mem;
8*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::mpsc::channel;
9*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::mpsc::Receiver;
10*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::mpsc::Sender;
11*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::Arc;
12*bb4ee6a4SAndroid Build Coastguard Worker use std::thread;
13*bb4ee6a4SAndroid Build Coastguard Worker use std::thread::JoinHandle;
14*bb4ee6a4SAndroid Build Coastguard Worker use std::time::Duration;
15*bb4ee6a4SAndroid Build Coastguard Worker use std::time::Instant;
16*bb4ee6a4SAndroid Build Coastguard Worker 
17*bb4ee6a4SAndroid Build Coastguard Worker use base::error;
18*bb4ee6a4SAndroid Build Coastguard Worker use base::warn;
19*bb4ee6a4SAndroid Build Coastguard Worker use futures::channel::oneshot;
20*bb4ee6a4SAndroid Build Coastguard Worker use slab::Slab;
21*bb4ee6a4SAndroid Build Coastguard Worker use sync::Condvar;
22*bb4ee6a4SAndroid Build Coastguard Worker use sync::Mutex;
23*bb4ee6a4SAndroid Build Coastguard Worker 
/// How long dropping a `BlockingPool` waits for its worker threads to exit before giving up.
const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
25*bb4ee6a4SAndroid Build Coastguard Worker 
26*bb4ee6a4SAndroid Build Coastguard Worker struct State {
27*bb4ee6a4SAndroid Build Coastguard Worker     tasks: VecDeque<Box<dyn FnOnce() + Send>>,
28*bb4ee6a4SAndroid Build Coastguard Worker     num_threads: usize,
29*bb4ee6a4SAndroid Build Coastguard Worker     num_idle: usize,
30*bb4ee6a4SAndroid Build Coastguard Worker     num_notified: usize,
31*bb4ee6a4SAndroid Build Coastguard Worker     worker_threads: Slab<JoinHandle<()>>,
32*bb4ee6a4SAndroid Build Coastguard Worker     exited_threads: Option<Receiver<usize>>,
33*bb4ee6a4SAndroid Build Coastguard Worker     exit: Sender<usize>,
34*bb4ee6a4SAndroid Build Coastguard Worker     shutting_down: bool,
35*bb4ee6a4SAndroid Build Coastguard Worker }
36*bb4ee6a4SAndroid Build Coastguard Worker 
run_blocking_thread(idx: usize, inner: Arc<Inner>, exit: Sender<usize>)37*bb4ee6a4SAndroid Build Coastguard Worker fn run_blocking_thread(idx: usize, inner: Arc<Inner>, exit: Sender<usize>) {
38*bb4ee6a4SAndroid Build Coastguard Worker     let mut state = inner.state.lock();
39*bb4ee6a4SAndroid Build Coastguard Worker     while !state.shutting_down {
40*bb4ee6a4SAndroid Build Coastguard Worker         if let Some(f) = state.tasks.pop_front() {
41*bb4ee6a4SAndroid Build Coastguard Worker             drop(state);
42*bb4ee6a4SAndroid Build Coastguard Worker             f();
43*bb4ee6a4SAndroid Build Coastguard Worker             state = inner.state.lock();
44*bb4ee6a4SAndroid Build Coastguard Worker             continue;
45*bb4ee6a4SAndroid Build Coastguard Worker         }
46*bb4ee6a4SAndroid Build Coastguard Worker 
47*bb4ee6a4SAndroid Build Coastguard Worker         // No more tasks so wait for more work.
48*bb4ee6a4SAndroid Build Coastguard Worker         state.num_idle += 1;
49*bb4ee6a4SAndroid Build Coastguard Worker 
50*bb4ee6a4SAndroid Build Coastguard Worker         let (guard, result) = inner
51*bb4ee6a4SAndroid Build Coastguard Worker             .condvar
52*bb4ee6a4SAndroid Build Coastguard Worker             .wait_timeout_while(state, inner.keepalive, |s| {
53*bb4ee6a4SAndroid Build Coastguard Worker                 !s.shutting_down && s.num_notified == 0
54*bb4ee6a4SAndroid Build Coastguard Worker             });
55*bb4ee6a4SAndroid Build Coastguard Worker         state = guard;
56*bb4ee6a4SAndroid Build Coastguard Worker 
57*bb4ee6a4SAndroid Build Coastguard Worker         // If `state.num_notified > 0` then this was a real wakeup.
58*bb4ee6a4SAndroid Build Coastguard Worker         if state.num_notified > 0 {
59*bb4ee6a4SAndroid Build Coastguard Worker             state.num_notified -= 1;
60*bb4ee6a4SAndroid Build Coastguard Worker             continue;
61*bb4ee6a4SAndroid Build Coastguard Worker         }
62*bb4ee6a4SAndroid Build Coastguard Worker 
63*bb4ee6a4SAndroid Build Coastguard Worker         // Only decrement the idle count if we timed out. Otherwise, it was decremented when new
64*bb4ee6a4SAndroid Build Coastguard Worker         // work was added to `state.tasks`.
65*bb4ee6a4SAndroid Build Coastguard Worker         if result.timed_out() {
66*bb4ee6a4SAndroid Build Coastguard Worker             state.num_idle = state
67*bb4ee6a4SAndroid Build Coastguard Worker                 .num_idle
68*bb4ee6a4SAndroid Build Coastguard Worker                 .checked_sub(1)
69*bb4ee6a4SAndroid Build Coastguard Worker                 .expect("`num_idle` underflow on timeout");
70*bb4ee6a4SAndroid Build Coastguard Worker             break;
71*bb4ee6a4SAndroid Build Coastguard Worker         }
72*bb4ee6a4SAndroid Build Coastguard Worker     }
73*bb4ee6a4SAndroid Build Coastguard Worker 
74*bb4ee6a4SAndroid Build Coastguard Worker     state.num_threads -= 1;
75*bb4ee6a4SAndroid Build Coastguard Worker 
76*bb4ee6a4SAndroid Build Coastguard Worker     // If we're shutting down then the BlockingPool will take care of joining all the threads.
77*bb4ee6a4SAndroid Build Coastguard Worker     // Otherwise, we need to join the last worker thread that exited here.
78*bb4ee6a4SAndroid Build Coastguard Worker     let last_exited_thread = if let Some(exited_threads) = state.exited_threads.as_mut() {
79*bb4ee6a4SAndroid Build Coastguard Worker         exited_threads
80*bb4ee6a4SAndroid Build Coastguard Worker             .try_recv()
81*bb4ee6a4SAndroid Build Coastguard Worker             .map(|idx| state.worker_threads.remove(idx))
82*bb4ee6a4SAndroid Build Coastguard Worker             .ok()
83*bb4ee6a4SAndroid Build Coastguard Worker     } else {
84*bb4ee6a4SAndroid Build Coastguard Worker         None
85*bb4ee6a4SAndroid Build Coastguard Worker     };
86*bb4ee6a4SAndroid Build Coastguard Worker 
87*bb4ee6a4SAndroid Build Coastguard Worker     // Drop the lock before trying to join the last exited thread.
88*bb4ee6a4SAndroid Build Coastguard Worker     drop(state);
89*bb4ee6a4SAndroid Build Coastguard Worker 
90*bb4ee6a4SAndroid Build Coastguard Worker     if let Some(handle) = last_exited_thread {
91*bb4ee6a4SAndroid Build Coastguard Worker         let _ = handle.join();
92*bb4ee6a4SAndroid Build Coastguard Worker     }
93*bb4ee6a4SAndroid Build Coastguard Worker 
94*bb4ee6a4SAndroid Build Coastguard Worker     if let Err(e) = exit.send(idx) {
95*bb4ee6a4SAndroid Build Coastguard Worker         error!("Failed to send thread exit event on channel: {}", e);
96*bb4ee6a4SAndroid Build Coastguard Worker     }
97*bb4ee6a4SAndroid Build Coastguard Worker }
98*bb4ee6a4SAndroid Build Coastguard Worker 
99*bb4ee6a4SAndroid Build Coastguard Worker struct Inner {
100*bb4ee6a4SAndroid Build Coastguard Worker     state: Mutex<State>,
101*bb4ee6a4SAndroid Build Coastguard Worker     condvar: Condvar,
102*bb4ee6a4SAndroid Build Coastguard Worker     max_threads: usize,
103*bb4ee6a4SAndroid Build Coastguard Worker     keepalive: Duration,
104*bb4ee6a4SAndroid Build Coastguard Worker }
105*bb4ee6a4SAndroid Build Coastguard Worker 
106*bb4ee6a4SAndroid Build Coastguard Worker impl Inner {
spawn<F, R>(self: &Arc<Self>, f: F) -> impl Future<Output = R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,107*bb4ee6a4SAndroid Build Coastguard Worker     pub fn spawn<F, R>(self: &Arc<Self>, f: F) -> impl Future<Output = R>
108*bb4ee6a4SAndroid Build Coastguard Worker     where
109*bb4ee6a4SAndroid Build Coastguard Worker         F: FnOnce() -> R + Send + 'static,
110*bb4ee6a4SAndroid Build Coastguard Worker         R: Send + 'static,
111*bb4ee6a4SAndroid Build Coastguard Worker     {
112*bb4ee6a4SAndroid Build Coastguard Worker         let mut state = self.state.lock();
113*bb4ee6a4SAndroid Build Coastguard Worker 
114*bb4ee6a4SAndroid Build Coastguard Worker         // If we're shutting down then nothing is going to run this task.
115*bb4ee6a4SAndroid Build Coastguard Worker         if state.shutting_down {
116*bb4ee6a4SAndroid Build Coastguard Worker             error!("spawn called after shutdown");
117*bb4ee6a4SAndroid Build Coastguard Worker             return futures::future::Either::Left(async {
118*bb4ee6a4SAndroid Build Coastguard Worker                 panic!("tried to poll BlockingPool task after shutdown")
119*bb4ee6a4SAndroid Build Coastguard Worker             });
120*bb4ee6a4SAndroid Build Coastguard Worker         }
121*bb4ee6a4SAndroid Build Coastguard Worker 
122*bb4ee6a4SAndroid Build Coastguard Worker         let (send_chan, recv_chan) = oneshot::channel();
123*bb4ee6a4SAndroid Build Coastguard Worker         state.tasks.push_back(Box::new(|| {
124*bb4ee6a4SAndroid Build Coastguard Worker             let _ = send_chan.send(f());
125*bb4ee6a4SAndroid Build Coastguard Worker         }));
126*bb4ee6a4SAndroid Build Coastguard Worker 
127*bb4ee6a4SAndroid Build Coastguard Worker         if state.num_idle == 0 {
128*bb4ee6a4SAndroid Build Coastguard Worker             // There are no idle threads.  Spawn a new one if possible.
129*bb4ee6a4SAndroid Build Coastguard Worker             if state.num_threads < self.max_threads {
130*bb4ee6a4SAndroid Build Coastguard Worker                 state.num_threads += 1;
131*bb4ee6a4SAndroid Build Coastguard Worker                 let exit = state.exit.clone();
132*bb4ee6a4SAndroid Build Coastguard Worker                 let entry = state.worker_threads.vacant_entry();
133*bb4ee6a4SAndroid Build Coastguard Worker                 let idx = entry.key();
134*bb4ee6a4SAndroid Build Coastguard Worker                 let inner = self.clone();
135*bb4ee6a4SAndroid Build Coastguard Worker                 entry.insert(
136*bb4ee6a4SAndroid Build Coastguard Worker                     thread::Builder::new()
137*bb4ee6a4SAndroid Build Coastguard Worker                         .name(format!("blockingPool{}", idx))
138*bb4ee6a4SAndroid Build Coastguard Worker                         .spawn(move || run_blocking_thread(idx, inner, exit))
139*bb4ee6a4SAndroid Build Coastguard Worker                         .unwrap(),
140*bb4ee6a4SAndroid Build Coastguard Worker                 );
141*bb4ee6a4SAndroid Build Coastguard Worker             }
142*bb4ee6a4SAndroid Build Coastguard Worker         } else {
143*bb4ee6a4SAndroid Build Coastguard Worker             // We have idle threads, wake one up.
144*bb4ee6a4SAndroid Build Coastguard Worker             state.num_idle -= 1;
145*bb4ee6a4SAndroid Build Coastguard Worker             state.num_notified += 1;
146*bb4ee6a4SAndroid Build Coastguard Worker             self.condvar.notify_one();
147*bb4ee6a4SAndroid Build Coastguard Worker         }
148*bb4ee6a4SAndroid Build Coastguard Worker 
149*bb4ee6a4SAndroid Build Coastguard Worker         futures::future::Either::Right(async {
150*bb4ee6a4SAndroid Build Coastguard Worker             recv_chan
151*bb4ee6a4SAndroid Build Coastguard Worker                 .await
152*bb4ee6a4SAndroid Build Coastguard Worker                 .expect("BlockingThread task unexpectedly cancelled")
153*bb4ee6a4SAndroid Build Coastguard Worker         })
154*bb4ee6a4SAndroid Build Coastguard Worker     }
155*bb4ee6a4SAndroid Build Coastguard Worker }
156*bb4ee6a4SAndroid Build Coastguard Worker 
157*bb4ee6a4SAndroid Build Coastguard Worker #[derive(Debug, thiserror::Error)]
158*bb4ee6a4SAndroid Build Coastguard Worker #[error("{0} BlockingPool threads did not exit in time and will be detached")]
159*bb4ee6a4SAndroid Build Coastguard Worker pub struct ShutdownTimedOut(usize);
160*bb4ee6a4SAndroid Build Coastguard Worker 
161*bb4ee6a4SAndroid Build Coastguard Worker /// A thread pool for running work that may block.
162*bb4ee6a4SAndroid Build Coastguard Worker ///
163*bb4ee6a4SAndroid Build Coastguard Worker /// It is generally discouraged to do any blocking work inside an async function. However, this is
164*bb4ee6a4SAndroid Build Coastguard Worker /// sometimes unavoidable when dealing with interfaces that don't provide async variants. In this
165*bb4ee6a4SAndroid Build Coastguard Worker /// case callers may use the `BlockingPool` to run the blocking work on a different thread and
166*bb4ee6a4SAndroid Build Coastguard Worker /// `await` for its result to finish, which will prevent blocking the main thread of the
167*bb4ee6a4SAndroid Build Coastguard Worker /// application.
168*bb4ee6a4SAndroid Build Coastguard Worker ///
169*bb4ee6a4SAndroid Build Coastguard Worker /// Since the blocking work is sent to another thread, users should be careful when using the
170*bb4ee6a4SAndroid Build Coastguard Worker /// `BlockingPool` for latency-sensitive operations. Additionally, the `BlockingPool` is intended to
171*bb4ee6a4SAndroid Build Coastguard Worker /// be used for work that will eventually complete on its own. Users who want to spawn a thread
172*bb4ee6a4SAndroid Build Coastguard Worker /// should just use `thread::spawn` directly.
173*bb4ee6a4SAndroid Build Coastguard Worker ///
174*bb4ee6a4SAndroid Build Coastguard Worker /// There is no way to cancel work once it has been picked up by one of the worker threads in the
175*bb4ee6a4SAndroid Build Coastguard Worker /// `BlockingPool`. Dropping or shutting down the pool will block up to a timeout (default 10
176*bb4ee6a4SAndroid Build Coastguard Worker /// seconds) to wait for any active blocking work to finish. Any threads running tasks that have not
177*bb4ee6a4SAndroid Build Coastguard Worker /// completed by that time will be detached.
178*bb4ee6a4SAndroid Build Coastguard Worker ///
179*bb4ee6a4SAndroid Build Coastguard Worker /// # Examples
180*bb4ee6a4SAndroid Build Coastguard Worker ///
181*bb4ee6a4SAndroid Build Coastguard Worker /// Spawn a task to run in the `BlockingPool` and await on its result.
182*bb4ee6a4SAndroid Build Coastguard Worker ///
183*bb4ee6a4SAndroid Build Coastguard Worker /// ```edition2018
184*bb4ee6a4SAndroid Build Coastguard Worker /// use cros_async::BlockingPool;
185*bb4ee6a4SAndroid Build Coastguard Worker ///
186*bb4ee6a4SAndroid Build Coastguard Worker /// # async fn do_it() {
187*bb4ee6a4SAndroid Build Coastguard Worker ///     let pool = BlockingPool::default();
188*bb4ee6a4SAndroid Build Coastguard Worker ///
189*bb4ee6a4SAndroid Build Coastguard Worker ///     let res = pool.spawn(move || {
190*bb4ee6a4SAndroid Build Coastguard Worker ///         // Do some CPU-intensive or blocking work here.
191*bb4ee6a4SAndroid Build Coastguard Worker ///
192*bb4ee6a4SAndroid Build Coastguard Worker ///         42
193*bb4ee6a4SAndroid Build Coastguard Worker ///     }).await;
194*bb4ee6a4SAndroid Build Coastguard Worker ///
195*bb4ee6a4SAndroid Build Coastguard Worker ///     assert_eq!(res, 42);
196*bb4ee6a4SAndroid Build Coastguard Worker /// # }
197*bb4ee6a4SAndroid Build Coastguard Worker /// # cros_async::block_on(do_it());
198*bb4ee6a4SAndroid Build Coastguard Worker /// ```
199*bb4ee6a4SAndroid Build Coastguard Worker pub struct BlockingPool {
200*bb4ee6a4SAndroid Build Coastguard Worker     inner: Arc<Inner>,
201*bb4ee6a4SAndroid Build Coastguard Worker }
202*bb4ee6a4SAndroid Build Coastguard Worker 
203*bb4ee6a4SAndroid Build Coastguard Worker impl BlockingPool {
204*bb4ee6a4SAndroid Build Coastguard Worker     /// Create a new `BlockingPool`.
205*bb4ee6a4SAndroid Build Coastguard Worker     ///
206*bb4ee6a4SAndroid Build Coastguard Worker     /// The `BlockingPool` will never spawn more than `max_threads` threads to do work, regardless
207*bb4ee6a4SAndroid Build Coastguard Worker     /// of the number of tasks that are added to it. This value should be set relatively low (for
208*bb4ee6a4SAndroid Build Coastguard Worker     /// example, the number of CPUs on the machine) if the pool is intended to run CPU intensive
209*bb4ee6a4SAndroid Build Coastguard Worker     /// work or it should be set relatively high (128 or more) if the pool is intended to be used
210*bb4ee6a4SAndroid Build Coastguard Worker     /// for various IO operations that cannot be completed asynchronously. The default value is 256.
211*bb4ee6a4SAndroid Build Coastguard Worker     ///
212*bb4ee6a4SAndroid Build Coastguard Worker     /// Worker threads are spawned on demand when new work is added to the pool and will
213*bb4ee6a4SAndroid Build Coastguard Worker     /// automatically exit after being idle for some time so there is no overhead for setting
214*bb4ee6a4SAndroid Build Coastguard Worker     /// `max_threads` to a large value when there is little to no work assigned to the
215*bb4ee6a4SAndroid Build Coastguard Worker     /// `BlockingPool`. `keepalive` determines the idle duration after which the worker thread will
216*bb4ee6a4SAndroid Build Coastguard Worker     /// exit. The default value is 10 seconds.
new(max_threads: usize, keepalive: Duration) -> BlockingPool217*bb4ee6a4SAndroid Build Coastguard Worker     pub fn new(max_threads: usize, keepalive: Duration) -> BlockingPool {
218*bb4ee6a4SAndroid Build Coastguard Worker         let (exit, exited_threads) = channel();
219*bb4ee6a4SAndroid Build Coastguard Worker         BlockingPool {
220*bb4ee6a4SAndroid Build Coastguard Worker             inner: Arc::new(Inner {
221*bb4ee6a4SAndroid Build Coastguard Worker                 state: Mutex::new(State {
222*bb4ee6a4SAndroid Build Coastguard Worker                     tasks: VecDeque::new(),
223*bb4ee6a4SAndroid Build Coastguard Worker                     num_threads: 0,
224*bb4ee6a4SAndroid Build Coastguard Worker                     num_idle: 0,
225*bb4ee6a4SAndroid Build Coastguard Worker                     num_notified: 0,
226*bb4ee6a4SAndroid Build Coastguard Worker                     worker_threads: Slab::new(),
227*bb4ee6a4SAndroid Build Coastguard Worker                     exited_threads: Some(exited_threads),
228*bb4ee6a4SAndroid Build Coastguard Worker                     exit,
229*bb4ee6a4SAndroid Build Coastguard Worker                     shutting_down: false,
230*bb4ee6a4SAndroid Build Coastguard Worker                 }),
231*bb4ee6a4SAndroid Build Coastguard Worker                 condvar: Condvar::new(),
232*bb4ee6a4SAndroid Build Coastguard Worker                 max_threads,
233*bb4ee6a4SAndroid Build Coastguard Worker                 keepalive,
234*bb4ee6a4SAndroid Build Coastguard Worker             }),
235*bb4ee6a4SAndroid Build Coastguard Worker         }
236*bb4ee6a4SAndroid Build Coastguard Worker     }
237*bb4ee6a4SAndroid Build Coastguard Worker 
238*bb4ee6a4SAndroid Build Coastguard Worker     /// Like new but with pre-allocating capacity for up to `max_threads`.
with_capacity(max_threads: usize, keepalive: Duration) -> BlockingPool239*bb4ee6a4SAndroid Build Coastguard Worker     pub fn with_capacity(max_threads: usize, keepalive: Duration) -> BlockingPool {
240*bb4ee6a4SAndroid Build Coastguard Worker         let (exit, exited_threads) = channel();
241*bb4ee6a4SAndroid Build Coastguard Worker         BlockingPool {
242*bb4ee6a4SAndroid Build Coastguard Worker             inner: Arc::new(Inner {
243*bb4ee6a4SAndroid Build Coastguard Worker                 state: Mutex::new(State {
244*bb4ee6a4SAndroid Build Coastguard Worker                     tasks: VecDeque::new(),
245*bb4ee6a4SAndroid Build Coastguard Worker                     num_threads: 0,
246*bb4ee6a4SAndroid Build Coastguard Worker                     num_idle: 0,
247*bb4ee6a4SAndroid Build Coastguard Worker                     num_notified: 0,
248*bb4ee6a4SAndroid Build Coastguard Worker                     worker_threads: Slab::with_capacity(max_threads),
249*bb4ee6a4SAndroid Build Coastguard Worker                     exited_threads: Some(exited_threads),
250*bb4ee6a4SAndroid Build Coastguard Worker                     exit,
251*bb4ee6a4SAndroid Build Coastguard Worker                     shutting_down: false,
252*bb4ee6a4SAndroid Build Coastguard Worker                 }),
253*bb4ee6a4SAndroid Build Coastguard Worker                 condvar: Condvar::new(),
254*bb4ee6a4SAndroid Build Coastguard Worker                 max_threads,
255*bb4ee6a4SAndroid Build Coastguard Worker                 keepalive,
256*bb4ee6a4SAndroid Build Coastguard Worker             }),
257*bb4ee6a4SAndroid Build Coastguard Worker         }
258*bb4ee6a4SAndroid Build Coastguard Worker     }
259*bb4ee6a4SAndroid Build Coastguard Worker 
260*bb4ee6a4SAndroid Build Coastguard Worker     /// Spawn a task to run in the `BlockingPool`.
261*bb4ee6a4SAndroid Build Coastguard Worker     ///
262*bb4ee6a4SAndroid Build Coastguard Worker     /// Callers may `await` the returned `Future` to be notified when the work is completed.
263*bb4ee6a4SAndroid Build Coastguard Worker     /// Dropping the future will not cancel the task.
264*bb4ee6a4SAndroid Build Coastguard Worker     ///
265*bb4ee6a4SAndroid Build Coastguard Worker     /// # Panics
266*bb4ee6a4SAndroid Build Coastguard Worker     ///
267*bb4ee6a4SAndroid Build Coastguard Worker     /// `await`ing a `Task` after dropping the `BlockingPool` or calling `BlockingPool::shutdown`
268*bb4ee6a4SAndroid Build Coastguard Worker     /// will panic if the work was not completed before the pool was shut down.
spawn<F, R>(&self, f: F) -> impl Future<Output = R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,269*bb4ee6a4SAndroid Build Coastguard Worker     pub fn spawn<F, R>(&self, f: F) -> impl Future<Output = R>
270*bb4ee6a4SAndroid Build Coastguard Worker     where
271*bb4ee6a4SAndroid Build Coastguard Worker         F: FnOnce() -> R + Send + 'static,
272*bb4ee6a4SAndroid Build Coastguard Worker         R: Send + 'static,
273*bb4ee6a4SAndroid Build Coastguard Worker     {
274*bb4ee6a4SAndroid Build Coastguard Worker         self.inner.spawn(f)
275*bb4ee6a4SAndroid Build Coastguard Worker     }
276*bb4ee6a4SAndroid Build Coastguard Worker 
277*bb4ee6a4SAndroid Build Coastguard Worker     /// Shut down the `BlockingPool`.
278*bb4ee6a4SAndroid Build Coastguard Worker     ///
279*bb4ee6a4SAndroid Build Coastguard Worker     /// If `deadline` is provided then this will block until either all worker threads exit or the
280*bb4ee6a4SAndroid Build Coastguard Worker     /// deadline is exceeded. If `deadline` is not given then this will block indefinitely until all
281*bb4ee6a4SAndroid Build Coastguard Worker     /// worker threads exit. Any work that was added to the `BlockingPool` but not yet picked up by
282*bb4ee6a4SAndroid Build Coastguard Worker     /// a worker thread will not complete and `await`ing on the `Task` for that work will panic.
shutdown(&self, deadline: Option<Instant>) -> Result<(), ShutdownTimedOut>283*bb4ee6a4SAndroid Build Coastguard Worker     pub fn shutdown(&self, deadline: Option<Instant>) -> Result<(), ShutdownTimedOut> {
284*bb4ee6a4SAndroid Build Coastguard Worker         let mut state = self.inner.state.lock();
285*bb4ee6a4SAndroid Build Coastguard Worker 
286*bb4ee6a4SAndroid Build Coastguard Worker         if state.shutting_down {
287*bb4ee6a4SAndroid Build Coastguard Worker             // We've already shut down this BlockingPool.
288*bb4ee6a4SAndroid Build Coastguard Worker             return Ok(());
289*bb4ee6a4SAndroid Build Coastguard Worker         }
290*bb4ee6a4SAndroid Build Coastguard Worker 
291*bb4ee6a4SAndroid Build Coastguard Worker         state.shutting_down = true;
292*bb4ee6a4SAndroid Build Coastguard Worker         let exited_threads = state.exited_threads.take().expect("exited_threads missing");
293*bb4ee6a4SAndroid Build Coastguard Worker         let unfinished_tasks = std::mem::take(&mut state.tasks);
294*bb4ee6a4SAndroid Build Coastguard Worker         let mut worker_threads = mem::replace(&mut state.worker_threads, Slab::new());
295*bb4ee6a4SAndroid Build Coastguard Worker         drop(state);
296*bb4ee6a4SAndroid Build Coastguard Worker 
297*bb4ee6a4SAndroid Build Coastguard Worker         self.inner.condvar.notify_all();
298*bb4ee6a4SAndroid Build Coastguard Worker 
299*bb4ee6a4SAndroid Build Coastguard Worker         // Cancel any unfinished work after releasing the lock.
300*bb4ee6a4SAndroid Build Coastguard Worker         drop(unfinished_tasks);
301*bb4ee6a4SAndroid Build Coastguard Worker 
302*bb4ee6a4SAndroid Build Coastguard Worker         // Now wait for all worker threads to exit.
303*bb4ee6a4SAndroid Build Coastguard Worker         if let Some(deadline) = deadline {
304*bb4ee6a4SAndroid Build Coastguard Worker             let mut now = Instant::now();
305*bb4ee6a4SAndroid Build Coastguard Worker             while now < deadline && !worker_threads.is_empty() {
306*bb4ee6a4SAndroid Build Coastguard Worker                 if let Ok(idx) = exited_threads.recv_timeout(deadline - now) {
307*bb4ee6a4SAndroid Build Coastguard Worker                     let _ = worker_threads.remove(idx).join();
308*bb4ee6a4SAndroid Build Coastguard Worker                 }
309*bb4ee6a4SAndroid Build Coastguard Worker                 now = Instant::now();
310*bb4ee6a4SAndroid Build Coastguard Worker             }
311*bb4ee6a4SAndroid Build Coastguard Worker 
312*bb4ee6a4SAndroid Build Coastguard Worker             // Any threads that have not yet joined will just be detached.
313*bb4ee6a4SAndroid Build Coastguard Worker             if !worker_threads.is_empty() {
314*bb4ee6a4SAndroid Build Coastguard Worker                 return Err(ShutdownTimedOut(worker_threads.len()));
315*bb4ee6a4SAndroid Build Coastguard Worker             }
316*bb4ee6a4SAndroid Build Coastguard Worker 
317*bb4ee6a4SAndroid Build Coastguard Worker             Ok(())
318*bb4ee6a4SAndroid Build Coastguard Worker         } else {
319*bb4ee6a4SAndroid Build Coastguard Worker             // Block indefinitely until all worker threads exit.
320*bb4ee6a4SAndroid Build Coastguard Worker             for handle in worker_threads.drain() {
321*bb4ee6a4SAndroid Build Coastguard Worker                 let _ = handle.join();
322*bb4ee6a4SAndroid Build Coastguard Worker             }
323*bb4ee6a4SAndroid Build Coastguard Worker 
324*bb4ee6a4SAndroid Build Coastguard Worker             Ok(())
325*bb4ee6a4SAndroid Build Coastguard Worker         }
326*bb4ee6a4SAndroid Build Coastguard Worker     }
327*bb4ee6a4SAndroid Build Coastguard Worker 
328*bb4ee6a4SAndroid Build Coastguard Worker     #[cfg(test)]
shutting_down(&self) -> bool329*bb4ee6a4SAndroid Build Coastguard Worker     pub(crate) fn shutting_down(&self) -> bool {
330*bb4ee6a4SAndroid Build Coastguard Worker         self.inner.state.lock().shutting_down
331*bb4ee6a4SAndroid Build Coastguard Worker     }
332*bb4ee6a4SAndroid Build Coastguard Worker }
333*bb4ee6a4SAndroid Build Coastguard Worker 
334*bb4ee6a4SAndroid Build Coastguard Worker impl Default for BlockingPool {
default() -> BlockingPool335*bb4ee6a4SAndroid Build Coastguard Worker     fn default() -> BlockingPool {
336*bb4ee6a4SAndroid Build Coastguard Worker         BlockingPool::new(256, Duration::from_secs(10))
337*bb4ee6a4SAndroid Build Coastguard Worker     }
338*bb4ee6a4SAndroid Build Coastguard Worker }
339*bb4ee6a4SAndroid Build Coastguard Worker 
340*bb4ee6a4SAndroid Build Coastguard Worker impl Drop for BlockingPool {
drop(&mut self)341*bb4ee6a4SAndroid Build Coastguard Worker     fn drop(&mut self) {
342*bb4ee6a4SAndroid Build Coastguard Worker         if let Err(e) = self.shutdown(Some(Instant::now() + DEFAULT_SHUTDOWN_TIMEOUT)) {
343*bb4ee6a4SAndroid Build Coastguard Worker             warn!("{}", e);
344*bb4ee6a4SAndroid Build Coastguard Worker         }
345*bb4ee6a4SAndroid Build Coastguard Worker     }
346*bb4ee6a4SAndroid Build Coastguard Worker }
347*bb4ee6a4SAndroid Build Coastguard Worker 
348*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(test)]
349*bb4ee6a4SAndroid Build Coastguard Worker mod test {
350*bb4ee6a4SAndroid Build Coastguard Worker     use std::sync::Arc;
351*bb4ee6a4SAndroid Build Coastguard Worker     use std::sync::Barrier;
352*bb4ee6a4SAndroid Build Coastguard Worker     use std::thread;
353*bb4ee6a4SAndroid Build Coastguard Worker     use std::time::Duration;
354*bb4ee6a4SAndroid Build Coastguard Worker     use std::time::Instant;
355*bb4ee6a4SAndroid Build Coastguard Worker 
356*bb4ee6a4SAndroid Build Coastguard Worker     use futures::executor::block_on;
357*bb4ee6a4SAndroid Build Coastguard Worker     use futures::stream::FuturesUnordered;
358*bb4ee6a4SAndroid Build Coastguard Worker     use futures::StreamExt;
359*bb4ee6a4SAndroid Build Coastguard Worker     use sync::Condvar;
360*bb4ee6a4SAndroid Build Coastguard Worker     use sync::Mutex;
361*bb4ee6a4SAndroid Build Coastguard Worker 
362*bb4ee6a4SAndroid Build Coastguard Worker     use super::super::super::BlockingPool;
363*bb4ee6a4SAndroid Build Coastguard Worker 
364*bb4ee6a4SAndroid Build Coastguard Worker     #[test]
blocking_sleep()365*bb4ee6a4SAndroid Build Coastguard Worker     fn blocking_sleep() {
366*bb4ee6a4SAndroid Build Coastguard Worker         let pool = BlockingPool::default();
367*bb4ee6a4SAndroid Build Coastguard Worker 
368*bb4ee6a4SAndroid Build Coastguard Worker         let res = block_on(pool.spawn(|| 42));
369*bb4ee6a4SAndroid Build Coastguard Worker         assert_eq!(res, 42);
370*bb4ee6a4SAndroid Build Coastguard Worker     }
371*bb4ee6a4SAndroid Build Coastguard Worker 
372*bb4ee6a4SAndroid Build Coastguard Worker     #[test]
drop_doesnt_block()373*bb4ee6a4SAndroid Build Coastguard Worker     fn drop_doesnt_block() {
374*bb4ee6a4SAndroid Build Coastguard Worker         let pool = BlockingPool::default();
375*bb4ee6a4SAndroid Build Coastguard Worker         let (tx, rx) = std::sync::mpsc::sync_channel(0);
376*bb4ee6a4SAndroid Build Coastguard Worker         // The blocking work should continue even though we drop the future.
377*bb4ee6a4SAndroid Build Coastguard Worker         //
378*bb4ee6a4SAndroid Build Coastguard Worker         // If we cancelled the work, then the recv call would fail. If we blocked on the work, then
379*bb4ee6a4SAndroid Build Coastguard Worker         // the send would never complete because the channel is size zero and so waits for a
380*bb4ee6a4SAndroid Build Coastguard Worker         // matching recv call.
381*bb4ee6a4SAndroid Build Coastguard Worker         std::mem::drop(pool.spawn(move || tx.send(()).unwrap()));
382*bb4ee6a4SAndroid Build Coastguard Worker         rx.recv().unwrap();
383*bb4ee6a4SAndroid Build Coastguard Worker     }
384*bb4ee6a4SAndroid Build Coastguard Worker 
/// Floods a pool that has a 1 ms keepalive with two bursts of trivial tasks.
///
/// There is nothing to assert on: the test succeeds as long as nothing panics. A panic would
/// occur if a worker thread underflowed `pool.inner.state.num_idle`.
#[test]
fn fast_tasks_with_short_keepalive() {
    let tick = Duration::from_millis(1);
    let pool = BlockingPool::new(256, tick);

    let pending = FuturesUnordered::new();
    for _burst in 0..2 {
        // Queue a full burst of no-op tasks.
        (0..256).for_each(|_| pending.push(pool.spawn(|| ())));

        // Pause for one tick between bursts.
        thread::sleep(tick);
    }

    // Drive every task to completion.
    block_on(pending.collect::<Vec<_>>());
}
404*bb4ee6a4SAndroid Build Coastguard Worker 
/// Spawns 19 short sleeping tasks on a pool created with `BlockingPool::new(4, ..)` and checks
/// that every one of them completes.
#[test]
fn more_tasks_than_threads() {
    const TASK_COUNT: usize = 19;

    let pool = BlockingPool::new(4, Duration::from_secs(10));

    let pending = FuturesUnordered::new();
    for _ in 0..TASK_COUNT {
        pending.push(pool.spawn(|| thread::sleep(Duration::from_millis(5))));
    }

    let finished = block_on(pending.collect::<Vec<_>>());
    assert_eq!(finished.len(), TASK_COUNT);
}
416*bb4ee6a4SAndroid Build Coastguard Worker 
/// Runs a batch of tasks to completion on a default pool, shuts the pool down with a 10 second
/// deadline, and verifies that the pool reports zero threads afterwards.
#[test]
fn shutdown() {
    let pool = BlockingPool::default();

    let spawn_sleeper = || pool.spawn(|| thread::sleep(Duration::from_millis(5)));
    let pending: FuturesUnordered<_> = std::iter::repeat_with(spawn_sleeper).take(19).collect();

    let finished = block_on(pending.collect::<Vec<_>>());
    assert_eq!(finished.len(), 19);

    let deadline = Instant::now() + Duration::from_secs(10);
    pool.shutdown(Some(deadline)).unwrap();

    // After a successful shutdown the pool should have no threads left.
    assert_eq!(pool.inner.state.lock().num_threads, 0);
}
433*bb4ee6a4SAndroid Build Coastguard Worker 
/// Checks that worker threads exit on their own once they have been out of work for longer than
/// the keepalive.
#[test]
fn keepalive_timeout() {
    // A tiny keepalive makes threads exit shortly after the queue drains.
    let pool = BlockingPool::new(7, Duration::from_millis(1));

    let spawn_sleeper = || pool.spawn(|| thread::sleep(Duration::from_millis(5)));
    let pending = FuturesUnordered::new();
    for _ in 0..19 {
        pending.push(spawn_sleeper());
    }

    let finished = block_on(pending.collect::<Vec<_>>());
    assert_eq!(finished.len(), 19);

    // Poll (for at most 10 seconds) until the pool reports that every thread has exited.
    let give_up_at = Instant::now() + Duration::from_secs(10);
    loop {
        if Instant::now() >= give_up_at {
            break;
        }
        thread::sleep(Duration::from_millis(100));
        if pool.inner.state.lock().num_threads == 0 {
            break;
        }
    }

    // Both counters must be back at zero.
    let state = pool.inner.state.lock();
    assert_eq!(state.num_threads, 0);
    assert_eq!(state.num_idle, 0);
    drop(state);
}
463*bb4ee6a4SAndroid Build Coastguard Worker 
/// Shuts the pool down while one task is still queued behind a blocked task, then blocks on the
/// queued task's future. The test is expected to panic.
#[test]
#[should_panic]
fn shutdown_with_pending_work() {
    let pool = BlockingPool::new(1, Duration::from_secs(10));

    // Flag + condvar used to hold the first task until shutdown has begun.
    let released = Arc::new(Mutex::new(false));
    let released_cv = Arc::new(Condvar::new());

    // Occupy the pool with a task that waits until `released` is set.
    let _blocking_task = {
        let released = released.clone();
        let released_cv = released_cv.clone();
        pool.spawn(move || {
            let mut guard = released.lock();
            while !*guard {
                guard = released_cv.wait(guard);
            }
        })
    };

    // Queued behind the blocked task; the pool is shut down before it ever gets to run.
    let unfinished = pool.spawn(|| 5);

    // Helper thread: once it observes `shutting_down`, it releases the blocked task.
    let inner = pool.inner.clone();
    thread::spawn(move || {
        let mut state = inner.state.lock();
        loop {
            if state.shutting_down {
                break;
            }
            state = inner.condvar.wait(state);
        }

        *released.lock() = true;
        released_cv.notify_all();
    });
    pool.shutdown(None).unwrap();

    // Blocking on the never-run task is where the expected panic comes from.
    assert_eq!(block_on(unfinished), 5);
}
502*bb4ee6a4SAndroid Build Coastguard Worker 
/// Shuts the pool down with a deadline too short for a still-blocked task, checks that shutdown
/// reports an error and that one thread is still counted, then releases the task and waits for
/// the thread count to fall back to zero.
#[test]
fn unfinished_worker_thread() {
    let pool = BlockingPool::default();

    let released = Arc::new(Mutex::new(false));
    let released_cv = Arc::new(Condvar::new());
    let started = Arc::new(Barrier::new(2));

    let task = {
        let released = released.clone();
        let released_cv = released_cv.clone();
        let started = started.clone();
        pool.spawn(move || {
            // Rendezvous with the test body, then block until released.
            started.wait();
            let mut guard = released.lock();
            while !*guard {
                guard = released_cv.wait(guard);
            }
        })
    };

    // Do not start shutting down until the worker thread is actually running the task.
    started.wait();
    let short_deadline = Instant::now() + Duration::from_millis(5);
    pool.shutdown(Some(short_deadline)).unwrap_err();

    let still_running = pool.inner.state.lock().num_threads;
    assert_eq!(still_running, 1);

    // Release the blocked task so the thread is not leaked.
    *released.lock() = true;
    released_cv.notify_all();

    block_on(task);

    // Poll (for at most 10 seconds) until the pool reports that the thread has exited.
    let give_up_at = Instant::now() + Duration::from_secs(10);
    loop {
        if Instant::now() >= give_up_at {
            break;
        }
        thread::sleep(Duration::from_millis(100));
        if pool.inner.state.lock().num_threads == 0 {
            break;
        }
    }

    let state = pool.inner.state.lock();
    assert_eq!(state.num_threads, 0);
    assert_eq!(state.num_idle, 0);
    drop(state);
}
552*bb4ee6a4SAndroid Build Coastguard Worker }
553