xref: /aosp_15_r20/external/crosvm/base/tests/tube.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1*bb4ee6a4SAndroid Build Coastguard Worker // Copyright 2021 The ChromiumOS Authors
2*bb4ee6a4SAndroid Build Coastguard Worker // Use of this source code is governed by a BSD-style license that can be
3*bb4ee6a4SAndroid Build Coastguard Worker // found in the LICENSE file.
4*bb4ee6a4SAndroid Build Coastguard Worker 
5*bb4ee6a4SAndroid Build Coastguard Worker use std::collections::HashMap;
6*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::Arc;
7*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::Barrier;
8*bb4ee6a4SAndroid Build Coastguard Worker use std::thread;
9*bb4ee6a4SAndroid Build Coastguard Worker use std::time::Duration;
10*bb4ee6a4SAndroid Build Coastguard Worker 
11*bb4ee6a4SAndroid Build Coastguard Worker use base::descriptor::FromRawDescriptor;
12*bb4ee6a4SAndroid Build Coastguard Worker use base::descriptor::SafeDescriptor;
13*bb4ee6a4SAndroid Build Coastguard Worker use base::deserialize_with_descriptors;
14*bb4ee6a4SAndroid Build Coastguard Worker use base::Event;
15*bb4ee6a4SAndroid Build Coastguard Worker use base::EventToken;
16*bb4ee6a4SAndroid Build Coastguard Worker use base::ReadNotifier;
17*bb4ee6a4SAndroid Build Coastguard Worker use base::RecvTube;
18*bb4ee6a4SAndroid Build Coastguard Worker use base::SendTube;
19*bb4ee6a4SAndroid Build Coastguard Worker use base::SerializeDescriptors;
20*bb4ee6a4SAndroid Build Coastguard Worker use base::Tube;
21*bb4ee6a4SAndroid Build Coastguard Worker use base::WaitContext;
22*bb4ee6a4SAndroid Build Coastguard Worker use serde::Deserialize;
23*bb4ee6a4SAndroid Build Coastguard Worker use serde::Serialize;
24*bb4ee6a4SAndroid Build Coastguard Worker 
25*bb4ee6a4SAndroid Build Coastguard Worker #[derive(Serialize, Deserialize)]
26*bb4ee6a4SAndroid Build Coastguard Worker struct DataStruct {
27*bb4ee6a4SAndroid Build Coastguard Worker     x: u32,
28*bb4ee6a4SAndroid Build Coastguard Worker }
29*bb4ee6a4SAndroid Build Coastguard Worker 
30*bb4ee6a4SAndroid Build Coastguard Worker #[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)]
31*bb4ee6a4SAndroid Build Coastguard Worker enum Token {
32*bb4ee6a4SAndroid Build Coastguard Worker     ReceivedData,
33*bb4ee6a4SAndroid Build Coastguard Worker }
34*bb4ee6a4SAndroid Build Coastguard Worker 
35*bb4ee6a4SAndroid Build Coastguard Worker // Magics to identify which producer sent a message (& detect corruption).
36*bb4ee6a4SAndroid Build Coastguard Worker const PRODUCER_ID_1: u32 = 801279273;
37*bb4ee6a4SAndroid Build Coastguard Worker const PRODUCER_ID_2: u32 = 345234861;
38*bb4ee6a4SAndroid Build Coastguard Worker 
39*bb4ee6a4SAndroid Build Coastguard Worker #[track_caller]
test_event_pair(send: Event, recv: Event)40*bb4ee6a4SAndroid Build Coastguard Worker fn test_event_pair(send: Event, recv: Event) {
41*bb4ee6a4SAndroid Build Coastguard Worker     send.signal().unwrap();
42*bb4ee6a4SAndroid Build Coastguard Worker     recv.wait_timeout(Duration::from_secs(1)).unwrap();
43*bb4ee6a4SAndroid Build Coastguard Worker }
44*bb4ee6a4SAndroid Build Coastguard Worker 
45*bb4ee6a4SAndroid Build Coastguard Worker #[test]
send_recv_no_fd()46*bb4ee6a4SAndroid Build Coastguard Worker fn send_recv_no_fd() {
47*bb4ee6a4SAndroid Build Coastguard Worker     let (s1, s2) = Tube::pair().unwrap();
48*bb4ee6a4SAndroid Build Coastguard Worker 
49*bb4ee6a4SAndroid Build Coastguard Worker     let test_msg = "hello world";
50*bb4ee6a4SAndroid Build Coastguard Worker     s1.send(&test_msg).unwrap();
51*bb4ee6a4SAndroid Build Coastguard Worker     let recv_msg: String = s2.recv().unwrap();
52*bb4ee6a4SAndroid Build Coastguard Worker 
53*bb4ee6a4SAndroid Build Coastguard Worker     assert_eq!(test_msg, recv_msg);
54*bb4ee6a4SAndroid Build Coastguard Worker }
55*bb4ee6a4SAndroid Build Coastguard Worker 
56*bb4ee6a4SAndroid Build Coastguard Worker #[test]
send_recv_one_fd()57*bb4ee6a4SAndroid Build Coastguard Worker fn send_recv_one_fd() {
58*bb4ee6a4SAndroid Build Coastguard Worker     #[derive(Serialize, Deserialize)]
59*bb4ee6a4SAndroid Build Coastguard Worker     struct EventStruct {
60*bb4ee6a4SAndroid Build Coastguard Worker         x: u32,
61*bb4ee6a4SAndroid Build Coastguard Worker         b: Event,
62*bb4ee6a4SAndroid Build Coastguard Worker     }
63*bb4ee6a4SAndroid Build Coastguard Worker 
64*bb4ee6a4SAndroid Build Coastguard Worker     let (s1, s2) = Tube::pair().unwrap();
65*bb4ee6a4SAndroid Build Coastguard Worker 
66*bb4ee6a4SAndroid Build Coastguard Worker     let test_msg = EventStruct {
67*bb4ee6a4SAndroid Build Coastguard Worker         x: 100,
68*bb4ee6a4SAndroid Build Coastguard Worker         b: Event::new().unwrap(),
69*bb4ee6a4SAndroid Build Coastguard Worker     };
70*bb4ee6a4SAndroid Build Coastguard Worker     s1.send(&test_msg).unwrap();
71*bb4ee6a4SAndroid Build Coastguard Worker     let recv_msg: EventStruct = s2.recv().unwrap();
72*bb4ee6a4SAndroid Build Coastguard Worker 
73*bb4ee6a4SAndroid Build Coastguard Worker     assert_eq!(test_msg.x, recv_msg.x);
74*bb4ee6a4SAndroid Build Coastguard Worker 
75*bb4ee6a4SAndroid Build Coastguard Worker     test_event_pair(test_msg.b, recv_msg.b);
76*bb4ee6a4SAndroid Build Coastguard Worker }
77*bb4ee6a4SAndroid Build Coastguard Worker 
78*bb4ee6a4SAndroid Build Coastguard Worker #[test]
send_recv_event()79*bb4ee6a4SAndroid Build Coastguard Worker fn send_recv_event() {
80*bb4ee6a4SAndroid Build Coastguard Worker     let (req, res) = Tube::pair().unwrap();
81*bb4ee6a4SAndroid Build Coastguard Worker     let e1 = Event::new().unwrap();
82*bb4ee6a4SAndroid Build Coastguard Worker     res.send(&e1).unwrap();
83*bb4ee6a4SAndroid Build Coastguard Worker 
84*bb4ee6a4SAndroid Build Coastguard Worker     let recv_event: Event = req.recv().unwrap();
85*bb4ee6a4SAndroid Build Coastguard Worker     recv_event.signal().unwrap();
86*bb4ee6a4SAndroid Build Coastguard Worker     e1.wait().unwrap();
87*bb4ee6a4SAndroid Build Coastguard Worker }
88*bb4ee6a4SAndroid Build Coastguard Worker 
89*bb4ee6a4SAndroid Build Coastguard Worker /// Send messages to a Tube with the given identifier (see `consume_messages`; we use this to
90*bb4ee6a4SAndroid Build Coastguard Worker /// track different message producers).
91*bb4ee6a4SAndroid Build Coastguard Worker #[track_caller]
produce_messages(tube: SendTube, data: u32, barrier: Arc<Barrier>) -> SendTube92*bb4ee6a4SAndroid Build Coastguard Worker fn produce_messages(tube: SendTube, data: u32, barrier: Arc<Barrier>) -> SendTube {
93*bb4ee6a4SAndroid Build Coastguard Worker     let data = DataStruct { x: data };
94*bb4ee6a4SAndroid Build Coastguard Worker     barrier.wait();
95*bb4ee6a4SAndroid Build Coastguard Worker     for _ in 0..100 {
96*bb4ee6a4SAndroid Build Coastguard Worker         tube.send(&data).unwrap();
97*bb4ee6a4SAndroid Build Coastguard Worker     }
98*bb4ee6a4SAndroid Build Coastguard Worker     tube
99*bb4ee6a4SAndroid Build Coastguard Worker }
100*bb4ee6a4SAndroid Build Coastguard Worker 
101*bb4ee6a4SAndroid Build Coastguard Worker /// Consumes the given number of messages from a Tube, returning the number messages read with
102*bb4ee6a4SAndroid Build Coastguard Worker /// each producer ID.
103*bb4ee6a4SAndroid Build Coastguard Worker #[track_caller]
consume_messages( tube: RecvTube, count: usize, barrier: Arc<Barrier>, ) -> (RecvTube, usize, usize)104*bb4ee6a4SAndroid Build Coastguard Worker fn consume_messages(
105*bb4ee6a4SAndroid Build Coastguard Worker     tube: RecvTube,
106*bb4ee6a4SAndroid Build Coastguard Worker     count: usize,
107*bb4ee6a4SAndroid Build Coastguard Worker     barrier: Arc<Barrier>,
108*bb4ee6a4SAndroid Build Coastguard Worker ) -> (RecvTube, usize, usize) {
109*bb4ee6a4SAndroid Build Coastguard Worker     barrier.wait();
110*bb4ee6a4SAndroid Build Coastguard Worker 
111*bb4ee6a4SAndroid Build Coastguard Worker     let mut id1_count = 0usize;
112*bb4ee6a4SAndroid Build Coastguard Worker     let mut id2_count = 0usize;
113*bb4ee6a4SAndroid Build Coastguard Worker 
114*bb4ee6a4SAndroid Build Coastguard Worker     for _ in 0..count {
115*bb4ee6a4SAndroid Build Coastguard Worker         let msg = tube.recv::<DataStruct>().unwrap();
116*bb4ee6a4SAndroid Build Coastguard Worker         match msg.x {
117*bb4ee6a4SAndroid Build Coastguard Worker             PRODUCER_ID_1 => id1_count += 1,
118*bb4ee6a4SAndroid Build Coastguard Worker             PRODUCER_ID_2 => id2_count += 1,
119*bb4ee6a4SAndroid Build Coastguard Worker             _ => panic!(
120*bb4ee6a4SAndroid Build Coastguard Worker                 "want message with ID {} or {}; got message w/ ID {}.",
121*bb4ee6a4SAndroid Build Coastguard Worker                 PRODUCER_ID_1, PRODUCER_ID_2, msg.x
122*bb4ee6a4SAndroid Build Coastguard Worker             ),
123*bb4ee6a4SAndroid Build Coastguard Worker         }
124*bb4ee6a4SAndroid Build Coastguard Worker     }
125*bb4ee6a4SAndroid Build Coastguard Worker     (tube, id1_count, id2_count)
126*bb4ee6a4SAndroid Build Coastguard Worker }
127*bb4ee6a4SAndroid Build Coastguard Worker 
128*bb4ee6a4SAndroid Build Coastguard Worker #[test]
test_serialize_tube_pair()129*bb4ee6a4SAndroid Build Coastguard Worker fn test_serialize_tube_pair() {
130*bb4ee6a4SAndroid Build Coastguard Worker     let (tube_send, tube_recv) = Tube::pair().unwrap();
131*bb4ee6a4SAndroid Build Coastguard Worker 
132*bb4ee6a4SAndroid Build Coastguard Worker     // Serialize the Tube
133*bb4ee6a4SAndroid Build Coastguard Worker     let msg_serialize = SerializeDescriptors::new(&tube_send);
134*bb4ee6a4SAndroid Build Coastguard Worker     let serialized = serde_json::to_vec(&msg_serialize).unwrap();
135*bb4ee6a4SAndroid Build Coastguard Worker     let msg_descriptors = msg_serialize.into_descriptors();
136*bb4ee6a4SAndroid Build Coastguard Worker 
137*bb4ee6a4SAndroid Build Coastguard Worker     // Deserialize the Tube
138*bb4ee6a4SAndroid Build Coastguard Worker     let msg_descriptors_safe = msg_descriptors.into_iter().map(|v|
139*bb4ee6a4SAndroid Build Coastguard Worker             // SAFETY: `v` is expected to be valid
140*bb4ee6a4SAndroid Build Coastguard Worker             unsafe { SafeDescriptor::from_raw_descriptor(v) });
141*bb4ee6a4SAndroid Build Coastguard Worker     let tube_deserialized: Tube =
142*bb4ee6a4SAndroid Build Coastguard Worker         deserialize_with_descriptors(|| serde_json::from_slice(&serialized), msg_descriptors_safe)
143*bb4ee6a4SAndroid Build Coastguard Worker             .unwrap();
144*bb4ee6a4SAndroid Build Coastguard Worker 
145*bb4ee6a4SAndroid Build Coastguard Worker     // Send a message through deserialized Tube
146*bb4ee6a4SAndroid Build Coastguard Worker     tube_deserialized.send(&"hi".to_string()).unwrap();
147*bb4ee6a4SAndroid Build Coastguard Worker 
148*bb4ee6a4SAndroid Build Coastguard Worker     // Wait for the message to arrive
149*bb4ee6a4SAndroid Build Coastguard Worker     let wait_ctx: WaitContext<Token> =
150*bb4ee6a4SAndroid Build Coastguard Worker         WaitContext::build_with(&[(tube_recv.get_read_notifier(), Token::ReceivedData)]).unwrap();
151*bb4ee6a4SAndroid Build Coastguard Worker     let events = wait_ctx.wait_timeout(Duration::from_secs(10)).unwrap();
152*bb4ee6a4SAndroid Build Coastguard Worker     let tokens: Vec<Token> = events
153*bb4ee6a4SAndroid Build Coastguard Worker         .iter()
154*bb4ee6a4SAndroid Build Coastguard Worker         .filter(|e| e.is_readable)
155*bb4ee6a4SAndroid Build Coastguard Worker         .map(|e| e.token)
156*bb4ee6a4SAndroid Build Coastguard Worker         .collect();
157*bb4ee6a4SAndroid Build Coastguard Worker     assert_eq!(tokens, vec! {Token::ReceivedData});
158*bb4ee6a4SAndroid Build Coastguard Worker 
159*bb4ee6a4SAndroid Build Coastguard Worker     assert_eq!(tube_recv.recv::<String>().unwrap(), "hi");
160*bb4ee6a4SAndroid Build Coastguard Worker }
161*bb4ee6a4SAndroid Build Coastguard Worker 
162*bb4ee6a4SAndroid Build Coastguard Worker #[test]
send_recv_mpsc()163*bb4ee6a4SAndroid Build Coastguard Worker fn send_recv_mpsc() {
164*bb4ee6a4SAndroid Build Coastguard Worker     let (p1, consumer) = Tube::directional_pair().unwrap();
165*bb4ee6a4SAndroid Build Coastguard Worker     let p2 = p1.try_clone().unwrap();
166*bb4ee6a4SAndroid Build Coastguard Worker     let start_block_p1 = Arc::new(Barrier::new(3));
167*bb4ee6a4SAndroid Build Coastguard Worker     let start_block_p2 = start_block_p1.clone();
168*bb4ee6a4SAndroid Build Coastguard Worker     let start_block_consumer = start_block_p1.clone();
169*bb4ee6a4SAndroid Build Coastguard Worker 
170*bb4ee6a4SAndroid Build Coastguard Worker     let p1_thread = thread::spawn(move || produce_messages(p1, PRODUCER_ID_1, start_block_p1));
171*bb4ee6a4SAndroid Build Coastguard Worker     let p2_thread = thread::spawn(move || produce_messages(p2, PRODUCER_ID_2, start_block_p2));
172*bb4ee6a4SAndroid Build Coastguard Worker 
173*bb4ee6a4SAndroid Build Coastguard Worker     let (_tube, id1_count, id2_count) = consume_messages(consumer, 200, start_block_consumer);
174*bb4ee6a4SAndroid Build Coastguard Worker     assert_eq!(id1_count, 100);
175*bb4ee6a4SAndroid Build Coastguard Worker     assert_eq!(id2_count, 100);
176*bb4ee6a4SAndroid Build Coastguard Worker 
177*bb4ee6a4SAndroid Build Coastguard Worker     p1_thread.join().unwrap();
178*bb4ee6a4SAndroid Build Coastguard Worker     p2_thread.join().unwrap();
179*bb4ee6a4SAndroid Build Coastguard Worker }
180*bb4ee6a4SAndroid Build Coastguard Worker 
181*bb4ee6a4SAndroid Build Coastguard Worker #[test]
send_recv_hash_map()182*bb4ee6a4SAndroid Build Coastguard Worker fn send_recv_hash_map() {
183*bb4ee6a4SAndroid Build Coastguard Worker     let (s1, s2) = Tube::pair().unwrap();
184*bb4ee6a4SAndroid Build Coastguard Worker 
185*bb4ee6a4SAndroid Build Coastguard Worker     let mut test_msg = HashMap::new();
186*bb4ee6a4SAndroid Build Coastguard Worker     test_msg.insert("Red".to_owned(), Event::new().unwrap());
187*bb4ee6a4SAndroid Build Coastguard Worker     test_msg.insert("White".to_owned(), Event::new().unwrap());
188*bb4ee6a4SAndroid Build Coastguard Worker     test_msg.insert("Blue".to_owned(), Event::new().unwrap());
189*bb4ee6a4SAndroid Build Coastguard Worker     test_msg.insert("Orange".to_owned(), Event::new().unwrap());
190*bb4ee6a4SAndroid Build Coastguard Worker     test_msg.insert("Green".to_owned(), Event::new().unwrap());
191*bb4ee6a4SAndroid Build Coastguard Worker     s1.send(&test_msg).unwrap();
192*bb4ee6a4SAndroid Build Coastguard Worker     let mut recv_msg: HashMap<String, Event> = s2.recv().unwrap();
193*bb4ee6a4SAndroid Build Coastguard Worker 
194*bb4ee6a4SAndroid Build Coastguard Worker     let mut test_msg_keys: Vec<_> = test_msg.keys().collect();
195*bb4ee6a4SAndroid Build Coastguard Worker     test_msg_keys.sort();
196*bb4ee6a4SAndroid Build Coastguard Worker     let mut recv_msg_keys: Vec<_> = recv_msg.keys().collect();
197*bb4ee6a4SAndroid Build Coastguard Worker     recv_msg_keys.sort();
198*bb4ee6a4SAndroid Build Coastguard Worker     assert_eq!(test_msg_keys, recv_msg_keys);
199*bb4ee6a4SAndroid Build Coastguard Worker 
200*bb4ee6a4SAndroid Build Coastguard Worker     for (key, test_event) in test_msg {
201*bb4ee6a4SAndroid Build Coastguard Worker         let recv_event = recv_msg.remove(&key).unwrap();
202*bb4ee6a4SAndroid Build Coastguard Worker         test_event_pair(test_event, recv_event);
203*bb4ee6a4SAndroid Build Coastguard Worker     }
204*bb4ee6a4SAndroid Build Coastguard Worker }
205