xref: /aosp_15_r20/external/pigweed/pw_containers/py/pw_containers/inline_var_len_entry_queue.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2023 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Decodes the in-memory representation of a sized-entry ring buffer."""
15
16import struct
17from typing import Iterable
18
# Layout of the fixed-size header at the start of a serialized queue: three
# C `unsigned int` fields. The format string has no byte-order prefix, so the
# struct module uses the host's native byte order, size, and alignment.
_HEADER = struct.Struct('III')  # data_size_bytes, head, tail
20
21
def _decode_leb128(
    data: bytes, offset: int = 0, max_bits: int = 32
) -> tuple[int, int]:
    """Decodes one unsigned LEB128 varint from data, starting at offset.

    Args:
      data: Buffer that contains the varint.
      offset: Index in data of the first byte of the varint.
      max_bits: Bit-width limit. Decoding fails if a continuation bit is set
        on a byte after which the next 7-bit group would start at or beyond
        this bit position.

    Returns:
      A (next_offset, value) tuple, where next_offset is the index of the
      first byte after the varint and value is the decoded integer.

    Raises:
      ValueError: If the varint runs past max_bits or if data ends before a
        byte without the continuation bit is found.
    """
    value = shift = 0
    cursor = offset

    while cursor < len(data):
        byte = data[cursor]
        # Advance past the byte just read; without this, multi-byte varints
        # would keep re-reading their first byte.
        cursor += 1

        # The low 7 bits carry payload, least-significant group first.
        value |= (byte & 0x7F) << shift

        # A clear high bit marks the final byte of the varint.
        if not byte & 0x80:
            return cursor, value

        shift += 7
        if shift >= max_bits:
            raise ValueError(f'Varint exceeded {max_bits}-bit limit')

    raise ValueError(f'Unterminated varint {data[offset:]!r}')
41
42
def parse(queue: bytes) -> Iterable[bytes]:
    """Yields the entries stored in a serialized variable-length entry queue.

    The buffer begins with a header holding the data array size, the head
    offset, and the tail offset; the data array follows immediately. The live
    contents run from head to tail and wrap around the end of the array when
    tail is less than head. Each entry is a varint size prefix followed by
    that many bytes of payload.

    This is a generator, so the errors below surface while iterating rather
    than when parse() is called.

    Args:
      queue: The bytes representation of a variable-length entry queue.

    Yields:
      Each entry in the buffer as bytes, without its size prefix.

    Raises:
      ValueError: If queue is shorter than the size its header declares, if
        an entry's size prefix is malformed, or if an entry claims more
        bytes than remain in the queue contents.
      struct.error: If queue is too short to contain the header.
    """
    array_size_bytes, head, tail = _HEADER.unpack_from(queue)

    total_encoded_size = _HEADER.size + array_size_bytes
    if len(queue) < total_encoded_size:
        raise ValueError(
            f'Ring buffer data ({len(queue)} B) is smaller than the encoded '
            f'size ({total_encoded_size} B)'
        )

    array = queue[_HEADER.size : total_encoded_size]

    # Flatten the ring into one contiguous run of entries.
    if head <= tail:
        data = array[head:tail]
    else:
        # Contents wrap: head-to-end comes first, then start-to-tail.
        data = array[head:] + array[:tail]

    position = 0
    while position < len(data):
        payload_start, size = _decode_leb128(data, position)
        payload_end = payload_start + size

        if payload_end > len(data):
            raise ValueError(
                f'Corruption detected; '
                f'encoded size {size} B is too large for a {len(data)} B array'
            )
        yield data[payload_start:payload_end]

        position = payload_end
80