xref: /aosp_15_r20/external/tink/python/tink/streaming_aead/_decrypting_stream.py (revision e7b1675dde1b92d52ec075b0a92829627f2c52a5)
1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""A file-like object that decrypts the data it reads.
15
16It reads the ciphertext from a given other file-like object, and decrypts it.
17"""
18
19import io
20from typing import BinaryIO
21
22from tink import core
23from tink.cc.pybind import tink_bindings
24from tink.streaming_aead import _file_object_adapter
25
26
27class RawDecryptingStream(io.RawIOBase):
28  """A file-like object which decrypts reads from an underlying object.
29
30  It reads the ciphertext from the wrapped file-like object, and decrypts it.
31  """
32
33  def __init__(self, stream_aead: tink_bindings.StreamingAead,
34               ciphertext_source: BinaryIO, associated_data: bytes, *,
35               close_ciphertext_source: bool):
36    """Create a new RawDecryptingStream.
37
38    Args:
39      stream_aead: C++ StreamingAead primitive from which a C++ DecryptingStream
40        will be obtained.
41      ciphertext_source: A readable file-like object from which ciphertext bytes
42        will be read.
43      associated_data: The associated data to use for decryption.
44      close_ciphertext_source: Whether ciphertext_source should be closed when
45        close() is called.
46    """
47    super().__init__()
48    self._ciphertext_source = ciphertext_source
49    self._close_ciphertext_source = close_ciphertext_source
50    if not ciphertext_source.readable():
51      raise ValueError('ciphertext_source must be readable')
52    cc_ciphertext_source = _file_object_adapter.FileObjectAdapter(
53        ciphertext_source)
54    self._input_stream_adapter = self._get_input_stream_adapter(
55        stream_aead, associated_data, cc_ciphertext_source)
56
57  @staticmethod
58  @core.use_tink_errors
59  def _get_input_stream_adapter(cc_primitive, aad, source):
60    """Implemented as a separate method to ensure correct error transform."""
61    return tink_bindings.new_cc_decrypting_stream(
62        cc_primitive, aad, source)
63
64  @core.use_tink_errors
65  def _read_from_input_stream_adapter(self, size: int) -> bytes:
66    """Implemented as a separate method to ensure correct error transform."""
67    return self._input_stream_adapter.read(size)
68
69  def read(self, size=-1) -> bytes:
70    """Read and return up to size bytes, where size is an int.
71
72    It blocks until at least one byte can be returned.
73
74    Args:
75      size: Maximum number of bytes to read. As a convenience, if size is
76      unspecified or -1, all bytes until EOF are returned.
77
78    Returns:
79      Bytes read. If b'' is returned and size was not 0, this indicates EOF.
80
81    Raises:
82      TinkError if there was a permanent error.
83    """
84    if self.closed:  # pylint:disable=using-constant-test
85      raise ValueError('read on closed file.')
86    if size is None:
87      size = -1
88    if size < 0:
89      return self.readall()
90    try:
91      # _input_stream_adapter may return an empty string when there is currently
92      # no data is available. In Python (in blocking mode), read is expected to
93      # block until some data is available.
94      # https://docs.python.org/3/library/io.html also mentions a
95      # non-blocking mode, but according to https://bugs.python.org/issue13322
96      # that mode is not properly implemented and not really used.
97      while True:
98        data = self._read_from_input_stream_adapter(size)
99        if data:
100          return data
101    except tink_bindings.PythonTinkStreamFinishedException:
102      return b''
103
104  def readinto(self, b: bytearray) -> int:
105    """Read bytes into a pre-allocated bytes-like object b.
106
107    Args:
108      b: Bytes-like object to which data will be read.
109
110    Returns:
111      Number of bytes read. It returns 0 if EOF is reached, and None if no data
112      is available at the moment.
113
114    Raises:
115      TinkError if there was a permanent error.
116    """
117    data = self.read(len(b))
118    n = len(data)
119    b[:n] = data
120    return n
121
122  def close(self) -> None:
123    """Close the stream. Has no effect on a closed stream."""
124    if self.closed:  # pylint:disable=using-constant-test
125      return
126    if self._close_ciphertext_source:
127      self._ciphertext_source.close()
128    super().close()
129
130  def readable(self) -> bool:
131    """Return True if the stream can be read from."""
132    return True
133
134  def write(self, b: bytes) -> int:
135    raise io.UnsupportedOperation()
136