1#  Copyright (C) 2024 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14
15# Lint as: python3
16
17import logging
18from . import tag
19from binascii import hexlify
20from mobly import logger as mobly_logger
21import http
22from urllib.parse import urlparse
23from http.client import HTTPSConnection
24from .nfcutils.reader import Reader, ReaderTag, CONFIGURATION_A_LONG
25import ssl
26import json
27
28
29def responses_match(expected: bytes, actual: bytes) -> bool:
30    if expected == actual:
31        return True
32    if expected is None or actual is None:
33        return False
34    if len(expected) == 0 or len(actual) == 0:
35        return False
36    if expected[0] != 0x00 and actual[0] == 0x00:
37        if expected == actual[1:]:
38            return True
39    return False
40
41
42class CasimirTag(ReaderTag):
43    def __init__(self, casimir, sender_id):
44        """Empty init"""
45        self.casimir = casimir
46        self.sender_id = sender_id
47        self.sel_res = 0x60
48        self.ats = [0x70, 0x80, 0x08, 0x00]
49        self.log = casimir.log
50
51    def transact(self, command_apdus, expected_response_apdus):
52        response_apdus = self.casimir.transceive_multiple(self.sender_id, command_apdus)
53        if response_apdus is None:
54            self.log.info("received None for response APDUs")
55            return False
56        if len(response_apdus) < len(expected_response_apdus):
57            self.log.info(f"received {len(response_apdus)} responses, expected {len(expected_response_apdus)}")
58            return False
59
60        for i in range(len(expected_response_apdus)):
61            if expected_response_apdus[i] != "*" and len(response_apdus) > i and not responses_match(expected_response_apdus[i], response_apdus[i]):
62                received_apdu = hexlify(response_apdus[i]).decode() if type(response_apdus[i]) is bytes else "None"
63                self.log.error(
64                    "Unexpected APDU: received %s, expected %s",
65                    received_apdu,
66                    hexlify(expected_response_apdus[i]).decode(),
67                )
68                return False
69        return True
70
71
72class Casimir(Reader):
73    def __init__(self, id):
74        """ Init """
75        self.id = id
76        self.host = 'localhost'
77        self.conn = None
78        self.rf_on = False
79        self.log = mobly_logger.PrefixLoggerAdapter(
80            logging.getLogger(),
81            {
82                mobly_logger.PrefixLoggerAdapter.EXTRA_KEY_LOG_PREFIX: (
83                    f"[Casimir|{id}]"
84                )
85            },
86        )
87
88    def __del__(self):
89        self.mute()
90
91    def poll_a(self):
92        """Attempts to detect target for NFC type A."""
93        response = self._send_command("PollA", {})
94        if response is None:
95            return None
96        if response == {}:
97            sender_id = 0
98        else:
99            sender_id = response["sender_id"]
100        self.log.debug("got sender_id: " + str(sender_id))
101        return CasimirTag(self, sender_id)
102
103    def poll_b(self):
104        """Attempts to detect target for NFC type B."""
105        raise RuntimeError("not implemented")
106
107    def send_broadcast(
108        self,
109        data,
110        *,
111        configuration=CONFIGURATION_A_LONG,
112    ):
113        """Emits broadcast frame"""
114        if configuration.power != 100:
115            self._send_command(
116                "SetPowerLevel", {"power_level": configuration.power / 10}
117            )
118        return self.transceive(data)
119
120    def transceive(self, apdu):
121        ret = self.transceive_multiple(None, [apdu])
122        if isinstance(ret, list) and len(ret) > 0:
123            return ret[0]
124        return None
125
126    def transceive_multiple(self, sender_id, command_apdus):
127        self.unmute()
128        data = {"apdu_hex_strings": [c.hex() for c in command_apdus]}
129        if isinstance(sender_id, int):
130            data["sender_id"] = sender_id
131        response = self._send_command('SendApdu', data)
132        if response in (None, {}):
133            return []
134        return [
135            bytes.fromhex(apdu) for apdu in response["responseHexStrings"]
136        ]
137
138    def unmute(self):
139        """Turns on device's RF antenna."""
140        if self.rf_on:
141            return
142        self._send_command('SetRadioState', {"radio_on": True})
143        self.rf_on = True
144
145    def mute(self):
146        """Turns off device's RF antenna."""
147        if self.conn is None:
148            self.rf_on = False
149            return
150        if self.rf_on:
151            self.rf_on = False
152            self._send_command('SetRadioState', {"radio_on": False})
153        self._send_command("Close", {})
154        self.conn.close()
155        self.conn = None
156
157    def reset(self):
158        """Nothing to reset"""
159
160    def _ensure_connected(self):
161        if self.conn is not None:
162            return
163        self.conn = HTTPSConnection(
164            self.host, 1443,
165            context=ssl._create_unverified_context()
166        )
167        self._send_command("Init", {})
168        self.rf_on = False
169
170    def _send_command(self, command, data):
171        self._ensure_connected()
172        self.conn.request(
173            method="POST",
174            url=f"/devices/{self.id}/services/CasimirControlService/{command}",
175            body= json.dumps(data),
176            headers={'Content-type': 'application/json'}
177        )
178        response = self.conn.getresponse()
179        response_data = response.read()
180        self.log.debug(f"response_data: {response_data}")
181        if str(response_data).startswith("b'rpc error"):
182            return None
183        response_string = json.loads(response_data)
184        return json.loads(response_string)
185