1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Utilities for working with tokenized fields in protobufs.""" 15 16from typing import Iterator 17 18from google.protobuf.descriptor import FieldDescriptor 19from google.protobuf.message import Message 20 21from pw_tokenizer_proto import options_pb2 22from pw_tokenizer import detokenize, encode 23 24 25def _tokenized_fields(proto: Message) -> Iterator[FieldDescriptor]: 26 for field in proto.DESCRIPTOR.fields: 27 extensions = field.GetOptions().Extensions 28 if ( 29 options_pb2.format in extensions 30 and extensions[options_pb2.format] 31 == options_pb2.TOKENIZATION_OPTIONAL 32 ): 33 yield field 34 35 36def decode_optionally_tokenized( 37 detokenizer: detokenize.Detokenizer | None, 38 data: bytes, 39) -> str: 40 """Decodes data that may be plain text or binary / Base64 tokenized text. 41 42 Args: 43 detokenizer: detokenizer to use; if `None`, binary logs as Base64 encoded 44 data: encoded text or binary data 45 """ 46 prefix = detokenizer.prefix if detokenizer else encode.NESTED_TOKEN_PREFIX 47 48 if detokenizer: 49 # Try detokenizing as binary. 50 result = detokenizer.detokenize(data) 51 if result.best_result() is not None: 52 # Rather than just returning the detokenized string, continue 53 # detokenization in case recursive Base64 detokenization is needed. 54 data = str(result).encode() 55 56 # Attempt to decode as UTF-8. 57 try: 58 text = data.decode() 59 except UnicodeDecodeError: 60 # Not UTF-8. Assume the token is unknown or the data is corrupt. 61 return encode.prefixed_base64(data, prefix) 62 63 # See if the string is prefixed Base64 or contains prefixed Base64. 64 if detokenizer: 65 detokenized = detokenize.detokenize_base64(detokenizer, data) 66 if detokenized != data: # If detokenized successfully, use the result. 67 return detokenized.decode() 68 69 # Attempt to determine whether this is an unknown token or plain text. 70 # Any string with only printable or whitespace characters is plain text. 71 if ''.join(text.split()).isprintable(): 72 return text 73 74 # Assume this field is tokenized data that could not be decoded. 75 return encode.prefixed_base64(data, prefix) 76 77 78def detokenize_fields( 79 detokenizer: detokenize.Detokenizer | None, 80 proto: Message, 81) -> None: 82 """Detokenizes fields annotated as tokenized in the given proto. 83 84 The fields are replaced with their detokenized version in the proto. 85 Tokenized fields are bytes fields, so the detokenized string is stored as 86 bytes. Call .decode() to convert the detokenized string from bytes to str. 87 """ 88 for field in _tokenized_fields(proto): 89 decoded = decode_optionally_tokenized( 90 detokenizer, getattr(proto, field.name) 91 ) 92 setattr(proto, field.name, decoded.encode()) 93