1*6236dae4SAndroid Build Coastguard Worker#!/usr/bin/env python3 2*6236dae4SAndroid Build Coastguard Worker# -*- coding: utf-8 -*- 3*6236dae4SAndroid Build Coastguard Worker#*************************************************************************** 4*6236dae4SAndroid Build Coastguard Worker# _ _ ____ _ 5*6236dae4SAndroid Build Coastguard Worker# Project ___| | | | _ \| | 6*6236dae4SAndroid Build Coastguard Worker# / __| | | | |_) | | 7*6236dae4SAndroid Build Coastguard Worker# | (__| |_| | _ <| |___ 8*6236dae4SAndroid Build Coastguard Worker# \___|\___/|_| \_\_____| 9*6236dae4SAndroid Build Coastguard Worker# 10*6236dae4SAndroid Build Coastguard Worker# Copyright (C) Daniel Stenberg, <[email protected]>, et al. 11*6236dae4SAndroid Build Coastguard Worker# 12*6236dae4SAndroid Build Coastguard Worker# This software is licensed as described in the file COPYING, which 13*6236dae4SAndroid Build Coastguard Worker# you should have received as part of this distribution. The terms 14*6236dae4SAndroid Build Coastguard Worker# are also available at https://curl.se/docs/copyright.html. 15*6236dae4SAndroid Build Coastguard Worker# 16*6236dae4SAndroid Build Coastguard Worker# You may opt to use, copy, modify, merge, publish, distribute and/or sell 17*6236dae4SAndroid Build Coastguard Worker# copies of the Software, and permit persons to whom the Software is 18*6236dae4SAndroid Build Coastguard Worker# furnished to do so, under the terms of the COPYING file. 19*6236dae4SAndroid Build Coastguard Worker# 20*6236dae4SAndroid Build Coastguard Worker# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 21*6236dae4SAndroid Build Coastguard Worker# KIND, either express or implied. 22*6236dae4SAndroid Build Coastguard Worker# 23*6236dae4SAndroid Build Coastguard Worker# SPDX-License-Identifier: curl 24*6236dae4SAndroid Build Coastguard Worker# 25*6236dae4SAndroid Build Coastguard Worker########################################################################### 26*6236dae4SAndroid Build Coastguard Worker# 27*6236dae4SAndroid Build Coastguard Workerimport ipaddress 28*6236dae4SAndroid Build Coastguard Workerimport os 29*6236dae4SAndroid Build Coastguard Workerimport re 30*6236dae4SAndroid Build Coastguard Workerfrom datetime import timedelta, datetime, timezone 31*6236dae4SAndroid Build Coastguard Workerfrom typing import List, Any, Optional 32*6236dae4SAndroid Build Coastguard Worker 33*6236dae4SAndroid Build Coastguard Workerfrom cryptography import x509 34*6236dae4SAndroid Build Coastguard Workerfrom cryptography.hazmat.backends import default_backend 35*6236dae4SAndroid Build Coastguard Workerfrom cryptography.hazmat.primitives import hashes 36*6236dae4SAndroid Build Coastguard Workerfrom cryptography.hazmat.primitives.asymmetric import ec, rsa 37*6236dae4SAndroid Build Coastguard Workerfrom cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey 38*6236dae4SAndroid Build Coastguard Workerfrom cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey 39*6236dae4SAndroid Build Coastguard Workerfrom cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption, load_pem_private_key 40*6236dae4SAndroid Build Coastguard Workerfrom cryptography.x509 import ExtendedKeyUsageOID, NameOID 41*6236dae4SAndroid Build Coastguard Worker 42*6236dae4SAndroid Build Coastguard Worker 43*6236dae4SAndroid Build Coastguard WorkerEC_SUPPORTED = {} 44*6236dae4SAndroid Build Coastguard WorkerEC_SUPPORTED.update([(curve.name.upper(), curve) for curve in [ 45*6236dae4SAndroid Build Coastguard Worker ec.SECP192R1, 46*6236dae4SAndroid Build Coastguard Worker ec.SECP224R1, 47*6236dae4SAndroid Build Coastguard Worker ec.SECP256R1, 48*6236dae4SAndroid Build Coastguard Worker ec.SECP384R1, 49*6236dae4SAndroid Build Coastguard Worker]]) 50*6236dae4SAndroid Build Coastguard Worker 51*6236dae4SAndroid Build Coastguard Worker 52*6236dae4SAndroid Build Coastguard Workerdef _private_key(key_type): 53*6236dae4SAndroid Build Coastguard Worker if isinstance(key_type, str): 54*6236dae4SAndroid Build Coastguard Worker key_type = key_type.upper() 55*6236dae4SAndroid Build Coastguard Worker m = re.match(r'^(RSA)?(\d+)$', key_type) 56*6236dae4SAndroid Build Coastguard Worker if m: 57*6236dae4SAndroid Build Coastguard Worker key_type = int(m.group(2)) 58*6236dae4SAndroid Build Coastguard Worker 59*6236dae4SAndroid Build Coastguard Worker if isinstance(key_type, int): 60*6236dae4SAndroid Build Coastguard Worker return rsa.generate_private_key( 61*6236dae4SAndroid Build Coastguard Worker public_exponent=65537, 62*6236dae4SAndroid Build Coastguard Worker key_size=key_type, 63*6236dae4SAndroid Build Coastguard Worker backend=default_backend() 64*6236dae4SAndroid Build Coastguard Worker ) 65*6236dae4SAndroid Build Coastguard Worker if not isinstance(key_type, ec.EllipticCurve) and key_type in EC_SUPPORTED: 66*6236dae4SAndroid Build Coastguard Worker key_type = EC_SUPPORTED[key_type] 67*6236dae4SAndroid Build Coastguard Worker return ec.generate_private_key( 68*6236dae4SAndroid Build Coastguard Worker curve=key_type, 69*6236dae4SAndroid Build Coastguard Worker backend=default_backend() 70*6236dae4SAndroid Build Coastguard Worker ) 71*6236dae4SAndroid Build Coastguard Worker 72*6236dae4SAndroid Build Coastguard Worker 73*6236dae4SAndroid Build Coastguard Workerclass CertificateSpec: 74*6236dae4SAndroid Build Coastguard Worker 75*6236dae4SAndroid Build Coastguard Worker def __init__(self, name: Optional[str] = None, 76*6236dae4SAndroid Build Coastguard Worker domains: Optional[List[str]] = None, 77*6236dae4SAndroid Build Coastguard Worker email: Optional[str] = None, 78*6236dae4SAndroid Build Coastguard Worker key_type: Optional[str] = None, 79*6236dae4SAndroid Build Coastguard Worker single_file: bool = False, 80*6236dae4SAndroid Build Coastguard Worker valid_from: timedelta = timedelta(days=-1), 81*6236dae4SAndroid Build Coastguard Worker valid_to: timedelta = timedelta(days=89), 82*6236dae4SAndroid Build Coastguard Worker client: bool = False, 83*6236dae4SAndroid Build Coastguard Worker check_valid: bool = True, 84*6236dae4SAndroid Build Coastguard Worker sub_specs: Optional[List['CertificateSpec']] = None): 85*6236dae4SAndroid Build Coastguard Worker self._name = name 86*6236dae4SAndroid Build Coastguard Worker self.domains = domains 87*6236dae4SAndroid Build Coastguard Worker self.client = client 88*6236dae4SAndroid Build Coastguard Worker self.email = email 89*6236dae4SAndroid Build Coastguard Worker self.key_type = key_type 90*6236dae4SAndroid Build Coastguard Worker self.single_file = single_file 91*6236dae4SAndroid Build Coastguard Worker self.valid_from = valid_from 92*6236dae4SAndroid Build Coastguard Worker self.valid_to = valid_to 93*6236dae4SAndroid Build Coastguard Worker self.sub_specs = sub_specs 94*6236dae4SAndroid Build Coastguard Worker self.check_valid = check_valid 95*6236dae4SAndroid Build Coastguard Worker 96*6236dae4SAndroid Build Coastguard Worker @property 97*6236dae4SAndroid Build Coastguard Worker def name(self) -> Optional[str]: 98*6236dae4SAndroid Build Coastguard Worker if self._name: 99*6236dae4SAndroid Build Coastguard Worker return self._name 100*6236dae4SAndroid Build Coastguard Worker elif self.domains: 101*6236dae4SAndroid Build Coastguard Worker return self.domains[0] 102*6236dae4SAndroid Build Coastguard Worker return None 103*6236dae4SAndroid Build Coastguard Worker 104*6236dae4SAndroid Build Coastguard Worker @property 105*6236dae4SAndroid Build Coastguard Worker def type(self) -> Optional[str]: 106*6236dae4SAndroid Build Coastguard Worker if self.domains and len(self.domains): 107*6236dae4SAndroid Build Coastguard Worker return "server" 108*6236dae4SAndroid Build Coastguard Worker elif self.client: 109*6236dae4SAndroid Build Coastguard Worker return "client" 110*6236dae4SAndroid Build Coastguard Worker elif self.name: 111*6236dae4SAndroid Build Coastguard Worker return "ca" 112*6236dae4SAndroid Build Coastguard Worker return None 113*6236dae4SAndroid Build Coastguard Worker 114*6236dae4SAndroid Build Coastguard Worker 115*6236dae4SAndroid Build Coastguard Workerclass Credentials: 116*6236dae4SAndroid Build Coastguard Worker 117*6236dae4SAndroid Build Coastguard Worker def __init__(self, 118*6236dae4SAndroid Build Coastguard Worker name: str, 119*6236dae4SAndroid Build Coastguard Worker cert: Any, 120*6236dae4SAndroid Build Coastguard Worker pkey: Any, 121*6236dae4SAndroid Build Coastguard Worker issuer: Optional['Credentials'] = None): 122*6236dae4SAndroid Build Coastguard Worker self._name = name 123*6236dae4SAndroid Build Coastguard Worker self._cert = cert 124*6236dae4SAndroid Build Coastguard Worker self._pkey = pkey 125*6236dae4SAndroid Build Coastguard Worker self._issuer = issuer 126*6236dae4SAndroid Build Coastguard Worker self._cert_file = None 127*6236dae4SAndroid Build Coastguard Worker self._pkey_file = None 128*6236dae4SAndroid Build Coastguard Worker self._store = None 129*6236dae4SAndroid Build Coastguard Worker self._combined_file = None 130*6236dae4SAndroid Build Coastguard Worker 131*6236dae4SAndroid Build Coastguard Worker @property 132*6236dae4SAndroid Build Coastguard Worker def name(self) -> str: 133*6236dae4SAndroid Build Coastguard Worker return self._name 134*6236dae4SAndroid Build Coastguard Worker 135*6236dae4SAndroid Build Coastguard Worker @property 136*6236dae4SAndroid Build Coastguard Worker def subject(self) -> x509.Name: 137*6236dae4SAndroid Build Coastguard Worker return self._cert.subject 138*6236dae4SAndroid Build Coastguard Worker 139*6236dae4SAndroid Build Coastguard Worker @property 140*6236dae4SAndroid Build Coastguard Worker def key_type(self): 141*6236dae4SAndroid Build Coastguard Worker if isinstance(self._pkey, RSAPrivateKey): 142*6236dae4SAndroid Build Coastguard Worker return f"rsa{self._pkey.key_size}" 143*6236dae4SAndroid Build Coastguard Worker elif isinstance(self._pkey, EllipticCurvePrivateKey): 144*6236dae4SAndroid Build Coastguard Worker return f"{self._pkey.curve.name}" 145*6236dae4SAndroid Build Coastguard Worker else: 146*6236dae4SAndroid Build Coastguard Worker raise Exception(f"unknown key type: {self._pkey}") 147*6236dae4SAndroid Build Coastguard Worker 148*6236dae4SAndroid Build Coastguard Worker @property 149*6236dae4SAndroid Build Coastguard Worker def private_key(self) -> Any: 150*6236dae4SAndroid Build Coastguard Worker return self._pkey 151*6236dae4SAndroid Build Coastguard Worker 152*6236dae4SAndroid Build Coastguard Worker @property 153*6236dae4SAndroid Build Coastguard Worker def certificate(self) -> Any: 154*6236dae4SAndroid Build Coastguard Worker return self._cert 155*6236dae4SAndroid Build Coastguard Worker 156*6236dae4SAndroid Build Coastguard Worker @property 157*6236dae4SAndroid Build Coastguard Worker def cert_pem(self) -> bytes: 158*6236dae4SAndroid Build Coastguard Worker return self._cert.public_bytes(Encoding.PEM) 159*6236dae4SAndroid Build Coastguard Worker 160*6236dae4SAndroid Build Coastguard Worker @property 161*6236dae4SAndroid Build Coastguard Worker def pkey_pem(self) -> bytes: 162*6236dae4SAndroid Build Coastguard Worker return self._pkey.private_bytes( 163*6236dae4SAndroid Build Coastguard Worker Encoding.PEM, 164*6236dae4SAndroid Build Coastguard Worker PrivateFormat.TraditionalOpenSSL if self.key_type.startswith('rsa') else PrivateFormat.PKCS8, 165*6236dae4SAndroid Build Coastguard Worker NoEncryption()) 166*6236dae4SAndroid Build Coastguard Worker 167*6236dae4SAndroid Build Coastguard Worker @property 168*6236dae4SAndroid Build Coastguard Worker def issuer(self) -> Optional['Credentials']: 169*6236dae4SAndroid Build Coastguard Worker return self._issuer 170*6236dae4SAndroid Build Coastguard Worker 171*6236dae4SAndroid Build Coastguard Worker def set_store(self, store: 'CertStore'): 172*6236dae4SAndroid Build Coastguard Worker self._store = store 173*6236dae4SAndroid Build Coastguard Worker 174*6236dae4SAndroid Build Coastguard Worker def set_files(self, cert_file: str, pkey_file: Optional[str] = None, 175*6236dae4SAndroid Build Coastguard Worker combined_file: Optional[str] = None): 176*6236dae4SAndroid Build Coastguard Worker self._cert_file = cert_file 177*6236dae4SAndroid Build Coastguard Worker self._pkey_file = pkey_file 178*6236dae4SAndroid Build Coastguard Worker self._combined_file = combined_file 179*6236dae4SAndroid Build Coastguard Worker 180*6236dae4SAndroid Build Coastguard Worker @property 181*6236dae4SAndroid Build Coastguard Worker def cert_file(self) -> str: 182*6236dae4SAndroid Build Coastguard Worker return self._cert_file 183*6236dae4SAndroid Build Coastguard Worker 184*6236dae4SAndroid Build Coastguard Worker @property 185*6236dae4SAndroid Build Coastguard Worker def pkey_file(self) -> Optional[str]: 186*6236dae4SAndroid Build Coastguard Worker return self._pkey_file 187*6236dae4SAndroid Build Coastguard Worker 188*6236dae4SAndroid Build Coastguard Worker @property 189*6236dae4SAndroid Build Coastguard Worker def combined_file(self) -> Optional[str]: 190*6236dae4SAndroid Build Coastguard Worker return self._combined_file 191*6236dae4SAndroid Build Coastguard Worker 192*6236dae4SAndroid Build Coastguard Worker def get_first(self, name) -> Optional['Credentials']: 193*6236dae4SAndroid Build Coastguard Worker creds = self._store.get_credentials_for_name(name) if self._store else [] 194*6236dae4SAndroid Build Coastguard Worker return creds[0] if len(creds) else None 195*6236dae4SAndroid Build Coastguard Worker 196*6236dae4SAndroid Build Coastguard Worker def get_credentials_for_name(self, name) -> List['Credentials']: 197*6236dae4SAndroid Build Coastguard Worker return self._store.get_credentials_for_name(name) if self._store else [] 198*6236dae4SAndroid Build Coastguard Worker 199*6236dae4SAndroid Build Coastguard Worker def issue_certs(self, specs: List[CertificateSpec], 200*6236dae4SAndroid Build Coastguard Worker chain: Optional[List['Credentials']] = None) -> List['Credentials']: 201*6236dae4SAndroid Build Coastguard Worker return [self.issue_cert(spec=spec, chain=chain) for spec in specs] 202*6236dae4SAndroid Build Coastguard Worker 203*6236dae4SAndroid Build Coastguard Worker def issue_cert(self, spec: CertificateSpec, 204*6236dae4SAndroid Build Coastguard Worker chain: Optional[List['Credentials']] = None) -> 'Credentials': 205*6236dae4SAndroid Build Coastguard Worker key_type = spec.key_type if spec.key_type else self.key_type 206*6236dae4SAndroid Build Coastguard Worker creds = None 207*6236dae4SAndroid Build Coastguard Worker if self._store: 208*6236dae4SAndroid Build Coastguard Worker creds = self._store.load_credentials( 209*6236dae4SAndroid Build Coastguard Worker name=spec.name, key_type=key_type, single_file=spec.single_file, 210*6236dae4SAndroid Build Coastguard Worker issuer=self, check_valid=spec.check_valid) 211*6236dae4SAndroid Build Coastguard Worker if creds is None: 212*6236dae4SAndroid Build Coastguard Worker creds = TestCA.create_credentials(spec=spec, issuer=self, key_type=key_type, 213*6236dae4SAndroid Build Coastguard Worker valid_from=spec.valid_from, valid_to=spec.valid_to) 214*6236dae4SAndroid Build Coastguard Worker if self._store: 215*6236dae4SAndroid Build Coastguard Worker self._store.save(creds, single_file=spec.single_file) 216*6236dae4SAndroid Build Coastguard Worker if spec.type == "ca": 217*6236dae4SAndroid Build Coastguard Worker self._store.save_chain(creds, "ca", with_root=True) 218*6236dae4SAndroid Build Coastguard Worker 219*6236dae4SAndroid Build Coastguard Worker if spec.sub_specs: 220*6236dae4SAndroid Build Coastguard Worker if self._store: 221*6236dae4SAndroid Build Coastguard Worker sub_store = CertStore(fpath=os.path.join(self._store.path, creds.name)) 222*6236dae4SAndroid Build Coastguard Worker creds.set_store(sub_store) 223*6236dae4SAndroid Build Coastguard Worker subchain = chain.copy() if chain else [] 224*6236dae4SAndroid Build Coastguard Worker subchain.append(self) 225*6236dae4SAndroid Build Coastguard Worker creds.issue_certs(spec.sub_specs, chain=subchain) 226*6236dae4SAndroid Build Coastguard Worker return creds 227*6236dae4SAndroid Build Coastguard Worker 228*6236dae4SAndroid Build Coastguard Worker 229*6236dae4SAndroid Build Coastguard Workerclass CertStore: 230*6236dae4SAndroid Build Coastguard Worker 231*6236dae4SAndroid Build Coastguard Worker def __init__(self, fpath: str): 232*6236dae4SAndroid Build Coastguard Worker self._store_dir = fpath 233*6236dae4SAndroid Build Coastguard Worker if not os.path.exists(self._store_dir): 234*6236dae4SAndroid Build Coastguard Worker os.makedirs(self._store_dir) 235*6236dae4SAndroid Build Coastguard Worker self._creds_by_name = {} 236*6236dae4SAndroid Build Coastguard Worker 237*6236dae4SAndroid Build Coastguard Worker @property 238*6236dae4SAndroid Build Coastguard Worker def path(self) -> str: 239*6236dae4SAndroid Build Coastguard Worker return self._store_dir 240*6236dae4SAndroid Build Coastguard Worker 241*6236dae4SAndroid Build Coastguard Worker def save(self, creds: Credentials, name: Optional[str] = None, 242*6236dae4SAndroid Build Coastguard Worker chain: Optional[List[Credentials]] = None, 243*6236dae4SAndroid Build Coastguard Worker single_file: bool = False) -> None: 244*6236dae4SAndroid Build Coastguard Worker name = name if name is not None else creds.name 245*6236dae4SAndroid Build Coastguard Worker cert_file = self.get_cert_file(name=name, key_type=creds.key_type) 246*6236dae4SAndroid Build Coastguard Worker pkey_file = self.get_pkey_file(name=name, key_type=creds.key_type) 247*6236dae4SAndroid Build Coastguard Worker comb_file = self.get_combined_file(name=name, key_type=creds.key_type) 248*6236dae4SAndroid Build Coastguard Worker if single_file: 249*6236dae4SAndroid Build Coastguard Worker pkey_file = None 250*6236dae4SAndroid Build Coastguard Worker with open(cert_file, "wb") as fd: 251*6236dae4SAndroid Build Coastguard Worker fd.write(creds.cert_pem) 252*6236dae4SAndroid Build Coastguard Worker if chain: 253*6236dae4SAndroid Build Coastguard Worker for c in chain: 254*6236dae4SAndroid Build Coastguard Worker fd.write(c.cert_pem) 255*6236dae4SAndroid Build Coastguard Worker if pkey_file is None: 256*6236dae4SAndroid Build Coastguard Worker fd.write(creds.pkey_pem) 257*6236dae4SAndroid Build Coastguard Worker if pkey_file is not None: 258*6236dae4SAndroid Build Coastguard Worker with open(pkey_file, "wb") as fd: 259*6236dae4SAndroid Build Coastguard Worker fd.write(creds.pkey_pem) 260*6236dae4SAndroid Build Coastguard Worker with open(comb_file, "wb") as fd: 261*6236dae4SAndroid Build Coastguard Worker fd.write(creds.cert_pem) 262*6236dae4SAndroid Build Coastguard Worker if chain: 263*6236dae4SAndroid Build Coastguard Worker for c in chain: 264*6236dae4SAndroid Build Coastguard Worker fd.write(c.cert_pem) 265*6236dae4SAndroid Build Coastguard Worker fd.write(creds.pkey_pem) 266*6236dae4SAndroid Build Coastguard Worker creds.set_files(cert_file, pkey_file, comb_file) 267*6236dae4SAndroid Build Coastguard Worker self._add_credentials(name, creds) 268*6236dae4SAndroid Build Coastguard Worker 269*6236dae4SAndroid Build Coastguard Worker def save_chain(self, creds: Credentials, infix: str, with_root=False): 270*6236dae4SAndroid Build Coastguard Worker name = creds.name 271*6236dae4SAndroid Build Coastguard Worker chain = [creds] 272*6236dae4SAndroid Build Coastguard Worker while creds.issuer is not None: 273*6236dae4SAndroid Build Coastguard Worker creds = creds.issuer 274*6236dae4SAndroid Build Coastguard Worker chain.append(creds) 275*6236dae4SAndroid Build Coastguard Worker if not with_root and len(chain) > 1: 276*6236dae4SAndroid Build Coastguard Worker chain = chain[:-1] 277*6236dae4SAndroid Build Coastguard Worker chain_file = os.path.join(self._store_dir, f'{name}-{infix}.pem') 278*6236dae4SAndroid Build Coastguard Worker with open(chain_file, "wb") as fd: 279*6236dae4SAndroid Build Coastguard Worker for c in chain: 280*6236dae4SAndroid Build Coastguard Worker fd.write(c.cert_pem) 281*6236dae4SAndroid Build Coastguard Worker 282*6236dae4SAndroid Build Coastguard Worker def _add_credentials(self, name: str, creds: Credentials): 283*6236dae4SAndroid Build Coastguard Worker if name not in self._creds_by_name: 284*6236dae4SAndroid Build Coastguard Worker self._creds_by_name[name] = [] 285*6236dae4SAndroid Build Coastguard Worker self._creds_by_name[name].append(creds) 286*6236dae4SAndroid Build Coastguard Worker 287*6236dae4SAndroid Build Coastguard Worker def get_credentials_for_name(self, name) -> List[Credentials]: 288*6236dae4SAndroid Build Coastguard Worker return self._creds_by_name[name] if name in self._creds_by_name else [] 289*6236dae4SAndroid Build Coastguard Worker 290*6236dae4SAndroid Build Coastguard Worker def get_cert_file(self, name: str, key_type=None) -> str: 291*6236dae4SAndroid Build Coastguard Worker key_infix = ".{0}".format(key_type) if key_type is not None else "" 292*6236dae4SAndroid Build Coastguard Worker return os.path.join(self._store_dir, f'{name}{key_infix}.cert.pem') 293*6236dae4SAndroid Build Coastguard Worker 294*6236dae4SAndroid Build Coastguard Worker def get_pkey_file(self, name: str, key_type=None) -> str: 295*6236dae4SAndroid Build Coastguard Worker key_infix = ".{0}".format(key_type) if key_type is not None else "" 296*6236dae4SAndroid Build Coastguard Worker return os.path.join(self._store_dir, f'{name}{key_infix}.pkey.pem') 297*6236dae4SAndroid Build Coastguard Worker 298*6236dae4SAndroid Build Coastguard Worker def get_combined_file(self, name: str, key_type=None) -> str: 299*6236dae4SAndroid Build Coastguard Worker return os.path.join(self._store_dir, f'{name}.pem') 300*6236dae4SAndroid Build Coastguard Worker 301*6236dae4SAndroid Build Coastguard Worker def load_pem_cert(self, fpath: str) -> x509.Certificate: 302*6236dae4SAndroid Build Coastguard Worker with open(fpath) as fd: 303*6236dae4SAndroid Build Coastguard Worker return x509.load_pem_x509_certificate("".join(fd.readlines()).encode()) 304*6236dae4SAndroid Build Coastguard Worker 305*6236dae4SAndroid Build Coastguard Worker def load_pem_pkey(self, fpath: str): 306*6236dae4SAndroid Build Coastguard Worker with open(fpath) as fd: 307*6236dae4SAndroid Build Coastguard Worker return load_pem_private_key("".join(fd.readlines()).encode(), password=None) 308*6236dae4SAndroid Build Coastguard Worker 309*6236dae4SAndroid Build Coastguard Worker def load_credentials(self, name: str, key_type=None, 310*6236dae4SAndroid Build Coastguard Worker single_file: bool = False, 311*6236dae4SAndroid Build Coastguard Worker issuer: Optional[Credentials] = None, 312*6236dae4SAndroid Build Coastguard Worker check_valid: bool = False): 313*6236dae4SAndroid Build Coastguard Worker cert_file = self.get_cert_file(name=name, key_type=key_type) 314*6236dae4SAndroid Build Coastguard Worker pkey_file = cert_file if single_file else self.get_pkey_file(name=name, key_type=key_type) 315*6236dae4SAndroid Build Coastguard Worker comb_file = self.get_combined_file(name=name, key_type=key_type) 316*6236dae4SAndroid Build Coastguard Worker if os.path.isfile(cert_file) and os.path.isfile(pkey_file): 317*6236dae4SAndroid Build Coastguard Worker cert = self.load_pem_cert(cert_file) 318*6236dae4SAndroid Build Coastguard Worker pkey = self.load_pem_pkey(pkey_file) 319*6236dae4SAndroid Build Coastguard Worker try: 320*6236dae4SAndroid Build Coastguard Worker now = datetime.now(tz=timezone.utc) 321*6236dae4SAndroid Build Coastguard Worker if check_valid and \ 322*6236dae4SAndroid Build Coastguard Worker ((cert.not_valid_after_utc < now) or 323*6236dae4SAndroid Build Coastguard Worker (cert.not_valid_before_utc > now)): 324*6236dae4SAndroid Build Coastguard Worker return None 325*6236dae4SAndroid Build Coastguard Worker except AttributeError: # older python 326*6236dae4SAndroid Build Coastguard Worker now = datetime.now() 327*6236dae4SAndroid Build Coastguard Worker if check_valid and \ 328*6236dae4SAndroid Build Coastguard Worker ((cert.not_valid_after < now) or 329*6236dae4SAndroid Build Coastguard Worker (cert.not_valid_before > now)): 330*6236dae4SAndroid Build Coastguard Worker return None 331*6236dae4SAndroid Build Coastguard Worker creds = Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer) 332*6236dae4SAndroid Build Coastguard Worker creds.set_store(self) 333*6236dae4SAndroid Build Coastguard Worker creds.set_files(cert_file, pkey_file, comb_file) 334*6236dae4SAndroid Build Coastguard Worker self._add_credentials(name, creds) 335*6236dae4SAndroid Build Coastguard Worker return creds 336*6236dae4SAndroid Build Coastguard Worker return None 337*6236dae4SAndroid Build Coastguard Worker 338*6236dae4SAndroid Build Coastguard Worker 339*6236dae4SAndroid Build Coastguard Workerclass TestCA: 340*6236dae4SAndroid Build Coastguard Worker 341*6236dae4SAndroid Build Coastguard Worker @classmethod 342*6236dae4SAndroid Build Coastguard Worker def create_root(cls, name: str, store_dir: str, key_type: str = "rsa2048") -> Credentials: 343*6236dae4SAndroid Build Coastguard Worker store = CertStore(fpath=store_dir) 344*6236dae4SAndroid Build Coastguard Worker creds = store.load_credentials(name="ca", key_type=key_type, issuer=None) 345*6236dae4SAndroid Build Coastguard Worker if creds is None: 346*6236dae4SAndroid Build Coastguard Worker creds = TestCA._make_ca_credentials(name=name, key_type=key_type) 347*6236dae4SAndroid Build Coastguard Worker store.save(creds, name="ca") 348*6236dae4SAndroid Build Coastguard Worker creds.set_store(store) 349*6236dae4SAndroid Build Coastguard Worker return creds 350*6236dae4SAndroid Build Coastguard Worker 351*6236dae4SAndroid Build Coastguard Worker @staticmethod 352*6236dae4SAndroid Build Coastguard Worker def create_credentials(spec: CertificateSpec, issuer: Credentials, key_type: Any, 353*6236dae4SAndroid Build Coastguard Worker valid_from: timedelta = timedelta(days=-1), 354*6236dae4SAndroid Build Coastguard Worker valid_to: timedelta = timedelta(days=89), 355*6236dae4SAndroid Build Coastguard Worker ) -> Credentials: 356*6236dae4SAndroid Build Coastguard Worker """ 357*6236dae4SAndroid Build Coastguard Worker Create a certificate signed by this CA for the given domains. 358*6236dae4SAndroid Build Coastguard Worker 359*6236dae4SAndroid Build Coastguard Worker :returns: the certificate and private key PEM file paths 360*6236dae4SAndroid Build Coastguard Worker """ 361*6236dae4SAndroid Build Coastguard Worker if spec.domains and len(spec.domains): 362*6236dae4SAndroid Build Coastguard Worker creds = TestCA._make_server_credentials(name=spec.name, domains=spec.domains, 363*6236dae4SAndroid Build Coastguard Worker issuer=issuer, valid_from=valid_from, 364*6236dae4SAndroid Build Coastguard Worker valid_to=valid_to, key_type=key_type) 365*6236dae4SAndroid Build Coastguard Worker elif spec.client: 366*6236dae4SAndroid Build Coastguard Worker creds = TestCA._make_client_credentials(name=spec.name, issuer=issuer, 367*6236dae4SAndroid Build Coastguard Worker email=spec.email, valid_from=valid_from, 368*6236dae4SAndroid Build Coastguard Worker valid_to=valid_to, key_type=key_type) 369*6236dae4SAndroid Build Coastguard Worker elif spec.name: 370*6236dae4SAndroid Build Coastguard Worker creds = TestCA._make_ca_credentials(name=spec.name, issuer=issuer, 371*6236dae4SAndroid Build Coastguard Worker valid_from=valid_from, valid_to=valid_to, 372*6236dae4SAndroid Build Coastguard Worker key_type=key_type) 373*6236dae4SAndroid Build Coastguard Worker else: 374*6236dae4SAndroid Build Coastguard Worker raise Exception(f"unrecognized certificate specification: {spec}") 375*6236dae4SAndroid Build Coastguard Worker return creds 376*6236dae4SAndroid Build Coastguard Worker 377*6236dae4SAndroid Build Coastguard Worker @staticmethod 378*6236dae4SAndroid Build Coastguard Worker def _make_x509_name(org_name: Optional[str] = None, common_name: Optional[str] = None, parent: x509.Name = None) -> x509.Name: 379*6236dae4SAndroid Build Coastguard Worker name_pieces = [] 380*6236dae4SAndroid Build Coastguard Worker if org_name: 381*6236dae4SAndroid Build Coastguard Worker oid = NameOID.ORGANIZATIONAL_UNIT_NAME if parent else NameOID.ORGANIZATION_NAME 382*6236dae4SAndroid Build Coastguard Worker name_pieces.append(x509.NameAttribute(oid, org_name)) 383*6236dae4SAndroid Build Coastguard Worker elif common_name: 384*6236dae4SAndroid Build Coastguard Worker name_pieces.append(x509.NameAttribute(NameOID.COMMON_NAME, common_name)) 385*6236dae4SAndroid Build Coastguard Worker if parent: 386*6236dae4SAndroid Build Coastguard Worker name_pieces.extend(list(parent)) 387*6236dae4SAndroid Build Coastguard Worker return x509.Name(name_pieces) 388*6236dae4SAndroid Build Coastguard Worker 389*6236dae4SAndroid Build Coastguard Worker @staticmethod 390*6236dae4SAndroid Build Coastguard Worker def _make_csr( 391*6236dae4SAndroid Build Coastguard Worker subject: x509.Name, 392*6236dae4SAndroid Build Coastguard Worker pkey: Any, 393*6236dae4SAndroid Build Coastguard Worker issuer_subject: Optional[Credentials], 394*6236dae4SAndroid Build Coastguard Worker valid_from_delta: Optional[timedelta] = None, 395*6236dae4SAndroid Build Coastguard Worker valid_until_delta: Optional[timedelta] = None 396*6236dae4SAndroid Build Coastguard Worker ): 397*6236dae4SAndroid Build Coastguard Worker pubkey = pkey.public_key() 398*6236dae4SAndroid Build Coastguard Worker issuer_subject = issuer_subject if issuer_subject is not None else subject 399*6236dae4SAndroid Build Coastguard Worker 400*6236dae4SAndroid Build Coastguard Worker valid_from = datetime.now() 401*6236dae4SAndroid Build Coastguard Worker if valid_until_delta is not None: 402*6236dae4SAndroid Build Coastguard Worker valid_from += valid_from_delta 403*6236dae4SAndroid Build Coastguard Worker valid_until = datetime.now() 404*6236dae4SAndroid Build Coastguard Worker if valid_until_delta is not None: 405*6236dae4SAndroid Build Coastguard Worker valid_until += valid_until_delta 406*6236dae4SAndroid Build Coastguard Worker 407*6236dae4SAndroid Build Coastguard Worker return ( 408*6236dae4SAndroid Build Coastguard Worker x509.CertificateBuilder() 409*6236dae4SAndroid Build Coastguard Worker .subject_name(subject) 410*6236dae4SAndroid Build Coastguard Worker .issuer_name(issuer_subject) 411*6236dae4SAndroid Build Coastguard Worker .public_key(pubkey) 412*6236dae4SAndroid Build Coastguard Worker .not_valid_before(valid_from) 413*6236dae4SAndroid Build Coastguard Worker .not_valid_after(valid_until) 414*6236dae4SAndroid Build Coastguard Worker .serial_number(x509.random_serial_number()) 415*6236dae4SAndroid Build Coastguard Worker .add_extension( 416*6236dae4SAndroid Build Coastguard Worker x509.SubjectKeyIdentifier.from_public_key(pubkey), 417*6236dae4SAndroid Build Coastguard Worker critical=False, 418*6236dae4SAndroid Build Coastguard Worker ) 419*6236dae4SAndroid Build Coastguard Worker ) 420*6236dae4SAndroid Build Coastguard Worker 421*6236dae4SAndroid Build Coastguard Worker @staticmethod 422*6236dae4SAndroid Build Coastguard Worker def _add_ca_usages(csr: Any) -> Any: 423*6236dae4SAndroid Build Coastguard Worker return csr.add_extension( 424*6236dae4SAndroid Build Coastguard Worker x509.BasicConstraints(ca=True, path_length=9), 425*6236dae4SAndroid Build Coastguard Worker critical=True, 426*6236dae4SAndroid Build Coastguard Worker ).add_extension( 427*6236dae4SAndroid Build Coastguard Worker x509.KeyUsage( 428*6236dae4SAndroid Build Coastguard Worker digital_signature=True, 429*6236dae4SAndroid Build Coastguard Worker content_commitment=False, 430*6236dae4SAndroid Build Coastguard Worker key_encipherment=False, 431*6236dae4SAndroid Build Coastguard Worker data_encipherment=False, 432*6236dae4SAndroid Build Coastguard Worker key_agreement=False, 433*6236dae4SAndroid Build Coastguard Worker key_cert_sign=True, 434*6236dae4SAndroid Build Coastguard Worker crl_sign=True, 435*6236dae4SAndroid Build Coastguard Worker encipher_only=False, 436*6236dae4SAndroid Build Coastguard Worker decipher_only=False), 437*6236dae4SAndroid Build Coastguard Worker critical=True 438*6236dae4SAndroid Build Coastguard Worker ).add_extension( 439*6236dae4SAndroid Build Coastguard Worker x509.ExtendedKeyUsage([ 440*6236dae4SAndroid Build Coastguard Worker ExtendedKeyUsageOID.CLIENT_AUTH, 441*6236dae4SAndroid Build Coastguard Worker ExtendedKeyUsageOID.SERVER_AUTH, 442*6236dae4SAndroid Build Coastguard Worker ExtendedKeyUsageOID.CODE_SIGNING, 443*6236dae4SAndroid Build Coastguard Worker ]), 444*6236dae4SAndroid Build Coastguard Worker critical=True 445*6236dae4SAndroid Build Coastguard Worker ) 446*6236dae4SAndroid Build Coastguard Worker 447*6236dae4SAndroid Build Coastguard Worker @staticmethod 448*6236dae4SAndroid Build Coastguard Worker def _add_leaf_usages(csr: Any, domains: List[str], issuer: Credentials) -> Any: 449*6236dae4SAndroid Build Coastguard Worker names = [] 450*6236dae4SAndroid Build Coastguard Worker for name in domains: 451*6236dae4SAndroid Build Coastguard Worker try: 452*6236dae4SAndroid Build Coastguard Worker names.append(x509.IPAddress(ipaddress.ip_address(name))) 453*6236dae4SAndroid Build Coastguard Worker # TODO: specify specific exceptions here 454*6236dae4SAndroid Build Coastguard Worker except: # noqa: E722 455*6236dae4SAndroid Build Coastguard Worker names.append(x509.DNSName(name)) 456*6236dae4SAndroid Build Coastguard Worker 457*6236dae4SAndroid Build Coastguard Worker return csr.add_extension( 458*6236dae4SAndroid Build Coastguard Worker x509.BasicConstraints(ca=False, path_length=None), 459*6236dae4SAndroid Build Coastguard Worker critical=True, 460*6236dae4SAndroid Build Coastguard Worker ).add_extension( 461*6236dae4SAndroid Build Coastguard Worker x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( 462*6236dae4SAndroid Build Coastguard Worker issuer.certificate.extensions.get_extension_for_class( 463*6236dae4SAndroid Build Coastguard Worker x509.SubjectKeyIdentifier).value), 464*6236dae4SAndroid Build Coastguard Worker critical=False 465*6236dae4SAndroid Build Coastguard Worker ).add_extension( 466*6236dae4SAndroid Build Coastguard Worker x509.SubjectAlternativeName(names), critical=True, 467*6236dae4SAndroid Build Coastguard Worker ).add_extension( 468*6236dae4SAndroid Build Coastguard Worker x509.ExtendedKeyUsage([ 469*6236dae4SAndroid Build Coastguard Worker ExtendedKeyUsageOID.SERVER_AUTH, 470*6236dae4SAndroid Build Coastguard Worker ]), 471*6236dae4SAndroid Build Coastguard Worker critical=False 472*6236dae4SAndroid Build Coastguard Worker ) 473*6236dae4SAndroid Build Coastguard Worker 474*6236dae4SAndroid Build Coastguard Worker @staticmethod 475*6236dae4SAndroid Build Coastguard Worker def _add_client_usages(csr: Any, issuer: Credentials, rfc82name: Optional[str] = None) -> Any: 476*6236dae4SAndroid Build Coastguard Worker cert = csr.add_extension( 477*6236dae4SAndroid Build Coastguard Worker x509.BasicConstraints(ca=False, path_length=None), 478*6236dae4SAndroid Build Coastguard Worker critical=True, 479*6236dae4SAndroid Build Coastguard Worker ).add_extension( 480*6236dae4SAndroid Build Coastguard Worker x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( 481*6236dae4SAndroid Build Coastguard Worker issuer.certificate.extensions.get_extension_for_class( 482*6236dae4SAndroid Build Coastguard Worker x509.SubjectKeyIdentifier).value), 483*6236dae4SAndroid Build Coastguard Worker critical=False 484*6236dae4SAndroid Build Coastguard Worker ) 485*6236dae4SAndroid Build Coastguard Worker if rfc82name: 486*6236dae4SAndroid Build Coastguard Worker cert.add_extension( 487*6236dae4SAndroid Build Coastguard Worker x509.SubjectAlternativeName([x509.RFC822Name(rfc82name)]), 488*6236dae4SAndroid Build Coastguard Worker critical=True, 489*6236dae4SAndroid Build Coastguard Worker ) 490*6236dae4SAndroid Build Coastguard Worker cert.add_extension( 491*6236dae4SAndroid Build Coastguard Worker x509.ExtendedKeyUsage([ 492*6236dae4SAndroid Build Coastguard Worker ExtendedKeyUsageOID.CLIENT_AUTH, 493*6236dae4SAndroid Build Coastguard Worker ]), 494*6236dae4SAndroid Build Coastguard Worker critical=True 495*6236dae4SAndroid Build Coastguard Worker ) 496*6236dae4SAndroid Build Coastguard Worker return cert 497*6236dae4SAndroid Build Coastguard Worker 498*6236dae4SAndroid Build Coastguard Worker @staticmethod 499*6236dae4SAndroid Build Coastguard Worker def _make_ca_credentials(name, key_type: Any, 500*6236dae4SAndroid Build Coastguard Worker issuer: Optional[Credentials] = None, 501*6236dae4SAndroid Build Coastguard Worker valid_from: timedelta = timedelta(days=-1), 502*6236dae4SAndroid Build Coastguard Worker valid_to: timedelta = timedelta(days=89), 503*6236dae4SAndroid Build Coastguard Worker ) -> Credentials: 504*6236dae4SAndroid Build Coastguard Worker pkey = _private_key(key_type=key_type) 505*6236dae4SAndroid Build Coastguard Worker if issuer is not None: 506*6236dae4SAndroid Build Coastguard Worker issuer_subject = issuer.certificate.subject 507*6236dae4SAndroid Build Coastguard Worker issuer_key = issuer.private_key 508*6236dae4SAndroid Build Coastguard Worker else: 509*6236dae4SAndroid Build Coastguard Worker issuer_subject = None 510*6236dae4SAndroid Build Coastguard Worker issuer_key = pkey 511*6236dae4SAndroid Build Coastguard Worker subject = TestCA._make_x509_name(org_name=name, parent=issuer.subject if issuer else None) 512*6236dae4SAndroid Build Coastguard Worker csr = TestCA._make_csr(subject=subject, 513*6236dae4SAndroid Build Coastguard Worker issuer_subject=issuer_subject, pkey=pkey, 514*6236dae4SAndroid Build Coastguard Worker valid_from_delta=valid_from, valid_until_delta=valid_to) 515*6236dae4SAndroid Build Coastguard Worker csr = TestCA._add_ca_usages(csr) 516*6236dae4SAndroid Build Coastguard Worker cert = csr.sign(private_key=issuer_key, 517*6236dae4SAndroid Build Coastguard Worker algorithm=hashes.SHA256(), 518*6236dae4SAndroid Build Coastguard Worker backend=default_backend()) 519*6236dae4SAndroid Build Coastguard Worker return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer) 520*6236dae4SAndroid Build Coastguard Worker 521*6236dae4SAndroid Build Coastguard Worker @staticmethod 522*6236dae4SAndroid Build Coastguard Worker def _make_server_credentials(name: str, domains: List[str], issuer: Credentials, 523*6236dae4SAndroid Build Coastguard Worker key_type: Any, 524*6236dae4SAndroid Build Coastguard Worker valid_from: timedelta = timedelta(days=-1), 525*6236dae4SAndroid Build Coastguard Worker valid_to: timedelta = timedelta(days=89), 526*6236dae4SAndroid Build Coastguard Worker ) -> Credentials: 527*6236dae4SAndroid Build Coastguard Worker pkey = _private_key(key_type=key_type) 528*6236dae4SAndroid Build Coastguard Worker subject = TestCA._make_x509_name(common_name=name, parent=issuer.subject) 529*6236dae4SAndroid Build Coastguard Worker csr = TestCA._make_csr(subject=subject, 530*6236dae4SAndroid Build Coastguard Worker issuer_subject=issuer.certificate.subject, pkey=pkey, 531*6236dae4SAndroid Build Coastguard Worker valid_from_delta=valid_from, valid_until_delta=valid_to) 532*6236dae4SAndroid Build Coastguard Worker csr = TestCA._add_leaf_usages(csr, domains=domains, issuer=issuer) 533*6236dae4SAndroid Build Coastguard Worker cert = csr.sign(private_key=issuer.private_key, 534*6236dae4SAndroid Build Coastguard Worker algorithm=hashes.SHA256(), 535*6236dae4SAndroid Build Coastguard Worker backend=default_backend()) 536*6236dae4SAndroid Build Coastguard Worker return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer) 537*6236dae4SAndroid Build Coastguard Worker 538*6236dae4SAndroid Build Coastguard Worker @staticmethod 539*6236dae4SAndroid Build Coastguard Worker def _make_client_credentials(name: str, 540*6236dae4SAndroid Build Coastguard Worker issuer: Credentials, email: Optional[str], 541*6236dae4SAndroid Build Coastguard Worker key_type: Any, 542*6236dae4SAndroid Build Coastguard Worker valid_from: timedelta = timedelta(days=-1), 543*6236dae4SAndroid Build Coastguard Worker valid_to: timedelta = timedelta(days=89), 544*6236dae4SAndroid Build Coastguard Worker ) -> Credentials: 545*6236dae4SAndroid Build Coastguard Worker pkey = _private_key(key_type=key_type) 546*6236dae4SAndroid Build Coastguard Worker subject = TestCA._make_x509_name(common_name=name, parent=issuer.subject) 547*6236dae4SAndroid Build Coastguard Worker csr = TestCA._make_csr(subject=subject, 548*6236dae4SAndroid Build Coastguard Worker issuer_subject=issuer.certificate.subject, pkey=pkey, 549*6236dae4SAndroid Build Coastguard Worker valid_from_delta=valid_from, valid_until_delta=valid_to) 550*6236dae4SAndroid Build Coastguard Worker csr = TestCA._add_client_usages(csr, issuer=issuer, rfc82name=email) 551*6236dae4SAndroid Build Coastguard Worker cert = csr.sign(private_key=issuer.private_key, 552*6236dae4SAndroid Build Coastguard Worker algorithm=hashes.SHA256(), 553*6236dae4SAndroid Build Coastguard Worker backend=default_backend()) 554*6236dae4SAndroid Build Coastguard Worker return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer) 555