1#  Copyright (C) 2024 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14
15# Lint as: python3
16"""Utility classes and functions used for testing polling frame notifications
17"""
18
19import time
20from typing import Collection
21from dataclasses import dataclass
22from .reader import TransceiveConfiguration
23
24
@dataclass
class PollingFrameTestCase:
    """Describes one frame to broadcast and what may be observed for it.

    Bundles the payload (a hex string) with the transceive configuration
    used to send it, plus the frame types and data values accepted when the
    observed polling frames are verified.
    """

    configuration: TransceiveConfiguration
    data: str

    success_types: Collection = ()
    success_data: Collection = ()
    warning_data: Collection = ()

    def __post_init__(self):
        # An empty collection means "not specified": fall back to the type
        # of the configuration the frame is sent with...
        if len(self.success_types) == 0:
            self.success_types = (self.configuration.type,)
        # ...and to the payload itself as the only allowed data value.
        if len(self.success_data) == 0:
            self.success_data = (self.data,)

    def format_for_error(self, **kwargs):
        """Builds a dict describing this test case for error reports.

        Extra keyword arguments are merged into the result. "crc" and
        "bits" are added for every type except the "O"/"X" events, and
        "bitrate" only when it differs from 106.
        """
        config = self.configuration
        report = {"type": config.type, "data": self.data, **kwargs}
        if config.type not in {"O", "X"}:
            report["crc"] = config.crc
            report["bits"] = config.bits
        if config.bitrate != 106:
            report["bitrate"] = config.bitrate
        return report

    @property
    def expected_data(self):
        """List of every data value that must not fail a test:
        success values first, followed by warning values.
        """
        return list(self.success_data) + list(self.warning_data)

    @property
    def expected_types(self):
        """Frame types that must not fail a test"""
        return self.success_types
77
78
@dataclass
class PollingFrame:
    """Represents PollingFrame object returned from an Android device"""
    type: str
    data: bytes = b""
    timestamp: int = 0
    triggered_auto_transact: bool = False
    vendor_specific_gain: int = 0

    @classmethod
    def from_dict(cls, json: dict):
        """Creates a PollingFrame object from dict.

        "data" is parsed as a hex string. The auto-transact and gain values
        are read from their camelCase key first and from the snake_case key
        otherwise. Any optional key that is absent (or None) falls back to
        the default declared on the dataclass instead of being set to None.

        Args:
            json: plain dict describing the frame (the parameter name is
                kept for backward compatibility).

        Returns:
            A new instance of the class this method is called on.
        """
        def first_value(*keys):
            # First value that is present and not None, in key order
            for key in keys:
                value = json.get(key)
                if value is not None:
                    return value
            return None

        optional = {
            "timestamp": first_value("timestamp"),
            "triggered_auto_transact": first_value(
                "triggeredAutoTransact", "triggered_auto_transact"
            ),
            "vendor_specific_gain": first_value(
                "vendorSpecificGain", "vendor_specific_gain"
            ),
        }
        hex_data = first_value("data")
        if hex_data is not None:
            optional["data"] = bytes.fromhex(hex_data)

        # Only pass what was actually provided so field defaults apply
        return cls(
            type=json.get("type"),
            **{name: value for name, value in optional.items()
               if value is not None},
        )

    def to_dict(self):
        """Dumps PollingFrame object into a dict.

        Data is rendered as an upper-case hex string and the keys use the
        camelCase spelling accepted by from_dict.
        """
        return {
            "type": self.type,
            "data": self.data.hex().upper(),
            "timestamp": self.timestamp,
            "triggeredAutoTransact": self.triggered_auto_transact,
            "vendorSpecificGain": self.vendor_specific_gain,
        }

    def __repr__(self) -> str:
        # Reuse to_dict so that data is shown as hex rather than raw bytes
        fields = ", ".join(f"{k}={v}" for k, v in self.to_dict().items())
        return f"{self.__class__.__name__}({fields})"
118
119
# Carrier frequency (presumably in Hz). Each timeout below is a cycle count
# divided by this value.
_CARRIER = 13.56e6
_A_TIMEOUT, _B_TIMEOUT, _F_TIMEOUT = (
    cycles / _CARRIER for cycles in (1236 + 384, 7680, 6800)
)


# Guard times in seconds (the polling helper passes them to time.sleep)
_GUARD_TIME_A = _GUARD_TIME_B = 0.0051
_GUARD_TIME_F = 0.02
# Field on/off events use the longest of the per-technology guard times
_GUARD_TIME = max(_GUARD_TIME_A, _GUARD_TIME_B, _GUARD_TIME_F)
GUARD_TIME_PER_TECH = dict(
    O=_GUARD_TIME,
    X=_GUARD_TIME,
    A=_GUARD_TIME_A,
    B=_GUARD_TIME_B,
    F=_GUARD_TIME_F,
)
137
138
# Stand-in configurations for the field ON ("O") and OFF ("X") events
_O = TransceiveConfiguration(type="O")
_X = TransceiveConfiguration(type="X")

# Transceive configurations polling frames can be sent with

# Type A: full frame with CRC, 7-bit short frame, full frame without CRC
_A = TransceiveConfiguration(type="A", crc=True, bits=8, timeout=_A_TIMEOUT)
CONFIGURATION_A_LONG = _A
_A_SHORT = TransceiveConfiguration(
    type="A", crc=False, bits=7, timeout=_A_TIMEOUT
)
_A_NOCRC = TransceiveConfiguration(
    type="A", crc=False, bits=8, timeout=_A_TIMEOUT
)

# Type B: full frame with and without CRC
_B = TransceiveConfiguration(type="B", crc=True, bits=8, timeout=_B_TIMEOUT)
CONFIGURATION_B_LONG = _B
_B_NOCRC = TransceiveConfiguration(
    type="B", crc=False, bits=8, timeout=_B_TIMEOUT
)

# Type F: the same frame settings at bitrate 212 and 424
_F = TransceiveConfiguration(
    type="F", crc=True, bits=8, bitrate=212, timeout=_F_TIMEOUT
)
_F_424 = TransceiveConfiguration(
    type="F", crc=True, bits=8, bitrate=424, timeout=_F_TIMEOUT
)
167
168
169# Possible polling frame configurations
170# 1) Frames with special meaning like wakeup/request:
#    - WUPA/REQA, WUPB/REQB, SENSF_REQ, etc.
172# 2) Special cases:
173#    - 7-bit short frames (Type A only);
174#    - 424 kbps (Type F only)
175# 3) Full frames without CRC (Types A,B only)
176# 4) Full frames with CRC (Types A,B only, F does not use PLF, no need to test)
177
# Stand-in test cases for the field ON / OFF events
POLLING_FRAME_ON = PollingFrameTestCase(
    configuration=_O, data="01", success_types=["O"], success_data=["01"]
)
POLLING_FRAME_OFF = PollingFrameTestCase(
    configuration=_X, data="00", success_types=["X"], success_data=["00"]
)
181
182# Type A
183# 1)
POLLING_FRAMES_TYPE_A_SPECIAL = [
    # REQA (0x26); data reported as 0x52 is accepted through warning_data
    PollingFrameTestCase(
        _A_SHORT,
        "26",
        success_types=["A"],
        success_data=["26"],
        warning_data=["52"],
    ),
    # WUPA (0x52); data reported as 0x26 is accepted through warning_data
    PollingFrameTestCase(
        _A_SHORT,
        "52",
        success_types=["A"],
        success_data=["52"],
        warning_data=["26"],
    ),
    # Some readers send SLP_REQ in the polling loop
    PollingFrameTestCase(
        _A, "5000", success_types=["A", "U"], success_data=["5000"]
    ),
]
192# 2) 7-bit short frames
# Every payload goes out with the 7-bit configuration and is expected
# to be reported with type "U"
POLLING_FRAMES_TYPE_A_SHORT = [
    PollingFrameTestCase(_A_SHORT, payload, ["U"])
    for payload in ("20", "06", "50", "02", "70", "7a")
]
201# 3)
# (payload, alternative data accepted through warning_data)
POLLING_FRAMES_TYPE_A_NOCRC = [
    PollingFrameTestCase(_A_NOCRC, payload, ["U"], [payload], [alternative])
    for payload, alternative in (
        ("aa", ""),
        ("55aa", ""),
        ("aa55aa", "aa"),
        ("55aa55aa", "55aa"),
    )
]
208# 4)
# Full frames with CRC, from 2 up to 20 bytes of payload
POLLING_FRAMES_TYPE_A_LONG = [
    PollingFrameTestCase(_A, payload, ["U"])
    for payload in (
        "02f1",
        "ff00",
        "ff001122",
        "ff00112233445566",
        "ff00112233445566778899aa",
        "000102030405060708090a0b0c0d",
        "101112131415161718191a1b1c1d1e",
        "202122232425262728292a2b2c2d2e2f",
        "303132333435363738393a3b3c3d3e3f30",
        "404142434445464748494a4b4c4d4e4f4041",
        "505152535455565758595a5b5c5d5e5f505152",
        "606162636465666768696a6b6c6d6e6f60616263",
    )
]
223
224# Type B
225# 1)
POLLING_FRAMES_TYPE_B_SPECIAL = [
    PollingFrameTestCase(_B, payload, ["B"])
    for payload in (
        # 1.1) Common cases
        "050000",  # REQB, AFI 0x00, TS 0x00
        "050008",  # WUPB, AFI 0x00, TS 0x00
        # 1.2) Different AFI values
        "050100",  # REQB, AFI 0x01, TS 0x00
        "050208",  # WUPB, AFI 0x02, TS 0x00
        # 1.3) Different Timeslot counts
        "050001",  # REQB, AFI 0x00, TS 0x01 (2)
        "05000a",  # WUPB, AFI 0x00, TS 0x02 (4)
    )
]
243# 3)
# Full frames sent without CRC, expected to be reported with type "U"
POLLING_FRAMES_TYPE_B_NOCRC = [
    PollingFrameTestCase(_B_NOCRC, payload, ["U"])
    for payload in ("aa", "55aa", "aa55aa", "55aa55aa")
]
250# 4)
# Full frames with CRC, expected to be reported with type "U"
POLLING_FRAMES_TYPE_B_LONG = [
    PollingFrameTestCase(_B, payload, ["U"])
    for payload in (
        "02f1",
        "ff00",  # 2 bytes
        "ff001122",  # 4 bytes
        "ff00112233445566",  # 8 bytes
        "ff00112233445566778899aa",  # 12 bytes
        "ff00112233445566778899aabbccddee",  # 16 bytes
        "ff00112233445566778899aabbccddeeff001122",  # 20 bytes
    )
]
266
267# Type F
268# 1)
# SENSF_REQ payload layout used below:
#   command 0x00, system code (2 bytes), request code, timeslot number
POLLING_FRAMES_TYPE_F_SPECIAL = [
    # 1.0) Common
    #   SENSF_REQ, SC, 0xffff, RC 0x00, TS 0x00
    PollingFrameTestCase(_F, "00ffff0000", ["F"]),
    #   SENSF_REQ, SC, 0x0003, RC 0x00, TS 0x00
    PollingFrameTestCase(_F, "0000030000", ["F"]),
    # 1.1) Different request codes
    #   SENSF_REQ, SC, 0xffff, RC 0x01, TS 0x00
    PollingFrameTestCase(_F, "00ffff0100", ["F"]),
    #   SENSF_REQ, SC, 0x0003, RC 0x01, TS 0x00
    PollingFrameTestCase(_F, "0000030100", ["F"]),
    # 1.2) Different Timeslot counts
    #   SENSF_REQ, SC, 0xffff, RC 0x00, TS 0x01 (2)
    PollingFrameTestCase(_F, "00ffff0001", ["F"]),
    #   SENSF_REQ, SC, 0x0003, RC 0x00, TS 0x02 (4)
    PollingFrameTestCase(_F, "0000030002", ["F"]),
    # 2) 424 kbps
    #   SENSF_REQ, SC, 0xffff, RC 0x01, TS 0x00
    PollingFrameTestCase(_F_424, "00ffff0100", ["F"]),
    #   SENSF_REQ, SC, 0x0003, RC 0x01, TS 0x00
    # (this entry used to repeat the 0xffff payload of the previous one)
    PollingFrameTestCase(_F_424, "0000030100", ["F"]),
]
291
# One complete polling loop: field ON, every test case group, field OFF
POLLING_FRAME_ALL_TEST_CASES = (
    [POLLING_FRAME_ON]
    + POLLING_FRAMES_TYPE_A_SPECIAL
    + POLLING_FRAMES_TYPE_A_SHORT
    + POLLING_FRAMES_TYPE_A_NOCRC
    + POLLING_FRAMES_TYPE_A_LONG
    + POLLING_FRAMES_TYPE_B_SPECIAL
    + POLLING_FRAMES_TYPE_B_NOCRC
    + POLLING_FRAMES_TYPE_B_LONG
    + POLLING_FRAMES_TYPE_F_SPECIAL
    + [POLLING_FRAME_OFF]
)
304
305
# Frame types that may be delivered ahead of the rest of the loop
EXPEDITABLE_POLLING_LOOP_EVENT_TYPES = ["F", "U"]


def get_expedited_frames(frames):
    """Returns the leading run of expedited polling frames.

    Expedited frames have type F or U and get reported to the service
    while the OS might still be evaluating the loop, so they are looked
    for at the start of the sequence only: collection stops at the first
    frame of any other type.
    """
    leading = []
    for frame in frames:
        if frame.type in EXPEDITABLE_POLLING_LOOP_EVENT_TYPES:
            leading.append(frame)
            continue
        # First non-expeditable frame ends the expedited prefix
        break
    return leading
321
322
def split_frames_by_timestamp_wrap(frames, pivot_timestamp=None):
    """Splits frames at the point where the timestamp wraps to a lower value.

    Frames are assumed to be given in arrival order. Everything before the
    first frame whose timestamp is below the pivot is returned as the first
    list; that frame and all following ones form the second result.

    The pivot defaults to the timestamp of the first frame. Note that any
    falsy pivot (None, but also 0) selects that default.
    """
    if not frames:
        return [], []
    pivot = pivot_timestamp or frames[0].timestamp
    # Index of the first frame that dropped below the pivot, if any
    wrap_index = next(
        (idx for idx, frame in enumerate(frames) if frame.timestamp < pivot),
        len(frames),
    )
    return list(frames[:wrap_index]), frames[wrap_index:]
341
342
def apply_expedited_frame_ordering(frames, limit=3):
    """Reorders frames the way expedited delivery is expected to.

    Attempts to replicate the behavior of HostEmulationManager for type
    F, U events: the first `limit` such frames are moved to the front,
    all other frames follow. Relative order is kept within both groups.
    """
    expedited, remaining = [], []
    for frame in frames:
        is_expedited = (
            frame.type in EXPEDITABLE_POLLING_LOOP_EVENT_TYPES
            and len(expedited) < limit
        )
        bucket = expedited if is_expedited else remaining
        bucket.append(frame)
    return expedited + remaining
356
357
def apply_original_frame_ordering(frames):
    """Reverts expedited frame ordering caused by HostEmulationManager,
    useful when having the original polling frame order is preferable in a test

    Call this function ONLY with a list of frames resembling a full polling loop
    with possible expedited F, U events at the beginning.

    Returns a new list ordered by timestamp, with the frames recorded after
    a timestamp wrap placed last. When no expedited frames lead the input,
    the input list itself is returned unchanged.
    """
    if len(frames) == 0:
        return []

    expedited_frames = get_expedited_frames(frames)
    # If no expedited frames were found at the beginning, leave
    if len(expedited_frames) == 0:
        return frames

    # Original frames come after expedited ones
    original_frames = frames[len(expedited_frames) :]

    # In between expedited and original frames,
    # which should be pre-sorted in their category
    # there might be a timestamp wrap
    original_not_wrapped, original_wrapped = split_frames_by_timestamp_wrap(
        original_frames
    )
    # Non-expedited, original frame should be the first one in the loop
    # so the timestamp of the first original frame is used as the pivot
    # for the expedited ones. If there are no original frames, None is
    # passed and the split falls back to the first expedited timestamp.
    expedited_not_wrapped, expedited_wrapped = split_frames_by_timestamp_wrap(
        expedited_frames,
        pivot_timestamp=(
            original_not_wrapped[0].timestamp
            if len(original_not_wrapped) > 0 else None
        ),
    )

    # Merge each half by timestamp; the wrapped half always goes last
    return sorted(
        original_not_wrapped + expedited_not_wrapped, key=lambda f: f.timestamp
    ) + sorted(original_wrapped + expedited_wrapped, key=lambda f: f.timestamp)
395
396
def _test_apply_original_frame_ordering():
    """Self-check for 'apply_original_frame_ordering'.

    Every scenario is a polling loop given as (type, timestamp) pairs in
    original order. Each one is reordered with
    'apply_expedited_frame_ordering' and must be restored exactly.
    """
    scenarios = [
        # Overflow after Normal B
        (
            ("O", 4), ("A", 5), ("U", 6), ("B", 7),
            ("U", 0), ("F", 1), ("U", 2), ("X", 3)
        ),
        # Overflow after Expeditable
        (
            ("O", 4), ("A", 5), ("U", 6), ("B", 7),
            ("U", 8), ("F", 0), ("U", 1), ("X", 2)
        ),
        # Overflow after Normal O
        (("O", 4), ("A", 0), ("B", 1), ("F", 2), ("X", 3)),
        # Overflow after Normal A
        (("O", 4), ("A", 5), ("B", 0), ("F", 1), ("X", 2)),
        # Overflow after Expeditable U
        (("O", 4), ("U", 5), ("A", 0), ("B", 1), ("F", 2), ("X", 3)),
        # No overflow
        (("O", 0), ("A", 1), ("B", 2), ("X", 3)),
        # No overflow
        (("O", 0), ("A", 1), ("B", 2), ("F", 3), ("X", 4)),
        # No overflow
        (("O", 0), ("A", 1), ("U", 2), ("B", 3), ("U", 4), ("F", 5), ("X", 6)),
    ]

    for scenario in scenarios:
        expected = [PollingFrame(kind, b"", stamp) for kind, stamp in scenario]
        # Limits run from 0 (nothing expedited) up to len(expected) - 1
        for limit in range(len(expected)):
            reordered = apply_expedited_frame_ordering(expected, limit=limit)
            assert expected == apply_original_frame_ordering(reordered)


# Runs at import time; it should not raise anything
_test_apply_original_frame_ordering()
440
441
# Seconds to wait for the final 'X' (field off) event to be reported
_FRAME_EVENT_TIMEOUT_SEC = 1


def poll_and_observe_frames(
    pn532,
    emulator,
    testcases,
    *,
    restore_original_frame_ordering=False,
    ignore_field_off_event_timeout=False,
    **kwargs,
):
    """Handles broadcasting polling loop events for provided list of test cases.
    Provided set of test cases MUST contain a complete polling loop, starting
    with 'O' and ending with 'X' event.

    Args:
        pn532: reader object; unmute(), mute() and send_broadcast() are
            called on it.
        emulator: device-side object; asyncWaitForPollingFrameOff(),
            getPollingFrames() and log.warning() are called on it.
        testcases: test cases to play back in order; at least three entries.
        restore_original_frame_ordering: when True, the returned frames are
            passed through apply_original_frame_ordering.
        ignore_field_off_event_timeout: when True, a failure while waiting
            for the final 'X' event is not logged.
        **kwargs: only "power_level" is read; when present it is applied to
            each broadcast configuration via configuration.replace(power=...).

    Returns:
        List of PollingFrame objects built from emulator.getPollingFrames().

    Raises:
        AssertionError: if the test cases do not form a complete loop.
    """

    assert len(testcases) > 2
    assert testcases[0].configuration.type == "O"
    assert testcases[-1].configuration.type == "X"

    off_event_handler = None
    for idx, testcase in enumerate(testcases):
        configuration = testcase.configuration

        # On last 'X' Event, create handler
        # (done before the field is muted below)
        if idx == len(testcases) - 1 and configuration.type == "X":
            off_event_handler = emulator.asyncWaitForPollingFrameOff("XEvent")

        # Guard time before every event, field on/off included
        time.sleep(GUARD_TIME_PER_TECH[configuration.type])

        if configuration.type == "O":
            pn532.unmute()
        elif configuration.type == "X":
            pn532.mute()
        else:
            if "power_level" in kwargs:
                configuration = configuration.replace(
                    power=kwargs["power_level"]
                )
            pn532.send_broadcast(
                data=bytes.fromhex(testcase.data),
                configuration=configuration
            )
        # Field on/off events get a second guard time after the event
        if configuration.type in {"O", "X"}:
            time.sleep(GUARD_TIME_PER_TECH[configuration.type])

    # Best effort: any exception raised while waiting is swallowed and only
    # logged as a timeout warning, unless that warning was disabled.
    try:
        if off_event_handler is not None:
            off_event_handler.waitAndGet("XEvent", _FRAME_EVENT_TIMEOUT_SEC)
    except (Exception, ) as e:
        if not ignore_field_off_event_timeout:
            emulator.log.warning( f"Timed out waiting for 'X' event due to {e}")

    frames = [PollingFrame.from_dict(f) for f in emulator.getPollingFrames()]

    if restore_original_frame_ordering:
        # Attempt to revert expedited frame delivery ordering for U and F frames
        # while keeping timestamp wrapping into account
        frames = apply_original_frame_ordering(frames)

    return frames
504
505
def get_frame_test_stats(testcases, frames, timestamps=()):
    """Creates a dict containing test info for error output.

    Args:
        testcases: test cases that were sent; format_for_error(timestamp=...)
            is called on each of them.
        frames: frames that were received; to_dict() is called on each.
        timestamps: optional timestamps matched to the test cases by
            position. Missing entries are reported as -1, surplus entries
            are ignored.

    Returns:
        Dict with the keys "frames_sent_count", "frames_received_count",
        "frames_sent" and "frames_received".
    """
    # Pad with the -1 placeholder so that every test case is reported even
    # when fewer timestamps than test cases are available (a negative
    # repeat count simply yields no padding).
    missing = len(testcases) - len(timestamps)
    padded_timestamps = [*timestamps, *([-1] * missing)]

    return {
        "frames_sent_count": len(testcases),
        "frames_received_count": len(frames),
        "frames_sent": [
            testcase.format_for_error(timestamp=timestamp)
            for timestamp, testcase in zip(padded_timestamps, testcases)
        ],
        "frames_received": [frame.to_dict() for frame in frames],
    }
520