// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4*bb4ee6a4SAndroid Build Coastguard Worker
5*bb4ee6a4SAndroid Build Coastguard Worker use std::collections::HashMap;
6*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::Arc;
7*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::Barrier;
8*bb4ee6a4SAndroid Build Coastguard Worker use std::thread;
9*bb4ee6a4SAndroid Build Coastguard Worker use std::time::Duration;
10*bb4ee6a4SAndroid Build Coastguard Worker
11*bb4ee6a4SAndroid Build Coastguard Worker use base::descriptor::FromRawDescriptor;
12*bb4ee6a4SAndroid Build Coastguard Worker use base::descriptor::SafeDescriptor;
13*bb4ee6a4SAndroid Build Coastguard Worker use base::deserialize_with_descriptors;
14*bb4ee6a4SAndroid Build Coastguard Worker use base::Event;
15*bb4ee6a4SAndroid Build Coastguard Worker use base::EventToken;
16*bb4ee6a4SAndroid Build Coastguard Worker use base::ReadNotifier;
17*bb4ee6a4SAndroid Build Coastguard Worker use base::RecvTube;
18*bb4ee6a4SAndroid Build Coastguard Worker use base::SendTube;
19*bb4ee6a4SAndroid Build Coastguard Worker use base::SerializeDescriptors;
20*bb4ee6a4SAndroid Build Coastguard Worker use base::Tube;
21*bb4ee6a4SAndroid Build Coastguard Worker use base::WaitContext;
22*bb4ee6a4SAndroid Build Coastguard Worker use serde::Deserialize;
23*bb4ee6a4SAndroid Build Coastguard Worker use serde::Serialize;
24*bb4ee6a4SAndroid Build Coastguard Worker
25*bb4ee6a4SAndroid Build Coastguard Worker #[derive(Serialize, Deserialize)]
26*bb4ee6a4SAndroid Build Coastguard Worker struct DataStruct {
27*bb4ee6a4SAndroid Build Coastguard Worker x: u32,
28*bb4ee6a4SAndroid Build Coastguard Worker }
29*bb4ee6a4SAndroid Build Coastguard Worker
30*bb4ee6a4SAndroid Build Coastguard Worker #[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)]
31*bb4ee6a4SAndroid Build Coastguard Worker enum Token {
32*bb4ee6a4SAndroid Build Coastguard Worker ReceivedData,
33*bb4ee6a4SAndroid Build Coastguard Worker }
34*bb4ee6a4SAndroid Build Coastguard Worker
// Magic values that identify which producer sent a message (and help detect corruption).
const PRODUCER_ID_1: u32 = 801_279_273;
const PRODUCER_ID_2: u32 = 345_234_861;
38*bb4ee6a4SAndroid Build Coastguard Worker
39*bb4ee6a4SAndroid Build Coastguard Worker #[track_caller]
test_event_pair(send: Event, recv: Event)40*bb4ee6a4SAndroid Build Coastguard Worker fn test_event_pair(send: Event, recv: Event) {
41*bb4ee6a4SAndroid Build Coastguard Worker send.signal().unwrap();
42*bb4ee6a4SAndroid Build Coastguard Worker recv.wait_timeout(Duration::from_secs(1)).unwrap();
43*bb4ee6a4SAndroid Build Coastguard Worker }
44*bb4ee6a4SAndroid Build Coastguard Worker
/// Sends a plain string (no descriptors attached) across a tube pair and checks that it
/// arrives unchanged.
#[test]
fn send_recv_no_fd() {
    let (sender, receiver) = Tube::pair().unwrap();

    let test_msg = "hello world";
    sender.send(&test_msg).unwrap();

    let received: String = receiver.recv().unwrap();
    assert_eq!(test_msg, received);
}
55*bb4ee6a4SAndroid Build Coastguard Worker
/// Sends a struct containing one `Event` across a tube pair and checks that both the plain
/// field and the event survive the trip.
#[test]
fn send_recv_one_fd() {
    #[derive(Serialize, Deserialize)]
    struct EventStruct {
        x: u32,
        b: Event,
    }

    let (sender, receiver) = Tube::pair().unwrap();

    let original = EventStruct {
        x: 100,
        b: Event::new().unwrap(),
    };
    sender.send(&original).unwrap();
    let received: EventStruct = receiver.recv().unwrap();

    // The plain data must round-trip unchanged.
    assert_eq!(original.x, received.x);

    // Signaling the original event must be observable through the received one.
    let EventStruct { b: original_event, .. } = original;
    let EventStruct { b: received_event, .. } = received;
    test_event_pair(original_event, received_event);
}
77*bb4ee6a4SAndroid Build Coastguard Worker
/// Sends a bare `Event` across a tube pair and checks that signaling the received copy is
/// observed on the original.
#[test]
fn send_recv_event() {
    let (req, res) = Tube::pair().unwrap();
    let e1 = Event::new().unwrap();
    res.send(&e1).unwrap();

    let recv_event: Event = req.recv().unwrap();
    recv_event.signal().unwrap();

    // Use a bounded wait so a broken transfer fails the test instead of hanging the runner; a
    // timeout is reported through the `Ok` value, so check the wait result explicitly.
    let wait_result = e1.wait_timeout(Duration::from_secs(1)).unwrap();
    assert!(
        matches!(wait_result, base::EventWaitResult::Signaled),
        "original event was not signaled by the received copy within the timeout"
    );
}
88*bb4ee6a4SAndroid Build Coastguard Worker
89*bb4ee6a4SAndroid Build Coastguard Worker /// Send messages to a Tube with the given identifier (see `consume_messages`; we use this to
90*bb4ee6a4SAndroid Build Coastguard Worker /// track different message producers).
91*bb4ee6a4SAndroid Build Coastguard Worker #[track_caller]
produce_messages(tube: SendTube, data: u32, barrier: Arc<Barrier>) -> SendTube92*bb4ee6a4SAndroid Build Coastguard Worker fn produce_messages(tube: SendTube, data: u32, barrier: Arc<Barrier>) -> SendTube {
93*bb4ee6a4SAndroid Build Coastguard Worker let data = DataStruct { x: data };
94*bb4ee6a4SAndroid Build Coastguard Worker barrier.wait();
95*bb4ee6a4SAndroid Build Coastguard Worker for _ in 0..100 {
96*bb4ee6a4SAndroid Build Coastguard Worker tube.send(&data).unwrap();
97*bb4ee6a4SAndroid Build Coastguard Worker }
98*bb4ee6a4SAndroid Build Coastguard Worker tube
99*bb4ee6a4SAndroid Build Coastguard Worker }
100*bb4ee6a4SAndroid Build Coastguard Worker
101*bb4ee6a4SAndroid Build Coastguard Worker /// Consumes the given number of messages from a Tube, returning the number messages read with
102*bb4ee6a4SAndroid Build Coastguard Worker /// each producer ID.
103*bb4ee6a4SAndroid Build Coastguard Worker #[track_caller]
consume_messages( tube: RecvTube, count: usize, barrier: Arc<Barrier>, ) -> (RecvTube, usize, usize)104*bb4ee6a4SAndroid Build Coastguard Worker fn consume_messages(
105*bb4ee6a4SAndroid Build Coastguard Worker tube: RecvTube,
106*bb4ee6a4SAndroid Build Coastguard Worker count: usize,
107*bb4ee6a4SAndroid Build Coastguard Worker barrier: Arc<Barrier>,
108*bb4ee6a4SAndroid Build Coastguard Worker ) -> (RecvTube, usize, usize) {
109*bb4ee6a4SAndroid Build Coastguard Worker barrier.wait();
110*bb4ee6a4SAndroid Build Coastguard Worker
111*bb4ee6a4SAndroid Build Coastguard Worker let mut id1_count = 0usize;
112*bb4ee6a4SAndroid Build Coastguard Worker let mut id2_count = 0usize;
113*bb4ee6a4SAndroid Build Coastguard Worker
114*bb4ee6a4SAndroid Build Coastguard Worker for _ in 0..count {
115*bb4ee6a4SAndroid Build Coastguard Worker let msg = tube.recv::<DataStruct>().unwrap();
116*bb4ee6a4SAndroid Build Coastguard Worker match msg.x {
117*bb4ee6a4SAndroid Build Coastguard Worker PRODUCER_ID_1 => id1_count += 1,
118*bb4ee6a4SAndroid Build Coastguard Worker PRODUCER_ID_2 => id2_count += 1,
119*bb4ee6a4SAndroid Build Coastguard Worker _ => panic!(
120*bb4ee6a4SAndroid Build Coastguard Worker "want message with ID {} or {}; got message w/ ID {}.",
121*bb4ee6a4SAndroid Build Coastguard Worker PRODUCER_ID_1, PRODUCER_ID_2, msg.x
122*bb4ee6a4SAndroid Build Coastguard Worker ),
123*bb4ee6a4SAndroid Build Coastguard Worker }
124*bb4ee6a4SAndroid Build Coastguard Worker }
125*bb4ee6a4SAndroid Build Coastguard Worker (tube, id1_count, id2_count)
126*bb4ee6a4SAndroid Build Coastguard Worker }
127*bb4ee6a4SAndroid Build Coastguard Worker
/// Serializes one end of a tube pair to JSON, rebuilds a `Tube` from the bytes and the
/// collected descriptors, and checks that the rebuilt tube delivers a message to the other end.
#[test]
fn test_serialize_tube_pair() {
    let (tube_send, tube_recv) = Tube::pair().unwrap();

    // Serialize the Tube, keeping the descriptors gathered during serialization.
    let wrapper = SerializeDescriptors::new(&tube_send);
    let json_bytes = serde_json::to_vec(&wrapper).unwrap();
    let raw_descriptors = wrapper.into_descriptors();

    // Deserialize the Tube from the JSON bytes and those descriptors.
    // NOTE(review): `tube_send` is still alive at this point, so these raw descriptors
    // presumably end up owned by both it and the rebuilt tube — confirm this is intended.
    let owned_descriptors = raw_descriptors.into_iter().map(|raw| {
        // SAFETY: `raw` is expected to be valid
        unsafe { SafeDescriptor::from_raw_descriptor(raw) }
    });
    let tube_deserialized: Tube = deserialize_with_descriptors(
        || serde_json::from_slice(&json_bytes),
        owned_descriptors,
    )
    .unwrap();

    // Send a message through the deserialized Tube.
    tube_deserialized.send(&"hi".to_string()).unwrap();

    // Wait (up to ten seconds) for the receiving end to become readable.
    let wait_ctx: WaitContext<Token> =
        WaitContext::build_with(&[(tube_recv.get_read_notifier(), Token::ReceivedData)]).unwrap();
    let events = wait_ctx.wait_timeout(Duration::from_secs(10)).unwrap();

    let mut readable_tokens: Vec<Token> = Vec::new();
    for event in events.iter() {
        if event.is_readable {
            readable_tokens.push(event.token);
        }
    }
    assert_eq!(readable_tokens, vec![Token::ReceivedData]);

    let delivered = tube_recv.recv::<String>().unwrap();
    assert_eq!(delivered, "hi");
}
161*bb4ee6a4SAndroid Build Coastguard Worker
/// Runs two producer threads against a single consumer on a directional tube and checks that
/// all 100 messages from each producer are received.
#[test]
fn send_recv_mpsc() {
    let (p1, consumer) = Tube::directional_pair().unwrap();
    let p2 = p1.try_clone().unwrap();

    // One barrier shared by the two producers and the consumer (three participants).
    let start_gate = Arc::new(Barrier::new(3));
    let gate_p1 = Arc::clone(&start_gate);
    let gate_p2 = Arc::clone(&start_gate);

    let p1_thread = thread::spawn(move || produce_messages(p1, PRODUCER_ID_1, gate_p1));
    let p2_thread = thread::spawn(move || produce_messages(p2, PRODUCER_ID_2, gate_p2));

    // The consumer runs on the test thread; keep the returned tube bound until the end.
    let (_tube, id1_count, id2_count) = consume_messages(consumer, 200, start_gate);
    assert_eq!(id1_count, 100);
    assert_eq!(id2_count, 100);

    p1_thread.join().unwrap();
    p2_thread.join().unwrap();
}
180*bb4ee6a4SAndroid Build Coastguard Worker
/// Sends a map of named `Event`s across a tube pair and checks that every key arrives and that
/// each received event is paired with the one originally stored under the same key.
#[test]
fn send_recv_hash_map() {
    let (sender, receiver) = Tube::pair().unwrap();

    let sent_map: HashMap<String, Event> = ["Red", "White", "Blue", "Orange", "Green"]
        .iter()
        .map(|color| ((*color).to_owned(), Event::new().unwrap()))
        .collect();
    sender.send(&sent_map).unwrap();
    let mut received_map: HashMap<String, Event> = receiver.recv().unwrap();

    // Compare the key sets in sorted order, since map iteration order is unspecified.
    {
        let mut sent_keys: Vec<_> = sent_map.keys().collect();
        let mut received_keys: Vec<_> = received_map.keys().collect();
        sent_keys.sort();
        received_keys.sort();
        assert_eq!(sent_keys, received_keys);
    }

    // Pair each original event with the received one stored under the same key.
    for (color, sent_event) in sent_map {
        let received_event = received_map.remove(&color).unwrap();
        test_event_pair(sent_event, received_event);
    }
}
205