1 // Copyright 2022 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 use anyhow::Result;
16 use pdl_runtime::Packet;
17 use serde::{Deserialize, Serialize};
18 use std::collections::HashMap;
19 use std::fmt::Display;
20 use std::path::PathBuf;
21 use std::pin::Pin;
22 use thiserror::Error;
23 use tokio::sync::{broadcast, mpsc, oneshot};
24 
25 pub mod packets;
26 mod pcapng;
27 
28 use packets::uci::{self, *};
29 
30 mod device;
31 use device::{Device, MAX_DEVICE, MAX_SESSION};
32 
33 mod session;
34 
35 mod mac_address;
36 pub use mac_address::MacAddress;
37 
38 mod app_config;
39 pub use app_config::AppConfig;
40 
41 pub type UciPacket = Vec<u8>;
42 pub type UciStream = Pin<Box<dyn futures::stream::Stream<Item = Vec<u8>> + Send>>;
43 pub type UciSink = Pin<Box<dyn futures::sink::Sink<Vec<u8>, Error = anyhow::Error> + Send>>;
44 
45 /// Handle allocated for created devices or anchors.
46 /// The handle is unique across the lifetime of the Pica context
47 /// and callers may assume that one handle is never reused.
48 pub type Handle = usize;
49 
50 /// Ranging measurement produced by a ranging estimator.
51 #[derive(Clone, Copy, Default, Debug)]
52 pub struct RangingMeasurement {
53     pub range: u16,
54     pub azimuth: i16,
55     pub elevation: i8,
56 }
57 
58 /// Trait matching the capabilities of a ranging estimator.
59 /// The estimator manages the position of the devices, and chooses
60 /// the algorithm used to generate the ranging measurements.
61 pub trait RangingEstimator: Send + Sync {
62     /// Evaluate the ranging measurement for the two input devices
63     /// identified by their respective handle. The result is a triplet
64     /// containing the range, azimuth, and elevation of the right device
65     /// relative to the left device.
66     /// Return `None` if the measurement could not be estimated, e.g. because
67     /// the devices are out of range.
estimate(&self, left: &Handle, right: &Handle) -> Option<RangingMeasurement>68     fn estimate(&self, left: &Handle, right: &Handle) -> Option<RangingMeasurement>;
69 }
70 
71 /// Pica emulation environment.
72 /// All the devices added to this environment are emulated as if they were
73 /// from the same physical space.
74 pub struct Pica {
75     counter: usize,
76     devices: HashMap<Handle, Device>,
77     anchors: HashMap<MacAddress, Anchor>,
78     command_rx: Option<mpsc::Receiver<PicaCommand>>,
79     command_tx: mpsc::Sender<PicaCommand>,
80     event_tx: broadcast::Sender<PicaEvent>,
81     ranging_estimator: Box<dyn RangingEstimator>,
82     pcapng_dir: Option<PathBuf>,
83 }
84 
85 #[derive(Error, Debug, Clone, PartialEq, Eq)]
86 pub enum PicaCommandError {
87     #[error("Device already exists: {0}")]
88     DeviceAlreadyExists(MacAddress),
89     #[error("Device not found: {0}")]
90     DeviceNotFound(MacAddress),
91 }
92 
93 pub enum PicaCommand {
94     // Connect a new device.
95     Connect(UciStream, UciSink),
96     // Disconnect the selected device.
97     Disconnect(usize),
98     // Execute ranging command for selected device and session.
99     Ranging(usize, u32),
100     // Send an in-band request to stop ranging to a peer controlee identified by address and session id.
101     StopRanging(MacAddress, u32),
102     // UCI packet received for the selected device.
103     UciPacket(usize, Vec<u8>),
104     // Create Anchor
105     CreateAnchor(
106         MacAddress,
107         oneshot::Sender<Result<Handle, PicaCommandError>>,
108     ),
109     // Destroy Anchor
110     DestroyAnchor(
111         MacAddress,
112         oneshot::Sender<Result<Handle, PicaCommandError>>,
113     ),
114 }
115 
116 impl Display for PicaCommand {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result117     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118         let cmd = match self {
119             PicaCommand::Connect(_, _) => "Connect",
120             PicaCommand::Disconnect(_) => "Disconnect",
121             PicaCommand::Ranging(_, _) => "Ranging",
122             PicaCommand::StopRanging(_, _) => "StopRanging",
123             PicaCommand::UciPacket(_, _) => "UciPacket",
124             PicaCommand::CreateAnchor(_, _) => "CreateAnchor",
125             PicaCommand::DestroyAnchor(_, _) => "DestroyAnchor",
126         };
127         write!(f, "{}", cmd)
128     }
129 }
130 
131 #[derive(Clone, Debug, Serialize)]
132 #[serde(untagged)]
133 pub enum PicaEvent {
134     // A UCI connection was added
135     Connected {
136         handle: Handle,
137         mac_address: MacAddress,
138     },
139     // A UCI connection was lost
140     Disconnected {
141         handle: Handle,
142         mac_address: MacAddress,
143     },
144 }
145 
146 #[derive(Clone, Copy, Debug, Serialize, Deserialize)]
147 pub enum Category {
148     Uci,
149     Anchor,
150 }
151 
152 #[derive(Debug, Clone, Copy)]
153 struct Anchor {
154     handle: Handle,
155     #[allow(unused)]
156     mac_address: MacAddress,
157 }
158 
make_measurement( mac_address: &MacAddress, local: RangingMeasurement, remote: RangingMeasurement, ) -> ShortAddressTwoWayRangingMeasurement159 fn make_measurement(
160     mac_address: &MacAddress,
161     local: RangingMeasurement,
162     remote: RangingMeasurement,
163 ) -> ShortAddressTwoWayRangingMeasurement {
164     if let MacAddress::Short(address) = mac_address {
165         ShortAddressTwoWayRangingMeasurement {
166             mac_address: u16::from_le_bytes(*address),
167             status: uci::Status::Ok,
168             nlos: 0, // in Line Of Sight
169             distance: local.range,
170             aoa_azimuth: local.azimuth as u16,
171             aoa_azimuth_fom: 100, // Yup, pretty sure about this
172             aoa_elevation: local.elevation as u16,
173             aoa_elevation_fom: 100, // Yup, pretty sure about this
174             aoa_destination_azimuth: remote.azimuth as u16,
175             aoa_destination_azimuth_fom: 100,
176             aoa_destination_elevation: remote.elevation as u16,
177             aoa_destination_elevation_fom: 100,
178             slot_index: 0,
179             rssi: u8::MAX,
180         }
181     } else {
182         panic!("Extended address is not supported.")
183     }
184 }
185 
186 impl Pica {
new(ranging_estimator: Box<dyn RangingEstimator>, pcapng_dir: Option<PathBuf>) -> Self187     pub fn new(ranging_estimator: Box<dyn RangingEstimator>, pcapng_dir: Option<PathBuf>) -> Self {
188         let (command_tx, command_rx) = mpsc::channel(MAX_SESSION * MAX_DEVICE);
189         let (event_tx, _) = broadcast::channel(16);
190         Pica {
191             devices: HashMap::new(),
192             anchors: HashMap::new(),
193             counter: 0,
194             command_rx: Some(command_rx),
195             command_tx,
196             event_tx,
197             ranging_estimator,
198             pcapng_dir,
199         }
200     }
201 
events(&self) -> broadcast::Receiver<PicaEvent>202     pub fn events(&self) -> broadcast::Receiver<PicaEvent> {
203         self.event_tx.subscribe()
204     }
205 
commands(&self) -> mpsc::Sender<PicaCommand>206     pub fn commands(&self) -> mpsc::Sender<PicaCommand> {
207         self.command_tx.clone()
208     }
209 
get_device_mut(&mut self, device_handle: usize) -> Option<&mut Device>210     fn get_device_mut(&mut self, device_handle: usize) -> Option<&mut Device> {
211         self.devices.get_mut(&device_handle)
212     }
213 
get_device(&self, device_handle: usize) -> Option<&Device>214     fn get_device(&self, device_handle: usize) -> Option<&Device> {
215         self.devices.get(&device_handle)
216     }
217 
get_category(&self, mac_address: &MacAddress) -> Option<Category>218     fn get_category(&self, mac_address: &MacAddress) -> Option<Category> {
219         if self.anchors.contains_key(mac_address) {
220             Some(Category::Anchor)
221         } else if self
222             .devices
223             .iter()
224             .any(|(_, device)| device.mac_address == *mac_address)
225         {
226             Some(Category::Uci)
227         } else {
228             None
229         }
230     }
231 
send_event(&self, event: PicaEvent)232     fn send_event(&self, event: PicaEvent) {
233         // An error here means that we have
234         // no receivers, so ignore it
235         let _ = self.event_tx.send(event);
236     }
237 
238     /// Handle an incoming stream of UCI packets.
239     /// Reassemble control packets when fragmented, data packets are unmodified.
read_routine( mut uci_stream: impl futures::stream::Stream<Item = Vec<u8>> + Unpin, cmd_tx: mpsc::Sender<PicaCommand>, handle: Handle, pcapng_file: Option<&pcapng::File>, ) -> anyhow::Result<()>240     async fn read_routine(
241         mut uci_stream: impl futures::stream::Stream<Item = Vec<u8>> + Unpin,
242         cmd_tx: mpsc::Sender<PicaCommand>,
243         handle: Handle,
244         pcapng_file: Option<&pcapng::File>,
245     ) -> anyhow::Result<()> {
246         use futures::stream::StreamExt;
247 
248         loop {
249             let mut complete_packet: Option<Vec<u8>> = None;
250             loop {
251                 let packet = uci_stream
252                     .next()
253                     .await
254                     .ok_or(anyhow::anyhow!("input packet stream closed"))?;
255                 let header =
256                     packets::uci::CommonPacketHeader::parse(&packet[0..COMMON_HEADER_SIZE])?;
257 
258                 if let Some(file) = pcapng_file {
259                     file.write(&packet, pcapng::Direction::Tx)?;
260                 }
261 
262                 match &mut complete_packet {
263                     Some(complete_packet) => {
264                         complete_packet.extend_from_slice(&packet[HEADER_SIZE..])
265                     }
266                     None => complete_packet = Some(packet),
267                 }
268 
269                 if header.get_pbf() == packets::uci::PacketBoundaryFlag::Complete
270                     || header.get_mt() == packets::uci::MessageType::Data
271                 {
272                     break;
273                 }
274             }
275 
276             cmd_tx
277                 .send(PicaCommand::UciPacket(handle, complete_packet.unwrap()))
278                 .await
279                 .unwrap()
280         }
281     }
282 
283     /// Segment a stream of UCI packets.
write_routine( mut uci_sink: impl futures::sink::Sink<Vec<u8>> + Unpin, mut packet_rx: mpsc::UnboundedReceiver<UciPacket>, _handle: Handle, pcapng_file: Option<&pcapng::File>, ) -> anyhow::Result<()>284     async fn write_routine(
285         mut uci_sink: impl futures::sink::Sink<Vec<u8>> + Unpin,
286         mut packet_rx: mpsc::UnboundedReceiver<UciPacket>,
287         _handle: Handle,
288         pcapng_file: Option<&pcapng::File>,
289     ) -> anyhow::Result<()> {
290         use futures::sink::SinkExt;
291 
292         loop {
293             let complete_packet = packet_rx
294                 .recv()
295                 .await
296                 .ok_or(anyhow::anyhow!("output packet stream closed"))?;
297             let mut offset = HEADER_SIZE;
298             let mt = parse_message_type(complete_packet[0]);
299 
300             while offset < complete_packet.len() {
301                 let remaining_length = complete_packet.len() - offset;
302                 let fragment_length = std::cmp::min(
303                     remaining_length,
304                     if mt == MessageType::Data {
305                         MAX_DATA_PACKET_PAYLOAD_SIZE
306                     } else {
307                         MAX_CTRL_PACKET_PAYLOAD_SIZE
308                     },
309                 );
310                 let pbf = if fragment_length == remaining_length {
311                     PacketBoundaryFlag::Complete
312                 } else {
313                     PacketBoundaryFlag::NotComplete
314                 };
315 
316                 let mut packet = Vec::with_capacity(HEADER_SIZE + fragment_length);
317 
318                 packet.extend_from_slice(&complete_packet[0..HEADER_SIZE]);
319                 const PBF_MASK: u8 = 0x10;
320                 packet[0] &= !PBF_MASK;
321                 packet[0] |= (pbf as u8) << 4;
322 
323                 match mt {
324                     MessageType::Data => {
325                         packet[2..4].copy_from_slice(&(fragment_length as u16).to_le_bytes())
326                     }
327                     _ => packet[3] = fragment_length as u8,
328                 }
329 
330                 packet.extend_from_slice(&complete_packet[offset..offset + fragment_length]);
331 
332                 if let Some(file) = pcapng_file {
333                     file.write(&packet, pcapng::Direction::Rx)?;
334                 }
335 
336                 uci_sink
337                     .send(packet)
338                     .await
339                     .map_err(|_| anyhow::anyhow!("output packet sink closed"))?;
340 
341                 offset += fragment_length;
342             }
343         }
344     }
345 
add_device(&mut self, stream: UciStream, sink: UciSink) -> Result<Handle>346     pub fn add_device(&mut self, stream: UciStream, sink: UciSink) -> Result<Handle> {
347         let (packet_tx, packet_rx) = mpsc::unbounded_channel();
348         let pica_tx = self.command_tx.clone();
349         let disconnect_tx = self.command_tx.clone();
350         let pcapng_dir = self.pcapng_dir.clone();
351 
352         let handle = self.counter;
353         self.counter += 1;
354 
355         log::debug!("[{}] Connecting device", handle);
356 
357         let mac_address = MacAddress::Short((handle as u16).to_be_bytes());
358         let mut device = Device::new(handle, mac_address, packet_tx, self.command_tx.clone());
359         device.init();
360 
361         self.send_event(PicaEvent::Connected {
362             handle,
363             mac_address: device.mac_address,
364         });
365 
366         self.devices.insert(handle, device);
367 
368         // Spawn and detach the connection handling task.
369         // The task notifies pica when exiting to let it clean
370         // the state.
371         tokio::task::spawn(async move {
372             let pcapng_file = if let Some(dir) = pcapng_dir {
373                 let full_path = dir.join(format!("device-{}.pcapng", handle));
374                 log::debug!("Recording pcapng to file {}", full_path.as_path().display());
375                 Some(pcapng::File::create(full_path).unwrap())
376             } else {
377                 None
378             };
379 
380             let _ = tokio::try_join!(
381                 async { Self::read_routine(stream, pica_tx, handle, pcapng_file.as_ref()).await },
382                 async { Self::write_routine(sink, packet_rx, handle, pcapng_file.as_ref()).await }
383             );
384 
385             disconnect_tx
386                 .send(PicaCommand::Disconnect(handle))
387                 .await
388                 .unwrap()
389         });
390 
391         Ok(handle)
392     }
393 
disconnect(&mut self, device_handle: usize)394     fn disconnect(&mut self, device_handle: usize) {
395         log::debug!("[{}] Disconnecting device", device_handle);
396 
397         if let Some(device) = self.devices.get(&device_handle) {
398             self.send_event(PicaEvent::Disconnected {
399                 handle: device_handle,
400                 mac_address: device.mac_address,
401             });
402             self.devices.remove(&device_handle);
403         }
404     }
405 
ranging(&mut self, device_handle: usize, session_id: u32)406     fn ranging(&mut self, device_handle: usize, session_id: u32) {
407         log::debug!("[{}] Ranging event", device_handle);
408         log::debug!("  session_id={}", session_id);
409 
410         let device = self.get_device(device_handle).unwrap();
411         let session = device.session(session_id).unwrap();
412 
413         let mut data_transfer = Vec::new();
414         let mut measurements = Vec::new();
415 
416         // Look for compatible anchors.
417         for mac_address in session.get_dst_mac_address() {
418             if let Some(other) = self.anchors.get(mac_address) {
419                 let Some(local) = self
420                     .ranging_estimator
421                     .estimate(&device.handle, &other.handle)
422                 else {
423                     continue;
424                 };
425                 let Some(remote) = self
426                     .ranging_estimator
427                     .estimate(&other.handle, &device.handle)
428                 else {
429                     continue;
430                 };
431                 measurements.push(make_measurement(mac_address, local, remote));
432             }
433         }
434 
435         // Look for compatible ranging sessions in other devices.
436         for peer_device in self.devices.values() {
437             if peer_device.handle == device_handle {
438                 continue;
439             }
440 
441             if peer_device.can_start_ranging(session, session_id) {
442                 let peer_mac_address = peer_device
443                     .session(session_id)
444                     .unwrap()
445                     .app_config
446                     .device_mac_address
447                     .unwrap();
448                 let local = self
449                     .ranging_estimator
450                     .estimate(&device.handle, &peer_device.handle)
451                     .unwrap_or(Default::default());
452                 let remote = self
453                     .ranging_estimator
454                     .estimate(&peer_device.handle, &device.handle)
455                     .unwrap_or(Default::default());
456                 measurements.push(make_measurement(&peer_mac_address, local, remote));
457             }
458 
459             if device.can_start_data_transfer(session_id)
460                 && peer_device.can_receive_data_transfer(session_id)
461             {
462                 data_transfer.push(peer_device);
463             }
464         }
465 
466         // TODO: Data transfer should be limited in size for
467         // each round of ranging
468         for peer_device in data_transfer.iter() {
469             peer_device
470                 .tx
471                 .send(
472                     DataMessageRcvBuilder {
473                         application_data: session.data().clone().into(),
474                         data_sequence_number: 0x01,
475                         pbf: PacketBoundaryFlag::Complete,
476                         session_handle: session_id,
477                         source_address: session.app_config.device_mac_address.unwrap().into(),
478                         status: uci::Status::Ok,
479                     }
480                     .build()
481                     .encode_to_vec()
482                     .unwrap(),
483                 )
484                 .unwrap();
485         }
486         if session.is_session_info_ntf_enabled() {
487             device
488                 .tx
489                 .send(
490                     // TODO: support extended address
491                     ShortMacTwoWaySessionInfoNtfBuilder {
492                         sequence_number: session.sequence_number,
493                         session_token: session_id,
494                         rcr_indicator: 0,            //TODO
495                         current_ranging_interval: 0, //TODO
496                         two_way_ranging_measurements: measurements,
497                         vendor_data: vec![],
498                     }
499                     .build()
500                     .encode_to_vec()
501                     .unwrap(),
502                 )
503                 .unwrap();
504 
505             let device = self.get_device_mut(device_handle).unwrap();
506             let session = device.session_mut(session_id).unwrap();
507 
508             session.sequence_number += 1;
509         }
510 
511         // TODO: Clean the data only when all the data is transfered
512         let device = self.get_device_mut(device_handle).unwrap();
513         let session = device.session_mut(session_id).unwrap();
514 
515         session.clear_data();
516     }
517 
uci_packet(&mut self, device_handle: usize, packet: Vec<u8>)518     fn uci_packet(&mut self, device_handle: usize, packet: Vec<u8>) {
519         match self.get_device_mut(device_handle) {
520             Some(device) => device.receive_packet(packet),
521             None => log::error!("Device {} not found", device_handle),
522         }
523     }
524 
pica_command(&mut self, command: PicaCommand)525     fn pica_command(&mut self, command: PicaCommand) {
526         use PicaCommand::*;
527         match command {
528             Connect(stream, sink) => {
529                 let _ = self.add_device(stream, sink);
530             }
531             Disconnect(device_handle) => self.disconnect(device_handle),
532             Ranging(device_handle, session_id) => self.ranging(device_handle, session_id),
533             StopRanging(mac_address, session_id) => {
534                 self.stop_controlee_ranging(&mac_address, session_id)
535             }
536             UciPacket(device_handle, packet) => self.uci_packet(device_handle, packet),
537             CreateAnchor(mac_address, pica_cmd_rsp_tx) => {
538                 self.create_anchor(mac_address, pica_cmd_rsp_tx)
539             }
540             DestroyAnchor(mac_address, pica_cmd_rsp_tx) => {
541                 self.destroy_anchor(mac_address, pica_cmd_rsp_tx)
542             }
543         }
544     }
545 
546     /// Run the internal pica event loop.
run(mut self) -> Result<()>547     pub async fn run(mut self) -> Result<()> {
548         let Some(mut command_rx) = self.command_rx.take() else {
549             anyhow::bail!("missing pica command receiver")
550         };
551         loop {
552             if let Some(command) = command_rx.recv().await {
553                 self.pica_command(command)
554             }
555         }
556     }
557 
558     // Handle the in-band StopRanging command sent from controller to the controlee with
559     // corresponding mac_address and session_id.
stop_controlee_ranging(&mut self, mac_address: &MacAddress, session_id: u32)560     fn stop_controlee_ranging(&mut self, mac_address: &MacAddress, session_id: u32) {
561         for device in self.devices.values_mut() {
562             let Some(session) = device.session_mut(session_id) else {
563                 continue;
564             };
565 
566             if session.app_config.device_mac_address != Some(*mac_address) {
567                 continue;
568             }
569 
570             if session.session_state() == SessionState::SessionStateActive {
571                 session.stop_ranging_task();
572                 session.set_state(
573                     SessionState::SessionStateIdle,
574                     ReasonCode::SessionStoppedDueToInbandSignal,
575                 );
576                 device.n_active_sessions = device.n_active_sessions.saturating_sub(1);
577                 if device.n_active_sessions == 0 {
578                     device.set_state(DeviceState::DeviceStateReady);
579                 }
580             } else {
581                 log::warn!("stop_controlee_ranging: session is not active !");
582             }
583         }
584     }
585 
586     #[allow(clippy::map_entry)]
create_anchor( &mut self, mac_address: MacAddress, rsp_tx: oneshot::Sender<Result<Handle, PicaCommandError>>, )587     fn create_anchor(
588         &mut self,
589         mac_address: MacAddress,
590         rsp_tx: oneshot::Sender<Result<Handle, PicaCommandError>>,
591     ) {
592         log::debug!("[_] Create anchor");
593         log::debug!("  mac_address: {}", mac_address);
594 
595         let status = if self.get_category(&mac_address).is_some() {
596             Err(PicaCommandError::DeviceAlreadyExists(mac_address))
597         } else {
598             let handle = self.counter;
599             self.counter += 1;
600 
601             assert!(self
602                 .anchors
603                 .insert(
604                     mac_address,
605                     Anchor {
606                         handle,
607                         mac_address,
608                     },
609                 )
610                 .is_none());
611 
612             Ok(handle)
613         };
614 
615         rsp_tx.send(status).unwrap_or_else(|err| {
616             log::error!("Failed to send create-anchor command response: {:?}", err)
617         })
618     }
619 
destroy_anchor( &mut self, mac_address: MacAddress, rsp_tx: oneshot::Sender<Result<Handle, PicaCommandError>>, )620     fn destroy_anchor(
621         &mut self,
622         mac_address: MacAddress,
623         rsp_tx: oneshot::Sender<Result<Handle, PicaCommandError>>,
624     ) {
625         log::debug!("[_] Destroy anchor");
626         log::debug!("  mac_address: {}", mac_address);
627 
628         let status = match self.anchors.remove(&mac_address) {
629             None => Err(PicaCommandError::DeviceNotFound(mac_address)),
630             Some(anchor) => Ok(anchor.handle),
631         };
632 
633         rsp_tx.send(status).unwrap_or_else(|err| {
634             log::error!("Failed to send destroy-anchor command response: {:?}", err)
635         })
636     }
637 }
638 
639 /// Run the internal pica event loop.
640 /// As opposed to Pica::run, the context is passed under a mutex, which
641 /// allows synchronous access to the context for device creation.
run(this: &std::sync::Mutex<Pica>) -> Result<()>642 pub async fn run(this: &std::sync::Mutex<Pica>) -> Result<()> {
643     // Extract the mpsc receiver from the Pica context.
644     // The receiver cannot be cloned.
645     let Some(mut command_rx) = this.lock().unwrap().command_rx.take() else {
646         anyhow::bail!("missing pica command receiver");
647     };
648 
649     loop {
650         if let Some(command) = command_rx.recv().await {
651             this.lock().unwrap().pica_command(command)
652         }
653     }
654 }