1# Copyright (C) 2024 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# Lint as: python3 16"""Utility classes and functions used for testing polling frame notifications 17""" 18 19import time 20from typing import Collection 21from dataclasses import dataclass 22from .reader import TransceiveConfiguration 23 24 25@dataclass 26class PollingFrameTestCase: 27 """Defines a test case for polling frame tests, 28 containing data and transceive configuration to send the frame with 29 To verify against lists of expected types and data values 30 """ 31 configuration: TransceiveConfiguration 32 data: str 33 34 success_types: Collection = () 35 success_data: Collection = () 36 warning_data: Collection = () 37 38 def __init__( 39 self, 40 configuration, 41 data, 42 success_types=(), 43 success_data=(), 44 warning_data=() 45 ): 46 self.configuration = configuration 47 self.data = data 48 if len(success_types) == 0: 49 success_types = (configuration.type,) 50 # If no success data variations were given, 51 # assume only original is allowed 52 if len(success_data) == 0: 53 success_data = (data,) 54 self.success_types = success_types 55 self.success_data = success_data 56 self.warning_data = warning_data 57 58 def format_for_error(self, **kwargs): 59 """Formats testcase value for pretty reporting in errors""" 60 extras = {**kwargs} 61 if self.configuration.type not in {"O", "X"}: 62 extras["crc"] = self.configuration.crc 63 extras["bits"] = self.configuration.bits 64 if self.configuration.bitrate != 106: 65 extras["bitrate"] = self.configuration.bitrate 66 return {"type": self.configuration.type, "data": self.data, **extras} 67 68 @property 69 def expected_data(self): 70 """Returns all data variations that should not cause a test to fail""" 71 return [*self.success_data, *self.warning_data] 72 73 @property 74 def expected_types(self): 75 """Returns all types that should not cause a test to fail""" 76 return self.success_types 77 78 79@dataclass 80class PollingFrame: 81 """Represents PollingFrame object returned from an Android device""" 82 type: str 83 data: bytes = b"" 84 timestamp: int = 0 85 triggered_auto_transact: bool = False 86 vendor_specific_gain: int = 0 87 88 @staticmethod 89 def from_dict(json: dict): 90 """Creates a PollingFrame object from dict""" 91 return PollingFrame( 92 type=json.get("type"), 93 data=bytes.fromhex(json.get("data")), 94 timestamp=json.get("timestamp"), 95 triggered_auto_transact=json.get( 96 "triggeredAutoTransact", json.get("triggered_auto_transact") 97 ), 98 vendor_specific_gain=json.get( 99 "vendorSpecificGain", json.get("vendor_specific_gain") 100 ), 101 ) 102 103 def to_dict(self): 104 """Dumps PollingFrame object into a dict""" 105 return { 106 "type": self.type, 107 "data": self.data.hex().upper(), 108 "timestamp": self.timestamp, 109 "triggeredAutoTransact": self.triggered_auto_transact, 110 "vendorSpecificGain": self.vendor_specific_gain, 111 } 112 113 def __repr__(self) -> str: 114 return ( 115 f"{self.__class__.__name__}" 116 + f"({', '.join(f'{k}={v}' for k, v in self.to_dict().items())})" 117 ) 118 119 120_CARRIER = 13.56e6 121_A_TIMEOUT = (1236 + 384) / _CARRIER 122_B_TIMEOUT = 7680 / _CARRIER 123_F_TIMEOUT = 6800 / _CARRIER 124 125 126_GUARD_TIME_A = 0.0051 127_GUARD_TIME_B = 0.0051 128_GUARD_TIME_F = 0.02 129_GUARD_TIME = max(_GUARD_TIME_A, _GUARD_TIME_B, _GUARD_TIME_F) 130GUARD_TIME_PER_TECH = { 131 "O": _GUARD_TIME, 132 "X": _GUARD_TIME, 133 "A": _GUARD_TIME_A, 134 "B": _GUARD_TIME_B, 135 "F": _GUARD_TIME_F, 136} 137 138 139# Placeholder values for ON and OFF events 140_O = TransceiveConfiguration(type="O") 141_X = TransceiveConfiguration(type="X") 142 143# Possible transceive configurations for polling frames 144CONFIGURATION_A_LONG = _A = TransceiveConfiguration( 145 type="A", crc=True, bits=8, timeout=_A_TIMEOUT 146) 147_A_SHORT = TransceiveConfiguration( 148 type="A", crc=False, bits=7, timeout=_A_TIMEOUT 149) 150_A_NOCRC = TransceiveConfiguration( 151 type="A", crc=False, bits=8, timeout=_A_TIMEOUT 152) 153 154CONFIGURATION_B_LONG = _B = TransceiveConfiguration( 155 type="B", crc=True, bits=8, timeout=_B_TIMEOUT 156) 157_B_NOCRC = TransceiveConfiguration( 158 type="B", crc=False, bits=8, timeout=_B_TIMEOUT 159) 160 161_F = TransceiveConfiguration( 162 type="F", crc=True, bits=8, bitrate=212, timeout=_F_TIMEOUT 163) 164_F_424 = TransceiveConfiguration( 165 type="F", crc=True, bits=8, bitrate=424, timeout=_F_TIMEOUT 166) 167 168 169# Possible polling frame configurations 170# 1) Frames with special meaning like wakeup/request: 171# - WUPA/REQA WUPB/REQB, SENSF_REQ, etc. 172# 2) Special cases: 173# - 7-bit short frames (Type A only); 174# - 424 kbps (Type F only) 175# 3) Full frames without CRC (Types A,B only) 176# 4) Full frames with CRC (Types A,B only, F does not use PLF, no need to test) 177 178# Placeholder test cases for ON/OFF 179POLLING_FRAME_ON = PollingFrameTestCase(_O, "01", ["O"], ["01"]) 180POLLING_FRAME_OFF = PollingFrameTestCase(_X, "00", ["X"], ["00"]) 181 182# Type A 183# 1) 184POLLING_FRAMES_TYPE_A_SPECIAL = [ 185 # WUPA 186 PollingFrameTestCase(_A_SHORT, "26", ["A"], ["26"], ["52"]), 187 # REQA 188 PollingFrameTestCase(_A_SHORT, "52", ["A"], ["52"], ["26"]), 189 # Some readers send SLP_REQ in the polling loop 190 PollingFrameTestCase(_A, "5000", ["A", "U"], ["5000"]), 191] 192# 2) 7-bit short frames 193POLLING_FRAMES_TYPE_A_SHORT = [ 194 PollingFrameTestCase(_A_SHORT, "20", ["U"]), 195 PollingFrameTestCase(_A_SHORT, "06", ["U"]), 196 PollingFrameTestCase(_A_SHORT, "50", ["U"]), 197 PollingFrameTestCase(_A_SHORT, "02", ["U"]), 198 PollingFrameTestCase(_A_SHORT, "70", ["U"]), 199 PollingFrameTestCase(_A_SHORT, "7a", ["U"]), 200] 201# 3) 202POLLING_FRAMES_TYPE_A_NOCRC = [ 203 PollingFrameTestCase(_A_NOCRC, "aa", ["U"], ["aa"], [""]), 204 PollingFrameTestCase(_A_NOCRC, "55aa", ["U"], ["55aa"], [""]), 205 PollingFrameTestCase(_A_NOCRC, "aa55aa", ["U"], ["aa55aa"], ["aa"]), 206 PollingFrameTestCase(_A_NOCRC, "55aa55aa", ["U"], ["55aa55aa"], ["55aa"]), 207] 208# 4) 209POLLING_FRAMES_TYPE_A_LONG = [ 210 PollingFrameTestCase(_A, "02f1", ["U"]), 211 PollingFrameTestCase(_A, "ff00", ["U"]), 212 PollingFrameTestCase(_A, "ff001122", ["U"]), 213 PollingFrameTestCase(_A, "ff00112233445566", ["U"]), 214 PollingFrameTestCase(_A, "ff00112233445566778899aa", ["U"]), 215 PollingFrameTestCase(_A, "000102030405060708090a0b0c0d", ["U"]), 216 PollingFrameTestCase(_A, "101112131415161718191a1b1c1d1e", ["U"]), 217 PollingFrameTestCase(_A, "202122232425262728292a2b2c2d2e2f", ["U"]), 218 PollingFrameTestCase(_A, "303132333435363738393a3b3c3d3e3f30", ["U"]), 219 PollingFrameTestCase(_A, "404142434445464748494a4b4c4d4e4f4041", ["U"]), 220 PollingFrameTestCase(_A, "505152535455565758595a5b5c5d5e5f505152", ["U"]), 221 PollingFrameTestCase(_A, "606162636465666768696a6b6c6d6e6f60616263", ["U"]), 222] 223 224# Type B 225# 1) 226POLLING_FRAMES_TYPE_B_SPECIAL = [ 227 # 1.1) Common cases 228 # REQB, AFI 0x00, TS 0x00 229 PollingFrameTestCase(_B, "050000", ["B"]), 230 # WUPB, AFI 0x00, TS 0x00 231 PollingFrameTestCase(_B, "050008", ["B"]), 232 # 1.2) Different AFI values 233 # REQB, AFI 0x01, TS 0x00 234 PollingFrameTestCase(_B, "050100", ["B"]), 235 # WUPB, AFI 0x02, TS 0x00 236 PollingFrameTestCase(_B, "050208", ["B"]), 237 # 1.3) Different Timeslot counts 238 # REQB, AFI 0x00, TS 0x01 (2) 239 PollingFrameTestCase(_B, "050001", ["B"]), 240 # WUPB, AFI 0x00, TS 0x02 (4) 241 PollingFrameTestCase(_B, "05000a", ["B"]), 242] 243# 3) 244POLLING_FRAMES_TYPE_B_NOCRC = [ 245 PollingFrameTestCase(_B_NOCRC, "aa", ["U"]), 246 PollingFrameTestCase(_B_NOCRC, "55aa", ["U"]), 247 PollingFrameTestCase(_B_NOCRC, "aa55aa", ["U"]), 248 PollingFrameTestCase(_B_NOCRC, "55aa55aa", ["U"]), 249] 250# 4) 251POLLING_FRAMES_TYPE_B_LONG = [ 252 PollingFrameTestCase(_B, "02f1", ["U"]), 253 # 2 bytes 254 PollingFrameTestCase(_B, "ff00", ["U"]), 255 # 4 bytes 256 PollingFrameTestCase(_B, "ff001122", ["U"]), 257 # 8 bytes 258 PollingFrameTestCase(_B, "ff00112233445566", ["U"]), 259 # 12 bytes 260 PollingFrameTestCase(_B, "ff00112233445566778899aa", ["U"]), 261 # 16 bytes 262 PollingFrameTestCase(_B, "ff00112233445566778899aabbccddee", ["U"]), 263 # 20 bytes 264 PollingFrameTestCase(_B, "ff00112233445566778899aabbccddeeff001122", ["U"]), 265] 266 267# Type F 268# 1) 269POLLING_FRAMES_TYPE_F_SPECIAL = [ 270 # 1.0) Common 271 # SENSF_REQ, SC, 0xffff, RC 0x00, TS 0x00 272 PollingFrameTestCase(_F, "00ffff0000", ["F"]), 273 # SENSF_REQ, SC, 0x0003, RC 0x00, TS 0x00 274 PollingFrameTestCase(_F, "0000030000", ["F"]), 275 # 1.1) Different request codes 276 # SENSF_REQ, SC, 0xffff, RC 0x01, TS 0x00 277 PollingFrameTestCase(_F, "00ffff0100", ["F"]), 278 # SENSF_REQ, SC, 0x0003, RC 0x01, TS 0x00 279 PollingFrameTestCase(_F, "0000030100", ["F"]), 280 # 1.2) Different Timeslot counts 281 # SENSF_REQ, SC, 0xffff, RC 0x00, TS 0x01 (2) 282 PollingFrameTestCase(_F, "00ffff0001", ["F"]), 283 # SENSF_REQ, SC, 0x0003, RC 0x00, TS 0x02 (4) 284 PollingFrameTestCase(_F, "0000030002", ["F"]), 285 # 2) 424 kbps 286 # SENSF_REQ, SC, 0xffff 287 PollingFrameTestCase(_F_424, "00ffff0100", ["F"]), 288 # SENSF_REQ, SC, 0x0003 289 PollingFrameTestCase(_F_424, "00ffff0100", ["F"]), 290] 291 292POLLING_FRAME_ALL_TEST_CASES = [ 293 POLLING_FRAME_ON, 294 *POLLING_FRAMES_TYPE_A_SPECIAL, 295 *POLLING_FRAMES_TYPE_A_SHORT, 296 *POLLING_FRAMES_TYPE_A_NOCRC, 297 *POLLING_FRAMES_TYPE_A_LONG, 298 *POLLING_FRAMES_TYPE_B_SPECIAL, 299 *POLLING_FRAMES_TYPE_B_NOCRC, 300 *POLLING_FRAMES_TYPE_B_LONG, 301 *POLLING_FRAMES_TYPE_F_SPECIAL, 302 POLLING_FRAME_OFF, 303] 304 305 306EXPEDITABLE_POLLING_LOOP_EVENT_TYPES = ["F", "U"] 307 308 309def get_expedited_frames(frames): 310 """Finds and collects all expedited polling frames. 311 Expedited frames belong to F, U types and they get reported 312 to the service while the OS might still be evaluating the loop 313 """ 314 expedited_frames = [] 315 # Expedited frames come at the beginning 316 for frame in frames: 317 if frame.type not in EXPEDITABLE_POLLING_LOOP_EVENT_TYPES: 318 break 319 expedited_frames.append(frame) 320 return expedited_frames 321 322 323def split_frames_by_timestamp_wrap(frames, pivot_timestamp=None): 324 """Returns two lists of polling frames 325 split based on the timestamp value wrapping over to lower value 326 assuming that frames were provided in the way they arrived 327 """ 328 if not frames: 329 return [], [] 330 # Take the first timestamp from first frame (or the one provided) 331 # And check that timestamp for all frames that come afterwards is bigger 332 # otherwise consider them wrapped 333 pivot_timestamp = pivot_timestamp or frames[0].timestamp 334 not_wrapped = [] 335 for frame in frames: 336 if frame.timestamp < pivot_timestamp: 337 break 338 not_wrapped.append(frame) 339 wrapped = frames[len(not_wrapped) :] 340 return not_wrapped, wrapped 341 342 343def apply_expedited_frame_ordering(frames, limit=3): 344 """Attempts to replicate expedited frame delivery behavior 345 of HostEmulationManager for type F, U events 346 """ 347 leave, expedite = [], [] 348 349 for frame in frames: 350 if frame.type in EXPEDITABLE_POLLING_LOOP_EVENT_TYPES \ 351 and len(expedite) < limit: 352 expedite.append(frame) 353 else: 354 leave.append(frame) 355 return expedite + leave 356 357 358def apply_original_frame_ordering(frames): 359 """Reverts expedited frame ordering caused by HostEmulationManager, 360 useful when having the original polling frame order is preferable in a test 361 362 Call this function ONLY with a list of frames resembling a full polling loop 363 with possible expedited F, U events at the beginning. 364 """ 365 if len(frames) == 0: 366 return [] 367 368 expedited_frames = get_expedited_frames(frames) 369 # If no expedited frames were found at the beginning, leave 370 if len(expedited_frames) == 0: 371 return frames 372 373 # Original frames come after expedited ones 374 original_frames = frames[len(expedited_frames) :] 375 376 # In between expedited and original frames, 377 # which should be pre-sorted in their category 378 # there might be a timestamp wrap 379 original_not_wrapped, original_wrapped = split_frames_by_timestamp_wrap( 380 original_frames 381 ) 382 # Non-expedited, original frame should be the first one in the loop 383 # so we can use the timestamp of the first expedited frame as a pivot 384 expedited_not_wrapped, expedited_wrapped = split_frames_by_timestamp_wrap( 385 expedited_frames, 386 pivot_timestamp=( 387 original_not_wrapped[0].timestamp 388 if len(original_not_wrapped) > 0 else None 389 ), 390 ) 391 392 return sorted( 393 original_not_wrapped + expedited_not_wrapped, key=lambda f: f.timestamp 394 ) + sorted(original_wrapped + expedited_wrapped, key=lambda f: f.timestamp) 395 396 397def _test_apply_original_frame_ordering(): 398 """Verifies that 'apply_original_frame_ordering' works properly""" 399 testcases = [ 400 # Overflow after Normal B 401 ( 402 ("O", 4), ("A", 5), ("U", 6), ("B", 7), 403 ("U", 0), ("F", 1), ("U", 2), ("X", 3) 404 ), 405 # Overflow after Expeditable 406 ( 407 ("O", 4), ("A", 5), ("U", 6), ("B", 7), 408 ("U", 8), ("F", 0), ("U", 1), ("X", 2) 409 ), 410 # Overflow after Normal O 411 (("O", 4), ("A", 0), ("B", 1), ("F", 2), ("X", 3)), 412 # Overflow after Normal A 413 (("O", 4), ("A", 5), ("B", 0), ("F", 1), ("X", 2)), 414 # Overflow after Expeditable U 415 (("O", 4), ("U", 5), ("A", 0), ("B", 1), ("F", 2), ("X", 3)), 416 # No overflow 417 (("O", 0), ("A", 1), ("B", 2), ("X", 3)), 418 # No overflow 419 (("O", 0), ("A", 1), ("B", 2), ("F", 3), ("X", 4)), 420 # No overflow 421 (("O", 0), ("A", 1), ("U", 2), ("B", 3), ("U", 4), ("F", 5), ("X", 6)), 422 ] 423 424 for testcase in testcases: 425 original_frames = [ 426 PollingFrame(type_, b"", timestamp) 427 for (type_, timestamp) in testcase 428 ] 429 # Test for case where none or all frames get expedited 430 for limit in range(len(original_frames)): 431 expedited_frames = apply_expedited_frame_ordering( 432 original_frames, limit=limit 433 ) 434 restored_frames = apply_original_frame_ordering(expedited_frames) 435 assert original_frames == restored_frames 436 437 438# This should not raise anything when module is imported 439_test_apply_original_frame_ordering() 440 441 442_FRAME_EVENT_TIMEOUT_SEC = 1 443 444 445def poll_and_observe_frames( 446 pn532, 447 emulator, 448 testcases, 449 *, 450 restore_original_frame_ordering=False, 451 ignore_field_off_event_timeout=False, 452 **kwargs, 453): 454 """Handles broadcasting polling loop events for provided list of test cases. 455 Provided set of test cases MUST contain a complete polling loop, starting 456 with 'O' and ending with 'X' event. 457 """ 458 459 assert len(testcases) > 2 460 assert testcases[0].configuration.type == "O" 461 assert testcases[-1].configuration.type == "X" 462 463 off_event_handler = None 464 for idx, testcase in enumerate(testcases): 465 configuration = testcase.configuration 466 467 # On last 'X' Event, create handler 468 if idx == len(testcases) - 1 and configuration.type == "X": 469 off_event_handler = emulator.asyncWaitForPollingFrameOff("XEvent") 470 471 time.sleep(GUARD_TIME_PER_TECH[configuration.type]) 472 473 if configuration.type == "O": 474 pn532.unmute() 475 elif configuration.type == "X": 476 pn532.mute() 477 else: 478 if "power_level" in kwargs: 479 configuration = configuration.replace( 480 power=kwargs["power_level"] 481 ) 482 pn532.send_broadcast( 483 data=bytes.fromhex(testcase.data), 484 configuration=configuration 485 ) 486 if configuration.type in {"O", "X"}: 487 time.sleep(GUARD_TIME_PER_TECH[configuration.type]) 488 489 try: 490 if off_event_handler is not None: 491 off_event_handler.waitAndGet("XEvent", _FRAME_EVENT_TIMEOUT_SEC) 492 except (Exception, ) as e: 493 if not ignore_field_off_event_timeout: 494 emulator.log.warning( f"Timed out waiting for 'X' event due to {e}") 495 496 frames = [PollingFrame.from_dict(f) for f in emulator.getPollingFrames()] 497 498 if restore_original_frame_ordering: 499 # Attempt to revert expedited frame delivery ordering for U and F frames 500 # while keeping timestamp wrapping into account 501 frames = apply_original_frame_ordering(frames) 502 503 return frames 504 505 506def get_frame_test_stats(testcases, frames, timestamps=()): 507 """Creates a dict containing test info for error output""" 508 if len(timestamps) == 0: 509 timestamps = [-1] * len(testcases) 510 511 return { 512 "frames_sent_count": len(testcases), 513 "frames_received_count": len(frames), 514 "frames_sent": [ 515 testcase.format_for_error(timestamp=timestamp) 516 for timestamp, testcase in zip(timestamps, testcases) 517 ], 518 "frames_received": [frame.to_dict() for frame in frames], 519 } 520