// Copyright 2021, The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //! NCI Protocol Abstraction Layer //! Supports sending NCI commands to the HAL and receiving //! NCI messages back use bytes::{BufMut, BytesMut}; use log::{debug, error}; use nfc_hal::{Hal, HalEventRegistry}; use nfc_packets::nci::DataPacketChild::Payload; use nfc_packets::nci::NciPacketChild; use nfc_packets::nci::NotificationChild::ConnCreditsNotification; use nfc_packets::nci::{Command, DataPacket, DataPacketBuilder, Notification}; use nfc_packets::nci::{Opcode, PacketBoundaryFlag, Response}; use pdl_runtime::Packet; use std::collections::HashMap; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use tokio::select; use tokio::sync::mpsc::{channel, Receiver, Sender, UnboundedSender}; use tokio::sync::{oneshot, RwLock}; use tokio::time::{sleep, Duration, Instant}; pub mod api; /// Result type type Result = std::result::Result>; /// Initialize the module and connect the channels pub async fn init() -> Nci { let hc = nfc_hal::init().await; // Channel to handle data upstream messages // let (in_data_int, in_data_ext) = channel::(10); // Internal data channels // let ic = InternalChannels { in_data_int }; let (cmd_tx, cmd_rx) = channel::(10); let commands = CommandSender { cmd_tx }; let hal_events = hc.hal_events.clone(); let notifications = EventRegistry { handlers: Arc::new(Mutex::new(HashMap::new())) }; let connections = LogicalConnectionsRegistry { conns: Arc::new(RwLock::new(HashMap::new())), sender: hc.out_data_tx.clone(), }; tokio::spawn(dispatch(notifications, connections.clone(), hc, cmd_rx)); Nci { hal_events, commands, connections } } /// NCI module external interface pub struct Nci { /// HAL events pub hal_events: HalEventRegistry, /// NCI command communication interface pub commands: CommandSender, /// NCI logical connections pub connections: LogicalConnectionsRegistry, } #[derive(Debug)] struct PendingCommand { cmd: Command, response: oneshot::Sender, } #[derive(Debug)] struct QueuedCommand { pending: PendingCommand, notification: Option>, } /// Sends raw commands. Only useful for facades & shims, or wrapped as a CommandSender. pub struct CommandSender { cmd_tx: Sender, } /// The data returned by send_notify() method. pub struct ResponsePendingNotification { /// Command response pub response: Response, /// Pending notification receiver pub notification: oneshot::Receiver, } impl CommandSender { /// Send a command, but do not expect notification to be returned pub async fn send(&mut self, cmd: Command) -> Result { let (tx, rx) = oneshot::channel::(); self.cmd_tx .send(QueuedCommand { pending: PendingCommand { cmd, response: tx }, notification: None, }) .await?; let event = rx.await?; Ok(event) } /// Send a command which expects notification as a result pub async fn send_and_notify(&mut self, cmd: Command) -> Result { let (tx, rx) = oneshot::channel::(); let (ntx, nrx) = oneshot::channel::(); self.cmd_tx .send(QueuedCommand { pending: PendingCommand { cmd, response: tx }, notification: Some(ntx), }) .await?; let event = rx.await?; Ok(ResponsePendingNotification { response: event, notification: nrx }) } } impl Drop for CommandSender { fn drop(&mut self) { debug!("CommandSender is dropped"); } } /// Parameters of a logical connection struct ConnectionParameters { callback: Option, max_payload_size: u8, nfcc_credits_avail: u8, sendq: VecDeque, recvq: VecDeque, } impl ConnectionParameters { /// Flush TX queue fn flush_tx(&mut self) { self.sendq.clear(); } } /// To keep track of currentry open logical connections #[derive(Clone)] pub struct LogicalConnectionsRegistry { conns: Arc>>>, sender: UnboundedSender, } impl LogicalConnectionsRegistry { /// Create a logical connection pub async fn open( &mut self, conn_id: u8, cb: Option, max_payload_size: u8, nfcc_credits_avail: u8, ) { let conn_params = ConnectionParameters { callback: cb, max_payload_size, nfcc_credits_avail, sendq: VecDeque::::new(), recvq: VecDeque::::new(), }; assert!( self.conns.write().await.insert(conn_id, Mutex::new(conn_params)).is_none(), "A logical connection with id {:?} already exists", conn_id ); } /// Set static callback pub async fn set_static_callback(&mut self, conn_id: u8, cb: Option) { if conn_id < 2 && cb.is_some() { // Static connections if let Some(conn_params) = self.conns.read().await.get(&conn_id) { let mut conn_params = conn_params.lock().unwrap(); conn_params.callback = cb; } } } /// Close a logical connection pub async fn close(&mut self, conn_id: u8) -> Option { if let Some(conn_params) = self.conns.write().await.remove(&conn_id) { conn_params.lock().unwrap().callback } else { None } } /// Add credits to a logical connection pub async fn add_credits(&self, conn_id: u8, ncreds: u8) { if let Some(conn_params) = self.conns.read().await.get(&conn_id) { let mut conn_params = conn_params.lock().unwrap(); conn_params.nfcc_credits_avail += ncreds; while !conn_params.sendq.is_empty() && conn_params.nfcc_credits_avail > 0 { self.sender.send(conn_params.sendq.pop_front().unwrap()).unwrap(); conn_params.nfcc_credits_avail -= 1; } } } /// Send a packet to a logical channel, splitting it if needed pub async fn send_packet(&mut self, conn_id: u8, pkt: DataPacket) { if let Some(conn_params) = self.conns.read().await.get(&conn_id) { let mut conn_params = conn_params.lock().unwrap(); if let Payload(mut p) = pkt.specialize() { if p.len() > conn_params.max_payload_size.into() { let conn_id = pkt.get_conn_id(); while p.len() > conn_params.max_payload_size.into() { let part = DataPacketBuilder { conn_id, pbf: PacketBoundaryFlag::Incomplete, cr: 0, payload: Some(p.split_to(conn_params.max_payload_size.into())), } .build(); conn_params.sendq.push_back(part); } if !p.is_empty() { let end = DataPacketBuilder { conn_id, pbf: PacketBoundaryFlag::CompleteOrFinal, cr: 0, payload: Some(p), } .build(); conn_params.sendq.push_back(end); } } else { conn_params.sendq.push_back(pkt); } } while conn_params.nfcc_credits_avail > 0 && !conn_params.sendq.is_empty() { self.sender.send(conn_params.sendq.pop_front().unwrap()).unwrap(); conn_params.nfcc_credits_avail -= 1; } } } /// Send data packet callback to the upper layers pub async fn send_callback(&self, pkt: DataPacket) { let conn_id = pkt.get_conn_id(); let ncreds = pkt.get_cr(); if ncreds > 0 { self.add_credits(conn_id, ncreds).await; } let done = pkt.get_pbf() == PacketBoundaryFlag::CompleteOrFinal; if let Some(conn_params) = self.conns.read().await.get(&conn_id) { let mut conn_params = conn_params.lock().unwrap(); if !done && conn_params.recvq.is_empty() { const NFC_DATA_START_CEVT: u16 = 5; let cb = conn_params.callback.unwrap(); cb(conn_id, NFC_DATA_START_CEVT, &[]); } conn_params.recvq.push_back(pkt); if done { const NFC_DATA_CEVT_SIZE: usize = 4; // 3 for header and 1 for status let cap = conn_params.recvq.len() * conn_params.max_payload_size as usize + NFC_DATA_CEVT_SIZE; let mut buffer = BytesMut::with_capacity(cap); buffer.put_u8(0u8); // status let pkt = conn_params.recvq.pop_front().unwrap(); buffer.put(pkt.encode_to_bytes().unwrap()); while !conn_params.recvq.is_empty() { let pkt = conn_params.recvq.pop_front().unwrap(); if let Payload(p) = pkt.specialize() { buffer.put(p); } } let data_cevt = buffer.freeze(); let cb = conn_params.callback.unwrap(); const NFC_DATA_CEVT: u16 = 3; cb(conn_id, NFC_DATA_CEVT, data_cevt.as_ref()); } } } /// Flush outgoing data queue pub async fn flush_data(&mut self, conn_id: u8) -> bool { if let Some(conn_params) = self.conns.read().await.get(&conn_id) { conn_params.lock().unwrap().flush_tx(); true } else { false } } } /// Provides ability to register and unregister for NCI notifications #[derive(Clone)] pub struct EventRegistry { handlers: Arc>>>, } impl EventRegistry { /// Indicate interest in specific NCI notification pub async fn register(&mut self, code: Opcode, sender: oneshot::Sender) { assert!( self.handlers.lock().unwrap().insert(code, sender).is_none(), "A handler for {:?} is already registered", code ); } /// Remove interest in specific NCI notification pub async fn unregister(&mut self, code: Opcode) -> Option> { self.handlers.lock().unwrap().remove(&code) } } async fn dispatch( mut ntfs: EventRegistry, lcons: LogicalConnectionsRegistry, mut hc: Hal, // ic: InternalChannels, mut cmd_rx: Receiver, ) -> Result<()> { let mut pending: Option = None; let timeout = sleep(Duration::MAX); // The max_deadline is used to set the sleep() deadline to a very distant moment in // the future, when the notification from the timer is not required. let max_deadline = timeout.deadline(); tokio::pin!(timeout); loop { select! { Some(cmd) = hc.in_cmd_rx.recv() => { match cmd.specialize() { NciPacketChild::Response(rsp) => { timeout.as_mut().reset(max_deadline); let this_opcode = rsp.get_cmd_op(); match pending.take() { Some(PendingCommand{cmd, response}) if cmd.get_op() == this_opcode => { if let Err(e) = response.send(rsp) { error!("failure dispatching command status {:?}", e); } }, Some(PendingCommand{cmd, ..}) => panic!("Waiting for {:?}, got {:?}", cmd.get_op(), this_opcode), None => panic!("Unexpected status event with opcode {:?}", this_opcode), } }, NciPacketChild::Notification(ntfy) => { match ntfy.specialize() { ConnCreditsNotification(ccnp) => { let conns = ccnp.get_conns(); for conn in conns { lcons.add_credits(conn.conn_id, conn.ncredits).await; } }, _ => { let code = ntfy.get_cmd_op(); match ntfs.unregister(code).await { Some(sender) => { if let Err(e) = sender.send(ntfy) { error!("notification channel closed {:?}", e); } }, None => panic!("Unhandled notification {:?}", code), } }, } }, _ => error!("Unexpected NCI data received {:?}", cmd), } }, qc = cmd_rx.recv(), if pending.is_none() => if let Some(queued) = qc { debug!("cmd_rx got a q"); if let Some(nsender) = queued.notification { ntfs.register(queued.pending.cmd.get_op(), nsender).await; } if let Err(e) = hc.out_cmd_tx.send(queued.pending.cmd.clone().into()) { error!("command queue closed: {:?}", e); } timeout.as_mut().reset(Instant::now() + Duration::from_millis(20)); pending = Some(queued.pending); } else { break; }, () = &mut timeout => { error!("Command processing timeout"); timeout.as_mut().reset(max_deadline); pending = None; }, Some(data) = hc.in_data_rx.recv() => lcons.send_callback(data).await, else => { debug!("Select is done"); break; }, } } debug!("NCI dispatch is terminated."); Ok(()) }