xref: /aosp_15_r20/external/crosvm/base/tests/linux/tube.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::time;
6 
7 use base::deserialize_with_descriptors;
8 use base::BlockingMode;
9 use base::EventContext;
10 use base::EventToken;
11 use base::FramingMode;
12 use base::FromRawDescriptor;
13 use base::ReadNotifier;
14 use base::SafeDescriptor;
15 use base::SerializeDescriptors;
16 use base::StreamChannel;
17 use base::Tube;
18 use base::UnixSeqpacket;
19 
20 #[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)]
21 enum Token {
22     ReceivedData,
23 }
24 
/// Upper bound passed to `EventContext::wait_timeout` while a test waits for
/// its message to become readable.
const EVENT_WAIT_TIME: time::Duration = time::Duration::from_secs(10);
26 
/// Round-trips a `Tube` through JSON serialization (with its descriptors
/// carried out-of-band) and checks that a message sent on the deserialized
/// tube is received by the original peer.
#[test]
fn test_serialize_tube_new() {
    let (chan_send, chan_recv) =
        StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Message).unwrap();
    let tube_send = Tube::new(chan_send).unwrap();
    let tube_recv = Tube::new(chan_recv).unwrap();

    // Serialize the sending tube: the JSON payload and the descriptors it
    // refers to are produced separately.
    let serializer = SerializeDescriptors::new(&tube_send);
    let json_bytes = serde_json::to_vec(&serializer).unwrap();
    let raw_descriptors = serializer.into_descriptors();

    // Rebuild a tube from the JSON payload plus the re-wrapped descriptors.
    let owned_descriptors = raw_descriptors.into_iter().map(|raw| {
        // SAFETY: Safe because `raw` is a valid descriptor
        // NOTE(review): `tube_send` is still alive at this point; if the
        // descriptors handed out above are not duplicates, the same descriptor
        // would be owned twice and closed twice on drop — confirm.
        unsafe { SafeDescriptor::from_raw_descriptor(raw) }
    });
    let tube_deserialized: Tube = deserialize_with_descriptors(
        || serde_json::from_slice(&json_bytes),
        owned_descriptors,
    )
    .unwrap();

    // Push a message through the deserialized tube.
    tube_deserialized.send(&String::from("hi")).unwrap();

    // Block (bounded by EVENT_WAIT_TIME) until the receiving end is signalled.
    let event_ctx: EventContext<Token> =
        EventContext::build_with(&[(tube_recv.get_read_notifier(), Token::ReceivedData)]).unwrap();
    let events = event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap();
    let readable_tokens: Vec<Token> = events
        .iter()
        .filter_map(|event| event.is_readable.then_some(event.token))
        .collect();
    assert_eq!(readable_tokens, vec![Token::ReceivedData]);

    let received: String = tube_recv.recv().unwrap();
    assert_eq!(received, "hi");
}
63 
/// Builds both tube ends from a `UnixSeqpacket` pair and checks that a string
/// sent on one end is received on the other.
#[test]
fn test_send_recv_new_from_seqpacket() {
    let (packet_send, packet_recv) = UnixSeqpacket::pair().unwrap();
    let tube_send = Tube::new_from_unix_seqpacket(packet_send).unwrap();
    let tube_recv = Tube::new_from_unix_seqpacket(packet_recv).unwrap();

    tube_send.send(&String::from("hi")).unwrap();

    // Block (bounded by EVENT_WAIT_TIME) until the receiving end is signalled.
    let event_ctx: EventContext<Token> =
        EventContext::build_with(&[(tube_recv.get_read_notifier(), Token::ReceivedData)]).unwrap();
    let events = event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap();
    let readable_tokens: Vec<Token> = events
        .iter()
        .filter_map(|event| event.is_readable.then_some(event.token))
        .collect();
    assert_eq!(readable_tokens, vec![Token::ReceivedData]);

    let received: String = tube_recv.recv().unwrap();
    assert_eq!(received, "hi");
}
85 
/// Checks that `Tube::new` returns an error when handed a `StreamChannel`
/// created with `FramingMode::Byte`.
#[test]
fn test_tube_new_byte_mode_error() {
    // Only one end is needed; the peer is discarded by the `_` pattern.
    let (byte_mode_chan, _) =
        StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();

    assert!(Tube::new(byte_mode_chan).is_err());
}
94