1# Copyright 2023 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Decodes the in-memory representation of a sized-entry ring buffer.""" 15 16import struct 17from typing import Iterable 18 19_HEADER = struct.Struct('III') # data_size_bytes, head, tail 20 21 22def _decode_leb128( 23 data: bytes, offset: int = 0, max_bits: int = 32 24) -> tuple[int, int]: 25 count = value = shift = 0 26 27 while offset < len(data): 28 byte = data[offset] 29 30 count += 1 31 value |= (byte & 0x7F) << shift 32 33 if not byte & 0x80: 34 return offset + count, value 35 36 shift += 7 37 if shift >= max_bits: 38 raise ValueError(f'Varint exceeded {max_bits}-bit limit') 39 40 raise ValueError(f'Unterminated varint {data[offset:]!r}') 41 42 43def parse(queue: bytes) -> Iterable[bytes]: 44 """Decodes the in-memory representation of a variable-length entry queue. 45 46 Args: 47 queue: The bytes representation of a variable-length entry queue. 48 49 Yields: 50 Each entry in the buffer as bytes. 51 """ 52 array_size_bytes, head, tail = _HEADER.unpack_from(queue) 53 54 total_encoded_size = _HEADER.size + array_size_bytes 55 if len(queue) < total_encoded_size: 56 raise ValueError( 57 f'Ring buffer data ({len(queue)} B) is smaller than the encoded ' 58 f'size ({total_encoded_size} B)' 59 ) 60 61 data = queue[_HEADER.size : total_encoded_size] 62 63 if tail < head: 64 data = data[head:] + data[:tail] 65 else: 66 data = data[head:tail] 67 68 index = 0 69 while index < len(data): 70 index, size = _decode_leb128(data, index) 71 72 if index + size > len(data): 73 raise ValueError( 74 f'Corruption detected; ' 75 f'encoded size {size} B is too large for a {len(data)} B array' 76 ) 77 yield data[index : index + size] 78 79 index += size 80