#!/usr/bin/env python
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Contains the Python decoder tests and generates C++ decoder tests."""

from __future__ import annotations

import queue
from typing import Iterator, NamedTuple
import unittest

from pw_build.generated_tests import Context, PyTest, TestGenerator, GroupOrTest
from pw_build.generated_tests import parse_test_generation_args
from pw_hdlc.decode import (
    Frame,
    FrameDecoder,
    FrameAndNonFrameDecoder,
    FrameStatus,
    NO_ADDRESS,
)
from pw_hdlc.protocol import frame_check_sequence as fcs
from pw_hdlc.protocol import encode_address


def _encode(address: int, control: int, data: bytes) -> bytes:
    """Builds a flag-delimited, escaped frame for use as decoder input.

    The unescaped frame is the encoded address, the single control byte, the
    payload, and the frame check sequence computed over those three parts.
    """
    frame = encode_address(address) + bytes([control]) + data
    frame += fcs(frame)
    # Escape '}' before '~': the second replacement inserts new '}' bytes,
    # which must not be escaped again.
    frame = frame.replace(b'}', b'}\x5d')
    frame = frame.replace(b'~', b'}\x5e')
    return b''.join([b'~', frame, b'~'])


class Expected(NamedTuple):
    """Expected decoded fields of a frame, comparable against a Frame."""

    address: int
    control: bytes
    data: bytes
    status: FrameStatus = FrameStatus.OK

    @classmethod
    def error(cls, status: FrameStatus):
        """Creates an Expected for a failed frame: NO_ADDRESS, empty fields."""
        assert status is not FrameStatus.OK
        return cls(NO_ADDRESS, b'', b'', status)

    def __eq__(self, other) -> bool:
        """Define == so an Expected and a Frame can be compared."""
        return (
            self.address == other.address
            and self.control == other.control
            and self.data == other.data
            and self.status is other.status
        )


class ExpectedRaw(NamedTuple):
    """Expected raw encoded bytes and status, comparable against a Frame."""

    raw_encoded: bytes
    status: FrameStatus

    def __eq__(self, other) -> bool:
        """Define == so an ExpectedRaw and a Frame can be compared."""
        return (
            self.raw_encoded == other.raw_encoded
            and self.status is other.status
        )


class TestCase(NamedTuple):
    """One decoder scenario: input bytes, expected frames, non-frame bytes."""

    data: bytes
    frames: list[Expected | ExpectedRaw]
    raw_data: bytes


def case(data: bytes, frames: list, raw: bytes | None = None) -> TestCase:
    """Creates a TestCase, filling in the default value for the raw bytes.

    Args:
        data: The byte stream fed to the decoder.
        frames: The frames the decoder is expected to produce, in order.
        raw: The expected non-frame data. If omitted, it defaults to ``data``
            when no frame is expected to be OK, and to ``b''`` when every
            frame is expected to be OK.

    Raises:
        AssertionError: ``raw`` was omitted for a mix of OK and failed frames,
            so no default can be chosen.
    """
    if raw is not None:
        return TestCase(data, frames, raw)
    if not frames or all(f.status is not FrameStatus.OK for f in frames):
        return TestCase(data, frames, data)
    if all(f.status is FrameStatus.OK for f in frames):
        return TestCase(data, frames, b'')
    raise AssertionError(
        f'Must specify expected non-frame data for this test case ({data=})!'
    )


_PARTIAL = fcs(b'\x0ACmsg\x5e')

_ESCAPED_FLAG_TEST_CASE = case(
    b'~\x0ACmsg}~' + _PARTIAL + b'~',
    [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
    ],
)

# Test cases are a tuple with the following elements:
#
#   - raw data stream
#   - expected valid & invalid frames
#   - [optional] expected raw, non-HDLC data; defaults to the full raw data
#     stream if no valid frames are expected, or b'' if only valid frames are
#     expected
#
# These tests are executed twice: once for the standard HDLC decoder, and a
# second time for the FrameAndNonFrameDecoder. The FrameAndNonFrameDecoder tests
# flush the non-frame data to simulate a timeout or MTU overflow, so the
# expected raw data includes all bytes not in an HDLC frame.
TEST_CASES: tuple[GroupOrTest[TestCase], ...] = (
    'Empty payload',
    case(_encode(0, 0, b''), [Expected(0, b'\0', b'')]),
    case(_encode(55, 0x99, b''), [Expected(55, b'\x99', b'')]),
    case(_encode(55, 0x99, b'') * 3, [Expected(55, b'\x99', b'')] * 3),
    'Simple one-byte payload',
    case(_encode(0, 0, b'\0'), [Expected(0, b'\0', b'\0')]),
    case(_encode(123, 0, b'A'), [Expected(123, b'\0', b'A')]),
    'Simple multi-byte payload',
    case(
        _encode(0, 0, b'Hello, world!'), [Expected(0, b'\0', b'Hello, world!')]
    ),
    case(_encode(123, 0, b'\0\0\1\0\0'), [Expected(123, b'\0', b'\0\0\1\0\0')]),
    'Escaped one-byte payload',
    case(_encode(1, 2, b'~'), [Expected(1, b'\2', b'~')]),
    case(_encode(1, 2, b'}'), [Expected(1, b'\2', b'}')]),
    case(
        _encode(1, 2, b'~') + _encode(1, 2, b'}'),
        [Expected(1, b'\2', b'~'), Expected(1, b'\2', b'}')],
    ),
    'Escaped address',
    case(_encode(0x7E, 0, b'A'), [Expected(0x7E, b'\0', b'A')]),
    case(_encode(0x7D, 0, b'B'), [Expected(0x7D, b'\0', b'B')]),
    'Escaped control',
    case(_encode(0, 0x7E, b'C'), [Expected(0, b'~', b'C')]),
    case(_encode(0, 0x7D, b'D'), [Expected(0, b'}', b'D')]),
    'Escaped address and control',
    case(_encode(0x7E, 0x7D, b'E'), [Expected(0x7E, b'}', b'E')]),
    case(_encode(0x7D, 0x7E, b'F'), [Expected(0x7D, b'~', b'F')]),
    case(_encode(0x7E, 0x7E, b'~'), [Expected(0x7E, b'~', b'~')]),
    'Multibyte address',
    case(
        _encode(128, 0, b'big address'), [Expected(128, b'\0', b'big address')]
    ),
    case(
        _encode(0xFFFFFFFF, 0, b'\0\0\1\0\0'),
        [Expected(0xFFFFFFFF, b'\0', b'\0\0\1\0\0')],
    ),
    'Multiple frames separated by single flag',
    case(
        _encode(0, 0, b'A')[:-1] + _encode(1, 2, b'123'),
        [Expected(0, b'\0', b'A'), Expected(1, b'\2', b'123')],
    ),
    case(
        _encode(0xFF, 0, b'Yo')[:-1] * 3 + b'~',
        [Expected(0xFF, b'\0', b'Yo')] * 3,
    ),
    'Empty frames produce framing errors with raw data',
    case(b'~~', [ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR)], b'~~'),
    case(
        b'~' * 10,
        [
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ],
    ),
    case(
        b'~~' + _encode(1, 2, b'3') + b'~' * 5,
        [
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            Expected(1, b'\2', b'3'),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            # One flag byte remains in the decoding state machine.
        ],
        b'~~~~~~~',
    ),
    case(
        b'~' * 10 + _encode(1, 2, b':O') + b'~' * 3 + _encode(3, 4, b':P'),
        [
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            Expected(1, b'\2', b':O'),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            Expected(3, b'\4', b':P'),
        ],
        b'~' * 13,
    ),
    'Cannot escape flag',
    case(
        b'~\xAA}~\xab\x00Hello' + fcs(b'\xab\0Hello') + b'~',
        [
            Expected.error(FrameStatus.FRAMING_ERROR),
            Expected(0x55, b'\0', b'Hello'),
        ],
        b'~\xAA}',
    ),
    _ESCAPED_FLAG_TEST_CASE,
    'Frame too short',
    case(b'~1~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~12~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~12345~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Multibyte address too long',
    case(
        _encode(2**100, 0, b'too long'),
        [Expected.error(FrameStatus.BAD_ADDRESS)],
    ),
    'Incorrect frame check sequence',
    case(b'~123456~', [Expected.error(FrameStatus.FCS_MISMATCH)]),
    case(
        b'~\1\2msg\xff\xff\xff\xff~', [Expected.error(FrameStatus.FCS_MISMATCH)]
    ),
    case(
        _encode(0xA, 0xB, b'???')[:-2] + _encode(1, 2, b'def'),
        [
            Expected.error(FrameStatus.FCS_MISMATCH),
            Expected(1, b'\2', b'def'),
        ],
        _encode(0xA, 0xB, b'???')[:-2],
    ),
    'Invalid escape in address',
    case(
        b'~}}\0' + fcs(b'\x5d\0') + b'~',
        [Expected.error(FrameStatus.FRAMING_ERROR)],
    ),
    'Invalid escape in control',
    case(
        b'~\0}}' + fcs(b'\0\x5d') + b'~',
        [Expected.error(FrameStatus.FRAMING_ERROR)],
    ),
    'Invalid escape in data',
    case(
        b'~\0\1}}' + fcs(b'\0\1\x5d') + b'~',
        [Expected.error(FrameStatus.FRAMING_ERROR)],
    ),
    'Frame ends with escape',
    case(b'~}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1\2abc}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1\2abcd}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1\2abcd1234}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Inter-frame data is only escapes',
    case(
        b'~}~}~',
        [
            Expected.error(FrameStatus.FRAMING_ERROR),
            Expected.error(FrameStatus.FRAMING_ERROR),
        ],
    ),
    case(
        b'~}}~}}~',
        [
            Expected.error(FrameStatus.FRAMING_ERROR),
            Expected.error(FrameStatus.FRAMING_ERROR),
        ],
    ),
    'Data before first flag',
    case(b'\0\1' + fcs(b'\0\1'), []),
    case(
        b'\0\1' + fcs(b'\0\1') + b'~',
        [Expected.error(FrameStatus.FRAMING_ERROR)],
    ),
    'No frames emitted until flag',
    case(_encode(1, 2, b'3')[:-1], []),
    case(b'~' + _encode(1, 2, b'3')[1:-1] * 2, []),
    'Only flag and escape characters can be escaped',
    case(
        b'~}\0' + _encode(1, 2, b'3'),
        [Expected.error(FrameStatus.FRAMING_ERROR), Expected(1, b'\2', b'3')],
        b'~}\0',
    ),
    case(
        b'~1234}a' + _encode(1, 2, b'3'),
        [Expected.error(FrameStatus.FRAMING_ERROR), Expected(1, b'\2', b'3')],
        b'~1234}a',
    ),
    'Invalid frame records raw data',
    case(b'Hello?~', [ExpectedRaw(b'Hello?~', FrameStatus.FRAMING_ERROR)]),
    case(
        b'~~Hel}}lo~',
        [
            Expected.error(FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'Hel}}lo~', FrameStatus.FRAMING_ERROR),
        ],
    ),
    case(
        b'Hello?~~~~~',
        [
            ExpectedRaw(b'Hello?~', FrameStatus.FRAMING_ERROR),
            Expected.error(FrameStatus.FRAMING_ERROR),
            Expected.error(FrameStatus.FRAMING_ERROR),
        ],
    ),
    case(
        b'~~~~Hello?~~~~~',
        [
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'Hello?~', FrameStatus.FCS_MISMATCH),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ],
    ),
    case(
        b'Hello?~~Goodbye~',
        [
            ExpectedRaw(b'Hello?~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~Goodbye~', FrameStatus.FCS_MISMATCH),
        ],
    ),
    'Valid data followed by frame followed by invalid',
    case(
        b'Hi~ this is a log message\r\n'
        + _encode(0, 0, b'')
        + b'More log messages!\r\n',
        [
            Expected.error(FrameStatus.FRAMING_ERROR),
            Expected.error(FrameStatus.FCS_MISMATCH),
            Expected(0, b'\0', b''),
        ],
        b'Hi~ this is a log message\r\nMore log messages!\r\n',
    ),
    case(
        b'Hi~ this is a log message\r\n',
        [Expected.error(FrameStatus.FRAMING_ERROR)],
    ),
    case(
        b'~Hi~' + _encode(1, 2, b'def') + b' How are you?',
        [Expected.error(FrameStatus.FRAMING_ERROR), Expected(1, b'\2', b'def')],
        b'~Hi~ How are you?',
    ),
)
# Formatting for the above tuple is very slow, so disable yapf. Manually enable
# it as needed to format the test cases.

_TESTS = TestGenerator(TEST_CASES)


def _expected(frames: list[Frame]) -> Iterator[str]:
    """Yields one C++ initializer line per frame for the kExpected array.

    OK frames become the parsed Frame value, BAD_ADDRESS frames become the
    status returned by Frame::Parse, and all other frames become
    Status::DataLoss(). Numbering is 1-based to match kDecodedFrameNN.
    """
    for i, frame in enumerate(frames, 1):
        if frame.ok():
            yield f'      Frame::Parse(kDecodedFrame{i:02}).value(),'
        elif frame.status is FrameStatus.BAD_ADDRESS:
            yield f'      Frame::Parse(kDecodedFrame{i:02}).status(),'
        else:
            yield f'      Status::DataLoss(),  // Frame {i}'


# The generated test uses std::array, std::variant and size_t, so the
# corresponding standard headers must be included by name.
_CPP_HEADER = """\
#include "pw_hdlc/decoder.h"

#include <array>
#include <cstddef>
#include <variant>

#include "pw_unit_test/framework.h"
#include "pw_bytes/array.h"

namespace pw::hdlc {
namespace {
"""

_CPP_FOOTER = """\
}  // namespace
}  // namespace pw::hdlc"""

_TS_HEADER = """\
import 'jasmine';

import {Buffer} from 'buffer';

import {Decoder, FrameStatus} from './decoder'
import * as protocol from './protocol'
import * as util from './util'

class Expected {
  address: number
  control: Uint8Array
  data: Uint8Array
  status: FrameStatus

  constructor(
      address: number,
      control: Uint8Array,
      data: Uint8Array,
      status: FrameStatus) {
    this.address = address;
    this.control = control;
    this.data = data;
    this.status = status;
  }
}

class ExpectedRaw {
  raw: Uint8Array
  status: FrameStatus

  constructor(raw: Uint8Array, status: FrameStatus) {
    this.status = status;
    this.raw = raw;
  }
}

describe('Decoder', () => {
  let decoder: Decoder;
  let textEncoder: TextEncoder;

  beforeEach(() => {
    decoder = new Decoder();
    textEncoder = new TextEncoder();
  });

"""

_TS_FOOTER = """\
});
"""


def _py_only_frame(frame: Frame) -> bool:
    """Returns true for frames only returned by the Python library"""
    return (
        frame.status is FrameStatus.FRAMING_ERROR and frame.raw_encoded == b'~~'
    )


def _cpp_test(ctx: Context) -> Iterator[str]:
    """Generates a C++ test for the provided test data.

    The expected frames come from running the Python FrameDecoder over the
    test case's input, minus the frames matched by _py_only_frame.
    """
    data, _, _ = ctx.test_case
    frames = [
        f for f in FrameDecoder().process(data) if not _py_only_frame(f)
    ]
    data_bytes = ''.join(rf'\x{byte:02x}' for byte in data)

    yield f'TEST(Decoder, {ctx.cc_name()}) {{'
    yield f'  static constexpr auto kData = bytes::String("{data_bytes}");\n'

    for i, frame in enumerate(frames, 1):
        # Only frames that _expected() passes to Frame::Parse need their
        # decoded bytes emitted; other failures are just noted in a comment.
        if frame.ok() or frame.status is FrameStatus.BAD_ADDRESS:
            frame_bytes = ''.join(
                rf'\x{byte:02x}' for byte in frame.raw_decoded
            )
            yield (
                f'  static constexpr auto kDecodedFrame{i:02} = '
                f'bytes::String("{frame_bytes}");'
            )
        else:
            yield f'  // Frame {i}: {frame.status.value}'

    yield ''

    expected = '\n'.join(_expected(frames)) or '  // No frames'
    decoder_size = max(len(data), 8)  # Make sure large enough for a frame

    # kExpected holds either a Frame or a Status per entry, so it is a
    # std::variant<Frame, Status>; the template arguments must be spelled out
    # for std::holds_alternative and std::get as well.
    yield f"""\
  DecoderBuffer<{decoder_size}> decoder;

  static std::array<std::variant<Frame, Status>, {len(frames)}> kExpected = {{
{expected}
  }};

  size_t decoded_frames = 0;

  decoder.Process(kData, [&](const Result<Frame>& result) {{
    ASSERT_LT(decoded_frames++, kExpected.size());
    auto& expected = kExpected[decoded_frames - 1];

    if (std::holds_alternative<Status>(expected)) {{
      EXPECT_EQ(Status::DataLoss(), result.status());
    }} else {{
      ASSERT_EQ(OkStatus(), result.status());

      const Frame& decoded_frame = result.value();
      const Frame& expected_frame = std::get<Frame>(expected);
      EXPECT_EQ(expected_frame.address(), decoded_frame.address());
      EXPECT_EQ(expected_frame.control(), decoded_frame.control());
      ASSERT_EQ(expected_frame.data().size(), decoded_frame.data().size());
      EXPECT_EQ(std::memcmp(expected_frame.data().data(),
                            decoded_frame.data().data(),
                            expected_frame.data().size()),
                0);
    }}
  }});

  EXPECT_EQ(decoded_frames, kExpected.size());
}}"""


def _define_py_decoder_test(ctx: Context) -> PyTest:
    """Creates a FrameDecoder test method for one test case.

    The returned test decodes the input in a single call and then again one
    byte at a time; both runs must produce the expected frames.
    """
    data, expected_frames, _ = ctx.test_case

    def test(self) -> None:
        self.maxDiff = None
        # Decode in one call
        self.assertEqual(
            expected_frames,
            list(FrameDecoder().process(data)),
            msg=f'{ctx.group}: {data!r}',
        )
        # Decode byte-by-byte
        decoder = FrameDecoder()
        decoded_frames: list[Frame] = []
        # Slice rather than iterate: iterating bytes yields ints, not bytes.
        for i in range(len(data)):
            decoded_frames += decoder.process(data[i : i + 1])

        self.assertEqual(
            expected_frames,
            decoded_frames,
            msg=f'{ctx.group} (byte-by-byte): {data!r}',
        )

    return test


def _define_raw_decoder_py_test(ctx: Context) -> PyTest:
    """Creates a FrameAndNonFrameDecoder test method for one test case.

    The returned test decodes the input in a single call and then again one
    byte at a time. Each run checks the decoded frames and, after flushing,
    the bytes delivered to the non-frame data handler.
    """
    raw_data, expected_frames, expected_non_frame_data = ctx.test_case

    # The non-frame data decoder only yields valid frames.
    expected_frames = [f for f in expected_frames if f.status is FrameStatus.OK]

    def test(self) -> None:
        self.maxDiff = None

        non_frame_data = bytearray()

        # Decode in one call
        decoder = FrameAndNonFrameDecoder(
            non_frame_data_handler=non_frame_data.extend
        )

        self.assertEqual(
            expected_frames,
            list(decoder.process(raw_data)),
            msg=f'{ctx.group}: {raw_data!r}',
        )

        decoder.flush_non_frame_data()
        self.assertEqual(expected_non_frame_data, bytes(non_frame_data))

        # Decode byte-by-byte
        non_frame_data.clear()
        decoder = FrameAndNonFrameDecoder(
            non_frame_data_handler=non_frame_data.extend
        )
        decoded_frames: list[Frame] = []
        for i in range(len(raw_data)):
            decoded_frames += decoder.process(raw_data[i : i + 1])

        self.assertEqual(
            expected_frames,
            decoded_frames,
            msg=f'{ctx.group} (byte-by-byte): {raw_data!r}',
        )

        decoder.flush_non_frame_data()
        self.assertEqual(expected_non_frame_data, bytes(non_frame_data))

    return test


def _ts_byte_array(data: bytes) -> str:
    """Formats bytes as a bracketed, comma-separated list of 0xNN literals."""
    return '[' + ', '.join(rf'0x{byte:02x}' for byte in data) + ']'


def _ts_test(ctx: Context) -> Iterator[str]:
    """Generates a TS test for the provided test data.

    The expected frames come from running the Python FrameDecoder over the
    test case's input, minus the frames matched by _py_only_frame.
    """
    data, _, _ = ctx.test_case
    frames = [
        f for f in FrameDecoder().process(data) if not _py_only_frame(f)
    ]
    data_bytes = _ts_byte_array(data)

    yield f'  it(\'{ctx.ts_name()}\', () => {{'
    yield f'    const data = new Uint8Array({data_bytes});'

    yield '    const expectedFrames = ['
    for frame in frames:
        control_bytes = _ts_byte_array(frame.control)
        frame_bytes = _ts_byte_array(frame.data)

        # NOTE(review): `frame is Expected` compares a decoded frame object to
        # the Expected class itself, so it is always False and every frame is
        # emitted as ExpectedRaw. Left unchanged because fixing it alters the
        # generated TS tests -- confirm the intended condition (frame.ok()?).
        if frame is Expected:
            yield (
                f'      new Expected({frame.address}, '
                f'new Uint8Array({control_bytes}), '
                f'new Uint8Array({frame_bytes}), {frame.status}),'
            )
        else:
            raw = _ts_byte_array(frame.raw_encoded)
            yield (
                f'      new ExpectedRaw(new Uint8Array({raw}), {frame.status}),'
            )

    yield '    ].values();\n'

    yield """\
    const result = decoder.process(data);

    while (true) {
      const expectedFrame = expectedFrames.next();
      const actualFrame = result.next();
      if (expectedFrame.done && actualFrame.done) {
        break;
      }
      expect(expectedFrame.done).toBeFalse();
      expect(actualFrame.done).toBeFalse();

      const expected = expectedFrame.value;
      const actual = actualFrame.value;
      if (expected instanceof Expected) {
        expect(actual.address).toEqual(expected.address);
        expect(actual.control).toEqual(expected.control);
        expect(actual.data).toEqual(expected.data);
        expect(actual.status).toEqual(expected.status);
      } else {
        // Expected Raw
        expect(actual.rawEncoded).toEqual(expected.raw);
        expect(actual.status).toEqual(expected.status);
      }
    }
  });
"""


# Class that tests all cases in TEST_CASES.
DecoderTest = _TESTS.python_tests('DecoderTest', _define_py_decoder_test)
NonFrameDecoderTest = _TESTS.python_tests(
    'NonFrameDecoderTest', _define_raw_decoder_py_test
)


class AdditionalNonFrameDecoderTests(unittest.TestCase):
    """Additional tests for the non-frame decoder."""

    def test_shared_flags_waits_for_tilde_to_emit_data(self) -> None:
        """Bytes after a frame reach the handler only once a '~' arrives."""
        received = bytearray()
        decoder = FrameAndNonFrameDecoder(received.extend)

        frames = list(decoder.process(_encode(0, 0, b'')))
        self.assertEqual([Expected(0, b'\0', b'')], frames)
        self.assertEqual(received, b'')

        frames = list(decoder.process(b'uh oh, no tilde!'))
        self.assertEqual([], frames)
        self.assertEqual(received, b'')

        frames = list(decoder.process(b'~'))
        self.assertEqual([], frames)
        self.assertEqual(received, b'uh oh, no tilde!')

    def test_no_shared_flags_immediately_emits_data(self) -> None:
        """With handle_shared_flags=False, bytes reach the handler at once."""
        received = bytearray()
        decoder = FrameAndNonFrameDecoder(
            received.extend, handle_shared_flags=False
        )

        frames = list(decoder.process(_encode(0, 0, b'')))
        self.assertEqual([Expected(0, b'\0', b'')], frames)
        self.assertEqual(received, b'')

        frames = list(decoder.process(b'uh oh, no tilde!'))
        self.assertEqual([], frames)
        self.assertEqual(received, b'uh oh, no tilde!')

    def test_emits_data_if_mtu_is_exceeded(self) -> None:
        """One byte past the MTU hands every buffered byte to the handler."""
        partial_frame = b'~this looks like a real frame'
        received = bytearray()
        decoder = FrameAndNonFrameDecoder(
            received.extend, mtu=len(partial_frame)
        )

        # Exactly at the MTU: nothing is emitted yet.
        frames = list(decoder.process(partial_frame))
        self.assertEqual([], frames)
        self.assertEqual(received, b'')

        frames = list(decoder.process(b'!'))
        self.assertEqual([], frames)
        self.assertEqual(received, partial_frame + b'!')

    def test_emits_data_if_timeout_expires(self) -> None:
        """An unterminated frame is handed to the handler after timeout_s."""
        partial_frame = b'~this looks like a real frame'
        # A queue lets the test block until the handler has been invoked.
        received: queue.Queue[bytes] = queue.Queue()
        decoder = FrameAndNonFrameDecoder(received.put, timeout_s=0.001)

        frames = list(decoder.process(partial_frame))
        self.assertEqual([], frames)
        self.assertEqual(received.get(timeout=2), partial_frame)

    def test_emits_raw_data_and_valid_frame_if_flushed_partway(self) -> None:
        """A flush in the middle of a frame does not stop it from decoding."""
        payload = b'Do you wanna ride in my blimp?'
        encoded = _encode(1, 2, payload)
        head, tail = encoded[:5], encoded[5:]

        received = bytearray()
        decoder = FrameAndNonFrameDecoder(received.extend)

        self.assertEqual([], list(decoder.process(head)))
        decoder.flush_non_frame_data()

        frames = list(decoder.process(tail))
        self.assertEqual([Expected(1, b'\2', payload)], frames)


def _main() -> None:
    """Generates the C++ or TS tests if requested; otherwise runs unittest."""
    args = parse_test_generation_args()

    if args.generate_cc_test:
        _TESTS.cc_tests(
            args.generate_cc_test, _cpp_test, _CPP_HEADER, _CPP_FOOTER
        )
        return

    if args.generate_ts_test:
        _TESTS.ts_tests(args.generate_ts_test, _ts_test, _TS_HEADER, _TS_FOOTER)
        return

    unittest.main()


if __name__ == '__main__':
    _main()