xref: /aosp_15_r20/tools/netsim/rust/daemon/src/session.rs (revision cf78ab8cffb8fc9207af348f23af247fb04370a6)
1 // Copyright 2023 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! A module to collect and write session stats
16 
17 use crate::devices::devices_handler::get_radio_stats;
18 use crate::events::{ChipRemoved, DeviceAdded, Event, ShutDown};
19 use crate::version::get_version;
20 use anyhow::Context;
21 use log::error;
22 use log::info;
23 use netsim_common::system::netsimd_temp_dir;
24 use netsim_proto::stats::NetsimStats as ProtoNetsimStats;
25 use protobuf_json_mapping::print_to_string;
26 use std::fs::File;
27 use std::io::Write;
28 use std::sync::mpsc::Receiver;
29 use std::sync::Arc;
30 use std::sync::RwLock;
31 use std::sync::RwLockWriteGuard;
32 use std::thread::{Builder, JoinHandle};
33 use std::time::{Duration, Instant};
34 
/// Interval between periodic writes of the session stats by the session
/// monitor thread; the next write deadline is reset to "now + WRITE_INTERVAL"
/// after every write.
const WRITE_INTERVAL: Duration = Duration::from_secs(10);
36 
37 pub struct Session {
38     // Handle for the session monitor thread
39     handle: Option<JoinHandle<String>>,
40     // Session info is accessed by multiple threads
41     info: Arc<RwLock<SessionInfo>>,
42 }
43 
44 // Fields accessed by threads
45 struct SessionInfo {
46     stats_proto: ProtoNetsimStats,
47     current_device_count: i32,
48     session_start: Instant,
49     write_json: bool,
50 }
51 
52 impl Session {
new() -> Self53     pub fn new() -> Self {
54         Self::new_internal(true)
55     }
new_internal(write_json: bool) -> Self56     fn new_internal(write_json: bool) -> Self {
57         Session {
58             handle: None,
59             info: Arc::new(RwLock::new(SessionInfo {
60                 stats_proto: ProtoNetsimStats {
61                     version: Some(get_version()),
62                     ..Default::default()
63                 },
64                 current_device_count: 0,
65                 session_start: Instant::now(),
66                 write_json,
67             })),
68         }
69     }
70 
71     // Start the netsim states session.
72     //
73     // Starts the session monitor thread to handle events and
74     // write session stats to json file on event and periodically.
start(&mut self, events_rx: Receiver<Event>) -> &mut Self75     pub fn start(&mut self, events_rx: Receiver<Event>) -> &mut Self {
76         let info = Arc::clone(&self.info);
77 
78         // Start up session monitor thread
79         self.handle = Some(
80             Builder::new()
81                 .name("session_monitor".to_string())
82                 .spawn(move || {
83                     let mut next_instant = Instant::now() + WRITE_INTERVAL;
84                     loop {
85                         let mut write_stats = true;
86                         let this_instant = Instant::now();
87                         let timeout = if next_instant > this_instant {
88                             next_instant - this_instant
89                         } else {
90                             Duration::ZERO
91                         };
92                         let next_event = events_rx.recv_timeout(timeout);
93 
94                         // Hold the write lock for the duration of this loop iteration
95                         let mut lock = info.write().expect("Could not acquire session lock");
96                         match next_event {
97                             Ok(Event::ShutDown(ShutDown { reason })) => {
98                                 // Shutting down, save the session duration and exit
99                                 update_session_duration(&mut lock);
100                                 return reason;
101                             }
102 
103                             Ok(Event::DeviceRemoved(_)) => {
104                                 lock.current_device_count -= 1;
105                             }
106 
107                             Ok(Event::DeviceAdded(DeviceAdded { device_stats, .. })) => {
108                                 // update the current_device_count and peak device usage
109                                 lock.current_device_count += 1;
110                                 let current_device_count = lock.current_device_count;
111                                 // incremement total number of devices started
112                                 let device_count = lock.stats_proto.device_count();
113                                 lock.stats_proto.set_device_count(device_count + 1);
114                                 // track the peak device usage
115                                 if current_device_count > lock.stats_proto.peak_concurrent_devices()
116                                 {
117                                     lock.stats_proto
118                                         .set_peak_concurrent_devices(current_device_count);
119                                 }
120                                 // Add added device's stats
121                                 lock.stats_proto.device_stats.push(device_stats);
122                             }
123 
124                             Ok(Event::ChipRemoved(ChipRemoved {
125                                 radio_stats, device_id, ..
126                             })) => {
127                                 // Update the radio stats proto when a
128                                 // chip is removed.  In the case of
129                                 // bluetooth there will be 2 radios,
130                                 // otherwise 1
131                                 for mut r in radio_stats {
132                                     r.set_device_id(device_id.0);
133                                     lock.stats_proto.radio_stats.push(r);
134                                 }
135                             }
136 
137                             Ok(Event::ChipAdded(_)) => {
138                                 // No session stat update required when Chip is added but do proceed to write stats
139                             }
140                             _ => {
141                                 // other events are ignored, check to perform periodic write
142                                 if next_instant > Instant::now() {
143                                     write_stats = false;
144                                 }
145                             }
146                         }
147                         // End of event match - write current stats to json
148                         if write_stats {
149                             update_session_duration(&mut lock);
150                             if lock.write_json {
151                                 let current_stats = get_current_stats(lock.stats_proto.clone());
152                                 if let Err(err) = write_stats_to_json(current_stats) {
153                                     error!("Failed to write stats to json: {err:?}");
154                                 }
155                             }
156                             next_instant = Instant::now() + WRITE_INTERVAL;
157                         }
158                     }
159                 })
160                 .expect("failed to start session monitor thread"),
161         );
162         self
163     }
164 
165     // Stop the netsim stats session.
166     //
167     // Waits for the session monitor thread to finish and writes
168     // the session proto to a json file. Consumes the session.
stop(mut self) -> anyhow::Result<()>169     pub fn stop(mut self) -> anyhow::Result<()> {
170         if !self.handle.as_ref().expect("no session monitor").is_finished() {
171             info!("session monitor active, waiting...");
172         }
173 
174         // Synchronize on session monitor thread
175         self.handle.take().map(JoinHandle::join);
176 
177         let lock = self.info.read().expect("Could not acquire session lock");
178         if lock.write_json {
179             let current_stats = get_current_stats(lock.stats_proto.clone());
180             write_stats_to_json(current_stats)?;
181         }
182         Ok(())
183     }
184 }
185 
186 /// Update session duration
update_session_duration(session_lock: &mut RwLockWriteGuard<'_, SessionInfo>)187 fn update_session_duration(session_lock: &mut RwLockWriteGuard<'_, SessionInfo>) {
188     let duration_secs = session_lock.session_start.elapsed().as_secs();
189     session_lock.stats_proto.set_duration_secs(duration_secs);
190 }
191 
192 /// Construct current radio stats
get_current_stats(mut current_stats: ProtoNetsimStats) -> ProtoNetsimStats193 fn get_current_stats(mut current_stats: ProtoNetsimStats) -> ProtoNetsimStats {
194     current_stats.radio_stats.extend(get_radio_stats());
195     current_stats
196 }
197 
198 /// Write netsim stats to json file
write_stats_to_json(stats_proto: ProtoNetsimStats) -> anyhow::Result<()>199 fn write_stats_to_json(stats_proto: ProtoNetsimStats) -> anyhow::Result<()> {
200     let filename = netsimd_temp_dir().join("netsim_session_stats.json");
201     let mut file = File::create(filename)?;
202     let json = print_to_string(&stats_proto)?;
203     file.write(json.as_bytes()).context("Unable to write json session stats")?;
204     file.flush()?;
205     Ok(())
206 }
207 
#[cfg(test)]
mod tests {
    //! Tests for the session monitor: each test starts a session that does
    //! not write json, publishes events and inspects the collected stats.

    use super::*;
    use crate::devices::chip::ChipIdentifier;
    use crate::devices::device::DeviceIdentifier;
    use crate::events;
    use crate::events::{ChipAdded, ChipRemoved, DeviceRemoved, Event, Events, ShutDown};
    use netsim_proto::stats::{
        NetsimDeviceStats as ProtoDeviceStats, NetsimRadioStats as ProtoRadioStats,
    };
    use std::sync::Mutex;

    const TEST_DEVICE_KIND: &str = "TEST_DEVICE";

    /// Creates a started session (json writing disabled) together with the
    /// events instance its monitor thread is subscribed to.
    fn setup_session_start_test() -> (Session, Arc<Mutex<Events>>) {
        let mut session = Session::new_internal(false);
        let mut events = events::test::new();
        let events_rx = events::test::subscribe(&mut events);
        session.start(events_rx);
        (session, events)
    }

    /// Returns a snapshot of the session's stats proto.
    fn get_stats_proto(session: &Session) -> ProtoNetsimStats {
        session.info.read().expect("Could not acquire session lock").stats_proto.clone()
    }

    /// Returns the session's current device count.
    fn get_current_device_count(session: &Session) -> i32 {
        session.info.read().expect("Could not acquire session lock").current_device_count
    }

    /// Sleeps long enough for the session duration (whole seconds) to become non-zero.
    fn wait_for_nonzero_duration() {
        std::thread::sleep(Duration::from_secs(1));
    }

    /// Publishes a `DeviceAdded` event for a test device with the given id.
    fn publish_device_added(events: &mut Arc<Mutex<Events>>, id: DeviceIdentifier) {
        events::test::publish(
            events,
            Event::DeviceAdded(DeviceAdded {
                builtin: false,
                id,
                device_stats: ProtoDeviceStats {
                    kind: Some(TEST_DEVICE_KIND.to_string()),
                    ..Default::default()
                },
                ..Default::default()
            }),
        );
    }

    /// Publishes a `DeviceRemoved` event for the device with the given id.
    fn publish_device_removed(events: &mut Arc<Mutex<Events>>, id: DeviceIdentifier) {
        events::test::publish(
            events,
            Event::DeviceRemoved(DeviceRemoved { builtin: false, id, ..Default::default() }),
        );
    }

    /// Publishes the `ShutDown` event that makes the session monitor thread exit.
    fn publish_shutdown(events: &mut Arc<Mutex<Events>>) {
        events::test::publish(
            events,
            Event::ShutDown(ShutDown { reason: "Stop the session".to_string() }),
        );
    }

    /// Joins the session monitor thread, failing the test if it panicked.
    fn join_monitor(session: &mut Session) {
        session
            .handle
            .take()
            .expect("session monitor was not started")
            .join()
            .expect("session monitor thread panicked");
    }

    #[test]
    fn test_new() {
        let session: Session = Session::new();
        assert!(session.handle.is_none());
        let lock = session.info.read().expect("Could not acquire session lock");
        assert_eq!(lock.current_device_count, 0);
        assert!(matches!(
            lock.stats_proto,
            ProtoNetsimStats { version: Some(_), duration_secs: None, .. }
        ));
        assert_eq!(lock.stats_proto.version, Some(get_version()));
        assert_eq!(lock.stats_proto.radio_stats.len(), 0);
    }

    #[test]
    fn test_start_and_shutdown() {
        let (mut session, mut events) = setup_session_start_test();

        // we want to be able to check the session time gets incremented
        wait_for_nonzero_duration();

        // publish the shutdown afterwards to cause the separate thread to stop
        publish_shutdown(&mut events);
        join_monitor(&mut session);

        let stats_proto = get_stats_proto(&session);

        // check device counts are missing if no device add/remove events occurred
        assert!(stats_proto.device_count.is_none());
        assert!(stats_proto.peak_concurrent_devices.is_none());

        // check the session time is > 0
        assert!(stats_proto.duration_secs.unwrap() > 0u64);
    }

    #[test]
    fn test_start_and_stop() {
        let (session, mut events) = setup_session_start_test();

        // we want to be able to check the session time gets incremented
        wait_for_nonzero_duration();

        // publish the shutdown which is required when using `session.stop()`
        publish_shutdown(&mut events);

        // should not panic or deadlock
        session.stop().unwrap();
    }

    // Tests for session.rs involving devices
    #[test]
    fn test_start_and_device_add() {
        let (mut session, mut events) = setup_session_start_test();

        // we want to be able to check the session time gets incremented
        wait_for_nonzero_duration();

        // Create a device, publishing DeviceAdded event
        publish_device_added(&mut events, DeviceIdentifier(1));

        // publish the shutdown afterwards to cause the separate thread to stop
        publish_shutdown(&mut events);
        join_monitor(&mut session);

        // check device counts were incremented
        assert_eq!(get_current_device_count(&session), 1i32);
        let stats_proto = get_stats_proto(&session);

        assert_eq!(stats_proto.device_count.unwrap(), 1i32);
        assert_eq!(stats_proto.peak_concurrent_devices.unwrap(), 1i32);
        // check the session time is > 0
        assert!(stats_proto.duration_secs.unwrap() > 0u64);
        // check the device stats is populated
        assert_eq!(stats_proto.device_stats.len(), 1);
    }

    #[test]
    fn test_start_and_device_add_and_remove() {
        let (mut session, mut events) = setup_session_start_test();

        // we want to be able to check the session time gets incremented
        wait_for_nonzero_duration();

        // Add and remove a first device, then a second one
        publish_device_added(&mut events, DeviceIdentifier(1));
        publish_device_removed(&mut events, DeviceIdentifier(1));
        publish_device_added(&mut events, DeviceIdentifier(2));
        publish_device_removed(&mut events, DeviceIdentifier(2));

        // publish the shutdown afterwards to cause the separate thread to stop
        publish_shutdown(&mut events);
        join_monitor(&mut session);

        // check the different device counts were incremented as expected
        assert_eq!(get_current_device_count(&session), 0i32);
        let stats_proto = get_stats_proto(&session);
        assert_eq!(stats_proto.device_count.unwrap(), 2i32);
        assert_eq!(stats_proto.peak_concurrent_devices.unwrap(), 1i32);

        // check the session time is > 0
        assert!(stats_proto.duration_secs.unwrap() > 0u64);
        // check the device stats are populated
        assert_eq!(stats_proto.device_stats.len(), 2);
    }

    #[test]
    fn test_start_and_chip_add_and_remove() {
        let (mut session, mut events) = setup_session_start_test();

        // we want to be able to check the session time gets incremented
        wait_for_nonzero_duration();

        events::test::publish(
            &mut events,
            Event::ChipAdded(ChipAdded {
                builtin: false,
                chip_id: ChipIdentifier(0),
                ..Default::default()
            }),
        );

        // give the monitor thread time to process the ChipAdded event
        wait_for_nonzero_duration();

        // no radio stats until after we remove the chip
        assert_eq!(get_stats_proto(&session).radio_stats.len(), 0usize);

        events::test::publish(
            &mut events,
            Event::ChipRemoved(ChipRemoved {
                chip_id: ChipIdentifier(0),
                radio_stats: vec![ProtoRadioStats { ..Default::default() }],
                ..Default::default()
            }),
        );

        // publish the shutdown afterwards to cause the separate thread to stop
        publish_shutdown(&mut events);
        join_monitor(&mut session);

        // check devices were not incremented (here we only added and removed the chip)
        assert_eq!(get_current_device_count(&session), 0i32);

        let stats_proto = get_stats_proto(&session);
        assert_eq!(stats_proto.radio_stats.len(), 1usize);

        // these will still be none since no device level events were processed
        assert!(stats_proto.device_count.is_none());
        assert!(stats_proto.peak_concurrent_devices.is_none());

        // check the session time is > 0
        assert!(stats_proto.duration_secs.unwrap() > 0u64);
    }
}
472