xref: /aosp_15_r20/external/curl/tests/http/testenv/certs.py (revision 6236dae45794135f37c4eb022389c904c8b0090d)
1*6236dae4SAndroid Build Coastguard Worker#!/usr/bin/env python3
2*6236dae4SAndroid Build Coastguard Worker# -*- coding: utf-8 -*-
3*6236dae4SAndroid Build Coastguard Worker#***************************************************************************
4*6236dae4SAndroid Build Coastguard Worker#                                  _   _ ____  _
5*6236dae4SAndroid Build Coastguard Worker#  Project                     ___| | | |  _ \| |
6*6236dae4SAndroid Build Coastguard Worker#                             / __| | | | |_) | |
7*6236dae4SAndroid Build Coastguard Worker#                            | (__| |_| |  _ <| |___
8*6236dae4SAndroid Build Coastguard Worker#                             \___|\___/|_| \_\_____|
9*6236dae4SAndroid Build Coastguard Worker#
10*6236dae4SAndroid Build Coastguard Worker# Copyright (C) Daniel Stenberg, <[email protected]>, et al.
11*6236dae4SAndroid Build Coastguard Worker#
12*6236dae4SAndroid Build Coastguard Worker# This software is licensed as described in the file COPYING, which
13*6236dae4SAndroid Build Coastguard Worker# you should have received as part of this distribution. The terms
14*6236dae4SAndroid Build Coastguard Worker# are also available at https://curl.se/docs/copyright.html.
15*6236dae4SAndroid Build Coastguard Worker#
16*6236dae4SAndroid Build Coastguard Worker# You may opt to use, copy, modify, merge, publish, distribute and/or sell
17*6236dae4SAndroid Build Coastguard Worker# copies of the Software, and permit persons to whom the Software is
18*6236dae4SAndroid Build Coastguard Worker# furnished to do so, under the terms of the COPYING file.
19*6236dae4SAndroid Build Coastguard Worker#
20*6236dae4SAndroid Build Coastguard Worker# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
21*6236dae4SAndroid Build Coastguard Worker# KIND, either express or implied.
22*6236dae4SAndroid Build Coastguard Worker#
23*6236dae4SAndroid Build Coastguard Worker# SPDX-License-Identifier: curl
24*6236dae4SAndroid Build Coastguard Worker#
25*6236dae4SAndroid Build Coastguard Worker###########################################################################
26*6236dae4SAndroid Build Coastguard Worker#
27*6236dae4SAndroid Build Coastguard Workerimport ipaddress
28*6236dae4SAndroid Build Coastguard Workerimport os
29*6236dae4SAndroid Build Coastguard Workerimport re
30*6236dae4SAndroid Build Coastguard Workerfrom datetime import timedelta, datetime, timezone
31*6236dae4SAndroid Build Coastguard Workerfrom typing import List, Any, Optional
32*6236dae4SAndroid Build Coastguard Worker
33*6236dae4SAndroid Build Coastguard Workerfrom cryptography import x509
34*6236dae4SAndroid Build Coastguard Workerfrom cryptography.hazmat.backends import default_backend
35*6236dae4SAndroid Build Coastguard Workerfrom cryptography.hazmat.primitives import hashes
36*6236dae4SAndroid Build Coastguard Workerfrom cryptography.hazmat.primitives.asymmetric import ec, rsa
37*6236dae4SAndroid Build Coastguard Workerfrom cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
38*6236dae4SAndroid Build Coastguard Workerfrom cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
39*6236dae4SAndroid Build Coastguard Workerfrom cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption, load_pem_private_key
40*6236dae4SAndroid Build Coastguard Workerfrom cryptography.x509 import ExtendedKeyUsageOID, NameOID
41*6236dae4SAndroid Build Coastguard Worker
42*6236dae4SAndroid Build Coastguard Worker
# Elliptic curves accepted as key types, keyed by their upper-cased curve
# name (the form `_private_key` looks them up by). Values are the curve
# classes exactly as exported by `cryptography`.
EC_SUPPORTED = {
    curve.name.upper(): curve
    for curve in (
        ec.SECP192R1,
        ec.SECP224R1,
        ec.SECP256R1,
        ec.SECP384R1,
    )
}
50*6236dae4SAndroid Build Coastguard Worker
51*6236dae4SAndroid Build Coastguard Worker
def _private_key(key_type):
    """Generate a fresh private key of the requested type.

    :param key_type: one of
        - an ``int``: RSA key size in bits,
        - a string of the form ``"rsa<bits>"`` or ``"<bits>"`` (matched
          case-insensitively): RSA key of that size,
        - a curve name present in ``EC_SUPPORTED`` (matched
          case-insensitively),
        - an elliptic curve class or instance from ``cryptography``.
    :returns: the newly generated RSA or elliptic-curve private key.
    """
    if isinstance(key_type, str):
        key_type = key_type.upper()
        m = re.match(r'^(RSA)?(\d+)$', key_type)
        if m:
            key_type = int(m.group(2))

    if isinstance(key_type, int):
        return rsa.generate_private_key(
            public_exponent=65537,
            key_size=key_type,
            backend=default_backend()
        )
    if not isinstance(key_type, ec.EllipticCurve) and key_type in EC_SUPPORTED:
        key_type = EC_SUPPORTED[key_type]
    # EC_SUPPORTED (and some callers) hand over the curve *class*;
    # generate_private_key() is specified to take a curve instance and
    # passing a class is deprecated, so instantiate it here.
    if isinstance(key_type, type) and issubclass(key_type, ec.EllipticCurve):
        key_type = key_type()
    return ec.generate_private_key(
        curve=key_type,
        backend=default_backend()
    )
71*6236dae4SAndroid Build Coastguard Worker
72*6236dae4SAndroid Build Coastguard Worker
class CertificateSpec:
    """Declarative description of one certificate to issue.

    A spec with ``domains`` describes a server certificate, one with
    ``client`` set describes a client certificate, and one with only a
    name describes a CA. ``sub_specs`` lists the specs of certificates to be
    issued by the certificate this spec describes.
    """

    def __init__(self, name: Optional[str] = None,
                 domains: Optional[List[str]] = None,
                 email: Optional[str] = None,
                 key_type: Optional[str] = None,
                 single_file: bool = False,
                 valid_from: timedelta = timedelta(days=-1),
                 valid_to: timedelta = timedelta(days=89),
                 client: bool = False,
                 check_valid: bool = True,
                 sub_specs: Optional[List['CertificateSpec']] = None):
        # Identity of the certificate.
        self._name = name
        self.domains = domains
        self.email = email
        self.client = client
        # Key material and on-disk layout.
        self.key_type = key_type
        self.single_file = single_file
        # Validity window, expressed as offsets.
        self.valid_from = valid_from
        self.valid_to = valid_to
        self.check_valid = check_valid
        # Specs of certificates issued below this one.
        self.sub_specs = sub_specs

    @property
    def name(self) -> Optional[str]:
        """The explicit name, else the first domain, else ``None``."""
        if self._name:
            return self._name
        return self.domains[0] if self.domains else None

    @property
    def type(self) -> Optional[str]:
        """Kind of certificate: ``"server"``, ``"client"``, ``"ca"`` or ``None``."""
        if self.domains:
            return "server"
        if self.client:
            return "client"
        return "ca" if self.name else None
113*6236dae4SAndroid Build Coastguard Worker
114*6236dae4SAndroid Build Coastguard Worker
class Credentials:
    """A certificate together with its private key.

    Optionally linked to the credentials that issued it and to the
    ``CertStore`` it is persisted in. Credentials can issue further
    certificates via ``issue_cert``/``issue_certs``.
    """

    def __init__(self,
                 name: str,
                 cert: Any,
                 pkey: Any,
                 issuer: Optional['Credentials'] = None):
        self._name = name
        self._cert = cert
        self._pkey = pkey
        self._issuer = issuer
        # Store and file locations stay unset until set_store()/set_files().
        self._store = None
        self._cert_file = None
        self._pkey_file = None
        self._combined_file = None

    @property
    def name(self) -> str:
        """Name these credentials were created with."""
        return self._name

    @property
    def subject(self) -> x509.Name:
        """Subject name of the certificate."""
        return self._cert.subject

    @property
    def key_type(self):
        """Key type string: ``rsa<bits>`` for RSA keys, the curve name for EC keys.

        :raises Exception: if the private key is neither RSA nor EC.
        """
        pkey = self._pkey
        if isinstance(pkey, RSAPrivateKey):
            return f"rsa{pkey.key_size}"
        if isinstance(pkey, EllipticCurvePrivateKey):
            return f"{pkey.curve.name}"
        raise Exception(f"unknown key type: {self._pkey}")

    @property
    def private_key(self) -> Any:
        """The private key object."""
        return self._pkey

    @property
    def certificate(self) -> Any:
        """The certificate object."""
        return self._cert

    @property
    def cert_pem(self) -> bytes:
        """The certificate, PEM encoded."""
        return self._cert.public_bytes(Encoding.PEM)

    @property
    def pkey_pem(self) -> bytes:
        """The unencrypted private key, PEM encoded.

        RSA keys use the traditional OpenSSL format, all others PKCS8.
        """
        if self.key_type.startswith('rsa'):
            key_format = PrivateFormat.TraditionalOpenSSL
        else:
            key_format = PrivateFormat.PKCS8
        return self._pkey.private_bytes(Encoding.PEM, key_format, NoEncryption())

    @property
    def issuer(self) -> Optional['Credentials']:
        """Credentials that issued this certificate, if known."""
        return self._issuer

    def set_store(self, store: 'CertStore'):
        """Attach the store used for loading/saving issued certificates."""
        self._store = store

    def set_files(self, cert_file: str, pkey_file: Optional[str] = None,
                  combined_file: Optional[str] = None):
        """Record the file paths these credentials are stored at."""
        self._cert_file = cert_file
        self._pkey_file = pkey_file
        self._combined_file = combined_file

    @property
    def cert_file(self) -> str:
        """Path of the certificate file (``None`` until ``set_files`` is called)."""
        return self._cert_file

    @property
    def pkey_file(self) -> Optional[str]:
        """Path of the private key file, if recorded."""
        return self._pkey_file

    @property
    def combined_file(self) -> Optional[str]:
        """Path of the combined certificate+key file, if recorded."""
        return self._combined_file

    def get_first(self, name) -> Optional['Credentials']:
        """Return the first credentials the store knows under ``name``, or ``None``."""
        found = self._store.get_credentials_for_name(name) if self._store else []
        if not found:
            return None
        return found[0]

    def get_credentials_for_name(self, name) -> List['Credentials']:
        """Return all credentials the store knows under ``name`` (empty without a store)."""
        if not self._store:
            return []
        return self._store.get_credentials_for_name(name)

    def issue_certs(self, specs: List[CertificateSpec],
                    chain: Optional[List['Credentials']] = None) -> List['Credentials']:
        """Issue one certificate per spec and return them in spec order."""
        issued = []
        for spec in specs:
            issued.append(self.issue_cert(spec=spec, chain=chain))
        return issued

    def issue_cert(self, spec: CertificateSpec,
                   chain: Optional[List['Credentials']] = None) -> 'Credentials':
        """Issue (or reload from the store) the certificate described by ``spec``.

        Credentials already present in the store are reused when
        ``load_credentials`` accepts them; otherwise new ones are created and,
        if a store is attached, saved. Sub-specs are issued recursively by the
        resulting credentials.

        :param spec: description of the certificate to issue.
        :param chain: credentials above this issuer; extended by this issuer
            and handed on when issuing sub-specs.
        :returns: the credentials for ``spec``.
        """
        store = self._store
        # The spec may leave the key type open; then the issuer's is used.
        key_type = spec.key_type or self.key_type

        creds = None
        if store:
            creds = store.load_credentials(
                name=spec.name, key_type=key_type, single_file=spec.single_file,
                issuer=self, check_valid=spec.check_valid)
        if creds is None:
            creds = TestCA.create_credentials(spec=spec, issuer=self, key_type=key_type,
                                              valid_from=spec.valid_from, valid_to=spec.valid_to)
            if store:
                store.save(creds, single_file=spec.single_file)
                if spec.type == "ca":
                    store.save_chain(creds, "ca", with_root=True)

        if not spec.sub_specs:
            return creds

        if store:
            # Certificates issued by `creds` live in a sub directory named after it.
            creds.set_store(CertStore(fpath=os.path.join(store.path, creds.name)))
        subchain = [*chain, self] if chain else [self]
        creds.issue_certs(spec.sub_specs, chain=subchain)
        return creds
227*6236dae4SAndroid Build Coastguard Worker
228*6236dae4SAndroid Build Coastguard Worker
class CertStore:
    """Directory-backed store of PEM encoded credentials.

    Files are named after the credentials' name and key type. Credentials
    saved to or loaded from the store are also remembered in memory by name.
    """

    def __init__(self, fpath: str):
        """Use ``fpath`` as store directory, creating it if necessary."""
        self._store_dir = fpath
        # exist_ok avoids the race of a separate exists() check when several
        # processes set up the same store directory at once.
        os.makedirs(self._store_dir, exist_ok=True)
        self._creds_by_name = {}

    @property
    def path(self) -> str:
        """Directory this store keeps its files in."""
        return self._store_dir

    def save(self, creds: Credentials, name: Optional[str] = None,
             chain: Optional[List[Credentials]] = None,
             single_file: bool = False) -> None:
        """Write ``creds`` to the store and register them under ``name``.

        :param creds: the credentials to persist.
        :param name: name to store under; defaults to ``creds.name``.
        :param chain: further credentials whose certificates are appended
            after the certificate itself.
        :param single_file: if true, no separate private key file is written;
            the key is appended to the certificate file instead.
        """
        name = name if name is not None else creds.name
        cert_file = self.get_cert_file(name=name, key_type=creds.key_type)
        pkey_file = None if single_file else self.get_pkey_file(name=name, key_type=creds.key_type)
        comb_file = self.get_combined_file(name=name, key_type=creds.key_type)

        # Certificate followed by its chain; shared by both output files.
        certs_pem = creds.cert_pem + b"".join(c.cert_pem for c in chain or [])
        pkey_pem = creds.pkey_pem

        with open(cert_file, "wb") as fd:
            fd.write(certs_pem)
            if pkey_file is None:
                fd.write(pkey_pem)
        if pkey_file is not None:
            with open(pkey_file, "wb") as fd:
                fd.write(pkey_pem)
        # The combined file always carries certificate(s) and key.
        with open(comb_file, "wb") as fd:
            fd.write(certs_pem)
            fd.write(pkey_pem)
        creds.set_files(cert_file, pkey_file, comb_file)
        self._add_credentials(name, creds)

    def save_chain(self, creds: Credentials, infix: str, with_root=False):
        """Write the certificate chain of ``creds`` to ``<name>-<infix>.pem``.

        The chain starts with ``creds`` and follows the issuer links. Unless
        ``with_root`` is true, the last (topmost) certificate is left out when
        the chain has more than one entry.
        """
        name = creds.name
        chain = [creds]
        while creds.issuer is not None:
            creds = creds.issuer
            chain.append(creds)
        if not with_root and len(chain) > 1:
            chain = chain[:-1]
        chain_file = os.path.join(self._store_dir, f'{name}-{infix}.pem')
        with open(chain_file, "wb") as fd:
            for c in chain:
                fd.write(c.cert_pem)

    def _add_credentials(self, name: str, creds: Credentials):
        """Remember ``creds`` in memory under ``name``."""
        self._creds_by_name.setdefault(name, []).append(creds)

    def get_credentials_for_name(self, name) -> List[Credentials]:
        """Return the credentials remembered under ``name`` (possibly empty)."""
        return self._creds_by_name.get(name, [])

    def get_cert_file(self, name: str, key_type=None) -> str:
        """Path of the certificate file for ``name`` and ``key_type``."""
        key_infix = f".{key_type}" if key_type is not None else ""
        return os.path.join(self._store_dir, f'{name}{key_infix}.cert.pem')

    def get_pkey_file(self, name: str, key_type=None) -> str:
        """Path of the private key file for ``name`` and ``key_type``."""
        key_infix = f".{key_type}" if key_type is not None else ""
        return os.path.join(self._store_dir, f'{name}{key_infix}.pkey.pem')

    def get_combined_file(self, name: str, key_type=None) -> str:
        """Path of the combined file for ``name``; ``key_type`` is not part of it."""
        return os.path.join(self._store_dir, f'{name}.pem')

    def load_pem_cert(self, fpath: str) -> x509.Certificate:
        """Load the PEM certificate stored in ``fpath``."""
        # The parser wants bytes, so read them directly instead of decoding
        # to text and encoding again.
        with open(fpath, "rb") as fd:
            return x509.load_pem_x509_certificate(fd.read())

    def load_pem_pkey(self, fpath: str):
        """Load the unencrypted PEM private key stored in ``fpath``."""
        with open(fpath, "rb") as fd:
            return load_pem_private_key(fd.read(), password=None)

    @staticmethod
    def _is_currently_valid(cert: x509.Certificate) -> bool:
        """Tell whether the current time lies within the validity of ``cert``."""
        now = datetime.now(tz=timezone.utc)
        try:
            not_before = cert.not_valid_before_utc
            not_after = cert.not_valid_after_utc
        except AttributeError:
            # Older cryptography releases lack the *_utc attributes. Their
            # not_valid_before/after are naive datetimes expressed in UTC, so
            # compare against naive UTC "now" and not against local time.
            not_before = cert.not_valid_before
            not_after = cert.not_valid_after
            now = now.replace(tzinfo=None)
        return not_before <= now <= not_after

    def load_credentials(self, name: str, key_type=None,
                         single_file: bool = False,
                         issuer: Optional[Credentials] = None,
                         check_valid: bool = False):
        """Load credentials for ``name`` from the store's files.

        :param name: name the credentials were saved under.
        :param key_type: key type that is part of the file names.
        :param single_file: if true, the private key is read from the
            certificate file.
        :param issuer: credentials to record as issuer of the loaded ones.
        :param check_valid: if true, credentials whose certificate is not
            valid at the current time are not returned.
        :returns: the loaded ``Credentials``, or ``None`` when the files are
            missing or the validity check fails.
        """
        cert_file = self.get_cert_file(name=name, key_type=key_type)
        pkey_file = cert_file if single_file else self.get_pkey_file(name=name, key_type=key_type)
        comb_file = self.get_combined_file(name=name, key_type=key_type)
        if not (os.path.isfile(cert_file) and os.path.isfile(pkey_file)):
            return None
        cert = self.load_pem_cert(cert_file)
        pkey = self.load_pem_pkey(pkey_file)
        if check_valid and not self._is_currently_valid(cert):
            return None
        creds = Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
        creds.set_store(self)
        creds.set_files(cert_file, pkey_file, comb_file)
        self._add_credentials(name, creds)
        return creds
337*6236dae4SAndroid Build Coastguard Worker
338*6236dae4SAndroid Build Coastguard Worker
339*6236dae4SAndroid Build Coastguard Workerclass TestCA:
340*6236dae4SAndroid Build Coastguard Worker
341*6236dae4SAndroid Build Coastguard Worker    @classmethod
342*6236dae4SAndroid Build Coastguard Worker    def create_root(cls, name: str, store_dir: str, key_type: str = "rsa2048") -> Credentials:
343*6236dae4SAndroid Build Coastguard Worker        store = CertStore(fpath=store_dir)
344*6236dae4SAndroid Build Coastguard Worker        creds = store.load_credentials(name="ca", key_type=key_type, issuer=None)
345*6236dae4SAndroid Build Coastguard Worker        if creds is None:
346*6236dae4SAndroid Build Coastguard Worker            creds = TestCA._make_ca_credentials(name=name, key_type=key_type)
347*6236dae4SAndroid Build Coastguard Worker            store.save(creds, name="ca")
348*6236dae4SAndroid Build Coastguard Worker            creds.set_store(store)
349*6236dae4SAndroid Build Coastguard Worker        return creds
350*6236dae4SAndroid Build Coastguard Worker
351*6236dae4SAndroid Build Coastguard Worker    @staticmethod
352*6236dae4SAndroid Build Coastguard Worker    def create_credentials(spec: CertificateSpec, issuer: Credentials, key_type: Any,
353*6236dae4SAndroid Build Coastguard Worker                           valid_from: timedelta = timedelta(days=-1),
354*6236dae4SAndroid Build Coastguard Worker                           valid_to: timedelta = timedelta(days=89),
355*6236dae4SAndroid Build Coastguard Worker                           ) -> Credentials:
356*6236dae4SAndroid Build Coastguard Worker        """
357*6236dae4SAndroid Build Coastguard Worker        Create a certificate signed by this CA for the given domains.
358*6236dae4SAndroid Build Coastguard Worker
359*6236dae4SAndroid Build Coastguard Worker        :returns: the certificate and private key PEM file paths
360*6236dae4SAndroid Build Coastguard Worker        """
361*6236dae4SAndroid Build Coastguard Worker        if spec.domains and len(spec.domains):
362*6236dae4SAndroid Build Coastguard Worker            creds = TestCA._make_server_credentials(name=spec.name, domains=spec.domains,
363*6236dae4SAndroid Build Coastguard Worker                                                    issuer=issuer, valid_from=valid_from,
364*6236dae4SAndroid Build Coastguard Worker                                                    valid_to=valid_to, key_type=key_type)
365*6236dae4SAndroid Build Coastguard Worker        elif spec.client:
366*6236dae4SAndroid Build Coastguard Worker            creds = TestCA._make_client_credentials(name=spec.name, issuer=issuer,
367*6236dae4SAndroid Build Coastguard Worker                                                    email=spec.email, valid_from=valid_from,
368*6236dae4SAndroid Build Coastguard Worker                                                    valid_to=valid_to, key_type=key_type)
369*6236dae4SAndroid Build Coastguard Worker        elif spec.name:
370*6236dae4SAndroid Build Coastguard Worker            creds = TestCA._make_ca_credentials(name=spec.name, issuer=issuer,
371*6236dae4SAndroid Build Coastguard Worker                                                valid_from=valid_from, valid_to=valid_to,
372*6236dae4SAndroid Build Coastguard Worker                                                key_type=key_type)
373*6236dae4SAndroid Build Coastguard Worker        else:
374*6236dae4SAndroid Build Coastguard Worker            raise Exception(f"unrecognized certificate specification: {spec}")
375*6236dae4SAndroid Build Coastguard Worker        return creds
376*6236dae4SAndroid Build Coastguard Worker
377*6236dae4SAndroid Build Coastguard Worker    @staticmethod
378*6236dae4SAndroid Build Coastguard Worker    def _make_x509_name(org_name: Optional[str] = None, common_name: Optional[str] = None, parent: x509.Name = None) -> x509.Name:
379*6236dae4SAndroid Build Coastguard Worker        name_pieces = []
380*6236dae4SAndroid Build Coastguard Worker        if org_name:
381*6236dae4SAndroid Build Coastguard Worker            oid = NameOID.ORGANIZATIONAL_UNIT_NAME if parent else NameOID.ORGANIZATION_NAME
382*6236dae4SAndroid Build Coastguard Worker            name_pieces.append(x509.NameAttribute(oid, org_name))
383*6236dae4SAndroid Build Coastguard Worker        elif common_name:
384*6236dae4SAndroid Build Coastguard Worker            name_pieces.append(x509.NameAttribute(NameOID.COMMON_NAME, common_name))
385*6236dae4SAndroid Build Coastguard Worker        if parent:
386*6236dae4SAndroid Build Coastguard Worker            name_pieces.extend(list(parent))
387*6236dae4SAndroid Build Coastguard Worker        return x509.Name(name_pieces)
388*6236dae4SAndroid Build Coastguard Worker
389*6236dae4SAndroid Build Coastguard Worker    @staticmethod
390*6236dae4SAndroid Build Coastguard Worker    def _make_csr(
391*6236dae4SAndroid Build Coastguard Worker            subject: x509.Name,
392*6236dae4SAndroid Build Coastguard Worker            pkey: Any,
393*6236dae4SAndroid Build Coastguard Worker            issuer_subject: Optional[Credentials],
394*6236dae4SAndroid Build Coastguard Worker            valid_from_delta: Optional[timedelta] = None,
395*6236dae4SAndroid Build Coastguard Worker            valid_until_delta: Optional[timedelta] = None
396*6236dae4SAndroid Build Coastguard Worker    ):
397*6236dae4SAndroid Build Coastguard Worker        pubkey = pkey.public_key()
398*6236dae4SAndroid Build Coastguard Worker        issuer_subject = issuer_subject if issuer_subject is not None else subject
399*6236dae4SAndroid Build Coastguard Worker
400*6236dae4SAndroid Build Coastguard Worker        valid_from = datetime.now()
401*6236dae4SAndroid Build Coastguard Worker        if valid_until_delta is not None:
402*6236dae4SAndroid Build Coastguard Worker            valid_from += valid_from_delta
403*6236dae4SAndroid Build Coastguard Worker        valid_until = datetime.now()
404*6236dae4SAndroid Build Coastguard Worker        if valid_until_delta is not None:
405*6236dae4SAndroid Build Coastguard Worker            valid_until += valid_until_delta
406*6236dae4SAndroid Build Coastguard Worker
407*6236dae4SAndroid Build Coastguard Worker        return (
408*6236dae4SAndroid Build Coastguard Worker            x509.CertificateBuilder()
409*6236dae4SAndroid Build Coastguard Worker            .subject_name(subject)
410*6236dae4SAndroid Build Coastguard Worker            .issuer_name(issuer_subject)
411*6236dae4SAndroid Build Coastguard Worker            .public_key(pubkey)
412*6236dae4SAndroid Build Coastguard Worker            .not_valid_before(valid_from)
413*6236dae4SAndroid Build Coastguard Worker            .not_valid_after(valid_until)
414*6236dae4SAndroid Build Coastguard Worker            .serial_number(x509.random_serial_number())
415*6236dae4SAndroid Build Coastguard Worker            .add_extension(
416*6236dae4SAndroid Build Coastguard Worker                x509.SubjectKeyIdentifier.from_public_key(pubkey),
417*6236dae4SAndroid Build Coastguard Worker                critical=False,
418*6236dae4SAndroid Build Coastguard Worker            )
419*6236dae4SAndroid Build Coastguard Worker        )
420*6236dae4SAndroid Build Coastguard Worker
421*6236dae4SAndroid Build Coastguard Worker    @staticmethod
422*6236dae4SAndroid Build Coastguard Worker    def _add_ca_usages(csr: Any) -> Any:
423*6236dae4SAndroid Build Coastguard Worker        return csr.add_extension(
424*6236dae4SAndroid Build Coastguard Worker            x509.BasicConstraints(ca=True, path_length=9),
425*6236dae4SAndroid Build Coastguard Worker            critical=True,
426*6236dae4SAndroid Build Coastguard Worker        ).add_extension(
427*6236dae4SAndroid Build Coastguard Worker            x509.KeyUsage(
428*6236dae4SAndroid Build Coastguard Worker                digital_signature=True,
429*6236dae4SAndroid Build Coastguard Worker                content_commitment=False,
430*6236dae4SAndroid Build Coastguard Worker                key_encipherment=False,
431*6236dae4SAndroid Build Coastguard Worker                data_encipherment=False,
432*6236dae4SAndroid Build Coastguard Worker                key_agreement=False,
433*6236dae4SAndroid Build Coastguard Worker                key_cert_sign=True,
434*6236dae4SAndroid Build Coastguard Worker                crl_sign=True,
435*6236dae4SAndroid Build Coastguard Worker                encipher_only=False,
436*6236dae4SAndroid Build Coastguard Worker                decipher_only=False),
437*6236dae4SAndroid Build Coastguard Worker            critical=True
438*6236dae4SAndroid Build Coastguard Worker        ).add_extension(
439*6236dae4SAndroid Build Coastguard Worker            x509.ExtendedKeyUsage([
440*6236dae4SAndroid Build Coastguard Worker                ExtendedKeyUsageOID.CLIENT_AUTH,
441*6236dae4SAndroid Build Coastguard Worker                ExtendedKeyUsageOID.SERVER_AUTH,
442*6236dae4SAndroid Build Coastguard Worker                ExtendedKeyUsageOID.CODE_SIGNING,
443*6236dae4SAndroid Build Coastguard Worker            ]),
444*6236dae4SAndroid Build Coastguard Worker            critical=True
445*6236dae4SAndroid Build Coastguard Worker        )
446*6236dae4SAndroid Build Coastguard Worker
447*6236dae4SAndroid Build Coastguard Worker    @staticmethod
448*6236dae4SAndroid Build Coastguard Worker    def _add_leaf_usages(csr: Any, domains: List[str], issuer: Credentials) -> Any:
449*6236dae4SAndroid Build Coastguard Worker        names = []
450*6236dae4SAndroid Build Coastguard Worker        for name in domains:
451*6236dae4SAndroid Build Coastguard Worker            try:
452*6236dae4SAndroid Build Coastguard Worker                names.append(x509.IPAddress(ipaddress.ip_address(name)))
453*6236dae4SAndroid Build Coastguard Worker            # TODO: specify specific exceptions here
454*6236dae4SAndroid Build Coastguard Worker            except:  # noqa: E722
455*6236dae4SAndroid Build Coastguard Worker                names.append(x509.DNSName(name))
456*6236dae4SAndroid Build Coastguard Worker
457*6236dae4SAndroid Build Coastguard Worker        return csr.add_extension(
458*6236dae4SAndroid Build Coastguard Worker            x509.BasicConstraints(ca=False, path_length=None),
459*6236dae4SAndroid Build Coastguard Worker            critical=True,
460*6236dae4SAndroid Build Coastguard Worker        ).add_extension(
461*6236dae4SAndroid Build Coastguard Worker            x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
462*6236dae4SAndroid Build Coastguard Worker                issuer.certificate.extensions.get_extension_for_class(
463*6236dae4SAndroid Build Coastguard Worker                    x509.SubjectKeyIdentifier).value),
464*6236dae4SAndroid Build Coastguard Worker            critical=False
465*6236dae4SAndroid Build Coastguard Worker        ).add_extension(
466*6236dae4SAndroid Build Coastguard Worker            x509.SubjectAlternativeName(names), critical=True,
467*6236dae4SAndroid Build Coastguard Worker        ).add_extension(
468*6236dae4SAndroid Build Coastguard Worker            x509.ExtendedKeyUsage([
469*6236dae4SAndroid Build Coastguard Worker                ExtendedKeyUsageOID.SERVER_AUTH,
470*6236dae4SAndroid Build Coastguard Worker            ]),
471*6236dae4SAndroid Build Coastguard Worker            critical=False
472*6236dae4SAndroid Build Coastguard Worker        )
473*6236dae4SAndroid Build Coastguard Worker
474*6236dae4SAndroid Build Coastguard Worker    @staticmethod
475*6236dae4SAndroid Build Coastguard Worker    def _add_client_usages(csr: Any, issuer: Credentials, rfc82name: Optional[str] = None) -> Any:
476*6236dae4SAndroid Build Coastguard Worker        cert = csr.add_extension(
477*6236dae4SAndroid Build Coastguard Worker            x509.BasicConstraints(ca=False, path_length=None),
478*6236dae4SAndroid Build Coastguard Worker            critical=True,
479*6236dae4SAndroid Build Coastguard Worker        ).add_extension(
480*6236dae4SAndroid Build Coastguard Worker            x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
481*6236dae4SAndroid Build Coastguard Worker                issuer.certificate.extensions.get_extension_for_class(
482*6236dae4SAndroid Build Coastguard Worker                    x509.SubjectKeyIdentifier).value),
483*6236dae4SAndroid Build Coastguard Worker            critical=False
484*6236dae4SAndroid Build Coastguard Worker        )
485*6236dae4SAndroid Build Coastguard Worker        if rfc82name:
486*6236dae4SAndroid Build Coastguard Worker            cert.add_extension(
487*6236dae4SAndroid Build Coastguard Worker                x509.SubjectAlternativeName([x509.RFC822Name(rfc82name)]),
488*6236dae4SAndroid Build Coastguard Worker                critical=True,
489*6236dae4SAndroid Build Coastguard Worker            )
490*6236dae4SAndroid Build Coastguard Worker        cert.add_extension(
491*6236dae4SAndroid Build Coastguard Worker            x509.ExtendedKeyUsage([
492*6236dae4SAndroid Build Coastguard Worker                ExtendedKeyUsageOID.CLIENT_AUTH,
493*6236dae4SAndroid Build Coastguard Worker            ]),
494*6236dae4SAndroid Build Coastguard Worker            critical=True
495*6236dae4SAndroid Build Coastguard Worker        )
496*6236dae4SAndroid Build Coastguard Worker        return cert
497*6236dae4SAndroid Build Coastguard Worker
498*6236dae4SAndroid Build Coastguard Worker    @staticmethod
499*6236dae4SAndroid Build Coastguard Worker    def _make_ca_credentials(name, key_type: Any,
500*6236dae4SAndroid Build Coastguard Worker                             issuer: Optional[Credentials] = None,
501*6236dae4SAndroid Build Coastguard Worker                             valid_from: timedelta = timedelta(days=-1),
502*6236dae4SAndroid Build Coastguard Worker                             valid_to: timedelta = timedelta(days=89),
503*6236dae4SAndroid Build Coastguard Worker                             ) -> Credentials:
504*6236dae4SAndroid Build Coastguard Worker        pkey = _private_key(key_type=key_type)
505*6236dae4SAndroid Build Coastguard Worker        if issuer is not None:
506*6236dae4SAndroid Build Coastguard Worker            issuer_subject = issuer.certificate.subject
507*6236dae4SAndroid Build Coastguard Worker            issuer_key = issuer.private_key
508*6236dae4SAndroid Build Coastguard Worker        else:
509*6236dae4SAndroid Build Coastguard Worker            issuer_subject = None
510*6236dae4SAndroid Build Coastguard Worker            issuer_key = pkey
511*6236dae4SAndroid Build Coastguard Worker        subject = TestCA._make_x509_name(org_name=name, parent=issuer.subject if issuer else None)
512*6236dae4SAndroid Build Coastguard Worker        csr = TestCA._make_csr(subject=subject,
513*6236dae4SAndroid Build Coastguard Worker                               issuer_subject=issuer_subject, pkey=pkey,
514*6236dae4SAndroid Build Coastguard Worker                               valid_from_delta=valid_from, valid_until_delta=valid_to)
515*6236dae4SAndroid Build Coastguard Worker        csr = TestCA._add_ca_usages(csr)
516*6236dae4SAndroid Build Coastguard Worker        cert = csr.sign(private_key=issuer_key,
517*6236dae4SAndroid Build Coastguard Worker                        algorithm=hashes.SHA256(),
518*6236dae4SAndroid Build Coastguard Worker                        backend=default_backend())
519*6236dae4SAndroid Build Coastguard Worker        return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
520*6236dae4SAndroid Build Coastguard Worker
521*6236dae4SAndroid Build Coastguard Worker    @staticmethod
522*6236dae4SAndroid Build Coastguard Worker    def _make_server_credentials(name: str, domains: List[str], issuer: Credentials,
523*6236dae4SAndroid Build Coastguard Worker                                 key_type: Any,
524*6236dae4SAndroid Build Coastguard Worker                                 valid_from: timedelta = timedelta(days=-1),
525*6236dae4SAndroid Build Coastguard Worker                                 valid_to: timedelta = timedelta(days=89),
526*6236dae4SAndroid Build Coastguard Worker                                 ) -> Credentials:
527*6236dae4SAndroid Build Coastguard Worker        pkey = _private_key(key_type=key_type)
528*6236dae4SAndroid Build Coastguard Worker        subject = TestCA._make_x509_name(common_name=name, parent=issuer.subject)
529*6236dae4SAndroid Build Coastguard Worker        csr = TestCA._make_csr(subject=subject,
530*6236dae4SAndroid Build Coastguard Worker                               issuer_subject=issuer.certificate.subject, pkey=pkey,
531*6236dae4SAndroid Build Coastguard Worker                               valid_from_delta=valid_from, valid_until_delta=valid_to)
532*6236dae4SAndroid Build Coastguard Worker        csr = TestCA._add_leaf_usages(csr, domains=domains, issuer=issuer)
533*6236dae4SAndroid Build Coastguard Worker        cert = csr.sign(private_key=issuer.private_key,
534*6236dae4SAndroid Build Coastguard Worker                        algorithm=hashes.SHA256(),
535*6236dae4SAndroid Build Coastguard Worker                        backend=default_backend())
536*6236dae4SAndroid Build Coastguard Worker        return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
537*6236dae4SAndroid Build Coastguard Worker
538*6236dae4SAndroid Build Coastguard Worker    @staticmethod
539*6236dae4SAndroid Build Coastguard Worker    def _make_client_credentials(name: str,
540*6236dae4SAndroid Build Coastguard Worker                                 issuer: Credentials, email: Optional[str],
541*6236dae4SAndroid Build Coastguard Worker                                 key_type: Any,
542*6236dae4SAndroid Build Coastguard Worker                                 valid_from: timedelta = timedelta(days=-1),
543*6236dae4SAndroid Build Coastguard Worker                                 valid_to: timedelta = timedelta(days=89),
544*6236dae4SAndroid Build Coastguard Worker                                 ) -> Credentials:
545*6236dae4SAndroid Build Coastguard Worker        pkey = _private_key(key_type=key_type)
546*6236dae4SAndroid Build Coastguard Worker        subject = TestCA._make_x509_name(common_name=name, parent=issuer.subject)
547*6236dae4SAndroid Build Coastguard Worker        csr = TestCA._make_csr(subject=subject,
548*6236dae4SAndroid Build Coastguard Worker                               issuer_subject=issuer.certificate.subject, pkey=pkey,
549*6236dae4SAndroid Build Coastguard Worker                               valid_from_delta=valid_from, valid_until_delta=valid_to)
550*6236dae4SAndroid Build Coastguard Worker        csr = TestCA._add_client_usages(csr, issuer=issuer, rfc82name=email)
551*6236dae4SAndroid Build Coastguard Worker        cert = csr.sign(private_key=issuer.private_key,
552*6236dae4SAndroid Build Coastguard Worker                        algorithm=hashes.SHA256(),
553*6236dae4SAndroid Build Coastguard Worker                        backend=default_backend())
554*6236dae4SAndroid Build Coastguard Worker        return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
555