xref: /aosp_15_r20/tools/netsim/rust/daemon/src/transport/fd.rs (revision cf78ab8cffb8fc9207af348f23af247fb04370a6)
1 // Copyright 2023 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
//! Request packets flow into netsim and response packets flow out of netsim.
//! Packet transports read request packets and write response packets over
//! gRPC or file descriptors.
18 use super::h4;
19 use super::h4::PacketError;
20 use super::uci;
21 use crate::devices::chip;
22 use crate::devices::chip::ChipIdentifier;
23 use crate::devices::device::DeviceIdentifier;
24 use crate::devices::devices_handler::{add_chip, remove_chip};
25 use crate::ffi::ffi_transport;
26 use crate::wireless;
27 use crate::wireless::packet::{register_transport, unregister_transport, Response};
28 use bytes::Bytes;
29 use log::{error, info, warn};
30 use netsim_proto::common::ChipKind;
31 use netsim_proto::hci_packet::{hcipacket::PacketType, HCIPacket};
32 use netsim_proto::packet_streamer::PacketRequest;
33 use netsim_proto::startup::{ChipInfo as ChipInfoProto, StartupInfo as StartupInfoProto};
34 use protobuf::{Enum, EnumOrUnknown, Message, MessageField};
35 use std::collections::HashMap;
36 use std::fs::File;
37 use std::io::{ErrorKind, Write};
38 use std::os::fd::FromRawFd;
39 use std::sync::{Arc, OnceLock, RwLock};
40 use std::thread;
41 use std::thread::JoinHandle;
42 
43 struct FdTransport {
44     file: File,
45 }
46 
47 impl Response for FdTransport {
response(&mut self, packet: Bytes, packet_type: u8)48     fn response(&mut self, packet: Bytes, packet_type: u8) {
49         let mut buffer = Vec::<u8>::new();
50         if packet_type != (PacketType::HCI_PACKET_UNSPECIFIED.value() as u8) {
51             buffer.push(packet_type);
52         }
53         buffer.extend(packet);
54         if let Err(e) = self.file.write_all(&buffer[..]) {
55             error!("netsimd: error writing {}", e);
56         }
57     }
58 }
59 
60 /// read from the raw fd and pass to the packet hub.
61 ///
62 /// # Safety
63 ///
64 /// `fd_rx` must be a valid and open file descriptor.
fd_reader( fd_rx: i32, kind: ChipKind, device_id: DeviceIdentifier, chip_id: ChipIdentifier, ) -> JoinHandle<()>65 unsafe fn fd_reader(
66     fd_rx: i32,
67     kind: ChipKind,
68     device_id: DeviceIdentifier,
69     chip_id: ChipIdentifier,
70 ) -> JoinHandle<()> {
71     thread::Builder::new()
72         .name(format!("fd_reader_{}", fd_rx))
73         .spawn(move || {
74             // SAFETY: The caller promises that `fd_rx` is valid and open.
75             let mut rx = unsafe { File::from_raw_fd(fd_rx) };
76 
77             info!("Handling fd={} for kind: {:?} chip_id: {:?}", fd_rx, kind, chip_id);
78 
79             loop {
80                 match kind {
81                     ChipKind::UWB => match uci::read_uci_packet(&mut rx) {
82                         Err(e) => {
83                             error!("End reader connection with fd={}. Failed to reading uci control packet: {:?}", fd_rx, e);
84                             break;
85                         }
86                         Ok(uci::Packet { payload }) => {
87                             wireless::handle_request(chip_id, &payload, 0);
88                         }
89                     },
90                     ChipKind::BLUETOOTH => match h4::read_h4_packet(&mut rx) {
91                         Ok(h4::Packet { h4_type, payload }) => {
92                             wireless::handle_request(chip_id, &payload, h4_type);
93                         }
94                         Err(PacketError::IoError(e))
95                             if e.kind() == ErrorKind::UnexpectedEof =>
96                         {
97                             info!("End reader connection with fd={}.", fd_rx);
98                             break;
99                         }
100                         Err(e) => {
101                             error!("End reader connection with fd={}. Failed to reading hci control packet: {:?}", fd_rx, e);
102                             break;
103                         }
104                     },
105                     _ => {
106                         error!("unknown control packet chip_kind: {:?}", kind);
107                         break;
108                     }
109                 };
110             }
111 
112             // unregister before remove_chip because facade may re-use facade_id
113             // on an intertwining create_chip and the unregister here might remove
114             // the recently added chip creating a disconnected transport.
115             unregister_transport(chip_id);
116 
117             if let Err(err) = remove_chip(device_id, chip_id) {
118                 warn!("{err}");
119             }
120         })
121         .unwrap()
122 }
123 
124 /// start_fd_transport
125 ///
126 /// Create threads to read and write to file descriptors
127 ///
128 /// # Safety
129 ///
130 /// The file descriptors in the JSON must be valid and open.
run_fd_transport(startup_json: &String)131 pub unsafe fn run_fd_transport(startup_json: &String) {
132     info!("Running fd transport with {startup_json}");
133     let startup_info =
134         match protobuf_json_mapping::parse_from_str::<StartupInfoProto>(startup_json.as_str()) {
135             Ok(startup_info) => startup_info,
136             Err(e) => {
137                 error!("Error parsing startup info: {:?}", e);
138                 return;
139             }
140         };
141     // Vector for getting all fd_in, fd_out, chip_kind, device_id, chip_id information
142     // of adding all chips to frontend resource.
143     let mut fd_vec: Vec<(i32, i32, ChipKind, DeviceIdentifier, ChipIdentifier)> = Vec::new();
144     let chip_count = startup_info.devices.len();
145     for device in startup_info.devices {
146         info!("Processing startup device {}", device.name);
147         for chip in &device.chips {
148             info!("Processing chip {:?}", chip);
149             let chip_kind = chip.kind.enum_value_or_default();
150             // TODO(b/323899010): Avoid having cfg(test) in mainline code
151             #[cfg(not(test))]
152             let wireless_create_param = match chip_kind {
153                 ChipKind::BLUETOOTH => {
154                     wireless::CreateParam::Bluetooth(wireless::bluetooth::CreateParams {
155                         address: chip.address.clone(),
156                         bt_properties: Some(chip.bt_properties.clone()),
157                     })
158                 }
159                 ChipKind::WIFI => wireless::CreateParam::Wifi(wireless::wifi::CreateParams {}),
160                 ChipKind::UWB => wireless::CreateParam::Uwb(wireless::uwb::CreateParams {
161                     address: chip.address.clone(),
162                 }),
163                 _ => {
164                     warn!("The provided chip kind is unsupported: {:?}", chip_kind);
165                     return;
166                 }
167             };
168             #[cfg(test)]
169             let wireless_create_param =
170                 wireless::CreateParam::Mock(wireless::mocked::CreateParams { chip_kind });
171             let chip_create_params = chip::CreateParams {
172                 kind: chip_kind,
173                 address: chip.address.clone(),
174                 name: Some(chip.id.clone()),
175                 manufacturer: chip.manufacturer.clone(),
176                 product_name: chip.product_name.clone(),
177             };
178             let result = match add_chip(
179                 &format!("fd-device-{}", &device.name.clone()),
180                 &device.name.clone(),
181                 &chip_create_params,
182                 &wireless_create_param,
183                 device.device_info.clone().unwrap_or_default(),
184             ) {
185                 Ok(chip_result) => chip_result,
186                 Err(err) => {
187                     warn!("{err}");
188                     return;
189                 }
190             };
191             fd_vec.push((chip.fd_in, chip.fd_out, chip_kind, result.device_id, result.chip_id));
192         }
193     }
194 
195     // See https://tokio.rs/tokio/topics/bridging
196     // This code is synchronous hosting asynchronous until main is converted to rust.
197     thread::Builder::new()
198         .name("fd_transport".to_string())
199         .spawn(move || {
200             let mut handles = Vec::with_capacity(chip_count);
201             for (fd_in, fd_out, kind, device_id, chip_id) in fd_vec {
202                 // Cf writes to fd_out and reads from fd_in
203                 // SAFETY: Our caller promises that the file descriptors in the JSON are valid
204                 // and open.
205                 let file_in = unsafe { File::from_raw_fd(fd_in) };
206 
207                 register_transport(chip_id, Box::new(FdTransport { file: file_in }));
208                 // TODO: switch to runtime.spawn once FIFOs are available in Tokio
209                 // SAFETY: Our caller promises that the file descriptors in the JSON are valid
210                 // and open.
211                 handles.push(unsafe { fd_reader(fd_out, kind, device_id, chip_id) });
212             }
213             // Wait for all of them to complete.
214             for handle in handles {
215                 // The `spawn` method returns a `JoinHandle`. A `JoinHandle` is
216                 // a future, so we can wait for it using `block_on`.
217                 // runtime.block_on(handle).unwrap();
218                 // TODO: use runtime.block_on once FIFOs are available in Tokio
219                 handle.join().unwrap();
220             }
221             info!("done with all fd handlers");
222         })
223         .unwrap();
224 }
225 
226 /// Read from the raw fd and pass to the grpc server.
227 ///
228 /// # Safety
229 ///
230 /// `fd_rx` must be a valid and open file descriptor.
connector_fd_reader(fd_rx: i32, kind: ChipKind, stream_id: u32) -> JoinHandle<()>231 unsafe fn connector_fd_reader(fd_rx: i32, kind: ChipKind, stream_id: u32) -> JoinHandle<()> {
232     info!("Connecting fd reader for stream_id: {}, fd_rx: {}", stream_id, fd_rx);
233     thread::Builder::new()
234         .name(format!("fd_connector_{}_{}", stream_id, fd_rx))
235         .spawn(move || {
236             // SAFETY: The caller promises that `fd_rx` is valid and open.
237             let mut rx = unsafe { File::from_raw_fd(fd_rx) };
238             info!("Handling fd={} for kind: {:?} stream_id: {:?}", fd_rx, kind, stream_id);
239 
240             loop {
241                 match kind {
242                     ChipKind::UWB => match uci::read_uci_packet(&mut rx) {
243                         Err(e) => {
244                             error!(
245                                 "End reader connection with fd={}. Failed to read \
246                                      uci control packet: {:?}",
247                                 fd_rx, e
248                             );
249                             break;
250                         }
251                         Ok(uci::Packet { payload }) => {
252                             let mut request = PacketRequest::new();
253                             request.set_packet(payload.to_vec());
254                             let proto_bytes = request.write_to_bytes().unwrap();
255                             ffi_transport::write_packet_request(stream_id, &proto_bytes);
256                         }
257                     },
258                     ChipKind::BLUETOOTH => match h4::read_h4_packet(&mut rx) {
259                         Ok(h4::Packet { h4_type, payload }) => {
260                             let mut request = PacketRequest::new();
261                             let hci_packet = HCIPacket {
262                                 packet_type: EnumOrUnknown::from_i32(h4_type as i32),
263                                 packet: payload.to_vec(),
264                                 ..Default::default()
265                             };
266                             request.set_hci_packet(hci_packet);
267                             let proto_bytes = request.write_to_bytes().unwrap();
268                             ffi_transport::write_packet_request(stream_id, &proto_bytes);
269                         }
270                         Err(PacketError::IoError(e)) if e.kind() == ErrorKind::UnexpectedEof => {
271                             info!("End reader connection with fd={}.", fd_rx);
272                             break;
273                         }
274                         Err(e) => {
275                             error!(
276                                 "End reader connection with fd={}. Failed to read \
277                                      hci control packet: {:?}",
278                                 fd_rx, e
279                             );
280                             break;
281                         }
282                     },
283                     _ => {
284                         error!("unknown control packet chip_kind: {:?}", kind);
285                         break;
286                     }
287                 };
288             }
289         })
290         .unwrap()
291 }
292 
/// Files used by the fd connector, keyed by stream id.
static CONNECTOR_FILES: OnceLock<Arc<RwLock<HashMap<u32, File>>>> = OnceLock::new();

/// Returns a handle to the shared connector file map, creating the empty map on
/// first use.
fn get_connector_files() -> Arc<RwLock<HashMap<u32, File>>> {
    let files = CONNECTOR_FILES.get_or_init(Default::default);
    Arc::clone(files)
}
299 
300 /// This function is called when a packet is received from the gRPC server.
connector_grpc_read_callback(stream_id: u32, proto_bytes: &[u8])301 fn connector_grpc_read_callback(stream_id: u32, proto_bytes: &[u8]) {
302     let request = PacketRequest::parse_from_bytes(proto_bytes).unwrap();
303 
304     let mut buffer = Vec::<u8>::new();
305     if request.has_hci_packet() {
306         buffer.push(request.hci_packet().packet_type.enum_value_or_default().value() as u8);
307         buffer.extend(&request.hci_packet().packet);
308     } else if request.has_packet() {
309         buffer.extend(request.packet());
310     }
311 
312     if let Some(mut file_in) = get_connector_files().read().unwrap().get(&stream_id) {
313         if let Err(e) = file_in.write_all(&buffer[..]) {
314             error!("Failed to write: {}", e);
315         }
316     } else {
317         warn!("Unable to find file with stream_id {}", stream_id);
318     }
319 }
320 
321 /// Read from grpc server and write back to file descriptor.
connector_grpc_reader(chip_kind: ChipKind, stream_id: u32, file_in: File) -> JoinHandle<()>322 fn connector_grpc_reader(chip_kind: ChipKind, stream_id: u32, file_in: File) -> JoinHandle<()> {
323     info!("Connecting grpc reader for stream_id: {}", stream_id);
324     thread::Builder::new()
325         .name(format!("grpc_reader_{}", stream_id))
326         .spawn(move || {
327             {
328                 let connector_files = get_connector_files();
329                 let mut binding = connector_files.write().unwrap();
330                 if binding.contains_key(&stream_id) {
331                     error!(
332                         "register_connector: key already present for \
333                                  stream_id: {stream_id}"
334                     );
335                 }
336                 binding.insert(stream_id, file_in);
337             }
338             if (chip_kind != ChipKind::BLUETOOTH) && (chip_kind != ChipKind::UWB) {
339                 warn!("Unable to register connector for chip type {:?}", chip_kind);
340             }
341             // Read packet from grpc and send to file_in.
342             ffi_transport::read_packet_response_loop(stream_id, connector_grpc_read_callback);
343 
344             get_connector_files().write().unwrap().remove(&stream_id);
345         })
346         .unwrap()
347 }
348 
349 /// Create threads to forward file descriptors to another netsim daemon.
run_fd_connector(startup_json: &String, server: &str) -> Result<(), String>350 pub fn run_fd_connector(startup_json: &String, server: &str) -> Result<(), String> {
351     info!("Running fd connector with {startup_json}");
352     let startup_info =
353         match protobuf_json_mapping::parse_from_str::<StartupInfoProto>(startup_json.as_str()) {
354             Ok(startup_info) => startup_info,
355             Err(e) => {
356                 return Err(format!("Error parsing startup info: {:?}", e.to_string()));
357             }
358         };
359     let server = server.to_owned();
360 
361     let chip_count = startup_info.devices.len();
362     let mut handles = Vec::with_capacity(chip_count);
363 
364     for device in startup_info.devices {
365         for chip in device.chips {
366             let chip_kind = chip.kind.enum_value_or_default();
367             // Cf writes to fd_out and reads from fd_in
368             // SAFETY: Our caller promises that the file descriptors in the JSON are valid
369             // and open.
370             let file_in = unsafe { File::from_raw_fd(chip.fd_in) };
371 
372             let stream_id = ffi_transport::stream_packets(&server);
373             // Send out initial info of PacketRequest to grpc server.
374             let mut initial_request = PacketRequest::new();
375             initial_request.set_initial_info(ChipInfoProto {
376                 name: device.name.clone(),
377                 chip: MessageField::some(chip.clone()),
378                 device_info: device.device_info.clone(),
379                 ..Default::default()
380             });
381             ffi_transport::write_packet_request(
382                 stream_id,
383                 &initial_request.write_to_bytes().unwrap(),
384             );
385             info!("Sent initial request to grpc for stream_id: {}", stream_id);
386 
387             handles.push(connector_grpc_reader(chip_kind, stream_id, file_in));
388 
389             // TODO: switch to runtime.spawn once FIFOs are available in Tokio
390             // SAFETY: Our caller promises that the file descriptors in the JSON are valid
391             // and open.
392             handles.push(unsafe { connector_fd_reader(chip.fd_out, chip_kind, stream_id) });
393         }
394     }
395     // Wait for all of them to complete.
396     for handle in handles {
397         // The `spawn` method returns a `JoinHandle`. A `JoinHandle` is
398         // a future, so we can wait for it using `block_on`.
399         // runtime.block_on(handle).unwrap();
400         // TODO: use runtime.block_on once FIFOs are available in Tokio
401         handle.join().unwrap();
402     }
403     Ok(())
404 }
405