xref: /aosp_15_r20/system/nfc/tools/casimir/src/main-grpc.rs (revision 7eba2f3b06c51ae21384f6a4f14577b668a869b3)
1 // Copyright 2023, The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! NFCC and RF emulator.
16 
17 use anyhow::Result;
18 use argh::FromArgs;
19 use log::{error, info, warn};
20 use std::collections::HashMap;
21 use std::future::Future;
22 use std::net::{Ipv4Addr, SocketAddrV4};
23 use std::pin::{pin, Pin};
24 use std::task::Poll;
25 use tokio::io::AsyncReadExt;
26 use tokio::io::AsyncWriteExt;
27 use tokio::net::{tcp, TcpListener, TcpStream};
28 use tokio::select;
29 use tokio::sync::mpsc;
30 
31 pub mod controller;
32 pub mod packets;
33 mod proto;
34 
35 use controller::Controller;
36 use packets::{nci, rf};
37 use proto::{casimir, casimir_grpc};
38 
39 const MAX_DEVICES: usize = 128;
40 type Id = u16;
41 
42 /// Read RF Control and Data packets received on the RF transport.
43 /// Performs recombination of the segmented packets.
44 pub struct RfReader {
45     socket: tcp::OwnedReadHalf,
46 }
47 
48 /// Write RF Control and Data packets received to the RF transport.
49 /// Performs segmentation of the packets.
50 pub struct RfWriter {
51     socket: tcp::OwnedWriteHalf,
52 }
53 
54 impl RfReader {
55     /// Create a new RF reader from the TCP socket half.
new(socket: tcp::OwnedReadHalf) -> Self56     pub fn new(socket: tcp::OwnedReadHalf) -> Self {
57         RfReader { socket }
58     }
59 
60     /// Read a single RF packet from the reader.
61     /// RF packets are framed with the byte size encoded as little-endian u16.
read(&mut self) -> Result<Vec<u8>>62     pub async fn read(&mut self) -> Result<Vec<u8>> {
63         const HEADER_SIZE: usize = 2;
64         let mut header_bytes = [0; HEADER_SIZE];
65 
66         // Read the header bytes.
67         self.socket.read_exact(&mut header_bytes[0..HEADER_SIZE]).await?;
68         let packet_length = u16::from_le_bytes(header_bytes) as usize;
69 
70         // Read the packet data.
71         let mut packet_bytes = vec![0; packet_length];
72         self.socket.read_exact(&mut packet_bytes).await?;
73 
74         Ok(packet_bytes)
75     }
76 }
77 
78 impl RfWriter {
79     /// Create a new RF writer from the TCP socket half.
new(socket: tcp::OwnedWriteHalf) -> Self80     pub fn new(socket: tcp::OwnedWriteHalf) -> Self {
81         RfWriter { socket }
82     }
83 
84     /// Write a single RF packet to the writer.
85     /// RF packets are framed with the byte size encoded as little-endian u16.
write(&mut self, packet: &[u8]) -> Result<()>86     async fn write(&mut self, packet: &[u8]) -> Result<()> {
87         let packet_length: u16 = packet.len().try_into()?;
88         let header_bytes = packet_length.to_le_bytes();
89 
90         // Write the header bytes.
91         self.socket.write_all(&header_bytes).await?;
92 
93         // Write the packet data.
94         self.socket.write_all(packet).await?;
95 
96         Ok(())
97     }
98 }
99 
100 /// Identify the device type.
101 pub enum DeviceType {
102     Nci,
103     Rf,
104 }
105 
106 /// Represent shared contextual information.
107 pub struct DeviceInformation {
108     id: Id,
109     position: u32,
110     r#type: DeviceType,
111 }
112 
113 /// Represent a generic NFC device interacting on the RF transport.
114 /// Devices communicate together through the RF mpsc channel.
115 /// NFCCs are an instance of Device.
116 pub struct Device {
117     // Unique identifier associated with the device.
118     // The identifier is assured never to be reused in the lifetime of
119     // the emulator.
120     id: Id,
121     // Async task running the controller main loop.
122     task: Pin<Box<dyn Future<Output = Result<()>>>>,
123     // Channel for injecting RF data packets into the controller instance.
124     rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
125 }
126 
127 impl Device {
nci( id: Id, socket: TcpStream, controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>, ) -> Device128     fn nci(
129         id: Id,
130         socket: TcpStream,
131         controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
132     ) -> Device {
133         let (rf_tx, rf_rx) = mpsc::unbounded_channel();
134         Device {
135             id,
136             rf_tx,
137             task: Box::pin(async move {
138                 let (nci_rx, nci_tx) = socket.into_split();
139                 Controller::run(
140                     id,
141                     pin!(nci::Reader::new(nci_rx).into_stream()),
142                     nci::Writer::new(nci_tx),
143                     rf_rx,
144                     controller_rf_tx,
145                 )
146                 .await
147             }),
148         }
149     }
150 
rf( id: Id, socket: TcpStream, controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>, ) -> Device151     fn rf(
152         id: Id,
153         socket: TcpStream,
154         controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
155     ) -> Device {
156         let (rf_tx, mut rf_rx) = mpsc::unbounded_channel();
157         Device {
158             id,
159             rf_tx,
160             task: Box::pin(async move {
161                 let (socket_rx, socket_tx) = socket.into_split();
162                 let mut rf_reader = RfReader::new(socket_rx);
163                 let mut rf_writer = RfWriter::new(socket_tx);
164 
165                 let result: Result<((), ())> = futures::future::try_join(
166                     async {
167                         loop {
168                             // Replace the sender identifier in the packet
169                             // with the assigned number for the RF connection.
170                             // TODO: currently the generated API does not allow
171                             // modifying the parsed fields so the change needs to be
172                             // applied to the unparsed packet.
173                             let mut packet_bytes = rf_reader.read().await?;
174                             packet_bytes[0..2].copy_from_slice(&id.to_le_bytes());
175 
176                             // Parse the input packet.
177                             let packet = rf::RfPacket::parse(&packet_bytes)?;
178 
179                             // Forward the packet to other devices.
180                             controller_rf_tx.send(packet)?;
181                         }
182                     },
183                     async {
184                         loop {
185                             // Forward the packet to the socket connection.
186                             use pdl_runtime::Packet;
187                             let packet = rf_rx
188                                 .recv()
189                                 .await
190                                 .ok_or(anyhow::anyhow!("rf_rx channel closed"))?;
191                             rf_writer.write(&packet.to_vec()).await?;
192                         }
193                     },
194                 )
195                 .await;
196 
197                 result?;
198                 Ok(())
199             }),
200         }
201     }
202 }
203 
204 struct Scene {
205     next_id: u16,
206     waker: Option<std::task::Waker>,
207     devices: [Option<Device>; MAX_DEVICES],
208     context: std::sync::Arc<std::sync::Mutex<HashMap<Id, DeviceInformation>>>,
209 }
210 
211 impl Scene {
new() -> Scene212     fn new() -> Scene {
213         const NONE: Option<Device> = None;
214         Scene {
215             next_id: 0,
216             waker: None,
217             devices: [NONE; MAX_DEVICES],
218             context: std::sync::Arc::new(std::sync::Mutex::new(HashMap::new())),
219         }
220     }
221 
wake(&mut self)222     fn wake(&mut self) {
223         if let Some(waker) = self.waker.take() {
224             waker.wake()
225         }
226     }
227 
add_device(&mut self, builder: impl FnOnce(Id) -> Device) -> Result<Id>228     fn add_device(&mut self, builder: impl FnOnce(Id) -> Device) -> Result<Id> {
229         for n in 0..MAX_DEVICES {
230             if self.devices[n].is_none() {
231                 let id = self.next_id;
232                 self.devices[n] = Some(builder(id));
233                 self.next_id += 1;
234                 self.wake();
235                 return Ok(id);
236             }
237         }
238         Err(anyhow::anyhow!("max number of connections reached"))
239     }
240 
disconnect(&mut self, n: usize)241     fn disconnect(&mut self, n: usize) {
242         let id = self.devices[n].as_ref().unwrap().id;
243         self.devices[n] = None;
244         self.context.lock().unwrap().remove(&id);
245         for other_n in 0..MAX_DEVICES {
246             let Some(ref device) = self.devices[other_n] else { continue };
247             assert!(n != other_n);
248             device
249                 .rf_tx
250                 .send(
251                     rf::DeactivateNotificationBuilder {
252                         type_: rf::DeactivateType::Discovery,
253                         reason: rf::DeactivateReason::RfLinkLoss,
254                         sender: id,
255                         receiver: device.id,
256                         technology: rf::Technology::NfcA,
257                         protocol: rf::Protocol::Undetermined,
258                     }
259                     .into(),
260                 )
261                 .expect("failed to send deactive notification")
262         }
263     }
264 
send(&self, packet: &rf::RfPacket) -> Result<()>265     fn send(&self, packet: &rf::RfPacket) -> Result<()> {
266         let context = self.context.lock().unwrap();
267         for n in 0..MAX_DEVICES {
268             let Some(ref device) = self.devices[n] else { continue };
269             if packet.get_sender() != device.id
270                 && (packet.get_receiver() == u16::MAX || packet.get_receiver() == device.id)
271                 && context.get(&device.id).map(|info| info.position)
272                     == context.get(&packet.get_sender()).map(|info| info.position)
273             {
274                 device.rf_tx.send(packet.to_owned())?;
275             }
276         }
277 
278         Ok(())
279     }
280 }
281 
282 impl Future for Scene {
283     type Output = ();
284 
poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()>285     fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()> {
286         for n in 0..MAX_DEVICES {
287             let dropped = match self.devices[n] {
288                 Some(ref mut device) => match device.task.as_mut().poll(cx) {
289                     Poll::Ready(Ok(_)) => unreachable!(),
290                     Poll::Ready(Err(err)) => {
291                         warn!("dropping device {}: {}", n, err);
292                         true
293                     }
294                     Poll::Pending => false,
295                 },
296                 None => false,
297             };
298             if dropped {
299                 self.disconnect(n)
300             }
301         }
302         self.waker = Some(cx.waker().clone());
303         Poll::Pending
304     }
305 }
306 
307 #[derive(Clone)]
308 struct Service {
309     context: std::sync::Arc<std::sync::Mutex<HashMap<Id, DeviceInformation>>>,
310 }
311 
312 impl From<&DeviceInformation> for casimir::Device {
from(info: &DeviceInformation) -> Self313     fn from(info: &DeviceInformation) -> Self {
314         let mut device = casimir::Device::new();
315         device.set_device_id(info.id as u32);
316         device.set_position(info.position);
317         match info.r#type {
318             DeviceType::Nci => device.set_nci(Default::default()),
319             DeviceType::Rf => device.set_rf(Default::default()),
320         }
321         device
322     }
323 }
324 
325 impl casimir_grpc::Casimir for Service {
list_devices( &mut self, _ctx: grpcio::RpcContext<'_>, _req: casimir::ListDevicesRequest, sink: grpcio::UnarySink<casimir::ListDevicesResponse>, )326     fn list_devices(
327         &mut self,
328         _ctx: grpcio::RpcContext<'_>,
329         _req: casimir::ListDevicesRequest,
330         sink: grpcio::UnarySink<casimir::ListDevicesResponse>,
331     ) {
332         let mut response = casimir::ListDevicesResponse::new();
333         response.set_device(
334             self.context
335                 .lock()
336                 .unwrap()
337                 .values()
338                 .map(casimir::Device::from)
339                 .collect::<Vec<_>>()
340                 .into(),
341         );
342         sink.success(response);
343     }
344 
get_device( &mut self, _ctx: grpcio::RpcContext<'_>, req: casimir::GetDeviceRequest, sink: grpcio::UnarySink<casimir::GetDeviceResponse>, )345     fn get_device(
346         &mut self,
347         _ctx: grpcio::RpcContext<'_>,
348         req: casimir::GetDeviceRequest,
349         sink: grpcio::UnarySink<casimir::GetDeviceResponse>,
350     ) {
351         match self.context.lock().unwrap().get(&(req.get_device_id() as u16)) {
352             Some(info) => {
353                 let mut response = casimir::GetDeviceResponse::new();
354                 response.set_device(info.into());
355                 sink.success(response)
356             }
357             None => sink.fail(grpcio::RpcStatus::with_message(
358                 grpcio::RpcStatusCode::INVALID_ARGUMENT,
359                 format!("device_id {} not found", req.get_device_id()),
360             )),
361         };
362     }
363 
move_device( &mut self, _ctx: grpcio::RpcContext<'_>, req: casimir::MoveDeviceRequest, sink: grpcio::UnarySink<casimir::MoveDeviceResponse>, )364     fn move_device(
365         &mut self,
366         _ctx: grpcio::RpcContext<'_>,
367         req: casimir::MoveDeviceRequest,
368         sink: grpcio::UnarySink<casimir::MoveDeviceResponse>,
369     ) {
370         match self.context.lock().unwrap().get_mut(&(req.get_device_id() as u16)) {
371             Some(info) => {
372                 info.position = req.get_position();
373                 sink.success(Default::default())
374             }
375             None => sink.fail(grpcio::RpcStatus::with_message(
376                 grpcio::RpcStatusCode::INVALID_ARGUMENT,
377                 format!("device_id {} not found", req.get_device_id()),
378             )),
379         };
380     }
381 }
382 
383 #[derive(FromArgs, Debug)]
384 /// Nfc emulator.
385 struct Opt {
386     #[argh(option, default = "7000")]
387     /// configure the TCP port for the NCI server.
388     nci_port: u16,
389     #[argh(option, default = "7001")]
390     /// configure the TCP port for the RF server.
391     rf_port: u16,
392     #[argh(option, default = "50051")]
393     /// configure the gRPC port.
394     grpc_port: u16,
395 }
396 
run() -> Result<()>397 async fn run() -> Result<()> {
398     env_logger::init_from_env(
399         env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "debug"),
400     );
401 
402     let opt: Opt = argh::from_env();
403     let nci_listener =
404         TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, opt.nci_port)).await?;
405     let rf_listener =
406         TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, opt.rf_port)).await?;
407     let (rf_tx, mut rf_rx) = mpsc::unbounded_channel();
408     let mut scene = Scene::new();
409 
410     info!("Listening for NCI connections at address 127.0.0.1:{}", opt.nci_port);
411     info!("Listening for RF connections at address 127.0.0.1:{}", opt.rf_port);
412     info!("Listening for gRPC connections at address 127.0.0.1:{}", opt.grpc_port);
413 
414     let env = std::sync::Arc::new(grpcio::Environment::new(1));
415     let service = casimir_grpc::create_casimir(Service { context: scene.context.clone() });
416     let quota = grpcio::ResourceQuota::new(Some("CasimirQuota")).resize_memory(1024 * 1024);
417     let channel_builder = grpcio::ChannelBuilder::new(env.clone()).set_resource_quota(quota);
418 
419     let mut server = grpcio::ServerBuilder::new(env)
420         .register_service(service)
421         .channel_args(channel_builder.build_args())
422         .build()
423         .unwrap();
424     server
425         .add_listening_port(
426             format!("127.0.0.1:{}", opt.grpc_port),
427             grpcio::ServerCredentials::insecure(),
428         )
429         .unwrap();
430     server.start();
431 
432     loop {
433         select! {
434             result = nci_listener.accept() => {
435                 let (socket, addr) = result?;
436                 info!("Incoming NCI connection from {}", addr);
437                 match scene.add_device(|id| Device::nci(id, socket, rf_tx.clone())) {
438                     Ok(id) => {
439                         scene.context.lock().unwrap().insert(id, DeviceInformation {
440                             id, position: id as u32, r#type: DeviceType::Nci
441                         });
442                         info!("Accepted NCI connection from {} with id {}", addr, id)
443                     }
444                     Err(err) => error!("Failed to accept NCI connection from {}: {}", addr, err)
445                 }
446             },
447             result = rf_listener.accept() => {
448                 let (socket, addr) = result?;
449                 info!("Incoming RF connection from {}", addr);
450                 match scene.add_device(|id| Device::rf(id, socket, rf_tx.clone())) {
451                     Ok(id) => {
452                         scene.context.lock().unwrap().insert(id, DeviceInformation {
453                             id, position: id as u32, r#type: DeviceType::Rf
454                         });
455                         info!("Accepted RF connection from {} with id {}", addr, id)
456                     }
457                     Err(err) => error!("Failed to accept RF connection from {}: {}", addr, err)
458                 }
459             },
460             _ = &mut scene => (),
461             result = rf_rx.recv() => {
462                 let packet = result.ok_or(anyhow::anyhow!("rf_rx channel closed"))?;
463                 scene.send(&packet)?
464             }
465         }
466     }
467 }
468 
469 #[tokio::main]
main() -> Result<()>470 async fn main() -> Result<()> {
471     run().await
472 }
473