xref: /aosp_15_r20/external/tink/python/examples/gcs/gcs_envelope_aead.py (revision e7b1675dde1b92d52ec075b0a92829627f2c52a5)
1# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS-IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# [START gcs-envelope-aead-example]
15"""A command-line utility for performing file encryption using GCS.
16
17It is inteded for use with small files, utilizes envelope encryption and
18facilitates ciphertexts stored in GCS.
19"""
20
21from absl import app
22from absl import flags
23from absl import logging
24from google.cloud import storage
25
26import tink
27from tink import aead
28from tink.integration import gcpkms
29
30
31FLAGS = flags.FLAGS
32
33flags.DEFINE_enum('mode', None, ['encrypt', 'decrypt'],
34                  'The operation to perform.')
35flags.DEFINE_string('kek_uri', None,
36                    'The Cloud KMS URI of the key encryption key.')
37flags.DEFINE_string('gcp_credential_path', None,
38                    'Path to the GCP credentials JSON file.')
39flags.DEFINE_string('gcp_project_id', None,
40                    'The ID of the GCP project hosting the GCS blobs.')
41flags.DEFINE_string('local_path', None, 'Path to the local file.')
42flags.DEFINE_string('gcs_blob_path', None, 'Path to the GCS blob.')
43
44
45_GCS_PATH_PREFIX = 'gs://'
46
47
48def main(argv):
49  del argv  # Unused.
50
51  # Initialise Tink
52  aead.register()
53
54  try:
55    # Read the GCP credentials and setup client
56    client = gcpkms.GcpKmsClient(FLAGS.kek_uri, FLAGS.gcp_credential_path)
57  except tink.TinkError as e:
58    logging.exception('Error creating GCP KMS client: %s', e)
59    return 1
60
61  # Create envelope AEAD primitive using AES256 GCM for encrypting the data
62  try:
63    remote_aead = client.get_aead(FLAGS.kek_uri)
64    env_aead = aead.KmsEnvelopeAead(
65        aead.aead_key_templates.AES256_GCM, remote_aead
66    )
67  except tink.TinkError as e:
68    logging.exception('Error creating primitive: %s', e)
69    return 1
70
71  storage_client = storage.Client.from_service_account_json(
72      FLAGS.gcp_credential_path)
73
74  try:
75    bucket_name, object_name = _get_bucket_and_object(FLAGS.gcs_blob_path)
76  except ValueError as e:
77    logging.exception('Error parsing GCS blob path: %s', e)
78    return 1
79  bucket = storage_client.bucket(bucket_name)
80  blob = bucket.blob(object_name)
81  associated_data = FLAGS.gcs_blob_path.encode('utf-8')
82
83  if FLAGS.mode == 'encrypt':
84    with open(FLAGS.local_path, 'rb') as input_file:
85      output_data = env_aead.encrypt(input_file.read(), associated_data)
86    blob.upload_from_string(output_data)
87
88  elif FLAGS.mode == 'decrypt':
89    ciphertext = blob.download_as_bytes()
90    with open(FLAGS.local_path, 'wb') as output_file:
91      output_file.write(env_aead.decrypt(ciphertext, associated_data))
92
93  else:
94    logging.error(
95        'Unsupported mode %s. Please choose "encrypt" or "decrypt".',
96        FLAGS.mode,
97    )
98    return 1
99
100
101def _get_bucket_and_object(gcs_blob_path):
102  """Extract bucket and object name from a GCS blob path.
103
104  Args:
105    gcs_blob_path: path to a GCS blob
106
107  Returns:
108    The bucket and object name of the GCS blob
109
110  Raises:
111    ValueError: If gcs_blob_path parsing fails.
112  """
113  if not gcs_blob_path.startswith(_GCS_PATH_PREFIX):
114    raise ValueError(
115        f'GCS blob paths must start with gs://, got {gcs_blob_path}')
116  path = gcs_blob_path[len(_GCS_PATH_PREFIX):]
117  parts = path.split('/', 1)
118  if len(parts) < 2:
119    raise ValueError(
120        'GCS blob paths must be in format gs://bucket-name/object-name, '
121        f'got {gcs_blob_path}')
122  return parts[0], parts[1]
123
124if __name__ == '__main__':
125  flags.mark_flags_as_required([
126      'mode', 'kek_uri', 'gcp_credential_path', 'gcp_project_id', 'local_path',
127      'gcs_blob_path'])
128  app.run(main)
129# [END gcs-envelope-aead-example]
130