1 // Copyright 2023 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 //! Worker thread abstraction 6 7 use std::panic; 8 use std::thread; 9 use std::thread::JoinHandle; 10 use std::thread::Thread; 11 12 use crate::Error; 13 use crate::Event; 14 15 /// Wrapper object for creating a worker thread that can be stopped by signaling an event. 16 pub struct WorkerThread<T: Send + 'static> { 17 worker: Option<(Event, JoinHandle<T>)>, 18 } 19 20 impl<T: Send + 'static> WorkerThread<T> { 21 /// Starts a worker thread named `thread_name` running the `thread_func` function. 22 /// 23 /// The `thread_func` implementation must monitor the provided `Event` and return from the 24 /// thread when it is signaled. 25 /// 26 /// Call [`stop()`](Self::stop) to stop the thread. start<F>(thread_name: impl Into<String>, thread_func: F) -> Self where F: FnOnce(Event) -> T + Send + 'static,27 pub fn start<F>(thread_name: impl Into<String>, thread_func: F) -> Self 28 where 29 F: FnOnce(Event) -> T + Send + 'static, 30 { 31 let stop_event = Event::new().expect("Event::new() failed"); 32 let thread_stop_event = stop_event.try_clone().expect("Event::try_clone() failed"); 33 34 let thread_handle = thread::Builder::new() 35 .name(thread_name.into()) 36 .spawn(move || thread_func(thread_stop_event)) 37 .expect("thread spawn failed"); 38 39 WorkerThread { 40 worker: Some((stop_event, thread_handle)), 41 } 42 } 43 44 /// Stops the worker thread. 45 /// 46 /// Returns the value returned by the function running in the thread. stop(mut self) -> T47 pub fn stop(mut self) -> T { 48 // The only time the internal `Option` should be `None` is in a `drop` after `stop`, so this 49 // `expect()` should never fail. 50 self.stop_internal().expect("invalid worker state") 51 } 52 53 // `stop_internal` accepts a reference so it can be called from `drop`. 54 #[doc(hidden)] stop_internal(&mut self) -> Option<T>55 fn stop_internal(&mut self) -> Option<T> { 56 self.worker.take().map(|(stop_event, thread_handle)| { 57 // There is nothing the caller can do to handle `stop_event.signal()` failure, and we 58 // don't want to leave the thread running, so panic in that case. 59 stop_event 60 .signal() 61 .expect("WorkerThread stop event signal failed"); 62 63 match thread_handle.join() { 64 Ok(v) => v, 65 Err(e) => panic::resume_unwind(e), 66 } 67 }) 68 } 69 70 /// Signal thread's stop event. Unlike stop, the function doesn't wait 71 /// on joining the thread. 72 /// The function can be called multiple times. 73 /// Calling `stop` or `drop` will internally signal the stop event again 74 /// and join the thread. signal(&mut self) -> Result<(), Error>75 pub fn signal(&mut self) -> Result<(), Error> { 76 if let Some((event, _)) = &mut self.worker { 77 event.signal() 78 } else { 79 Ok(()) 80 } 81 } 82 83 /// Returns a handle to the running thread. thread(&self) -> &Thread84 pub fn thread(&self) -> &Thread { 85 // The only time the internal `Option` should be `None` is in a `drop` after `stop`, so this 86 // `unwrap()` should never fail. 87 self.worker.as_ref().unwrap().1.thread() 88 } 89 } 90 91 impl<T: Send + 'static> Drop for WorkerThread<T> { 92 /// Stops the thread if the `WorkerThread` is dropped without calling [`stop()`](Self::stop). drop(&mut self)93 fn drop(&mut self) { 94 let _ = self.stop_internal(); 95 } 96 } 97