1*7eba2f3bSAndroid Build Coastguard Worker#!/usr/bin/env python3 2*7eba2f3bSAndroid Build Coastguard Worker 3*7eba2f3bSAndroid Build Coastguard Worker# Copyright 2023 Google LLC 4*7eba2f3bSAndroid Build Coastguard Worker# 5*7eba2f3bSAndroid Build Coastguard Worker# Licensed under the Apache License, Version 2.0 (the "License"); 6*7eba2f3bSAndroid Build Coastguard Worker# you may not use this file except in compliance with the License. 7*7eba2f3bSAndroid Build Coastguard Worker# You may obtain a copy of the License at 8*7eba2f3bSAndroid Build Coastguard Worker# 9*7eba2f3bSAndroid Build Coastguard Worker# https://www.apache.org/licenses/LICENSE-2.0 10*7eba2f3bSAndroid Build Coastguard Worker# 11*7eba2f3bSAndroid Build Coastguard Worker# Unless required by applicable law or agreed to in writing, software 12*7eba2f3bSAndroid Build Coastguard Worker# distributed under the License is distributed on an "AS IS" BASIS, 13*7eba2f3bSAndroid Build Coastguard Worker# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14*7eba2f3bSAndroid Build Coastguard Worker# See the License for the specific language governing permissions and 15*7eba2f3bSAndroid Build Coastguard Worker# limitations under the License. 16*7eba2f3bSAndroid Build Coastguard Worker 17*7eba2f3bSAndroid Build Coastguard Workerimport argparse 18*7eba2f3bSAndroid Build Coastguard Workerimport inspect 19*7eba2f3bSAndroid Build Coastguard Workerimport json 20*7eba2f3bSAndroid Build Coastguard Workerimport random 21*7eba2f3bSAndroid Build Coastguard Workerimport readline 22*7eba2f3bSAndroid Build Coastguard Workerimport socket 23*7eba2f3bSAndroid Build Coastguard Workerimport sys 24*7eba2f3bSAndroid Build Coastguard Workerimport time 25*7eba2f3bSAndroid Build Coastguard Workerimport requests 26*7eba2f3bSAndroid Build Coastguard Workerimport struct 27*7eba2f3bSAndroid Build Coastguard Workerimport asyncio 28*7eba2f3bSAndroid Build Coastguard Workerfrom concurrent.futures import ThreadPoolExecutor 29*7eba2f3bSAndroid Build Coastguard Worker 30*7eba2f3bSAndroid Build Coastguard Workerimport rf_packets as rf 31*7eba2f3bSAndroid Build Coastguard Worker 32*7eba2f3bSAndroid Build Coastguard Worker 33*7eba2f3bSAndroid Build Coastguard Workerclass T4AT: 34*7eba2f3bSAndroid Build Coastguard Worker def __init__(self, reader, writer): 35*7eba2f3bSAndroid Build Coastguard Worker self.nfcid1 = bytes([0x08]) + int.to_bytes(random.randint(0, 0xffffff), length=3) 36*7eba2f3bSAndroid Build Coastguard Worker self.rats_response = bytes([0x2, 0x0]) 37*7eba2f3bSAndroid Build Coastguard Worker self.reader = reader 38*7eba2f3bSAndroid Build Coastguard Worker self.writer = writer 39*7eba2f3bSAndroid Build Coastguard Worker 40*7eba2f3bSAndroid Build Coastguard Worker async def _read(self) -> rf.RfPacket: 41*7eba2f3bSAndroid Build Coastguard Worker header_bytes = await self.reader.read(2) 42*7eba2f3bSAndroid Build Coastguard Worker packet_length = int.from_bytes(header_bytes, byteorder='little') 43*7eba2f3bSAndroid Build Coastguard Worker packet_bytes = await self.reader.read(packet_length) 44*7eba2f3bSAndroid Build Coastguard Worker 45*7eba2f3bSAndroid Build Coastguard Worker packet = rf.RfPacket.parse_all(packet_bytes) 46*7eba2f3bSAndroid Build Coastguard Worker packet.show() 47*7eba2f3bSAndroid Build Coastguard Worker return packet 48*7eba2f3bSAndroid Build Coastguard Worker 49*7eba2f3bSAndroid Build Coastguard Worker def _write(self, packet: rf.RfPacket): 50*7eba2f3bSAndroid Build Coastguard Worker packet_bytes = packet.serialize() 51*7eba2f3bSAndroid Build Coastguard Worker header_bytes = int.to_bytes(len(packet_bytes), length=2, byteorder='little') 52*7eba2f3bSAndroid Build Coastguard Worker self.writer.write(header_bytes + packet_bytes) 53*7eba2f3bSAndroid Build Coastguard Worker 54*7eba2f3bSAndroid Build Coastguard Worker async def listen(self): 55*7eba2f3bSAndroid Build Coastguard Worker """Emulate device in passive listen mode. Respond to poll requests until 56*7eba2f3bSAndroid Build Coastguard Worker the device is activated by a select command.""" 57*7eba2f3bSAndroid Build Coastguard Worker while True: 58*7eba2f3bSAndroid Build Coastguard Worker packet = await self._read() 59*7eba2f3bSAndroid Build Coastguard Worker match packet: 60*7eba2f3bSAndroid Build Coastguard Worker case rf.PollCommand(technology=rf.Technology.NFC_A): 61*7eba2f3bSAndroid Build Coastguard Worker self._write(rf.NfcAPollResponse( 62*7eba2f3bSAndroid Build Coastguard Worker nfcid1=self.nfcid1, int_protocol=0b01)) 63*7eba2f3bSAndroid Build Coastguard Worker case rf.T4ATSelectCommand(_): 64*7eba2f3bSAndroid Build Coastguard Worker self._write(rf.T4ATSelectResponse( 65*7eba2f3bSAndroid Build Coastguard Worker rats_response=self.rats_response, 66*7eba2f3bSAndroid Build Coastguard Worker receiver=packet.sender)) 67*7eba2f3bSAndroid Build Coastguard Worker print(f"t4at device selected by #{packet.sender}") 68*7eba2f3bSAndroid Build Coastguard Worker await self.active(packet.sender) 69*7eba2f3bSAndroid Build Coastguard Worker case _: 70*7eba2f3bSAndroid Build Coastguard Worker pass 71*7eba2f3bSAndroid Build Coastguard Worker 72*7eba2f3bSAndroid Build Coastguard Worker async def poll(self): 73*7eba2f3bSAndroid Build Coastguard Worker """Emulate device in passive poll mode. Automatically selects the 74*7eba2f3bSAndroid Build Coastguard Worker first discovered device.""" 75*7eba2f3bSAndroid Build Coastguard Worker while True: 76*7eba2f3bSAndroid Build Coastguard Worker try: 77*7eba2f3bSAndroid Build Coastguard Worker self._write(rf.PollCommand(technology=rf.Technology.NFC_A)) 78*7eba2f3bSAndroid Build Coastguard Worker packet = await asyncio.wait_for(self._read(), timeout=1.0) 79*7eba2f3bSAndroid Build Coastguard Worker match packet: 80*7eba2f3bSAndroid Build Coastguard Worker # [DIGITAL] Table 20: SEL_RES Response Format 81*7eba2f3bSAndroid Build Coastguard Worker # 01b: Configured for Type 4A Tag Platform 82*7eba2f3bSAndroid Build Coastguard Worker case rf.NfcAPollResponse(int_protocol=0b01): 83*7eba2f3bSAndroid Build Coastguard Worker nfcid1 = bytes(packet.nfcid1) 84*7eba2f3bSAndroid Build Coastguard Worker print(f"discovered t4at device with nfcid1 #{nfcid1.hex()}") 85*7eba2f3bSAndroid Build Coastguard Worker self._write(rf.T4ATSelectCommand(receiver=packet.sender, param=0)) 86*7eba2f3bSAndroid Build Coastguard Worker response = await asyncio.wait_for( 87*7eba2f3bSAndroid Build Coastguard Worker self.wait_for_select_response(packet.sender), timeout=1.0) 88*7eba2f3bSAndroid Build Coastguard Worker print(f"t4at device activation complete") 89*7eba2f3bSAndroid Build Coastguard Worker await self.active(response.sender) 90*7eba2f3bSAndroid Build Coastguard Worker case _: 91*7eba2f3bSAndroid Build Coastguard Worker pass 92*7eba2f3bSAndroid Build Coastguard Worker time.sleep(0.050); 93*7eba2f3bSAndroid Build Coastguard Worker except TimeoutError: 94*7eba2f3bSAndroid Build Coastguard Worker pass 95*7eba2f3bSAndroid Build Coastguard Worker time.sleep(0.050); 96*7eba2f3bSAndroid Build Coastguard Worker try: 97*7eba2f3bSAndroid Build Coastguard Worker signature = bytes([0x1, 0x2, 0x3, 0x4]); 98*7eba2f3bSAndroid Build Coastguard Worker self._write(rf.PollCommand(technology=rf.Technology.NFC_RAW, data=signature)) 99*7eba2f3bSAndroid Build Coastguard Worker await asyncio.wait_for(self._read(), timeout=1.0) 100*7eba2f3bSAndroid Build Coastguard Worker except TimeoutError: 101*7eba2f3bSAndroid Build Coastguard Worker pass 102*7eba2f3bSAndroid Build Coastguard Worker 103*7eba2f3bSAndroid Build Coastguard Worker async def wait_for_select_response(self, sender_id: int): 104*7eba2f3bSAndroid Build Coastguard Worker while True: 105*7eba2f3bSAndroid Build Coastguard Worker packet = await self._read() 106*7eba2f3bSAndroid Build Coastguard Worker if isinstance(packet, rf.T4ATSelectResponse) and packet.sender == sender_id: 107*7eba2f3bSAndroid Build Coastguard Worker return packet 108*7eba2f3bSAndroid Build Coastguard Worker 109*7eba2f3bSAndroid Build Coastguard Worker async def active(self, peer: int): 110*7eba2f3bSAndroid Build Coastguard Worker """Active mode. Respond to data requests until the device 111*7eba2f3bSAndroid Build Coastguard Worker is deselected.""" 112*7eba2f3bSAndroid Build Coastguard Worker while True: 113*7eba2f3bSAndroid Build Coastguard Worker packet = await self._read() 114*7eba2f3bSAndroid Build Coastguard Worker match packet: 115*7eba2f3bSAndroid Build Coastguard Worker case rf.DeactivateNotification(_): 116*7eba2f3bSAndroid Build Coastguard Worker return 117*7eba2f3bSAndroid Build Coastguard Worker case rf.Data(_): 118*7eba2f3bSAndroid Build Coastguard Worker pass 119*7eba2f3bSAndroid Build Coastguard Worker case _: 120*7eba2f3bSAndroid Build Coastguard Worker pass 121*7eba2f3bSAndroid Build Coastguard Worker 122*7eba2f3bSAndroid Build Coastguard Worker 123*7eba2f3bSAndroid Build Coastguard Workerasync def run(address: str, rf_port: int, mode: str): 124*7eba2f3bSAndroid Build Coastguard Worker """Emulate a T4AT compatible device in Listen mode.""" 125*7eba2f3bSAndroid Build Coastguard Worker try: 126*7eba2f3bSAndroid Build Coastguard Worker reader, writer = await asyncio.open_connection(address, rf_port) 127*7eba2f3bSAndroid Build Coastguard Worker device = T4AT(reader, writer) 128*7eba2f3bSAndroid Build Coastguard Worker if mode == 'poll': 129*7eba2f3bSAndroid Build Coastguard Worker await device.poll() 130*7eba2f3bSAndroid Build Coastguard Worker elif mode == 'listen': 131*7eba2f3bSAndroid Build Coastguard Worker await device.listen() 132*7eba2f3bSAndroid Build Coastguard Worker else: 133*7eba2f3bSAndroid Build Coastguard Worker print(f"unsupported device mode {mode}") 134*7eba2f3bSAndroid Build Coastguard Worker except Exception as exn: 135*7eba2f3bSAndroid Build Coastguard Worker print( 136*7eba2f3bSAndroid Build Coastguard Worker f'Failed to connect to Casimir server at address {address}:{rf_port}:\n' + 137*7eba2f3bSAndroid Build Coastguard Worker f' {exn}\n' + 138*7eba2f3bSAndroid Build Coastguard Worker 'Make sure the server is running') 139*7eba2f3bSAndroid Build Coastguard Worker exit(1) 140*7eba2f3bSAndroid Build Coastguard Worker 141*7eba2f3bSAndroid Build Coastguard Worker 142*7eba2f3bSAndroid Build Coastguard Workerdef main(): 143*7eba2f3bSAndroid Build Coastguard Worker """Start a Casimir interactive console.""" 144*7eba2f3bSAndroid Build Coastguard Worker parser = argparse.ArgumentParser(description=__doc__) 145*7eba2f3bSAndroid Build Coastguard Worker parser.add_argument('--address', 146*7eba2f3bSAndroid Build Coastguard Worker type=str, 147*7eba2f3bSAndroid Build Coastguard Worker default='127.0.0.1', 148*7eba2f3bSAndroid Build Coastguard Worker help='Select the casimir server address') 149*7eba2f3bSAndroid Build Coastguard Worker parser.add_argument('--rf-port', 150*7eba2f3bSAndroid Build Coastguard Worker type=int, 151*7eba2f3bSAndroid Build Coastguard Worker default=7001, 152*7eba2f3bSAndroid Build Coastguard Worker help='Select the casimir TCP RF port') 153*7eba2f3bSAndroid Build Coastguard Worker parser.add_argument('--mode', 154*7eba2f3bSAndroid Build Coastguard Worker type=str, 155*7eba2f3bSAndroid Build Coastguard Worker choices=['poll', 'listen'], 156*7eba2f3bSAndroid Build Coastguard Worker default='poll', 157*7eba2f3bSAndroid Build Coastguard Worker help='Select the tag mode') 158*7eba2f3bSAndroid Build Coastguard Worker asyncio.run(run(**vars(parser.parse_args()))) 159*7eba2f3bSAndroid Build Coastguard Worker 160*7eba2f3bSAndroid Build Coastguard Worker 161*7eba2f3bSAndroid Build Coastguard Workerif __name__ == '__main__': 162*7eba2f3bSAndroid Build Coastguard Worker main() 163