1 // Copyright 2023, The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //! NFCC and RF emulator.
16
17 use anyhow::Result;
18 use argh::FromArgs;
19 use log::{error, info, warn};
20 use rustutils::inherited_fd;
21 use std::future::Future;
22 use std::net::{Ipv4Addr, SocketAddrV4};
23 use std::pin::{pin, Pin};
24 use std::task::Context;
25 use std::task::Poll;
26 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
27 use tokio::net::{TcpListener, UnixListener};
28 use tokio::select;
29 use tokio::sync::mpsc;
30
31 pub mod controller;
32 pub mod packets;
33
34 use controller::Controller;
35 use packets::{nci, rf};
36
/// Maximum number of devices that can be connected at the same time;
/// this is the length of the device slot array held by `Scene`.
const MAX_DEVICES: usize = 128;
/// Identifier assigned to a device on the RF transport.
type Id = u16;
39
/// Read RF Control and Data packets received on the RF transport.
/// Performs recombination of the segmented packets.
///
/// Each packet is framed on the wire by its byte size encoded as a
/// little-endian `u16`, followed by the packet bytes.
pub struct RfReader {
    // Pinned, type-erased byte source the framed packets are read from.
    socket: Pin<Box<dyn AsyncRead>>,
}
45
/// Write RF Control and Data packets to the RF transport.
/// Performs segmentation of the packets.
///
/// Each packet is framed on the wire by its byte size encoded as a
/// little-endian `u16`, followed by the packet bytes.
pub struct RfWriter {
    // Pinned, type-erased byte sink the framed packets are written to.
    socket: Pin<Box<dyn AsyncWrite>>,
}
51
52 impl RfReader {
53 /// Create a new RF reader from an `AsyncRead` implementation.
new(socket: impl AsyncRead + 'static) -> Self54 pub fn new(socket: impl AsyncRead + 'static) -> Self {
55 RfReader { socket: Box::pin(socket) }
56 }
57
58 /// Read a single RF packet from the reader.
59 /// RF packets are framed with the byte size encoded as little-endian u16.
read(&mut self) -> Result<Vec<u8>>60 pub async fn read(&mut self) -> Result<Vec<u8>> {
61 const HEADER_SIZE: usize = 2;
62 let mut header_bytes = [0; HEADER_SIZE];
63
64 // Read the header bytes.
65 self.socket.read_exact(&mut header_bytes[0..HEADER_SIZE]).await?;
66 let packet_length = u16::from_le_bytes(header_bytes) as usize;
67
68 // Read the packet data.
69 let mut packet_bytes = vec![0; packet_length];
70 self.socket.read_exact(&mut packet_bytes).await?;
71
72 Ok(packet_bytes)
73 }
74 }
75
76 impl RfWriter {
77 /// Create a new RF writer from an `AsyncWrite` implementation.
new(socket: impl AsyncWrite + 'static) -> Self78 pub fn new(socket: impl AsyncWrite + 'static) -> Self {
79 RfWriter { socket: Box::pin(socket) }
80 }
81
82 /// Write a single RF packet to the writer.
83 /// RF packets are framed with the byte size encoded as little-endian u16.
write(&mut self, packet: &[u8]) -> Result<()>84 async fn write(&mut self, packet: &[u8]) -> Result<()> {
85 let packet_length: u16 = packet.len().try_into()?;
86 let header_bytes = packet_length.to_le_bytes();
87
88 // Write the header bytes.
89 self.socket.write_all(&header_bytes).await?;
90
91 // Write the packet data.
92 self.socket.write_all(packet).await?;
93
94 Ok(())
95 }
96 }
97
98 /// Represent a generic NFC device interacting on the RF transport.
99 /// Devices communicate together through the RF mpsc channel.
100 /// NFCCs are an instance of Device.
101 pub struct Device {
102 // Unique identifier associated with the device.
103 // The identifier is assured never to be reused in the lifetime of
104 // the emulator.
105 id: u16,
106 // Async task running the controller main loop.
107 task: Pin<Box<dyn Future<Output = Result<()>>>>,
108 // Channel for injecting RF data packets into the controller instance.
109 rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
110 }
111
112 impl Device {
nci( id: Id, nci_rx: impl AsyncRead + 'static, nci_tx: impl AsyncWrite + 'static, controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>, ) -> Device113 fn nci(
114 id: Id,
115 nci_rx: impl AsyncRead + 'static,
116 nci_tx: impl AsyncWrite + 'static,
117 controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
118 ) -> Device {
119 let (rf_tx, rf_rx) = mpsc::unbounded_channel();
120 Device {
121 id,
122 rf_tx,
123 task: Box::pin(async move {
124 Controller::run(
125 id,
126 pin!(nci::Reader::new(nci_rx).into_stream()),
127 nci::Writer::new(nci_tx),
128 rf_rx,
129 controller_rf_tx,
130 )
131 .await
132 }),
133 }
134 }
135
rf( id: Id, socket_rx: impl AsyncRead + 'static, socket_tx: impl AsyncWrite + 'static, controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>, ) -> Device136 fn rf(
137 id: Id,
138 socket_rx: impl AsyncRead + 'static,
139 socket_tx: impl AsyncWrite + 'static,
140 controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
141 ) -> Device {
142 let (rf_tx, mut rf_rx) = mpsc::unbounded_channel();
143 Device {
144 id,
145 rf_tx,
146 task: Box::pin(async move {
147 let mut rf_reader = RfReader::new(socket_rx);
148 let mut rf_writer = RfWriter::new(socket_tx);
149
150 let result: Result<((), ())> = futures::future::try_join(
151 async {
152 loop {
153 // Replace the sender identifier in the packet
154 // with the assigned number for the RF connection.
155 // TODO: currently the generated API does not allow
156 // modifying the parsed fields so the change needs to be
157 // applied to the unparsed packet.
158 let mut packet_bytes = rf_reader.read().await?;
159 packet_bytes[0..2].copy_from_slice(&id.to_le_bytes());
160
161 // Parse the input packet.
162 let packet = rf::RfPacket::parse(&packet_bytes)?;
163
164 // Forward the packet to other devices.
165 controller_rf_tx.send(packet)?;
166 }
167 },
168 async {
169 loop {
170 // Forward the packet to the socket connection.
171 use pdl_runtime::Packet;
172 let packet = rf_rx
173 .recv()
174 .await
175 .ok_or(anyhow::anyhow!("rf_rx channel closed"))?;
176 rf_writer.write(&packet.encode_to_vec()?).await?;
177 }
178 },
179 )
180 .await;
181
182 result?;
183 Ok(())
184 }),
185 }
186 }
187 }
188
189 struct Scene {
190 next_id: u16,
191 waker: Option<std::task::Waker>,
192 devices: [Option<Device>; MAX_DEVICES],
193 }
194
195 impl Default for Scene {
default() -> Self196 fn default() -> Self {
197 const NONE: Option<Device> = None;
198 Scene { next_id: 0, waker: None, devices: [NONE; MAX_DEVICES] }
199 }
200 }
201
202 impl Scene {
new() -> Scene203 fn new() -> Scene {
204 Default::default()
205 }
206
wake(&mut self)207 fn wake(&mut self) {
208 if let Some(waker) = self.waker.take() {
209 waker.wake()
210 }
211 }
212
add_device(&mut self, builder: impl FnOnce(Id) -> Device) -> Result<Id>213 fn add_device(&mut self, builder: impl FnOnce(Id) -> Device) -> Result<Id> {
214 for n in 0..MAX_DEVICES {
215 if self.devices[n].is_none() {
216 self.devices[n] = Some(builder(self.next_id));
217 self.next_id += 1;
218 self.wake();
219 return Ok(n as Id);
220 }
221 }
222 Err(anyhow::anyhow!("max number of connections reached"))
223 }
224
disconnect(&mut self, n: usize)225 fn disconnect(&mut self, n: usize) {
226 let id = self.devices[n].as_ref().unwrap().id;
227 self.devices[n] = None;
228 for other_n in 0..MAX_DEVICES {
229 let Some(ref device) = self.devices[other_n] else { continue };
230 assert!(n != other_n);
231 device
232 .rf_tx
233 .send(
234 rf::DeactivateNotificationBuilder {
235 type_: rf::DeactivateType::Discovery,
236 reason: rf::DeactivateReason::RfLinkLoss,
237 sender: id,
238 receiver: device.id,
239 power_level: 255,
240 technology: rf::Technology::NfcA,
241 protocol: rf::Protocol::Undetermined,
242 }
243 .into(),
244 )
245 .expect("failed to send deactive notification")
246 }
247 }
248
send(&self, packet: &rf::RfPacket) -> Result<()>249 fn send(&self, packet: &rf::RfPacket) -> Result<()> {
250 for n in 0..MAX_DEVICES {
251 let Some(ref device) = self.devices[n] else { continue };
252 if packet.get_sender() != device.id
253 && (packet.get_receiver() == u16::MAX || packet.get_receiver() == device.id)
254 {
255 device.rf_tx.send(packet.to_owned())?;
256 }
257 }
258
259 Ok(())
260 }
261 }
262
263 impl Future for Scene {
264 type Output = ();
265
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>266 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
267 for n in 0..MAX_DEVICES {
268 let dropped = match self.devices[n] {
269 Some(ref mut device) => match device.task.as_mut().poll(cx) {
270 Poll::Ready(Ok(_)) => unreachable!(),
271 Poll::Ready(Err(err)) => {
272 warn!("dropping device {}: {}", n, err);
273 true
274 }
275 Poll::Pending => false,
276 },
277 None => false,
278 };
279 if dropped {
280 self.disconnect(n)
281 }
282 }
283 self.waker = Some(cx.waker().clone());
284 Poll::Pending
285 }
286 }
287
// Command line options of the emulator.
//
// NOTE(review): the `///` comments below are attached to `argh` items and
// are presumably rendered as command line help text; treat them as
// user-facing strings when editing — confirm against the argh documentation.
//
// For each server (NCI, RF) the TCP port and the unix server fd are
// alternative sources; `run` rejects the case where both are given.
#[derive(FromArgs, Debug)]
/// Nfc emulator.
struct Opt {
    #[argh(option)]
    /// configure the TCP port for the NCI server.
    nci_port: Option<u16>,
    #[argh(option)]
    /// configure a preexisting unix server fd for the NCI server.
    nci_unix_fd: Option<i32>,
    #[argh(option)]
    /// configure the TCP port for the RF server.
    rf_port: Option<u16>,
    #[argh(option)]
    /// configure a preexisting unix server fd for the RF server.
    rf_unix_fd: Option<i32>,
}
304
/// Abstraction between different server sources.
///
/// Lets the accept loop handle TCP and unix domain socket servers through
/// the same `accept_split` call.
enum Listener {
    /// Server listening on a TCP socket.
    Tcp(TcpListener),
    /// Server listening on a unix domain socket.
    // NOTE(review): the variant is constructed in `run` when a unix fd is
    // given, so this `allow` may no longer be needed — confirm.
    #[allow(unused)]
    Unix(UnixListener),
}
311
312 impl Listener {
accept_split( &self, ) -> Result<(Pin<Box<dyn AsyncRead>>, Pin<Box<dyn AsyncWrite>>, String)>313 async fn accept_split(
314 &self,
315 ) -> Result<(Pin<Box<dyn AsyncRead>>, Pin<Box<dyn AsyncWrite>>, String)> {
316 match self {
317 Listener::Tcp(tcp) => {
318 let (socket, addr) = tcp.accept().await?;
319 let (rx, tx) = socket.into_split();
320 Ok((Box::pin(rx), Box::pin(tx), format!("{}", addr)))
321 }
322 Listener::Unix(unix) => {
323 let (socket, addr) = unix.accept().await?;
324 let (rx, tx) = socket.into_split();
325 Ok((Box::pin(rx), Box::pin(tx), format!("{:?}", addr)))
326 }
327 }
328 }
329 }
330
331 #[tokio::main]
run() -> Result<()>332 async fn run() -> Result<()> {
333 env_logger::init_from_env(
334 env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "debug"),
335 );
336
337 let opt: Opt = argh::from_env();
338
339 let nci_listener = match (opt.nci_port, opt.nci_unix_fd) {
340 (None, Some(unix_fd)) => {
341 let owned_fd = inherited_fd::take_fd_ownership(unix_fd)?;
342 let nci_listener = std::os::unix::net::UnixListener::from(owned_fd);
343 nci_listener.set_nonblocking(true)?;
344 let nci_listener = UnixListener::from_std(nci_listener)?;
345 info!("Listening for NCI connections on fd {}", unix_fd);
346 Listener::Unix(nci_listener)
347 }
348 (port, None) => {
349 let port = port.unwrap_or(7000);
350 let nci_addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port);
351 let nci_listener = TcpListener::bind(nci_addr).await?;
352 info!("Listening for NCI connections at address {}", nci_addr);
353 Listener::Tcp(nci_listener)
354 }
355 _ => anyhow::bail!("Specify at most one of `--nci-port` and `--nci-unix-fd`."),
356 };
357
358 let rf_listener = match (opt.rf_port, opt.rf_unix_fd) {
359 (None, Some(unix_fd)) => {
360 let owned_fd = inherited_fd::take_fd_ownership(unix_fd)?;
361 let nci_listener = std::os::unix::net::UnixListener::from(owned_fd);
362 nci_listener.set_nonblocking(true)?;
363 let nci_listener = UnixListener::from_std(nci_listener)?;
364 info!("Listening for RF connections on fd {}", unix_fd);
365 Listener::Unix(nci_listener)
366 }
367 (port, None) => {
368 let port = port.unwrap_or(7001);
369 let rf_addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port);
370 let rf_listener = TcpListener::bind(rf_addr).await?;
371 info!("Listening for RF connections at address {}", rf_addr);
372 Listener::Tcp(rf_listener)
373 }
374 _ => anyhow::bail!("Specify at most one of `--rf-port` and `--rf-unix-fd`"),
375 };
376
377 let (rf_tx, mut rf_rx) = mpsc::unbounded_channel();
378 let mut scene = Scene::new();
379 loop {
380 select! {
381 result = nci_listener.accept_split() => {
382 let (socket_rx, socket_tx, addr) = result?;
383 info!("Incoming NCI connection from {}", addr);
384 match scene.add_device(|id| Device::nci(id, socket_rx, socket_tx, rf_tx.clone())) {
385 Ok(id) => info!("Accepted NCI connection from {} in slot {}", addr, id),
386 Err(err) => error!("Failed to accept NCI connection from {}: {}", addr, err)
387 }
388 },
389 result = rf_listener.accept_split() => {
390 let (socket_rx, socket_tx, addr) = result?;
391 info!("Incoming RF connection from {}", addr);
392 match scene.add_device(|id| Device::rf(id, socket_rx, socket_tx, rf_tx.clone())) {
393 Ok(id) => info!("Accepted RF connection from {} in slot {}", addr, id),
394 Err(err) => error!("Failed to accept RF connection from {}: {}", addr, err)
395 }
396 },
397 _ = &mut scene => (),
398 result = rf_rx.recv() => {
399 let packet = result.ok_or(anyhow::anyhow!("rf_rx channel closed"))?;
400 scene.send(&packet)?
401 }
402 }
403 }
404 }
405
main() -> Result<()>406 fn main() -> Result<()> {
407 // Safety: First function call in the `main` function, before any other library calls
408 unsafe { inherited_fd::init_once()? };
409 run()
410 }
411