xref: /aosp_15_r20/external/tink/python/tink/streaming_aead/_decrypting_stream_test.py (revision e7b1675dde1b92d52ec075b0a92829627f2c52a5)
1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests for tink.python.tink.streaming_aead.decrypting_stream."""
15
16import io
17from typing import BinaryIO
18
19from absl.testing import absltest
20
21from tink import core
22from tink import streaming_aead
23from tink.streaming_aead import _raw_streaming_aead
24
25# Using malformed UTF-8 sequences to ensure there is no accidental decoding.
26B_X80 = b'\x80'
27B_SOMETHING_ = b'somethin' + B_X80
28B_AAD_ = b'aa' + B_X80
29
30
31def setUpModule():
32  streaming_aead.register()
33
34
35def get_raw_primitive():
36  key_data = core.Registry.new_key_data(
37      streaming_aead.streaming_aead_key_templates.AES128_CTR_HMAC_SHA256_4KB)
38  return core.Registry.primitive(key_data, _raw_streaming_aead.RawStreamingAead)
39
40
41def get_raw_decrypting_stream(
42    ciphertext_source: BinaryIO,
43    aad: bytes,
44    close_ciphertext_source: bool = True) -> io.RawIOBase:
45  return get_raw_primitive().new_raw_decrypting_stream(
46      ciphertext_source, aad, close_ciphertext_source=close_ciphertext_source)
47
48
49class DecryptingStreamTest(absltest.TestCase):
50
51  def test_unsupported_operation(self):
52    f = io.BytesIO(B_SOMETHING_)
53    ds = get_raw_primitive().new_raw_decrypting_stream(
54        f, B_AAD_, close_ciphertext_source=True)
55
56    with self.assertRaises(io.UnsupportedOperation):
57      ds.seek(0, 0)
58    with self.assertRaises(io.UnsupportedOperation):
59      ds.tell()
60    with self.assertRaises(io.UnsupportedOperation):
61      ds.truncate()
62    with self.assertRaises(io.UnsupportedOperation):
63      ds.write(b'data')
64    with self.assertRaises(io.UnsupportedOperation):
65      ds.writelines([b'data'])
66    with self.assertRaises(io.UnsupportedOperation):
67      ds.fileno()
68
69  def test_closed_methods_raise(self):
70    f = io.BytesIO(B_SOMETHING_)
71    ds = get_raw_primitive().new_raw_decrypting_stream(
72        f, B_AAD_, close_ciphertext_source=True)
73
74    ds.close()
75    with self.assertRaisesRegex(ValueError, 'closed'):
76      ds.read()
77    with self.assertRaisesRegex(ValueError, 'closed'):
78      ds.flush()
79
80  def test_inquiries(self):
81    f = io.BytesIO(B_SOMETHING_)
82    ds = get_raw_primitive().new_raw_decrypting_stream(
83        f, B_AAD_, close_ciphertext_source=True)
84
85    self.assertTrue(ds.readable())
86    self.assertFalse(ds.writable())
87    self.assertFalse(ds.seekable())
88    self.assertFalse(ds.isatty())
89
90
91if __name__ == '__main__':
92  absltest.main()
93