xref: /aosp_15_r20/tools/netsim/rust/daemon/src/wireless/packet.rs (revision cf78ab8cffb8fc9207af348f23af247fb04370a6)
1 // Copyright 2023 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 use std::collections::HashMap;
16 use std::sync::mpsc::{channel, Sender};
17 use std::sync::{OnceLock, RwLock};
18 use std::thread;
19 
20 use crate::captures;
21 use crate::devices::{chip, chip::ChipIdentifier};
22 
23 use bytes::Bytes;
24 use log::{error, info, warn};
25 use netsim_proto::hci_packet::hcipacket::PacketType;
26 use protobuf::Enum;
27 
28 /// The Packet module routes packets from a chip controller instance to
29 /// different transport managers. Currently transport managers include
30 ///
31 /// - GRPC is a PacketStreamer
32 /// - FD is a file descriptor to a pair of Unix Fifos used by "-s" startup
33 /// - SOCKET is a TCP stream
34 
35 // When a connection arrives, the transport registers a responder
36 // implementing Response trait for the packet stream.
37 pub trait Response {
response(&mut self, packet: Bytes, packet_type: u8)38     fn response(&mut self, packet: Bytes, packet_type: u8);
39 }
40 
41 // When a responder is registered a responder thread is created to
42 // decouple the chip controller from the network. The thread reads
43 // ResponsePacket from a queue and sends to responder.
44 struct ResponsePacket {
45     packet: Bytes,
46     packet_type: u8,
47 }
48 
49 // A hash map from chip_id to response channel.
50 
51 struct PacketManager {
52     transports: RwLock<HashMap<ChipIdentifier, Sender<ResponsePacket>>>,
53 }
54 
55 static MANAGER: OnceLock<PacketManager> = OnceLock::new();
56 
get_manager() -> &'static PacketManager57 fn get_manager() -> &'static PacketManager {
58     MANAGER.get_or_init(PacketManager::new)
59 }
60 
61 /// Register a chip controller instance to a transport manager.
register_transport(chip_id: ChipIdentifier, responder: Box<dyn Response + Send>)62 pub fn register_transport(chip_id: ChipIdentifier, responder: Box<dyn Response + Send>) {
63     get_manager().register_transport(chip_id, responder);
64 }
65 
66 /// Unregister a chip controller instance.
unregister_transport(chip_id: ChipIdentifier)67 pub fn unregister_transport(chip_id: ChipIdentifier) {
68     get_manager().unregister_transport(chip_id);
69 }
70 
71 impl PacketManager {
new() -> Self72     fn new() -> Self {
73         PacketManager { transports: RwLock::new(HashMap::new()) }
74     }
75     /// Register a transport stream for handle_response calls.
register_transport( &self, chip_id: ChipIdentifier, mut transport: Box<dyn Response + Send>, )76     pub fn register_transport(
77         &self,
78         chip_id: ChipIdentifier,
79         mut transport: Box<dyn Response + Send>,
80     ) {
81         let (tx, rx) = channel::<ResponsePacket>();
82         if self.transports.write().unwrap().insert(chip_id, tx).is_some() {
83             error!("register_transport: key already present for chip_id: {chip_id}");
84         }
85         let _ = thread::Builder::new().name(format!("transport_responder_{chip_id}")).spawn(
86             move || {
87                 info!("register_transport: started thread chip_id: {chip_id}");
88                 while let Ok(ResponsePacket { packet, packet_type }) = rx.recv() {
89                     transport.response(packet, packet_type);
90                 }
91                 info!("register_transport: finished thread chip_id: {chip_id}");
92             },
93         );
94     }
95 
96     /// Unregister a chip controller instance.
unregister_transport(&self, chip_id: ChipIdentifier)97     pub fn unregister_transport(&self, chip_id: ChipIdentifier) {
98         // Shuts down the responder thread, because sender is dropped.
99         self.transports.write().unwrap().remove(&chip_id);
100     }
101 }
102 
103 /// Handle requests from gRPC transport in C++.
handle_response_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8)104 pub fn handle_response_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8) {
105     // TODO(b/314840701):
106     // 1. Per EChip Struct should contain private field of channel & facade_id
107     // 2. Lookup from ECHIPS with given chip_id
108     // 3. Call adaptor.handle_response
109     let packet = Bytes::from(packet.as_slice().to_vec());
110     let chip_id = ChipIdentifier(chip_id);
111     captures::controller_to_host(chip_id, &packet, packet_type.into());
112 
113     let result = if let Some(transport) = get_manager().transports.read().unwrap().get(&chip_id) {
114         transport.send(ResponsePacket { packet, packet_type })
115     } else {
116         warn!("handle_response: chip {chip_id} not found");
117         Ok(())
118     };
119     // transports lock is now released
120     if let Err(e) = result {
121         warn!("handle_response: error {:?}", e);
122         unregister_transport(chip_id);
123     }
124 }
125 
126 // Handle response from rust libraries
handle_response(chip_id: ChipIdentifier, packet: &Bytes)127 pub fn handle_response(chip_id: ChipIdentifier, packet: &Bytes) {
128     let packet_type = PacketType::HCI_PACKET_UNSPECIFIED.value() as u8;
129     captures::controller_to_host(chip_id, packet, packet_type.into());
130 
131     let result = if let Some(transport) = get_manager().transports.read().unwrap().get(&chip_id) {
132         transport.send(ResponsePacket { packet: packet.clone(), packet_type })
133     } else {
134         warn!("handle_response: chip {chip_id} not found");
135         Ok(())
136     };
137     // transports lock is now released
138     if let Err(e) = result {
139         warn!("handle_response: error {:?}", e);
140         unregister_transport(chip_id);
141     }
142 }
143 
144 /// Handle requests from transports.
handle_request(chip_id: ChipIdentifier, packet: &Bytes, packet_type: u8)145 pub fn handle_request(chip_id: ChipIdentifier, packet: &Bytes, packet_type: u8) {
146     captures::host_to_controller(chip_id, packet, packet_type.into());
147 
148     let mut packet_vec = packet.to_vec();
149     // Prepend packet_type to packet if specified
150     if PacketType::HCI_PACKET_UNSPECIFIED.value()
151         != <u8 as std::convert::Into<i32>>::into(packet_type)
152     {
153         packet_vec.insert(0, packet_type);
154     }
155 
156     // Perform handle_request
157     match chip::get_chip(&chip_id) {
158         Some(c) => c.wireless_adaptor.handle_request(&Bytes::from(packet_vec)),
159         None => warn!("SharedWirelessAdaptor doesn't exist for chip_id: {chip_id}"),
160     }
161 }
162 
163 /// Handle requests from gRPC transport in C++.
handle_request_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8)164 pub fn handle_request_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8) {
165     let packet_bytes = Bytes::from(packet.as_slice().to_vec());
166     handle_request(ChipIdentifier(chip_id), &packet_bytes, packet_type);
167 }
168 
169 #[cfg(test)]
170 mod tests {
171     use super::*;
172 
173     struct TestTransport {}
174     impl Response for TestTransport {
response(&mut self, _packet: Bytes, _packet_type: u8)175         fn response(&mut self, _packet: Bytes, _packet_type: u8) {}
176     }
177 
178     #[test]
test_register_transport()179     fn test_register_transport() {
180         let val: Box<dyn Response + Send> = Box::new(TestTransport {});
181         let manager = PacketManager::new();
182         let chip_id = ChipIdentifier(0);
183         manager.register_transport(chip_id, val);
184         {
185             assert!(manager.transports.read().unwrap().contains_key(&chip_id));
186         }
187     }
188 
189     #[test]
test_unregister_transport()190     fn test_unregister_transport() {
191         let manager = PacketManager::new();
192         let chip_id = ChipIdentifier(1);
193         manager.register_transport(chip_id, Box::new(TestTransport {}));
194         manager.unregister_transport(chip_id);
195         assert!(manager.transports.read().unwrap().get(&chip_id).is_none());
196     }
197 }
198