1 // Copyright 2022 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 use anyhow::Result;
16 use clap::Parser;
17 use hyper::service::{make_service_fn, service_fn};
18 use hyper::{body, Body, Request, Response, Server, StatusCode as HttpStatusCode};
19 use serde::{Deserialize, Serialize};
20 use serde_json::error::Category as SerdeErrorCategory;
21 use std::collections::HashMap;
22 use std::convert::Infallible;
23 use std::net::{Ipv4Addr, SocketAddrV4};
24 use std::path::PathBuf;
25 use std::sync::Arc;
26 use std::sync::Mutex;
27 use tokio::net::TcpListener;
28 use tokio::sync::{broadcast, mpsc, oneshot};
29 use tokio::try_join;
30 use tokio_stream::{wrappers::BroadcastStream, StreamExt};
31 
32 use pica::{Category, MacAddress, Pica, PicaCommand, PicaCommandError, PicaEvent};
33 
34 mod position;
35 use position::Position;
36 
/// TCP port used for the UCI server when the `uci_port` command-line
/// argument is not provided.
const DEFAULT_UCI_PORT: u16 = 7000;
/// HTTP port used for the web interface when the `web_port` command-line
/// argument is not provided.
const DEFAULT_WEB_PORT: u16 = 3000;
39 
/// Static assets of the web interface, embedded in the binary at compile
/// time with `include_str!`.
///
/// Each entry is `(request path, content type, file contents)`.
/// `http_request` serves an entry when the request URI path is exactly equal
/// to the first field.
const STATIC_FILES: &[(&str, &str, &str)] = &[
    ("/", "text/html", include_str!("../../../static/index.html")),
    (
        "/openapi",
        "text/html",
        include_str!("../../../static/openapi.html"),
    ),
    (
        "/openapi.yaml",
        "text/yaml",
        include_str!("../../../static/openapi.yaml"),
    ),
    (
        "/src/components/Map.js",
        "application/javascript",
        include_str!("../../../static/src/components/Map.js"),
    ),
    (
        "/src/components/DeviceInfo.js",
        "application/javascript",
        include_str!("../../../static/src/components/DeviceInfo.js"),
    ),
    (
        "/src/components/Orientation.js",
        "application/javascript",
        include_str!("../../../static/src/components/Orientation.js"),
    ),
];
68 
/// Record information about an active device.
///
/// Instances are stored in `Context::devices` (keyed by `pica::Handle`) and
/// are serialized as-is in the `get-state` response.
#[derive(Debug, Serialize, Clone)]
struct DeviceInformation {
    /// Kind of device (the code in this file uses `Category::Uci` for
    /// connected hosts and `Category::Anchor` for anchors created over HTTP).
    category: pica::Category,
    /// MAC address identifying the device in the HTTP API.
    mac_address: MacAddress,
    /// Current position of the device; `flatten` merges the position fields
    /// directly into the serialized device object.
    #[serde(flatten)]
    position: Position,
}
77 
/// Notification broadcast to the subscribers of the `events` HTTP endpoint.
///
/// The enum is serialized `untagged`: the JSON payload only contains the
/// fields of the variant. The variant itself is conveyed separately through
/// the event name returned by `Event::name`.
#[derive(Clone, Debug, Serialize)]
#[serde(untagged)]
pub enum Event {
    /// A device was registered (UCI host connected or anchor created).
    DeviceAdded {
        category: Category,
        mac_address: MacAddress,
        #[serde(flatten)]
        position: Position,
    },
    /// A device was unregistered (UCI host disconnected or anchor destroyed).
    DeviceRemoved {
        category: Category,
        mac_address: MacAddress,
    },
    /// The position of a registered device was changed.
    DeviceUpdated {
        category: Category,
        mac_address: MacAddress,
        #[serde(flatten)]
        position: Position,
    },
    /// The ranging values between two devices changed. The values are those
    /// returned by `Position::compute_range_azimuth_elevation`, called on the
    /// source position with the destination position as argument.
    NeighborUpdated {
        source_category: Category,
        source_mac_address: MacAddress,
        destination_category: Category,
        destination_mac_address: MacAddress,
        distance: u16,
        azimuth: i16,
        elevation: i8,
    },
}
108 
/// Record the position of active devices for reference by the
/// ranging estimator.
///
/// Cloning a `Context` is cheap and yields a handle onto the same shared
/// state: the device table lives behind an `Arc`, and the broadcast sender
/// is itself a cloneable handle onto one channel.
#[derive(Clone)]
struct Context {
    /// Active devices, indexed by the handle assigned by pica.
    devices: Arc<Mutex<HashMap<pica::Handle, DeviceInformation>>>,
    /// Sending side of the channel feeding the `events` HTTP endpoint;
    /// every client of that endpoint subscribes its own receiver.
    events: broadcast::Sender<Event>,
}
116 
117 impl Context {
new() -> Self118     fn new() -> Self {
119         let (events, _) = broadcast::channel(1024);
120         Context {
121             devices: Arc::new(Mutex::new(HashMap::new())),
122             events,
123         }
124     }
125 
handle_connection_events( self, mut events: broadcast::Receiver<PicaEvent>, ) -> Result<()>126     async fn handle_connection_events(
127         self,
128         mut events: broadcast::Receiver<PicaEvent>,
129     ) -> Result<()> {
130         loop {
131             match events.recv().await {
132                 Ok(PicaEvent::Connected {
133                     mac_address,
134                     handle,
135                 }) => {
136                     let mut devices = self.devices.lock().unwrap();
137                     devices.insert(
138                         handle,
139                         DeviceInformation {
140                             category: Category::Uci,
141                             mac_address,
142                             position: Default::default(),
143                         },
144                     );
145                     self.events
146                         .send(Event::DeviceAdded {
147                             category: Category::Uci,
148                             mac_address,
149                             position: Default::default(),
150                         })
151                         .unwrap();
152                 }
153                 Ok(PicaEvent::Disconnected {
154                     mac_address,
155                     handle,
156                 }) => {
157                     let mut devices = self.devices.lock().unwrap();
158                     devices.remove(&handle);
159                     self.events
160                         .send(Event::DeviceRemoved {
161                             category: Category::Uci,
162                             mac_address,
163                         })
164                         .unwrap();
165                 }
166                 Err(err) => anyhow::bail!(err),
167             }
168         }
169     }
170 
http_events(&self) -> Response<Body>171     fn http_events(&self) -> Response<Body> {
172         let stream = BroadcastStream::new(self.events.subscribe()).map(|result| {
173             result.map(|event| {
174                 format!(
175                     "event: {}\ndata: {}\n\n",
176                     event.name(),
177                     serde_json::to_string(&event).unwrap()
178                 )
179             })
180         });
181         Response::builder()
182             .header("content-type", "text/event-stream")
183             .body(Body::wrap_stream(stream))
184             .unwrap()
185     }
186 
http_set_position(&self, mac_address: MacAddress, position: Position) -> Response<Body>187     fn http_set_position(&self, mac_address: MacAddress, position: Position) -> Response<Body> {
188         log::info!("set-position({}, {})", mac_address, position);
189 
190         let mut devices = self.devices.lock().unwrap();
191         let mut found_device = None;
192         for (_, device) in devices.iter_mut() {
193             if device.mac_address == mac_address {
194                 device.position = position;
195                 found_device = Some(device.clone());
196                 break;
197             }
198         }
199 
200         let Some(device) = found_device else {
201             return Response::builder()
202                 .status(HttpStatusCode::NOT_FOUND)
203                 .body("".into())
204                 .unwrap();
205         };
206 
207         self.events
208             .send(Event::DeviceUpdated {
209                 category: device.category,
210                 mac_address,
211                 position,
212             })
213             .unwrap();
214 
215         for other in devices.values() {
216             if other.mac_address != device.mac_address {
217                 let local = device
218                     .position
219                     .compute_range_azimuth_elevation(&other.position);
220                 let remote = other
221                     .position
222                     .compute_range_azimuth_elevation(&device.position);
223 
224                 assert!(local.0 == remote.0);
225 
226                 self.events
227                     .send(Event::NeighborUpdated {
228                         source_category: device.category,
229                         source_mac_address: device.mac_address,
230                         destination_category: other.category,
231                         destination_mac_address: other.mac_address,
232                         distance: local.0,
233                         azimuth: local.1,
234                         elevation: local.2,
235                     })
236                     .unwrap();
237 
238                 let _ = self
239                     .events
240                     .send(Event::NeighborUpdated {
241                         source_category: other.category,
242                         source_mac_address: other.mac_address,
243                         destination_category: device.category,
244                         destination_mac_address: device.mac_address,
245                         distance: remote.0,
246                         azimuth: remote.1,
247                         elevation: remote.2,
248                     })
249                     .unwrap();
250             }
251         }
252 
253         Response::builder()
254             .status(HttpStatusCode::OK)
255             .body("".into())
256             .unwrap()
257     }
258 
http_create_anchor( &self, mac_address: MacAddress, position: Position, cmd_tx: mpsc::Sender<PicaCommand>, ) -> Response<Body>259     async fn http_create_anchor(
260         &self,
261         mac_address: MacAddress,
262         position: Position,
263         cmd_tx: mpsc::Sender<PicaCommand>,
264     ) -> Response<Body> {
265         log::info!("create-anchor({}, {})", mac_address, position);
266 
267         let (rsp_tx, rsp_rx) = oneshot::channel::<Result<pica::Handle, PicaCommandError>>();
268         cmd_tx
269             .send(PicaCommand::CreateAnchor(mac_address, rsp_tx))
270             .await
271             .unwrap();
272 
273         let status = match rsp_rx.await {
274             Ok(Ok(handle)) => {
275                 let mut devices = self.devices.lock().unwrap();
276                 devices.insert(
277                     handle,
278                     DeviceInformation {
279                         position,
280                         mac_address,
281                         category: Category::Anchor,
282                     },
283                 );
284                 self.events
285                     .send(Event::DeviceAdded {
286                         category: Category::Anchor,
287                         mac_address,
288                         position,
289                     })
290                     .unwrap();
291                 HttpStatusCode::OK
292             }
293             Ok(Err(PicaCommandError::DeviceAlreadyExists(_))) => HttpStatusCode::CONFLICT,
294             Ok(Err(PicaCommandError::DeviceNotFound(_))) => HttpStatusCode::NOT_FOUND,
295             Err(_) => HttpStatusCode::INTERNAL_SERVER_ERROR,
296         };
297 
298         Response::builder().status(status).body("".into()).unwrap()
299     }
300 
http_destroy_anchor( &self, mac_address: MacAddress, cmd_tx: mpsc::Sender<PicaCommand>, ) -> Response<Body>301     async fn http_destroy_anchor(
302         &self,
303         mac_address: MacAddress,
304         cmd_tx: mpsc::Sender<PicaCommand>,
305     ) -> Response<Body> {
306         log::info!("destroy-anchor({})", mac_address);
307 
308         let (rsp_tx, rsp_rx) = oneshot::channel::<Result<pica::Handle, PicaCommandError>>();
309         cmd_tx
310             .send(PicaCommand::DestroyAnchor(mac_address, rsp_tx))
311             .await
312             .unwrap();
313 
314         let status = match rsp_rx.await {
315             Ok(Ok(handle)) => {
316                 let mut devices = self.devices.lock().unwrap();
317                 devices.remove(&handle);
318                 self.events
319                     .send(Event::DeviceRemoved {
320                         category: Category::Anchor,
321                         mac_address,
322                     })
323                     .unwrap();
324                 HttpStatusCode::OK
325             }
326             Ok(Err(PicaCommandError::DeviceAlreadyExists(_))) => HttpStatusCode::CONFLICT,
327             Ok(Err(PicaCommandError::DeviceNotFound(_))) => HttpStatusCode::NOT_FOUND,
328             Err(_) => HttpStatusCode::INTERNAL_SERVER_ERROR,
329         };
330 
331         Response::builder().status(status).body("".into()).unwrap()
332     }
333 
http_get_state(&self) -> Response<Body>334     fn http_get_state(&self) -> Response<Body> {
335         log::info!("get-state()");
336 
337         #[derive(Serialize)]
338         struct GetStateResponse {
339             devices: Vec<DeviceInformation>,
340         }
341 
342         let devices = self.devices.lock().unwrap();
343         let response = GetStateResponse {
344             devices: devices.values().cloned().collect::<Vec<_>>(),
345         };
346         let body = serde_json::to_string(&response).unwrap();
347         Response::builder()
348             .status(HttpStatusCode::OK)
349             .body(body.into())
350             .unwrap()
351     }
352 }
353 
354 impl pica::RangingEstimator for Context {
estimate( &self, left: &pica::Handle, right: &pica::Handle, ) -> Option<pica::RangingMeasurement>355     fn estimate(
356         &self,
357         left: &pica::Handle,
358         right: &pica::Handle,
359     ) -> Option<pica::RangingMeasurement> {
360         let devices = self.devices.lock().ok()?;
361         let left_pos = devices.get(left)?.position;
362         let right_pos = devices.get(right)?.position;
363         let (range, azimuth, elevation) = left_pos.compute_range_azimuth_elevation(&right_pos);
364         Some(pica::RangingMeasurement {
365             range,
366             azimuth,
367             elevation,
368         })
369     }
370 }
371 
/// JSON request body describing a device position.
///
/// The `position!` macro deserializes the HTTP request body into this
/// struct and forwards the fields, in declaration order, to `Position::new`.
#[derive(Deserialize)]
struct PositionBody {
    x: i16,
    y: i16,
    z: i16,
    yaw: i16,
    pitch: i8,
    roll: i16,
}
381 
/// Parse the request body bytes `$body` as a JSON `PositionBody` and evaluate
/// to the corresponding `Position`.
///
/// The optional second argument `$mandatory` (default `false`) selects what
/// happens when the body cannot be parsed:
/// * not mandatory, and the error is classified as `Eof` by serde_json
///   (e.g. an empty body): evaluates to `Position::default()`;
/// * otherwise: logs the error and **returns from the enclosing function**
///   with `Ok(response)`, where the response has status 406 and the error
///   text as body.
///
/// Because of the early `return`, the macro can only be used inside a
/// function returning `Result<Response<_>, _>`.
macro_rules! position {
    ($body: ident) => {
        position!($body, false)
    };
    ($body: ident, $mandatory: ident) => {
        match serde_json::from_slice::<PositionBody>(&$body) {
            Ok(body) => Position::new(body.x, body.y, body.z, body.yaw, body.pitch, body.roll),
            Err(err) => {
                // A missing body is tolerated for optional positions; any
                // other parse failure is reported to the client.
                if !$mandatory && err.classify() == SerdeErrorCategory::Eof {
                    Position::default()
                } else {
                    let reason = format!("Error while deserializing position: {}", err);
                    log::error!("{}", reason);
                    return Ok(Response::builder().status(406).body(reason.into()).unwrap());
                }
            }
        }
    };
}
401 
/// Parse the string-like value `$mac_address` into a `MacAddress`.
///
/// On failure, logs the error and **returns from the enclosing function**
/// with `Ok(response)`, where the response has status 406 and the error text
/// as body. The macro can therefore only be used inside a function returning
/// `Result<Response<_>, _>`.
macro_rules! mac_address {
    ($mac_address: ident) => {
        match MacAddress::new($mac_address.to_string()) {
            Ok(mac_address) => mac_address,
            Err(err) => {
                let reason = format!("Error mac_address: {}", err);
                log::error!("{}", reason);
                return Ok(Response::builder().status(406).body(reason.into()).unwrap());
            }
        }
    };
}
414 
415 impl Event {
name(&self) -> &'static str416     fn name(&self) -> &'static str {
417         match self {
418             Event::DeviceAdded { .. } => "device-added",
419             Event::DeviceRemoved { .. } => "device-removed",
420             Event::DeviceUpdated { .. } => "device-updated",
421             Event::NeighborUpdated { .. } => "neighbor-updated",
422         }
423     }
424 }
425 
http_request( mut req: Request<Body>, cmd_tx: mpsc::Sender<PicaCommand>, context: Context, ) -> Result<Response<Body>, Infallible>426 async fn http_request(
427     mut req: Request<Body>,
428     cmd_tx: mpsc::Sender<PicaCommand>,
429     context: Context,
430 ) -> Result<Response<Body>, Infallible> {
431     let static_file = STATIC_FILES
432         .iter()
433         .find(|(path, _, _)| req.uri().path() == *path);
434 
435     if let Some((_, mime, content)) = static_file {
436         return Ok(Response::builder()
437             .header("content-type", *mime)
438             .body((*content).into())
439             .unwrap());
440     }
441 
442     let body = body::to_bytes(req.body_mut()).await.unwrap();
443     let response = match req
444         .uri_mut()
445         .path()
446         .trim_start_matches('/')
447         .split('/')
448         .collect::<Vec<_>>()[..]
449     {
450         ["events"] => context.http_events(),
451         ["init-uci-device", mac_address] => {
452             context.http_set_position(mac_address!(mac_address), position!(body))
453         }
454         ["set-position", mac_address] => {
455             context.http_set_position(mac_address!(mac_address), position!(body))
456         }
457         ["create-anchor", mac_address] => {
458             context
459                 .http_create_anchor(mac_address!(mac_address), position!(body), cmd_tx)
460                 .await
461         }
462         ["destroy-anchor", mac_address] => {
463             context
464                 .http_destroy_anchor(mac_address!(mac_address), cmd_tx)
465                 .await
466         }
467         ["get-state"] => context.http_get_state(),
468 
469         _ => Response::builder()
470             .status(HttpStatusCode::NOT_FOUND)
471             .body("".into())
472             .unwrap(),
473     };
474 
475     Ok(response)
476 }
477 
serve(context: Context, tx: mpsc::Sender<PicaCommand>, web_port: u16) -> Result<()>478 async fn serve(context: Context, tx: mpsc::Sender<PicaCommand>, web_port: u16) -> Result<()> {
479     let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, web_port);
480     let make_svc = make_service_fn(move |_conn| {
481         let tx = tx.clone();
482         let local_context = context.clone();
483         async move {
484             Ok::<_, Infallible>(service_fn(move |req| {
485                 http_request(req, tx.clone(), local_context.clone())
486             }))
487         }
488     });
489 
490     let server = Server::bind(&addr.into()).serve(make_svc);
491 
492     log::info!("Pica: Web server started on http://0.0.0.0:{}", web_port);
493 
494     server.await?;
495     Ok(())
496 }
497 
listen(tx: mpsc::Sender<PicaCommand>, uci_port: u16) -> Result<()>498 async fn listen(tx: mpsc::Sender<PicaCommand>, uci_port: u16) -> Result<()> {
499     let uci_socket = SocketAddrV4::new(Ipv4Addr::LOCALHOST, uci_port);
500     let uci_listener = TcpListener::bind(uci_socket).await?;
501     log::info!("Pica: Listening on: {}", uci_port);
502 
503     loop {
504         let (socket, addr) = uci_listener.accept().await?;
505         log::info!("Uwb host addr: {}", addr);
506 
507         let (read_half, write_half) = socket.into_split();
508         let stream = Box::pin(futures::stream::unfold(read_half, pica::packets::uci::read));
509         let sink = Box::pin(futures::sink::unfold(write_half, pica::packets::uci::write));
510 
511         tx.send(PicaCommand::Connect(stream, sink))
512             .await
513             .map_err(|_| anyhow::anyhow!("pica command stream closed"))?
514     }
515 }
516 
// Command-line arguments of the program, parsed with clap's derive API.
//
// NOTE: the `///` comments on the fields below are picked up by clap as the
// help text of the corresponding options; they are user-facing strings, not
// plain documentation. Maintainer notes belong in `//` comments like this one.
#[derive(Parser, Debug)]
#[command(name = "pica", about = "Virtual UWB subsystem")]
struct Args {
    /// Output directory for storing .pcapng traces.
    /// If provided, .pcapng traces of client connections are automatically
    /// saved under the name `device-{handle}.pcapng`.
    #[arg(short, long, value_name = "DIR")]
    pcapng_dir: Option<PathBuf>,
    /// Configure the TCP port for the UCI server.
    #[arg(short, long, value_name = "PORT", default_value_t = DEFAULT_UCI_PORT)]
    uci_port: u16,
    /// Configure the HTTP port for the web interface.
    #[arg(short, long, value_name = "PORT", default_value_t = DEFAULT_WEB_PORT)]
    web_port: u16,
}
532 
533 #[tokio::main]
main() -> Result<()>534 async fn main() -> Result<()> {
535     let args = Args::parse();
536     assert_ne!(
537         args.uci_port, args.web_port,
538         "UCI port and WEB port must be different."
539     );
540 
541     let context = Context::new();
542 
543     let pica = Pica::new(Box::new(context.clone()), args.pcapng_dir);
544     let cmd_tx = pica.commands();
545     let events_rx = pica.events();
546 
547     try_join!(
548         pica.run(),
549         listen(cmd_tx.clone(), args.uci_port),
550         serve(context.clone(), cmd_tx.clone(), args.web_port),
551         context.handle_connection_events(events_rx),
552     )?;
553 
554     Ok(())
555 }
556