#!/usr/bin/env python3
# Copyright 2014 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
This utility takes a JSON input that describes a CRLSet and produces a
CRLSet from it.

The input is taken on stdin and is a dict with the following keys:
  - BlockedBySPKI: An array of strings, where each string is a filename
      containing a PEM certificate, from which an SPKI will be extracted.
  - BlockedByHash: A dict of string to an array of strings. The dict key is
      a filename containing a PEM certificate, representing the issuer cert,
      while the array of strings contain the filenames of PEM format
      certificates whose serials are blocked.
  - LimitedSubjects: A dict of string to an array of strings, where the key
      is a filename containing a PEM format certificate, and the strings are
      the filenames of PEM format certificates. Certificates that share a
      Subject with the key will be restricted to the set of SPKIs extracted
      from the files in the values.
  - KnownInterceptionSPKIs: An optional array of strings, where each string
      is a filename containing a PEM certificate. The SPKI hash of each is
      listed under KnownInterceptionSPKIs in the CRLSet header.
  - BlockedInterceptionSPKIs: An optional array of strings, where each
      string is a filename containing a PEM certificate. The SPKI hash of
      each is listed under BlockedInterceptionSPKIs in the CRLSet header.
  - Sequence: An optional integer sequence number to use for the CRLSet. If
      not present, defaults to 1.

For example:

{
  "BlockedBySPKI": ["/tmp/blocked-certificate"],
  "BlockedByHash": {
    "/tmp/intermediate-certificate": [
      "/tmp/revoked-certificate",
      "/tmp/revoked-certificate2"
    ]
  },
  "LimitedSubjects": {
    "/tmp/limited-certificate": [
      "/tmp/limited-certificate",
      "/tmp/limited-certificate2"
    ]
  },
  "Sequence": 23
}
"""

import base64
import collections
import hashlib
import json
import optparse
import six
import struct
import sys

# Tag of the optional `[0] EXPLICIT Version` field that may open a
# TBSCertificate (context-specific class, constructed, tag number 0).
_EXPLICIT_VERSION_TAG = 0xa0


def _pem_cert_to_binary(pem_filename):
    """Decodes the first PEM-encoded certificate in a given file into binary

    Args:
      pem_filename: A filename that contains a PEM-encoded certificate. It
          may contain additional data (keys, textual representation) which
          will be ignored

    Returns:
      A byte array containing the decoded certificate data

    Raises:
      ValueError: If the file contains no '-----BEGIN CERTIFICATE' line.
    """
    base64_lines = []
    started = False
    with open(pem_filename, 'r') as pem_file:
        for line in pem_file:
            if not started:
                started = line.startswith('-----BEGIN CERTIFICATE')
                continue
            if line.startswith('-----END CERTIFICATE'):
                break
            # strip() removes the line terminator when there is one; slicing
            # off the last character unconditionally would lose a base64
            # digit on a final line that has no trailing newline.
            base64_lines.append(line.strip())
    if not started:
        raise ValueError('No PEM certificate found in %s' % pem_filename)
    return base64.b64decode(''.join(base64_lines))


def _parse_asn1_element(der_bytes):
    """Parses a DER-encoded tag/Length/Value into its component parts

    Args:
      der_bytes: A DER-encoded ASN.1 data type

    Returns:
      A tuple of the ASN.1 tag value, the length of the ASN.1 header that
      was read, the sequence of bytes for the element (header included), and
      then any data from der_bytes that was not part of the
      tag/Length/Value.

    Raises:
      ValueError: If der_bytes ends before the header or the value does.
    """
    if len(der_bytes) < 2:
        raise ValueError('Truncated ASN.1 element: missing tag or length')
    # Indexing a bytearray yields integers on both Python 2 and Python 3.
    tag, length = bytearray(der_bytes[:2])
    header_length = 2
    if length & 0x80:
        # Long form: the low seven bits count the big-endian length octets
        # that follow.
        num_length_bytes = length & 0x7f
        header_length += num_length_bytes
        length_octets = bytearray(der_bytes[2:header_length])
        if len(length_octets) != num_length_bytes:
            raise ValueError('Truncated ASN.1 element: incomplete length')
        length = 0
        for octet in length_octets:
            length = (length << 8) | octet
    end = header_length + length
    if len(der_bytes) < end:
        raise ValueError('Truncated ASN.1 element: expected %d bytes, got %d' %
                         (end, len(der_bytes)))
    return (tag, header_length, der_bytes[:end], der_bytes[end:])


class ASN1Iterator(object):
    """Iterator that parses and iterates through a ASN.1 DER structure

    Constructing the iterator positions it on the outermost element of the
    supplied data. step_into() then moves to the first child of the current
    element and step_over() moves to the element that follows it.
    """

    def __init__(self, contents):
        self._tag = 0
        self._header_length = 0
        self._rest = None
        self._contents = contents
        # With a zero header length this parses the outermost element.
        self.step_into()

    def step_into(self):
        """Begins processing the inner contents of the current element"""
        (self._tag, self._header_length, self._contents, self._rest) = (
            _parse_asn1_element(self._contents[self._header_length:]))

    def step_over(self):
        """Moves to the element that follows the current one"""
        (self._tag, self._header_length, self._contents, self._rest) = (
            _parse_asn1_element(self._rest))

    def tag(self):
        """Returns the ASN.1 tag of the current element"""
        return self._tag

    def contents(self):
        """Returns the raw data of the current element (header included)"""
        return self._contents

    def encoded_value(self):
        """Returns the encoded value of the current element (i.e. without
        header)"""
        return self._contents[self._header_length:]


def _iterator_at_serial(der_bytes):
    """Returns an ASN1Iterator positioned on a certificate's serialNumber

    The version field of a TBSCertificate is optional (it is omitted for v1
    certificates), so it is only skipped when its tag is actually present.
    """
    iterator = ASN1Iterator(der_bytes)  # on Certificate
    iterator.step_into()  # on TBSCertificate
    iterator.step_into()  # on the first TBSCertificate field
    if iterator.tag() == _EXPLICIT_VERSION_TAG:
        iterator.step_over()  # over version, onto serialNumber
    return iterator


def _der_cert_to_spki(der_bytes):
    """Returns the subjectPublicKeyInfo of a DER-encoded certificate

    Args:
      der_bytes: A DER-encoded certificate (RFC 5280)

    Returns:
      A byte array containing the subjectPublicKeyInfo
    """
    iterator = _iterator_at_serial(der_bytes)
    iterator.step_over()  # onto signature algorithm
    iterator.step_over()  # onto issuer name
    iterator.step_over()  # onto validity
    iterator.step_over()  # onto subject name
    iterator.step_over()  # onto subjectPublicKeyInfo
    return iterator.contents()


def der_cert_to_spki_hash(der_cert):
    """Gets the SHA-256 hash of the subjectPublicKeyInfo of a DER encoded cert

    Args:
      der_cert: A string containing the DER-encoded certificate

    Returns:
      The SHA-256 hash of the certificate's subjectPublicKeyInfo, as a byte
      sequence
    """
    return hashlib.sha256(_der_cert_to_spki(der_cert)).digest()


def pem_cert_file_to_spki_hash(pem_filename):
    """Gets the SHA-256 hash of the subjectPublicKeyInfo of a cert in a file

    Args:
      pem_filename: A file containing a PEM-encoded certificate.

    Returns:
      The SHA-256 hash of the subjectPublicKeyInfo of the first certificate
      in the file, as a byte sequence
    """
    return der_cert_to_spki_hash(_pem_cert_to_binary(pem_filename))


def der_cert_to_subject_hash(der_bytes):
    """Returns SHA256(subject) of a DER-encoded certificate

    Args:
      der_bytes: A DER-encoded certificate (RFC 5280)

    Returns:
      The SHA-256 hash of the certificate's subject.
    """
    iterator = _iterator_at_serial(der_bytes)
    iterator.step_over()  # onto signature algorithm
    iterator.step_over()  # onto issuer name
    iterator.step_over()  # onto validity
    iterator.step_over()  # onto subject name
    return hashlib.sha256(iterator.contents()).digest()


def pem_cert_file_to_subject_hash(pem_filename):
    """Gets the SHA-256 hash of the subject of a cert in a file

    Args:
      pem_filename: A file containing a PEM-encoded certificate.

    Returns:
      The SHA-256 hash of the subject of the first certificate in the file,
      as a byte sequence
    """
    return der_cert_to_subject_hash(_pem_cert_to_binary(pem_filename))


def der_cert_to_serial(der_bytes):
    """Gets the serial of a DER-encoded certificate, omitting leading 0x00

    Args:
      der_bytes: A DER-encoded certificates (RFC 5280)

    Returns:
      The encoded serial number value (omitting tag and length), and
      omitting any leading 0x00 used to indicate it is a positive INTEGER.
    """
    raw_serial = _iterator_at_serial(der_bytes).encoded_value()
    # Check the length first so an empty value is never indexed, and keep a
    # lone 0x00 octet because it is the whole value rather than padding.
    if len(raw_serial) > 1 and raw_serial[:1] == b'\x00':
        raw_serial = raw_serial[1:]
    return raw_serial


def pem_cert_file_to_serial(pem_filename):
    """Gets the DER-encoded serial of a cert in a file, omitting leading 0x00

    Args:
      pem_filename: A file containing a PEM-encoded certificate.

    Returns:
      The DER-encoded serial as a byte sequence
    """
    return der_cert_to_serial(_pem_cert_to_binary(pem_filename))
The default is stdout.') options, _ = parser.parse_args() outfile = sys.stdout if options.output and options.output != '-': outfile = open(options.output, 'wb') config = json.load(sys.stdin) blocked_spkis = [ base64.b64encode(pem_cert_file_to_spki_hash(pem_file)).decode('ascii') for pem_file in config.get('BlockedBySPKI', []) ] parents = { pem_cert_file_to_spki_hash(pem_file): [ pem_cert_file_to_serial(issued_cert_file) for issued_cert_file in issued_certs ] for pem_file, issued_certs in config.get('BlockedByHash', {}).items() } limited_subjects = { base64.b64encode(pem_cert_file_to_subject_hash(pem_file)).decode('ascii'): [ base64.b64encode(pem_cert_file_to_spki_hash(filename)).decode('ascii') for filename in allowed_pems ] for pem_file, allowed_pems in config.get('LimitedSubjects', {}).items() } known_interception_spkis = [ base64.b64encode(pem_cert_file_to_spki_hash(pem_file)).decode('ascii') for pem_file in config.get('KnownInterceptionSPKIs', []) ] blocked_interception_spkis = [ base64.b64encode(pem_cert_file_to_spki_hash(pem_file)).decode('ascii') for pem_file in config.get('BlockedInterceptionSPKIs', []) ] header_json = { 'Version': 0, 'ContentType': 'CRLSet', 'Sequence': int(config.get("Sequence", 1)), 'NumParents': len(parents), 'BlockedSPKIs': blocked_spkis, 'LimitedSubjects': limited_subjects, 'KnownInterceptionSPKIs': known_interception_spkis, 'BlockedInterceptionSPKIs': blocked_interception_spkis } header = json.dumps(header_json) outfile.write(struct.pack('