1# Copyright 2020 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS-IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests for tink.tools.testing.python.testing_server.""" 15 16from absl.testing import absltest 17import grpc 18 19import tink 20from tink import aead 21from tink import daead 22from tink import hybrid 23from tink import mac 24from tink import prf 25from tink import signature 26from tink import streaming_aead 27 28 29from protos import testing_api_pb2 30import services 31 32 33class DummyServicerContext(grpc.ServicerContext): 34 35 def is_active(self): 36 pass 37 38 def time_remaining(self): 39 pass 40 41 def cancel(self): 42 pass 43 44 def add_callback(self, callback): 45 pass 46 47 def invocation_metadata(self): 48 pass 49 50 def peer(self): 51 pass 52 53 def peer_identities(self): 54 pass 55 56 def peer_identity_key(self): 57 pass 58 59 def auth_context(self): 60 pass 61 62 def set_compression(self, compression): 63 pass 64 65 def send_initial_metadata(self, initial_metadata): 66 pass 67 68 def set_trailing_metadata(self, trailing_metadata): 69 pass 70 71 def abort(self, code, details): 72 pass 73 74 def abort_with_status(self, status): 75 pass 76 77 def set_code(self, code): 78 pass 79 80 def set_details(self, details): 81 pass 82 83 def disable_next_message_compression(self): 84 pass 85 86 87class ServicesTest(absltest.TestCase): 88 89 _ctx = DummyServicerContext() 90 91 @classmethod 92 def setUpClass(cls): 93 super().setUpClass() 94 aead.register() 95 daead.register() 96 mac.register() 97 hybrid.register() 98 prf.register() 99 signature.register() 100 streaming_aead.register() 101 102 def test_from_json(self): 103 keyset_servicer = services.KeysetServicer() 104 json_keyset = """ 105 { 106 "primaryKeyId": 42, 107 "key": [ 108 { 109 "keyData": { 110 "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey", 111 "keyMaterialType": "SYMMETRIC", 112 "value": "AFakeTestKeyValue1234567" 113 114 }, 115 "outputPrefixType": "TINK", 116 "keyId": 42, 117 "status": "ENABLED" 118 } 119 ] 120 }""" 121 request = testing_api_pb2.KeysetFromJsonRequest(json_keyset=json_keyset) 122 response = keyset_servicer.FromJson(request, self._ctx) 123 self.assertEqual(response.WhichOneof('result'), 'keyset') 124 keyset = tink.BinaryKeysetReader(response.keyset).read() 125 self.assertEqual(keyset.primary_key_id, 42) 126 self.assertLen(keyset.key, 1) 127 128 def test_from_json_fail(self): 129 keyset_servicer = services.KeysetServicer() 130 request = testing_api_pb2.KeysetFromJsonRequest(json_keyset='bad json') 131 response = keyset_servicer.FromJson(request, self._ctx) 132 self.assertEqual(response.WhichOneof('result'), 'err') 133 self.assertNotEmpty(response.err) 134 135 def test_generate_to_from_json(self): 136 keyset_servicer = services.KeysetServicer() 137 138 template = aead.aead_key_templates.AES128_GCM.SerializeToString() 139 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 140 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 141 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 142 keyset = gen_response.keyset 143 144 tojson_request = testing_api_pb2.KeysetToJsonRequest(keyset=keyset) 145 tojson_response = keyset_servicer.ToJson(tojson_request, self._ctx) 146 self.assertEqual(tojson_response.WhichOneof('result'), 'json_keyset') 147 json_keyset = tojson_response.json_keyset 148 149 fromjson_request = testing_api_pb2.KeysetFromJsonRequest( 150 json_keyset=json_keyset) 151 fromjson_response = keyset_servicer.FromJson(fromjson_request, self._ctx) 152 self.assertEqual(fromjson_response.WhichOneof('result'), 'keyset') 153 self.assertEqual(fromjson_response.keyset, keyset) 154 155 def test_to_json_fail(self): 156 keyset_servicer = services.KeysetServicer() 157 request = testing_api_pb2.KeysetToJsonRequest(keyset=b'bad keyset') 158 response = keyset_servicer.ToJson(request, self._ctx) 159 self.assertEqual(response.WhichOneof('result'), 'err') 160 self.assertNotEmpty(response.err) 161 162 def test_generate_keyset_write_read_encrypted(self): 163 keyset_servicer = services.KeysetServicer() 164 165 template = aead.aead_key_templates.AES128_GCM.SerializeToString() 166 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 167 master_response = keyset_servicer.Generate(gen_request, self._ctx) 168 self.assertEqual(master_response.WhichOneof('result'), 'keyset') 169 master_keyset = master_response.keyset 170 171 keyset_response = keyset_servicer.Generate(gen_request, self._ctx) 172 self.assertEqual(keyset_response.WhichOneof('result'), 'keyset') 173 keyset = keyset_response.keyset 174 175 write_encrypted_request = testing_api_pb2.KeysetWriteEncryptedRequest( 176 keyset=keyset, 177 master_keyset=master_keyset, 178 keyset_writer_type=testing_api_pb2.KEYSET_WRITER_BINARY) 179 write_encrypted_response = keyset_servicer.WriteEncrypted( 180 write_encrypted_request, self._ctx) 181 self.assertEqual( 182 write_encrypted_response.WhichOneof('result'), 'encrypted_keyset') 183 encrypted_keyset = write_encrypted_response.encrypted_keyset 184 185 read_encrypted_request = testing_api_pb2.KeysetReadEncryptedRequest( 186 encrypted_keyset=encrypted_keyset, 187 master_keyset=master_keyset, 188 keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY) 189 read_encrypted_response = keyset_servicer.ReadEncrypted( 190 read_encrypted_request, self._ctx) 191 self.assertEqual(read_encrypted_response.WhichOneof('result'), 'keyset') 192 self.assertEqual(read_encrypted_response.keyset, keyset) 193 194 def test_generate_keyset_write_read_encrypted_with_associated_data(self): 195 keyset_servicer = services.KeysetServicer() 196 197 template = aead.aead_key_templates.AES128_GCM.SerializeToString() 198 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 199 master_response = keyset_servicer.Generate(gen_request, self._ctx) 200 self.assertEqual(master_response.WhichOneof('result'), 'keyset') 201 master_keyset = master_response.keyset 202 203 keyset_response = keyset_servicer.Generate(gen_request, self._ctx) 204 self.assertEqual(keyset_response.WhichOneof('result'), 'keyset') 205 keyset = keyset_response.keyset 206 207 associated_data = b'associated_data' 208 209 write_encrypted_request = testing_api_pb2.KeysetWriteEncryptedRequest( 210 keyset=keyset, 211 master_keyset=master_keyset, 212 associated_data=testing_api_pb2.BytesValue(value=associated_data), 213 keyset_writer_type=testing_api_pb2.KEYSET_WRITER_BINARY) 214 write_encrypted_response = keyset_servicer.WriteEncrypted( 215 write_encrypted_request, self._ctx) 216 self.assertEqual( 217 write_encrypted_response.WhichOneof('result'), 'encrypted_keyset') 218 encrypted_keyset = write_encrypted_response.encrypted_keyset 219 220 read_encrypted_request = testing_api_pb2.KeysetReadEncryptedRequest( 221 encrypted_keyset=encrypted_keyset, 222 master_keyset=master_keyset, 223 associated_data=testing_api_pb2.BytesValue(value=associated_data), 224 keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY) 225 read_encrypted_response = keyset_servicer.ReadEncrypted( 226 read_encrypted_request, self._ctx) 227 self.assertEqual(read_encrypted_response.WhichOneof('result'), 'keyset') 228 self.assertEqual(read_encrypted_response.keyset, keyset) 229 230 # Using the wrong associated_data fails 231 read_encrypted_request = testing_api_pb2.KeysetReadEncryptedRequest( 232 encrypted_keyset=encrypted_keyset, 233 master_keyset=master_keyset, 234 associated_data=testing_api_pb2.BytesValue(value=b'wrong ad'), 235 keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY) 236 read_encrypted_response = keyset_servicer.ReadEncrypted( 237 read_encrypted_request, self._ctx) 238 self.assertEqual(read_encrypted_response.WhichOneof('result'), 'err') 239 240 def test_keyset_write_encrypted_fails_when_keyset_is_invalid(self): 241 keyset_servicer = services.KeysetServicer() 242 243 template = aead.aead_key_templates.AES128_GCM.SerializeToString() 244 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 245 master_response = keyset_servicer.Generate(gen_request, self._ctx) 246 self.assertEqual(master_response.WhichOneof('result'), 'keyset') 247 master_keyset = master_response.keyset 248 249 write_encrypted_request = testing_api_pb2.KeysetWriteEncryptedRequest( 250 keyset=b'invalid', 251 master_keyset=master_keyset, 252 keyset_writer_type=testing_api_pb2.KEYSET_WRITER_BINARY) 253 write_encrypted_response = keyset_servicer.WriteEncrypted( 254 write_encrypted_request, self._ctx) 255 self.assertEqual(write_encrypted_response.WhichOneof('result'), 'err') 256 257 def test_keyset_read_encrypted_fails_when_encrypted_keyset_is_invalid(self): 258 keyset_servicer = services.KeysetServicer() 259 260 template = aead.aead_key_templates.AES128_GCM.SerializeToString() 261 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 262 master_response = keyset_servicer.Generate(gen_request, self._ctx) 263 self.assertEqual(master_response.WhichOneof('result'), 'keyset') 264 master_keyset = master_response.keyset 265 266 read_encrypted_request = testing_api_pb2.KeysetReadEncryptedRequest( 267 encrypted_keyset=b'invalid', 268 master_keyset=master_keyset, 269 keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY) 270 read_encrypted_response = keyset_servicer.ReadEncrypted( 271 read_encrypted_request, self._ctx) 272 self.assertEqual(read_encrypted_response.WhichOneof('result'), 'err') 273 274 def test_create_aead(self): 275 keyset_servicer = services.KeysetServicer() 276 aead_servicer = services.AeadServicer() 277 278 template = aead.aead_key_templates.AES128_GCM.SerializeToString() 279 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 280 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 281 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 282 283 creation_request = testing_api_pb2.CreationRequest( 284 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 285 serialized_keyset=gen_response.keyset)) 286 creation_response = aead_servicer.Create(creation_request, self._ctx) 287 self.assertEmpty(creation_response.err) 288 289 def test_create_aead_broken_keyset(self): 290 aead_servicer = services.AeadServicer() 291 292 creation_request = testing_api_pb2.CreationRequest( 293 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 294 serialized_keyset=b'\x80')) 295 creation_response = aead_servicer.Create(creation_request, self._ctx) 296 self.assertNotEmpty(creation_response.err) 297 298 def test_encrypt_decrypt_wrong_keyset(self): 299 aead_servicer = services.AeadServicer() 300 keyset_servicer = services.KeysetServicer() 301 # HMAC keysets will not allow creation of an AEAD. 302 template = mac.mac_key_templates.HMAC_SHA256_128BITTAG.SerializeToString() 303 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 304 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 305 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 306 keyset = gen_response.keyset 307 308 with self.assertRaises(tink.TinkError): 309 aead_servicer.Encrypt( 310 testing_api_pb2.AeadEncryptRequest( 311 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 312 serialized_keyset=keyset)), self._ctx) 313 314 with self.assertRaises(tink.TinkError): 315 aead_servicer.Decrypt( 316 testing_api_pb2.AeadDecryptRequest( 317 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 318 serialized_keyset=keyset)), self._ctx) 319 320 def test_generate_encrypt_decrypt(self): 321 keyset_servicer = services.KeysetServicer() 322 aead_servicer = services.AeadServicer() 323 324 template = aead.aead_key_templates.AES128_GCM.SerializeToString() 325 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 326 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 327 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 328 keyset = gen_response.keyset 329 plaintext = b'The quick brown fox jumps over the lazy dog' 330 associated_data = b'associated_data' 331 enc_request = testing_api_pb2.AeadEncryptRequest( 332 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 333 serialized_keyset=keyset), 334 plaintext=plaintext, 335 associated_data=associated_data) 336 enc_response = aead_servicer.Encrypt(enc_request, self._ctx) 337 self.assertEqual(enc_response.WhichOneof('result'), 'ciphertext') 338 ciphertext = enc_response.ciphertext 339 dec_request = testing_api_pb2.AeadDecryptRequest( 340 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 341 serialized_keyset=keyset), 342 ciphertext=ciphertext, 343 associated_data=associated_data) 344 dec_response = aead_servicer.Decrypt(dec_request, self._ctx) 345 self.assertEqual(dec_response.WhichOneof('result'), 'plaintext') 346 self.assertEqual(dec_response.plaintext, plaintext) 347 348 def test_generate_decrypt_fail(self): 349 keyset_servicer = services.KeysetServicer() 350 aead_servicer = services.AeadServicer() 351 352 template = aead.aead_key_templates.AES128_GCM.SerializeToString() 353 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 354 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 355 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 356 keyset = gen_response.keyset 357 358 ciphertext = b'some invalid ciphertext' 359 associated_data = b'associated_data' 360 dec_request = testing_api_pb2.AeadDecryptRequest( 361 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 362 serialized_keyset=keyset), 363 ciphertext=ciphertext, 364 associated_data=associated_data) 365 dec_response = aead_servicer.Decrypt(dec_request, self._ctx) 366 self.assertEqual(dec_response.WhichOneof('result'), 'err') 367 self.assertNotEmpty(dec_response.err) 368 369 def test_server_info(self): 370 metadata_servicer = services.MetadataServicer() 371 request = testing_api_pb2.ServerInfoRequest() 372 response = metadata_servicer.GetServerInfo(request, self._ctx) 373 self.assertEqual(response.language, 'python') 374 375 def test_create_deterministic_aead(self): 376 keyset_servicer = services.KeysetServicer() 377 daead_servicer = services.DeterministicAeadServicer() 378 379 template_proto = daead.deterministic_aead_key_templates.AES256_SIV 380 template = template_proto.SerializeToString() 381 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 382 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 383 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 384 385 creation_request = testing_api_pb2.CreationRequest( 386 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 387 serialized_keyset=gen_response.keyset)) 388 creation_response = daead_servicer.Create( 389 creation_request, self._ctx) 390 self.assertEmpty(creation_response.err) 391 392 def test_create_deterministic_aead_broken_keyset(self): 393 daead_servicer = services.DeterministicAeadServicer() 394 395 creation_request = testing_api_pb2.CreationRequest( 396 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 397 serialized_keyset=b'\x80')) 398 creation_response = daead_servicer.Create(creation_request, self._ctx) 399 self.assertNotEmpty(creation_response.err) 400 401 def test_encrypt_decrypt_deterministic_aead_broken_keyset(self): 402 keyset_servicer = services.KeysetServicer() 403 daead_servicer = services.DeterministicAeadServicer() 404 405 # AES128_GCM keysets will not allow creation of an Deterministic AEAD. 406 template = aead.aead_key_templates.AES128_GCM.SerializeToString() 407 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 408 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 409 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 410 keyset = gen_response.keyset 411 412 enc_request = testing_api_pb2.DeterministicAeadEncryptRequest( 413 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 414 serialized_keyset=keyset)) 415 with self.assertRaises(tink.TinkError): 416 daead_servicer.EncryptDeterministically(enc_request, self._ctx) 417 dec_request = testing_api_pb2.DeterministicAeadDecryptRequest( 418 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 419 serialized_keyset=keyset)) 420 with self.assertRaises(tink.TinkError): 421 daead_servicer.DecryptDeterministically(dec_request, self._ctx) 422 423 def test_generate_encrypt_decrypt_deterministically(self): 424 keyset_servicer = services.KeysetServicer() 425 daead_servicer = services.DeterministicAeadServicer() 426 427 template_proto = daead.deterministic_aead_key_templates.AES256_SIV 428 template = template_proto.SerializeToString() 429 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 430 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 431 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 432 keyset = gen_response.keyset 433 plaintext = b'The quick brown fox jumps over the lazy dog' 434 associated_data = b'associated_data' 435 enc_request = testing_api_pb2.DeterministicAeadEncryptRequest( 436 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 437 serialized_keyset=keyset), 438 plaintext=plaintext, 439 associated_data=associated_data) 440 enc_response = daead_servicer.EncryptDeterministically(enc_request, 441 self._ctx) 442 self.assertEqual(enc_response.WhichOneof('result'), 'ciphertext') 443 enc_response2 = daead_servicer.EncryptDeterministically(enc_request, 444 self._ctx) 445 self.assertEqual(enc_response2.WhichOneof('result'), 'ciphertext') 446 self.assertEqual(enc_response2.ciphertext, enc_response.ciphertext) 447 ciphertext = enc_response.ciphertext 448 dec_request = testing_api_pb2.DeterministicAeadDecryptRequest( 449 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 450 serialized_keyset=keyset), 451 ciphertext=ciphertext, 452 associated_data=associated_data) 453 dec_response = daead_servicer.DecryptDeterministically(dec_request, 454 self._ctx) 455 self.assertEqual(dec_response.WhichOneof('result'), 'plaintext') 456 self.assertEqual(dec_response.plaintext, plaintext) 457 458 def test_generate_decrypt_deterministically_fail(self): 459 keyset_servicer = services.KeysetServicer() 460 daead_servicer = services.DeterministicAeadServicer() 461 462 template_proto = daead.deterministic_aead_key_templates.AES256_SIV 463 template = template_proto.SerializeToString() 464 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 465 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 466 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 467 keyset = gen_response.keyset 468 469 ciphertext = b'some invalid ciphertext' 470 associated_data = b'associated_data' 471 dec_request = testing_api_pb2.DeterministicAeadDecryptRequest( 472 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 473 serialized_keyset=keyset), 474 ciphertext=ciphertext, 475 associated_data=associated_data) 476 dec_response = daead_servicer.DecryptDeterministically(dec_request, 477 self._ctx) 478 self.assertEqual(dec_response.WhichOneof('result'), 'err') 479 self.assertNotEmpty(dec_response.err) 480 481 def test_create_mac(self): 482 keyset_servicer = services.KeysetServicer() 483 mac_servicer = services.MacServicer() 484 485 template = mac.mac_key_templates.HMAC_SHA256_128BITTAG.SerializeToString() 486 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 487 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 488 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 489 490 creation_request = testing_api_pb2.CreationRequest( 491 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 492 serialized_keyset=gen_response.keyset)) 493 creation_response = mac_servicer.Create( 494 creation_request, self._ctx) 495 self.assertEmpty(creation_response.err) 496 497 def test_create_mac_broken_keyset(self): 498 mac_servicer = services.MacServicer() 499 500 creation_request = testing_api_pb2.CreationRequest( 501 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 502 serialized_keyset=b'\x80')) 503 creation_response = mac_servicer.Create(creation_request, self._ctx) 504 self.assertNotEmpty(creation_response.err) 505 506 def test_generate_compute_verify_mac(self): 507 keyset_servicer = services.KeysetServicer() 508 mac_servicer = services.MacServicer() 509 510 template = mac.mac_key_templates.HMAC_SHA256_128BITTAG.SerializeToString() 511 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 512 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 513 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 514 keyset = gen_response.keyset 515 data = b'The quick brown fox jumps over the lazy dog' 516 comp_request = testing_api_pb2.ComputeMacRequest( 517 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 518 serialized_keyset=keyset), 519 data=data) 520 comp_response = mac_servicer.ComputeMac(comp_request, self._ctx) 521 self.assertEqual(comp_response.WhichOneof('result'), 'mac_value') 522 mac_value = comp_response.mac_value 523 verify_request = testing_api_pb2.VerifyMacRequest( 524 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 525 serialized_keyset=keyset), 526 mac_value=mac_value, 527 data=data) 528 verify_response = mac_servicer.VerifyMac(verify_request, self._ctx) 529 self.assertEmpty(verify_response.err) 530 531 def test_generate_compute_verify_mac_fail(self): 532 keyset_servicer = services.KeysetServicer() 533 mac_servicer = services.MacServicer() 534 535 template = mac.mac_key_templates.HMAC_SHA256_128BITTAG.SerializeToString() 536 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 537 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 538 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 539 keyset = gen_response.keyset 540 541 verify_request = testing_api_pb2.VerifyMacRequest( 542 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 543 serialized_keyset=keyset), 544 mac_value=b'invalid mac_value', 545 data=b'data') 546 verify_response = mac_servicer.VerifyMac(verify_request, self._ctx) 547 self.assertNotEmpty(verify_response.err) 548 549 def test_create_hybrid_decrypt(self): 550 keyset_servicer = services.KeysetServicer() 551 hybrid_servicer = services.HybridServicer() 552 553 tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM 554 template = tp.SerializeToString() 555 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 556 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 557 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 558 559 creation_request = testing_api_pb2.CreationRequest( 560 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 561 serialized_keyset=gen_response.keyset)) 562 creation_response = hybrid_servicer.CreateHybridDecrypt( 563 creation_request, self._ctx) 564 self.assertEmpty(creation_response.err) 565 566 def test_create_hybrid_decrypt_bad_keyset(self): 567 hybrid_servicer = services.HybridServicer() 568 569 creation_request = testing_api_pb2.CreationRequest( 570 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 571 serialized_keyset=b'\x80')) 572 creation_response = hybrid_servicer.CreateHybridDecrypt( 573 creation_request, self._ctx) 574 self.assertNotEmpty(creation_response.err) 575 576 def test_create_hybrid_encrypt(self): 577 keyset_servicer = services.KeysetServicer() 578 hybrid_servicer = services.HybridServicer() 579 580 tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM 581 template = tp.SerializeToString() 582 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 583 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 584 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 585 pub_request = testing_api_pb2.KeysetPublicRequest( 586 private_keyset=gen_response.keyset) 587 pub_response = keyset_servicer.Public(pub_request, self._ctx) 588 self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset') 589 590 creation_request = testing_api_pb2.CreationRequest( 591 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 592 serialized_keyset=pub_response.public_keyset)) 593 creation_response = hybrid_servicer.CreateHybridEncrypt( 594 creation_request, self._ctx) 595 self.assertEmpty(creation_response.err) 596 597 def test_create_hybrid_encrypt_bad_keyset(self): 598 hybrid_servicer = services.HybridServicer() 599 600 creation_request = testing_api_pb2.CreationRequest( 601 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 602 serialized_keyset=b'\x80')) 603 creation_response = hybrid_servicer.CreateHybridEncrypt( 604 creation_request, self._ctx) 605 self.assertNotEmpty(creation_response.err) 606 607 def test_generate_hybrid_encrypt_decrypt(self): 608 keyset_servicer = services.KeysetServicer() 609 hybrid_servicer = services.HybridServicer() 610 611 tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM 612 template = tp.SerializeToString() 613 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 614 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 615 self.assertEmpty(gen_response.err) 616 private_keyset = gen_response.keyset 617 618 pub_request = testing_api_pb2.KeysetPublicRequest( 619 private_keyset=private_keyset) 620 pub_response = keyset_servicer.Public(pub_request, self._ctx) 621 self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset') 622 public_keyset = pub_response.public_keyset 623 624 plaintext = b'The quick brown fox jumps over the lazy dog' 625 context_info = b'context_info' 626 enc_request = testing_api_pb2.HybridEncryptRequest( 627 public_annotated_keyset=testing_api_pb2.AnnotatedKeyset( 628 serialized_keyset=public_keyset), 629 plaintext=plaintext, 630 context_info=context_info) 631 enc_response = hybrid_servicer.Encrypt(enc_request, self._ctx) 632 self.assertEqual(enc_response.WhichOneof('result'), 'ciphertext') 633 ciphertext = enc_response.ciphertext 634 635 dec_request = testing_api_pb2.HybridDecryptRequest( 636 private_annotated_keyset=testing_api_pb2.AnnotatedKeyset( 637 serialized_keyset=private_keyset), 638 ciphertext=ciphertext, 639 context_info=context_info) 640 dec_response = hybrid_servicer.Decrypt(dec_request, self._ctx) 641 self.assertEqual(dec_response.WhichOneof('result'), 'plaintext') 642 self.assertEqual(dec_response.plaintext, plaintext) 643 644 def test_generate_hybrid_encrypt_decrypt_fail(self): 645 keyset_servicer = services.KeysetServicer() 646 hybrid_servicer = services.HybridServicer() 647 648 tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM 649 template = tp.SerializeToString() 650 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 651 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 652 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 653 private_keyset = gen_response.keyset 654 655 dec_request = testing_api_pb2.HybridDecryptRequest( 656 private_annotated_keyset=testing_api_pb2.AnnotatedKeyset( 657 serialized_keyset=private_keyset), 658 ciphertext=b'invalid ciphertext', 659 context_info=b'context_info') 660 dec_response = hybrid_servicer.Decrypt(dec_request, self._ctx) 661 self.assertEqual(dec_response.WhichOneof('result'), 'err') 662 self.assertNotEmpty(dec_response.err) 663 664 def test_create_public_key_sign(self): 665 keyset_servicer = services.KeysetServicer() 666 signature_servicer = services.SignatureServicer() 667 668 template = signature.signature_key_templates.ECDSA_P256.SerializeToString() 669 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 670 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 671 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 672 673 creation_request = testing_api_pb2.CreationRequest( 674 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 675 serialized_keyset=gen_response.keyset)) 676 creation_response = signature_servicer.CreatePublicKeySign( 677 creation_request, self._ctx) 678 self.assertEmpty(creation_response.err) 679 680 def test_create_public_key_sign_bad_keyset(self): 681 signature_servicer = services.SignatureServicer() 682 683 creation_request = testing_api_pb2.CreationRequest( 684 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 685 serialized_keyset=b'\x80')) 686 creation_response = signature_servicer.CreatePublicKeySign( 687 creation_request, self._ctx) 688 self.assertNotEmpty(creation_response.err) 689 690 def test_create_public_key_verify(self): 691 keyset_servicer = services.KeysetServicer() 692 signature_servicer = services.SignatureServicer() 693 694 template = signature.signature_key_templates.ECDSA_P256.SerializeToString() 695 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 696 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 697 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 698 pub_request = testing_api_pb2.KeysetPublicRequest( 699 private_keyset=gen_response.keyset) 700 pub_response = keyset_servicer.Public(pub_request, self._ctx) 701 self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset') 702 703 creation_request = testing_api_pb2.CreationRequest( 704 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 705 serialized_keyset=pub_response.public_keyset)) 706 creation_response = signature_servicer.CreatePublicKeyVerify( 707 creation_request, self._ctx) 708 self.assertEmpty(creation_response.err) 709 710 def test_create_public_key_verify_bad_keyset(self): 711 signature_servicer = services.SignatureServicer() 712 713 creation_request = testing_api_pb2.CreationRequest( 714 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 715 serialized_keyset=b'\x80')) 716 creation_response = signature_servicer.CreatePublicKeyVerify( 717 creation_request, self._ctx) 718 self.assertNotEmpty(creation_response.err) 719 720 def test_sign_verify(self): 721 keyset_servicer = services.KeysetServicer() 722 signature_servicer = services.SignatureServicer() 723 724 template = signature.signature_key_templates.ECDSA_P256.SerializeToString() 725 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 726 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 727 728 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 729 private_keyset = gen_response.keyset 730 731 pub_request = testing_api_pb2.KeysetPublicRequest( 732 private_keyset=private_keyset) 733 pub_response = keyset_servicer.Public(pub_request, self._ctx) 734 self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset') 735 public_keyset = pub_response.public_keyset 736 737 data = b'The quick brown fox jumps over the lazy dog' 738 739 sign_request = testing_api_pb2.SignatureSignRequest( 740 private_annotated_keyset=testing_api_pb2.AnnotatedKeyset( 741 serialized_keyset=private_keyset), 742 data=data) 743 sign_response = signature_servicer.Sign(sign_request, self._ctx) 744 self.assertEqual(sign_response.WhichOneof('result'), 'signature') 745 a_signature = sign_response.signature 746 747 verify_request = testing_api_pb2.SignatureVerifyRequest( 748 public_annotated_keyset=testing_api_pb2.AnnotatedKeyset( 749 serialized_keyset=public_keyset), 750 signature=a_signature, 751 data=data) 752 verify_response = signature_servicer.Verify(verify_request, self._ctx) 753 self.assertEmpty(verify_response.err) 754 755 def test_sign_verify_fail(self): 756 keyset_servicer = services.KeysetServicer() 757 signature_servicer = services.SignatureServicer() 758 759 template = signature.signature_key_templates.ECDSA_P256.SerializeToString() 760 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 761 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 762 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 763 self.assertEmpty(gen_response.err) 764 private_keyset = gen_response.keyset 765 766 pub_request = testing_api_pb2.KeysetPublicRequest( 767 private_keyset=private_keyset) 768 pub_response = keyset_servicer.Public(pub_request, self._ctx) 769 self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset') 770 public_keyset = pub_response.public_keyset 771 772 invalid_request = testing_api_pb2.SignatureVerifyRequest( 773 public_annotated_keyset=testing_api_pb2.AnnotatedKeyset( 774 serialized_keyset=public_keyset), 775 signature=b'invalid signature', 776 data=b'The quick brown fox jumps over the lazy dog') 777 invalid_response = signature_servicer.Verify(invalid_request, self._ctx) 778 self.assertNotEmpty(invalid_response.err) 779 780 def test_create_prf_set(self): 781 keyset_servicer = services.KeysetServicer() 782 prf_set_servicer = services.PrfSetServicer() 783 784 template = prf.prf_key_templates.HMAC_SHA256.SerializeToString() 785 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 786 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 787 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 788 789 creation_request = testing_api_pb2.CreationRequest( 790 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 791 serialized_keyset=gen_response.keyset)) 792 creation_response = prf_set_servicer.Create(creation_request, self._ctx) 793 self.assertEmpty(creation_response.err) 794 795 def test_create_prf_set_wrong_keyset(self): 796 prf_set_servicer = services.PrfSetServicer() 797 798 creation_request = testing_api_pb2.CreationRequest( 799 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 800 serialized_keyset=b'\x80')) 801 creation_response = prf_set_servicer.Create(creation_request, self._ctx) 802 self.assertNotEmpty(creation_response.err) 803 804 def test_compute_prf(self): 805 keyset_servicer = services.KeysetServicer() 806 prf_set_servicer = services.PrfSetServicer() 807 template = prf.prf_key_templates.HMAC_SHA256.SerializeToString() 808 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 809 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 810 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 811 keyset = gen_response.keyset 812 813 key_ids_request = testing_api_pb2.PrfSetKeyIdsRequest( 814 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 815 serialized_keyset=keyset)) 816 key_ids_response = prf_set_servicer.KeyIds(key_ids_request, self._ctx) 817 self.assertEqual(key_ids_response.WhichOneof('result'), 'output') 818 self.assertLen(key_ids_response.output.key_id, 1) 819 self.assertEqual(key_ids_response.output.key_id[0], 820 key_ids_response.output.primary_key_id) 821 822 output_length = 31 823 compute_request = testing_api_pb2.PrfSetComputeRequest( 824 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 825 serialized_keyset=keyset), 826 key_id=key_ids_response.output.primary_key_id, 827 input_data=b'input_data', 828 output_length=output_length) 829 compute_response = prf_set_servicer.Compute(compute_request, self._ctx) 830 self.assertEqual(compute_response.WhichOneof('result'), 'output') 831 self.assertLen(compute_response.output, output_length) 832 833 def test_key_ids_prf_fail(self): 834 prf_set_servicer = services.PrfSetServicer() 835 invalid_key_ids_response = prf_set_servicer.KeyIds( 836 testing_api_pb2.PrfSetKeyIdsRequest( 837 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 838 serialized_keyset=b'badkeyset')), self._ctx) 839 self.assertNotEmpty(invalid_key_ids_response.err) 840 841 def test_compute_prf_fail(self): 842 keyset_servicer = services.KeysetServicer() 843 prf_set_servicer = services.PrfSetServicer() 844 template = prf.prf_key_templates.HMAC_SHA256.SerializeToString() 845 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 846 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 847 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 848 keyset = gen_response.keyset 849 key_ids_request = testing_api_pb2.PrfSetKeyIdsRequest( 850 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 851 serialized_keyset=keyset)) 852 key_ids_response = prf_set_servicer.KeyIds(key_ids_request, self._ctx) 853 self.assertEqual(key_ids_response.WhichOneof('result'), 'output') 854 primary_key_id = key_ids_response.output.primary_key_id 855 856 invalid_output_length = 123456 857 invalid_compute_request = testing_api_pb2.PrfSetComputeRequest( 858 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 859 serialized_keyset=keyset), 860 key_id=primary_key_id, 861 input_data=b'input_data', 862 output_length=invalid_output_length) 863 invalid_compute_response = prf_set_servicer.Compute(invalid_compute_request, 864 self._ctx) 865 self.assertEqual(invalid_compute_response.WhichOneof('result'), 'err') 866 self.assertNotEmpty(invalid_compute_response.err) 867 868 def test_create_streaming_aead(self): 869 keyset_servicer = services.KeysetServicer() 870 streaming_aead_servicer = services.StreamingAeadServicer() 871 872 templates = streaming_aead.streaming_aead_key_templates 873 template = templates.AES128_CTR_HMAC_SHA256_4KB.SerializeToString() 874 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 875 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 876 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 877 878 creation_request = testing_api_pb2.CreationRequest( 879 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 880 serialized_keyset=gen_response.keyset)) 881 creation_response = streaming_aead_servicer.Create( 882 creation_request, self._ctx) 883 self.assertEmpty(creation_response.err) 884 885 def test_create_streaming_aead_broken_keyset(self): 886 streaming_aead_servicer = services.StreamingAeadServicer() 887 888 creation_request = testing_api_pb2.CreationRequest( 889 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 890 serialized_keyset=b'\x80')) 891 creation_response = streaming_aead_servicer.Create(creation_request, 892 self._ctx) 893 self.assertNotEmpty(creation_response.err) 894 895 def test_generate_streaming_encrypt_decrypt(self): 896 keyset_servicer = services.KeysetServicer() 897 streaming_aead_servicer = services.StreamingAeadServicer() 898 899 templates = streaming_aead.streaming_aead_key_templates 900 template = templates.AES128_CTR_HMAC_SHA256_4KB.SerializeToString() 901 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 902 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 903 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 904 keyset = gen_response.keyset 905 plaintext = b'The quick brown fox jumps over the lazy dog' 906 associated_data = b'associated_data' 907 908 enc_request = testing_api_pb2.StreamingAeadEncryptRequest( 909 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 910 serialized_keyset=keyset), 911 plaintext=plaintext, 912 associated_data=associated_data) 913 enc_response = streaming_aead_servicer.Encrypt(enc_request, self._ctx) 914 self.assertEqual(enc_response.WhichOneof('result'), 'ciphertext') 915 ciphertext = enc_response.ciphertext 916 917 dec_request = testing_api_pb2.StreamingAeadDecryptRequest( 918 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 919 serialized_keyset=keyset), 920 ciphertext=ciphertext, 921 associated_data=associated_data) 922 dec_response = streaming_aead_servicer.Decrypt(dec_request, self._ctx) 923 self.assertEqual(dec_response.WhichOneof('result'), 'plaintext') 924 925 self.assertEqual(dec_response.plaintext, plaintext) 926 927 def test_generate_streaming_decrypt_fail(self): 928 keyset_servicer = services.KeysetServicer() 929 streaming_aead_servicer = services.StreamingAeadServicer() 930 931 templates = streaming_aead.streaming_aead_key_templates 932 template = templates.AES128_CTR_HMAC_SHA256_4KB.SerializeToString() 933 gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 934 gen_response = keyset_servicer.Generate(gen_request, self._ctx) 935 self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 936 keyset = gen_response.keyset 937 938 ciphertext = b'some invalid ciphertext' 939 associated_data = b'associated_data' 940 dec_request = testing_api_pb2.StreamingAeadDecryptRequest( 941 annotated_keyset=testing_api_pb2.AnnotatedKeyset( 942 serialized_keyset=keyset), 943 ciphertext=ciphertext, 944 associated_data=associated_data) 945 dec_response = streaming_aead_servicer.Decrypt(dec_request, self._ctx) 946 self.assertEqual(dec_response.WhichOneof('result'), 'err') 947 self.assertNotEmpty(dec_response.err) 948 949 950if __name__ == '__main__': 951 absltest.main() 952