#!/usr/bin/env python3
# Copyright 2017 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Pretty-prints certificates as an openssl-annotated PEM file."""

import argparse
import base64
import errno
import hashlib
import os
import re
import subprocess
import sys
import traceback


# Matches one PEM block. Group 1 is the block type, group 2 the base64 body
# and group 3 the END marker (empty when the block runs to the end of input).
_PEM_BLOCK_RE = re.compile(
    rb'-----BEGIN (CERTIFICATE|PKCS7)-----(.*?)(-----END \1-----|$)',
    re.DOTALL)

# Matches a top-level (non-indented) DER ASCII SEQUENCE up to its
# non-indented closing brace.
_DER_ASCII_SEQUENCE_RE = re.compile(rb'^(SEQUENCE {.*?^})',
                                    re.DOTALL | re.MULTILINE)

# Matches the hex column of one NetLog hexdump line (48 hex/space characters).
_NETLOG_HEX_LINE_RE = re.compile(rb'\s*([0-9A-Fa-f ]{48})')

_WHITESPACE_RE = re.compile(rb'\s+')


def _to_text(data):
    """Returns |data| as str for diagnostics, decoding bytes leniently."""
    if isinstance(data, bytes):
        return data.decode("utf-8", errors="replace")
    return str(data)


def read_file_to_string(path):
    """Returns the full contents of the file at |path| as bytes."""
    with open(path, 'rb') as f:
        return f.read()


def read_certificates_data_from_server(hostname):
    """Uses openssl to fetch the PEM-encoded certificates for an SSL server.

    Connects to |hostname| on port 443. Returns the stdout of
    `openssl s_client -showcerts` as bytes, or b"" (after writing a message
    to stderr) if openssl could not be run or exited with a failure.
    """
    command = ["openssl", "s_client", "-showcerts",
               "-servername", hostname,
               "-connect", hostname + ":443"]
    try:
        p = subprocess.Popen(command,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    except OSError as e:
        # Same handling as process_data_with_command(): a missing executable
        # is reported rather than crashing the tool.
        if e.errno == errno.ENOENT:
            sys.stderr.write("Failed to execute %s\n" % command[0])
            return b""
        raise

    # communicate() with no input closes stdin, so s_client sees EOF.
    result = p.communicate()

    if p.returncode == 0:
        return result[0]

    sys.stderr.write("Failed getting certificates for %s:\n%s\n" % (
        hostname, _to_text(result[1])))
    return b""


def read_sources_from_commandline(sources):
    """Processes the command lines and returns an array of all the sources
    bytes.

    With no sources, stdin is read. Otherwise each argument that names an
    existing path is read as a file, and any other argument is treated as a
    server name to fetch certificates from.
    """
    if not sources:
        # If no command-line arguments were given to the program, read input
        # from stdin.
        return [sys.stdin.buffer.read()]

    sources_bytes = []
    for arg in sources:
        if os.path.exists(arg):
            # The argument identifies a file path, read it.
            sources_bytes.append(read_file_to_string(arg))
        else:
            # Otherwise treat it as a web server address.
            sources_bytes.append(read_certificates_data_from_server(arg))
    return sources_bytes


def strip_indentation_whitespace(text):
    """Strips leading whitespace from each line of the bytes |text|."""
    return b"\n".join(line.lstrip() for line in text.split(b"\n"))


def strip_all_whitespace(text):
    """Removes all whitespace from the bytes |text|.

    Any literal two-character backslash-n sequences left afterwards are
    turned into real newlines. Presumably this lets PEM data copied from
    escaped text decode, since base64.b64decode() discards newlines by
    default.
    """
    return _WHITESPACE_RE.sub(b'', text).replace(rb'\n', b'\n')


def extract_certificates_from_pem(pem_bytes):
    """Returns the DER bytes of every certificate in the PEM |pem_bytes|.

    CERTIFICATE blocks are base64-decoded directly; PKCS7 blocks are expanded
    into their certificates via openssl. Unterminated blocks and blocks whose
    base64 is invalid are reported on stderr and skipped.
    """
    certificates_der = []

    for match in _PEM_BLOCK_RE.finditer(pem_bytes):
        block_type = match.group(1)
        if not match.group(3):
            sys.stderr.write(
                "\nUnterminated %s block, input is corrupt or truncated\n" %
                _to_text(block_type))
            continue
        try:
            der = base64.b64decode(strip_all_whitespace(match.group(2)))
        except ValueError:
            # binascii.Error is a ValueError. One corrupt block should not
            # abort processing of the remaining blocks.
            sys.stderr.write(
                "\nInvalid base64 data in %s block, skipping\n" %
                _to_text(block_type))
            continue
        if block_type == b'CERTIFICATE':
            certificates_der.append(der)
        else:
            certificates_der.extend(extract_certificates_from_der_pkcs7(der))

    return certificates_der


def extract_certificates_from_der_pkcs7(der_bytes):
    """Returns the DER certificates inside the DER PKCS #7 |der_bytes|.

    Returns an empty list if openssl fails or produces no output.
    """
    pkcs7_certs_pem = process_data_with_command(
        ['openssl', 'pkcs7', '-print_certs', '-inform', 'DER'], der_bytes)
    # The output will be one or more PEM encoded certificates.
    # (Or CRLS, but those will be ignored.)
    if pkcs7_certs_pem:
        return extract_certificates_from_pem(pkcs7_certs_pem)
    return []


def extract_certificates_from_der_ascii(input_text):
    """Returns DER bytes for each top-level SEQUENCE in DER ASCII input.

    |input_text| is bytes. Each matched SEQUENCE is converted with the
    external `ascii2der` tool; conversions that fail are skipped.
    """
    certificates_der = []

    # Look for beginning and end of Certificate SEQUENCE. The indentation is
    # significant. (The SEQUENCE must be non-indented, and the rest of the DER
    # ASCII must be indented until the closing } which again is non-indented.)
    # The output of der2ascii meets this, but it is not a requirement of the
    # DER ASCII language.
    # TODO(mattm): consider alternate approach of doing ascii2der on entire
    # input, and handling the multiple concatenated DER certificates.
    for match in _DER_ASCII_SEQUENCE_RE.finditer(input_text):
        der_bytes = process_data_with_command(["ascii2der"], match.group(1))
        if der_bytes:
            certificates_der.append(der_bytes)

    return certificates_der


def decode_netlog_hexdump(netlog_text):
    """Decodes the hexdump that follows a 'bytes =' line in a NetLog dump.

    |netlog_text| is normally bytes; str is accepted and encoded as UTF-8.
    Returns the decoded bytes, or None if no 'bytes =' line is present.
    Decoding stops at the first line that is not a hexdump line.
    """
    if isinstance(netlog_text, str):
        netlog_text = netlog_text.encode("utf-8")
    lines = netlog_text.splitlines()

    # Skip the text preceding the actual hexdump.
    first_hex_line = None
    for index, line in enumerate(lines):
        if b'bytes =' in line:
            first_hex_line = index + 1
            break
    if first_hex_line is None:
        return None

    decoded = bytearray()
    for line in lines[first_hex_line:]:
        m = _NETLOG_HEX_LINE_RE.search(line)
        if not m:
            break
        tokens = m.group(1).split()
        # Every token must be exactly one byte (two hex digits); anything
        # else is not a hexdump line, so treat it as the end of the dump.
        if any(len(token) != 2 for token in tokens):
            break
        decoded.extend(int(token, 16) for token in tokens)

    return bytes(decoded)


class ByteReader:
    """Iteratively consume data from a byte string.

    Automatically tracks and advances current position in the string as data
    is consumed, and will throw an exception if attempting to read past the
    end of the string.
    """

    def __init__(self, data):
        self.data = data
        self.pos = 0

    def consume_byte(self):
        """Returns the next byte as an int; IndexError at end of data."""
        value = self.data[self.pos]
        self.pos += 1
        # Indexing bytes yields an int already; indexing a str yields a
        # one-character string that still needs ord().
        return value if isinstance(value, int) else ord(value)

    def consume_int16(self):
        """Returns the next two bytes as a big-endian integer."""
        return (self.consume_byte() << 8) + self.consume_byte()

    def consume_int24(self):
        """Returns the next three bytes as a big-endian integer."""
        return ((self.consume_byte() << 16) + (self.consume_byte() << 8) +
                self.consume_byte())

    def consume_bytes(self, n):
        """Returns the next |n| bytes; IndexError if fewer remain."""
        b = self.data[self.pos:self.pos + n]
        if len(b) != n:
            raise IndexError('requested:%d bytes actual:%d bytes' %
                             (n, len(b)))
        self.pos += n
        return b

    def remaining_byte_count(self):
        """Returns the number of bytes not yet consumed."""
        return len(self.data) - self.pos


def decode_tls10_certificate_message(reader):
    """Parses a pre-TLS 1.3 Certificate message body from |reader|.

    |reader| must be positioned just after the HandshakeType byte. Returns
    the list of DER certificates. Raises RuntimeError if a length field
    disagrees with the remaining data, or IndexError if data runs out.
    """
    message_length = reader.consume_int24()
    if reader.remaining_byte_count() != message_length:
        raise RuntimeError(
            'message_length(%d) != remaining_byte_count(%d)\n' % (
                message_length, reader.remaining_byte_count()))

    certificate_list_length = reader.consume_int24()
    if reader.remaining_byte_count() != certificate_list_length:
        raise RuntimeError(
            'certificate_list_length(%d) != remaining_byte_count(%d)\n' % (
                certificate_list_length, reader.remaining_byte_count()))

    certificates_der = []
    while reader.remaining_byte_count():
        cert_len = reader.consume_int24()
        certificates_der.append(reader.consume_bytes(cert_len))

    return certificates_der


def decode_tls13_certificate_message(reader):
    """Parses a TLS 1.3 Certificate message body from |reader|.

    |reader| must be positioned just after the HandshakeType byte. The
    certificate_request_context and per-certificate extensions are skipped.
    Returns the list of DER certificates. Raises RuntimeError if a length
    field disagrees with the remaining data, or IndexError if data runs out.
    """
    message_length = reader.consume_int24()
    if reader.remaining_byte_count() != message_length:
        raise RuntimeError(
            'message_length(%d) != remaining_byte_count(%d)\n' % (
                message_length, reader.remaining_byte_count()))

    # Ignore certificate_request_context.
    certificate_request_context_length = reader.consume_byte()
    reader.consume_bytes(certificate_request_context_length)

    certificate_list_length = reader.consume_int24()
    if reader.remaining_byte_count() != certificate_list_length:
        raise RuntimeError(
            'certificate_list_length(%d) != remaining_byte_count(%d)\n' % (
                certificate_list_length, reader.remaining_byte_count()))

    certificates_der = []
    while reader.remaining_byte_count():
        # Assume certificate_type is X.509.
        cert_len = reader.consume_int24()
        certificates_der.append(reader.consume_bytes(cert_len))
        # Ignore extensions.
        extension_len = reader.consume_int16()
        reader.consume_bytes(extension_len)

    return certificates_der


def decode_tls_certificate_message(certificate_message):
    """Returns the DER certificates in a raw TLS Certificate message.

    Returns an empty list (after writing diagnostics to stderr) if the
    message is not a Certificate message or cannot be parsed.
    """
    reader = ByteReader(certificate_message)
    if reader.consume_byte() != 11:
        sys.stderr.write('HandshakeType != 11. Not a Certificate Message.\n')
        return []

    # The TLS certificate message encoding changed in TLS 1.3. Rather than
    # require pasting in and parsing the whole handshake to discover the TLS
    # version, just try parsing the message with both the old and new
    # encodings.

    # First try the old style certificate message:
    try:
        return decode_tls10_certificate_message(reader)
    except (IndexError, RuntimeError):
        tls10_traceback = traceback.format_exc()

    # Restart the ByteReader and consume the HandshakeType byte again.
    reader = ByteReader(certificate_message)
    reader.consume_byte()
    # Try the new style certificate message:
    try:
        return decode_tls13_certificate_message(reader)
    except (IndexError, RuntimeError):
        tls13_traceback = traceback.format_exc()

    # Neither attempt succeeded, just dump some error info:
    sys.stderr.write("Couldn't parse TLS certificate message\n")
    sys.stderr.write("TLS1.0 parse attempt:\n%s\n" % tls10_traceback)
    sys.stderr.write("TLS1.3 parse attempt:\n%s\n" % tls13_traceback)
    sys.stderr.write("\n")

    return []


def extract_tls_certificate_message(netlog_text):
    """Returns the DER certificates from a NetLog dump of a TLS message."""
    raw_certificate_message = decode_netlog_hexdump(netlog_text)
    if not raw_certificate_message:
        return []
    return decode_tls_certificate_message(raw_certificate_message)


def extract_certificates(source_bytes):
    """Detects the format of |source_bytes| and returns its DER certificates.

    Formats are tried in order: PEM, DER ASCII, NetLog TLS message dump,
    DER PKCS #7 signedData. Anything else is assumed to be a single DER
    certificate and returned unchanged.
    """
    if b"BEGIN CERTIFICATE" in source_bytes or b"BEGIN PKCS7" in source_bytes:
        return extract_certificates_from_pem(source_bytes)

    if b"SEQUENCE {" in source_bytes:
        return extract_certificates_from_der_ascii(source_bytes)

    if b"SSL_HANDSHAKE_MESSAGE_RECEIVED" in source_bytes:
        return extract_tls_certificate_message(source_bytes)

    # DER encoding of PKCS #7 signedData OID (1.2.840.113549.1.7.2)
    if b"\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01\x07\x02" in source_bytes:
        return extract_certificates_from_der_pkcs7(source_bytes)

    # Otherwise assume it is the DER for a single certificate
    return [source_bytes]


def process_data_with_command(command, data):
    """Runs |command| with |data| on stdin and returns its stdout as bytes.

    Returns b"" (after writing a message to stderr) if the executable does
    not exist or the command exits with a non-zero status. Other OSErrors
    from starting the process are re-raised.
    """
    try:
        p = subprocess.Popen(command,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    except OSError as e:
        if e.errno == errno.ENOENT:
            sys.stderr.write("Failed to execute %s\n" % command[0])
            return b""
        raise

    result = p.communicate(data)

    if p.returncode == 0:
        return result[0]

    # Otherwise failed.
    sys.stderr.write("Failed: %s: %s\n" % (" ".join(command),
                                           _to_text(result[1])))
    return b""


def openssl_text_pretty_printer(certificate_der, unused_certificate_number):
    """Returns the `openssl x509 -text` dump of |certificate_der|."""
    return process_data_with_command(["openssl", "x509", "-text", "-inform",
                                      "DER", "-noout"], certificate_der)


def pem_pretty_printer(certificate_der, unused_certificate_number):
    """Returns |certificate_der| re-encoded as PEM by openssl."""
    return process_data_with_command(["openssl", "x509", "-inform", "DER",
                                      "-outform", "PEM"], certificate_der)


def der2ascii_pretty_printer(certificate_der, unused_certificate_number):
    """Returns the `der2ascii` rendering of |certificate_der|."""
    return process_data_with_command(["der2ascii"], certificate_der)


def header_pretty_printer(certificate_der, certificate_number):
    """Returns a banner with the certificate's index and SHA-256 hash."""
    cert_hash = hashlib.sha256(certificate_der).hexdigest()
    separator = "==========================================="
    s = "%s\nCertificate%d: %s\n%s" % (
        separator, certificate_number, cert_hash, separator)
    return s.encode("ascii")


# This is actually just used as a magic value, since pretty_print_certificates
# special-cases der output.
def der_printer():
    raise RuntimeError


def pretty_print_certificates(certificates_der, pretty_printers):
    """Renders |certificates_der| with each of |pretty_printers|.

    Returns the concatenated output as bytes. When |pretty_printers| is
    exactly [der_printer], the raw DER of the first certificate is returned
    (b"" if there are no certificates).
    """
    # Need to special-case DER output to avoid adding any newlines, and to
    # only allow a single certificate to be output.
    if pretty_printers == [der_printer]:
        if not certificates_der:
            # Nothing was extracted; emit nothing instead of IndexError.
            return b""
        if len(certificates_der) > 1:
            sys.stderr.write("DER output only supports a single certificate, "
                             "ignoring %d remaining certs\n" % (
                                 len(certificates_der) - 1))
        return certificates_der[0]

    chunks = []
    for i, certificate_der in enumerate(certificates_der):
        pretty = []
        for pretty_printer in pretty_printers:
            pretty_printed = pretty_printer(certificate_der, i)
            # Printers return empty output on failure; leave those out.
            if pretty_printed:
                pretty.append(pretty_printed)
        chunks.append(b"\n".join(pretty) + b"\n")
    return b"".join(chunks)


def parse_outputs(outputs):
    """Maps a comma-separated list of output names to printer functions.

    Returns an empty list (after writing a message to stderr) if any name is
    unknown or if "der" is combined with another output type.
    """
    output_map = {"der2ascii": der2ascii_pretty_printer,
                  "openssl_text": openssl_text_pretty_printer,
                  "pem": pem_pretty_printer,
                  "header": header_pretty_printer,
                  "der": der_printer}
    pretty_printers = []
    for output_name in outputs.split(','):
        if output_name not in output_map:
            sys.stderr.write("Invalid output type: %s\n" % output_name)
            return []
        pretty_printers.append(output_map[output_name])
    if der_printer in pretty_printers and len(pretty_printers) > 1:
        sys.stderr.write("Output type der must be used alone.\n")
        return []
    return pretty_printers


def main():
    """Parses the command line, extracts certificates and prints them."""
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawTextHelpFormatter)

    parser.add_argument('sources', metavar='SOURCE', nargs='*',
                        help='''Each SOURCE can be one of:
  (1) A server name such as www.google.com.
  (2) A PEM [*] file containing one or more CERTIFICATE or PKCS7 blocks
  (3) A file containing one or more DER ASCII certificates
  (4) A text NetLog dump of a TLS certificate message
      (must include the SSL_HANDSHAKE_MESSAGE_RECEIVED line)
  (5) A binary file containing DER-encoded PKCS #7 signedData
  (6) A binary file containing DER-encoded certificate

When multiple SOURCEs are listed, all certificates in them
are concatenated. If no SOURCE is given then data will be
read from stdin.

[*] Parsing of PEM files is relaxed - leading indentation
whitespace will be stripped (needed for copy-pasting data
from NetLogs).''')

    parser.add_argument('--output',
                        dest='outputs',
                        action='store',
                        default="header,openssl_text,pem",
                        help='output formats to use. Default: %(default)s')

    args = parser.parse_args()

    sources_bytes = read_sources_from_commandline(args.sources)

    pretty_printers = parse_outputs(args.outputs)
    if not pretty_printers:
        sys.stderr.write('No pretty printers selected.\n')
        sys.exit(1)

    certificates_der = []
    for source_bytes in sources_bytes:
        certificates_der.extend(extract_certificates(source_bytes))

    sys.stdout.buffer.write(
        pretty_print_certificates(certificates_der, pretty_printers))


if __name__ == "__main__":
    main()