1 // Copyright 2022 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 use anyhow::Result;
16 use clap::Parser;
17 use hyper::service::{make_service_fn, service_fn};
18 use hyper::{body, Body, Request, Response, Server, StatusCode as HttpStatusCode};
19 use serde::{Deserialize, Serialize};
20 use serde_json::error::Category as SerdeErrorCategory;
21 use std::collections::HashMap;
22 use std::convert::Infallible;
23 use std::net::{Ipv4Addr, SocketAddrV4};
24 use std::path::PathBuf;
25 use std::sync::Arc;
26 use std::sync::Mutex;
27 use tokio::net::TcpListener;
28 use tokio::sync::{broadcast, mpsc, oneshot};
29 use tokio::try_join;
30 use tokio_stream::{wrappers::BroadcastStream, StreamExt};
31
32 use pica::{Category, MacAddress, Pica, PicaCommand, PicaCommandError, PicaEvent};
33
34 mod position;
35 use position::Position;
36
37 const DEFAULT_UCI_PORT: u16 = 7000;
38 const DEFAULT_WEB_PORT: u16 = 3000;
39
40 const STATIC_FILES: &[(&str, &str, &str)] = &[
41 ("/", "text/html", include_str!("../../../static/index.html")),
42 (
43 "/openapi",
44 "text/html",
45 include_str!("../../../static/openapi.html"),
46 ),
47 (
48 "/openapi.yaml",
49 "text/yaml",
50 include_str!("../../../static/openapi.yaml"),
51 ),
52 (
53 "/src/components/Map.js",
54 "application/javascript",
55 include_str!("../../../static/src/components/Map.js"),
56 ),
57 (
58 "/src/components/DeviceInfo.js",
59 "application/javascript",
60 include_str!("../../../static/src/components/DeviceInfo.js"),
61 ),
62 (
63 "/src/components/Orientation.js",
64 "application/javascript",
65 include_str!("../../../static/src/components/Orientation.js"),
66 ),
67 ];
68
69 /// Record information about an active device.
70 #[derive(Debug, Serialize, Clone)]
71 struct DeviceInformation {
72 category: pica::Category,
73 mac_address: MacAddress,
74 #[serde(flatten)]
75 position: Position,
76 }
77
78 /// Record information about an active device.
79 #[derive(Clone, Debug, Serialize)]
80 #[serde(untagged)]
81 pub enum Event {
82 DeviceAdded {
83 category: Category,
84 mac_address: MacAddress,
85 #[serde(flatten)]
86 position: Position,
87 },
88 DeviceRemoved {
89 category: Category,
90 mac_address: MacAddress,
91 },
92 DeviceUpdated {
93 category: Category,
94 mac_address: MacAddress,
95 #[serde(flatten)]
96 position: Position,
97 },
98 NeighborUpdated {
99 source_category: Category,
100 source_mac_address: MacAddress,
101 destination_category: Category,
102 destination_mac_address: MacAddress,
103 distance: u16,
104 azimuth: i16,
105 elevation: i8,
106 },
107 }
108
109 /// Record the position of active devices for reference by the
110 /// ranging estimator.
111 #[derive(Clone)]
112 struct Context {
113 devices: Arc<Mutex<HashMap<pica::Handle, DeviceInformation>>>,
114 events: broadcast::Sender<Event>,
115 }
116
117 impl Context {
new() -> Self118 fn new() -> Self {
119 let (events, _) = broadcast::channel(1024);
120 Context {
121 devices: Arc::new(Mutex::new(HashMap::new())),
122 events,
123 }
124 }
125
handle_connection_events( self, mut events: broadcast::Receiver<PicaEvent>, ) -> Result<()>126 async fn handle_connection_events(
127 self,
128 mut events: broadcast::Receiver<PicaEvent>,
129 ) -> Result<()> {
130 loop {
131 match events.recv().await {
132 Ok(PicaEvent::Connected {
133 mac_address,
134 handle,
135 }) => {
136 let mut devices = self.devices.lock().unwrap();
137 devices.insert(
138 handle,
139 DeviceInformation {
140 category: Category::Uci,
141 mac_address,
142 position: Default::default(),
143 },
144 );
145 self.events
146 .send(Event::DeviceAdded {
147 category: Category::Uci,
148 mac_address,
149 position: Default::default(),
150 })
151 .unwrap();
152 }
153 Ok(PicaEvent::Disconnected {
154 mac_address,
155 handle,
156 }) => {
157 let mut devices = self.devices.lock().unwrap();
158 devices.remove(&handle);
159 self.events
160 .send(Event::DeviceRemoved {
161 category: Category::Uci,
162 mac_address,
163 })
164 .unwrap();
165 }
166 Err(err) => anyhow::bail!(err),
167 }
168 }
169 }
170
http_events(&self) -> Response<Body>171 fn http_events(&self) -> Response<Body> {
172 let stream = BroadcastStream::new(self.events.subscribe()).map(|result| {
173 result.map(|event| {
174 format!(
175 "event: {}\ndata: {}\n\n",
176 event.name(),
177 serde_json::to_string(&event).unwrap()
178 )
179 })
180 });
181 Response::builder()
182 .header("content-type", "text/event-stream")
183 .body(Body::wrap_stream(stream))
184 .unwrap()
185 }
186
http_set_position(&self, mac_address: MacAddress, position: Position) -> Response<Body>187 fn http_set_position(&self, mac_address: MacAddress, position: Position) -> Response<Body> {
188 log::info!("set-position({}, {})", mac_address, position);
189
190 let mut devices = self.devices.lock().unwrap();
191 let mut found_device = None;
192 for (_, device) in devices.iter_mut() {
193 if device.mac_address == mac_address {
194 device.position = position;
195 found_device = Some(device.clone());
196 break;
197 }
198 }
199
200 let Some(device) = found_device else {
201 return Response::builder()
202 .status(HttpStatusCode::NOT_FOUND)
203 .body("".into())
204 .unwrap();
205 };
206
207 self.events
208 .send(Event::DeviceUpdated {
209 category: device.category,
210 mac_address,
211 position,
212 })
213 .unwrap();
214
215 for other in devices.values() {
216 if other.mac_address != device.mac_address {
217 let local = device
218 .position
219 .compute_range_azimuth_elevation(&other.position);
220 let remote = other
221 .position
222 .compute_range_azimuth_elevation(&device.position);
223
224 assert!(local.0 == remote.0);
225
226 self.events
227 .send(Event::NeighborUpdated {
228 source_category: device.category,
229 source_mac_address: device.mac_address,
230 destination_category: other.category,
231 destination_mac_address: other.mac_address,
232 distance: local.0,
233 azimuth: local.1,
234 elevation: local.2,
235 })
236 .unwrap();
237
238 let _ = self
239 .events
240 .send(Event::NeighborUpdated {
241 source_category: other.category,
242 source_mac_address: other.mac_address,
243 destination_category: device.category,
244 destination_mac_address: device.mac_address,
245 distance: remote.0,
246 azimuth: remote.1,
247 elevation: remote.2,
248 })
249 .unwrap();
250 }
251 }
252
253 Response::builder()
254 .status(HttpStatusCode::OK)
255 .body("".into())
256 .unwrap()
257 }
258
http_create_anchor( &self, mac_address: MacAddress, position: Position, cmd_tx: mpsc::Sender<PicaCommand>, ) -> Response<Body>259 async fn http_create_anchor(
260 &self,
261 mac_address: MacAddress,
262 position: Position,
263 cmd_tx: mpsc::Sender<PicaCommand>,
264 ) -> Response<Body> {
265 log::info!("create-anchor({}, {})", mac_address, position);
266
267 let (rsp_tx, rsp_rx) = oneshot::channel::<Result<pica::Handle, PicaCommandError>>();
268 cmd_tx
269 .send(PicaCommand::CreateAnchor(mac_address, rsp_tx))
270 .await
271 .unwrap();
272
273 let status = match rsp_rx.await {
274 Ok(Ok(handle)) => {
275 let mut devices = self.devices.lock().unwrap();
276 devices.insert(
277 handle,
278 DeviceInformation {
279 position,
280 mac_address,
281 category: Category::Anchor,
282 },
283 );
284 self.events
285 .send(Event::DeviceAdded {
286 category: Category::Anchor,
287 mac_address,
288 position,
289 })
290 .unwrap();
291 HttpStatusCode::OK
292 }
293 Ok(Err(PicaCommandError::DeviceAlreadyExists(_))) => HttpStatusCode::CONFLICT,
294 Ok(Err(PicaCommandError::DeviceNotFound(_))) => HttpStatusCode::NOT_FOUND,
295 Err(_) => HttpStatusCode::INTERNAL_SERVER_ERROR,
296 };
297
298 Response::builder().status(status).body("".into()).unwrap()
299 }
300
http_destroy_anchor( &self, mac_address: MacAddress, cmd_tx: mpsc::Sender<PicaCommand>, ) -> Response<Body>301 async fn http_destroy_anchor(
302 &self,
303 mac_address: MacAddress,
304 cmd_tx: mpsc::Sender<PicaCommand>,
305 ) -> Response<Body> {
306 log::info!("destroy-anchor({})", mac_address);
307
308 let (rsp_tx, rsp_rx) = oneshot::channel::<Result<pica::Handle, PicaCommandError>>();
309 cmd_tx
310 .send(PicaCommand::DestroyAnchor(mac_address, rsp_tx))
311 .await
312 .unwrap();
313
314 let status = match rsp_rx.await {
315 Ok(Ok(handle)) => {
316 let mut devices = self.devices.lock().unwrap();
317 devices.remove(&handle);
318 self.events
319 .send(Event::DeviceRemoved {
320 category: Category::Anchor,
321 mac_address,
322 })
323 .unwrap();
324 HttpStatusCode::OK
325 }
326 Ok(Err(PicaCommandError::DeviceAlreadyExists(_))) => HttpStatusCode::CONFLICT,
327 Ok(Err(PicaCommandError::DeviceNotFound(_))) => HttpStatusCode::NOT_FOUND,
328 Err(_) => HttpStatusCode::INTERNAL_SERVER_ERROR,
329 };
330
331 Response::builder().status(status).body("".into()).unwrap()
332 }
333
http_get_state(&self) -> Response<Body>334 fn http_get_state(&self) -> Response<Body> {
335 log::info!("get-state()");
336
337 #[derive(Serialize)]
338 struct GetStateResponse {
339 devices: Vec<DeviceInformation>,
340 }
341
342 let devices = self.devices.lock().unwrap();
343 let response = GetStateResponse {
344 devices: devices.values().cloned().collect::<Vec<_>>(),
345 };
346 let body = serde_json::to_string(&response).unwrap();
347 Response::builder()
348 .status(HttpStatusCode::OK)
349 .body(body.into())
350 .unwrap()
351 }
352 }
353
354 impl pica::RangingEstimator for Context {
estimate( &self, left: &pica::Handle, right: &pica::Handle, ) -> Option<pica::RangingMeasurement>355 fn estimate(
356 &self,
357 left: &pica::Handle,
358 right: &pica::Handle,
359 ) -> Option<pica::RangingMeasurement> {
360 let devices = self.devices.lock().ok()?;
361 let left_pos = devices.get(left)?.position;
362 let right_pos = devices.get(right)?.position;
363 let (range, azimuth, elevation) = left_pos.compute_range_azimuth_elevation(&right_pos);
364 Some(pica::RangingMeasurement {
365 range,
366 azimuth,
367 elevation,
368 })
369 }
370 }
371
372 #[derive(Deserialize)]
373 struct PositionBody {
374 x: i16,
375 y: i16,
376 z: i16,
377 yaw: i16,
378 pitch: i8,
379 roll: i16,
380 }
381
382 macro_rules! position {
383 ($body: ident) => {
384 position!($body, false)
385 };
386 ($body: ident, $mandatory: ident) => {
387 match serde_json::from_slice::<PositionBody>(&$body) {
388 Ok(body) => Position::new(body.x, body.y, body.z, body.yaw, body.pitch, body.roll),
389 Err(err) => {
390 if !$mandatory && err.classify() == SerdeErrorCategory::Eof {
391 Position::default()
392 } else {
393 let reason = format!("Error while deserializing position: {}", err);
394 log::error!("{}", reason);
395 return Ok(Response::builder().status(406).body(reason.into()).unwrap());
396 }
397 }
398 }
399 };
400 }
401
402 macro_rules! mac_address {
403 ($mac_address: ident) => {
404 match MacAddress::new($mac_address.to_string()) {
405 Ok(mac_address) => mac_address,
406 Err(err) => {
407 let reason = format!("Error mac_address: {}", err);
408 log::error!("{}", reason);
409 return Ok(Response::builder().status(406).body(reason.into()).unwrap());
410 }
411 }
412 };
413 }
414
415 impl Event {
name(&self) -> &'static str416 fn name(&self) -> &'static str {
417 match self {
418 Event::DeviceAdded { .. } => "device-added",
419 Event::DeviceRemoved { .. } => "device-removed",
420 Event::DeviceUpdated { .. } => "device-updated",
421 Event::NeighborUpdated { .. } => "neighbor-updated",
422 }
423 }
424 }
425
http_request( mut req: Request<Body>, cmd_tx: mpsc::Sender<PicaCommand>, context: Context, ) -> Result<Response<Body>, Infallible>426 async fn http_request(
427 mut req: Request<Body>,
428 cmd_tx: mpsc::Sender<PicaCommand>,
429 context: Context,
430 ) -> Result<Response<Body>, Infallible> {
431 let static_file = STATIC_FILES
432 .iter()
433 .find(|(path, _, _)| req.uri().path() == *path);
434
435 if let Some((_, mime, content)) = static_file {
436 return Ok(Response::builder()
437 .header("content-type", *mime)
438 .body((*content).into())
439 .unwrap());
440 }
441
442 let body = body::to_bytes(req.body_mut()).await.unwrap();
443 let response = match req
444 .uri_mut()
445 .path()
446 .trim_start_matches('/')
447 .split('/')
448 .collect::<Vec<_>>()[..]
449 {
450 ["events"] => context.http_events(),
451 ["init-uci-device", mac_address] => {
452 context.http_set_position(mac_address!(mac_address), position!(body))
453 }
454 ["set-position", mac_address] => {
455 context.http_set_position(mac_address!(mac_address), position!(body))
456 }
457 ["create-anchor", mac_address] => {
458 context
459 .http_create_anchor(mac_address!(mac_address), position!(body), cmd_tx)
460 .await
461 }
462 ["destroy-anchor", mac_address] => {
463 context
464 .http_destroy_anchor(mac_address!(mac_address), cmd_tx)
465 .await
466 }
467 ["get-state"] => context.http_get_state(),
468
469 _ => Response::builder()
470 .status(HttpStatusCode::NOT_FOUND)
471 .body("".into())
472 .unwrap(),
473 };
474
475 Ok(response)
476 }
477
serve(context: Context, tx: mpsc::Sender<PicaCommand>, web_port: u16) -> Result<()>478 async fn serve(context: Context, tx: mpsc::Sender<PicaCommand>, web_port: u16) -> Result<()> {
479 let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, web_port);
480 let make_svc = make_service_fn(move |_conn| {
481 let tx = tx.clone();
482 let local_context = context.clone();
483 async move {
484 Ok::<_, Infallible>(service_fn(move |req| {
485 http_request(req, tx.clone(), local_context.clone())
486 }))
487 }
488 });
489
490 let server = Server::bind(&addr.into()).serve(make_svc);
491
492 log::info!("Pica: Web server started on http://0.0.0.0:{}", web_port);
493
494 server.await?;
495 Ok(())
496 }
497
listen(tx: mpsc::Sender<PicaCommand>, uci_port: u16) -> Result<()>498 async fn listen(tx: mpsc::Sender<PicaCommand>, uci_port: u16) -> Result<()> {
499 let uci_socket = SocketAddrV4::new(Ipv4Addr::LOCALHOST, uci_port);
500 let uci_listener = TcpListener::bind(uci_socket).await?;
501 log::info!("Pica: Listening on: {}", uci_port);
502
503 loop {
504 let (socket, addr) = uci_listener.accept().await?;
505 log::info!("Uwb host addr: {}", addr);
506
507 let (read_half, write_half) = socket.into_split();
508 let stream = Box::pin(futures::stream::unfold(read_half, pica::packets::uci::read));
509 let sink = Box::pin(futures::sink::unfold(write_half, pica::packets::uci::write));
510
511 tx.send(PicaCommand::Connect(stream, sink))
512 .await
513 .map_err(|_| anyhow::anyhow!("pica command stream closed"))?
514 }
515 }
516
517 #[derive(Parser, Debug)]
518 #[command(name = "pica", about = "Virtual UWB subsystem")]
519 struct Args {
520 /// Output directory for storing .pcapng traces.
521 /// If provided, .pcapng traces of client connections are automatically
522 /// saved under the name `device-{handle}.pcapng`.
523 #[arg(short, long, value_name = "DIR")]
524 pcapng_dir: Option<PathBuf>,
525 /// Configure the TCP port for the UCI server.
526 #[arg(short, long, value_name = "PORT", default_value_t = DEFAULT_UCI_PORT)]
527 uci_port: u16,
528 /// Configure the HTTP port for the web interface.
529 #[arg(short, long, value_name = "PORT", default_value_t = DEFAULT_WEB_PORT)]
530 web_port: u16,
531 }
532
533 #[tokio::main]
main() -> Result<()>534 async fn main() -> Result<()> {
535 let args = Args::parse();
536 assert_ne!(
537 args.uci_port, args.web_port,
538 "UCI port and WEB port must be different."
539 );
540
541 let context = Context::new();
542
543 let pica = Pica::new(Box::new(context.clone()), args.pcapng_dir);
544 let cmd_tx = pica.commands();
545 let events_rx = pica.events();
546
547 try_join!(
548 pica.run(),
549 listen(cmd_tx.clone(), args.uci_port),
550 serve(context.clone(), cmd_tx.clone(), args.web_port),
551 context.handle_connection_events(events_rx),
552 )?;
553
554 Ok(())
555 }
556