# Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License") # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. r"""Custom Text-format functions for Tink Keys, Keysets and Key Templates. Tink keys contain a serialized proto. Because we don't use any proto, the text output of the proto library is not helpful. The function key_util.text_format(msg: message.Message) is similar to text_format.MessageToString(msg), but additionally output the parsed serialized proto as a comment, which makes the proto human readable, but keep them readable by machines. For example, the AES128_EAX template looks like this: type_url: "type.googleapis.com/google.crypto.tink.AesEaxKey" # value: [type.googleapis.com/google.crypto.tink.AesEaxKeyFormat] { # params { # iv_size: 16 # } # key_size: 16 # } value: "\n\002\010\020\020\020" output_prefix_type: TINK The function assert_tink_proto_equal(self, a: message.Message, b: message.Message) can be used in tests to assert that two protos must be equal. If they are not equal, the function tries to output a meaningfull error message. """ import copy from typing import Any, Optional # copybara:tink_placeholder(encoder) from google.protobuf import descriptor from google.protobuf import message from google.protobuf import text_encoding from google.protobuf import text_format as proto_text_format from tink.proto import aes_cmac_pb2 from tink.proto import aes_cmac_prf_pb2 from tink.proto import aes_ctr_hmac_aead_pb2 from tink.proto import aes_ctr_hmac_streaming_pb2 from tink.proto import aes_eax_pb2 from tink.proto import aes_gcm_hkdf_streaming_pb2 from tink.proto import aes_gcm_pb2 from tink.proto import aes_gcm_siv_pb2 from tink.proto import aes_siv_pb2 from tink.proto import chacha20_poly1305_pb2 from tink.proto import ecdsa_pb2 from tink.proto import ecies_aead_hkdf_pb2 from tink.proto import ed25519_pb2 from tink.proto import hkdf_prf_pb2 from tink.proto import hmac_pb2 from tink.proto import hmac_prf_pb2 from tink.proto import hpke_pb2 from tink.proto import jwt_ecdsa_pb2 from tink.proto import jwt_hmac_pb2 from tink.proto import jwt_rsa_ssa_pkcs1_pb2 from tink.proto import jwt_rsa_ssa_pss_pb2 from tink.proto import kms_aead_pb2 from tink.proto import kms_envelope_pb2 from tink.proto import rsa_ssa_pkcs1_pb2 from tink.proto import rsa_ssa_pss_pb2 from tink.proto import xchacha20_poly1305_pb2 TYPE_STRING = 9 TYPE_MESSAGE = 11 TYPE_BYTES = 12 TYPE_ENUM = 14 LABEL_REPEATED = 3 TYPE_PREFIX = 'type.googleapis.com/' class KeyProto: """A map from type URLs to key protos and key format protos.""" _from_url = {} _format_from_url = {} @classmethod def from_url(cls, type_url: str) -> Any: return cls._from_url[type_url] @classmethod def format_from_url(cls, type_url: str) -> Any: return cls._format_from_url[type_url] @classmethod def add_key_type(cls, key_type: Any, key_format_type: Any): type_url = TYPE_PREFIX + key_type.DESCRIPTOR.full_name cls._from_url[type_url] = key_type cls._format_from_url[type_url] = key_format_type KeyProto.add_key_type(aes_eax_pb2.AesEaxKey, aes_eax_pb2.AesEaxKeyFormat) KeyProto.add_key_type(aes_gcm_pb2.AesGcmKey, aes_gcm_pb2.AesGcmKeyFormat) KeyProto.add_key_type(aes_gcm_siv_pb2.AesGcmSivKey, aes_gcm_siv_pb2.AesGcmSivKeyFormat) KeyProto.add_key_type(aes_ctr_hmac_aead_pb2.AesCtrHmacAeadKey, aes_ctr_hmac_aead_pb2.AesCtrHmacAeadKeyFormat) KeyProto.add_key_type(chacha20_poly1305_pb2.ChaCha20Poly1305Key, chacha20_poly1305_pb2.ChaCha20Poly1305KeyFormat) KeyProto.add_key_type(xchacha20_poly1305_pb2.XChaCha20Poly1305Key, xchacha20_poly1305_pb2.XChaCha20Poly1305KeyFormat) KeyProto.add_key_type(aes_siv_pb2.AesSivKey, aes_siv_pb2.AesSivKeyFormat) KeyProto.add_key_type(aes_ctr_hmac_streaming_pb2.AesCtrHmacStreamingKey, aes_ctr_hmac_streaming_pb2.AesCtrHmacStreamingKeyFormat) KeyProto.add_key_type(aes_gcm_hkdf_streaming_pb2.AesGcmHkdfStreamingKey, aes_gcm_hkdf_streaming_pb2.AesGcmHkdfStreamingKeyFormat) KeyProto.add_key_type(ecies_aead_hkdf_pb2.EciesAeadHkdfPrivateKey, ecies_aead_hkdf_pb2.EciesAeadHkdfKeyFormat) KeyProto.add_key_type(ecies_aead_hkdf_pb2.EciesAeadHkdfPublicKey, ecies_aead_hkdf_pb2.EciesAeadHkdfKeyFormat) KeyProto.add_key_type(hpke_pb2.HpkePrivateKey, hpke_pb2.HpkeKeyFormat) KeyProto.add_key_type(hpke_pb2.HpkePublicKey, hpke_pb2.HpkeKeyFormat) KeyProto.add_key_type(aes_cmac_pb2.AesCmacKey, aes_cmac_pb2.AesCmacKeyFormat) KeyProto.add_key_type(hmac_pb2.HmacKey, hmac_pb2.HmacKeyFormat) KeyProto.add_key_type(ecdsa_pb2.EcdsaPrivateKey, ecdsa_pb2.EcdsaKeyFormat) KeyProto.add_key_type(ecdsa_pb2.EcdsaPublicKey, ecdsa_pb2.EcdsaKeyFormat) KeyProto.add_key_type(ed25519_pb2.Ed25519PrivateKey, ed25519_pb2.Ed25519KeyFormat) KeyProto.add_key_type(ed25519_pb2.Ed25519PublicKey, ed25519_pb2.Ed25519KeyFormat) KeyProto.add_key_type(rsa_ssa_pkcs1_pb2.RsaSsaPkcs1PrivateKey, rsa_ssa_pkcs1_pb2.RsaSsaPkcs1KeyFormat) KeyProto.add_key_type(rsa_ssa_pkcs1_pb2.RsaSsaPkcs1PublicKey, rsa_ssa_pkcs1_pb2.RsaSsaPkcs1KeyFormat) KeyProto.add_key_type(rsa_ssa_pss_pb2.RsaSsaPssPrivateKey, rsa_ssa_pss_pb2.RsaSsaPssKeyFormat) KeyProto.add_key_type(rsa_ssa_pss_pb2.RsaSsaPssPublicKey, rsa_ssa_pss_pb2.RsaSsaPssKeyFormat) KeyProto.add_key_type(aes_cmac_prf_pb2.AesCmacPrfKey, aes_cmac_prf_pb2.AesCmacPrfKeyFormat) KeyProto.add_key_type(hmac_prf_pb2.HmacPrfKey, hmac_prf_pb2.HmacPrfKeyFormat) KeyProto.add_key_type(hkdf_prf_pb2.HkdfPrfKey, hkdf_prf_pb2.HkdfPrfKeyFormat) KeyProto.add_key_type(jwt_ecdsa_pb2.JwtEcdsaPrivateKey, jwt_ecdsa_pb2.JwtEcdsaKeyFormat) KeyProto.add_key_type(jwt_ecdsa_pb2.JwtEcdsaPublicKey, jwt_ecdsa_pb2.JwtEcdsaKeyFormat) KeyProto.add_key_type(jwt_hmac_pb2.JwtHmacKey, jwt_hmac_pb2.JwtHmacKeyFormat) KeyProto.add_key_type(jwt_rsa_ssa_pkcs1_pb2.JwtRsaSsaPkcs1PrivateKey, jwt_rsa_ssa_pkcs1_pb2.JwtRsaSsaPkcs1KeyFormat) KeyProto.add_key_type(jwt_rsa_ssa_pkcs1_pb2.JwtRsaSsaPkcs1PublicKey, jwt_rsa_ssa_pkcs1_pb2.JwtRsaSsaPkcs1KeyFormat) KeyProto.add_key_type(jwt_rsa_ssa_pss_pb2.JwtRsaSsaPssPrivateKey, jwt_rsa_ssa_pss_pb2.JwtRsaSsaPssKeyFormat) KeyProto.add_key_type(jwt_rsa_ssa_pss_pb2.JwtRsaSsaPssPublicKey, jwt_rsa_ssa_pss_pb2.JwtRsaSsaPssKeyFormat) KeyProto.add_key_type(kms_aead_pb2.KmsAeadKey, kms_aead_pb2.KmsAeadKeyFormat) KeyProto.add_key_type(kms_envelope_pb2.KmsEnvelopeAeadKey, kms_envelope_pb2.KmsEnvelopeAeadKeyFormat) def _text_format_field(value: Any, field: descriptor.FieldDescriptor, indent: str) -> str: """Returns a text formated proto field.""" if field.type == TYPE_MESSAGE: output = [ indent + field.name + ' {', _normalize_and_text_format_message(value, indent + ' '), indent + '}' ] return '\n'.join(output) elif field.type == TYPE_ENUM: value_name = field.enum_type.values_by_number[value].name return indent + field.name + ': ' + value_name elif field.type in [TYPE_STRING, TYPE_BYTES]: return (indent + field.name + ': "' + text_encoding.CEscape(value, False) + '"') else: return indent + field.name + ': ' + str(value) def _normalize_and_text_format_message(msg: message.Message, indent: str) -> str: """Returns a text formated proto message and changes msg to be canonical. Args: msg: the proto to be formated. indent: the indentation prefix of each line in the output. Returns: A proto text format output, where serialized fields are deserialized in a comment. """ output = [] fields = list(msg.DESCRIPTOR.fields) # special case for Tinks custom 'any' proto. if (msg.DESCRIPTOR.full_name == 'google.crypto.tink.KeyTemplate' or msg.DESCRIPTOR.full_name == 'google.crypto.tink.KeyData'): type_url = getattr(msg, 'type_url') # Pytype requires to use getattr output.append( _text_format_field(type_url, fields[0], indent)) value = getattr(msg, 'value') if msg.DESCRIPTOR.full_name == 'google.crypto.tink.KeyTemplate': # In KeyTemplates, type_url does not match the proto type used. proto_type = KeyProto.format_from_url(type_url) else: proto_type = KeyProto.from_url(type_url) # parse 'value' and text format the content in a comment. field_proto = proto_type.FromString(value) output.append(indent + '# value: [' + TYPE_PREFIX + proto_type.DESCRIPTOR.full_name + '] {') formatted_message = _normalize_and_text_format_message( field_proto, indent + '# ') if formatted_message: output.append(formatted_message) output.append(indent + '# }') # Serialize message again so it is canonicalized # We require here that proto serialization is in increasing field order # (Tink protos are basically unchangeable, so we don't need to worry about # unknown fields). This is not guaranteed by proto, but is currently the # case. If this ever changes we either hopefully have already a better # solution in Tink, or else the proto team provides us with a reflection # based API to do this (as they do in C++.) In this case, we simply use the # slow API here. value = field_proto.SerializeToString(deterministic = True) setattr(msg, 'value', value) output.append( _text_format_field(value, fields[1], indent)) fields = fields[2:] for field in fields: if field.label == LABEL_REPEATED: for value in getattr(msg, field.name): output.append(_text_format_field(value, field, indent)) else: output.append( _text_format_field( getattr(msg, field.name), field, indent)) return '\n'.join(output) def text_format(msg: message.Message) -> str: msgcopy = copy.deepcopy(msg) return _normalize_and_text_format_message(msgcopy, '') def parse_text_format(serialized: str, msg: message.Message) -> None: # Different from binary parsing, text_format.Parse does not Clear the message. msg.Clear() proto_text_format.Parse(serialized, msg) serialized_copy = text_format(msg) assert serialized_copy == serialized, serialized_copy def assert_tink_proto_equal(self, a: message.Message, b: message.Message, msg: Optional[str] = None) -> None: """Fails with a useful error if a and b aren't equal.""" a_copy = copy.deepcopy(a) b_copy = copy.deepcopy(b) self.assertMultiLineEqual( _normalize_and_text_format_message(a_copy, ''), _normalize_and_text_format_message(b_copy, ''), msg=msg)