1 // Copyright 2022 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 use anyhow::Result;
16 use pdl_runtime::Packet;
17 use serde::{Deserialize, Serialize};
18 use std::collections::HashMap;
19 use std::fmt::Display;
20 use std::path::PathBuf;
21 use std::pin::Pin;
22 use thiserror::Error;
23 use tokio::sync::{broadcast, mpsc, oneshot};
24
25 pub mod packets;
26 mod pcapng;
27
28 use packets::uci::{self, *};
29
30 mod device;
31 use device::{Device, MAX_DEVICE, MAX_SESSION};
32
33 mod session;
34
35 mod mac_address;
36 pub use mac_address::MacAddress;
37
38 mod app_config;
39 pub use app_config::AppConfig;
40
/// Raw bytes of a single UCI packet.
pub type UciPacket = Vec<u8>;
/// Boxed, pinned stream of byte buffers read from a UCI connection.
pub type UciStream = Pin<Box<dyn futures::stream::Stream<Item = Vec<u8>> + Send>>;
/// Boxed, pinned sink of byte buffers written to a UCI connection.
pub type UciSink = Pin<Box<dyn futures::sink::Sink<Vec<u8>, Error = anyhow::Error> + Send>>;

/// Handle allocated for created devices or anchors.
/// The handle is unique across the lifetime of the Pica context
/// and callers may assume that one handle is never reused.
pub type Handle = usize;
49
/// Ranging measurement produced by a ranging estimator.
///
/// The values are copied as-is into the UCI two-way ranging measurement
/// reported to the host.
// NOTE(review): units and fixed-point encoding of the fields are not
// visible from this file — confirm against the estimator implementation.
#[derive(Clone, Copy, Default, Debug)]
pub struct RangingMeasurement {
    /// Estimated distance between the two devices.
    pub range: u16,
    /// Estimated azimuth of the remote device.
    pub azimuth: i16,
    /// Estimated elevation of the remote device.
    pub elevation: i8,
}
57
/// Trait matching the capabilities of a ranging estimator.
/// The estimator manages the position of the devices, and chooses
/// the algorithm used to generate the ranging measurements.
///
/// Implementations must be `Send + Sync` as the estimator is owned by the
/// [`Pica`] context and stored as a boxed trait object.
pub trait RangingEstimator: Send + Sync {
    /// Evaluate the ranging measurement for the two input devices
    /// identified by their respective handle. The result is a triplet
    /// containing the range, azimuth, and elevation of the right device
    /// relative to the left device.
    /// Return `None` if the measurement could not be estimated, e.g. because
    /// the devices are out of range.
    fn estimate(&self, left: &Handle, right: &Handle) -> Option<RangingMeasurement>;
}
70
/// Pica emulation environment.
/// All the devices added to this environment are emulated as if they were
/// from the same physical space.
pub struct Pica {
    // Next handle to allocate; incremented for each device or anchor created.
    counter: usize,
    // Connected UCI devices, keyed by handle.
    devices: HashMap<Handle, Device>,
    // Registered anchors, keyed by MAC address.
    anchors: HashMap<MacAddress, Anchor>,
    // Receiving half of the command channel; taken by the event loop when
    // it starts, which leaves `None` behind.
    command_rx: Option<mpsc::Receiver<PicaCommand>>,
    // Sending half of the command channel, cloned for callers and for the
    // per-device tasks.
    command_tx: mpsc::Sender<PicaCommand>,
    // Broadcast channel on which `PicaEvent`s are published.
    event_tx: broadcast::Sender<PicaEvent>,
    // Estimator queried for the measurements between two handles.
    ranging_estimator: Box<dyn RangingEstimator>,
    // Directory in which per-device pcapng traces are recorded, if any.
    pcapng_dir: Option<PathBuf>,
}
84
/// Errors reported in response to the anchor management commands.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum PicaCommandError {
    /// The MAC address is already used by an anchor or a connected device.
    #[error("Device already exists: {0}")]
    DeviceAlreadyExists(MacAddress),
    /// No anchor is registered with the MAC address.
    #[error("Device not found: {0}")]
    DeviceNotFound(MacAddress),
}
92
/// Commands processed by the Pica event loop.
pub enum PicaCommand {
    /// Connect a new device, given the stream and sink of its UCI transport.
    Connect(UciStream, UciSink),
    /// Disconnect the device selected by its handle.
    Disconnect(usize),
    /// Execute a ranging round for the selected device (handle) and
    /// session (session id).
    Ranging(usize, u32),
    /// Send an in-band request to stop ranging to a peer controlee
    /// identified by its address and session id.
    StopRanging(MacAddress, u32),
    /// UCI packet received from the device selected by its handle.
    UciPacket(usize, Vec<u8>),
    /// Create an anchor with the given MAC address. The allocated handle,
    /// or the error, is sent back on the oneshot channel.
    CreateAnchor(
        MacAddress,
        oneshot::Sender<Result<Handle, PicaCommandError>>,
    ),
    /// Destroy the anchor with the given MAC address. The handle of the
    /// removed anchor, or the error, is sent back on the oneshot channel.
    DestroyAnchor(
        MacAddress,
        oneshot::Sender<Result<Handle, PicaCommandError>>,
    ),
}
115
116 impl Display for PicaCommand {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 let cmd = match self {
119 PicaCommand::Connect(_, _) => "Connect",
120 PicaCommand::Disconnect(_) => "Disconnect",
121 PicaCommand::Ranging(_, _) => "Ranging",
122 PicaCommand::StopRanging(_, _) => "StopRanging",
123 PicaCommand::UciPacket(_, _) => "UciPacket",
124 PicaCommand::CreateAnchor(_, _) => "CreateAnchor",
125 PicaCommand::DestroyAnchor(_, _) => "DestroyAnchor",
126 };
127 write!(f, "{}", cmd)
128 }
129 }
130
/// Events published on the broadcast channel returned by `Pica::events`.
#[derive(Clone, Debug, Serialize)]
#[serde(untagged)]
pub enum PicaEvent {
    /// A UCI connection was added.
    Connected {
        /// Handle allocated for the new device.
        handle: Handle,
        /// MAC address assigned to the new device.
        mac_address: MacAddress,
    },
    /// A UCI connection was lost.
    Disconnected {
        /// Handle of the removed device.
        handle: Handle,
        /// MAC address of the removed device.
        mac_address: MacAddress,
    },
}
145
/// Kind of entity that owns a MAC address in the Pica context.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum Category {
    /// Device connected over a UCI transport.
    Uci,
    /// Anchor created with `PicaCommand::CreateAnchor`.
    Anchor,
}
151
/// Anchor registered in the Pica context. Anchors have no UCI connection;
/// they only take part in ranging as measurement targets.
#[derive(Debug, Clone, Copy)]
struct Anchor {
    // Handle passed to the ranging estimator to identify the anchor.
    handle: Handle,
    // Duplicates the key of the anchor map; currently never read.
    #[allow(unused)]
    mac_address: MacAddress,
}
158
make_measurement( mac_address: &MacAddress, local: RangingMeasurement, remote: RangingMeasurement, ) -> ShortAddressTwoWayRangingMeasurement159 fn make_measurement(
160 mac_address: &MacAddress,
161 local: RangingMeasurement,
162 remote: RangingMeasurement,
163 ) -> ShortAddressTwoWayRangingMeasurement {
164 if let MacAddress::Short(address) = mac_address {
165 ShortAddressTwoWayRangingMeasurement {
166 mac_address: u16::from_le_bytes(*address),
167 status: uci::Status::Ok,
168 nlos: 0, // in Line Of Sight
169 distance: local.range,
170 aoa_azimuth: local.azimuth as u16,
171 aoa_azimuth_fom: 100, // Yup, pretty sure about this
172 aoa_elevation: local.elevation as u16,
173 aoa_elevation_fom: 100, // Yup, pretty sure about this
174 aoa_destination_azimuth: remote.azimuth as u16,
175 aoa_destination_azimuth_fom: 100,
176 aoa_destination_elevation: remote.elevation as u16,
177 aoa_destination_elevation_fom: 100,
178 slot_index: 0,
179 rssi: u8::MAX,
180 }
181 } else {
182 panic!("Extended address is not supported.")
183 }
184 }
185
186 impl Pica {
new(ranging_estimator: Box<dyn RangingEstimator>, pcapng_dir: Option<PathBuf>) -> Self187 pub fn new(ranging_estimator: Box<dyn RangingEstimator>, pcapng_dir: Option<PathBuf>) -> Self {
188 let (command_tx, command_rx) = mpsc::channel(MAX_SESSION * MAX_DEVICE);
189 let (event_tx, _) = broadcast::channel(16);
190 Pica {
191 devices: HashMap::new(),
192 anchors: HashMap::new(),
193 counter: 0,
194 command_rx: Some(command_rx),
195 command_tx,
196 event_tx,
197 ranging_estimator,
198 pcapng_dir,
199 }
200 }
201
events(&self) -> broadcast::Receiver<PicaEvent>202 pub fn events(&self) -> broadcast::Receiver<PicaEvent> {
203 self.event_tx.subscribe()
204 }
205
commands(&self) -> mpsc::Sender<PicaCommand>206 pub fn commands(&self) -> mpsc::Sender<PicaCommand> {
207 self.command_tx.clone()
208 }
209
get_device_mut(&mut self, device_handle: usize) -> Option<&mut Device>210 fn get_device_mut(&mut self, device_handle: usize) -> Option<&mut Device> {
211 self.devices.get_mut(&device_handle)
212 }
213
get_device(&self, device_handle: usize) -> Option<&Device>214 fn get_device(&self, device_handle: usize) -> Option<&Device> {
215 self.devices.get(&device_handle)
216 }
217
get_category(&self, mac_address: &MacAddress) -> Option<Category>218 fn get_category(&self, mac_address: &MacAddress) -> Option<Category> {
219 if self.anchors.contains_key(mac_address) {
220 Some(Category::Anchor)
221 } else if self
222 .devices
223 .iter()
224 .any(|(_, device)| device.mac_address == *mac_address)
225 {
226 Some(Category::Uci)
227 } else {
228 None
229 }
230 }
231
send_event(&self, event: PicaEvent)232 fn send_event(&self, event: PicaEvent) {
233 // An error here means that we have
234 // no receivers, so ignore it
235 let _ = self.event_tx.send(event);
236 }
237
238 /// Handle an incoming stream of UCI packets.
239 /// Reassemble control packets when fragmented, data packets are unmodified.
read_routine( mut uci_stream: impl futures::stream::Stream<Item = Vec<u8>> + Unpin, cmd_tx: mpsc::Sender<PicaCommand>, handle: Handle, pcapng_file: Option<&pcapng::File>, ) -> anyhow::Result<()>240 async fn read_routine(
241 mut uci_stream: impl futures::stream::Stream<Item = Vec<u8>> + Unpin,
242 cmd_tx: mpsc::Sender<PicaCommand>,
243 handle: Handle,
244 pcapng_file: Option<&pcapng::File>,
245 ) -> anyhow::Result<()> {
246 use futures::stream::StreamExt;
247
248 loop {
249 let mut complete_packet: Option<Vec<u8>> = None;
250 loop {
251 let packet = uci_stream
252 .next()
253 .await
254 .ok_or(anyhow::anyhow!("input packet stream closed"))?;
255 let header =
256 packets::uci::CommonPacketHeader::parse(&packet[0..COMMON_HEADER_SIZE])?;
257
258 if let Some(file) = pcapng_file {
259 file.write(&packet, pcapng::Direction::Tx)?;
260 }
261
262 match &mut complete_packet {
263 Some(complete_packet) => {
264 complete_packet.extend_from_slice(&packet[HEADER_SIZE..])
265 }
266 None => complete_packet = Some(packet),
267 }
268
269 if header.get_pbf() == packets::uci::PacketBoundaryFlag::Complete
270 || header.get_mt() == packets::uci::MessageType::Data
271 {
272 break;
273 }
274 }
275
276 cmd_tx
277 .send(PicaCommand::UciPacket(handle, complete_packet.unwrap()))
278 .await
279 .unwrap()
280 }
281 }
282
283 /// Segment a stream of UCI packets.
write_routine( mut uci_sink: impl futures::sink::Sink<Vec<u8>> + Unpin, mut packet_rx: mpsc::UnboundedReceiver<UciPacket>, _handle: Handle, pcapng_file: Option<&pcapng::File>, ) -> anyhow::Result<()>284 async fn write_routine(
285 mut uci_sink: impl futures::sink::Sink<Vec<u8>> + Unpin,
286 mut packet_rx: mpsc::UnboundedReceiver<UciPacket>,
287 _handle: Handle,
288 pcapng_file: Option<&pcapng::File>,
289 ) -> anyhow::Result<()> {
290 use futures::sink::SinkExt;
291
292 loop {
293 let complete_packet = packet_rx
294 .recv()
295 .await
296 .ok_or(anyhow::anyhow!("output packet stream closed"))?;
297 let mut offset = HEADER_SIZE;
298 let mt = parse_message_type(complete_packet[0]);
299
300 while offset < complete_packet.len() {
301 let remaining_length = complete_packet.len() - offset;
302 let fragment_length = std::cmp::min(
303 remaining_length,
304 if mt == MessageType::Data {
305 MAX_DATA_PACKET_PAYLOAD_SIZE
306 } else {
307 MAX_CTRL_PACKET_PAYLOAD_SIZE
308 },
309 );
310 let pbf = if fragment_length == remaining_length {
311 PacketBoundaryFlag::Complete
312 } else {
313 PacketBoundaryFlag::NotComplete
314 };
315
316 let mut packet = Vec::with_capacity(HEADER_SIZE + fragment_length);
317
318 packet.extend_from_slice(&complete_packet[0..HEADER_SIZE]);
319 const PBF_MASK: u8 = 0x10;
320 packet[0] &= !PBF_MASK;
321 packet[0] |= (pbf as u8) << 4;
322
323 match mt {
324 MessageType::Data => {
325 packet[2..4].copy_from_slice(&(fragment_length as u16).to_le_bytes())
326 }
327 _ => packet[3] = fragment_length as u8,
328 }
329
330 packet.extend_from_slice(&complete_packet[offset..offset + fragment_length]);
331
332 if let Some(file) = pcapng_file {
333 file.write(&packet, pcapng::Direction::Rx)?;
334 }
335
336 uci_sink
337 .send(packet)
338 .await
339 .map_err(|_| anyhow::anyhow!("output packet sink closed"))?;
340
341 offset += fragment_length;
342 }
343 }
344 }
345
add_device(&mut self, stream: UciStream, sink: UciSink) -> Result<Handle>346 pub fn add_device(&mut self, stream: UciStream, sink: UciSink) -> Result<Handle> {
347 let (packet_tx, packet_rx) = mpsc::unbounded_channel();
348 let pica_tx = self.command_tx.clone();
349 let disconnect_tx = self.command_tx.clone();
350 let pcapng_dir = self.pcapng_dir.clone();
351
352 let handle = self.counter;
353 self.counter += 1;
354
355 log::debug!("[{}] Connecting device", handle);
356
357 let mac_address = MacAddress::Short((handle as u16).to_be_bytes());
358 let mut device = Device::new(handle, mac_address, packet_tx, self.command_tx.clone());
359 device.init();
360
361 self.send_event(PicaEvent::Connected {
362 handle,
363 mac_address: device.mac_address,
364 });
365
366 self.devices.insert(handle, device);
367
368 // Spawn and detach the connection handling task.
369 // The task notifies pica when exiting to let it clean
370 // the state.
371 tokio::task::spawn(async move {
372 let pcapng_file = if let Some(dir) = pcapng_dir {
373 let full_path = dir.join(format!("device-{}.pcapng", handle));
374 log::debug!("Recording pcapng to file {}", full_path.as_path().display());
375 Some(pcapng::File::create(full_path).unwrap())
376 } else {
377 None
378 };
379
380 let _ = tokio::try_join!(
381 async { Self::read_routine(stream, pica_tx, handle, pcapng_file.as_ref()).await },
382 async { Self::write_routine(sink, packet_rx, handle, pcapng_file.as_ref()).await }
383 );
384
385 disconnect_tx
386 .send(PicaCommand::Disconnect(handle))
387 .await
388 .unwrap()
389 });
390
391 Ok(handle)
392 }
393
disconnect(&mut self, device_handle: usize)394 fn disconnect(&mut self, device_handle: usize) {
395 log::debug!("[{}] Disconnecting device", device_handle);
396
397 if let Some(device) = self.devices.get(&device_handle) {
398 self.send_event(PicaEvent::Disconnected {
399 handle: device_handle,
400 mac_address: device.mac_address,
401 });
402 self.devices.remove(&device_handle);
403 }
404 }
405
ranging(&mut self, device_handle: usize, session_id: u32)406 fn ranging(&mut self, device_handle: usize, session_id: u32) {
407 log::debug!("[{}] Ranging event", device_handle);
408 log::debug!(" session_id={}", session_id);
409
410 let device = self.get_device(device_handle).unwrap();
411 let session = device.session(session_id).unwrap();
412
413 let mut data_transfer = Vec::new();
414 let mut measurements = Vec::new();
415
416 // Look for compatible anchors.
417 for mac_address in session.get_dst_mac_address() {
418 if let Some(other) = self.anchors.get(mac_address) {
419 let Some(local) = self
420 .ranging_estimator
421 .estimate(&device.handle, &other.handle)
422 else {
423 continue;
424 };
425 let Some(remote) = self
426 .ranging_estimator
427 .estimate(&other.handle, &device.handle)
428 else {
429 continue;
430 };
431 measurements.push(make_measurement(mac_address, local, remote));
432 }
433 }
434
435 // Look for compatible ranging sessions in other devices.
436 for peer_device in self.devices.values() {
437 if peer_device.handle == device_handle {
438 continue;
439 }
440
441 if peer_device.can_start_ranging(session, session_id) {
442 let peer_mac_address = peer_device
443 .session(session_id)
444 .unwrap()
445 .app_config
446 .device_mac_address
447 .unwrap();
448 let local = self
449 .ranging_estimator
450 .estimate(&device.handle, &peer_device.handle)
451 .unwrap_or(Default::default());
452 let remote = self
453 .ranging_estimator
454 .estimate(&peer_device.handle, &device.handle)
455 .unwrap_or(Default::default());
456 measurements.push(make_measurement(&peer_mac_address, local, remote));
457 }
458
459 if device.can_start_data_transfer(session_id)
460 && peer_device.can_receive_data_transfer(session_id)
461 {
462 data_transfer.push(peer_device);
463 }
464 }
465
466 // TODO: Data transfer should be limited in size for
467 // each round of ranging
468 for peer_device in data_transfer.iter() {
469 peer_device
470 .tx
471 .send(
472 DataMessageRcvBuilder {
473 application_data: session.data().clone().into(),
474 data_sequence_number: 0x01,
475 pbf: PacketBoundaryFlag::Complete,
476 session_handle: session_id,
477 source_address: session.app_config.device_mac_address.unwrap().into(),
478 status: uci::Status::Ok,
479 }
480 .build()
481 .encode_to_vec()
482 .unwrap(),
483 )
484 .unwrap();
485 }
486 if session.is_session_info_ntf_enabled() {
487 device
488 .tx
489 .send(
490 // TODO: support extended address
491 ShortMacTwoWaySessionInfoNtfBuilder {
492 sequence_number: session.sequence_number,
493 session_token: session_id,
494 rcr_indicator: 0, //TODO
495 current_ranging_interval: 0, //TODO
496 two_way_ranging_measurements: measurements,
497 vendor_data: vec![],
498 }
499 .build()
500 .encode_to_vec()
501 .unwrap(),
502 )
503 .unwrap();
504
505 let device = self.get_device_mut(device_handle).unwrap();
506 let session = device.session_mut(session_id).unwrap();
507
508 session.sequence_number += 1;
509 }
510
511 // TODO: Clean the data only when all the data is transfered
512 let device = self.get_device_mut(device_handle).unwrap();
513 let session = device.session_mut(session_id).unwrap();
514
515 session.clear_data();
516 }
517
uci_packet(&mut self, device_handle: usize, packet: Vec<u8>)518 fn uci_packet(&mut self, device_handle: usize, packet: Vec<u8>) {
519 match self.get_device_mut(device_handle) {
520 Some(device) => device.receive_packet(packet),
521 None => log::error!("Device {} not found", device_handle),
522 }
523 }
524
pica_command(&mut self, command: PicaCommand)525 fn pica_command(&mut self, command: PicaCommand) {
526 use PicaCommand::*;
527 match command {
528 Connect(stream, sink) => {
529 let _ = self.add_device(stream, sink);
530 }
531 Disconnect(device_handle) => self.disconnect(device_handle),
532 Ranging(device_handle, session_id) => self.ranging(device_handle, session_id),
533 StopRanging(mac_address, session_id) => {
534 self.stop_controlee_ranging(&mac_address, session_id)
535 }
536 UciPacket(device_handle, packet) => self.uci_packet(device_handle, packet),
537 CreateAnchor(mac_address, pica_cmd_rsp_tx) => {
538 self.create_anchor(mac_address, pica_cmd_rsp_tx)
539 }
540 DestroyAnchor(mac_address, pica_cmd_rsp_tx) => {
541 self.destroy_anchor(mac_address, pica_cmd_rsp_tx)
542 }
543 }
544 }
545
546 /// Run the internal pica event loop.
run(mut self) -> Result<()>547 pub async fn run(mut self) -> Result<()> {
548 let Some(mut command_rx) = self.command_rx.take() else {
549 anyhow::bail!("missing pica command receiver")
550 };
551 loop {
552 if let Some(command) = command_rx.recv().await {
553 self.pica_command(command)
554 }
555 }
556 }
557
558 // Handle the in-band StopRanging command sent from controller to the controlee with
559 // corresponding mac_address and session_id.
stop_controlee_ranging(&mut self, mac_address: &MacAddress, session_id: u32)560 fn stop_controlee_ranging(&mut self, mac_address: &MacAddress, session_id: u32) {
561 for device in self.devices.values_mut() {
562 let Some(session) = device.session_mut(session_id) else {
563 continue;
564 };
565
566 if session.app_config.device_mac_address != Some(*mac_address) {
567 continue;
568 }
569
570 if session.session_state() == SessionState::SessionStateActive {
571 session.stop_ranging_task();
572 session.set_state(
573 SessionState::SessionStateIdle,
574 ReasonCode::SessionStoppedDueToInbandSignal,
575 );
576 device.n_active_sessions = device.n_active_sessions.saturating_sub(1);
577 if device.n_active_sessions == 0 {
578 device.set_state(DeviceState::DeviceStateReady);
579 }
580 } else {
581 log::warn!("stop_controlee_ranging: session is not active !");
582 }
583 }
584 }
585
586 #[allow(clippy::map_entry)]
create_anchor( &mut self, mac_address: MacAddress, rsp_tx: oneshot::Sender<Result<Handle, PicaCommandError>>, )587 fn create_anchor(
588 &mut self,
589 mac_address: MacAddress,
590 rsp_tx: oneshot::Sender<Result<Handle, PicaCommandError>>,
591 ) {
592 log::debug!("[_] Create anchor");
593 log::debug!(" mac_address: {}", mac_address);
594
595 let status = if self.get_category(&mac_address).is_some() {
596 Err(PicaCommandError::DeviceAlreadyExists(mac_address))
597 } else {
598 let handle = self.counter;
599 self.counter += 1;
600
601 assert!(self
602 .anchors
603 .insert(
604 mac_address,
605 Anchor {
606 handle,
607 mac_address,
608 },
609 )
610 .is_none());
611
612 Ok(handle)
613 };
614
615 rsp_tx.send(status).unwrap_or_else(|err| {
616 log::error!("Failed to send create-anchor command response: {:?}", err)
617 })
618 }
619
destroy_anchor( &mut self, mac_address: MacAddress, rsp_tx: oneshot::Sender<Result<Handle, PicaCommandError>>, )620 fn destroy_anchor(
621 &mut self,
622 mac_address: MacAddress,
623 rsp_tx: oneshot::Sender<Result<Handle, PicaCommandError>>,
624 ) {
625 log::debug!("[_] Destroy anchor");
626 log::debug!(" mac_address: {}", mac_address);
627
628 let status = match self.anchors.remove(&mac_address) {
629 None => Err(PicaCommandError::DeviceNotFound(mac_address)),
630 Some(anchor) => Ok(anchor.handle),
631 };
632
633 rsp_tx.send(status).unwrap_or_else(|err| {
634 log::error!("Failed to send destroy-anchor command response: {:?}", err)
635 })
636 }
637 }
638
639 /// Run the internal pica event loop.
640 /// As opposed to Pica::run, the context is passed under a mutex, which
641 /// allows synchronous access to the context for device creation.
run(this: &std::sync::Mutex<Pica>) -> Result<()>642 pub async fn run(this: &std::sync::Mutex<Pica>) -> Result<()> {
643 // Extract the mpsc receiver from the Pica context.
644 // The receiver cannot be cloned.
645 let Some(mut command_rx) = this.lock().unwrap().command_rx.take() else {
646 anyhow::bail!("missing pica command receiver");
647 };
648
649 loop {
650 if let Some(command) = command_rx.recv().await {
651 this.lock().unwrap().pica_command(command)
652 }
653 }
654 }