xref: /aosp_15_r20/system/nfc/tools/casimir/scripts/t4at.py (revision 7eba2f3b06c51ae21384f6a4f14577b668a869b3)
#!/usr/bin/env python3

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16*7eba2f3bSAndroid Build Coastguard Worker
17*7eba2f3bSAndroid Build Coastguard Workerimport argparse
18*7eba2f3bSAndroid Build Coastguard Workerimport inspect
19*7eba2f3bSAndroid Build Coastguard Workerimport json
20*7eba2f3bSAndroid Build Coastguard Workerimport random
21*7eba2f3bSAndroid Build Coastguard Workerimport readline
22*7eba2f3bSAndroid Build Coastguard Workerimport socket
23*7eba2f3bSAndroid Build Coastguard Workerimport sys
24*7eba2f3bSAndroid Build Coastguard Workerimport time
25*7eba2f3bSAndroid Build Coastguard Workerimport requests
26*7eba2f3bSAndroid Build Coastguard Workerimport struct
27*7eba2f3bSAndroid Build Coastguard Workerimport asyncio
28*7eba2f3bSAndroid Build Coastguard Workerfrom concurrent.futures import ThreadPoolExecutor
29*7eba2f3bSAndroid Build Coastguard Worker
30*7eba2f3bSAndroid Build Coastguard Workerimport rf_packets as rf
31*7eba2f3bSAndroid Build Coastguard Worker
32*7eba2f3bSAndroid Build Coastguard Worker
33*7eba2f3bSAndroid Build Coastguard Workerclass T4AT:
34*7eba2f3bSAndroid Build Coastguard Worker    def __init__(self, reader, writer):
35*7eba2f3bSAndroid Build Coastguard Worker        self.nfcid1 = bytes([0x08]) + int.to_bytes(random.randint(0, 0xffffff), length=3)
36*7eba2f3bSAndroid Build Coastguard Worker        self.rats_response = bytes([0x2, 0x0])
37*7eba2f3bSAndroid Build Coastguard Worker        self.reader = reader
38*7eba2f3bSAndroid Build Coastguard Worker        self.writer = writer
39*7eba2f3bSAndroid Build Coastguard Worker
40*7eba2f3bSAndroid Build Coastguard Worker    async def _read(self) -> rf.RfPacket:
41*7eba2f3bSAndroid Build Coastguard Worker        header_bytes = await self.reader.read(2)
42*7eba2f3bSAndroid Build Coastguard Worker        packet_length = int.from_bytes(header_bytes, byteorder='little')
43*7eba2f3bSAndroid Build Coastguard Worker        packet_bytes = await self.reader.read(packet_length)
44*7eba2f3bSAndroid Build Coastguard Worker
45*7eba2f3bSAndroid Build Coastguard Worker        packet = rf.RfPacket.parse_all(packet_bytes)
46*7eba2f3bSAndroid Build Coastguard Worker        packet.show()
47*7eba2f3bSAndroid Build Coastguard Worker        return packet
48*7eba2f3bSAndroid Build Coastguard Worker
49*7eba2f3bSAndroid Build Coastguard Worker    def _write(self, packet: rf.RfPacket):
50*7eba2f3bSAndroid Build Coastguard Worker        packet_bytes = packet.serialize()
51*7eba2f3bSAndroid Build Coastguard Worker        header_bytes = int.to_bytes(len(packet_bytes), length=2, byteorder='little')
52*7eba2f3bSAndroid Build Coastguard Worker        self.writer.write(header_bytes + packet_bytes)
53*7eba2f3bSAndroid Build Coastguard Worker
54*7eba2f3bSAndroid Build Coastguard Worker    async def listen(self):
55*7eba2f3bSAndroid Build Coastguard Worker        """Emulate device in passive listen mode. Respond to poll requests until
56*7eba2f3bSAndroid Build Coastguard Worker        the device is activated by a select command."""
57*7eba2f3bSAndroid Build Coastguard Worker        while True:
58*7eba2f3bSAndroid Build Coastguard Worker            packet = await self._read()
59*7eba2f3bSAndroid Build Coastguard Worker            match packet:
60*7eba2f3bSAndroid Build Coastguard Worker                case rf.PollCommand(technology=rf.Technology.NFC_A):
61*7eba2f3bSAndroid Build Coastguard Worker                    self._write(rf.NfcAPollResponse(
62*7eba2f3bSAndroid Build Coastguard Worker                        nfcid1=self.nfcid1, int_protocol=0b01))
63*7eba2f3bSAndroid Build Coastguard Worker                case rf.T4ATSelectCommand(_):
64*7eba2f3bSAndroid Build Coastguard Worker                    self._write(rf.T4ATSelectResponse(
65*7eba2f3bSAndroid Build Coastguard Worker                        rats_response=self.rats_response,
66*7eba2f3bSAndroid Build Coastguard Worker                        receiver=packet.sender))
67*7eba2f3bSAndroid Build Coastguard Worker                    print(f"t4at device selected by #{packet.sender}")
68*7eba2f3bSAndroid Build Coastguard Worker                    await self.active(packet.sender)
69*7eba2f3bSAndroid Build Coastguard Worker                case _:
70*7eba2f3bSAndroid Build Coastguard Worker                    pass
71*7eba2f3bSAndroid Build Coastguard Worker
72*7eba2f3bSAndroid Build Coastguard Worker    async def poll(self):
73*7eba2f3bSAndroid Build Coastguard Worker        """Emulate device in passive poll mode. Automatically selects the
74*7eba2f3bSAndroid Build Coastguard Worker        first discovered device."""
75*7eba2f3bSAndroid Build Coastguard Worker        while True:
76*7eba2f3bSAndroid Build Coastguard Worker            try:
77*7eba2f3bSAndroid Build Coastguard Worker                self._write(rf.PollCommand(technology=rf.Technology.NFC_A))
78*7eba2f3bSAndroid Build Coastguard Worker                packet = await asyncio.wait_for(self._read(), timeout=1.0)
79*7eba2f3bSAndroid Build Coastguard Worker                match packet:
80*7eba2f3bSAndroid Build Coastguard Worker                    # [DIGITAL] Table 20: SEL_RES Response Format
81*7eba2f3bSAndroid Build Coastguard Worker                    # 01b: Configured for Type 4A Tag Platform
82*7eba2f3bSAndroid Build Coastguard Worker                    case rf.NfcAPollResponse(int_protocol=0b01):
83*7eba2f3bSAndroid Build Coastguard Worker                        nfcid1 = bytes(packet.nfcid1)
84*7eba2f3bSAndroid Build Coastguard Worker                        print(f"discovered t4at device with nfcid1 #{nfcid1.hex()}")
85*7eba2f3bSAndroid Build Coastguard Worker                        self._write(rf.T4ATSelectCommand(receiver=packet.sender, param=0))
86*7eba2f3bSAndroid Build Coastguard Worker                        response = await asyncio.wait_for(
87*7eba2f3bSAndroid Build Coastguard Worker                            self.wait_for_select_response(packet.sender), timeout=1.0)
88*7eba2f3bSAndroid Build Coastguard Worker                        print(f"t4at device activation complete")
89*7eba2f3bSAndroid Build Coastguard Worker                        await self.active(response.sender)
90*7eba2f3bSAndroid Build Coastguard Worker                    case _:
91*7eba2f3bSAndroid Build Coastguard Worker                        pass
92*7eba2f3bSAndroid Build Coastguard Worker                time.sleep(0.050);
93*7eba2f3bSAndroid Build Coastguard Worker            except TimeoutError:
94*7eba2f3bSAndroid Build Coastguard Worker                pass
95*7eba2f3bSAndroid Build Coastguard Worker            time.sleep(0.050);
96*7eba2f3bSAndroid Build Coastguard Worker            try:
97*7eba2f3bSAndroid Build Coastguard Worker                signature = bytes([0x1, 0x2, 0x3, 0x4]);
98*7eba2f3bSAndroid Build Coastguard Worker                self._write(rf.PollCommand(technology=rf.Technology.NFC_RAW, data=signature))
99*7eba2f3bSAndroid Build Coastguard Worker                await asyncio.wait_for(self._read(), timeout=1.0)
100*7eba2f3bSAndroid Build Coastguard Worker            except TimeoutError:
101*7eba2f3bSAndroid Build Coastguard Worker                pass
102*7eba2f3bSAndroid Build Coastguard Worker
103*7eba2f3bSAndroid Build Coastguard Worker    async def wait_for_select_response(self, sender_id: int):
104*7eba2f3bSAndroid Build Coastguard Worker        while True:
105*7eba2f3bSAndroid Build Coastguard Worker            packet = await self._read()
106*7eba2f3bSAndroid Build Coastguard Worker            if isinstance(packet, rf.T4ATSelectResponse) and packet.sender == sender_id:
107*7eba2f3bSAndroid Build Coastguard Worker                return packet
108*7eba2f3bSAndroid Build Coastguard Worker
109*7eba2f3bSAndroid Build Coastguard Worker    async def active(self, peer: int):
110*7eba2f3bSAndroid Build Coastguard Worker        """Active mode. Respond to data requests until the device
111*7eba2f3bSAndroid Build Coastguard Worker        is deselected."""
112*7eba2f3bSAndroid Build Coastguard Worker        while True:
113*7eba2f3bSAndroid Build Coastguard Worker            packet = await self._read()
114*7eba2f3bSAndroid Build Coastguard Worker            match packet:
115*7eba2f3bSAndroid Build Coastguard Worker                case rf.DeactivateNotification(_):
116*7eba2f3bSAndroid Build Coastguard Worker                    return
117*7eba2f3bSAndroid Build Coastguard Worker                case rf.Data(_):
118*7eba2f3bSAndroid Build Coastguard Worker                    pass
119*7eba2f3bSAndroid Build Coastguard Worker                case _:
120*7eba2f3bSAndroid Build Coastguard Worker                    pass
121*7eba2f3bSAndroid Build Coastguard Worker
122*7eba2f3bSAndroid Build Coastguard Worker
async def run(address: str, rf_port: int, mode: str):
    """Connect to a Casimir server and emulate a T4AT device.

    Args:
        address: host name or IP address of the Casimir server.
        rf_port: TCP port of the Casimir RF interface.
        mode: 'poll' or 'listen'. Any other value is reported on stdout
            and the function returns without emulating anything.

    Exits the process with status 1 when the connection cannot be
    established or when the emulation session fails.
    """
    # Only the connection attempt is guarded by the "failed to connect"
    # handler, so that later errors are not misreported as connect errors.
    try:
        reader, writer = await asyncio.open_connection(address, rf_port)
    except OSError as exn:
        print(
            f'Failed to connect to Casimir server at address {address}:{rf_port}:\n' +
            f'    {exn}\n' +
            'Make sure the server is running')
        sys.exit(1)

    try:
        device = T4AT(reader, writer)
        if mode == 'poll':
            await device.poll()
        elif mode == 'listen':
            await device.listen()
        else:
            print(f"unsupported device mode {mode}")
    except Exception as exn:
        # Top-level boundary of the script: report the failure and exit
        # with an error status rather than dumping a traceback.
        print(
            f'Connection to Casimir server at address {address}:{rf_port} terminated:\n' +
            f'    {exn}')
        sys.exit(1)
    finally:
        # Release the socket on every exit path.
        writer.close()
140*7eba2f3bSAndroid Build Coastguard Worker
141*7eba2f3bSAndroid Build Coastguard Worker
def main():
    """Parse the command line and run the T4AT device emulation.

    Options:
        --address: Casimir server address (default 127.0.0.1).
        --rf-port: Casimir TCP RF port (default 7001).
        --mode: 'poll' or 'listen' (default 'poll').
    """
    # An explicit description is used: the module has no docstring, so
    # `__doc__` would leave the --help output without any description.
    parser = argparse.ArgumentParser(
        description='Emulate a T4AT (Type 4A Tag) device connected to a Casimir server.')
    parser.add_argument('--address',
                        type=str,
                        default='127.0.0.1',
                        help='Select the casimir server address')
    parser.add_argument('--rf-port',
                        type=int,
                        default=7001,
                        help='Select the casimir TCP RF port')
    parser.add_argument('--mode',
                        type=str,
                        choices=['poll', 'listen'],
                        default='poll',
                        help='Select the tag mode')
    args = parser.parse_args()
    asyncio.run(run(address=args.address, rf_port=args.rf_port, mode=args.mode))


# Run the emulation only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
163