1 // Copyright 2023 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //! A module to collect and write session stats
16
17 use crate::devices::devices_handler::get_radio_stats;
18 use crate::events::{ChipRemoved, DeviceAdded, Event, ShutDown};
19 use crate::version::get_version;
20 use anyhow::Context;
21 use log::error;
22 use log::info;
23 use netsim_common::system::netsimd_temp_dir;
24 use netsim_proto::stats::NetsimStats as ProtoNetsimStats;
25 use protobuf_json_mapping::print_to_string;
26 use std::fs::File;
27 use std::io::Write;
28 use std::sync::mpsc::Receiver;
29 use std::sync::Arc;
30 use std::sync::RwLock;
31 use std::sync::RwLockWriteGuard;
32 use std::thread::{Builder, JoinHandle};
33 use std::time::{Duration, Instant};
34
/// Longest time the session monitor thread waits for an event before it
/// performs a periodic write of the session stats.
const WRITE_INTERVAL: Duration = Duration::from_secs(10);
36
/// A netsim stats session: owns the session monitor thread handle and the
/// session state shared with that thread.
pub struct Session {
    /// Handle for the session monitor thread. `None` until the monitor thread
    /// is started; the thread's return value is the shutdown reason string.
    handle: Option<JoinHandle<String>>,
    /// Session info is accessed by multiple threads
    info: Arc<RwLock<SessionInfo>>,
}
43
/// Fields accessed by threads
struct SessionInfo {
    /// Stats proto accumulated over the session (version, device counts,
    /// device stats, radio stats, duration).
    stats_proto: ProtoNetsimStats,
    /// Running count: incremented on each `DeviceAdded` event and decremented
    /// on each `DeviceRemoved` event.
    current_device_count: i32,
    /// Instant at which this session info was created; the reported session
    /// duration is measured from it.
    session_start: Instant,
    /// When `false`, the stats are never written to the json file.
    write_json: bool,
}
51
52 impl Session {
new() -> Self53 pub fn new() -> Self {
54 Self::new_internal(true)
55 }
new_internal(write_json: bool) -> Self56 fn new_internal(write_json: bool) -> Self {
57 Session {
58 handle: None,
59 info: Arc::new(RwLock::new(SessionInfo {
60 stats_proto: ProtoNetsimStats {
61 version: Some(get_version()),
62 ..Default::default()
63 },
64 current_device_count: 0,
65 session_start: Instant::now(),
66 write_json,
67 })),
68 }
69 }
70
71 // Start the netsim states session.
72 //
73 // Starts the session monitor thread to handle events and
74 // write session stats to json file on event and periodically.
start(&mut self, events_rx: Receiver<Event>) -> &mut Self75 pub fn start(&mut self, events_rx: Receiver<Event>) -> &mut Self {
76 let info = Arc::clone(&self.info);
77
78 // Start up session monitor thread
79 self.handle = Some(
80 Builder::new()
81 .name("session_monitor".to_string())
82 .spawn(move || {
83 let mut next_instant = Instant::now() + WRITE_INTERVAL;
84 loop {
85 let mut write_stats = true;
86 let this_instant = Instant::now();
87 let timeout = if next_instant > this_instant {
88 next_instant - this_instant
89 } else {
90 Duration::ZERO
91 };
92 let next_event = events_rx.recv_timeout(timeout);
93
94 // Hold the write lock for the duration of this loop iteration
95 let mut lock = info.write().expect("Could not acquire session lock");
96 match next_event {
97 Ok(Event::ShutDown(ShutDown { reason })) => {
98 // Shutting down, save the session duration and exit
99 update_session_duration(&mut lock);
100 return reason;
101 }
102
103 Ok(Event::DeviceRemoved(_)) => {
104 lock.current_device_count -= 1;
105 }
106
107 Ok(Event::DeviceAdded(DeviceAdded { device_stats, .. })) => {
108 // update the current_device_count and peak device usage
109 lock.current_device_count += 1;
110 let current_device_count = lock.current_device_count;
111 // incremement total number of devices started
112 let device_count = lock.stats_proto.device_count();
113 lock.stats_proto.set_device_count(device_count + 1);
114 // track the peak device usage
115 if current_device_count > lock.stats_proto.peak_concurrent_devices()
116 {
117 lock.stats_proto
118 .set_peak_concurrent_devices(current_device_count);
119 }
120 // Add added device's stats
121 lock.stats_proto.device_stats.push(device_stats);
122 }
123
124 Ok(Event::ChipRemoved(ChipRemoved {
125 radio_stats, device_id, ..
126 })) => {
127 // Update the radio stats proto when a
128 // chip is removed. In the case of
129 // bluetooth there will be 2 radios,
130 // otherwise 1
131 for mut r in radio_stats {
132 r.set_device_id(device_id.0);
133 lock.stats_proto.radio_stats.push(r);
134 }
135 }
136
137 Ok(Event::ChipAdded(_)) => {
138 // No session stat update required when Chip is added but do proceed to write stats
139 }
140 _ => {
141 // other events are ignored, check to perform periodic write
142 if next_instant > Instant::now() {
143 write_stats = false;
144 }
145 }
146 }
147 // End of event match - write current stats to json
148 if write_stats {
149 update_session_duration(&mut lock);
150 if lock.write_json {
151 let current_stats = get_current_stats(lock.stats_proto.clone());
152 if let Err(err) = write_stats_to_json(current_stats) {
153 error!("Failed to write stats to json: {err:?}");
154 }
155 }
156 next_instant = Instant::now() + WRITE_INTERVAL;
157 }
158 }
159 })
160 .expect("failed to start session monitor thread"),
161 );
162 self
163 }
164
165 // Stop the netsim stats session.
166 //
167 // Waits for the session monitor thread to finish and writes
168 // the session proto to a json file. Consumes the session.
stop(mut self) -> anyhow::Result<()>169 pub fn stop(mut self) -> anyhow::Result<()> {
170 if !self.handle.as_ref().expect("no session monitor").is_finished() {
171 info!("session monitor active, waiting...");
172 }
173
174 // Synchronize on session monitor thread
175 self.handle.take().map(JoinHandle::join);
176
177 let lock = self.info.read().expect("Could not acquire session lock");
178 if lock.write_json {
179 let current_stats = get_current_stats(lock.stats_proto.clone());
180 write_stats_to_json(current_stats)?;
181 }
182 Ok(())
183 }
184 }
185
186 /// Update session duration
update_session_duration(session_lock: &mut RwLockWriteGuard<'_, SessionInfo>)187 fn update_session_duration(session_lock: &mut RwLockWriteGuard<'_, SessionInfo>) {
188 let duration_secs = session_lock.session_start.elapsed().as_secs();
189 session_lock.stats_proto.set_duration_secs(duration_secs);
190 }
191
192 /// Construct current radio stats
get_current_stats(mut current_stats: ProtoNetsimStats) -> ProtoNetsimStats193 fn get_current_stats(mut current_stats: ProtoNetsimStats) -> ProtoNetsimStats {
194 current_stats.radio_stats.extend(get_radio_stats());
195 current_stats
196 }
197
198 /// Write netsim stats to json file
write_stats_to_json(stats_proto: ProtoNetsimStats) -> anyhow::Result<()>199 fn write_stats_to_json(stats_proto: ProtoNetsimStats) -> anyhow::Result<()> {
200 let filename = netsimd_temp_dir().join("netsim_session_stats.json");
201 let mut file = File::create(filename)?;
202 let json = print_to_string(&stats_proto)?;
203 file.write(json.as_bytes()).context("Unable to write json session stats")?;
204 file.flush()?;
205 Ok(())
206 }
207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::devices::chip::ChipIdentifier;
    use crate::devices::device::DeviceIdentifier;
    use crate::events;
    use crate::events::{ChipAdded, ChipRemoved, DeviceRemoved, Event, Events, ShutDown};
    use netsim_proto::stats::{
        NetsimDeviceStats as ProtoDeviceStats, NetsimRadioStats as ProtoRadioStats,
    };
    use std::sync::Mutex;

    const TEST_DEVICE_KIND: &str = "TEST_DEVICE";

    /// Creates a session that does not write json, subscribes it to a fresh
    /// test event bus and starts its monitor thread.
    fn setup_session_start_test() -> (Session, Arc<Mutex<Events>>) {
        let mut session = Session::new_internal(false);
        let mut events = events::test::new();
        let events_rx = events::test::subscribe(&mut events);
        session.start(events_rx);
        (session, events)
    }

    /// Returns a copy of the session's current stats proto.
    fn get_stats_proto(session: &Session) -> ProtoNetsimStats {
        let info = session.info.read().expect("Could not acquire session lock");
        info.stats_proto.clone()
    }

    /// Returns the session's current device count.
    fn current_device_count(session: &Session) -> i32 {
        let info = session.info.read().expect("Could not acquire session lock");
        info.current_device_count
    }

    /// Sleeps for one second so that the recorded session duration can become non-zero.
    fn wait_one_second() {
        std::thread::sleep(std::time::Duration::from_secs(1));
    }

    /// Publishes the ShutDown event that makes the monitor thread exit.
    fn publish_shutdown(events: &mut Arc<Mutex<Events>>) {
        let shutdown = ShutDown { reason: "Stop the session".to_string() };
        events::test::publish(events, Event::ShutDown(shutdown));
    }

    /// Publishes a DeviceAdded event for a non-builtin test device.
    fn publish_device_added(events: &mut Arc<Mutex<Events>>, id: DeviceIdentifier) {
        let device_stats =
            ProtoDeviceStats { kind: Some(TEST_DEVICE_KIND.to_string()), ..Default::default() };
        let added = DeviceAdded { builtin: false, id, device_stats, ..Default::default() };
        events::test::publish(events, Event::DeviceAdded(added));
    }

    /// Publishes a DeviceRemoved event for a non-builtin test device.
    fn publish_device_removed(events: &mut Arc<Mutex<Events>>, id: DeviceIdentifier) {
        let removed = DeviceRemoved { builtin: false, id, ..Default::default() };
        events::test::publish(events, Event::DeviceRemoved(removed));
    }

    /// Joins the monitor thread, if one is running.
    fn join_monitor(session: &mut Session) {
        session.handle.take().map(JoinHandle::join);
    }

    #[test]
    fn test_new() {
        let session: Session = Session::new();
        assert!(session.handle.is_none());

        let info = session.info.read().expect("Could not acquire session lock");
        assert_eq!(info.current_device_count, 0);

        let stats = &info.stats_proto;
        assert!(stats.version.is_some());
        assert!(stats.duration_secs.is_none());
        assert_eq!(stats.version.clone().unwrap(), get_version());
        assert!(stats.radio_stats.is_empty());
    }

    #[test]
    fn test_start_and_shutdown() {
        let (mut session, mut events) = setup_session_start_test();

        // give the session a measurable duration
        wait_one_second();

        // the shutdown event makes the monitor thread return, then join it
        publish_shutdown(&mut events);
        join_monitor(&mut session);

        let stats = get_stats_proto(&session);

        // without device add/remove events the device counters stay unset
        assert!(stats.device_count.is_none());
        assert!(stats.peak_concurrent_devices.is_none());

        // check the session time is > 0
        assert!(stats.duration_secs.unwrap() > 0u64);
    }

    #[test]
    fn test_start_and_stop() {
        let (session, mut events) = setup_session_start_test();

        // give the session a measurable duration
        wait_one_second();

        // `session.stop()` joins the monitor thread, so a shutdown must be published first
        publish_shutdown(&mut events);

        // should not panic or deadlock
        session.stop().unwrap();
    }

    // Tests for session.rs involving devices
    #[test]
    fn test_start_and_device_add() {
        let (mut session, mut events) = setup_session_start_test();

        // give the session a measurable duration
        wait_one_second();

        // Create a device, publishing DeviceAdded event
        publish_device_added(&mut events, DeviceIdentifier(1));

        // stop the monitor thread and wait for it
        publish_shutdown(&mut events);
        join_monitor(&mut session);

        // check device counts were incremented
        assert_eq!(current_device_count(&session), 1i32);

        let stats = get_stats_proto(&session);
        assert_eq!(stats.device_count.unwrap(), 1i32);
        assert_eq!(stats.peak_concurrent_devices.unwrap(), 1i32);
        // check the session time is > 0
        assert!(stats.duration_secs.unwrap() > 0u64);
        // check the device stats is populated
        assert_eq!(stats.device_stats.len(), 1);
    }

    #[test]
    fn test_start_and_device_add_and_remove() {
        let (mut session, mut events) = setup_session_start_test();

        // give the session a measurable duration
        wait_one_second();

        // Add and remove a first device, then a second one
        publish_device_added(&mut events, DeviceIdentifier(1));
        publish_device_removed(&mut events, DeviceIdentifier(1));
        publish_device_added(&mut events, DeviceIdentifier(2));
        publish_device_removed(&mut events, DeviceIdentifier(2));

        // stop the monitor thread and wait for it
        publish_shutdown(&mut events);
        join_monitor(&mut session);

        // check the different device counts were incremented as expected
        assert_eq!(current_device_count(&session), 0i32);

        let stats = get_stats_proto(&session);
        assert_eq!(stats.device_count.unwrap(), 2i32);
        assert_eq!(stats.peak_concurrent_devices.unwrap(), 1i32);

        // check the session time is > 0
        assert!(stats.duration_secs.unwrap() > 0u64);
        // check the device stats are populated
        assert_eq!(stats.device_stats.len(), 2);
    }

    #[test]
    fn test_start_and_chip_add_and_remove() {
        let (mut session, mut events) = setup_session_start_test();

        // give the session a measurable duration
        wait_one_second();

        let chip_added =
            ChipAdded { builtin: false, chip_id: ChipIdentifier(0), ..Default::default() };
        events::test::publish(&mut events, Event::ChipAdded(chip_added));

        wait_one_second();

        // no radio stats until after we remove the chip
        assert_eq!(get_stats_proto(&session).radio_stats.len(), 0usize);

        let chip_removed = ChipRemoved {
            chip_id: ChipIdentifier(0),
            radio_stats: vec![ProtoRadioStats::default()],
            ..Default::default()
        };
        events::test::publish(&mut events, Event::ChipRemoved(chip_removed));

        // stop the monitor thread and wait for it
        publish_shutdown(&mut events);
        join_monitor(&mut session);

        // check devices were not incremented (here we only added and removed the chip)
        assert_eq!(current_device_count(&session), 0i32);

        let stats = get_stats_proto(&session);
        assert_eq!(stats.radio_stats.len(), 1usize);

        // these will still be none since no device level events were processed
        assert!(stats.device_count.is_none());
        assert!(stats.peak_concurrent_devices.is_none());

        // check the session time is > 0
        assert!(stats.duration_secs.unwrap() > 0u64);
    }
}
472