# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for tink.python.tink.streaming_aead.decrypting_stream."""

import io
from typing import BinaryIO

from absl.testing import absltest

from tink import core
from tink import streaming_aead
from tink.streaming_aead import _raw_streaming_aead

# Using malformed UTF-8 sequences to ensure there is no accidental decoding.
B_X80 = b'\x80'
B_SOMETHING_ = b'somethin' + B_X80
B_AAD_ = b'aa' + B_X80


def setUpModule():
  """Registers the streaming AEAD key types before any test runs."""
  streaming_aead.register()


def get_raw_primitive():
  """Returns a RawStreamingAead primitive built from freshly created key data.

  The key data is generated from the AES128_CTR_HMAC_SHA256_4KB template.
  """
  template = (
      streaming_aead.streaming_aead_key_templates.AES128_CTR_HMAC_SHA256_4KB)
  fresh_key_data = core.Registry.new_key_data(template)
  return core.Registry.primitive(
      fresh_key_data, _raw_streaming_aead.RawStreamingAead)


def get_raw_decrypting_stream(
    ciphertext_source: BinaryIO,
    aad: bytes,
    close_ciphertext_source: bool = True) -> io.RawIOBase:
  """Opens a raw decrypting stream over `ciphertext_source`.

  Args:
    ciphertext_source: Binary file-like object the ciphertext is read from.
    aad: Associated data handed to the primitive.
    close_ciphertext_source: Forwarded unchanged to the primitive's
      `new_raw_decrypting_stream`.

  Returns:
    The stream created by a fresh raw primitive.
  """
  raw_primitive = get_raw_primitive()
  return raw_primitive.new_raw_decrypting_stream(
      ciphertext_source, aad, close_ciphertext_source=close_ciphertext_source)


class DecryptingStreamTest(absltest.TestCase):
  """Checks the file-object surface of the raw decrypting stream."""

  def test_unsupported_operation(self):
    """Positioning, writing and fileno() raise io.UnsupportedOperation."""
    stream = get_raw_decrypting_stream(io.BytesIO(B_SOMETHING_), B_AAD_)

    # Each thunk defers the call so that it happens inside assertRaises.
    unsupported_calls = (
        lambda: stream.seek(0, 0),
        lambda: stream.tell(),
        lambda: stream.truncate(),
        lambda: stream.write(b'data'),
        lambda: stream.writelines([b'data']),
        lambda: stream.fileno(),
    )
    for unsupported_call in unsupported_calls:
      with self.assertRaises(io.UnsupportedOperation):
        unsupported_call()

  def test_closed_methods_raise(self):
    """read() and flush() on a closed stream raise ValueError('...closed...')."""
    stream = get_raw_decrypting_stream(io.BytesIO(B_SOMETHING_), B_AAD_)
    stream.close()

    for closed_call in (lambda: stream.read(), lambda: stream.flush()):
      with self.assertRaisesRegex(ValueError, 'closed'):
        closed_call()

  def test_inquiries(self):
    """The stream reports itself as readable only."""
    stream = get_raw_decrypting_stream(io.BytesIO(B_SOMETHING_), B_AAD_)

    self.assertTrue(stream.readable())
    for negative_inquiry in (
        lambda: stream.writable(),
        lambda: stream.seekable(),
        lambda: stream.isatty(),
    ):
      self.assertFalse(negative_inquiry())


if __name__ == '__main__':
  absltest.main()