1 // Copyright 2023 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 use std::collections::HashMap;
16 use std::sync::mpsc::{channel, Sender};
17 use std::sync::{OnceLock, RwLock};
18 use std::thread;
19
20 use crate::captures;
21 use crate::devices::{chip, chip::ChipIdentifier};
22
23 use bytes::Bytes;
24 use log::{error, info, warn};
25 use netsim_proto::hci_packet::hcipacket::PacketType;
26 use protobuf::Enum;
27
/// The Packet module routes packets from a chip controller instance to
/// different transport managers. The transport managers currently include:
///
/// - GRPC: a PacketStreamer
/// - FD: a file descriptor to a pair of Unix Fifos used by "-s" startup
/// - SOCKET: a TCP stream
34
35 // When a connection arrives, the transport registers a responder
36 // implementing Response trait for the packet stream.
37 pub trait Response {
response(&mut self, packet: Bytes, packet_type: u8)38 fn response(&mut self, packet: Bytes, packet_type: u8);
39 }
40
41 // When a responder is registered a responder thread is created to
42 // decouple the chip controller from the network. The thread reads
43 // ResponsePacket from a queue and sends to responder.
44 struct ResponsePacket {
45 packet: Bytes,
46 packet_type: u8,
47 }
48
49 // A hash map from chip_id to response channel.
50
51 struct PacketManager {
52 transports: RwLock<HashMap<ChipIdentifier, Sender<ResponsePacket>>>,
53 }
54
55 static MANAGER: OnceLock<PacketManager> = OnceLock::new();
56
get_manager() -> &'static PacketManager57 fn get_manager() -> &'static PacketManager {
58 MANAGER.get_or_init(PacketManager::new)
59 }
60
61 /// Register a chip controller instance to a transport manager.
register_transport(chip_id: ChipIdentifier, responder: Box<dyn Response + Send>)62 pub fn register_transport(chip_id: ChipIdentifier, responder: Box<dyn Response + Send>) {
63 get_manager().register_transport(chip_id, responder);
64 }
65
66 /// Unregister a chip controller instance.
unregister_transport(chip_id: ChipIdentifier)67 pub fn unregister_transport(chip_id: ChipIdentifier) {
68 get_manager().unregister_transport(chip_id);
69 }
70
71 impl PacketManager {
new() -> Self72 fn new() -> Self {
73 PacketManager { transports: RwLock::new(HashMap::new()) }
74 }
75 /// Register a transport stream for handle_response calls.
register_transport( &self, chip_id: ChipIdentifier, mut transport: Box<dyn Response + Send>, )76 pub fn register_transport(
77 &self,
78 chip_id: ChipIdentifier,
79 mut transport: Box<dyn Response + Send>,
80 ) {
81 let (tx, rx) = channel::<ResponsePacket>();
82 if self.transports.write().unwrap().insert(chip_id, tx).is_some() {
83 error!("register_transport: key already present for chip_id: {chip_id}");
84 }
85 let _ = thread::Builder::new().name(format!("transport_responder_{chip_id}")).spawn(
86 move || {
87 info!("register_transport: started thread chip_id: {chip_id}");
88 while let Ok(ResponsePacket { packet, packet_type }) = rx.recv() {
89 transport.response(packet, packet_type);
90 }
91 info!("register_transport: finished thread chip_id: {chip_id}");
92 },
93 );
94 }
95
96 /// Unregister a chip controller instance.
unregister_transport(&self, chip_id: ChipIdentifier)97 pub fn unregister_transport(&self, chip_id: ChipIdentifier) {
98 // Shuts down the responder thread, because sender is dropped.
99 self.transports.write().unwrap().remove(&chip_id);
100 }
101 }
102
103 /// Handle requests from gRPC transport in C++.
handle_response_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8)104 pub fn handle_response_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8) {
105 // TODO(b/314840701):
106 // 1. Per EChip Struct should contain private field of channel & facade_id
107 // 2. Lookup from ECHIPS with given chip_id
108 // 3. Call adaptor.handle_response
109 let packet = Bytes::from(packet.as_slice().to_vec());
110 let chip_id = ChipIdentifier(chip_id);
111 captures::controller_to_host(chip_id, &packet, packet_type.into());
112
113 let result = if let Some(transport) = get_manager().transports.read().unwrap().get(&chip_id) {
114 transport.send(ResponsePacket { packet, packet_type })
115 } else {
116 warn!("handle_response: chip {chip_id} not found");
117 Ok(())
118 };
119 // transports lock is now released
120 if let Err(e) = result {
121 warn!("handle_response: error {:?}", e);
122 unregister_transport(chip_id);
123 }
124 }
125
126 // Handle response from rust libraries
handle_response(chip_id: ChipIdentifier, packet: &Bytes)127 pub fn handle_response(chip_id: ChipIdentifier, packet: &Bytes) {
128 let packet_type = PacketType::HCI_PACKET_UNSPECIFIED.value() as u8;
129 captures::controller_to_host(chip_id, packet, packet_type.into());
130
131 let result = if let Some(transport) = get_manager().transports.read().unwrap().get(&chip_id) {
132 transport.send(ResponsePacket { packet: packet.clone(), packet_type })
133 } else {
134 warn!("handle_response: chip {chip_id} not found");
135 Ok(())
136 };
137 // transports lock is now released
138 if let Err(e) = result {
139 warn!("handle_response: error {:?}", e);
140 unregister_transport(chip_id);
141 }
142 }
143
144 /// Handle requests from transports.
handle_request(chip_id: ChipIdentifier, packet: &Bytes, packet_type: u8)145 pub fn handle_request(chip_id: ChipIdentifier, packet: &Bytes, packet_type: u8) {
146 captures::host_to_controller(chip_id, packet, packet_type.into());
147
148 let mut packet_vec = packet.to_vec();
149 // Prepend packet_type to packet if specified
150 if PacketType::HCI_PACKET_UNSPECIFIED.value()
151 != <u8 as std::convert::Into<i32>>::into(packet_type)
152 {
153 packet_vec.insert(0, packet_type);
154 }
155
156 // Perform handle_request
157 match chip::get_chip(&chip_id) {
158 Some(c) => c.wireless_adaptor.handle_request(&Bytes::from(packet_vec)),
159 None => warn!("SharedWirelessAdaptor doesn't exist for chip_id: {chip_id}"),
160 }
161 }
162
163 /// Handle requests from gRPC transport in C++.
handle_request_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8)164 pub fn handle_request_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8) {
165 let packet_bytes = Bytes::from(packet.as_slice().to_vec());
166 handle_request(ChipIdentifier(chip_id), &packet_bytes, packet_type);
167 }
168
#[cfg(test)]
mod tests {
    use super::*;

    /// Responder that discards every packet it is handed.
    struct TestTransport {}

    impl Response for TestTransport {
        fn response(&mut self, _packet: Bytes, _packet_type: u8) {}
    }

    /// Registering a transport stores an entry under its chip id.
    #[test]
    fn test_register_transport() {
        let chip_id = ChipIdentifier(0);
        let manager = PacketManager::new();
        let responder: Box<dyn Response + Send> = Box::new(TestTransport {});
        manager.register_transport(chip_id, responder);

        let transports = manager.transports.read().unwrap();
        assert!(transports.contains_key(&chip_id));
    }

    /// Unregistering a transport removes the entry for its chip id.
    #[test]
    fn test_unregister_transport() {
        let chip_id = ChipIdentifier(1);
        let manager = PacketManager::new();
        manager.register_transport(chip_id, Box::new(TestTransport {}));
        manager.unregister_transport(chip_id);

        let transports = manager.transports.read().unwrap();
        assert!(!transports.contains_key(&chip_id));
    }
}
198