xref: /aosp_15_r20/external/tink/python/examples/encrypted_keyset/encrypted_keyset.py (revision e7b1675dde1b92d52ec075b0a92829627f2c52a5)
1# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS-IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# [START encrypted-keyset-example]
15"""A command-line utility for generating, encrypting and storing keysets."""
16
17from absl import app
18from absl import flags
19from absl import logging
20
21import tink
22from tink import aead
23from tink.integration import gcpkms
24
25
26FLAGS = flags.FLAGS
27
28flags.DEFINE_enum('mode', None, ['generate', 'encrypt', 'decrypt'],
29                  'The operation to perform.')
30flags.DEFINE_string('keyset_path', None,
31                    'Path to the keyset used for encryption.')
32flags.DEFINE_string('kek_uri', None,
33                    'The Cloud KMS URI of the key encryption key.')
34flags.DEFINE_string('gcp_credential_path', None,
35                    'Path to the GCP credentials JSON file.')
36flags.DEFINE_string('input_path', None, 'Path to the input file.')
37flags.DEFINE_string('output_path', None, 'Path to the output file.')
38flags.DEFINE_string('associated_data', None,
39                    'Optional associated data to use with the '
40                    'encryption operation.')
41
42
43def main(argv):
44  del argv  # Unused.
45
46  associated_data = b'' if not FLAGS.associated_data else bytes(
47      FLAGS.associated_data, 'utf-8')
48
49  # Initialise Tink
50  aead.register()
51
52  try:
53    # Read the GCP credentials and setup client
54    client = gcpkms.GcpKmsClient(FLAGS.kek_uri, FLAGS.gcp_credential_path)
55  except tink.TinkError as e:
56    logging.exception('Error creating GCP KMS client: %s', e)
57    return 1
58
59  # Create envelope AEAD primitive using AES256 GCM for encrypting the data
60  try:
61    remote_aead = client.get_aead(FLAGS.kek_uri)
62  except tink.TinkError as e:
63    logging.exception('Error creating primitive: %s', e)
64    return 1
65
66  if FLAGS.mode == 'generate':
67    # [START generate-a-new-keyset]
68    # Generate a new keyset
69    try:
70      key_template = aead.aead_key_templates.AES128_GCM
71      keyset_handle = tink.new_keyset_handle(key_template)
72    except tink.TinkError as e:
73      logging.exception('Error creating primitive: %s', e)
74      return 1
75    # [END generate-a-new-keyset]
76
77    # [START encrypt-a-keyset]
78    # Encrypt the keyset_handle with the remote key-encryption key (KEK)
79    with open(FLAGS.keyset_path, 'wt') as keyset_file:
80      try:
81        keyset_handle.write(tink.JsonKeysetWriter(keyset_file), remote_aead)
82      except tink.TinkError as e:
83        logging.exception('Error writing key: %s', e)
84        return 1
85    return 0
86    # [END encrypt-a-keyset]
87
88  # Use the keyset to encrypt/decrypt data
89
90  # Read the encrypted keyset into a keyset_handle
91  with open(FLAGS.keyset_path, 'rt') as keyset_file:
92    try:
93      text = keyset_file.read()
94      keyset_handle = tink.KeysetHandle.read(
95          tink.JsonKeysetReader(text), remote_aead
96      )
97    except tink.TinkError as e:
98      logging.exception('Error reading key: %s', e)
99      return 1
100
101  # Get the primitive
102  try:
103    cipher = keyset_handle.primitive(aead.Aead)
104  except tink.TinkError as e:
105    logging.exception('Error creating primitive: %s', e)
106    return 1
107
108  with open(FLAGS.input_path, 'rb') as input_file:
109    input_data = input_file.read()
110    if FLAGS.mode == 'decrypt':
111      output_data = cipher.decrypt(input_data, associated_data)
112    elif FLAGS.mode == 'encrypt':
113      output_data = cipher.encrypt(input_data, associated_data)
114    else:
115      logging.error(
116          'Unsupported mode %s. Please choose "encrypt" or "decrypt".',
117          FLAGS.mode,
118      )
119      return 1
120
121    with open(FLAGS.output_path, 'wb') as output_file:
122      output_file.write(output_data)
123
124
125if __name__ == '__main__':
126  flags.mark_flags_as_required([
127      'mode', 'keyset_path', 'kek_uri', 'gcp_credential_path'])
128  app.run(main)
129# [END encrypted-keyset-example]
130