#  Copyright (C) 2024 The Android Open Source Project
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

# Lint as: python3

import logging
from . import tag
from binascii import hexlify
from mobly import logger as mobly_logger
import http
from urllib.parse import urlparse
from http.client import HTTPSConnection
from .nfcutils.reader import Reader, ReaderTag, CONFIGURATION_A_LONG
import ssl
import json


def responses_match(expected: bytes, actual: bytes) -> bool:
    """Returns whether a received response satisfies the expected one.

    Two responses match when they are equal, or when ``actual`` is
    ``expected`` preceded by a single 0x00 byte and ``expected`` itself does
    not start with 0x00.

    Args:
        expected: The response the caller wants to see (may be None).
        actual: The response that was received (may be None).

    Returns:
        True if the responses match under the rules above, False otherwise.
    """
    # Covers identical payloads as well as the None/None and empty/empty cases.
    if expected == actual:
        return True
    if expected is None or actual is None:
        return False
    # Indexing below needs at least one byte on each side.
    if not expected or not actual:
        return False
    # Tolerate exactly one extra leading 0x00 byte in the received response.
    return (
        expected[0] != 0x00
        and actual[0] == 0x00
        and expected == actual[1:]
    )


class CasimirTag(ReaderTag):
    """Tag handle returned by Casimir.poll_a(), bound to one sender id."""

    def __init__(self, casimir, sender_id):
        """Stores the owning reader and the sender id used for exchanges.

        Args:
            casimir: The Casimir reader used to exchange APDUs; its logger is
              reused by this tag.
            sender_id: Identifier passed back to Casimir.transceive_multiple().
        """
        self.casimir = casimir
        self.sender_id = sender_id
        # NOTE(review): fixed values, presumably the SEL_RES and ATS reported
        # for this tag -- confirm against ReaderTag's expectations.
        self.sel_res = 0x60
        self.ats = [0x70, 0x80, 0x08, 0x00]
        self.log = casimir.log

    def transact(self, command_apdus, expected_response_apdus):
        """Sends the command APDUs and checks the responses.

        Args:
            command_apdus: Sequence of APDUs to send.
            expected_response_apdus: Sequence of expected responses. An entry
              equal to "*" accepts any response at that position; other
              entries are compared with responses_match().

        Returns:
            True if every expected response was matched, False if no response
            list was received, fewer responses than expected were received,
            or any response did not match.
        """
        response_apdus = self.casimir.transceive_multiple(
            self.sender_id, command_apdus
        )
        if response_apdus is None:
            self.log.info("received None for response APDUs")
            return False
        if len(response_apdus) < len(expected_response_apdus):
            self.log.info(f"received {len(response_apdus)} responses, expected {len(expected_response_apdus)}")
            return False

        # The length check above guarantees a response for every expectation;
        # zip() silently ignores any surplus responses.
        for expected, received in zip(expected_response_apdus, response_apdus):
            if expected == "*" or responses_match(expected, received):
                continue
            received_apdu = (
                hexlify(received).decode()
                if isinstance(received, bytes)
                else "None"
            )
            self.log.error(
                "Unexpected APDU: received %s, expected %s",
                received_apdu,
                hexlify(expected).decode(),
            )
            return False
        return True


class Casimir(Reader):
    """Reader implementation that drives CasimirControlService over HTTPS."""

    def __init__(self, id):
        """Prepares the reader; the connection is opened on first command.

        Args:
            id: Device identifier, used in the request URL and the log prefix.
        """
        self.id = id
        self.host = 'localhost'
        self.conn = None
        self.rf_on = False
        self.log = mobly_logger.PrefixLoggerAdapter(
            logging.getLogger(),
            {
                mobly_logger.PrefixLoggerAdapter.EXTRA_KEY_LOG_PREFIX: (
                    f"[Casimir|{id}]"
                )
            },
        )

    def __del__(self):
        # Switch the radio off and drop the connection when the object dies.
        self.mute()

    def poll_a(self):
        """Attempts to detect target for NFC type A.

        Returns:
            A CasimirTag for the detected target, or None if the "PollA"
            command reported an error.
        """
        response = self._send_command("PollA", {})
        if response is None:
            return None
        # An empty response object is treated as sender id 0.
        # NOTE(review): presumably the service omits default-valued fields
        # from its JSON -- confirm.
        if response == {}:
            sender_id = 0
        else:
            sender_id = response["sender_id"]
        self.log.debug("got sender_id: %s", sender_id)
        return CasimirTag(self, sender_id)

    def poll_b(self):
        """Attempts to detect target for NFC type B."""
        raise RuntimeError("not implemented")

    def send_broadcast(
        self,
        data,
        *,
        configuration=CONFIGURATION_A_LONG,
    ):
        """Emits broadcast frame.

        Args:
            data: Frame payload, sent through transceive().
            configuration: Broadcast configuration; only its ``power``
              attribute is read here.

        Returns:
            The first response returned by transceive(), or None.
        """
        # A power of 100 is left alone; any other value is forwarded to the
        # service divided by 10.
        if configuration.power != 100:
            self._send_command(
                "SetPowerLevel", {"power_level": configuration.power / 10}
            )
        return self.transceive(data)

    def transceive(self, apdu):
        """Sends a single APDU without a sender id.

        Returns:
            The first response as bytes, or None if nothing was received.
        """
        ret = self.transceive_multiple(None, [apdu])
        if isinstance(ret, list) and len(ret) > 0:
            return ret[0]
        return None

    def transceive_multiple(self, sender_id, command_apdus):
        """Sends several APDUs in one "SendApdu" request.

        The radio is switched on first if it is not already on.

        Args:
            sender_id: Included in the request only when it is an int.
            command_apdus: Iterable of objects providing ``hex()`` (e.g.
              bytes).

        Returns:
            A list with the responses decoded to bytes; an empty list when
            the command reported an error or returned an empty object.
        """
        self.unmute()
        data = {"apdu_hex_strings": [c.hex() for c in command_apdus]}
        if isinstance(sender_id, int):
            data["sender_id"] = sender_id
        response = self._send_command('SendApdu', data)
        if response in (None, {}):
            return []
        return [
            bytes.fromhex(apdu) for apdu in response["responseHexStrings"]
        ]

    def unmute(self):
        """Turns on device's RF antenna."""
        if self.rf_on:
            return
        self._send_command('SetRadioState', {"radio_on": True})
        self.rf_on = True

    def mute(self):
        """Turns off device's RF antenna.

        Also sends "Close" to the service and drops the HTTPS connection. The
        connection is released even if one of the commands raises; the
        exception is still propagated to the caller.
        """
        if self.conn is None:
            self.rf_on = False
            return
        try:
            if self.rf_on:
                self.rf_on = False
                self._send_command('SetRadioState', {"radio_on": False})
            self._send_command("Close", {})
        finally:
            # Never keep a connection in an unknown state: a failed request
            # would otherwise leak the socket and poison later commands.
            conn, self.conn = self.conn, None
            if conn is not None:
                conn.close()

    def reset(self):
        """Nothing to reset"""

    def _ensure_connected(self):
        """Opens the HTTPS connection and sends "Init" if not yet connected."""
        if self.conn is not None:
            return
        # NOTE(review): certificate verification is disabled through a
        # private ssl helper; acceptable only for a trusted local endpoint.
        self.conn = HTTPSConnection(
            self.host, 1443,
            context=ssl._create_unverified_context()
        )
        # self.conn is already set, so the nested _ensure_connected() call
        # made by _send_command() returns immediately.
        self._send_command("Init", {})
        self.rf_on = False

    def _send_command(self, command, data):
        """Posts one CasimirControlService command and decodes the reply.

        Args:
            command: Name of the service method, appended to the URL.
            data: JSON-serializable request payload.

        Returns:
            The decoded response payload, or None if the response body starts
            with "rpc error".
        """
        self._ensure_connected()
        self.conn.request(
            method="POST",
            url=f"/devices/{self.id}/services/CasimirControlService/{command}",
            body=json.dumps(data),
            headers={'Content-type': 'application/json'},
        )
        response = self.conn.getresponse()
        response_data = response.read()
        self.log.debug("response_data: %s", response_data)
        # Compare the raw bytes: the repr of a bytes object switches to
        # double quotes when the payload contains an apostrophe, so matching
        # on str(response_data) would miss such error messages.
        if response_data.startswith(b"rpc error"):
            return None
        # The body is decoded twice: the outer JSON value is expected to be a
        # string that itself holds the JSON-encoded payload.
        response_string = json.loads(response_data)
        return json.loads(response_string)