1 // Copyright 2023, The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //! NFCC and RF emulator.
16
17 use anyhow::Result;
18 use argh::FromArgs;
19 use log::{error, info, warn};
20 use std::collections::HashMap;
21 use std::future::Future;
22 use std::net::{Ipv4Addr, SocketAddrV4};
23 use std::pin::{pin, Pin};
24 use std::task::Poll;
25 use tokio::io::AsyncReadExt;
26 use tokio::io::AsyncWriteExt;
27 use tokio::net::{tcp, TcpListener, TcpStream};
28 use tokio::select;
29 use tokio::sync::mpsc;
30
31 pub mod controller;
32 pub mod packets;
33 mod proto;
34
35 use controller::Controller;
36 use packets::{nci, rf};
37 use proto::{casimir, casimir_grpc};
38
39 const MAX_DEVICES: usize = 128;
40 type Id = u16;
41
42 /// Read RF Control and Data packets received on the RF transport.
43 /// Performs recombination of the segmented packets.
44 pub struct RfReader {
45 socket: tcp::OwnedReadHalf,
46 }
47
48 /// Write RF Control and Data packets received to the RF transport.
49 /// Performs segmentation of the packets.
50 pub struct RfWriter {
51 socket: tcp::OwnedWriteHalf,
52 }
53
54 impl RfReader {
55 /// Create a new RF reader from the TCP socket half.
new(socket: tcp::OwnedReadHalf) -> Self56 pub fn new(socket: tcp::OwnedReadHalf) -> Self {
57 RfReader { socket }
58 }
59
60 /// Read a single RF packet from the reader.
61 /// RF packets are framed with the byte size encoded as little-endian u16.
read(&mut self) -> Result<Vec<u8>>62 pub async fn read(&mut self) -> Result<Vec<u8>> {
63 const HEADER_SIZE: usize = 2;
64 let mut header_bytes = [0; HEADER_SIZE];
65
66 // Read the header bytes.
67 self.socket.read_exact(&mut header_bytes[0..HEADER_SIZE]).await?;
68 let packet_length = u16::from_le_bytes(header_bytes) as usize;
69
70 // Read the packet data.
71 let mut packet_bytes = vec![0; packet_length];
72 self.socket.read_exact(&mut packet_bytes).await?;
73
74 Ok(packet_bytes)
75 }
76 }
77
78 impl RfWriter {
79 /// Create a new RF writer from the TCP socket half.
new(socket: tcp::OwnedWriteHalf) -> Self80 pub fn new(socket: tcp::OwnedWriteHalf) -> Self {
81 RfWriter { socket }
82 }
83
84 /// Write a single RF packet to the writer.
85 /// RF packets are framed with the byte size encoded as little-endian u16.
write(&mut self, packet: &[u8]) -> Result<()>86 async fn write(&mut self, packet: &[u8]) -> Result<()> {
87 let packet_length: u16 = packet.len().try_into()?;
88 let header_bytes = packet_length.to_le_bytes();
89
90 // Write the header bytes.
91 self.socket.write_all(&header_bytes).await?;
92
93 // Write the packet data.
94 self.socket.write_all(packet).await?;
95
96 Ok(())
97 }
98 }
99
100 /// Identify the device type.
101 pub enum DeviceType {
102 Nci,
103 Rf,
104 }
105
106 /// Represent shared contextual information.
107 pub struct DeviceInformation {
108 id: Id,
109 position: u32,
110 r#type: DeviceType,
111 }
112
113 /// Represent a generic NFC device interacting on the RF transport.
114 /// Devices communicate together through the RF mpsc channel.
115 /// NFCCs are an instance of Device.
116 pub struct Device {
117 // Unique identifier associated with the device.
118 // The identifier is assured never to be reused in the lifetime of
119 // the emulator.
120 id: Id,
121 // Async task running the controller main loop.
122 task: Pin<Box<dyn Future<Output = Result<()>>>>,
123 // Channel for injecting RF data packets into the controller instance.
124 rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
125 }
126
127 impl Device {
nci( id: Id, socket: TcpStream, controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>, ) -> Device128 fn nci(
129 id: Id,
130 socket: TcpStream,
131 controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
132 ) -> Device {
133 let (rf_tx, rf_rx) = mpsc::unbounded_channel();
134 Device {
135 id,
136 rf_tx,
137 task: Box::pin(async move {
138 let (nci_rx, nci_tx) = socket.into_split();
139 Controller::run(
140 id,
141 pin!(nci::Reader::new(nci_rx).into_stream()),
142 nci::Writer::new(nci_tx),
143 rf_rx,
144 controller_rf_tx,
145 )
146 .await
147 }),
148 }
149 }
150
rf( id: Id, socket: TcpStream, controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>, ) -> Device151 fn rf(
152 id: Id,
153 socket: TcpStream,
154 controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
155 ) -> Device {
156 let (rf_tx, mut rf_rx) = mpsc::unbounded_channel();
157 Device {
158 id,
159 rf_tx,
160 task: Box::pin(async move {
161 let (socket_rx, socket_tx) = socket.into_split();
162 let mut rf_reader = RfReader::new(socket_rx);
163 let mut rf_writer = RfWriter::new(socket_tx);
164
165 let result: Result<((), ())> = futures::future::try_join(
166 async {
167 loop {
168 // Replace the sender identifier in the packet
169 // with the assigned number for the RF connection.
170 // TODO: currently the generated API does not allow
171 // modifying the parsed fields so the change needs to be
172 // applied to the unparsed packet.
173 let mut packet_bytes = rf_reader.read().await?;
174 packet_bytes[0..2].copy_from_slice(&id.to_le_bytes());
175
176 // Parse the input packet.
177 let packet = rf::RfPacket::parse(&packet_bytes)?;
178
179 // Forward the packet to other devices.
180 controller_rf_tx.send(packet)?;
181 }
182 },
183 async {
184 loop {
185 // Forward the packet to the socket connection.
186 use pdl_runtime::Packet;
187 let packet = rf_rx
188 .recv()
189 .await
190 .ok_or(anyhow::anyhow!("rf_rx channel closed"))?;
191 rf_writer.write(&packet.to_vec()).await?;
192 }
193 },
194 )
195 .await;
196
197 result?;
198 Ok(())
199 }),
200 }
201 }
202 }
203
204 struct Scene {
205 next_id: u16,
206 waker: Option<std::task::Waker>,
207 devices: [Option<Device>; MAX_DEVICES],
208 context: std::sync::Arc<std::sync::Mutex<HashMap<Id, DeviceInformation>>>,
209 }
210
211 impl Scene {
new() -> Scene212 fn new() -> Scene {
213 const NONE: Option<Device> = None;
214 Scene {
215 next_id: 0,
216 waker: None,
217 devices: [NONE; MAX_DEVICES],
218 context: std::sync::Arc::new(std::sync::Mutex::new(HashMap::new())),
219 }
220 }
221
wake(&mut self)222 fn wake(&mut self) {
223 if let Some(waker) = self.waker.take() {
224 waker.wake()
225 }
226 }
227
add_device(&mut self, builder: impl FnOnce(Id) -> Device) -> Result<Id>228 fn add_device(&mut self, builder: impl FnOnce(Id) -> Device) -> Result<Id> {
229 for n in 0..MAX_DEVICES {
230 if self.devices[n].is_none() {
231 let id = self.next_id;
232 self.devices[n] = Some(builder(id));
233 self.next_id += 1;
234 self.wake();
235 return Ok(id);
236 }
237 }
238 Err(anyhow::anyhow!("max number of connections reached"))
239 }
240
disconnect(&mut self, n: usize)241 fn disconnect(&mut self, n: usize) {
242 let id = self.devices[n].as_ref().unwrap().id;
243 self.devices[n] = None;
244 self.context.lock().unwrap().remove(&id);
245 for other_n in 0..MAX_DEVICES {
246 let Some(ref device) = self.devices[other_n] else { continue };
247 assert!(n != other_n);
248 device
249 .rf_tx
250 .send(
251 rf::DeactivateNotificationBuilder {
252 type_: rf::DeactivateType::Discovery,
253 reason: rf::DeactivateReason::RfLinkLoss,
254 sender: id,
255 receiver: device.id,
256 technology: rf::Technology::NfcA,
257 protocol: rf::Protocol::Undetermined,
258 }
259 .into(),
260 )
261 .expect("failed to send deactive notification")
262 }
263 }
264
send(&self, packet: &rf::RfPacket) -> Result<()>265 fn send(&self, packet: &rf::RfPacket) -> Result<()> {
266 let context = self.context.lock().unwrap();
267 for n in 0..MAX_DEVICES {
268 let Some(ref device) = self.devices[n] else { continue };
269 if packet.get_sender() != device.id
270 && (packet.get_receiver() == u16::MAX || packet.get_receiver() == device.id)
271 && context.get(&device.id).map(|info| info.position)
272 == context.get(&packet.get_sender()).map(|info| info.position)
273 {
274 device.rf_tx.send(packet.to_owned())?;
275 }
276 }
277
278 Ok(())
279 }
280 }
281
282 impl Future for Scene {
283 type Output = ();
284
poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()>285 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()> {
286 for n in 0..MAX_DEVICES {
287 let dropped = match self.devices[n] {
288 Some(ref mut device) => match device.task.as_mut().poll(cx) {
289 Poll::Ready(Ok(_)) => unreachable!(),
290 Poll::Ready(Err(err)) => {
291 warn!("dropping device {}: {}", n, err);
292 true
293 }
294 Poll::Pending => false,
295 },
296 None => false,
297 };
298 if dropped {
299 self.disconnect(n)
300 }
301 }
302 self.waker = Some(cx.waker().clone());
303 Poll::Pending
304 }
305 }
306
307 #[derive(Clone)]
308 struct Service {
309 context: std::sync::Arc<std::sync::Mutex<HashMap<Id, DeviceInformation>>>,
310 }
311
312 impl From<&DeviceInformation> for casimir::Device {
from(info: &DeviceInformation) -> Self313 fn from(info: &DeviceInformation) -> Self {
314 let mut device = casimir::Device::new();
315 device.set_device_id(info.id as u32);
316 device.set_position(info.position);
317 match info.r#type {
318 DeviceType::Nci => device.set_nci(Default::default()),
319 DeviceType::Rf => device.set_rf(Default::default()),
320 }
321 device
322 }
323 }
324
325 impl casimir_grpc::Casimir for Service {
list_devices( &mut self, _ctx: grpcio::RpcContext<'_>, _req: casimir::ListDevicesRequest, sink: grpcio::UnarySink<casimir::ListDevicesResponse>, )326 fn list_devices(
327 &mut self,
328 _ctx: grpcio::RpcContext<'_>,
329 _req: casimir::ListDevicesRequest,
330 sink: grpcio::UnarySink<casimir::ListDevicesResponse>,
331 ) {
332 let mut response = casimir::ListDevicesResponse::new();
333 response.set_device(
334 self.context
335 .lock()
336 .unwrap()
337 .values()
338 .map(casimir::Device::from)
339 .collect::<Vec<_>>()
340 .into(),
341 );
342 sink.success(response);
343 }
344
get_device( &mut self, _ctx: grpcio::RpcContext<'_>, req: casimir::GetDeviceRequest, sink: grpcio::UnarySink<casimir::GetDeviceResponse>, )345 fn get_device(
346 &mut self,
347 _ctx: grpcio::RpcContext<'_>,
348 req: casimir::GetDeviceRequest,
349 sink: grpcio::UnarySink<casimir::GetDeviceResponse>,
350 ) {
351 match self.context.lock().unwrap().get(&(req.get_device_id() as u16)) {
352 Some(info) => {
353 let mut response = casimir::GetDeviceResponse::new();
354 response.set_device(info.into());
355 sink.success(response)
356 }
357 None => sink.fail(grpcio::RpcStatus::with_message(
358 grpcio::RpcStatusCode::INVALID_ARGUMENT,
359 format!("device_id {} not found", req.get_device_id()),
360 )),
361 };
362 }
363
move_device( &mut self, _ctx: grpcio::RpcContext<'_>, req: casimir::MoveDeviceRequest, sink: grpcio::UnarySink<casimir::MoveDeviceResponse>, )364 fn move_device(
365 &mut self,
366 _ctx: grpcio::RpcContext<'_>,
367 req: casimir::MoveDeviceRequest,
368 sink: grpcio::UnarySink<casimir::MoveDeviceResponse>,
369 ) {
370 match self.context.lock().unwrap().get_mut(&(req.get_device_id() as u16)) {
371 Some(info) => {
372 info.position = req.get_position();
373 sink.success(Default::default())
374 }
375 None => sink.fail(grpcio::RpcStatus::with_message(
376 grpcio::RpcStatusCode::INVALID_ARGUMENT,
377 format!("device_id {} not found", req.get_device_id()),
378 )),
379 };
380 }
381 }
382
383 #[derive(FromArgs, Debug)]
384 /// Nfc emulator.
385 struct Opt {
386 #[argh(option, default = "7000")]
387 /// configure the TCP port for the NCI server.
388 nci_port: u16,
389 #[argh(option, default = "7001")]
390 /// configure the TCP port for the RF server.
391 rf_port: u16,
392 #[argh(option, default = "50051")]
393 /// configure the gRPC port.
394 grpc_port: u16,
395 }
396
run() -> Result<()>397 async fn run() -> Result<()> {
398 env_logger::init_from_env(
399 env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "debug"),
400 );
401
402 let opt: Opt = argh::from_env();
403 let nci_listener =
404 TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, opt.nci_port)).await?;
405 let rf_listener =
406 TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, opt.rf_port)).await?;
407 let (rf_tx, mut rf_rx) = mpsc::unbounded_channel();
408 let mut scene = Scene::new();
409
410 info!("Listening for NCI connections at address 127.0.0.1:{}", opt.nci_port);
411 info!("Listening for RF connections at address 127.0.0.1:{}", opt.rf_port);
412 info!("Listening for gRPC connections at address 127.0.0.1:{}", opt.grpc_port);
413
414 let env = std::sync::Arc::new(grpcio::Environment::new(1));
415 let service = casimir_grpc::create_casimir(Service { context: scene.context.clone() });
416 let quota = grpcio::ResourceQuota::new(Some("CasimirQuota")).resize_memory(1024 * 1024);
417 let channel_builder = grpcio::ChannelBuilder::new(env.clone()).set_resource_quota(quota);
418
419 let mut server = grpcio::ServerBuilder::new(env)
420 .register_service(service)
421 .channel_args(channel_builder.build_args())
422 .build()
423 .unwrap();
424 server
425 .add_listening_port(
426 format!("127.0.0.1:{}", opt.grpc_port),
427 grpcio::ServerCredentials::insecure(),
428 )
429 .unwrap();
430 server.start();
431
432 loop {
433 select! {
434 result = nci_listener.accept() => {
435 let (socket, addr) = result?;
436 info!("Incoming NCI connection from {}", addr);
437 match scene.add_device(|id| Device::nci(id, socket, rf_tx.clone())) {
438 Ok(id) => {
439 scene.context.lock().unwrap().insert(id, DeviceInformation {
440 id, position: id as u32, r#type: DeviceType::Nci
441 });
442 info!("Accepted NCI connection from {} with id {}", addr, id)
443 }
444 Err(err) => error!("Failed to accept NCI connection from {}: {}", addr, err)
445 }
446 },
447 result = rf_listener.accept() => {
448 let (socket, addr) = result?;
449 info!("Incoming RF connection from {}", addr);
450 match scene.add_device(|id| Device::rf(id, socket, rf_tx.clone())) {
451 Ok(id) => {
452 scene.context.lock().unwrap().insert(id, DeviceInformation {
453 id, position: id as u32, r#type: DeviceType::Rf
454 });
455 info!("Accepted RF connection from {} with id {}", addr, id)
456 }
457 Err(err) => error!("Failed to accept RF connection from {}: {}", addr, err)
458 }
459 },
460 _ = &mut scene => (),
461 result = rf_rx.recv() => {
462 let packet = result.ok_or(anyhow::anyhow!("rf_rx channel closed"))?;
463 scene.send(&packet)?
464 }
465 }
466 }
467 }
468
469 #[tokio::main]
main() -> Result<()>470 async fn main() -> Result<()> {
471 run().await
472 }
473