xref: /aosp_15_r20/external/tink/testing/python/services_test.py (revision e7b1675dde1b92d52ec075b0a92829627f2c52a5)
1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS-IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests for tink.tools.testing.python.testing_server."""
15
16from absl.testing import absltest
17import grpc
18
19import tink
20from tink import aead
21from tink import daead
22from tink import hybrid
23from tink import mac
24from tink import prf
25from tink import signature
26from tink import streaming_aead
27
28
29from protos import testing_api_pb2
30import services
31
32
class DummyServicerContext(grpc.ServicerContext):
  """Inert stand-in for a gRPC servicer context.

  Every method overridden here ignores its arguments, has no side effects and
  returns None. The tests pass an instance to servicer methods that expect a
  context argument.
  """

  def is_active(self):
    """No-op; returns None."""

  def time_remaining(self):
    """No-op; returns None."""

  def cancel(self):
    """No-op; returns None."""

  def add_callback(self, callback):
    """No-op; the callback is never stored or invoked."""

  def invocation_metadata(self):
    """No-op; returns None."""

  def peer(self):
    """No-op; returns None."""

  def peer_identities(self):
    """No-op; returns None."""

  def peer_identity_key(self):
    """No-op; returns None."""

  def auth_context(self):
    """No-op; returns None."""

  def set_compression(self, compression):
    """No-op; the compression setting is discarded."""

  def send_initial_metadata(self, initial_metadata):
    """No-op; the metadata is discarded."""

  def set_trailing_metadata(self, trailing_metadata):
    """No-op; the metadata is discarded."""

  def abort(self, code, details):
    """No-op; unlike a real abort, nothing is raised here."""

  def abort_with_status(self, status):
    """No-op; unlike a real abort, nothing is raised here."""

  def set_code(self, code):
    """No-op; the status code is discarded."""

  def set_details(self, details):
    """No-op; the details string is discarded."""

  def disable_next_message_compression(self):
    """No-op; returns None."""
86
87class ServicesTest(absltest.TestCase):
88
89  _ctx = DummyServicerContext()
90
91  @classmethod
92  def setUpClass(cls):
93    super().setUpClass()
94    aead.register()
95    daead.register()
96    mac.register()
97    hybrid.register()
98    prf.register()
99    signature.register()
100    streaming_aead.register()
101
102  def test_from_json(self):
103    keyset_servicer = services.KeysetServicer()
104    json_keyset = """
105        {
106          "primaryKeyId": 42,
107          "key": [
108            {
109              "keyData": {
110                "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey",
111                "keyMaterialType": "SYMMETRIC",
112                "value": "AFakeTestKeyValue1234567"
113
114              },
115              "outputPrefixType": "TINK",
116              "keyId": 42,
117              "status": "ENABLED"
118            }
119          ]
120        }"""
121    request = testing_api_pb2.KeysetFromJsonRequest(json_keyset=json_keyset)
122    response = keyset_servicer.FromJson(request, self._ctx)
123    self.assertEqual(response.WhichOneof('result'), 'keyset')
124    keyset = tink.BinaryKeysetReader(response.keyset).read()
125    self.assertEqual(keyset.primary_key_id, 42)
126    self.assertLen(keyset.key, 1)
127
128  def test_from_json_fail(self):
129    keyset_servicer = services.KeysetServicer()
130    request = testing_api_pb2.KeysetFromJsonRequest(json_keyset='bad json')
131    response = keyset_servicer.FromJson(request, self._ctx)
132    self.assertEqual(response.WhichOneof('result'), 'err')
133    self.assertNotEmpty(response.err)
134
135  def test_generate_to_from_json(self):
136    keyset_servicer = services.KeysetServicer()
137
138    template = aead.aead_key_templates.AES128_GCM.SerializeToString()
139    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
140    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
141    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
142    keyset = gen_response.keyset
143
144    tojson_request = testing_api_pb2.KeysetToJsonRequest(keyset=keyset)
145    tojson_response = keyset_servicer.ToJson(tojson_request, self._ctx)
146    self.assertEqual(tojson_response.WhichOneof('result'), 'json_keyset')
147    json_keyset = tojson_response.json_keyset
148
149    fromjson_request = testing_api_pb2.KeysetFromJsonRequest(
150        json_keyset=json_keyset)
151    fromjson_response = keyset_servicer.FromJson(fromjson_request, self._ctx)
152    self.assertEqual(fromjson_response.WhichOneof('result'), 'keyset')
153    self.assertEqual(fromjson_response.keyset, keyset)
154
155  def test_to_json_fail(self):
156    keyset_servicer = services.KeysetServicer()
157    request = testing_api_pb2.KeysetToJsonRequest(keyset=b'bad keyset')
158    response = keyset_servicer.ToJson(request, self._ctx)
159    self.assertEqual(response.WhichOneof('result'), 'err')
160    self.assertNotEmpty(response.err)
161
162  def test_generate_keyset_write_read_encrypted(self):
163    keyset_servicer = services.KeysetServicer()
164
165    template = aead.aead_key_templates.AES128_GCM.SerializeToString()
166    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
167    master_response = keyset_servicer.Generate(gen_request, self._ctx)
168    self.assertEqual(master_response.WhichOneof('result'), 'keyset')
169    master_keyset = master_response.keyset
170
171    keyset_response = keyset_servicer.Generate(gen_request, self._ctx)
172    self.assertEqual(keyset_response.WhichOneof('result'), 'keyset')
173    keyset = keyset_response.keyset
174
175    write_encrypted_request = testing_api_pb2.KeysetWriteEncryptedRequest(
176        keyset=keyset,
177        master_keyset=master_keyset,
178        keyset_writer_type=testing_api_pb2.KEYSET_WRITER_BINARY)
179    write_encrypted_response = keyset_servicer.WriteEncrypted(
180        write_encrypted_request, self._ctx)
181    self.assertEqual(
182        write_encrypted_response.WhichOneof('result'), 'encrypted_keyset')
183    encrypted_keyset = write_encrypted_response.encrypted_keyset
184
185    read_encrypted_request = testing_api_pb2.KeysetReadEncryptedRequest(
186        encrypted_keyset=encrypted_keyset,
187        master_keyset=master_keyset,
188        keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY)
189    read_encrypted_response = keyset_servicer.ReadEncrypted(
190        read_encrypted_request, self._ctx)
191    self.assertEqual(read_encrypted_response.WhichOneof('result'), 'keyset')
192    self.assertEqual(read_encrypted_response.keyset, keyset)
193
194  def test_generate_keyset_write_read_encrypted_with_associated_data(self):
195    keyset_servicer = services.KeysetServicer()
196
197    template = aead.aead_key_templates.AES128_GCM.SerializeToString()
198    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
199    master_response = keyset_servicer.Generate(gen_request, self._ctx)
200    self.assertEqual(master_response.WhichOneof('result'), 'keyset')
201    master_keyset = master_response.keyset
202
203    keyset_response = keyset_servicer.Generate(gen_request, self._ctx)
204    self.assertEqual(keyset_response.WhichOneof('result'), 'keyset')
205    keyset = keyset_response.keyset
206
207    associated_data = b'associated_data'
208
209    write_encrypted_request = testing_api_pb2.KeysetWriteEncryptedRequest(
210        keyset=keyset,
211        master_keyset=master_keyset,
212        associated_data=testing_api_pb2.BytesValue(value=associated_data),
213        keyset_writer_type=testing_api_pb2.KEYSET_WRITER_BINARY)
214    write_encrypted_response = keyset_servicer.WriteEncrypted(
215        write_encrypted_request, self._ctx)
216    self.assertEqual(
217        write_encrypted_response.WhichOneof('result'), 'encrypted_keyset')
218    encrypted_keyset = write_encrypted_response.encrypted_keyset
219
220    read_encrypted_request = testing_api_pb2.KeysetReadEncryptedRequest(
221        encrypted_keyset=encrypted_keyset,
222        master_keyset=master_keyset,
223        associated_data=testing_api_pb2.BytesValue(value=associated_data),
224        keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY)
225    read_encrypted_response = keyset_servicer.ReadEncrypted(
226        read_encrypted_request, self._ctx)
227    self.assertEqual(read_encrypted_response.WhichOneof('result'), 'keyset')
228    self.assertEqual(read_encrypted_response.keyset, keyset)
229
230    # Using the wrong associated_data fails
231    read_encrypted_request = testing_api_pb2.KeysetReadEncryptedRequest(
232        encrypted_keyset=encrypted_keyset,
233        master_keyset=master_keyset,
234        associated_data=testing_api_pb2.BytesValue(value=b'wrong ad'),
235        keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY)
236    read_encrypted_response = keyset_servicer.ReadEncrypted(
237        read_encrypted_request, self._ctx)
238    self.assertEqual(read_encrypted_response.WhichOneof('result'), 'err')
239
240  def test_keyset_write_encrypted_fails_when_keyset_is_invalid(self):
241    keyset_servicer = services.KeysetServicer()
242
243    template = aead.aead_key_templates.AES128_GCM.SerializeToString()
244    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
245    master_response = keyset_servicer.Generate(gen_request, self._ctx)
246    self.assertEqual(master_response.WhichOneof('result'), 'keyset')
247    master_keyset = master_response.keyset
248
249    write_encrypted_request = testing_api_pb2.KeysetWriteEncryptedRequest(
250        keyset=b'invalid',
251        master_keyset=master_keyset,
252        keyset_writer_type=testing_api_pb2.KEYSET_WRITER_BINARY)
253    write_encrypted_response = keyset_servicer.WriteEncrypted(
254        write_encrypted_request, self._ctx)
255    self.assertEqual(write_encrypted_response.WhichOneof('result'), 'err')
256
257  def test_keyset_read_encrypted_fails_when_encrypted_keyset_is_invalid(self):
258    keyset_servicer = services.KeysetServicer()
259
260    template = aead.aead_key_templates.AES128_GCM.SerializeToString()
261    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
262    master_response = keyset_servicer.Generate(gen_request, self._ctx)
263    self.assertEqual(master_response.WhichOneof('result'), 'keyset')
264    master_keyset = master_response.keyset
265
266    read_encrypted_request = testing_api_pb2.KeysetReadEncryptedRequest(
267        encrypted_keyset=b'invalid',
268        master_keyset=master_keyset,
269        keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY)
270    read_encrypted_response = keyset_servicer.ReadEncrypted(
271        read_encrypted_request, self._ctx)
272    self.assertEqual(read_encrypted_response.WhichOneof('result'), 'err')
273
274  def test_create_aead(self):
275    keyset_servicer = services.KeysetServicer()
276    aead_servicer = services.AeadServicer()
277
278    template = aead.aead_key_templates.AES128_GCM.SerializeToString()
279    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
280    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
281    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
282
283    creation_request = testing_api_pb2.CreationRequest(
284        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
285            serialized_keyset=gen_response.keyset))
286    creation_response = aead_servicer.Create(creation_request, self._ctx)
287    self.assertEmpty(creation_response.err)
288
289  def test_create_aead_broken_keyset(self):
290    aead_servicer = services.AeadServicer()
291
292    creation_request = testing_api_pb2.CreationRequest(
293        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
294            serialized_keyset=b'\x80'))
295    creation_response = aead_servicer.Create(creation_request, self._ctx)
296    self.assertNotEmpty(creation_response.err)
297
298  def test_encrypt_decrypt_wrong_keyset(self):
299    aead_servicer = services.AeadServicer()
300    keyset_servicer = services.KeysetServicer()
301    # HMAC keysets will not allow creation of an AEAD.
302    template = mac.mac_key_templates.HMAC_SHA256_128BITTAG.SerializeToString()
303    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
304    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
305    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
306    keyset = gen_response.keyset
307
308    with self.assertRaises(tink.TinkError):
309      aead_servicer.Encrypt(
310          testing_api_pb2.AeadEncryptRequest(
311              annotated_keyset=testing_api_pb2.AnnotatedKeyset(
312                  serialized_keyset=keyset)), self._ctx)
313
314    with self.assertRaises(tink.TinkError):
315      aead_servicer.Decrypt(
316          testing_api_pb2.AeadDecryptRequest(
317              annotated_keyset=testing_api_pb2.AnnotatedKeyset(
318                  serialized_keyset=keyset)), self._ctx)
319
320  def test_generate_encrypt_decrypt(self):
321    keyset_servicer = services.KeysetServicer()
322    aead_servicer = services.AeadServicer()
323
324    template = aead.aead_key_templates.AES128_GCM.SerializeToString()
325    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
326    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
327    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
328    keyset = gen_response.keyset
329    plaintext = b'The quick brown fox jumps over the lazy dog'
330    associated_data = b'associated_data'
331    enc_request = testing_api_pb2.AeadEncryptRequest(
332        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
333            serialized_keyset=keyset),
334        plaintext=plaintext,
335        associated_data=associated_data)
336    enc_response = aead_servicer.Encrypt(enc_request, self._ctx)
337    self.assertEqual(enc_response.WhichOneof('result'), 'ciphertext')
338    ciphertext = enc_response.ciphertext
339    dec_request = testing_api_pb2.AeadDecryptRequest(
340        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
341            serialized_keyset=keyset),
342        ciphertext=ciphertext,
343        associated_data=associated_data)
344    dec_response = aead_servicer.Decrypt(dec_request, self._ctx)
345    self.assertEqual(dec_response.WhichOneof('result'), 'plaintext')
346    self.assertEqual(dec_response.plaintext, plaintext)
347
348  def test_generate_decrypt_fail(self):
349    keyset_servicer = services.KeysetServicer()
350    aead_servicer = services.AeadServicer()
351
352    template = aead.aead_key_templates.AES128_GCM.SerializeToString()
353    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
354    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
355    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
356    keyset = gen_response.keyset
357
358    ciphertext = b'some invalid ciphertext'
359    associated_data = b'associated_data'
360    dec_request = testing_api_pb2.AeadDecryptRequest(
361        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
362            serialized_keyset=keyset),
363        ciphertext=ciphertext,
364        associated_data=associated_data)
365    dec_response = aead_servicer.Decrypt(dec_request, self._ctx)
366    self.assertEqual(dec_response.WhichOneof('result'), 'err')
367    self.assertNotEmpty(dec_response.err)
368
369  def test_server_info(self):
370    metadata_servicer = services.MetadataServicer()
371    request = testing_api_pb2.ServerInfoRequest()
372    response = metadata_servicer.GetServerInfo(request, self._ctx)
373    self.assertEqual(response.language, 'python')
374
375  def test_create_deterministic_aead(self):
376    keyset_servicer = services.KeysetServicer()
377    daead_servicer = services.DeterministicAeadServicer()
378
379    template_proto = daead.deterministic_aead_key_templates.AES256_SIV
380    template = template_proto.SerializeToString()
381    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
382    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
383    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
384
385    creation_request = testing_api_pb2.CreationRequest(
386        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
387            serialized_keyset=gen_response.keyset))
388    creation_response = daead_servicer.Create(
389        creation_request, self._ctx)
390    self.assertEmpty(creation_response.err)
391
392  def test_create_deterministic_aead_broken_keyset(self):
393    daead_servicer = services.DeterministicAeadServicer()
394
395    creation_request = testing_api_pb2.CreationRequest(
396        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
397            serialized_keyset=b'\x80'))
398    creation_response = daead_servicer.Create(creation_request, self._ctx)
399    self.assertNotEmpty(creation_response.err)
400
401  def test_encrypt_decrypt_deterministic_aead_broken_keyset(self):
402    keyset_servicer = services.KeysetServicer()
403    daead_servicer = services.DeterministicAeadServicer()
404
405    # AES128_GCM keysets will not allow creation of an Deterministic AEAD.
406    template = aead.aead_key_templates.AES128_GCM.SerializeToString()
407    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
408    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
409    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
410    keyset = gen_response.keyset
411
412    enc_request = testing_api_pb2.DeterministicAeadEncryptRequest(
413        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
414            serialized_keyset=keyset))
415    with self.assertRaises(tink.TinkError):
416      daead_servicer.EncryptDeterministically(enc_request, self._ctx)
417    dec_request = testing_api_pb2.DeterministicAeadDecryptRequest(
418        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
419            serialized_keyset=keyset))
420    with self.assertRaises(tink.TinkError):
421      daead_servicer.DecryptDeterministically(dec_request, self._ctx)
422
423  def test_generate_encrypt_decrypt_deterministically(self):
424    keyset_servicer = services.KeysetServicer()
425    daead_servicer = services.DeterministicAeadServicer()
426
427    template_proto = daead.deterministic_aead_key_templates.AES256_SIV
428    template = template_proto.SerializeToString()
429    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
430    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
431    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
432    keyset = gen_response.keyset
433    plaintext = b'The quick brown fox jumps over the lazy dog'
434    associated_data = b'associated_data'
435    enc_request = testing_api_pb2.DeterministicAeadEncryptRequest(
436        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
437            serialized_keyset=keyset),
438        plaintext=plaintext,
439        associated_data=associated_data)
440    enc_response = daead_servicer.EncryptDeterministically(enc_request,
441                                                           self._ctx)
442    self.assertEqual(enc_response.WhichOneof('result'), 'ciphertext')
443    enc_response2 = daead_servicer.EncryptDeterministically(enc_request,
444                                                            self._ctx)
445    self.assertEqual(enc_response2.WhichOneof('result'), 'ciphertext')
446    self.assertEqual(enc_response2.ciphertext, enc_response.ciphertext)
447    ciphertext = enc_response.ciphertext
448    dec_request = testing_api_pb2.DeterministicAeadDecryptRequest(
449        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
450            serialized_keyset=keyset),
451        ciphertext=ciphertext,
452        associated_data=associated_data)
453    dec_response = daead_servicer.DecryptDeterministically(dec_request,
454                                                           self._ctx)
455    self.assertEqual(dec_response.WhichOneof('result'), 'plaintext')
456    self.assertEqual(dec_response.plaintext, plaintext)
457
458  def test_generate_decrypt_deterministically_fail(self):
459    keyset_servicer = services.KeysetServicer()
460    daead_servicer = services.DeterministicAeadServicer()
461
462    template_proto = daead.deterministic_aead_key_templates.AES256_SIV
463    template = template_proto.SerializeToString()
464    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
465    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
466    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
467    keyset = gen_response.keyset
468
469    ciphertext = b'some invalid ciphertext'
470    associated_data = b'associated_data'
471    dec_request = testing_api_pb2.DeterministicAeadDecryptRequest(
472        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
473            serialized_keyset=keyset),
474        ciphertext=ciphertext,
475        associated_data=associated_data)
476    dec_response = daead_servicer.DecryptDeterministically(dec_request,
477                                                           self._ctx)
478    self.assertEqual(dec_response.WhichOneof('result'), 'err')
479    self.assertNotEmpty(dec_response.err)
480
481  def test_create_mac(self):
482    keyset_servicer = services.KeysetServicer()
483    mac_servicer = services.MacServicer()
484
485    template = mac.mac_key_templates.HMAC_SHA256_128BITTAG.SerializeToString()
486    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
487    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
488    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
489
490    creation_request = testing_api_pb2.CreationRequest(
491        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
492            serialized_keyset=gen_response.keyset))
493    creation_response = mac_servicer.Create(
494        creation_request, self._ctx)
495    self.assertEmpty(creation_response.err)
496
497  def test_create_mac_broken_keyset(self):
498    mac_servicer = services.MacServicer()
499
500    creation_request = testing_api_pb2.CreationRequest(
501        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
502            serialized_keyset=b'\x80'))
503    creation_response = mac_servicer.Create(creation_request, self._ctx)
504    self.assertNotEmpty(creation_response.err)
505
506  def test_generate_compute_verify_mac(self):
507    keyset_servicer = services.KeysetServicer()
508    mac_servicer = services.MacServicer()
509
510    template = mac.mac_key_templates.HMAC_SHA256_128BITTAG.SerializeToString()
511    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
512    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
513    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
514    keyset = gen_response.keyset
515    data = b'The quick brown fox jumps over the lazy dog'
516    comp_request = testing_api_pb2.ComputeMacRequest(
517        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
518            serialized_keyset=keyset),
519        data=data)
520    comp_response = mac_servicer.ComputeMac(comp_request, self._ctx)
521    self.assertEqual(comp_response.WhichOneof('result'), 'mac_value')
522    mac_value = comp_response.mac_value
523    verify_request = testing_api_pb2.VerifyMacRequest(
524        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
525            serialized_keyset=keyset),
526        mac_value=mac_value,
527        data=data)
528    verify_response = mac_servicer.VerifyMac(verify_request, self._ctx)
529    self.assertEmpty(verify_response.err)
530
531  def test_generate_compute_verify_mac_fail(self):
532    keyset_servicer = services.KeysetServicer()
533    mac_servicer = services.MacServicer()
534
535    template = mac.mac_key_templates.HMAC_SHA256_128BITTAG.SerializeToString()
536    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
537    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
538    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
539    keyset = gen_response.keyset
540
541    verify_request = testing_api_pb2.VerifyMacRequest(
542        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
543            serialized_keyset=keyset),
544        mac_value=b'invalid mac_value',
545        data=b'data')
546    verify_response = mac_servicer.VerifyMac(verify_request, self._ctx)
547    self.assertNotEmpty(verify_response.err)
548
549  def test_create_hybrid_decrypt(self):
550    keyset_servicer = services.KeysetServicer()
551    hybrid_servicer = services.HybridServicer()
552
553    tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM
554    template = tp.SerializeToString()
555    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
556    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
557    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
558
559    creation_request = testing_api_pb2.CreationRequest(
560        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
561            serialized_keyset=gen_response.keyset))
562    creation_response = hybrid_servicer.CreateHybridDecrypt(
563        creation_request, self._ctx)
564    self.assertEmpty(creation_response.err)
565
566  def test_create_hybrid_decrypt_bad_keyset(self):
567    hybrid_servicer = services.HybridServicer()
568
569    creation_request = testing_api_pb2.CreationRequest(
570        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
571            serialized_keyset=b'\x80'))
572    creation_response = hybrid_servicer.CreateHybridDecrypt(
573        creation_request, self._ctx)
574    self.assertNotEmpty(creation_response.err)
575
576  def test_create_hybrid_encrypt(self):
577    keyset_servicer = services.KeysetServicer()
578    hybrid_servicer = services.HybridServicer()
579
580    tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM
581    template = tp.SerializeToString()
582    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
583    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
584    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
585    pub_request = testing_api_pb2.KeysetPublicRequest(
586        private_keyset=gen_response.keyset)
587    pub_response = keyset_servicer.Public(pub_request, self._ctx)
588    self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset')
589
590    creation_request = testing_api_pb2.CreationRequest(
591        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
592            serialized_keyset=pub_response.public_keyset))
593    creation_response = hybrid_servicer.CreateHybridEncrypt(
594        creation_request, self._ctx)
595    self.assertEmpty(creation_response.err)
596
597  def test_create_hybrid_encrypt_bad_keyset(self):
598    hybrid_servicer = services.HybridServicer()
599
600    creation_request = testing_api_pb2.CreationRequest(
601        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
602            serialized_keyset=b'\x80'))
603    creation_response = hybrid_servicer.CreateHybridEncrypt(
604        creation_request, self._ctx)
605    self.assertNotEmpty(creation_response.err)
606
607  def test_generate_hybrid_encrypt_decrypt(self):
608    keyset_servicer = services.KeysetServicer()
609    hybrid_servicer = services.HybridServicer()
610
611    tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM
612    template = tp.SerializeToString()
613    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
614    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
615    self.assertEmpty(gen_response.err)
616    private_keyset = gen_response.keyset
617
618    pub_request = testing_api_pb2.KeysetPublicRequest(
619        private_keyset=private_keyset)
620    pub_response = keyset_servicer.Public(pub_request, self._ctx)
621    self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset')
622    public_keyset = pub_response.public_keyset
623
624    plaintext = b'The quick brown fox jumps over the lazy dog'
625    context_info = b'context_info'
626    enc_request = testing_api_pb2.HybridEncryptRequest(
627        public_annotated_keyset=testing_api_pb2.AnnotatedKeyset(
628            serialized_keyset=public_keyset),
629        plaintext=plaintext,
630        context_info=context_info)
631    enc_response = hybrid_servicer.Encrypt(enc_request, self._ctx)
632    self.assertEqual(enc_response.WhichOneof('result'), 'ciphertext')
633    ciphertext = enc_response.ciphertext
634
635    dec_request = testing_api_pb2.HybridDecryptRequest(
636        private_annotated_keyset=testing_api_pb2.AnnotatedKeyset(
637            serialized_keyset=private_keyset),
638        ciphertext=ciphertext,
639        context_info=context_info)
640    dec_response = hybrid_servicer.Decrypt(dec_request, self._ctx)
641    self.assertEqual(dec_response.WhichOneof('result'), 'plaintext')
642    self.assertEqual(dec_response.plaintext, plaintext)
643
644  def test_generate_hybrid_encrypt_decrypt_fail(self):
645    keyset_servicer = services.KeysetServicer()
646    hybrid_servicer = services.HybridServicer()
647
648    tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM
649    template = tp.SerializeToString()
650    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
651    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
652    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
653    private_keyset = gen_response.keyset
654
655    dec_request = testing_api_pb2.HybridDecryptRequest(
656        private_annotated_keyset=testing_api_pb2.AnnotatedKeyset(
657            serialized_keyset=private_keyset),
658        ciphertext=b'invalid ciphertext',
659        context_info=b'context_info')
660    dec_response = hybrid_servicer.Decrypt(dec_request, self._ctx)
661    self.assertEqual(dec_response.WhichOneof('result'), 'err')
662    self.assertNotEmpty(dec_response.err)
663
664  def test_create_public_key_sign(self):
665    keyset_servicer = services.KeysetServicer()
666    signature_servicer = services.SignatureServicer()
667
668    template = signature.signature_key_templates.ECDSA_P256.SerializeToString()
669    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
670    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
671    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
672
673    creation_request = testing_api_pb2.CreationRequest(
674        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
675            serialized_keyset=gen_response.keyset))
676    creation_response = signature_servicer.CreatePublicKeySign(
677        creation_request, self._ctx)
678    self.assertEmpty(creation_response.err)
679
680  def test_create_public_key_sign_bad_keyset(self):
681    signature_servicer = services.SignatureServicer()
682
683    creation_request = testing_api_pb2.CreationRequest(
684        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
685            serialized_keyset=b'\x80'))
686    creation_response = signature_servicer.CreatePublicKeySign(
687        creation_request, self._ctx)
688    self.assertNotEmpty(creation_response.err)
689
690  def test_create_public_key_verify(self):
691    keyset_servicer = services.KeysetServicer()
692    signature_servicer = services.SignatureServicer()
693
694    template = signature.signature_key_templates.ECDSA_P256.SerializeToString()
695    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
696    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
697    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
698    pub_request = testing_api_pb2.KeysetPublicRequest(
699        private_keyset=gen_response.keyset)
700    pub_response = keyset_servicer.Public(pub_request, self._ctx)
701    self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset')
702
703    creation_request = testing_api_pb2.CreationRequest(
704        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
705            serialized_keyset=pub_response.public_keyset))
706    creation_response = signature_servicer.CreatePublicKeyVerify(
707        creation_request, self._ctx)
708    self.assertEmpty(creation_response.err)
709
710  def test_create_public_key_verify_bad_keyset(self):
711    signature_servicer = services.SignatureServicer()
712
713    creation_request = testing_api_pb2.CreationRequest(
714        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
715            serialized_keyset=b'\x80'))
716    creation_response = signature_servicer.CreatePublicKeyVerify(
717        creation_request, self._ctx)
718    self.assertNotEmpty(creation_response.err)
719
720  def test_sign_verify(self):
721    keyset_servicer = services.KeysetServicer()
722    signature_servicer = services.SignatureServicer()
723
724    template = signature.signature_key_templates.ECDSA_P256.SerializeToString()
725    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
726    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
727
728    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
729    private_keyset = gen_response.keyset
730
731    pub_request = testing_api_pb2.KeysetPublicRequest(
732        private_keyset=private_keyset)
733    pub_response = keyset_servicer.Public(pub_request, self._ctx)
734    self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset')
735    public_keyset = pub_response.public_keyset
736
737    data = b'The quick brown fox jumps over the lazy dog'
738
739    sign_request = testing_api_pb2.SignatureSignRequest(
740        private_annotated_keyset=testing_api_pb2.AnnotatedKeyset(
741            serialized_keyset=private_keyset),
742        data=data)
743    sign_response = signature_servicer.Sign(sign_request, self._ctx)
744    self.assertEqual(sign_response.WhichOneof('result'), 'signature')
745    a_signature = sign_response.signature
746
747    verify_request = testing_api_pb2.SignatureVerifyRequest(
748        public_annotated_keyset=testing_api_pb2.AnnotatedKeyset(
749            serialized_keyset=public_keyset),
750        signature=a_signature,
751        data=data)
752    verify_response = signature_servicer.Verify(verify_request, self._ctx)
753    self.assertEmpty(verify_response.err)
754
755  def test_sign_verify_fail(self):
756    keyset_servicer = services.KeysetServicer()
757    signature_servicer = services.SignatureServicer()
758
759    template = signature.signature_key_templates.ECDSA_P256.SerializeToString()
760    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
761    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
762    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
763    self.assertEmpty(gen_response.err)
764    private_keyset = gen_response.keyset
765
766    pub_request = testing_api_pb2.KeysetPublicRequest(
767        private_keyset=private_keyset)
768    pub_response = keyset_servicer.Public(pub_request, self._ctx)
769    self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset')
770    public_keyset = pub_response.public_keyset
771
772    invalid_request = testing_api_pb2.SignatureVerifyRequest(
773        public_annotated_keyset=testing_api_pb2.AnnotatedKeyset(
774            serialized_keyset=public_keyset),
775        signature=b'invalid signature',
776        data=b'The quick brown fox jumps over the lazy dog')
777    invalid_response = signature_servicer.Verify(invalid_request, self._ctx)
778    self.assertNotEmpty(invalid_response.err)
779
780  def test_create_prf_set(self):
781    keyset_servicer = services.KeysetServicer()
782    prf_set_servicer = services.PrfSetServicer()
783
784    template = prf.prf_key_templates.HMAC_SHA256.SerializeToString()
785    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
786    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
787    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
788
789    creation_request = testing_api_pb2.CreationRequest(
790        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
791            serialized_keyset=gen_response.keyset))
792    creation_response = prf_set_servicer.Create(creation_request, self._ctx)
793    self.assertEmpty(creation_response.err)
794
795  def test_create_prf_set_wrong_keyset(self):
796    prf_set_servicer = services.PrfSetServicer()
797
798    creation_request = testing_api_pb2.CreationRequest(
799        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
800            serialized_keyset=b'\x80'))
801    creation_response = prf_set_servicer.Create(creation_request, self._ctx)
802    self.assertNotEmpty(creation_response.err)
803
804  def test_compute_prf(self):
805    keyset_servicer = services.KeysetServicer()
806    prf_set_servicer = services.PrfSetServicer()
807    template = prf.prf_key_templates.HMAC_SHA256.SerializeToString()
808    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
809    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
810    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
811    keyset = gen_response.keyset
812
813    key_ids_request = testing_api_pb2.PrfSetKeyIdsRequest(
814        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
815            serialized_keyset=keyset))
816    key_ids_response = prf_set_servicer.KeyIds(key_ids_request, self._ctx)
817    self.assertEqual(key_ids_response.WhichOneof('result'), 'output')
818    self.assertLen(key_ids_response.output.key_id, 1)
819    self.assertEqual(key_ids_response.output.key_id[0],
820                     key_ids_response.output.primary_key_id)
821
822    output_length = 31
823    compute_request = testing_api_pb2.PrfSetComputeRequest(
824        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
825            serialized_keyset=keyset),
826        key_id=key_ids_response.output.primary_key_id,
827        input_data=b'input_data',
828        output_length=output_length)
829    compute_response = prf_set_servicer.Compute(compute_request, self._ctx)
830    self.assertEqual(compute_response.WhichOneof('result'), 'output')
831    self.assertLen(compute_response.output, output_length)
832
833  def test_key_ids_prf_fail(self):
834    prf_set_servicer = services.PrfSetServicer()
835    invalid_key_ids_response = prf_set_servicer.KeyIds(
836        testing_api_pb2.PrfSetKeyIdsRequest(
837            annotated_keyset=testing_api_pb2.AnnotatedKeyset(
838                serialized_keyset=b'badkeyset')), self._ctx)
839    self.assertNotEmpty(invalid_key_ids_response.err)
840
841  def test_compute_prf_fail(self):
842    keyset_servicer = services.KeysetServicer()
843    prf_set_servicer = services.PrfSetServicer()
844    template = prf.prf_key_templates.HMAC_SHA256.SerializeToString()
845    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
846    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
847    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
848    keyset = gen_response.keyset
849    key_ids_request = testing_api_pb2.PrfSetKeyIdsRequest(
850        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
851            serialized_keyset=keyset))
852    key_ids_response = prf_set_servicer.KeyIds(key_ids_request, self._ctx)
853    self.assertEqual(key_ids_response.WhichOneof('result'), 'output')
854    primary_key_id = key_ids_response.output.primary_key_id
855
856    invalid_output_length = 123456
857    invalid_compute_request = testing_api_pb2.PrfSetComputeRequest(
858        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
859            serialized_keyset=keyset),
860        key_id=primary_key_id,
861        input_data=b'input_data',
862        output_length=invalid_output_length)
863    invalid_compute_response = prf_set_servicer.Compute(invalid_compute_request,
864                                                        self._ctx)
865    self.assertEqual(invalid_compute_response.WhichOneof('result'), 'err')
866    self.assertNotEmpty(invalid_compute_response.err)
867
868  def test_create_streaming_aead(self):
869    keyset_servicer = services.KeysetServicer()
870    streaming_aead_servicer = services.StreamingAeadServicer()
871
872    templates = streaming_aead.streaming_aead_key_templates
873    template = templates.AES128_CTR_HMAC_SHA256_4KB.SerializeToString()
874    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
875    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
876    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
877
878    creation_request = testing_api_pb2.CreationRequest(
879        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
880            serialized_keyset=gen_response.keyset))
881    creation_response = streaming_aead_servicer.Create(
882        creation_request, self._ctx)
883    self.assertEmpty(creation_response.err)
884
885  def test_create_streaming_aead_broken_keyset(self):
886    streaming_aead_servicer = services.StreamingAeadServicer()
887
888    creation_request = testing_api_pb2.CreationRequest(
889        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
890            serialized_keyset=b'\x80'))
891    creation_response = streaming_aead_servicer.Create(creation_request,
892                                                       self._ctx)
893    self.assertNotEmpty(creation_response.err)
894
895  def test_generate_streaming_encrypt_decrypt(self):
896    keyset_servicer = services.KeysetServicer()
897    streaming_aead_servicer = services.StreamingAeadServicer()
898
899    templates = streaming_aead.streaming_aead_key_templates
900    template = templates.AES128_CTR_HMAC_SHA256_4KB.SerializeToString()
901    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
902    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
903    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
904    keyset = gen_response.keyset
905    plaintext = b'The quick brown fox jumps over the lazy dog'
906    associated_data = b'associated_data'
907
908    enc_request = testing_api_pb2.StreamingAeadEncryptRequest(
909        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
910            serialized_keyset=keyset),
911        plaintext=plaintext,
912        associated_data=associated_data)
913    enc_response = streaming_aead_servicer.Encrypt(enc_request, self._ctx)
914    self.assertEqual(enc_response.WhichOneof('result'), 'ciphertext')
915    ciphertext = enc_response.ciphertext
916
917    dec_request = testing_api_pb2.StreamingAeadDecryptRequest(
918        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
919            serialized_keyset=keyset),
920        ciphertext=ciphertext,
921        associated_data=associated_data)
922    dec_response = streaming_aead_servicer.Decrypt(dec_request, self._ctx)
923    self.assertEqual(dec_response.WhichOneof('result'), 'plaintext')
924
925    self.assertEqual(dec_response.plaintext, plaintext)
926
927  def test_generate_streaming_decrypt_fail(self):
928    keyset_servicer = services.KeysetServicer()
929    streaming_aead_servicer = services.StreamingAeadServicer()
930
931    templates = streaming_aead.streaming_aead_key_templates
932    template = templates.AES128_CTR_HMAC_SHA256_4KB.SerializeToString()
933    gen_request = testing_api_pb2.KeysetGenerateRequest(template=template)
934    gen_response = keyset_servicer.Generate(gen_request, self._ctx)
935    self.assertEqual(gen_response.WhichOneof('result'), 'keyset')
936    keyset = gen_response.keyset
937
938    ciphertext = b'some invalid ciphertext'
939    associated_data = b'associated_data'
940    dec_request = testing_api_pb2.StreamingAeadDecryptRequest(
941        annotated_keyset=testing_api_pb2.AnnotatedKeyset(
942            serialized_keyset=keyset),
943        ciphertext=ciphertext,
944        associated_data=associated_data)
945    dec_response = streaming_aead_servicer.Decrypt(dec_request, self._ctx)
946    self.assertEqual(dec_response.WhichOneof('result'), 'err')
947    self.assertNotEmpty(dec_response.err)
948
949
if __name__ == '__main__':
  # Run the tests through absltest only when executed as a script.
  absltest.main()
952