1 // Copyright 2024 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 // TODO(b/318439696): Remove once it is used 6 #![allow(dead_code)] 7 8 use std::collections::HashMap; 9 use std::fmt::Write; 10 use std::sync::atomic::AtomicU32; 11 use std::sync::atomic::Ordering; 12 use std::sync::Arc; 13 use std::sync::RwLock; 14 use std::time::Duration; 15 16 use thiserror::Error as ThisError; 17 18 use crate::EventToken; 19 use crate::Timer; 20 use crate::TimerTrait; 21 use crate::WaitContext; 22 use crate::WorkerThread; 23 24 /// Utility class that helps count and log high frequency events periodically. 25 pub struct PeriodicLogger { 26 // Name that is printed out to differentiate between other `PeriodicLogger`s 27 name: String, 28 // Interval to log 29 interval: Duration, 30 // Map of event counters that are periodically logged 31 counters: Arc<RwLock<HashMap<String, AtomicU32>>>, 32 // The periodic logger thread 33 worker_thread: Option<WorkerThread<Result<(), PeriodicLoggerError>>>, 34 } 35 36 impl PeriodicLogger { new(name: String, interval: Duration) -> Self37 pub fn new(name: String, interval: Duration) -> Self { 38 PeriodicLogger { 39 name, 40 interval, 41 counters: Arc::new(RwLock::new(HashMap::new())), 42 worker_thread: None, 43 } 44 } 45 46 /// Add a new event item to be counted. add_counter_item(&self, name: String) -> Result<(), PeriodicLoggerError>47 pub fn add_counter_item(&self, name: String) -> Result<(), PeriodicLoggerError> { 48 // This write lock will likely be acquired infrequently. 49 let mut counters_write_lock = self 50 .counters 51 .write() 52 .map_err(|e| PeriodicLoggerError::WriteLockError(e.to_string()))?; 53 54 if counters_write_lock.contains_key(&name) { 55 return Err(PeriodicLoggerError::CounterAlreadyExist(name)); 56 } 57 58 counters_write_lock.insert(name, AtomicU32::new(0)); 59 Ok(()) 60 } 61 62 /// Increment event counter by an `amount` increment_counter(&self, name: String, amount: u32) -> Result<(), PeriodicLoggerError>63 pub fn increment_counter(&self, name: String, amount: u32) -> Result<(), PeriodicLoggerError> { 64 match self.counters.read() { 65 Ok(counters_map) => { 66 if let Some(atomic_counter) = counters_map.get(&name) { 67 atomic_counter.fetch_add(amount, Ordering::Relaxed); 68 Ok(()) 69 } else { 70 Err(PeriodicLoggerError::CounterDoesNotExist(name)) 71 } 72 } 73 Err(e) => Err(PeriodicLoggerError::ReadLockError(e.to_string())), 74 } 75 } 76 77 /// Starts a thread that will log the count of events within a `self.interval` time period. 78 /// All counters will be reset to 0 after logging. start_logging_thread(&mut self) -> Result<(), PeriodicLoggerError>79 pub fn start_logging_thread(&mut self) -> Result<(), PeriodicLoggerError> { 80 if self.worker_thread.is_some() { 81 return Err(PeriodicLoggerError::ThreadAlreadyStarted); 82 } 83 84 #[derive(EventToken)] 85 enum Token { 86 Exit, 87 PeriodicLog, 88 } 89 90 let cloned_counter = self.counters.clone(); 91 let interval_copy = self.interval; 92 let name_copy = self.name.clone(); 93 self.worker_thread = Some(WorkerThread::start( 94 format!("PeriodicLogger_{}", self.name), 95 move |kill_evt| { 96 let mut timer = Timer::new().map_err(PeriodicLoggerError::TimerNewError)?; 97 timer 98 .reset_repeating(interval_copy) 99 .map_err(PeriodicLoggerError::TimerResetError)?; 100 101 let wait_ctx = WaitContext::build_with(&[ 102 (&kill_evt, Token::Exit), 103 (&timer, Token::PeriodicLog), 104 ]) 105 .map_err(PeriodicLoggerError::WaitContextBuildError)?; 106 107 'outer: loop { 108 let events = wait_ctx.wait().expect("wait failed"); 109 for event in events.iter().filter(|e| e.is_readable) { 110 match event.token { 111 Token::Exit => { 112 break 'outer; 113 } 114 Token::PeriodicLog => { 115 timer.mark_waited().unwrap(); 116 117 let counter_map = cloned_counter.read().map_err(|e| { 118 PeriodicLoggerError::ReadLockError(e.to_string()) 119 })?; 120 121 let mut logged_string = 122 format!("{} {:?}:", name_copy, interval_copy); 123 for (counter_name, counter_value) in counter_map.iter() { 124 let value = counter_value.swap(0, Ordering::Relaxed); 125 let _ = 126 write!(logged_string, "\n {}: {}", counter_name, value); 127 } 128 129 // Log all counters 130 crate::info!("{}", logged_string); 131 } 132 } 133 } 134 } 135 Ok(()) 136 }, 137 )); 138 139 Ok(()) 140 } 141 } 142 143 #[derive(Debug, ThisError, PartialEq)] 144 pub enum PeriodicLoggerError { 145 #[error("Periodic logger thread already started.")] 146 ThreadAlreadyStarted, 147 #[error("Failed to acquire write lock: {0}")] 148 WriteLockError(String), 149 #[error("Failed to acquire read lock: {0}")] 150 ReadLockError(String), 151 #[error("Counter already exists: {0}")] 152 CounterAlreadyExist(String), 153 #[error("Counter does not exist: {0}")] 154 CounterDoesNotExist(String), 155 #[error("Failed to build WaitContext: {0}")] 156 WaitContextBuildError(crate::Error), 157 #[error("Failed to wait on WaitContext: {0}")] 158 WaitContextWaitError(crate::Error), 159 #[error("Failed to reset Timer: {0}")] 160 TimerResetError(crate::Error), 161 #[error("Failed initialize Timer: {0}")] 162 TimerNewError(crate::Error), 163 } 164 165 #[cfg(test)] 166 mod tests { 167 use std::thread; 168 169 use super::*; 170 171 #[test] periodic_add()172 fn periodic_add() { 173 let periodic_logger = PeriodicLogger::new("test".to_string(), Duration::from_secs(3)); 174 periodic_logger 175 .add_counter_item("counter_1".to_string()) 176 .unwrap(); 177 periodic_logger 178 .increment_counter("counter_1".to_string(), 2) 179 .unwrap(); 180 periodic_logger 181 .increment_counter("counter_1".to_string(), 5) 182 .unwrap(); 183 184 assert_eq!(periodic_logger.counters.read().unwrap().len(), 1); 185 assert_eq!( 186 periodic_logger 187 .counters 188 .read() 189 .unwrap() 190 .get("counter_1") 191 .unwrap() 192 .load(Ordering::Relaxed), 193 7 194 ); 195 } 196 197 #[test] worker_thread_cannot_start_twice()198 fn worker_thread_cannot_start_twice() { 199 let mut periodic_logger = PeriodicLogger::new("test".to_string(), Duration::from_secs(3)); 200 assert!(periodic_logger.start_logging_thread().is_ok()); 201 assert!(periodic_logger.start_logging_thread().is_err()); 202 } 203 204 #[test] add_same_counter_item_twice_return_err()205 fn add_same_counter_item_twice_return_err() { 206 let periodic_logger = PeriodicLogger::new("test".to_string(), Duration::from_secs(3)); 207 assert!(periodic_logger 208 .add_counter_item("counter_1".to_string()) 209 .is_ok()); 210 assert_eq!( 211 periodic_logger.add_counter_item("counter_1".to_string()), 212 Err(PeriodicLoggerError::CounterAlreadyExist( 213 "counter_1".to_string() 214 )) 215 ); 216 } 217 218 /// Ignored because this is intended to be ran locally 219 #[ignore] 220 #[test] periodic_logger_smoke_test()221 fn periodic_logger_smoke_test() { 222 let mut periodic_logger = PeriodicLogger::new("test".to_string(), Duration::from_secs(3)); 223 periodic_logger 224 .add_counter_item("counter_1".to_string()) 225 .unwrap(); 226 227 periodic_logger.start_logging_thread().unwrap(); 228 periodic_logger 229 .increment_counter("counter_1".to_string(), 5) 230 .unwrap(); 231 232 thread::sleep(Duration::from_secs(5)); 233 } 234 } 235