1#!/usr/bin/env python 2# 3# Copyright 2021 Google Inc. All rights reserved. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17""" 18`fsverity_metadata_generator` generates fsverity metadata and signature to a 19container file 20 21This actually is a simple wrapper around the `fsverity` program. A file is 22signed by the program which produces the PKCS#7 signature file, merkle tree file 23, and the fsverity_descriptor file. Then the files are packed into a single 24output file so that the information about the signing stays together. 25 26Currently, the output of this script is used by `fd_server` which is the host- 27side backend of an authfs filesystem. `fd_server` uses this file in case when 28the underlying filesystem (ext4, etc.) on the device doesn't support the 29fsverity feature natively in which case the information is read directly from 30the filesystem using ioctl. 31""" 32 33import argparse 34import os 35import re 36import shutil 37import subprocess 38import sys 39import tempfile 40from struct import * 41 42class TempDirectory(object): 43 def __enter__(self): 44 self.name = tempfile.mkdtemp() 45 return self.name 46 47 def __exit__(self, *unused): 48 shutil.rmtree(self.name) 49 50class FSVerityMetadataGenerator: 51 def __init__(self, fsverity_path): 52 self._fsverity_path = fsverity_path 53 54 # Default values for some properties 55 self.set_hash_alg("sha256") 56 self.set_signature('none') 57 58 def set_key_format(self, key_format): 59 self._key_format = key_format 60 61 def set_key(self, key): 62 self._key = key 63 64 def set_cert(self, cert): 65 self._cert = cert 66 67 def set_hash_alg(self, hash_alg): 68 self._hash_alg = hash_alg 69 70 def set_signature(self, signature): 71 self._signature = signature 72 73 def _raw_signature(pkcs7_sig_file): 74 """ Extracts raw signature from DER formatted PKCS#7 detached signature file 75 76 Do that by parsing the ASN.1 tree to get the location of the signature 77 in the file and then read the portion. 78 """ 79 80 # Note: there seems to be no public python API (even in 3p modules) that 81 # provides direct access to the raw signature at this moment. So, `openssl 82 # asn1parse` commandline tool is used instead. 83 cmd = ['openssl', 'asn1parse'] 84 cmd.extend(['-inform', 'DER']) 85 cmd.extend(['-in', pkcs7_sig_file]) 86 out = subprocess.check_output(cmd, universal_newlines=True) 87 88 # The signature is the last element in the tree 89 last_line = out.splitlines()[-1] 90 m = re.search('(\d+):.*hl=\s*(\d+)\s*l=\s*(\d+)\s*.*OCTET STRING', last_line) 91 if not m: 92 raise RuntimeError("Failed to parse asn1parse output: " + out) 93 offset = int(m.group(1)) 94 header_len = int(m.group(2)) 95 size = int(m.group(3)) 96 with open(pkcs7_sig_file, 'rb') as f: 97 f.seek(offset + header_len) 98 return f.read(size) 99 100 def digest(self, input_file): 101 cmd = [self._fsverity_path, 'digest', input_file] 102 cmd.extend(['--compact']) 103 cmd.extend(['--hash-alg', self._hash_alg]) 104 out = subprocess.check_output(cmd, universal_newlines=True).strip() 105 return bytes(bytearray.fromhex(out)) 106 107 def generate(self, input_file, output_file=None): 108 if self._signature != 'none': 109 if not self._key: 110 raise RuntimeError("key must be specified.") 111 if not self._cert: 112 raise RuntimeError("cert must be specified.") 113 114 if not output_file: 115 output_file = input_file + '.fsv_meta' 116 117 with TempDirectory() as temp_dir: 118 self._do_generate(input_file, output_file, temp_dir) 119 120 def _do_generate(self, input_file, output_file, work_dir): 121 # temporary files 122 desc_file = os.path.join(work_dir, 'desc') 123 merkletree_file = os.path.join(work_dir, 'merkletree') 124 sig_file = os.path.join(work_dir, 'signature') 125 126 # run the fsverity util to create the temporary files 127 cmd = [self._fsverity_path] 128 if self._signature == 'none': 129 cmd.append('digest') 130 cmd.append(input_file) 131 else: 132 cmd.append('sign') 133 cmd.append(input_file) 134 cmd.append(sig_file) 135 136 # If key is DER, convert DER private key to PEM 137 if self._key_format == 'der': 138 pem_key = os.path.join(work_dir, 'key.pem') 139 key_cmd = ['openssl', 'pkcs8'] 140 key_cmd.extend(['-inform', 'DER']) 141 key_cmd.extend(['-in', self._key]) 142 key_cmd.extend(['-nocrypt']) 143 key_cmd.extend(['-out', pem_key]) 144 subprocess.check_call(key_cmd) 145 else: 146 pem_key = self._key 147 148 cmd.extend(['--key', pem_key]) 149 cmd.extend(['--cert', self._cert]) 150 cmd.extend(['--hash-alg', self._hash_alg]) 151 cmd.extend(['--block-size', '4096']) 152 cmd.extend(['--out-merkle-tree', merkletree_file]) 153 cmd.extend(['--out-descriptor', desc_file]) 154 subprocess.check_call(cmd, stdout=open(os.devnull, 'w')) 155 156 with open(output_file, 'wb') as out: 157 # 1. version 158 out.write(pack('<I', 1)) 159 160 # 2. fsverity_descriptor 161 with open(desc_file, 'rb') as f: 162 out.write(f.read()) 163 164 # 3. signature 165 SIG_TYPE_NONE = 0 166 SIG_TYPE_PKCS7 = 1 167 SIG_TYPE_RAW = 2 168 if self._signature == 'raw': 169 out.write(pack('<I', SIG_TYPE_RAW)) 170 sig = self._raw_signature(sig_file) 171 out.write(pack('<I', len(sig))) 172 out.write(sig) 173 elif self._signature == 'pkcs7': 174 with open(sig_file, 'rb') as f: 175 out.write(pack('<I', SIG_TYPE_PKCS7)) 176 sig = f.read() 177 out.write(pack('<I', len(sig))) 178 out.write(sig) 179 else: 180 out.write(pack('<I', SIG_TYPE_NONE)) 181 out.write(pack('<I', 0)) 182 183 # 4. merkle tree 184 with open(merkletree_file, 'rb') as f: 185 # merkle tree is placed at the next nearest page boundary to make 186 # mmapping possible 187 out.seek(next_page(out.tell())) 188 out.write(f.read()) 189 190def next_page(n): 191 """ Returns the next nearest page boundary from `n` """ 192 PAGE_SIZE = 4096 193 return (n + PAGE_SIZE - 1) // PAGE_SIZE * PAGE_SIZE 194 195if __name__ == '__main__': 196 p = argparse.ArgumentParser() 197 p.add_argument( 198 '--output', 199 help='output file. If omitted, print to <INPUT>.fsv_meta', 200 metavar='output', 201 default=None) 202 p.add_argument( 203 'input', 204 help='input file to be signed') 205 p.add_argument( 206 '--key-format', 207 choices=['pem', 'der'], 208 default='der', 209 help='format of the input key. Default is der') 210 p.add_argument( 211 '--key', 212 help='PKCS#8 private key file') 213 p.add_argument( 214 '--cert', 215 help='x509 certificate file in PEM format') 216 p.add_argument( 217 '--hash-alg', 218 help='hash algorithm to use to build the merkle tree', 219 choices=['sha256', 'sha512'], 220 default='sha256') 221 p.add_argument( 222 '--signature', 223 help='format for signature', 224 choices=['none', 'raw', 'pkcs7'], 225 default='none') 226 p.add_argument( 227 '--fsverity-path', 228 help='path to the fsverity program', 229 required=True) 230 args = p.parse_args(sys.argv[1:]) 231 232 generator = FSVerityMetadataGenerator(args.fsverity_path) 233 generator.set_signature(args.signature) 234 if args.signature == 'none': 235 if args.key or args.cert: 236 raise ValueError("When signature is none, key and cert can't be set") 237 else: 238 if not args.key or not args.cert: 239 raise ValueError("To generate signature, key and cert must be set") 240 generator.set_key(args.key) 241 generator.set_cert(args.cert) 242 generator.set_key_format(args.key_format) 243 generator.set_hash_alg(args.hash_alg) 244 generator.generate(args.input, args.output) 245