1*bb4ee6a4SAndroid Build Coastguard Worker // Copyright 2022 The ChromiumOS Authors 2*bb4ee6a4SAndroid Build Coastguard Worker // Use of this source code is governed by a BSD-style license that can be 3*bb4ee6a4SAndroid Build Coastguard Worker // found in the LICENSE file. 4*bb4ee6a4SAndroid Build Coastguard Worker 5*bb4ee6a4SAndroid Build Coastguard Worker //! Multi-thread worker. 6*bb4ee6a4SAndroid Build Coastguard Worker 7*bb4ee6a4SAndroid Build Coastguard Worker #![deny(missing_docs)] 8*bb4ee6a4SAndroid Build Coastguard Worker 9*bb4ee6a4SAndroid Build Coastguard Worker use std::collections::VecDeque; 10*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::atomic::AtomicBool; 11*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::atomic::Ordering; 12*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::Arc; 13*bb4ee6a4SAndroid Build Coastguard Worker use std::thread; 14*bb4ee6a4SAndroid Build Coastguard Worker use std::time::Duration; 15*bb4ee6a4SAndroid Build Coastguard Worker 16*bb4ee6a4SAndroid Build Coastguard Worker use anyhow::Context; 17*bb4ee6a4SAndroid Build Coastguard Worker use base::error; 18*bb4ee6a4SAndroid Build Coastguard Worker use base::Event; 19*bb4ee6a4SAndroid Build Coastguard Worker use base::EventWaitResult; 20*bb4ee6a4SAndroid Build Coastguard Worker use sync::Condvar; 21*bb4ee6a4SAndroid Build Coastguard Worker use sync::Mutex; 22*bb4ee6a4SAndroid Build Coastguard Worker 23*bb4ee6a4SAndroid Build Coastguard Worker /// Task to run on the worker threads. 24*bb4ee6a4SAndroid Build Coastguard Worker pub trait Task { 25*bb4ee6a4SAndroid Build Coastguard Worker /// Executes the task. execute(self)26*bb4ee6a4SAndroid Build Coastguard Worker fn execute(self); 27*bb4ee6a4SAndroid Build Coastguard Worker } 28*bb4ee6a4SAndroid Build Coastguard Worker 29*bb4ee6a4SAndroid Build Coastguard Worker /// Multi thread based worker executing a single type [Task]. 30*bb4ee6a4SAndroid Build Coastguard Worker /// 31*bb4ee6a4SAndroid Build Coastguard Worker /// See the doc of [Channel] as well for the behaviors of it. 32*bb4ee6a4SAndroid Build Coastguard Worker pub struct Worker<T> { 33*bb4ee6a4SAndroid Build Coastguard Worker /// Shared [Channel] with the worker threads. 34*bb4ee6a4SAndroid Build Coastguard Worker pub channel: Arc<Channel<T>>, 35*bb4ee6a4SAndroid Build Coastguard Worker handles: Vec<thread::JoinHandle<()>>, 36*bb4ee6a4SAndroid Build Coastguard Worker } 37*bb4ee6a4SAndroid Build Coastguard Worker 38*bb4ee6a4SAndroid Build Coastguard Worker impl<T: Task + Send + 'static> Worker<T> { 39*bb4ee6a4SAndroid Build Coastguard Worker /// Spawns the numbers of worker threads. new(len_channel: usize, n_workers: usize) -> Self40*bb4ee6a4SAndroid Build Coastguard Worker pub fn new(len_channel: usize, n_workers: usize) -> Self { 41*bb4ee6a4SAndroid Build Coastguard Worker let channel = Arc::new(Channel::<T>::new(len_channel, n_workers)); 42*bb4ee6a4SAndroid Build Coastguard Worker let mut handles = Vec::with_capacity(n_workers); 43*bb4ee6a4SAndroid Build Coastguard Worker for _ in 0..n_workers { 44*bb4ee6a4SAndroid Build Coastguard Worker let context = channel.clone(); 45*bb4ee6a4SAndroid Build Coastguard Worker let handle = thread::spawn(move || { 46*bb4ee6a4SAndroid Build Coastguard Worker Self::worker_thread(context); 47*bb4ee6a4SAndroid Build Coastguard Worker }); 48*bb4ee6a4SAndroid Build Coastguard Worker handles.push(handle); 49*bb4ee6a4SAndroid Build Coastguard Worker } 50*bb4ee6a4SAndroid Build Coastguard Worker Self { channel, handles } 51*bb4ee6a4SAndroid Build Coastguard Worker } 52*bb4ee6a4SAndroid Build Coastguard Worker worker_thread(context: Arc<Channel<T>>)53*bb4ee6a4SAndroid Build Coastguard Worker fn worker_thread(context: Arc<Channel<T>>) { 54*bb4ee6a4SAndroid Build Coastguard Worker while let Some(task) = context.pop() { 55*bb4ee6a4SAndroid Build Coastguard Worker task.execute(); 56*bb4ee6a4SAndroid Build Coastguard Worker } 57*bb4ee6a4SAndroid Build Coastguard Worker } 58*bb4ee6a4SAndroid Build Coastguard Worker 59*bb4ee6a4SAndroid Build Coastguard Worker /// Closes the channel and wait for worker threads shutdown. 60*bb4ee6a4SAndroid Build Coastguard Worker /// 61*bb4ee6a4SAndroid Build Coastguard Worker /// This also waits for all the tasks in the channel to be executed. close(self)62*bb4ee6a4SAndroid Build Coastguard Worker pub fn close(self) { 63*bb4ee6a4SAndroid Build Coastguard Worker self.channel.close(); 64*bb4ee6a4SAndroid Build Coastguard Worker for handle in self.handles { 65*bb4ee6a4SAndroid Build Coastguard Worker match handle.join() { 66*bb4ee6a4SAndroid Build Coastguard Worker Ok(()) => {} 67*bb4ee6a4SAndroid Build Coastguard Worker Err(e) => { 68*bb4ee6a4SAndroid Build Coastguard Worker error!("failed to wait for worker thread: {:?}", e); 69*bb4ee6a4SAndroid Build Coastguard Worker } 70*bb4ee6a4SAndroid Build Coastguard Worker } 71*bb4ee6a4SAndroid Build Coastguard Worker } 72*bb4ee6a4SAndroid Build Coastguard Worker } 73*bb4ee6a4SAndroid Build Coastguard Worker } 74*bb4ee6a4SAndroid Build Coastguard Worker 75*bb4ee6a4SAndroid Build Coastguard Worker /// MPMC (Multi Producers Multi Consumers) queue integrated with [Worker]. 76*bb4ee6a4SAndroid Build Coastguard Worker /// 77*bb4ee6a4SAndroid Build Coastguard Worker /// [Channel] offers [Channel::wait_complete()] to guarantee all the tasks are executed. 78*bb4ee6a4SAndroid Build Coastguard Worker /// 79*bb4ee6a4SAndroid Build Coastguard Worker /// This only exposes methods for producers. 80*bb4ee6a4SAndroid Build Coastguard Worker pub struct Channel<T> { 81*bb4ee6a4SAndroid Build Coastguard Worker state: Mutex<ChannelState<T>>, 82*bb4ee6a4SAndroid Build Coastguard Worker consumer_wait: Condvar, 83*bb4ee6a4SAndroid Build Coastguard Worker producer_wait: Condvar, 84*bb4ee6a4SAndroid Build Coastguard Worker n_consumers: usize, 85*bb4ee6a4SAndroid Build Coastguard Worker } 86*bb4ee6a4SAndroid Build Coastguard Worker 87*bb4ee6a4SAndroid Build Coastguard Worker impl<T> Channel<T> { new(len: usize, n_consumers: usize) -> Self88*bb4ee6a4SAndroid Build Coastguard Worker fn new(len: usize, n_consumers: usize) -> Self { 89*bb4ee6a4SAndroid Build Coastguard Worker Self { 90*bb4ee6a4SAndroid Build Coastguard Worker state: Mutex::new(ChannelState::new(len)), 91*bb4ee6a4SAndroid Build Coastguard Worker consumer_wait: Condvar::new(), 92*bb4ee6a4SAndroid Build Coastguard Worker producer_wait: Condvar::new(), 93*bb4ee6a4SAndroid Build Coastguard Worker n_consumers, 94*bb4ee6a4SAndroid Build Coastguard Worker } 95*bb4ee6a4SAndroid Build Coastguard Worker } 96*bb4ee6a4SAndroid Build Coastguard Worker close(&self)97*bb4ee6a4SAndroid Build Coastguard Worker fn close(&self) { 98*bb4ee6a4SAndroid Build Coastguard Worker let mut state = self.state.lock(); 99*bb4ee6a4SAndroid Build Coastguard Worker state.is_closed = true; 100*bb4ee6a4SAndroid Build Coastguard Worker self.consumer_wait.notify_all(); 101*bb4ee6a4SAndroid Build Coastguard Worker self.producer_wait.notify_all(); 102*bb4ee6a4SAndroid Build Coastguard Worker } 103*bb4ee6a4SAndroid Build Coastguard Worker 104*bb4ee6a4SAndroid Build Coastguard Worker /// Pops a task from the channel. 105*bb4ee6a4SAndroid Build Coastguard Worker /// 106*bb4ee6a4SAndroid Build Coastguard Worker /// If the queue is closed and also **empty**, this returns [None]. This returns all the tasks 107*bb4ee6a4SAndroid Build Coastguard Worker /// in the queue even while this is closed. 108*bb4ee6a4SAndroid Build Coastguard Worker #[inline] pop(&self) -> Option<T>109*bb4ee6a4SAndroid Build Coastguard Worker fn pop(&self) -> Option<T> { 110*bb4ee6a4SAndroid Build Coastguard Worker let mut state = self.state.lock(); 111*bb4ee6a4SAndroid Build Coastguard Worker loop { 112*bb4ee6a4SAndroid Build Coastguard Worker let was_full = state.queue.len() == state.capacity; 113*bb4ee6a4SAndroid Build Coastguard Worker if let Some(item) = state.queue.pop_front() { 114*bb4ee6a4SAndroid Build Coastguard Worker if was_full { 115*bb4ee6a4SAndroid Build Coastguard Worker // notification for a producer waiting for `push()`. 116*bb4ee6a4SAndroid Build Coastguard Worker self.producer_wait.notify_one(); 117*bb4ee6a4SAndroid Build Coastguard Worker } 118*bb4ee6a4SAndroid Build Coastguard Worker return Some(item); 119*bb4ee6a4SAndroid Build Coastguard Worker } else { 120*bb4ee6a4SAndroid Build Coastguard Worker if state.is_closed { 121*bb4ee6a4SAndroid Build Coastguard Worker return None; 122*bb4ee6a4SAndroid Build Coastguard Worker } 123*bb4ee6a4SAndroid Build Coastguard Worker state.n_waiting += 1; 124*bb4ee6a4SAndroid Build Coastguard Worker if state.n_waiting == self.n_consumers { 125*bb4ee6a4SAndroid Build Coastguard Worker // notification for producers waiting for `wait_complete()`. 126*bb4ee6a4SAndroid Build Coastguard Worker self.producer_wait.notify_all(); 127*bb4ee6a4SAndroid Build Coastguard Worker } 128*bb4ee6a4SAndroid Build Coastguard Worker state = self.consumer_wait.wait(state); 129*bb4ee6a4SAndroid Build Coastguard Worker state.n_waiting -= 1; 130*bb4ee6a4SAndroid Build Coastguard Worker } 131*bb4ee6a4SAndroid Build Coastguard Worker } 132*bb4ee6a4SAndroid Build Coastguard Worker } 133*bb4ee6a4SAndroid Build Coastguard Worker 134*bb4ee6a4SAndroid Build Coastguard Worker /// Push a task. 135*bb4ee6a4SAndroid Build Coastguard Worker /// 136*bb4ee6a4SAndroid Build Coastguard Worker /// This blocks if the channel is full. 137*bb4ee6a4SAndroid Build Coastguard Worker /// 138*bb4ee6a4SAndroid Build Coastguard Worker /// If the channel is closed, this returns `false`. push(&self, item: T) -> bool139*bb4ee6a4SAndroid Build Coastguard Worker pub fn push(&self, item: T) -> bool { 140*bb4ee6a4SAndroid Build Coastguard Worker let mut state = self.state.lock(); 141*bb4ee6a4SAndroid Build Coastguard Worker // Wait until the queue has room to push a task. 142*bb4ee6a4SAndroid Build Coastguard Worker while state.queue.len() == state.capacity { 143*bb4ee6a4SAndroid Build Coastguard Worker if state.is_closed { 144*bb4ee6a4SAndroid Build Coastguard Worker return false; 145*bb4ee6a4SAndroid Build Coastguard Worker } 146*bb4ee6a4SAndroid Build Coastguard Worker state = self.producer_wait.wait(state); 147*bb4ee6a4SAndroid Build Coastguard Worker } 148*bb4ee6a4SAndroid Build Coastguard Worker if state.is_closed { 149*bb4ee6a4SAndroid Build Coastguard Worker return false; 150*bb4ee6a4SAndroid Build Coastguard Worker } 151*bb4ee6a4SAndroid Build Coastguard Worker state.queue.push_back(item); 152*bb4ee6a4SAndroid Build Coastguard Worker self.consumer_wait.notify_one(); 153*bb4ee6a4SAndroid Build Coastguard Worker true 154*bb4ee6a4SAndroid Build Coastguard Worker } 155*bb4ee6a4SAndroid Build Coastguard Worker 156*bb4ee6a4SAndroid Build Coastguard Worker /// Wait until all the tasks have been executed. 157*bb4ee6a4SAndroid Build Coastguard Worker /// 158*bb4ee6a4SAndroid Build Coastguard Worker /// This guarantees that all the tasks in this channel are not only consumed but also executed. wait_complete(&self)159*bb4ee6a4SAndroid Build Coastguard Worker pub fn wait_complete(&self) { 160*bb4ee6a4SAndroid Build Coastguard Worker let mut state = self.state.lock(); 161*bb4ee6a4SAndroid Build Coastguard Worker while !(state.queue.is_empty() && state.n_waiting == self.n_consumers) { 162*bb4ee6a4SAndroid Build Coastguard Worker state = self.producer_wait.wait(state); 163*bb4ee6a4SAndroid Build Coastguard Worker } 164*bb4ee6a4SAndroid Build Coastguard Worker } 165*bb4ee6a4SAndroid Build Coastguard Worker } 166*bb4ee6a4SAndroid Build Coastguard Worker 167*bb4ee6a4SAndroid Build Coastguard Worker struct ChannelState<T> { 168*bb4ee6a4SAndroid Build Coastguard Worker queue: VecDeque<T>, 169*bb4ee6a4SAndroid Build Coastguard Worker capacity: usize, 170*bb4ee6a4SAndroid Build Coastguard Worker n_waiting: usize, 171*bb4ee6a4SAndroid Build Coastguard Worker is_closed: bool, 172*bb4ee6a4SAndroid Build Coastguard Worker } 173*bb4ee6a4SAndroid Build Coastguard Worker 174*bb4ee6a4SAndroid Build Coastguard Worker impl<T> ChannelState<T> { new(capacity: usize) -> Self175*bb4ee6a4SAndroid Build Coastguard Worker fn new(capacity: usize) -> Self { 176*bb4ee6a4SAndroid Build Coastguard Worker Self { 177*bb4ee6a4SAndroid Build Coastguard Worker queue: VecDeque::with_capacity(capacity), 178*bb4ee6a4SAndroid Build Coastguard Worker capacity, 179*bb4ee6a4SAndroid Build Coastguard Worker n_waiting: 0, 180*bb4ee6a4SAndroid Build Coastguard Worker is_closed: false, 181*bb4ee6a4SAndroid Build Coastguard Worker } 182*bb4ee6a4SAndroid Build Coastguard Worker } 183*bb4ee6a4SAndroid Build Coastguard Worker } 184*bb4ee6a4SAndroid Build Coastguard Worker 185*bb4ee6a4SAndroid Build Coastguard Worker /// The event channel for background jobs. 186*bb4ee6a4SAndroid Build Coastguard Worker /// 187*bb4ee6a4SAndroid Build Coastguard Worker /// This sends an abort request from the main thread to the job thread via atomic boolean flag. 188*bb4ee6a4SAndroid Build Coastguard Worker /// 189*bb4ee6a4SAndroid Build Coastguard Worker /// This notifies the main thread that the job thread is completed via [Event]. 190*bb4ee6a4SAndroid Build Coastguard Worker pub struct BackgroundJobControl { 191*bb4ee6a4SAndroid Build Coastguard Worker event: Event, 192*bb4ee6a4SAndroid Build Coastguard Worker abort_flag: AtomicBool, 193*bb4ee6a4SAndroid Build Coastguard Worker } 194*bb4ee6a4SAndroid Build Coastguard Worker 195*bb4ee6a4SAndroid Build Coastguard Worker impl BackgroundJobControl { 196*bb4ee6a4SAndroid Build Coastguard Worker /// Creates [BackgroundJobControl]. new() -> anyhow::Result<Self>197*bb4ee6a4SAndroid Build Coastguard Worker pub fn new() -> anyhow::Result<Self> { 198*bb4ee6a4SAndroid Build Coastguard Worker Ok(Self { 199*bb4ee6a4SAndroid Build Coastguard Worker event: Event::new()?, 200*bb4ee6a4SAndroid Build Coastguard Worker abort_flag: AtomicBool::new(false), 201*bb4ee6a4SAndroid Build Coastguard Worker }) 202*bb4ee6a4SAndroid Build Coastguard Worker } 203*bb4ee6a4SAndroid Build Coastguard Worker 204*bb4ee6a4SAndroid Build Coastguard Worker /// Creates [BackgroundJob]. new_job(&self) -> BackgroundJob<'_>205*bb4ee6a4SAndroid Build Coastguard Worker pub fn new_job(&self) -> BackgroundJob<'_> { 206*bb4ee6a4SAndroid Build Coastguard Worker BackgroundJob { 207*bb4ee6a4SAndroid Build Coastguard Worker event: &self.event, 208*bb4ee6a4SAndroid Build Coastguard Worker abort_flag: &self.abort_flag, 209*bb4ee6a4SAndroid Build Coastguard Worker } 210*bb4ee6a4SAndroid Build Coastguard Worker } 211*bb4ee6a4SAndroid Build Coastguard Worker 212*bb4ee6a4SAndroid Build Coastguard Worker /// Abort the background job. abort(&self)213*bb4ee6a4SAndroid Build Coastguard Worker pub fn abort(&self) { 214*bb4ee6a4SAndroid Build Coastguard Worker self.abort_flag.store(true, Ordering::Release); 215*bb4ee6a4SAndroid Build Coastguard Worker } 216*bb4ee6a4SAndroid Build Coastguard Worker 217*bb4ee6a4SAndroid Build Coastguard Worker /// Reset the internal state for a next job. 218*bb4ee6a4SAndroid Build Coastguard Worker /// 219*bb4ee6a4SAndroid Build Coastguard Worker /// Returns false, if the event is already reset and no event exists. reset(&self) -> anyhow::Result<bool>220*bb4ee6a4SAndroid Build Coastguard Worker pub fn reset(&self) -> anyhow::Result<bool> { 221*bb4ee6a4SAndroid Build Coastguard Worker self.abort_flag.store(false, Ordering::Release); 222*bb4ee6a4SAndroid Build Coastguard Worker Ok(matches!( 223*bb4ee6a4SAndroid Build Coastguard Worker self.event 224*bb4ee6a4SAndroid Build Coastguard Worker .wait_timeout(Duration::ZERO) 225*bb4ee6a4SAndroid Build Coastguard Worker .context("failed to get job complete event")?, 226*bb4ee6a4SAndroid Build Coastguard Worker EventWaitResult::Signaled 227*bb4ee6a4SAndroid Build Coastguard Worker )) 228*bb4ee6a4SAndroid Build Coastguard Worker } 229*bb4ee6a4SAndroid Build Coastguard Worker 230*bb4ee6a4SAndroid Build Coastguard Worker /// Returns the event to notify the completion of background job. get_completion_event(&self) -> &Event231*bb4ee6a4SAndroid Build Coastguard Worker pub fn get_completion_event(&self) -> &Event { 232*bb4ee6a4SAndroid Build Coastguard Worker &self.event 233*bb4ee6a4SAndroid Build Coastguard Worker } 234*bb4ee6a4SAndroid Build Coastguard Worker } 235*bb4ee6a4SAndroid Build Coastguard Worker 236*bb4ee6a4SAndroid Build Coastguard Worker /// Background job context. 237*bb4ee6a4SAndroid Build Coastguard Worker /// 238*bb4ee6a4SAndroid Build Coastguard Worker /// When dropped, this sends an event to the main thread via [Event]. 239*bb4ee6a4SAndroid Build Coastguard Worker pub struct BackgroundJob<'a> { 240*bb4ee6a4SAndroid Build Coastguard Worker event: &'a Event, 241*bb4ee6a4SAndroid Build Coastguard Worker abort_flag: &'a AtomicBool, 242*bb4ee6a4SAndroid Build Coastguard Worker } 243*bb4ee6a4SAndroid Build Coastguard Worker 244*bb4ee6a4SAndroid Build Coastguard Worker impl BackgroundJob<'_> { 245*bb4ee6a4SAndroid Build Coastguard Worker /// Returns whether the background job is aborted or not. is_aborted(&self) -> bool246*bb4ee6a4SAndroid Build Coastguard Worker pub fn is_aborted(&self) -> bool { 247*bb4ee6a4SAndroid Build Coastguard Worker self.abort_flag.load(Ordering::Acquire) 248*bb4ee6a4SAndroid Build Coastguard Worker } 249*bb4ee6a4SAndroid Build Coastguard Worker } 250*bb4ee6a4SAndroid Build Coastguard Worker 251*bb4ee6a4SAndroid Build Coastguard Worker impl Drop for BackgroundJob<'_> { drop(&mut self)252*bb4ee6a4SAndroid Build Coastguard Worker fn drop(&mut self) { 253*bb4ee6a4SAndroid Build Coastguard Worker self.event.signal().expect("send job complete event"); 254*bb4ee6a4SAndroid Build Coastguard Worker } 255*bb4ee6a4SAndroid Build Coastguard Worker } 256*bb4ee6a4SAndroid Build Coastguard Worker 257*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(test)] 258*bb4ee6a4SAndroid Build Coastguard Worker mod tests { 259*bb4ee6a4SAndroid Build Coastguard Worker use std::time::Duration; 260*bb4ee6a4SAndroid Build Coastguard Worker 261*bb4ee6a4SAndroid Build Coastguard Worker use super::*; 262*bb4ee6a4SAndroid Build Coastguard Worker 263*bb4ee6a4SAndroid Build Coastguard Worker #[derive(Clone, Copy)] 264*bb4ee6a4SAndroid Build Coastguard Worker struct Context { 265*bb4ee6a4SAndroid Build Coastguard Worker n_consume: usize, 266*bb4ee6a4SAndroid Build Coastguard Worker n_executed: usize, 267*bb4ee6a4SAndroid Build Coastguard Worker } 268*bb4ee6a4SAndroid Build Coastguard Worker 269*bb4ee6a4SAndroid Build Coastguard Worker struct FakeTask { 270*bb4ee6a4SAndroid Build Coastguard Worker context: Mutex<Context>, 271*bb4ee6a4SAndroid Build Coastguard Worker waker: Condvar, 272*bb4ee6a4SAndroid Build Coastguard Worker } 273*bb4ee6a4SAndroid Build Coastguard Worker 274*bb4ee6a4SAndroid Build Coastguard Worker impl FakeTask { new() -> Arc<Self>275*bb4ee6a4SAndroid Build Coastguard Worker fn new() -> Arc<Self> { 276*bb4ee6a4SAndroid Build Coastguard Worker Arc::new(Self { 277*bb4ee6a4SAndroid Build Coastguard Worker context: Mutex::new(Context { 278*bb4ee6a4SAndroid Build Coastguard Worker n_consume: 0, 279*bb4ee6a4SAndroid Build Coastguard Worker n_executed: 0, 280*bb4ee6a4SAndroid Build Coastguard Worker }), 281*bb4ee6a4SAndroid Build Coastguard Worker waker: Condvar::new(), 282*bb4ee6a4SAndroid Build Coastguard Worker }) 283*bb4ee6a4SAndroid Build Coastguard Worker } 284*bb4ee6a4SAndroid Build Coastguard Worker consume(&self, count: usize)285*bb4ee6a4SAndroid Build Coastguard Worker fn consume(&self, count: usize) { 286*bb4ee6a4SAndroid Build Coastguard Worker let mut context = self.context.lock(); 287*bb4ee6a4SAndroid Build Coastguard Worker context.n_consume += count; 288*bb4ee6a4SAndroid Build Coastguard Worker self.waker.notify_all(); 289*bb4ee6a4SAndroid Build Coastguard Worker } 290*bb4ee6a4SAndroid Build Coastguard Worker n_executed(&self) -> usize291*bb4ee6a4SAndroid Build Coastguard Worker fn n_executed(&self) -> usize { 292*bb4ee6a4SAndroid Build Coastguard Worker self.context.lock().n_executed 293*bb4ee6a4SAndroid Build Coastguard Worker } 294*bb4ee6a4SAndroid Build Coastguard Worker } 295*bb4ee6a4SAndroid Build Coastguard Worker 296*bb4ee6a4SAndroid Build Coastguard Worker impl Task for Arc<FakeTask> { execute(self)297*bb4ee6a4SAndroid Build Coastguard Worker fn execute(self) { 298*bb4ee6a4SAndroid Build Coastguard Worker let mut context = self.context.lock(); 299*bb4ee6a4SAndroid Build Coastguard Worker while context.n_consume == 0 { 300*bb4ee6a4SAndroid Build Coastguard Worker context = self.waker.wait(context); 301*bb4ee6a4SAndroid Build Coastguard Worker } 302*bb4ee6a4SAndroid Build Coastguard Worker context.n_consume -= 1; 303*bb4ee6a4SAndroid Build Coastguard Worker context.n_executed += 1; 304*bb4ee6a4SAndroid Build Coastguard Worker } 305*bb4ee6a4SAndroid Build Coastguard Worker } 306*bb4ee6a4SAndroid Build Coastguard Worker wait_thread_with_timeout<T>(join_handle: thread::JoinHandle<T>, timeout_millis: u64) -> T307*bb4ee6a4SAndroid Build Coastguard Worker fn wait_thread_with_timeout<T>(join_handle: thread::JoinHandle<T>, timeout_millis: u64) -> T { 308*bb4ee6a4SAndroid Build Coastguard Worker for _ in 0..timeout_millis { 309*bb4ee6a4SAndroid Build Coastguard Worker if join_handle.is_finished() { 310*bb4ee6a4SAndroid Build Coastguard Worker return join_handle.join().unwrap(); 311*bb4ee6a4SAndroid Build Coastguard Worker } 312*bb4ee6a4SAndroid Build Coastguard Worker thread::sleep(Duration::from_millis(1)); 313*bb4ee6a4SAndroid Build Coastguard Worker } 314*bb4ee6a4SAndroid Build Coastguard Worker panic!("thread join timeout"); 315*bb4ee6a4SAndroid Build Coastguard Worker } 316*bb4ee6a4SAndroid Build Coastguard Worker poll_until_with_timeout<F>(f: F, timeout_millis: u64) where F: Fn() -> bool,317*bb4ee6a4SAndroid Build Coastguard Worker fn poll_until_with_timeout<F>(f: F, timeout_millis: u64) 318*bb4ee6a4SAndroid Build Coastguard Worker where 319*bb4ee6a4SAndroid Build Coastguard Worker F: Fn() -> bool, 320*bb4ee6a4SAndroid Build Coastguard Worker { 321*bb4ee6a4SAndroid Build Coastguard Worker for _ in 0..timeout_millis { 322*bb4ee6a4SAndroid Build Coastguard Worker if f() { 323*bb4ee6a4SAndroid Build Coastguard Worker break; 324*bb4ee6a4SAndroid Build Coastguard Worker } 325*bb4ee6a4SAndroid Build Coastguard Worker thread::sleep(Duration::from_millis(1)); 326*bb4ee6a4SAndroid Build Coastguard Worker } 327*bb4ee6a4SAndroid Build Coastguard Worker } 328*bb4ee6a4SAndroid Build Coastguard Worker 329*bb4ee6a4SAndroid Build Coastguard Worker #[test] test_worker()330*bb4ee6a4SAndroid Build Coastguard Worker fn test_worker() { 331*bb4ee6a4SAndroid Build Coastguard Worker let worker = Worker::new(2, 4); 332*bb4ee6a4SAndroid Build Coastguard Worker let task = FakeTask::new(); 333*bb4ee6a4SAndroid Build Coastguard Worker let channel = worker.channel.clone(); 334*bb4ee6a4SAndroid Build Coastguard Worker 335*bb4ee6a4SAndroid Build Coastguard Worker for _ in 0..4 { 336*bb4ee6a4SAndroid Build Coastguard Worker assert!(channel.push(task.clone())); 337*bb4ee6a4SAndroid Build Coastguard Worker } 338*bb4ee6a4SAndroid Build Coastguard Worker 339*bb4ee6a4SAndroid Build Coastguard Worker assert_eq!(task.n_executed(), 0); 340*bb4ee6a4SAndroid Build Coastguard Worker task.consume(4); 341*bb4ee6a4SAndroid Build Coastguard Worker worker.channel.wait_complete(); 342*bb4ee6a4SAndroid Build Coastguard Worker assert_eq!(task.n_executed(), 4); 343*bb4ee6a4SAndroid Build Coastguard Worker worker.close(); 344*bb4ee6a4SAndroid Build Coastguard Worker } 345*bb4ee6a4SAndroid Build Coastguard Worker 346*bb4ee6a4SAndroid Build Coastguard Worker #[test] test_worker_push_after_close()347*bb4ee6a4SAndroid Build Coastguard Worker fn test_worker_push_after_close() { 348*bb4ee6a4SAndroid Build Coastguard Worker let worker = Worker::new(2, 4); 349*bb4ee6a4SAndroid Build Coastguard Worker let task = FakeTask::new(); 350*bb4ee6a4SAndroid Build Coastguard Worker let channel = worker.channel.clone(); 351*bb4ee6a4SAndroid Build Coastguard Worker 352*bb4ee6a4SAndroid Build Coastguard Worker worker.close(); 353*bb4ee6a4SAndroid Build Coastguard Worker 354*bb4ee6a4SAndroid Build Coastguard Worker assert!(!channel.push(task)); 355*bb4ee6a4SAndroid Build Coastguard Worker } 356*bb4ee6a4SAndroid Build Coastguard Worker 357*bb4ee6a4SAndroid Build Coastguard Worker #[test] test_worker_push_block()358*bb4ee6a4SAndroid Build Coastguard Worker fn test_worker_push_block() { 359*bb4ee6a4SAndroid Build Coastguard Worker let worker = Worker::new(2, 4); 360*bb4ee6a4SAndroid Build Coastguard Worker let task = FakeTask::new(); 361*bb4ee6a4SAndroid Build Coastguard Worker let channel = worker.channel.clone(); 362*bb4ee6a4SAndroid Build Coastguard Worker 363*bb4ee6a4SAndroid Build Coastguard Worker let task_cloned = task.clone(); 364*bb4ee6a4SAndroid Build Coastguard Worker // push tasks on another thread to avoid blocking forever 365*bb4ee6a4SAndroid Build Coastguard Worker wait_thread_with_timeout( 366*bb4ee6a4SAndroid Build Coastguard Worker thread::spawn(move || { 367*bb4ee6a4SAndroid Build Coastguard Worker for _ in 0..6 { 368*bb4ee6a4SAndroid Build Coastguard Worker assert!(channel.push(task_cloned.clone())); 369*bb4ee6a4SAndroid Build Coastguard Worker } 370*bb4ee6a4SAndroid Build Coastguard Worker }), 371*bb4ee6a4SAndroid Build Coastguard Worker 100, 372*bb4ee6a4SAndroid Build Coastguard Worker ); 373*bb4ee6a4SAndroid Build Coastguard Worker let channel = worker.channel.clone(); 374*bb4ee6a4SAndroid Build Coastguard Worker let task_cloned = task.clone(); 375*bb4ee6a4SAndroid Build Coastguard Worker let push_thread = thread::spawn(move || { 376*bb4ee6a4SAndroid Build Coastguard Worker assert!(channel.push(task_cloned)); 377*bb4ee6a4SAndroid Build Coastguard Worker }); 378*bb4ee6a4SAndroid Build Coastguard Worker thread::sleep(Duration::from_millis(10)); 379*bb4ee6a4SAndroid Build Coastguard Worker assert!(!push_thread.is_finished()); 380*bb4ee6a4SAndroid Build Coastguard Worker 381*bb4ee6a4SAndroid Build Coastguard Worker task.consume(1); 382*bb4ee6a4SAndroid Build Coastguard Worker wait_thread_with_timeout(push_thread, 100); 383*bb4ee6a4SAndroid Build Coastguard Worker 384*bb4ee6a4SAndroid Build Coastguard Worker task.consume(6); 385*bb4ee6a4SAndroid Build Coastguard Worker #[allow(clippy::redundant_clone)] 386*bb4ee6a4SAndroid Build Coastguard Worker let task_clone = task.clone(); 387*bb4ee6a4SAndroid Build Coastguard Worker poll_until_with_timeout(|| task_clone.n_executed() == 7, 100); 388*bb4ee6a4SAndroid Build Coastguard Worker assert_eq!(task.n_executed(), 7); 389*bb4ee6a4SAndroid Build Coastguard Worker worker.close(); 390*bb4ee6a4SAndroid Build Coastguard Worker } 391*bb4ee6a4SAndroid Build Coastguard Worker 392*bb4ee6a4SAndroid Build Coastguard Worker #[test] test_worker_close_on_push_blocked()393*bb4ee6a4SAndroid Build Coastguard Worker fn test_worker_close_on_push_blocked() { 394*bb4ee6a4SAndroid Build Coastguard Worker let worker = Worker::new(2, 4); 395*bb4ee6a4SAndroid Build Coastguard Worker let task = FakeTask::new(); 396*bb4ee6a4SAndroid Build Coastguard Worker let channel = worker.channel.clone(); 397*bb4ee6a4SAndroid Build Coastguard Worker 398*bb4ee6a4SAndroid Build Coastguard Worker let task_cloned = task.clone(); 399*bb4ee6a4SAndroid Build Coastguard Worker // push tasks on another thread to avoid blocking forever 400*bb4ee6a4SAndroid Build Coastguard Worker wait_thread_with_timeout( 401*bb4ee6a4SAndroid Build Coastguard Worker thread::spawn(move || { 402*bb4ee6a4SAndroid Build Coastguard Worker for _ in 0..6 { 403*bb4ee6a4SAndroid Build Coastguard Worker assert!(channel.push(task_cloned.clone())); 404*bb4ee6a4SAndroid Build Coastguard Worker } 405*bb4ee6a4SAndroid Build Coastguard Worker }), 406*bb4ee6a4SAndroid Build Coastguard Worker 100, 407*bb4ee6a4SAndroid Build Coastguard Worker ); 408*bb4ee6a4SAndroid Build Coastguard Worker let channel = worker.channel.clone(); 409*bb4ee6a4SAndroid Build Coastguard Worker let task_cloned = task.clone(); 410*bb4ee6a4SAndroid Build Coastguard Worker let push_thread = thread::spawn(move || channel.push(task_cloned)); 411*bb4ee6a4SAndroid Build Coastguard Worker // sleep to run push_thread. 412*bb4ee6a4SAndroid Build Coastguard Worker thread::sleep(Duration::from_millis(10)); 413*bb4ee6a4SAndroid Build Coastguard Worker // close blocks until all the task are executed. 414*bb4ee6a4SAndroid Build Coastguard Worker let close_thread = thread::spawn(move || { 415*bb4ee6a4SAndroid Build Coastguard Worker worker.close(); 416*bb4ee6a4SAndroid Build Coastguard Worker }); 417*bb4ee6a4SAndroid Build Coastguard Worker let push_result = wait_thread_with_timeout(push_thread, 100); 418*bb4ee6a4SAndroid Build Coastguard Worker // push fails. 419*bb4ee6a4SAndroid Build Coastguard Worker assert!(!push_result); 420*bb4ee6a4SAndroid Build Coastguard Worker 421*bb4ee6a4SAndroid Build Coastguard Worker // cleanup 422*bb4ee6a4SAndroid Build Coastguard Worker task.consume(6); 423*bb4ee6a4SAndroid Build Coastguard Worker wait_thread_with_timeout(close_thread, 100); 424*bb4ee6a4SAndroid Build Coastguard Worker } 425*bb4ee6a4SAndroid Build Coastguard Worker 426*bb4ee6a4SAndroid Build Coastguard Worker #[test] new_background_job_event()427*bb4ee6a4SAndroid Build Coastguard Worker fn new_background_job_event() { 428*bb4ee6a4SAndroid Build Coastguard Worker assert!(BackgroundJobControl::new().is_ok()); 429*bb4ee6a4SAndroid Build Coastguard Worker } 430*bb4ee6a4SAndroid Build Coastguard Worker 431*bb4ee6a4SAndroid Build Coastguard Worker #[test] background_job_is_not_aborted_default()432*bb4ee6a4SAndroid Build Coastguard Worker fn background_job_is_not_aborted_default() { 433*bb4ee6a4SAndroid Build Coastguard Worker let event = BackgroundJobControl::new().unwrap(); 434*bb4ee6a4SAndroid Build Coastguard Worker 435*bb4ee6a4SAndroid Build Coastguard Worker let job = event.new_job(); 436*bb4ee6a4SAndroid Build Coastguard Worker 437*bb4ee6a4SAndroid Build Coastguard Worker assert!(!job.is_aborted()); 438*bb4ee6a4SAndroid Build Coastguard Worker } 439*bb4ee6a4SAndroid Build Coastguard Worker 440*bb4ee6a4SAndroid Build Coastguard Worker #[test] abort_background_job()441*bb4ee6a4SAndroid Build Coastguard Worker fn abort_background_job() { 442*bb4ee6a4SAndroid Build Coastguard Worker let event = BackgroundJobControl::new().unwrap(); 443*bb4ee6a4SAndroid Build Coastguard Worker 444*bb4ee6a4SAndroid Build Coastguard Worker let job = event.new_job(); 445*bb4ee6a4SAndroid Build Coastguard Worker event.abort(); 446*bb4ee6a4SAndroid Build Coastguard Worker 447*bb4ee6a4SAndroid Build Coastguard Worker assert!(job.is_aborted()); 448*bb4ee6a4SAndroid Build Coastguard Worker } 449*bb4ee6a4SAndroid Build Coastguard Worker 450*bb4ee6a4SAndroid Build Coastguard Worker #[test] reset_background_job()451*bb4ee6a4SAndroid Build Coastguard Worker fn reset_background_job() { 452*bb4ee6a4SAndroid Build Coastguard Worker let event = BackgroundJobControl::new().unwrap(); 453*bb4ee6a4SAndroid Build Coastguard Worker 454*bb4ee6a4SAndroid Build Coastguard Worker event.abort(); 455*bb4ee6a4SAndroid Build Coastguard Worker event.reset().unwrap(); 456*bb4ee6a4SAndroid Build Coastguard Worker let job = event.new_job(); 457*bb4ee6a4SAndroid Build Coastguard Worker 458*bb4ee6a4SAndroid Build Coastguard Worker assert!(!job.is_aborted()); 459*bb4ee6a4SAndroid Build Coastguard Worker } 460*bb4ee6a4SAndroid Build Coastguard Worker 461*bb4ee6a4SAndroid Build Coastguard Worker #[test] reset_background_job_event()462*bb4ee6a4SAndroid Build Coastguard Worker fn reset_background_job_event() { 463*bb4ee6a4SAndroid Build Coastguard Worker let event = BackgroundJobControl::new().unwrap(); 464*bb4ee6a4SAndroid Build Coastguard Worker 465*bb4ee6a4SAndroid Build Coastguard Worker let job = event.new_job(); 466*bb4ee6a4SAndroid Build Coastguard Worker drop(job); 467*bb4ee6a4SAndroid Build Coastguard Worker 468*bb4ee6a4SAndroid Build Coastguard Worker assert!(event.reset().unwrap()); 469*bb4ee6a4SAndroid Build Coastguard Worker } 470*bb4ee6a4SAndroid Build Coastguard Worker 471*bb4ee6a4SAndroid Build Coastguard Worker #[test] reset_background_job_event_twice()472*bb4ee6a4SAndroid Build Coastguard Worker fn reset_background_job_event_twice() { 473*bb4ee6a4SAndroid Build Coastguard Worker let event = BackgroundJobControl::new().unwrap(); 474*bb4ee6a4SAndroid Build Coastguard Worker 475*bb4ee6a4SAndroid Build Coastguard Worker let job = event.new_job(); 476*bb4ee6a4SAndroid Build Coastguard Worker drop(job); 477*bb4ee6a4SAndroid Build Coastguard Worker 478*bb4ee6a4SAndroid Build Coastguard Worker event.reset().unwrap(); 479*bb4ee6a4SAndroid Build Coastguard Worker assert!(!event.reset().unwrap()); 480*bb4ee6a4SAndroid Build Coastguard Worker } 481*bb4ee6a4SAndroid Build Coastguard Worker 482*bb4ee6a4SAndroid Build Coastguard Worker #[test] reset_background_job_event_no_jobs()483*bb4ee6a4SAndroid Build Coastguard Worker fn reset_background_job_event_no_jobs() { 484*bb4ee6a4SAndroid Build Coastguard Worker let event = BackgroundJobControl::new().unwrap(); 485*bb4ee6a4SAndroid Build Coastguard Worker 486*bb4ee6a4SAndroid Build Coastguard Worker assert!(!event.reset().unwrap()); 487*bb4ee6a4SAndroid Build Coastguard Worker } 488*bb4ee6a4SAndroid Build Coastguard Worker } 489