xref: /aosp_15_r20/external/cronet/net/tools/print_certificates.py (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1#!/usr/bin/env python3
2# Copyright 2017 The Chromium Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Pretty-prints certificates as an openssl-annotated PEM file."""
7
8import argparse
9import base64
10import errno
11import hashlib
12import os
13import re
14import subprocess
15import sys
16import traceback
17
18
def read_file_to_string(path):
  """Returns the entire contents of the file at |path| as bytes.

  The file is opened in binary mode, so despite the function's name the
  result is a bytes object rather than a str.
  """
  with open(path, mode='rb') as input_file:
    contents = input_file.read()
  return contents
22
23
def read_certificates_data_from_server(hostname):
  """Uses openssl to fetch the PEM-encoded certificates for an SSL server.

  Runs `openssl s_client -showcerts` against port 443 of |hostname|, sending
  |hostname| as the SNI server name.

  Returns:
    The bytes openssl wrote to stdout when it exits successfully. If openssl
    exits with a non-zero status, a message is written to stderr and b"" is
    returned.
  """
  p = subprocess.Popen(["openssl", "s_client", "-showcerts",
                        "-servername", hostname,
                        "-connect", hostname + ":443"],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
  # communicate() without input closes the child's stdin, which lets
  # s_client finish once the handshake output has been produced.
  stdout_data, stderr_data = p.communicate()

  if p.returncode == 0:
    return stdout_data

  # The captured stderr is bytes; decode it so the message shows the actual
  # text rather than a b'...' repr with escaped newlines.
  sys.stderr.write("Failed getting certificates for %s:\n%s\n" % (
      hostname, stderr_data.decode("utf-8", errors="replace")))
  return b""
40
41
def read_sources_from_commandline(sources):
  """Resolves the command-line SOURCE arguments into a list of bytes objects.

  With no arguments, the single entry is everything read from stdin.
  Otherwise each argument that names an existing path is read as a file, and
  any other argument is treated as a web server address to fetch
  certificates from.
  """
  if not sources:
    # No command-line arguments were given: take the input from stdin.
    return [sys.stdin.buffer.read()]

  collected = []
  for source in sources:
    if os.path.exists(source):
      loader = read_file_to_string
    else:
      # Not a file path, so treat it as a web server address.
      loader = read_certificates_data_from_server
    collected.append(loader(source))
  return collected
61
62
def strip_indentation_whitespace(text):
  """Removes the leading whitespace of every b"\\n"-separated line of |text|.

  Trailing whitespace and the number of lines are left untouched.
  """
  dedented = []
  for line in text.split(b"\n"):
    dedented.append(line.lstrip())
  return b"\n".join(dedented)
67
68
def strip_all_whitespace(text):
  """Deletes every run of whitespace from the bytes |text|.

  After the whitespace is removed, any literal two-character backslash-n
  sequence left in the data is turned into a real newline byte.
  """
  whitespace_run = re.compile(rb'\s+')
  compacted = whitespace_run.sub(b'', text)
  return compacted.replace(rb'\n', b'\n')
72
73
def extract_certificates_from_pem(pem_bytes):
  """Extracts DER certificates from the PEM blocks found in |pem_bytes|.

  Both CERTIFICATE and PKCS7 blocks are recognized. CERTIFICATE blocks yield
  their decoded contents directly; PKCS7 blocks are expanded into the
  certificates they contain. Blocks that are unterminated or that do not hold
  valid base64 are reported on stderr and skipped.

  Returns:
    A list of bytes objects, one per extracted certificate.
  """
  certificates_der = []

  # Group 3 matches either the END marker for the same block type, or the end
  # of the input (in which case it is empty and the block is unterminated).
  regex = re.compile(
      rb'-----BEGIN (CERTIFICATE|PKCS7)-----(.*?)(-----END \1-----|$)',
      re.DOTALL)

  for match in regex.finditer(pem_bytes):
    block_type = match.group(1)
    # The matched type is bytes; decode it so diagnostics show the plain name
    # instead of a b'...' repr.
    block_name = block_type.decode("ascii")
    if not match.group(3):
      sys.stderr.write(
          "\nUnterminated %s block, input is corrupt or truncated\n" %
          block_name)
      continue
    try:
      der = base64.b64decode(strip_all_whitespace(match.group(2)))
    except ValueError as e:
      # binascii.Error (a ValueError) is raised for malformed base64. Skip the
      # bad block rather than aborting, as is done for truncated blocks.
      sys.stderr.write("\nInvalid base64 data in %s block: %s\n" % (
          block_name, e))
      continue
    if block_type == b'CERTIFICATE':
      certificates_der.append(der)
    else:
      certificates_der.extend(extract_certificates_from_der_pkcs7(der))

  return certificates_der
94
95
def extract_certificates_from_der_pkcs7(der_bytes):
  """Extracts the DER certificates held in a DER-encoded PKCS #7 structure.

  The data is piped through `openssl pkcs7 -print_certs` and the PEM that
  comes back is parsed. An empty list is returned when the command produces
  no output.
  """
  command = ['openssl','pkcs7','-print_certs', '-inform', 'DER']
  pem_output = process_data_with_command(command, der_bytes)
  if not pem_output:
    return []
  # The output is one or more PEM encoded certificates. (It may also hold
  # CRLs, but the PEM parser ignores those.)
  return extract_certificates_from_pem(pem_output)
104
105
def extract_certificates_from_der_ascii(input_text):
  """Converts DER ASCII certificates in the bytes |input_text| to DER.

  Each top-level `SEQUENCE { ... }` found is passed through the `ascii2der`
  command; conversions that produce no output are dropped.

  Returns:
    A list of bytes objects, one per successfully converted certificate.
  """
  certificates_der = []

  # Look for beginning and end of Certificate SEQUENCE. The indentation is
  # significant. (The SEQUENCE must be non-indented, and the rest of the DER
  # ASCII must be indented until the closing } which again is non-indented.)
  # The output of der2ascii meets this, but it is not a requirement of the DER
  # ASCII language.
  # TODO(mattm): consider alternate approach of doing ascii2der on entire
  # input, and handling the multiple concatenated DER certificates.
  #
  # The input is bytes, so this must be a bytes pattern; a str pattern raises
  # TypeError when applied to bytes.
  regex = re.compile(rb'^(SEQUENCE {.*?^})', re.DOTALL | re.MULTILINE)

  for match in regex.finditer(input_text):
    der_ascii_bytes = match.group(1)
    der_bytes = process_data_with_command(["ascii2der"], der_ascii_bytes)
    if der_bytes:
      certificates_der.append(der_bytes)

  return certificates_der
125
126
def decode_netlog_hexdump(netlog_text):
  """Decodes the hexdump that follows a 'bytes =' line in a NetLog dump.

  Everything up to and including the first line containing 'bytes =' is
  skipped. Each following line is expected to contain a 48 character column
  of space-separated hex byte values; decoding stops at the first line that
  does not.

  Args:
    netlog_text: The NetLog text, as either bytes or str.

  Returns:
    None if no 'bytes =' line is present. Otherwise the decoded data, as
    bytes when |netlog_text| is bytes, or as a str holding one character per
    decoded byte when |netlog_text| is a str.
  """
  # Pick the marker and pattern matching the input type; mixing str patterns
  # with bytes input raises TypeError.
  is_text = isinstance(netlog_text, str)
  if is_text:
    marker = 'bytes ='
    hex_re = re.compile(r'\s*([0-9A-Fa-f ]{48})')
  else:
    marker = b'bytes ='
    hex_re = re.compile(rb'\s*([0-9A-Fa-f ]{48})')

  lines = iter(netlog_text.splitlines())

  # Skip the text preceding the actual hexdump, including the marker line.
  for line in lines:
    if marker in line:
      break
  else:
    return None

  values = []
  for line in lines:
    m = hex_re.search(line)
    if not m:
      break
    values.extend(int(part, 16) for part in m.group(1).split())

  if is_text:
    return ''.join(chr(value) for value in values)
  return bytes(values)
147
148
class ByteReader:
  """Iteratively consume data from a byte string.

  Automatically tracks and advances current position in the string as data is
  consumed, and will throw an exception if attempting to read past the end of
  the string.

  |data| may be a bytes object, or a str in which each character stands for
  one byte value.
  """
  def __init__(self, data):
    self.data = data
    self.pos = 0

  def consume_byte(self):
    """Returns the next byte as an int. Raises IndexError at end of data."""
    value = self.data[self.pos]
    # Indexing bytes already yields an int; indexing a str yields a
    # one-character string that still needs converting.
    if not isinstance(value, int):
      value = ord(value)
    self.pos += 1
    return value

  def consume_int16(self):
    """Returns the next 2 bytes as a big-endian unsigned integer."""
    high = self.consume_byte()
    low = self.consume_byte()
    return (high << 8) + low

  def consume_int24(self):
    """Returns the next 3 bytes as a big-endian unsigned integer."""
    high = self.consume_byte()
    middle = self.consume_byte()
    low = self.consume_byte()
    return (high << 16) + (middle << 8) + low

  def consume_bytes(self, n):
    """Returns the next |n| bytes. Raises IndexError if fewer remain."""
    b = self.data[self.pos:self.pos+n]
    if len(b) != n:
      raise IndexError('requested:%d bytes  actual:%d bytes'%(n, len(b)))
    self.pos += n
    return b

  def remaining_byte_count(self):
    """Returns how many bytes have not been consumed yet."""
    return len(self.data) - self.pos
181
182
def decode_tls10_certificate_message(reader):
  """Parses the body of an old-style TLS Certificate message.

  |reader| must be positioned just after the HandshakeType byte. The
  layout read is: a 24-bit message length, a 24-bit certificate list length,
  then certificates that are each prefixed by a 24-bit length.

  Returns:
    The list of certificates, in message order.

  Raises:
    RuntimeError: if either declared length does not match the number of
      bytes left in |reader|.
  """
  declared_message_length = reader.consume_int24()
  bytes_left = reader.remaining_byte_count()
  if declared_message_length != bytes_left:
    raise RuntimeError(
        'message_length(%d) != remaining_byte_count(%d)\n' % (
            declared_message_length, bytes_left))

  declared_list_length = reader.consume_int24()
  bytes_left = reader.remaining_byte_count()
  if declared_list_length != bytes_left:
    raise RuntimeError(
        'certificate_list_length(%d) != remaining_byte_count(%d)\n' % (
            declared_list_length, bytes_left))

  parsed_certificates = []
  while reader.remaining_byte_count() != 0:
    entry_length = reader.consume_int24()
    parsed_certificates.append(reader.consume_bytes(entry_length))
  return parsed_certificates
202
203
def decode_tls13_certificate_message(reader):
  """Parses the body of a TLS 1.3 style Certificate message.

  |reader| must be positioned just after the HandshakeType byte. The
  layout read is: a 24-bit message length, a length-prefixed
  certificate_request_context (skipped), a 24-bit certificate list length,
  then entries made of a 24-bit-length-prefixed certificate followed by a
  16-bit-length-prefixed extensions field (skipped).

  Returns:
    The list of certificates, in message order.

  Raises:
    RuntimeError: if either declared length does not match the number of
      bytes left in |reader|.
  """
  declared_message_length = reader.consume_int24()
  bytes_left = reader.remaining_byte_count()
  if declared_message_length != bytes_left:
    raise RuntimeError(
        'message_length(%d) != remaining_byte_count(%d)\n' % (
            declared_message_length, bytes_left))

  # The certificate_request_context is not needed; step over it.
  context_length = reader.consume_byte()
  reader.consume_bytes(context_length)

  declared_list_length = reader.consume_int24()
  bytes_left = reader.remaining_byte_count()
  if declared_list_length != bytes_left:
    raise RuntimeError(
        'certificate_list_length(%d) != remaining_byte_count(%d)\n' % (
            declared_list_length, bytes_left))

  parsed_certificates = []
  while reader.remaining_byte_count() != 0:
    # The certificate_type is assumed to be X.509.
    entry_length = reader.consume_int24()
    parsed_certificates.append(reader.consume_bytes(entry_length))
    # Per-certificate extensions are not needed; step over them.
    extensions_length = reader.consume_int16()
    reader.consume_bytes(extensions_length)
  return parsed_certificates
231
232
def decode_tls_certificate_message(certificate_message):
  """Extracts the certificates from a raw TLS Certificate handshake message.

  Returns a list of certificates, or an empty list (after writing a
  diagnostic to stderr) if the message is not a Certificate message or cannot
  be parsed with either known encoding.
  """
  handshake_type = ByteReader(certificate_message).consume_byte()
  if handshake_type != 11:
    sys.stderr.write('HandshakeType != 11. Not a Certificate Message.\n')
    return []

  # The TLS certificate message encoding changed in TLS 1.3. Rather than
  # require pasting in and parsing the whole handshake to discover the TLS
  # version, just try the old encoding first and then the new one.
  attempts = (
      (decode_tls10_certificate_message, "TLS1.0 parse attempt:\n%s\n"),
      (decode_tls13_certificate_message, "TLS1.3 parse attempt:\n%s\n"),
  )

  failure_reports = []
  for decoder, report_format in attempts:
    # Every attempt starts from a fresh reader positioned just past the
    # HandshakeType byte.
    reader = ByteReader(certificate_message)
    reader.consume_byte()
    try:
      return decoder(reader)
    except (IndexError, RuntimeError):
      failure_reports.append(report_format % traceback.format_exc())

  # Neither attempt succeeded, just dump some error info:
  sys.stderr.write("Couldn't parse TLS certificate message\n")
  for report in failure_reports:
    sys.stderr.write(report)
  sys.stderr.write("\n")

  return []
265
266
def extract_tls_certificate_message(netlog_text):
  """Extracts certificates from a NetLog dump of a TLS certificate message.

  Returns an empty list when the dump yields no hexdump data.
  """
  certificate_message = decode_netlog_hexdump(netlog_text)
  if certificate_message:
    return decode_tls_certificate_message(certificate_message)
  return []
272
273
def extract_certificates(source_bytes):
  """Sniffs the format of |source_bytes| and returns its DER certificates.

  The formats are checked in this order: PEM, DER ASCII, NetLog TLS message
  dump, DER-encoded PKCS #7. Input matching none of them is assumed to be the
  DER of a single certificate and is returned unchanged in a one-element list.
  """
  pem_markers = (b"BEGIN CERTIFICATE", b"BEGIN PKCS7")
  if any(marker in source_bytes for marker in pem_markers):
    return extract_certificates_from_pem(source_bytes)

  if b"SEQUENCE {" in source_bytes:
    return extract_certificates_from_der_ascii(source_bytes)

  if b"SSL_HANDSHAKE_MESSAGE_RECEIVED" in source_bytes:
    return extract_tls_certificate_message(source_bytes)

  # DER encoding of PKCS #7 signedData OID (1.2.840.113549.1.7.2)
  pkcs7_signed_data_oid = b"\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01\x07\x02"
  if pkcs7_signed_data_oid in source_bytes:
    return extract_certificates_from_der_pkcs7(source_bytes)

  # Nothing recognizable: assume it is the DER for a single certificate.
  return [source_bytes]
290
291
def process_data_with_command(command, data):
  """Runs |command|, feeding it |data| on stdin, and returns its stdout.

  Args:
    command: The argument list of the program to run.
    data: The bytes to write to the program's stdin.

  Returns:
    The bytes the program wrote to stdout if it exited with status 0.
    Otherwise b"", after writing a diagnostic to stderr; this covers both a
    missing executable and a non-zero exit status.

  Raises:
    OSError: if the program could not be started for a reason other than the
      executable not being found.
  """
  try:
    p = subprocess.Popen(command,
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
  except OSError as e:
    if e.errno == errno.ENOENT:
      sys.stderr.write("Failed to execute %s\n" % command[0])
      return b""
    raise

  stdout_data, stderr_data = p.communicate(data)

  if p.returncode == 0:
    return stdout_data

  # Otherwise failed. The captured stderr is bytes; decode it so the message
  # shows the program's actual output rather than a b'...' repr.
  sys.stderr.write("Failed: %s: %s\n" % (
      " ".join(command), stderr_data.decode("utf-8", errors="replace")))
  return b""
312
313
def openssl_text_pretty_printer(certificate_der, unused_certificate_number):
  """Returns the `openssl x509 -text -noout` rendering of |certificate_der|."""
  command = ["openssl", "x509", "-text", "-inform", "DER", "-noout"]
  return process_data_with_command(command, certificate_der)
317
318
def pem_pretty_printer(certificate_der, unused_certificate_number):
  """Returns |certificate_der| converted to PEM by `openssl x509`."""
  command = ["openssl", "x509", "-inform", "DER", "-outform", "PEM"]
  return process_data_with_command(command, certificate_der)
322
323
def der2ascii_pretty_printer(certificate_der, unused_certificate_number):
  """Returns the output of the `der2ascii` command for |certificate_der|."""
  command = ["der2ascii"]
  return process_data_with_command(command, certificate_der)
326
327
def header_pretty_printer(certificate_der, certificate_number):
  """Returns a banner naming the certificate by number and SHA-256 hash.

  The banner is three lines of ASCII bytes: a rule, the
  "Certificate<number>: <hex digest>" title, and a second rule. There is no
  trailing newline.
  """
  banner_template = """===========================================
Certificate%d: %s
==========================================="""
  digest_hex = hashlib.sha256(certificate_der).hexdigest()
  banner = banner_template % (certificate_number, digest_hex)
  return banner.encode("ascii")
334
335
# Sentinel for the "der" output type. pretty_print_certificates special-cases
# DER output, so this function only serves as a magic value to compare against.
def der_printer():
  """Always raises RuntimeError; this is a marker, not a real printer."""
  raise RuntimeError()
340
341
def pretty_print_certificates(certificates_der, pretty_printers):
  """Renders |certificates_der| with each of |pretty_printers|.

  Args:
    certificates_der: A list of DER-encoded certificates (bytes).
    pretty_printers: A list of callables taking (certificate_der,
      certificate_number) and returning bytes, or exactly [der_printer] to
      request raw DER output.

  Returns:
    The bytes to write out. For raw DER output this is the first certificate
    unmodified, or b"" if there are no certificates. Otherwise, for each
    certificate, the non-empty printer outputs joined by newlines and followed
    by a newline.
  """
  # Need to special-case DER output to avoid adding any newlines, and to
  # only allow a single certificate to be output.
  if pretty_printers == [der_printer]:
    if not certificates_der:
      # Nothing was extracted; emit nothing instead of failing on the
      # certificates_der[0] lookup below.
      sys.stderr.write("No certificates found, nothing to output\n")
      return b""
    if len(certificates_der) > 1:
      sys.stderr.write("DER output only supports a single certificate, "
                       "ignoring %d remaining certs\n" % (
                           len(certificates_der) - 1))
    return certificates_der[0]

  sections = []
  for i, certificate_der in enumerate(certificates_der):
    pretty = []
    for pretty_printer in pretty_printers:
      pretty_printed = pretty_printer(certificate_der, i)
      # Printers return empty output on failure; leave those out.
      if pretty_printed:
        pretty.append(pretty_printed)
    sections.append(b"\n".join(pretty) + b"\n")
  return b"".join(sections)
362
363
def parse_outputs(outputs):
  """Maps the comma-separated output names in |outputs| to printer functions.

  Returns the printers in the order they were named. If a name is unknown, or
  "der" is combined with any other output, a message is written to stderr and
  an empty list is returned.
  """
  printers_by_name = {
      "der2ascii": der2ascii_pretty_printer,
      "openssl_text": openssl_text_pretty_printer,
      "pem": pem_pretty_printer,
      "header": header_pretty_printer,
      "der": der_printer,
  }

  selected = []
  for name in outputs.split(','):
    printer = printers_by_name.get(name)
    if printer is None:
      sys.stderr.write("Invalid output type: %s\n" % name)
      return []
    selected.append(printer)

  if len(selected) > 1 and der_printer in selected:
    sys.stderr.write("Output type der must be used alone.\n")
    return []
  return selected
380
381
def main():
  """Parses the command line, gathers certificates, and prints them.

  Exits with status 1 if no valid output format was selected.
  """
  sources_help = '''Each SOURCE can be one of:
  (1) A server name such as www.google.com.
  (2) A PEM [*] file containing one or more CERTIFICATE or PKCS7 blocks
  (3) A file containing one or more DER ASCII certificates
  (4) A text NetLog dump of a TLS certificate message
      (must include the SSL_HANDSHAKE_MESSAGE_RECEIVED line)
  (5) A binary file containing DER-encoded PKCS #7 signedData
  (6) A binary file containing DER-encoded certificate

When multiple SOURCEs are listed, all certificates in them
are concatenated. If no SOURCE is given then data will be
read from stdin.

[*] Parsing of PEM files is relaxed - leading indentation
whitespace will be stripped (needed for copy-pasting data
from NetLogs).'''

  parser = argparse.ArgumentParser(
      description=__doc__, formatter_class=argparse.RawTextHelpFormatter)
  parser.add_argument('sources', metavar='SOURCE', nargs='*',
                      help=sources_help)
  parser.add_argument('--output',
                      dest='outputs',
                      action='store',
                      default="header,openssl_text,pem",
                      help='output formats to use. Default: %(default)s')
  args = parser.parse_args()

  # The sources are read before the output formats are validated.
  sources_bytes = read_sources_from_commandline(args.sources)

  pretty_printers = parse_outputs(args.outputs)
  if not pretty_printers:
    sys.stderr.write('No pretty printers selected.\n')
    sys.exit(1)

  certificates_der = []
  for source_bytes in sources_bytes:
    certificates_der += extract_certificates(source_bytes)

  # The output is bytes, so it goes to the binary layer of stdout.
  output = pretty_print_certificates(certificates_der, pretty_printers)
  sys.stdout.buffer.write(output)
425
426
# Run the tool only when this file is executed as a script, not on import.
if __name__ == "__main__":
  main()
429