#!/usr/bin/env python3 # Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import argparse import inspect import json import random import readline import socket import sys import time import requests import struct import asyncio from concurrent.futures import ThreadPoolExecutor import rf_packets as rf class T4AT: def __init__(self, reader, writer): self.nfcid1 = bytes([0x08]) + int.to_bytes(random.randint(0, 0xffffff), length=3) self.rats_response = bytes([0x2, 0x0]) self.reader = reader self.writer = writer async def _read(self) -> rf.RfPacket: header_bytes = await self.reader.read(2) packet_length = int.from_bytes(header_bytes, byteorder='little') packet_bytes = await self.reader.read(packet_length) packet = rf.RfPacket.parse_all(packet_bytes) packet.show() return packet def _write(self, packet: rf.RfPacket): packet_bytes = packet.serialize() header_bytes = int.to_bytes(len(packet_bytes), length=2, byteorder='little') self.writer.write(header_bytes + packet_bytes) async def listen(self): """Emulate device in passive listen mode. Respond to poll requests until the device is activated by a select command.""" while True: packet = await self._read() match packet: case rf.PollCommand(technology=rf.Technology.NFC_A): self._write(rf.NfcAPollResponse( nfcid1=self.nfcid1, int_protocol=0b01)) case rf.T4ATSelectCommand(_): self._write(rf.T4ATSelectResponse( rats_response=self.rats_response, receiver=packet.sender)) print(f"t4at device selected by #{packet.sender}") await self.active(packet.sender) case _: pass async def poll(self): """Emulate device in passive poll mode. Automatically selects the first discovered device.""" while True: try: self._write(rf.PollCommand(technology=rf.Technology.NFC_A)) packet = await asyncio.wait_for(self._read(), timeout=1.0) match packet: # [DIGITAL] Table 20: SEL_RES Response Format # 01b: Configured for Type 4A Tag Platform case rf.NfcAPollResponse(int_protocol=0b01): nfcid1 = bytes(packet.nfcid1) print(f"discovered t4at device with nfcid1 #{nfcid1.hex()}") self._write(rf.T4ATSelectCommand(receiver=packet.sender, param=0)) response = await asyncio.wait_for( self.wait_for_select_response(packet.sender), timeout=1.0) print(f"t4at device activation complete") await self.active(response.sender) case _: pass time.sleep(0.050); except TimeoutError: pass time.sleep(0.050); try: signature = bytes([0x1, 0x2, 0x3, 0x4]); self._write(rf.PollCommand(technology=rf.Technology.NFC_RAW, data=signature)) await asyncio.wait_for(self._read(), timeout=1.0) except TimeoutError: pass async def wait_for_select_response(self, sender_id: int): while True: packet = await self._read() if isinstance(packet, rf.T4ATSelectResponse) and packet.sender == sender_id: return packet async def active(self, peer: int): """Active mode. Respond to data requests until the device is deselected.""" while True: packet = await self._read() match packet: case rf.DeactivateNotification(_): return case rf.Data(_): pass case _: pass async def run(address: str, rf_port: int, mode: str): """Emulate a T4AT compatible device in Listen mode.""" try: reader, writer = await asyncio.open_connection(address, rf_port) device = T4AT(reader, writer) if mode == 'poll': await device.poll() elif mode == 'listen': await device.listen() else: print(f"unsupported device mode {mode}") except Exception as exn: print( f'Failed to connect to Casimir server at address {address}:{rf_port}:\n' + f' {exn}\n' + 'Make sure the server is running') exit(1) def main(): """Start a Casimir interactive console.""" parser = argparse.ArgumentParser(description=__doc__) parser.add_argument('--address', type=str, default='127.0.0.1', help='Select the casimir server address') parser.add_argument('--rf-port', type=int, default=7001, help='Select the casimir TCP RF port') parser.add_argument('--mode', type=str, choices=['poll', 'listen'], default='poll', help='Select the tag mode') asyncio.run(run(**vars(parser.parse_args()))) if __name__ == '__main__': main()