// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14*cf78ab8cSAndroid Build Coastguard Worker
//! A module with mpmc channels for distributing global events.
16*cf78ab8cSAndroid Build Coastguard Worker
17*cf78ab8cSAndroid Build Coastguard Worker use netsim_proto::common::ChipKind;
18*cf78ab8cSAndroid Build Coastguard Worker use std::sync::mpsc::{channel, Receiver, Sender};
19*cf78ab8cSAndroid Build Coastguard Worker
20*cf78ab8cSAndroid Build Coastguard Worker use crate::devices::chip::ChipIdentifier;
21*cf78ab8cSAndroid Build Coastguard Worker use crate::devices::device::DeviceIdentifier;
22*cf78ab8cSAndroid Build Coastguard Worker use netsim_proto::stats::{
23*cf78ab8cSAndroid Build Coastguard Worker NetsimDeviceStats as ProtoDeviceStats, NetsimRadioStats as ProtoRadioStats,
24*cf78ab8cSAndroid Build Coastguard Worker };
25*cf78ab8cSAndroid Build Coastguard Worker
26*cf78ab8cSAndroid Build Coastguard Worker use std::sync::{Arc, Mutex, OnceLock};
27*cf78ab8cSAndroid Build Coastguard Worker
28*cf78ab8cSAndroid Build Coastguard Worker // Publish the event to all subscribers
publish(event: Event)29*cf78ab8cSAndroid Build Coastguard Worker pub fn publish(event: Event) {
30*cf78ab8cSAndroid Build Coastguard Worker get_events().lock().expect("Failed to acquire lock on events").publish(event);
31*cf78ab8cSAndroid Build Coastguard Worker }
32*cf78ab8cSAndroid Build Coastguard Worker
33*cf78ab8cSAndroid Build Coastguard Worker // Subscribe to events over the receiver
subscribe() -> Receiver<Event>34*cf78ab8cSAndroid Build Coastguard Worker pub fn subscribe() -> Receiver<Event> {
35*cf78ab8cSAndroid Build Coastguard Worker get_events().lock().expect("Failed to acquire locks on events").subscribe()
36*cf78ab8cSAndroid Build Coastguard Worker }
37*cf78ab8cSAndroid Build Coastguard Worker
38*cf78ab8cSAndroid Build Coastguard Worker #[derive(Clone, Debug, Default)]
39*cf78ab8cSAndroid Build Coastguard Worker pub struct DeviceAdded {
40*cf78ab8cSAndroid Build Coastguard Worker pub id: DeviceIdentifier,
41*cf78ab8cSAndroid Build Coastguard Worker pub name: String,
42*cf78ab8cSAndroid Build Coastguard Worker pub builtin: bool,
43*cf78ab8cSAndroid Build Coastguard Worker pub device_stats: ProtoDeviceStats,
44*cf78ab8cSAndroid Build Coastguard Worker }
45*cf78ab8cSAndroid Build Coastguard Worker
46*cf78ab8cSAndroid Build Coastguard Worker #[derive(Clone, Debug, Default)]
47*cf78ab8cSAndroid Build Coastguard Worker pub struct DeviceRemoved {
48*cf78ab8cSAndroid Build Coastguard Worker pub id: DeviceIdentifier,
49*cf78ab8cSAndroid Build Coastguard Worker pub name: String,
50*cf78ab8cSAndroid Build Coastguard Worker pub builtin: bool,
51*cf78ab8cSAndroid Build Coastguard Worker }
52*cf78ab8cSAndroid Build Coastguard Worker
53*cf78ab8cSAndroid Build Coastguard Worker #[derive(Clone, Debug, Default)]
54*cf78ab8cSAndroid Build Coastguard Worker pub struct DevicePatched {
55*cf78ab8cSAndroid Build Coastguard Worker pub id: DeviceIdentifier,
56*cf78ab8cSAndroid Build Coastguard Worker pub name: String,
57*cf78ab8cSAndroid Build Coastguard Worker }
58*cf78ab8cSAndroid Build Coastguard Worker
59*cf78ab8cSAndroid Build Coastguard Worker #[derive(Clone, Debug, Default)]
60*cf78ab8cSAndroid Build Coastguard Worker pub struct ChipAdded {
61*cf78ab8cSAndroid Build Coastguard Worker pub chip_id: ChipIdentifier,
62*cf78ab8cSAndroid Build Coastguard Worker pub chip_kind: ChipKind,
63*cf78ab8cSAndroid Build Coastguard Worker pub device_name: String,
64*cf78ab8cSAndroid Build Coastguard Worker pub builtin: bool,
65*cf78ab8cSAndroid Build Coastguard Worker }
66*cf78ab8cSAndroid Build Coastguard Worker
67*cf78ab8cSAndroid Build Coastguard Worker #[derive(Clone, Debug, Default)]
68*cf78ab8cSAndroid Build Coastguard Worker pub struct ChipRemoved {
69*cf78ab8cSAndroid Build Coastguard Worker pub chip_id: ChipIdentifier,
70*cf78ab8cSAndroid Build Coastguard Worker pub device_id: DeviceIdentifier,
71*cf78ab8cSAndroid Build Coastguard Worker pub remaining_nonbuiltin_devices: usize,
72*cf78ab8cSAndroid Build Coastguard Worker pub radio_stats: Vec<ProtoRadioStats>,
73*cf78ab8cSAndroid Build Coastguard Worker }
74*cf78ab8cSAndroid Build Coastguard Worker
/// Payload carried by [`Event::ShutDown`].
#[derive(Clone, Debug, Default)]
pub struct ShutDown {
    /// Text describing why the shutdown event was published.
    pub reason: String,
}
79*cf78ab8cSAndroid Build Coastguard Worker
80*cf78ab8cSAndroid Build Coastguard Worker /// Event messages shared across various components in a loosely
81*cf78ab8cSAndroid Build Coastguard Worker /// coupled manner.
82*cf78ab8cSAndroid Build Coastguard Worker #[derive(Clone, Debug)]
83*cf78ab8cSAndroid Build Coastguard Worker pub enum Event {
84*cf78ab8cSAndroid Build Coastguard Worker DeviceAdded(DeviceAdded),
85*cf78ab8cSAndroid Build Coastguard Worker DeviceRemoved(DeviceRemoved),
86*cf78ab8cSAndroid Build Coastguard Worker DevicePatched(DevicePatched),
87*cf78ab8cSAndroid Build Coastguard Worker DeviceReset,
88*cf78ab8cSAndroid Build Coastguard Worker ChipAdded(ChipAdded),
89*cf78ab8cSAndroid Build Coastguard Worker ChipRemoved(ChipRemoved),
90*cf78ab8cSAndroid Build Coastguard Worker ShutDown(ShutDown),
91*cf78ab8cSAndroid Build Coastguard Worker }
92*cf78ab8cSAndroid Build Coastguard Worker
93*cf78ab8cSAndroid Build Coastguard Worker static EVENTS: OnceLock<Arc<Mutex<Events>>> = OnceLock::new();
94*cf78ab8cSAndroid Build Coastguard Worker
get_events() -> Arc<Mutex<Events>>95*cf78ab8cSAndroid Build Coastguard Worker pub fn get_events() -> Arc<Mutex<Events>> {
96*cf78ab8cSAndroid Build Coastguard Worker EVENTS.get_or_init(Events::new).clone()
97*cf78ab8cSAndroid Build Coastguard Worker }
98*cf78ab8cSAndroid Build Coastguard Worker
99*cf78ab8cSAndroid Build Coastguard Worker /// A multi-producer, multi-consumer broadcast queue based on
100*cf78ab8cSAndroid Build Coastguard Worker /// `std::sync::mpsc`.
101*cf78ab8cSAndroid Build Coastguard Worker ///
102*cf78ab8cSAndroid Build Coastguard Worker /// Each Event message `published` is seen by all subscribers.
103*cf78ab8cSAndroid Build Coastguard Worker ///
104*cf78ab8cSAndroid Build Coastguard Worker /// Warning: invoke `subscribe()` before `publish()` or else messages
105*cf78ab8cSAndroid Build Coastguard Worker /// will be lost.
106*cf78ab8cSAndroid Build Coastguard Worker ///
107*cf78ab8cSAndroid Build Coastguard Worker pub struct Events {
108*cf78ab8cSAndroid Build Coastguard Worker // For each subscriber this module retrain the sender half and the
109*cf78ab8cSAndroid Build Coastguard Worker // subscriber reads events from the receiver half.
110*cf78ab8cSAndroid Build Coastguard Worker subscribers: Vec<Sender<Event>>,
111*cf78ab8cSAndroid Build Coastguard Worker }
112*cf78ab8cSAndroid Build Coastguard Worker
113*cf78ab8cSAndroid Build Coastguard Worker impl Events {
114*cf78ab8cSAndroid Build Coastguard Worker // Events is always owned by multiple publishers and subscribers
115*cf78ab8cSAndroid Build Coastguard Worker // across threads so return an Arc type.
new() -> Arc<Mutex<Events>>116*cf78ab8cSAndroid Build Coastguard Worker fn new() -> Arc<Mutex<Events>> {
117*cf78ab8cSAndroid Build Coastguard Worker Arc::new(Mutex::new(Self { subscribers: Vec::new() }))
118*cf78ab8cSAndroid Build Coastguard Worker }
119*cf78ab8cSAndroid Build Coastguard Worker
120*cf78ab8cSAndroid Build Coastguard Worker // Creates a new asynchronous channel, returning the receiver
121*cf78ab8cSAndroid Build Coastguard Worker // half. All `Event` messages sent through `publish` will become
122*cf78ab8cSAndroid Build Coastguard Worker // available on the receiver in the same order as it was sent.
subscribe(&mut self) -> Receiver<Event>123*cf78ab8cSAndroid Build Coastguard Worker fn subscribe(&mut self) -> Receiver<Event> {
124*cf78ab8cSAndroid Build Coastguard Worker let (tx, rx) = channel::<Event>();
125*cf78ab8cSAndroid Build Coastguard Worker self.subscribers.push(tx);
126*cf78ab8cSAndroid Build Coastguard Worker rx
127*cf78ab8cSAndroid Build Coastguard Worker }
128*cf78ab8cSAndroid Build Coastguard Worker
129*cf78ab8cSAndroid Build Coastguard Worker // Attempts to send an Event on the events channel.
publish(&mut self, msg: Event)130*cf78ab8cSAndroid Build Coastguard Worker pub fn publish(&mut self, msg: Event) {
131*cf78ab8cSAndroid Build Coastguard Worker if self.subscribers.is_empty() {
132*cf78ab8cSAndroid Build Coastguard Worker log::warn!("No Subscribers to the event: {msg:?}");
133*cf78ab8cSAndroid Build Coastguard Worker } else {
134*cf78ab8cSAndroid Build Coastguard Worker // Any channel with a disconnected receiver will return an
135*cf78ab8cSAndroid Build Coastguard Worker // error and be removed by retain.
136*cf78ab8cSAndroid Build Coastguard Worker log::info!("{msg:?}");
137*cf78ab8cSAndroid Build Coastguard Worker self.subscribers.retain(|subscriber| subscriber.send(msg.clone()).is_ok())
138*cf78ab8cSAndroid Build Coastguard Worker }
139*cf78ab8cSAndroid Build Coastguard Worker }
140*cf78ab8cSAndroid Build Coastguard Worker }
141*cf78ab8cSAndroid Build Coastguard Worker
/// Test-only public helpers that allow testing with a local [`Events`]
/// instance instead of the global one.
#[cfg(test)]
pub mod test {
    use super::*;

    /// Builds a fresh `Events` queue with no subscribers.
    pub fn new() -> Arc<Mutex<Events>> {
        Events::new()
    }

    /// Publishes `msg` on the queue `s`.
    ///
    /// Panics if the mutex is poisoned.
    pub fn publish(s: &mut Arc<Mutex<Events>>, msg: Event) {
        let mut queue = s.lock().unwrap();
        queue.publish(msg);
    }

    /// Adds a subscriber to the queue `s` and returns its receiver half.
    ///
    /// Panics if the mutex is poisoned.
    pub fn subscribe(s: &mut Arc<Mutex<Events>>) -> Receiver<Event> {
        let mut queue = s.lock().unwrap();
        queue.subscribe()
    }
}
159*cf78ab8cSAndroid Build Coastguard Worker
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Builds the `DeviceAdded` event that every test in this module publishes.
    fn device1_added() -> Event {
        Event::DeviceAdded(DeviceAdded {
            id: DeviceIdentifier(123),
            name: "Device1".into(),
            builtin: false,
            device_stats: ProtoDeviceStats::new(),
        })
    }

    /// Spawns a thread that blocks on `rx` and asserts that the first message
    /// received equals the event built by `device1_added()`.
    fn spawn_expecting_device1(rx: Receiver<Event>) -> thread::JoinHandle<()> {
        thread::spawn(move || match rx.recv() {
            Ok(Event::DeviceAdded(DeviceAdded { id, name, builtin, device_stats })) => {
                assert_eq!(id.0, 123);
                assert_eq!(name, "Device1");
                assert!(!builtin);
                assert_eq!(device_stats, ProtoDeviceStats::new());
            }
            _ => panic!("Unexpected event"),
        })
    }

    #[test]
    fn test_subscribe_and_publish() {
        let events = Events::new();

        let rx = events.lock().unwrap().subscribe();
        let handle = spawn_expecting_device1(rx);

        events.lock().unwrap().publish(device1_added());

        // Wait for the other thread to process the message.
        handle.join().unwrap();
    }

    #[test]
    fn test_publish_to_multiple_subscribers() {
        let events = Events::new();

        let num_subscribers = 10;
        let handles: Vec<_> = (0..num_subscribers)
            .map(|_| {
                // Subscribe on this thread so every receiver is registered
                // before the event is published.
                let rx = events.lock().unwrap().subscribe();
                spawn_expecting_device1(rx)
            })
            .collect();

        events.lock().unwrap().publish(device1_added());

        // Wait for the other threads to process the message.
        for handle in handles {
            handle.join().unwrap();
        }
    }

    #[test]
    // Test the case where the subscriber half of the channel returned
    // by subscribe() is dropped. We expect the subscriber to be auto
    // removed when send() notices an error.
    fn test_publish_to_dropped_subscriber() {
        let events = Events::new();
        let rx = events.lock().unwrap().subscribe();
        assert_eq!(events.lock().unwrap().subscribers.len(), 1);
        drop(rx);
        events.lock().unwrap().publish(device1_added());
        assert_eq!(events.lock().unwrap().subscribers.len(), 0);
    }
}
246