# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Utilities for working with tokenized fields in protobufs."""

from typing import Iterator

from google.protobuf.descriptor import FieldDescriptor
from google.protobuf.message import Message

from pw_tokenizer_proto import options_pb2
from pw_tokenizer import detokenize, encode


def _tokenized_fields(proto: Message) -> Iterator[FieldDescriptor]:
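    """Yields the fields of `proto` marked as optionally tokenized.

    A field opts in through the extension defined in pw_tokenizer's
    options.proto, roughly as below (a sketch; the message and field names
    are illustrative, not part of this module):

        message LogEntry {
          bytes message = 1 [(pw.tokenizer.format) = TOKENIZATION_OPTIONAL];
        }
    """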
    for field in proto.DESCRIPTOR.fields:
        extensions = field.GetOptions().Extensions
        if (
            options_pb2.format in extensions
            and extensions[options_pb2.format]
            == options_pb2.TOKENIZATION_OPTIONAL
        ):
            yield field


def decode_optionally_tokenized(
    detokenizer: detokenize.Detokenizer | None,
    data: bytes,
) -> str:
    """Decodes data that may be plain text or binary / Base64 tokenized text.

    Args:
      detokenizer: detokenizer to use; if `None`, binary data is re-encoded
        as prefixed Base64 instead of being detokenized
      data: encoded text or binary data
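
    Example (an illustrative sketch; the token database path and the Base64
    token are hypothetical):

      detok = detokenize.Detokenizer('tokens.csv')
      text = decode_optionally_tokenized(detok, b'$QA19pg==')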
    """
    prefix = detokenizer.prefix if detokenizer else encode.NESTED_TOKEN_PREFIX

    if detokenizer:
        # Try detokenizing as binary.
        result = detokenizer.detokenize(data)
        if result.best_result() is not None:
            # Rather than just returning the detokenized string, continue
            # detokenization in case recursive Base64 detokenization is needed.
            data = str(result).encode()

    # Attempt to decode as UTF-8.
    try:
        text = data.decode()
    except UnicodeDecodeError:
        # Not UTF-8. Assume the token is unknown or the data is corrupt.
        return encode.prefixed_base64(data, prefix)

    # See if the string is prefixed Base64 or contains prefixed Base64.
    if detokenizer:
        detokenized = detokenize.detokenize_base64(detokenizer, data)
        if detokenized != data:  # If detokenized successfully, use the result.
            return detokenized.decode()

    # Attempt to determine whether this is an unknown token or plain text.
    # Any string with only printable or whitespace characters is plain text.
    if ''.join(text.split()).isprintable():
        return text

    # Assume this field is tokenized data that could not be decoded.
    return encode.prefixed_base64(data, prefix)


def detokenize_fields(
    detokenizer: detokenize.Detokenizer | None,
    proto: Message,
) -> None:
    """Detokenizes fields annotated as tokenized in the given proto.

    The fields are replaced with their detokenized version in the proto.
    Tokenized fields are bytes fields, so the detokenized string is stored as
    bytes. Call .decode() to convert the detokenized string from bytes to str.
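
    Example (a sketch; log_pb2.LogEntry stands in for any message type with
    a field annotated as optionally tokenized):

      detok = detokenize.Detokenizer('tokens.csv')
      entry = log_pb2.LogEntry(message=b'\x09\x00\x00\x00')
      detokenize_fields(detok, entry)
      text = entry.message.decode()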
    """
    for field in _tokenized_fields(proto):
        decoded = decode_optionally_tokenized(
            detokenizer, getattr(proto, field.name)
        )
        setattr(proto, field.name, decoded.encode())