xref: /aosp_15_r20/external/tink/python/tink/streaming_aead/_encrypting_stream.py (revision e7b1675dde1b92d52ec075b0a92829627f2c52a5)
1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""A file-like object that encrypts data written to it.
15
16It writes the ciphertext to a given other file-like object, which can later be
17decrypted and read using a DecryptingStream wrapper.
18"""
19
20import io
21from typing import BinaryIO, Optional
22
23from tink import core
24from tink.cc.pybind import tink_bindings
25from tink.streaming_aead import _file_object_adapter
26
27
28@core.use_tink_errors
29def _new_cc_encrypting_stream(cc_primitive, aad, destination):
30  """Implemented as a separate function to ensure correct error transform."""
31  return tink_bindings.new_cc_encrypting_stream(
32      cc_primitive, aad, destination)
33
34
35class RawEncryptingStream(io.RawIOBase):
36  """A file-like object which wraps writes to an underlying file-like object.
37
38  It encrypts any data written to it, and writes the ciphertext to the wrapped
39  object.
40
41  The close() method indicates that the message is complete, and will write a
42  final ciphertext block to signify end of message.
43  """
44
45  def __init__(self, stream_aead: tink_bindings.StreamingAead,
46               ciphertext_destination: BinaryIO, associated_data: bytes):
47    """Create a new RawEncryptingStream.
48
49    Args:
50      stream_aead: C++ StreamingAead primitive from which a C++ EncryptingStream
51        will be obtained.
52      ciphertext_destination: A writable file-like object to which ciphertext
53        bytes will be written.
54      associated_data: The associated data to use for encryption. This must
55        match the associated_data used for decryption.
56    """
57    super().__init__()
58    if not ciphertext_destination.writable():
59      raise ValueError('ciphertext_destination must be writable')
60    cc_ciphertext_destination = _file_object_adapter.FileObjectAdapter(
61        ciphertext_destination)
62    self._cc_encrypting_stream = _new_cc_encrypting_stream(
63        stream_aead, associated_data, cc_ciphertext_destination)
64
65  @core.use_tink_errors
66  def _write_to_cc_encrypting_stream(self, b: bytes) -> int:
67    return self._cc_encrypting_stream.write(bytes(b))
68
69  @core.use_tink_errors
70  def _close_cc_encrypting_stream(self) -> None:
71    self._cc_encrypting_stream.close()
72
73  def readinto(self, b: bytearray) -> Optional[int]:
74    raise io.UnsupportedOperation()
75
76  def write(self, b: bytes) -> int:
77    """Write the given buffer to the IO stream.
78
79    Args:
80      b: The buffer to write.
81    Returns:
82      The number of bytes written, which may be less than the length of b in
83      bytes.
84    Raises:
85      TinkError: if there was a permanent error.
86
87    """
88    if self.closed:  # pylint:disable=using-constant-test
89      raise ValueError('write on closed file')
90
91    if not isinstance(b, (bytes, memoryview, bytearray)):
92      raise TypeError('a bytes-like object is required, not {}'.format(
93          type(b).__name__))
94    written = self._write_to_cc_encrypting_stream(b)
95    if written < 0 or written > len(b):
96      raise core.TinkError('Incorrect number of bytes written')
97    return written
98
99  def close(self) -> None:
100    """Flush and close the stream. Has no effect on a closed stream."""
101    if self.closed:  # pylint:disable=using-constant-test
102      return
103    self.flush()
104    self._close_cc_encrypting_stream()
105    super().close()
106
107  def writable(self) -> bool:
108    """Return True if the stream supports writing."""
109    return True
110