1#!/usr/bin/env python
2#
3# Copyright 2021 Google Inc. All rights reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18`fsverity_metadata_generator` generates fsverity metadata and signature to a
19container file
20
This is a simple wrapper around the `fsverity` program. A file is signed by the
program, which produces the PKCS#7 signature file, the merkle tree file, and
the fsverity_descriptor file. The files are then packed into a single output
file so that the information about the signing stays together.
25
Currently, the output of this script is used by `fd_server`, which is the
host-side backend of an authfs filesystem. `fd_server` uses this file when the
underlying filesystem (ext4, etc.) on the device doesn't support the fsverity
feature natively. When the filesystem does support fsverity, the information is
instead read directly from the filesystem using ioctl.
31"""
32
33import argparse
34import os
35import re
36import shutil
37import subprocess
38import sys
39import tempfile
40from struct import *
41
class TempDirectory(object):
  """Context manager that owns a scratch directory.

  Entering the context creates a fresh directory with `tempfile.mkdtemp` and
  yields its path; leaving the context removes the directory tree again.
  """

  def __enter__(self):
    created = tempfile.mkdtemp()
    # Remember the path so that __exit__ knows what to delete.
    self.name = created
    return created

  def __exit__(self, *unused):
    # Exception details are ignored: the directory is removed either way.
    shutil.rmtree(self.name)
49
class FSVerityMetadataGenerator:
  """Generates a single-file container holding fsverity metadata for a file.

  The heavy lifting is delegated to the external `fsverity` program (and to
  `openssl` for key conversion and signature extraction). The resulting
  container is laid out as follows, all integers being 32-bit little-endian:

    1. version (currently 1)
    2. the fsverity_descriptor emitted by `fsverity`
    3. signature type, signature length, signature bytes
    4. the merkle tree, starting at the next 4096-byte boundary
  """

  # Signature type tags written into the container.
  _SIG_TYPE_NONE = 0
  _SIG_TYPE_PKCS7 = 1
  _SIG_TYPE_RAW = 2

  def __init__(self, fsverity_path):
    """Creates a generator that invokes the `fsverity` binary at fsverity_path."""
    self._fsverity_path = fsverity_path

    # Signing inputs are optional. Initialise them so that generate() can
    # report a missing key/cert with a clear RuntimeError instead of failing
    # with an AttributeError on an attribute that was never set.
    self._key = None
    self._cert = None
    # Same default as the --key-format command-line option of this script.
    self._key_format = 'der'

    # Default values for some properties
    self.set_hash_alg("sha256")
    self.set_signature('none')

  def set_key_format(self, key_format):
    """Sets the format of the private key: 'der' (converted to PEM) or other."""
    self._key_format = key_format

  def set_key(self, key):
    """Sets the path of the private key file used for signing."""
    self._key = key

  def set_cert(self, cert):
    """Sets the path of the certificate file used for signing."""
    self._cert = cert

  def set_hash_alg(self, hash_alg):
    """Sets the hash algorithm name passed to `fsverity --hash-alg`."""
    self._hash_alg = hash_alg

  def set_signature(self, signature):
    """Sets the signature mode: 'none', 'raw' or 'pkcs7'."""
    self._signature = signature

  @staticmethod
  def _raw_signature(pkcs7_sig_file):
    """ Extracts raw signature from DER formatted PKCS#7 detached signature file

    Do that by parsing the ASN.1 tree to get the location of the signature
    in the file and then read the portion.

    Returns the signature as bytes. Raises RuntimeError when the last line of
    the `openssl asn1parse` output is not the expected OCTET STRING element,
    and subprocess.CalledProcessError when `openssl` itself fails.
    """

    # Note: there seems to be no public python API (even in 3p modules) that
    # provides direct access to the raw signature at this moment. So, `openssl
    # asn1parse` commandline tool is used instead.
    cmd = ['openssl', 'asn1parse']
    cmd.extend(['-inform', 'DER'])
    cmd.extend(['-in', pkcs7_sig_file])
    out = subprocess.check_output(cmd, universal_newlines=True)

    # The signature is the last element in the tree. An empty output is mapped
    # to an empty line so that it ends up in the RuntimeError below.
    lines = out.splitlines()
    last_line = lines[-1] if lines else ''
    # Groups: byte offset of the element, header length, payload length.
    m = re.search(r'(\d+):.*hl=\s*(\d+)\s*l=\s*(\d+)\s*.*OCTET STRING',
                  last_line)
    if not m:
      raise RuntimeError("Failed to parse asn1parse output: " + out)
    offset = int(m.group(1))
    header_len = int(m.group(2))
    size = int(m.group(3))
    with open(pkcs7_sig_file, 'rb') as f:
      # Skip the ASN.1 header of the element; the payload is the signature.
      f.seek(offset + header_len)
      return f.read(size)

  def digest(self, input_file):
    """Returns the output of `fsverity digest --compact` for input_file.

    The program's output is parsed as a hex string and returned as bytes.
    """
    cmd = [self._fsverity_path, 'digest', input_file]
    cmd.extend(['--compact'])
    cmd.extend(['--hash-alg', self._hash_alg])
    out = subprocess.check_output(cmd, universal_newlines=True).strip()
    return bytes(bytearray.fromhex(out))

  def generate(self, input_file, output_file=None):
    """Writes the metadata container for input_file.

    Args:
      input_file: path of the file to describe (and sign, if enabled).
      output_file: destination path; defaults to `<input_file>.fsv_meta`.

    Raises:
      RuntimeError: a signature was requested but key or cert is not set.
    """
    if self._signature != 'none':
      if not self._key:
        raise RuntimeError("key must be specified.")
      if not self._cert:
        raise RuntimeError("cert must be specified.")

    if not output_file:
      output_file = input_file + '.fsv_meta'

    with TempDirectory() as temp_dir:
      self._do_generate(input_file, output_file, temp_dir)

  def _pem_key(self, work_dir):
    """Returns the path of a PEM private key, converting from DER if needed."""
    if self._key_format != 'der':
      return self._key

    # If key is DER, convert DER private key to PEM
    pem_key = os.path.join(work_dir, 'key.pem')
    key_cmd = ['openssl', 'pkcs8']
    key_cmd.extend(['-inform', 'DER'])
    key_cmd.extend(['-in', self._key])
    key_cmd.extend(['-nocrypt'])
    key_cmd.extend(['-out', pem_key])
    subprocess.check_call(key_cmd)
    return pem_key

  def _do_generate(self, input_file, output_file, work_dir):
    """Runs `fsverity` and packs its outputs into output_file.

    Intermediate files (descriptor, merkle tree, signature, converted key) are
    created inside work_dir.
    """
    # temporary files
    desc_file = os.path.join(work_dir, 'desc')
    merkletree_file = os.path.join(work_dir, 'merkletree')
    sig_file = os.path.join(work_dir, 'signature')

    # run the fsverity util to create the temporary files
    cmd = [self._fsverity_path]
    if self._signature == 'none':
      cmd.extend(['digest', input_file])
    else:
      cmd.extend(['sign', input_file, sig_file])
      cmd.extend(['--key', self._pem_key(work_dir)])
      cmd.extend(['--cert', self._cert])
    cmd.extend(['--hash-alg', self._hash_alg])
    cmd.extend(['--block-size', '4096'])
    cmd.extend(['--out-merkle-tree', merkletree_file])
    cmd.extend(['--out-descriptor', desc_file])
    # Discard the program's stdout; close the sink once the call is done.
    with open(os.devnull, 'w') as devnull:
      subprocess.check_call(cmd, stdout=devnull)

    with open(output_file, 'wb') as out:
      # 1. version
      out.write(pack('<I', 1))

      # 2. fsverity_descriptor
      with open(desc_file, 'rb') as f:
        out.write(f.read())

      # 3. signature
      if self._signature == 'raw':
        out.write(pack('<I', self._SIG_TYPE_RAW))
        sig = self._raw_signature(sig_file)
        out.write(pack('<I', len(sig)))
        out.write(sig)
      elif self._signature == 'pkcs7':
        with open(sig_file, 'rb') as f:
          out.write(pack('<I', self._SIG_TYPE_PKCS7))
          sig = f.read()
          out.write(pack('<I', len(sig)))
          out.write(sig)
      else:
        out.write(pack('<I', self._SIG_TYPE_NONE))
        out.write(pack('<I', 0))

      # 4. merkle tree
      with open(merkletree_file, 'rb') as f:
        # merkle tree is placed at the next nearest page boundary to make
        # mmapping possible
        out.seek(next_page(out.tell()))
        # Stream the tree instead of loading all of it into memory.
        shutil.copyfileobj(f, out)
189
def next_page(n):
  """ Rounds `n` up to a multiple of the 4096-byte page size.

  A value that already sits on a page boundary is returned unchanged.
  """
  PAGE_SIZE = 4096
  whole_pages, leftover = divmod(n, PAGE_SIZE)
  if leftover:
    # A partially filled page still occupies a full page.
    whole_pages += 1
  return whole_pages * PAGE_SIZE
194
if __name__ == '__main__':
  # Command-line entry point: parse the options, configure a generator from
  # them and write the metadata container for the given input file.
  parser = argparse.ArgumentParser()

  # (names, settings) of every argument, listed in registration order.
  argument_table = [
      (('--output',), {
          'help': 'output file. If omitted, print to <INPUT>.fsv_meta',
          'metavar': 'output',
          'default': None}),
      (('input',), {
          'help': 'input file to be signed'}),
      (('--key-format',), {
          'choices': ['pem', 'der'],
          'default': 'der',
          'help': 'format of the input key. Default is der'}),
      (('--key',), {
          'help': 'PKCS#8 private key file'}),
      (('--cert',), {
          'help': 'x509 certificate file in PEM format'}),
      (('--hash-alg',), {
          'help': 'hash algorithm to use to build the merkle tree',
          'choices': ['sha256', 'sha512'],
          'default': 'sha256'}),
      (('--signature',), {
          'help': 'format for signature',
          'choices': ['none', 'raw', 'pkcs7'],
          'default': 'none'}),
      (('--fsverity-path',), {
          'help': 'path to the fsverity program',
          'required': True}),
  ]
  for names, settings in argument_table:
    parser.add_argument(*names, **settings)
  options = parser.parse_args(sys.argv[1:])

  metadata_gen = FSVerityMetadataGenerator(options.fsverity_path)
  metadata_gen.set_signature(options.signature)
  if options.signature != 'none':
    # Signing needs both halves of the key material.
    if not (options.key and options.cert):
      raise ValueError("To generate signature, key and cert must be set")
    metadata_gen.set_key(options.key)
    metadata_gen.set_cert(options.cert)
  elif options.key or options.cert:
    # Key material without a signature mode is treated as a usage error.
    raise ValueError("When signature is none, key and cert can't be set")
  metadata_gen.set_key_format(options.key_format)
  metadata_gen.set_hash_alg(options.hash_alg)
  metadata_gen.generate(options.input, options.output)
245