1 // Copyright 2023 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 /// request packets flow into netsim
16 /// response packets flow out of netsim
17 /// packet transports read requests and write response packets over gRPC or Fds.
18 use super::h4;
19 use super::h4::PacketError;
20 use super::uci;
21 use crate::devices::chip;
22 use crate::devices::chip::ChipIdentifier;
23 use crate::devices::device::DeviceIdentifier;
24 use crate::devices::devices_handler::{add_chip, remove_chip};
25 use crate::ffi::ffi_transport;
26 use crate::wireless;
27 use crate::wireless::packet::{register_transport, unregister_transport, Response};
28 use bytes::Bytes;
29 use log::{error, info, warn};
30 use netsim_proto::common::ChipKind;
31 use netsim_proto::hci_packet::{hcipacket::PacketType, HCIPacket};
32 use netsim_proto::packet_streamer::PacketRequest;
33 use netsim_proto::startup::{ChipInfo as ChipInfoProto, StartupInfo as StartupInfoProto};
34 use protobuf::{Enum, EnumOrUnknown, Message, MessageField};
35 use std::collections::HashMap;
36 use std::fs::File;
37 use std::io::{ErrorKind, Write};
38 use std::os::fd::FromRawFd;
39 use std::sync::{Arc, OnceLock, RwLock};
40 use std::thread;
41 use std::thread::JoinHandle;
42
43 struct FdTransport {
44 file: File,
45 }
46
47 impl Response for FdTransport {
response(&mut self, packet: Bytes, packet_type: u8)48 fn response(&mut self, packet: Bytes, packet_type: u8) {
49 let mut buffer = Vec::<u8>::new();
50 if packet_type != (PacketType::HCI_PACKET_UNSPECIFIED.value() as u8) {
51 buffer.push(packet_type);
52 }
53 buffer.extend(packet);
54 if let Err(e) = self.file.write_all(&buffer[..]) {
55 error!("netsimd: error writing {}", e);
56 }
57 }
58 }
59
60 /// read from the raw fd and pass to the packet hub.
61 ///
62 /// # Safety
63 ///
64 /// `fd_rx` must be a valid and open file descriptor.
fd_reader( fd_rx: i32, kind: ChipKind, device_id: DeviceIdentifier, chip_id: ChipIdentifier, ) -> JoinHandle<()>65 unsafe fn fd_reader(
66 fd_rx: i32,
67 kind: ChipKind,
68 device_id: DeviceIdentifier,
69 chip_id: ChipIdentifier,
70 ) -> JoinHandle<()> {
71 thread::Builder::new()
72 .name(format!("fd_reader_{}", fd_rx))
73 .spawn(move || {
74 // SAFETY: The caller promises that `fd_rx` is valid and open.
75 let mut rx = unsafe { File::from_raw_fd(fd_rx) };
76
77 info!("Handling fd={} for kind: {:?} chip_id: {:?}", fd_rx, kind, chip_id);
78
79 loop {
80 match kind {
81 ChipKind::UWB => match uci::read_uci_packet(&mut rx) {
82 Err(e) => {
83 error!("End reader connection with fd={}. Failed to reading uci control packet: {:?}", fd_rx, e);
84 break;
85 }
86 Ok(uci::Packet { payload }) => {
87 wireless::handle_request(chip_id, &payload, 0);
88 }
89 },
90 ChipKind::BLUETOOTH => match h4::read_h4_packet(&mut rx) {
91 Ok(h4::Packet { h4_type, payload }) => {
92 wireless::handle_request(chip_id, &payload, h4_type);
93 }
94 Err(PacketError::IoError(e))
95 if e.kind() == ErrorKind::UnexpectedEof =>
96 {
97 info!("End reader connection with fd={}.", fd_rx);
98 break;
99 }
100 Err(e) => {
101 error!("End reader connection with fd={}. Failed to reading hci control packet: {:?}", fd_rx, e);
102 break;
103 }
104 },
105 _ => {
106 error!("unknown control packet chip_kind: {:?}", kind);
107 break;
108 }
109 };
110 }
111
112 // unregister before remove_chip because facade may re-use facade_id
113 // on an intertwining create_chip and the unregister here might remove
114 // the recently added chip creating a disconnected transport.
115 unregister_transport(chip_id);
116
117 if let Err(err) = remove_chip(device_id, chip_id) {
118 warn!("{err}");
119 }
120 })
121 .unwrap()
122 }
123
124 /// start_fd_transport
125 ///
126 /// Create threads to read and write to file descriptors
127 ///
128 /// # Safety
129 ///
130 /// The file descriptors in the JSON must be valid and open.
run_fd_transport(startup_json: &String)131 pub unsafe fn run_fd_transport(startup_json: &String) {
132 info!("Running fd transport with {startup_json}");
133 let startup_info =
134 match protobuf_json_mapping::parse_from_str::<StartupInfoProto>(startup_json.as_str()) {
135 Ok(startup_info) => startup_info,
136 Err(e) => {
137 error!("Error parsing startup info: {:?}", e);
138 return;
139 }
140 };
141 // Vector for getting all fd_in, fd_out, chip_kind, device_id, chip_id information
142 // of adding all chips to frontend resource.
143 let mut fd_vec: Vec<(i32, i32, ChipKind, DeviceIdentifier, ChipIdentifier)> = Vec::new();
144 let chip_count = startup_info.devices.len();
145 for device in startup_info.devices {
146 info!("Processing startup device {}", device.name);
147 for chip in &device.chips {
148 info!("Processing chip {:?}", chip);
149 let chip_kind = chip.kind.enum_value_or_default();
150 // TODO(b/323899010): Avoid having cfg(test) in mainline code
151 #[cfg(not(test))]
152 let wireless_create_param = match chip_kind {
153 ChipKind::BLUETOOTH => {
154 wireless::CreateParam::Bluetooth(wireless::bluetooth::CreateParams {
155 address: chip.address.clone(),
156 bt_properties: Some(chip.bt_properties.clone()),
157 })
158 }
159 ChipKind::WIFI => wireless::CreateParam::Wifi(wireless::wifi::CreateParams {}),
160 ChipKind::UWB => wireless::CreateParam::Uwb(wireless::uwb::CreateParams {
161 address: chip.address.clone(),
162 }),
163 _ => {
164 warn!("The provided chip kind is unsupported: {:?}", chip_kind);
165 return;
166 }
167 };
168 #[cfg(test)]
169 let wireless_create_param =
170 wireless::CreateParam::Mock(wireless::mocked::CreateParams { chip_kind });
171 let chip_create_params = chip::CreateParams {
172 kind: chip_kind,
173 address: chip.address.clone(),
174 name: Some(chip.id.clone()),
175 manufacturer: chip.manufacturer.clone(),
176 product_name: chip.product_name.clone(),
177 };
178 let result = match add_chip(
179 &format!("fd-device-{}", &device.name.clone()),
180 &device.name.clone(),
181 &chip_create_params,
182 &wireless_create_param,
183 device.device_info.clone().unwrap_or_default(),
184 ) {
185 Ok(chip_result) => chip_result,
186 Err(err) => {
187 warn!("{err}");
188 return;
189 }
190 };
191 fd_vec.push((chip.fd_in, chip.fd_out, chip_kind, result.device_id, result.chip_id));
192 }
193 }
194
195 // See https://tokio.rs/tokio/topics/bridging
196 // This code is synchronous hosting asynchronous until main is converted to rust.
197 thread::Builder::new()
198 .name("fd_transport".to_string())
199 .spawn(move || {
200 let mut handles = Vec::with_capacity(chip_count);
201 for (fd_in, fd_out, kind, device_id, chip_id) in fd_vec {
202 // Cf writes to fd_out and reads from fd_in
203 // SAFETY: Our caller promises that the file descriptors in the JSON are valid
204 // and open.
205 let file_in = unsafe { File::from_raw_fd(fd_in) };
206
207 register_transport(chip_id, Box::new(FdTransport { file: file_in }));
208 // TODO: switch to runtime.spawn once FIFOs are available in Tokio
209 // SAFETY: Our caller promises that the file descriptors in the JSON are valid
210 // and open.
211 handles.push(unsafe { fd_reader(fd_out, kind, device_id, chip_id) });
212 }
213 // Wait for all of them to complete.
214 for handle in handles {
215 // The `spawn` method returns a `JoinHandle`. A `JoinHandle` is
216 // a future, so we can wait for it using `block_on`.
217 // runtime.block_on(handle).unwrap();
218 // TODO: use runtime.block_on once FIFOs are available in Tokio
219 handle.join().unwrap();
220 }
221 info!("done with all fd handlers");
222 })
223 .unwrap();
224 }
225
226 /// Read from the raw fd and pass to the grpc server.
227 ///
228 /// # Safety
229 ///
230 /// `fd_rx` must be a valid and open file descriptor.
connector_fd_reader(fd_rx: i32, kind: ChipKind, stream_id: u32) -> JoinHandle<()>231 unsafe fn connector_fd_reader(fd_rx: i32, kind: ChipKind, stream_id: u32) -> JoinHandle<()> {
232 info!("Connecting fd reader for stream_id: {}, fd_rx: {}", stream_id, fd_rx);
233 thread::Builder::new()
234 .name(format!("fd_connector_{}_{}", stream_id, fd_rx))
235 .spawn(move || {
236 // SAFETY: The caller promises that `fd_rx` is valid and open.
237 let mut rx = unsafe { File::from_raw_fd(fd_rx) };
238 info!("Handling fd={} for kind: {:?} stream_id: {:?}", fd_rx, kind, stream_id);
239
240 loop {
241 match kind {
242 ChipKind::UWB => match uci::read_uci_packet(&mut rx) {
243 Err(e) => {
244 error!(
245 "End reader connection with fd={}. Failed to read \
246 uci control packet: {:?}",
247 fd_rx, e
248 );
249 break;
250 }
251 Ok(uci::Packet { payload }) => {
252 let mut request = PacketRequest::new();
253 request.set_packet(payload.to_vec());
254 let proto_bytes = request.write_to_bytes().unwrap();
255 ffi_transport::write_packet_request(stream_id, &proto_bytes);
256 }
257 },
258 ChipKind::BLUETOOTH => match h4::read_h4_packet(&mut rx) {
259 Ok(h4::Packet { h4_type, payload }) => {
260 let mut request = PacketRequest::new();
261 let hci_packet = HCIPacket {
262 packet_type: EnumOrUnknown::from_i32(h4_type as i32),
263 packet: payload.to_vec(),
264 ..Default::default()
265 };
266 request.set_hci_packet(hci_packet);
267 let proto_bytes = request.write_to_bytes().unwrap();
268 ffi_transport::write_packet_request(stream_id, &proto_bytes);
269 }
270 Err(PacketError::IoError(e)) if e.kind() == ErrorKind::UnexpectedEof => {
271 info!("End reader connection with fd={}.", fd_rx);
272 break;
273 }
274 Err(e) => {
275 error!(
276 "End reader connection with fd={}. Failed to read \
277 hci control packet: {:?}",
278 fd_rx, e
279 );
280 break;
281 }
282 },
283 _ => {
284 error!("unknown control packet chip_kind: {:?}", kind);
285 break;
286 }
287 };
288 }
289 })
290 .unwrap()
291 }
292
293 // For connector.
294 static CONNECTOR_FILES: OnceLock<Arc<RwLock<HashMap<u32, File>>>> = OnceLock::new();
295
get_connector_files() -> Arc<RwLock<HashMap<u32, File>>>296 fn get_connector_files() -> Arc<RwLock<HashMap<u32, File>>> {
297 CONNECTOR_FILES.get_or_init(|| Arc::new(RwLock::new(HashMap::new()))).clone()
298 }
299
300 /// This function is called when a packet is received from the gRPC server.
connector_grpc_read_callback(stream_id: u32, proto_bytes: &[u8])301 fn connector_grpc_read_callback(stream_id: u32, proto_bytes: &[u8]) {
302 let request = PacketRequest::parse_from_bytes(proto_bytes).unwrap();
303
304 let mut buffer = Vec::<u8>::new();
305 if request.has_hci_packet() {
306 buffer.push(request.hci_packet().packet_type.enum_value_or_default().value() as u8);
307 buffer.extend(&request.hci_packet().packet);
308 } else if request.has_packet() {
309 buffer.extend(request.packet());
310 }
311
312 if let Some(mut file_in) = get_connector_files().read().unwrap().get(&stream_id) {
313 if let Err(e) = file_in.write_all(&buffer[..]) {
314 error!("Failed to write: {}", e);
315 }
316 } else {
317 warn!("Unable to find file with stream_id {}", stream_id);
318 }
319 }
320
321 /// Read from grpc server and write back to file descriptor.
connector_grpc_reader(chip_kind: ChipKind, stream_id: u32, file_in: File) -> JoinHandle<()>322 fn connector_grpc_reader(chip_kind: ChipKind, stream_id: u32, file_in: File) -> JoinHandle<()> {
323 info!("Connecting grpc reader for stream_id: {}", stream_id);
324 thread::Builder::new()
325 .name(format!("grpc_reader_{}", stream_id))
326 .spawn(move || {
327 {
328 let connector_files = get_connector_files();
329 let mut binding = connector_files.write().unwrap();
330 if binding.contains_key(&stream_id) {
331 error!(
332 "register_connector: key already present for \
333 stream_id: {stream_id}"
334 );
335 }
336 binding.insert(stream_id, file_in);
337 }
338 if (chip_kind != ChipKind::BLUETOOTH) && (chip_kind != ChipKind::UWB) {
339 warn!("Unable to register connector for chip type {:?}", chip_kind);
340 }
341 // Read packet from grpc and send to file_in.
342 ffi_transport::read_packet_response_loop(stream_id, connector_grpc_read_callback);
343
344 get_connector_files().write().unwrap().remove(&stream_id);
345 })
346 .unwrap()
347 }
348
349 /// Create threads to forward file descriptors to another netsim daemon.
run_fd_connector(startup_json: &String, server: &str) -> Result<(), String>350 pub fn run_fd_connector(startup_json: &String, server: &str) -> Result<(), String> {
351 info!("Running fd connector with {startup_json}");
352 let startup_info =
353 match protobuf_json_mapping::parse_from_str::<StartupInfoProto>(startup_json.as_str()) {
354 Ok(startup_info) => startup_info,
355 Err(e) => {
356 return Err(format!("Error parsing startup info: {:?}", e.to_string()));
357 }
358 };
359 let server = server.to_owned();
360
361 let chip_count = startup_info.devices.len();
362 let mut handles = Vec::with_capacity(chip_count);
363
364 for device in startup_info.devices {
365 for chip in device.chips {
366 let chip_kind = chip.kind.enum_value_or_default();
367 // Cf writes to fd_out and reads from fd_in
368 // SAFETY: Our caller promises that the file descriptors in the JSON are valid
369 // and open.
370 let file_in = unsafe { File::from_raw_fd(chip.fd_in) };
371
372 let stream_id = ffi_transport::stream_packets(&server);
373 // Send out initial info of PacketRequest to grpc server.
374 let mut initial_request = PacketRequest::new();
375 initial_request.set_initial_info(ChipInfoProto {
376 name: device.name.clone(),
377 chip: MessageField::some(chip.clone()),
378 device_info: device.device_info.clone(),
379 ..Default::default()
380 });
381 ffi_transport::write_packet_request(
382 stream_id,
383 &initial_request.write_to_bytes().unwrap(),
384 );
385 info!("Sent initial request to grpc for stream_id: {}", stream_id);
386
387 handles.push(connector_grpc_reader(chip_kind, stream_id, file_in));
388
389 // TODO: switch to runtime.spawn once FIFOs are available in Tokio
390 // SAFETY: Our caller promises that the file descriptors in the JSON are valid
391 // and open.
392 handles.push(unsafe { connector_fd_reader(chip.fd_out, chip_kind, stream_id) });
393 }
394 }
395 // Wait for all of them to complete.
396 for handle in handles {
397 // The `spawn` method returns a `JoinHandle`. A `JoinHandle` is
398 // a future, so we can wait for it using `block_on`.
399 // runtime.block_on(handle).unwrap();
400 // TODO: use runtime.block_on once FIFOs are available in Tokio
401 handle.join().unwrap();
402 }
403 Ok(())
404 }
405