xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/ssl.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1# Wrapper module for _ssl, providing some additional facilities
2# implemented in Python.  Written by Bill Janssen.
3
4"""This module provides some more Pythonic support for SSL.
5
6Object types:
7
8  SSLSocket -- subtype of socket.socket which does SSL over the socket
9
10Exceptions:
11
12  SSLError -- exception raised for I/O errors
13
14Functions:
15
16  cert_time_to_seconds -- convert time string used for certificate
17                          notBefore and notAfter functions to integer
18                          seconds past the Epoch (the time values
19                          returned from time.time())
20
21  get_server_certificate (addr, ssl_version, ca_certs, timeout) -- Retrieve the
22                          certificate from the server at the specified
23                          address and return it as a PEM-encoded string
24
25
26Integer constants:
27
28SSL_ERROR_ZERO_RETURN
29SSL_ERROR_WANT_READ
30SSL_ERROR_WANT_WRITE
31SSL_ERROR_WANT_X509_LOOKUP
32SSL_ERROR_SYSCALL
33SSL_ERROR_SSL
34SSL_ERROR_WANT_CONNECT
35
36SSL_ERROR_EOF
37SSL_ERROR_INVALID_ERROR_CODE
38
39The following group define certificate requirements that one side is
40allowing/requiring from the other side:
41
42CERT_NONE - no certificates from the other side are required (or will
43            be looked at if provided)
44CERT_OPTIONAL - certificates are not required, but if provided will be
45                validated, and if validation fails, the connection will
46                also fail
47CERT_REQUIRED - certificates are required, and will be validated, and
48                if validation fails, the connection will also fail
49
50The following constants identify various SSL protocol variants:
51
52PROTOCOL_SSLv2
53PROTOCOL_SSLv3
54PROTOCOL_SSLv23
55PROTOCOL_TLS
56PROTOCOL_TLS_CLIENT
57PROTOCOL_TLS_SERVER
58PROTOCOL_TLSv1
59PROTOCOL_TLSv1_1
60PROTOCOL_TLSv1_2
61
62The following constants identify various SSL alert message descriptions as per
63http://www.iana.org/assignments/tls-parameters/tls-parameters.xml#tls-parameters-6
64
65ALERT_DESCRIPTION_CLOSE_NOTIFY
66ALERT_DESCRIPTION_UNEXPECTED_MESSAGE
67ALERT_DESCRIPTION_BAD_RECORD_MAC
68ALERT_DESCRIPTION_RECORD_OVERFLOW
69ALERT_DESCRIPTION_DECOMPRESSION_FAILURE
70ALERT_DESCRIPTION_HANDSHAKE_FAILURE
71ALERT_DESCRIPTION_BAD_CERTIFICATE
72ALERT_DESCRIPTION_UNSUPPORTED_CERTIFICATE
73ALERT_DESCRIPTION_CERTIFICATE_REVOKED
74ALERT_DESCRIPTION_CERTIFICATE_EXPIRED
75ALERT_DESCRIPTION_CERTIFICATE_UNKNOWN
76ALERT_DESCRIPTION_ILLEGAL_PARAMETER
77ALERT_DESCRIPTION_UNKNOWN_CA
78ALERT_DESCRIPTION_ACCESS_DENIED
79ALERT_DESCRIPTION_DECODE_ERROR
80ALERT_DESCRIPTION_DECRYPT_ERROR
81ALERT_DESCRIPTION_PROTOCOL_VERSION
82ALERT_DESCRIPTION_INSUFFICIENT_SECURITY
83ALERT_DESCRIPTION_INTERNAL_ERROR
84ALERT_DESCRIPTION_USER_CANCELLED
85ALERT_DESCRIPTION_NO_RENEGOTIATION
86ALERT_DESCRIPTION_UNSUPPORTED_EXTENSION
87ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
88ALERT_DESCRIPTION_UNRECOGNIZED_NAME
89ALERT_DESCRIPTION_BAD_CERTIFICATE_STATUS_RESPONSE
90ALERT_DESCRIPTION_BAD_CERTIFICATE_HASH_VALUE
91ALERT_DESCRIPTION_UNKNOWN_PSK_IDENTITY
92"""
93
94import sys
95import os
96from collections import namedtuple
97from enum import Enum as _Enum, IntEnum as _IntEnum, IntFlag as _IntFlag
98from enum import _simple_enum
99
100import _ssl             # if we can't import it, let the error propagate
101
102from _ssl import OPENSSL_VERSION_NUMBER, OPENSSL_VERSION_INFO, OPENSSL_VERSION
103from _ssl import _SSLContext, MemoryBIO, SSLSession
104from _ssl import (
105    SSLError, SSLZeroReturnError, SSLWantReadError, SSLWantWriteError,
106    SSLSyscallError, SSLEOFError, SSLCertVerificationError
107    )
108from _ssl import txt2obj as _txt2obj, nid2obj as _nid2obj
109from _ssl import RAND_status, RAND_add, RAND_bytes, RAND_pseudo_bytes
110try:
111    from _ssl import RAND_egd
112except ImportError:
113    # LibreSSL does not provide RAND_egd
114    pass
115
116
117from _ssl import (
118    HAS_SNI, HAS_ECDH, HAS_NPN, HAS_ALPN, HAS_SSLv2, HAS_SSLv3, HAS_TLSv1,
119    HAS_TLSv1_1, HAS_TLSv1_2, HAS_TLSv1_3
120)
121from _ssl import _DEFAULT_CIPHERS, _OPENSSL_API_VERSION
122
123_IntEnum._convert_(
124    '_SSLMethod', __name__,
125    lambda name: name.startswith('PROTOCOL_') and name != 'PROTOCOL_SSLv23',
126    source=_ssl)
127
128_IntFlag._convert_(
129    'Options', __name__,
130    lambda name: name.startswith('OP_'),
131    source=_ssl)
132
133_IntEnum._convert_(
134    'AlertDescription', __name__,
135    lambda name: name.startswith('ALERT_DESCRIPTION_'),
136    source=_ssl)
137
138_IntEnum._convert_(
139    'SSLErrorNumber', __name__,
140    lambda name: name.startswith('SSL_ERROR_'),
141    source=_ssl)
142
143_IntFlag._convert_(
144    'VerifyFlags', __name__,
145    lambda name: name.startswith('VERIFY_'),
146    source=_ssl)
147
148_IntEnum._convert_(
149    'VerifyMode', __name__,
150    lambda name: name.startswith('CERT_'),
151    source=_ssl)
152
153PROTOCOL_SSLv23 = _SSLMethod.PROTOCOL_SSLv23 = _SSLMethod.PROTOCOL_TLS
154_PROTOCOL_NAMES = {value: name for name, value in _SSLMethod.__members__.items()}
155
156_SSLv2_IF_EXISTS = getattr(_SSLMethod, 'PROTOCOL_SSLv2', None)
157
158
159@_simple_enum(_IntEnum)
160class TLSVersion:
161    MINIMUM_SUPPORTED = _ssl.PROTO_MINIMUM_SUPPORTED
162    SSLv3 = _ssl.PROTO_SSLv3
163    TLSv1 = _ssl.PROTO_TLSv1
164    TLSv1_1 = _ssl.PROTO_TLSv1_1
165    TLSv1_2 = _ssl.PROTO_TLSv1_2
166    TLSv1_3 = _ssl.PROTO_TLSv1_3
167    MAXIMUM_SUPPORTED = _ssl.PROTO_MAXIMUM_SUPPORTED
168
169
170@_simple_enum(_IntEnum)
171class _TLSContentType:
172    """Content types (record layer)
173
174    See RFC 8446, section B.1
175    """
176    CHANGE_CIPHER_SPEC = 20
177    ALERT = 21
178    HANDSHAKE = 22
179    APPLICATION_DATA = 23
180    # pseudo content types
181    HEADER = 0x100
182    INNER_CONTENT_TYPE = 0x101
183
184
185@_simple_enum(_IntEnum)
186class _TLSAlertType:
187    """Alert types for TLSContentType.ALERT messages
188
189    See RFC 8466, section B.2
190    """
191    CLOSE_NOTIFY = 0
192    UNEXPECTED_MESSAGE = 10
193    BAD_RECORD_MAC = 20
194    DECRYPTION_FAILED = 21
195    RECORD_OVERFLOW = 22
196    DECOMPRESSION_FAILURE = 30
197    HANDSHAKE_FAILURE = 40
198    NO_CERTIFICATE = 41
199    BAD_CERTIFICATE = 42
200    UNSUPPORTED_CERTIFICATE = 43
201    CERTIFICATE_REVOKED = 44
202    CERTIFICATE_EXPIRED = 45
203    CERTIFICATE_UNKNOWN = 46
204    ILLEGAL_PARAMETER = 47
205    UNKNOWN_CA = 48
206    ACCESS_DENIED = 49
207    DECODE_ERROR = 50
208    DECRYPT_ERROR = 51
209    EXPORT_RESTRICTION = 60
210    PROTOCOL_VERSION = 70
211    INSUFFICIENT_SECURITY = 71
212    INTERNAL_ERROR = 80
213    INAPPROPRIATE_FALLBACK = 86
214    USER_CANCELED = 90
215    NO_RENEGOTIATION = 100
216    MISSING_EXTENSION = 109
217    UNSUPPORTED_EXTENSION = 110
218    CERTIFICATE_UNOBTAINABLE = 111
219    UNRECOGNIZED_NAME = 112
220    BAD_CERTIFICATE_STATUS_RESPONSE = 113
221    BAD_CERTIFICATE_HASH_VALUE = 114
222    UNKNOWN_PSK_IDENTITY = 115
223    CERTIFICATE_REQUIRED = 116
224    NO_APPLICATION_PROTOCOL = 120
225
226
227@_simple_enum(_IntEnum)
228class _TLSMessageType:
229    """Message types (handshake protocol)
230
231    See RFC 8446, section B.3
232    """
233    HELLO_REQUEST = 0
234    CLIENT_HELLO = 1
235    SERVER_HELLO = 2
236    HELLO_VERIFY_REQUEST = 3
237    NEWSESSION_TICKET = 4
238    END_OF_EARLY_DATA = 5
239    HELLO_RETRY_REQUEST = 6
240    ENCRYPTED_EXTENSIONS = 8
241    CERTIFICATE = 11
242    SERVER_KEY_EXCHANGE = 12
243    CERTIFICATE_REQUEST = 13
244    SERVER_DONE = 14
245    CERTIFICATE_VERIFY = 15
246    CLIENT_KEY_EXCHANGE = 16
247    FINISHED = 20
248    CERTIFICATE_URL = 21
249    CERTIFICATE_STATUS = 22
250    SUPPLEMENTAL_DATA = 23
251    KEY_UPDATE = 24
252    NEXT_PROTO = 67
253    MESSAGE_HASH = 254
254    CHANGE_CIPHER_SPEC = 0x0101
255
256
257if sys.platform == "win32":
258    from _ssl import enum_certificates, enum_crls
259
260from socket import socket, SOCK_STREAM, create_connection
261from socket import SOL_SOCKET, SO_TYPE, _GLOBAL_DEFAULT_TIMEOUT
262import socket as _socket
263import base64        # for DER-to-PEM translation
264import errno
265import warnings
266
267
268socket_error = OSError  # keep that public name in module namespace
269
270CHANNEL_BINDING_TYPES = ['tls-unique']
271
272HAS_NEVER_CHECK_COMMON_NAME = hasattr(_ssl, 'HOSTFLAG_NEVER_CHECK_SUBJECT')
273
274
275_RESTRICTED_SERVER_CIPHERS = _DEFAULT_CIPHERS
276
277CertificateError = SSLCertVerificationError
278
279
280def _dnsname_match(dn, hostname):
281    """Matching according to RFC 6125, section 6.4.3
282
283    - Hostnames are compared lower-case.
284    - For IDNA, both dn and hostname must be encoded as IDN A-label (ACE).
285    - Partial wildcards like 'www*.example.org', multiple wildcards, sole
286      wildcard or wildcards in labels other then the left-most label are not
287      supported and a CertificateError is raised.
288    - A wildcard must match at least one character.
289    """
290    if not dn:
291        return False
292
293    wildcards = dn.count('*')
294    # speed up common case w/o wildcards
295    if not wildcards:
296        return dn.lower() == hostname.lower()
297
298    if wildcards > 1:
299        raise CertificateError(
300            "too many wildcards in certificate DNS name: {!r}.".format(dn))
301
302    dn_leftmost, sep, dn_remainder = dn.partition('.')
303
304    if '*' in dn_remainder:
305        # Only match wildcard in leftmost segment.
306        raise CertificateError(
307            "wildcard can only be present in the leftmost label: "
308            "{!r}.".format(dn))
309
310    if not sep:
311        # no right side
312        raise CertificateError(
313            "sole wildcard without additional labels are not support: "
314            "{!r}.".format(dn))
315
316    if dn_leftmost != '*':
317        # no partial wildcard matching
318        raise CertificateError(
319            "partial wildcards in leftmost label are not supported: "
320            "{!r}.".format(dn))
321
322    hostname_leftmost, sep, hostname_remainder = hostname.partition('.')
323    if not hostname_leftmost or not sep:
324        # wildcard must match at least one char
325        return False
326    return dn_remainder.lower() == hostname_remainder.lower()
327
328
329def _inet_paton(ipname):
330    """Try to convert an IP address to packed binary form
331
332    Supports IPv4 addresses on all platforms and IPv6 on platforms with IPv6
333    support.
334    """
335    # inet_aton() also accepts strings like '1', '127.1', some also trailing
336    # data like '127.0.0.1 whatever'.
337    try:
338        addr = _socket.inet_aton(ipname)
339    except OSError:
340        # not an IPv4 address
341        pass
342    else:
343        if _socket.inet_ntoa(addr) == ipname:
344            # only accept injective ipnames
345            return addr
346        else:
347            # refuse for short IPv4 notation and additional trailing data
348            raise ValueError(
349                "{!r} is not a quad-dotted IPv4 address.".format(ipname)
350            )
351
352    try:
353        return _socket.inet_pton(_socket.AF_INET6, ipname)
354    except OSError:
355        raise ValueError("{!r} is neither an IPv4 nor an IP6 "
356                         "address.".format(ipname))
357    except AttributeError:
358        # AF_INET6 not available
359        pass
360
361    raise ValueError("{!r} is not an IPv4 address.".format(ipname))
362
363
364def _ipaddress_match(cert_ipaddress, host_ip):
365    """Exact matching of IP addresses.
366
367    RFC 6125 explicitly doesn't define an algorithm for this
368    (section 1.7.2 - "Out of Scope").
369    """
370    # OpenSSL may add a trailing newline to a subjectAltName's IP address,
371    # commonly with IPv6 addresses. Strip off trailing \n.
372    ip = _inet_paton(cert_ipaddress.rstrip())
373    return ip == host_ip
374
375
376def match_hostname(cert, hostname):
377    """Verify that *cert* (in decoded format as returned by
378    SSLSocket.getpeercert()) matches the *hostname*.  RFC 2818 and RFC 6125
379    rules are followed.
380
381    The function matches IP addresses rather than dNSNames if hostname is a
382    valid ipaddress string. IPv4 addresses are supported on all platforms.
383    IPv6 addresses are supported on platforms with IPv6 support (AF_INET6
384    and inet_pton).
385
386    CertificateError is raised on failure. On success, the function
387    returns nothing.
388    """
389    warnings.warn(
390        "ssl.match_hostname() is deprecated",
391        category=DeprecationWarning,
392        stacklevel=2
393    )
394    if not cert:
395        raise ValueError("empty or no certificate, match_hostname needs a "
396                         "SSL socket or SSL context with either "
397                         "CERT_OPTIONAL or CERT_REQUIRED")
398    try:
399        host_ip = _inet_paton(hostname)
400    except ValueError:
401        # Not an IP address (common case)
402        host_ip = None
403    dnsnames = []
404    san = cert.get('subjectAltName', ())
405    for key, value in san:
406        if key == 'DNS':
407            if host_ip is None and _dnsname_match(value, hostname):
408                return
409            dnsnames.append(value)
410        elif key == 'IP Address':
411            if host_ip is not None and _ipaddress_match(value, host_ip):
412                return
413            dnsnames.append(value)
414    if not dnsnames:
415        # The subject is only checked when there is no dNSName entry
416        # in subjectAltName
417        for sub in cert.get('subject', ()):
418            for key, value in sub:
419                # XXX according to RFC 2818, the most specific Common Name
420                # must be used.
421                if key == 'commonName':
422                    if _dnsname_match(value, hostname):
423                        return
424                    dnsnames.append(value)
425    if len(dnsnames) > 1:
426        raise CertificateError("hostname %r "
427            "doesn't match either of %s"
428            % (hostname, ', '.join(map(repr, dnsnames))))
429    elif len(dnsnames) == 1:
430        raise CertificateError("hostname %r "
431            "doesn't match %r"
432            % (hostname, dnsnames[0]))
433    else:
434        raise CertificateError("no appropriate commonName or "
435            "subjectAltName fields were found")
436
437
438DefaultVerifyPaths = namedtuple("DefaultVerifyPaths",
439    "cafile capath openssl_cafile_env openssl_cafile openssl_capath_env "
440    "openssl_capath")
441
442def get_default_verify_paths():
443    """Return paths to default cafile and capath.
444    """
445    parts = _ssl.get_default_verify_paths()
446
447    # environment vars shadow paths
448    cafile = os.environ.get(parts[0], parts[1])
449    capath = os.environ.get(parts[2], parts[3])
450
451    return DefaultVerifyPaths(cafile if os.path.isfile(cafile) else None,
452                              capath if os.path.isdir(capath) else None,
453                              *parts)
454
455
456class _ASN1Object(namedtuple("_ASN1Object", "nid shortname longname oid")):
457    """ASN.1 object identifier lookup
458    """
459    __slots__ = ()
460
461    def __new__(cls, oid):
462        return super().__new__(cls, *_txt2obj(oid, name=False))
463
464    @classmethod
465    def fromnid(cls, nid):
466        """Create _ASN1Object from OpenSSL numeric ID
467        """
468        return super().__new__(cls, *_nid2obj(nid))
469
470    @classmethod
471    def fromname(cls, name):
472        """Create _ASN1Object from short name, long name or OID
473        """
474        return super().__new__(cls, *_txt2obj(name, name=True))
475
476
477class Purpose(_ASN1Object, _Enum):
478    """SSLContext purpose flags with X509v3 Extended Key Usage objects
479    """
480    SERVER_AUTH = '1.3.6.1.5.5.7.3.1'
481    CLIENT_AUTH = '1.3.6.1.5.5.7.3.2'
482
483
484class SSLContext(_SSLContext):
485    """An SSLContext holds various SSL-related configuration options and
486    data, such as certificates and possibly a private key."""
487    _windows_cert_stores = ("CA", "ROOT")
488
489    sslsocket_class = None  # SSLSocket is assigned later.
490    sslobject_class = None  # SSLObject is assigned later.
491
492    def __new__(cls, protocol=None, *args, **kwargs):
493        if protocol is None:
494            warnings.warn(
495                "ssl.SSLContext() without protocol argument is deprecated.",
496                category=DeprecationWarning,
497                stacklevel=2
498            )
499            protocol = PROTOCOL_TLS
500        self = _SSLContext.__new__(cls, protocol)
501        return self
502
503    def _encode_hostname(self, hostname):
504        if hostname is None:
505            return None
506        elif isinstance(hostname, str):
507            return hostname.encode('idna').decode('ascii')
508        else:
509            return hostname.decode('ascii')
510
511    def wrap_socket(self, sock, server_side=False,
512                    do_handshake_on_connect=True,
513                    suppress_ragged_eofs=True,
514                    server_hostname=None, session=None):
515        # SSLSocket class handles server_hostname encoding before it calls
516        # ctx._wrap_socket()
517        return self.sslsocket_class._create(
518            sock=sock,
519            server_side=server_side,
520            do_handshake_on_connect=do_handshake_on_connect,
521            suppress_ragged_eofs=suppress_ragged_eofs,
522            server_hostname=server_hostname,
523            context=self,
524            session=session
525        )
526
527    def wrap_bio(self, incoming, outgoing, server_side=False,
528                 server_hostname=None, session=None):
529        # Need to encode server_hostname here because _wrap_bio() can only
530        # handle ASCII str.
531        return self.sslobject_class._create(
532            incoming, outgoing, server_side=server_side,
533            server_hostname=self._encode_hostname(server_hostname),
534            session=session, context=self,
535        )
536
537    def set_npn_protocols(self, npn_protocols):
538        warnings.warn(
539            "ssl NPN is deprecated, use ALPN instead",
540            DeprecationWarning,
541            stacklevel=2
542        )
543        protos = bytearray()
544        for protocol in npn_protocols:
545            b = bytes(protocol, 'ascii')
546            if len(b) == 0 or len(b) > 255:
547                raise SSLError('NPN protocols must be 1 to 255 in length')
548            protos.append(len(b))
549            protos.extend(b)
550
551        self._set_npn_protocols(protos)
552
553    def set_servername_callback(self, server_name_callback):
554        if server_name_callback is None:
555            self.sni_callback = None
556        else:
557            if not callable(server_name_callback):
558                raise TypeError("not a callable object")
559
560            def shim_cb(sslobj, servername, sslctx):
561                servername = self._encode_hostname(servername)
562                return server_name_callback(sslobj, servername, sslctx)
563
564            self.sni_callback = shim_cb
565
566    def set_alpn_protocols(self, alpn_protocols):
567        protos = bytearray()
568        for protocol in alpn_protocols:
569            b = bytes(protocol, 'ascii')
570            if len(b) == 0 or len(b) > 255:
571                raise SSLError('ALPN protocols must be 1 to 255 in length')
572            protos.append(len(b))
573            protos.extend(b)
574
575        self._set_alpn_protocols(protos)
576
577    def _load_windows_store_certs(self, storename, purpose):
578        certs = bytearray()
579        try:
580            for cert, encoding, trust in enum_certificates(storename):
581                # CA certs are never PKCS#7 encoded
582                if encoding == "x509_asn":
583                    if trust is True or purpose.oid in trust:
584                        certs.extend(cert)
585        except PermissionError:
586            warnings.warn("unable to enumerate Windows certificate store")
587        if certs:
588            self.load_verify_locations(cadata=certs)
589        return certs
590
591    def load_default_certs(self, purpose=Purpose.SERVER_AUTH):
592        if not isinstance(purpose, _ASN1Object):
593            raise TypeError(purpose)
594        if sys.platform == "win32":
595            for storename in self._windows_cert_stores:
596                self._load_windows_store_certs(storename, purpose)
597        self.set_default_verify_paths()
598
599    if hasattr(_SSLContext, 'minimum_version'):
600        @property
601        def minimum_version(self):
602            return TLSVersion(super().minimum_version)
603
604        @minimum_version.setter
605        def minimum_version(self, value):
606            if value == TLSVersion.SSLv3:
607                self.options &= ~Options.OP_NO_SSLv3
608            super(SSLContext, SSLContext).minimum_version.__set__(self, value)
609
610        @property
611        def maximum_version(self):
612            return TLSVersion(super().maximum_version)
613
614        @maximum_version.setter
615        def maximum_version(self, value):
616            super(SSLContext, SSLContext).maximum_version.__set__(self, value)
617
618    @property
619    def options(self):
620        return Options(super().options)
621
622    @options.setter
623    def options(self, value):
624        super(SSLContext, SSLContext).options.__set__(self, value)
625
626    if hasattr(_ssl, 'HOSTFLAG_NEVER_CHECK_SUBJECT'):
627        @property
628        def hostname_checks_common_name(self):
629            ncs = self._host_flags & _ssl.HOSTFLAG_NEVER_CHECK_SUBJECT
630            return ncs != _ssl.HOSTFLAG_NEVER_CHECK_SUBJECT
631
632        @hostname_checks_common_name.setter
633        def hostname_checks_common_name(self, value):
634            if value:
635                self._host_flags &= ~_ssl.HOSTFLAG_NEVER_CHECK_SUBJECT
636            else:
637                self._host_flags |= _ssl.HOSTFLAG_NEVER_CHECK_SUBJECT
638    else:
639        @property
640        def hostname_checks_common_name(self):
641            return True
642
643    @property
644    def _msg_callback(self):
645        """TLS message callback
646
647        The message callback provides a debugging hook to analyze TLS
648        connections. The callback is called for any TLS protocol message
649        (header, handshake, alert, and more), but not for application data.
650        Due to technical  limitations, the callback can't be used to filter
651        traffic or to abort a connection. Any exception raised in the
652        callback is delayed until the handshake, read, or write operation
653        has been performed.
654
655        def msg_cb(conn, direction, version, content_type, msg_type, data):
656            pass
657
658        conn
659            :class:`SSLSocket` or :class:`SSLObject` instance
660        direction
661            ``read`` or ``write``
662        version
663            :class:`TLSVersion` enum member or int for unknown version. For a
664            frame header, it's the header version.
665        content_type
666            :class:`_TLSContentType` enum member or int for unsupported
667            content type.
668        msg_type
669            Either a :class:`_TLSContentType` enum number for a header
670            message, a :class:`_TLSAlertType` enum member for an alert
671            message, a :class:`_TLSMessageType` enum member for other
672            messages, or int for unsupported message types.
673        data
674            Raw, decrypted message content as bytes
675        """
676        inner = super()._msg_callback
677        if inner is not None:
678            return inner.user_function
679        else:
680            return None
681
682    @_msg_callback.setter
683    def _msg_callback(self, callback):
684        if callback is None:
685            super(SSLContext, SSLContext)._msg_callback.__set__(self, None)
686            return
687
688        if not hasattr(callback, '__call__'):
689            raise TypeError(f"{callback} is not callable.")
690
691        def inner(conn, direction, version, content_type, msg_type, data):
692            try:
693                version = TLSVersion(version)
694            except ValueError:
695                pass
696
697            try:
698                content_type = _TLSContentType(content_type)
699            except ValueError:
700                pass
701
702            if content_type == _TLSContentType.HEADER:
703                msg_enum = _TLSContentType
704            elif content_type == _TLSContentType.ALERT:
705                msg_enum = _TLSAlertType
706            else:
707                msg_enum = _TLSMessageType
708            try:
709                msg_type = msg_enum(msg_type)
710            except ValueError:
711                pass
712
713            return callback(conn, direction, version,
714                            content_type, msg_type, data)
715
716        inner.user_function = callback
717
718        super(SSLContext, SSLContext)._msg_callback.__set__(self, inner)
719
720    @property
721    def protocol(self):
722        return _SSLMethod(super().protocol)
723
724    @property
725    def verify_flags(self):
726        return VerifyFlags(super().verify_flags)
727
728    @verify_flags.setter
729    def verify_flags(self, value):
730        super(SSLContext, SSLContext).verify_flags.__set__(self, value)
731
732    @property
733    def verify_mode(self):
734        value = super().verify_mode
735        try:
736            return VerifyMode(value)
737        except ValueError:
738            return value
739
740    @verify_mode.setter
741    def verify_mode(self, value):
742        super(SSLContext, SSLContext).verify_mode.__set__(self, value)
743
744
745def create_default_context(purpose=Purpose.SERVER_AUTH, *, cafile=None,
746                           capath=None, cadata=None):
747    """Create a SSLContext object with default settings.
748
749    NOTE: The protocol and settings may change anytime without prior
750          deprecation. The values represent a fair balance between maximum
751          compatibility and security.
752    """
753    if not isinstance(purpose, _ASN1Object):
754        raise TypeError(purpose)
755
756    # SSLContext sets OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_COMPRESSION,
757    # OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE and OP_SINGLE_ECDH_USE
758    # by default.
759    if purpose == Purpose.SERVER_AUTH:
760        # verify certs and host name in client mode
761        context = SSLContext(PROTOCOL_TLS_CLIENT)
762        context.verify_mode = CERT_REQUIRED
763        context.check_hostname = True
764    elif purpose == Purpose.CLIENT_AUTH:
765        context = SSLContext(PROTOCOL_TLS_SERVER)
766    else:
767        raise ValueError(purpose)
768
769    if cafile or capath or cadata:
770        context.load_verify_locations(cafile, capath, cadata)
771    elif context.verify_mode != CERT_NONE:
772        # no explicit cafile, capath or cadata but the verify mode is
773        # CERT_OPTIONAL or CERT_REQUIRED. Let's try to load default system
774        # root CA certificates for the given purpose. This may fail silently.
775        context.load_default_certs(purpose)
776    # OpenSSL 1.1.1 keylog file
777    if hasattr(context, 'keylog_filename'):
778        keylogfile = os.environ.get('SSLKEYLOGFILE')
779        if keylogfile and not sys.flags.ignore_environment:
780            context.keylog_filename = keylogfile
781    return context
782
783def _create_unverified_context(protocol=None, *, cert_reqs=CERT_NONE,
784                           check_hostname=False, purpose=Purpose.SERVER_AUTH,
785                           certfile=None, keyfile=None,
786                           cafile=None, capath=None, cadata=None):
787    """Create a SSLContext object for Python stdlib modules
788
789    All Python stdlib modules shall use this function to create SSLContext
790    objects in order to keep common settings in one place. The configuration
791    is less restrict than create_default_context()'s to increase backward
792    compatibility.
793    """
794    if not isinstance(purpose, _ASN1Object):
795        raise TypeError(purpose)
796
797    # SSLContext sets OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_COMPRESSION,
798    # OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE and OP_SINGLE_ECDH_USE
799    # by default.
800    if purpose == Purpose.SERVER_AUTH:
801        # verify certs and host name in client mode
802        if protocol is None:
803            protocol = PROTOCOL_TLS_CLIENT
804    elif purpose == Purpose.CLIENT_AUTH:
805        if protocol is None:
806            protocol = PROTOCOL_TLS_SERVER
807    else:
808        raise ValueError(purpose)
809
810    context = SSLContext(protocol)
811    context.check_hostname = check_hostname
812    if cert_reqs is not None:
813        context.verify_mode = cert_reqs
814    if check_hostname:
815        context.check_hostname = True
816
817    if keyfile and not certfile:
818        raise ValueError("certfile must be specified")
819    if certfile or keyfile:
820        context.load_cert_chain(certfile, keyfile)
821
822    # load CA root certs
823    if cafile or capath or cadata:
824        context.load_verify_locations(cafile, capath, cadata)
825    elif context.verify_mode != CERT_NONE:
826        # no explicit cafile, capath or cadata but the verify mode is
827        # CERT_OPTIONAL or CERT_REQUIRED. Let's try to load default system
828        # root CA certificates for the given purpose. This may fail silently.
829        context.load_default_certs(purpose)
830    # OpenSSL 1.1.1 keylog file
831    if hasattr(context, 'keylog_filename'):
832        keylogfile = os.environ.get('SSLKEYLOGFILE')
833        if keylogfile and not sys.flags.ignore_environment:
834            context.keylog_filename = keylogfile
835    return context
836
837# Used by http.client if no context is explicitly passed.
838_create_default_https_context = create_default_context
839
840
841# Backwards compatibility alias, even though it's not a public name.
842_create_stdlib_context = _create_unverified_context
843
844
845class SSLObject:
846    """This class implements an interface on top of a low-level SSL object as
847    implemented by OpenSSL. This object captures the state of an SSL connection
848    but does not provide any network IO itself. IO needs to be performed
849    through separate "BIO" objects which are OpenSSL's IO abstraction layer.
850
851    This class does not have a public constructor. Instances are returned by
852    ``SSLContext.wrap_bio``. This class is typically used by framework authors
853    that want to implement asynchronous IO for SSL through memory buffers.
854
855    When compared to ``SSLSocket``, this object lacks the following features:
856
857     * Any form of network IO, including methods such as ``recv`` and ``send``.
858     * The ``do_handshake_on_connect`` and ``suppress_ragged_eofs`` machinery.
859    """
860    def __init__(self, *args, **kwargs):
861        raise TypeError(
862            f"{self.__class__.__name__} does not have a public "
863            f"constructor. Instances are returned by SSLContext.wrap_bio()."
864        )
865
866    @classmethod
867    def _create(cls, incoming, outgoing, server_side=False,
868                 server_hostname=None, session=None, context=None):
869        self = cls.__new__(cls)
870        sslobj = context._wrap_bio(
871            incoming, outgoing, server_side=server_side,
872            server_hostname=server_hostname,
873            owner=self, session=session
874        )
875        self._sslobj = sslobj
876        return self
877
878    @property
879    def context(self):
880        """The SSLContext that is currently in use."""
881        return self._sslobj.context
882
883    @context.setter
884    def context(self, ctx):
885        self._sslobj.context = ctx
886
887    @property
888    def session(self):
889        """The SSLSession for client socket."""
890        return self._sslobj.session
891
892    @session.setter
893    def session(self, session):
894        self._sslobj.session = session
895
896    @property
897    def session_reused(self):
898        """Was the client session reused during handshake"""
899        return self._sslobj.session_reused
900
901    @property
902    def server_side(self):
903        """Whether this is a server-side socket."""
904        return self._sslobj.server_side
905
906    @property
907    def server_hostname(self):
908        """The currently set server hostname (for SNI), or ``None`` if no
909        server hostname is set."""
910        return self._sslobj.server_hostname
911
912    def read(self, len=1024, buffer=None):
913        """Read up to 'len' bytes from the SSL object and return them.
914
915        If 'buffer' is provided, read into this buffer and return the number of
916        bytes read.
917        """
918        if buffer is not None:
919            v = self._sslobj.read(len, buffer)
920        else:
921            v = self._sslobj.read(len)
922        return v
923
924    def write(self, data):
925        """Write 'data' to the SSL object and return the number of bytes
926        written.
927
928        The 'data' argument must support the buffer interface.
929        """
930        return self._sslobj.write(data)
931
932    def getpeercert(self, binary_form=False):
933        """Returns a formatted version of the data in the certificate provided
934        by the other end of the SSL channel.
935
936        Return None if no certificate was provided, {} if a certificate was
937        provided, but not validated.
938        """
939        return self._sslobj.getpeercert(binary_form)
940
941    def selected_npn_protocol(self):
942        """Return the currently selected NPN protocol as a string, or ``None``
943        if a next protocol was not negotiated or if NPN is not supported by one
944        of the peers."""
945        warnings.warn(
946            "ssl NPN is deprecated, use ALPN instead",
947            DeprecationWarning,
948            stacklevel=2
949        )
950
951    def selected_alpn_protocol(self):
952        """Return the currently selected ALPN protocol as a string, or ``None``
953        if a next protocol was not negotiated or if ALPN is not supported by one
954        of the peers."""
955        return self._sslobj.selected_alpn_protocol()
956
957    def cipher(self):
958        """Return the currently selected cipher as a 3-tuple ``(name,
959        ssl_version, secret_bits)``."""
960        return self._sslobj.cipher()
961
962    def shared_ciphers(self):
963        """Return a list of ciphers shared by the client during the handshake or
964        None if this is not a valid server connection.
965        """
966        return self._sslobj.shared_ciphers()
967
968    def compression(self):
969        """Return the current compression algorithm in use, or ``None`` if
970        compression was not negotiated or not supported by one of the peers."""
971        return self._sslobj.compression()
972
973    def pending(self):
974        """Return the number of bytes that can be read immediately."""
975        return self._sslobj.pending()
976
977    def do_handshake(self):
978        """Start the SSL/TLS handshake."""
979        self._sslobj.do_handshake()
980
981    def unwrap(self):
982        """Start the SSL shutdown handshake."""
983        return self._sslobj.shutdown()
984
985    def get_channel_binding(self, cb_type="tls-unique"):
986        """Get channel binding data for current connection.  Raise ValueError
987        if the requested `cb_type` is not supported.  Return bytes of the data
988        or None if the data is not available (e.g. before the handshake)."""
989        return self._sslobj.get_channel_binding(cb_type)
990
991    def version(self):
992        """Return a string identifying the protocol version used by the
993        current SSL channel. """
994        return self._sslobj.version()
995
996    def verify_client_post_handshake(self):
997        return self._sslobj.verify_client_post_handshake()
998
999
1000def _sslcopydoc(func):
1001    """Copy docstring from SSLObject to SSLSocket"""
1002    func.__doc__ = getattr(SSLObject, func.__name__).__doc__
1003    return func
1004
1005
1006class SSLSocket(socket):
1007    """This class implements a subtype of socket.socket that wraps
1008    the underlying OS socket in an SSL context when necessary, and
1009    provides read and write methods over that channel. """
1010
1011    def __init__(self, *args, **kwargs):
1012        raise TypeError(
1013            f"{self.__class__.__name__} does not have a public "
1014            f"constructor. Instances are returned by "
1015            f"SSLContext.wrap_socket()."
1016        )
1017
1018    @classmethod
1019    def _create(cls, sock, server_side=False, do_handshake_on_connect=True,
1020                suppress_ragged_eofs=True, server_hostname=None,
1021                context=None, session=None):
1022        if sock.getsockopt(SOL_SOCKET, SO_TYPE) != SOCK_STREAM:
1023            raise NotImplementedError("only stream sockets are supported")
1024        if server_side:
1025            if server_hostname:
1026                raise ValueError("server_hostname can only be specified "
1027                                 "in client mode")
1028            if session is not None:
1029                raise ValueError("session can only be specified in "
1030                                 "client mode")
1031        if context.check_hostname and not server_hostname:
1032            raise ValueError("check_hostname requires server_hostname")
1033
1034        kwargs = dict(
1035            family=sock.family, type=sock.type, proto=sock.proto,
1036            fileno=sock.fileno()
1037        )
1038        self = cls.__new__(cls, **kwargs)
1039        super(SSLSocket, self).__init__(**kwargs)
1040        self.settimeout(sock.gettimeout())
1041        sock.detach()
1042
1043        self._context = context
1044        self._session = session
1045        self._closed = False
1046        self._sslobj = None
1047        self.server_side = server_side
1048        self.server_hostname = context._encode_hostname(server_hostname)
1049        self.do_handshake_on_connect = do_handshake_on_connect
1050        self.suppress_ragged_eofs = suppress_ragged_eofs
1051
1052        # See if we are connected
1053        try:
1054            self.getpeername()
1055        except OSError as e:
1056            if e.errno != errno.ENOTCONN:
1057                raise
1058            connected = False
1059        else:
1060            connected = True
1061
1062        self._connected = connected
1063        if connected:
1064            # create the SSL object
1065            try:
1066                self._sslobj = self._context._wrap_socket(
1067                    self, server_side, self.server_hostname,
1068                    owner=self, session=self._session,
1069                )
1070                if do_handshake_on_connect:
1071                    timeout = self.gettimeout()
1072                    if timeout == 0.0:
1073                        # non-blocking
1074                        raise ValueError("do_handshake_on_connect should not be specified for non-blocking sockets")
1075                    self.do_handshake()
1076            except (OSError, ValueError):
1077                self.close()
1078                raise
1079        return self
1080
1081    @property
1082    @_sslcopydoc
1083    def context(self):
1084        return self._context
1085
1086    @context.setter
1087    def context(self, ctx):
1088        self._context = ctx
1089        self._sslobj.context = ctx
1090
1091    @property
1092    @_sslcopydoc
1093    def session(self):
1094        if self._sslobj is not None:
1095            return self._sslobj.session
1096
1097    @session.setter
1098    def session(self, session):
1099        self._session = session
1100        if self._sslobj is not None:
1101            self._sslobj.session = session
1102
1103    @property
1104    @_sslcopydoc
1105    def session_reused(self):
1106        if self._sslobj is not None:
1107            return self._sslobj.session_reused
1108
1109    def dup(self):
1110        raise NotImplementedError("Can't dup() %s instances" %
1111                                  self.__class__.__name__)
1112
1113    def _checkClosed(self, msg=None):
1114        # raise an exception here if you wish to check for spurious closes
1115        pass
1116
1117    def _check_connected(self):
1118        if not self._connected:
1119            # getpeername() will raise ENOTCONN if the socket is really
1120            # not connected; note that we can be connected even without
1121            # _connected being set, e.g. if connect() first returned
1122            # EAGAIN.
1123            self.getpeername()
1124
1125    def read(self, len=1024, buffer=None):
1126        """Read up to LEN bytes and return them.
1127        Return zero-length string on EOF."""
1128
1129        self._checkClosed()
1130        if self._sslobj is None:
1131            raise ValueError("Read on closed or unwrapped SSL socket.")
1132        try:
1133            if buffer is not None:
1134                return self._sslobj.read(len, buffer)
1135            else:
1136                return self._sslobj.read(len)
1137        except SSLError as x:
1138            if x.args[0] == SSL_ERROR_EOF and self.suppress_ragged_eofs:
1139                if buffer is not None:
1140                    return 0
1141                else:
1142                    return b''
1143            else:
1144                raise
1145
1146    def write(self, data):
1147        """Write DATA to the underlying SSL channel.  Returns
1148        number of bytes of DATA actually transmitted."""
1149
1150        self._checkClosed()
1151        if self._sslobj is None:
1152            raise ValueError("Write on closed or unwrapped SSL socket.")
1153        return self._sslobj.write(data)
1154
1155    @_sslcopydoc
1156    def getpeercert(self, binary_form=False):
1157        self._checkClosed()
1158        self._check_connected()
1159        return self._sslobj.getpeercert(binary_form)
1160
1161    @_sslcopydoc
1162    def selected_npn_protocol(self):
1163        self._checkClosed()
1164        warnings.warn(
1165            "ssl NPN is deprecated, use ALPN instead",
1166            DeprecationWarning,
1167            stacklevel=2
1168        )
1169        return None
1170
1171    @_sslcopydoc
1172    def selected_alpn_protocol(self):
1173        self._checkClosed()
1174        if self._sslobj is None or not _ssl.HAS_ALPN:
1175            return None
1176        else:
1177            return self._sslobj.selected_alpn_protocol()
1178
1179    @_sslcopydoc
1180    def cipher(self):
1181        self._checkClosed()
1182        if self._sslobj is None:
1183            return None
1184        else:
1185            return self._sslobj.cipher()
1186
1187    @_sslcopydoc
1188    def shared_ciphers(self):
1189        self._checkClosed()
1190        if self._sslobj is None:
1191            return None
1192        else:
1193            return self._sslobj.shared_ciphers()
1194
1195    @_sslcopydoc
1196    def compression(self):
1197        self._checkClosed()
1198        if self._sslobj is None:
1199            return None
1200        else:
1201            return self._sslobj.compression()
1202
1203    def send(self, data, flags=0):
1204        self._checkClosed()
1205        if self._sslobj is not None:
1206            if flags != 0:
1207                raise ValueError(
1208                    "non-zero flags not allowed in calls to send() on %s" %
1209                    self.__class__)
1210            return self._sslobj.write(data)
1211        else:
1212            return super().send(data, flags)
1213
1214    def sendto(self, data, flags_or_addr, addr=None):
1215        self._checkClosed()
1216        if self._sslobj is not None:
1217            raise ValueError("sendto not allowed on instances of %s" %
1218                             self.__class__)
1219        elif addr is None:
1220            return super().sendto(data, flags_or_addr)
1221        else:
1222            return super().sendto(data, flags_or_addr, addr)
1223
1224    def sendmsg(self, *args, **kwargs):
1225        # Ensure programs don't send data unencrypted if they try to
1226        # use this method.
1227        raise NotImplementedError("sendmsg not allowed on instances of %s" %
1228                                  self.__class__)
1229
1230    def sendall(self, data, flags=0):
1231        self._checkClosed()
1232        if self._sslobj is not None:
1233            if flags != 0:
1234                raise ValueError(
1235                    "non-zero flags not allowed in calls to sendall() on %s" %
1236                    self.__class__)
1237            count = 0
1238            with memoryview(data) as view, view.cast("B") as byte_view:
1239                amount = len(byte_view)
1240                while count < amount:
1241                    v = self.send(byte_view[count:])
1242                    count += v
1243        else:
1244            return super().sendall(data, flags)
1245
1246    def sendfile(self, file, offset=0, count=None):
1247        """Send a file, possibly by using os.sendfile() if this is a
1248        clear-text socket.  Return the total number of bytes sent.
1249        """
1250        if self._sslobj is not None:
1251            return self._sendfile_use_send(file, offset, count)
1252        else:
1253            # os.sendfile() works with plain sockets only
1254            return super().sendfile(file, offset, count)
1255
1256    def recv(self, buflen=1024, flags=0):
1257        self._checkClosed()
1258        if self._sslobj is not None:
1259            if flags != 0:
1260                raise ValueError(
1261                    "non-zero flags not allowed in calls to recv() on %s" %
1262                    self.__class__)
1263            return self.read(buflen)
1264        else:
1265            return super().recv(buflen, flags)
1266
1267    def recv_into(self, buffer, nbytes=None, flags=0):
1268        self._checkClosed()
1269        if buffer and (nbytes is None):
1270            nbytes = len(buffer)
1271        elif nbytes is None:
1272            nbytes = 1024
1273        if self._sslobj is not None:
1274            if flags != 0:
1275                raise ValueError(
1276                  "non-zero flags not allowed in calls to recv_into() on %s" %
1277                  self.__class__)
1278            return self.read(nbytes, buffer)
1279        else:
1280            return super().recv_into(buffer, nbytes, flags)
1281
1282    def recvfrom(self, buflen=1024, flags=0):
1283        self._checkClosed()
1284        if self._sslobj is not None:
1285            raise ValueError("recvfrom not allowed on instances of %s" %
1286                             self.__class__)
1287        else:
1288            return super().recvfrom(buflen, flags)
1289
1290    def recvfrom_into(self, buffer, nbytes=None, flags=0):
1291        self._checkClosed()
1292        if self._sslobj is not None:
1293            raise ValueError("recvfrom_into not allowed on instances of %s" %
1294                             self.__class__)
1295        else:
1296            return super().recvfrom_into(buffer, nbytes, flags)
1297
1298    def recvmsg(self, *args, **kwargs):
1299        raise NotImplementedError("recvmsg not allowed on instances of %s" %
1300                                  self.__class__)
1301
1302    def recvmsg_into(self, *args, **kwargs):
1303        raise NotImplementedError("recvmsg_into not allowed on instances of "
1304                                  "%s" % self.__class__)
1305
1306    @_sslcopydoc
1307    def pending(self):
1308        self._checkClosed()
1309        if self._sslobj is not None:
1310            return self._sslobj.pending()
1311        else:
1312            return 0
1313
1314    def shutdown(self, how):
1315        self._checkClosed()
1316        self._sslobj = None
1317        super().shutdown(how)
1318
1319    @_sslcopydoc
1320    def unwrap(self):
1321        if self._sslobj:
1322            s = self._sslobj.shutdown()
1323            self._sslobj = None
1324            return s
1325        else:
1326            raise ValueError("No SSL wrapper around " + str(self))
1327
1328    @_sslcopydoc
1329    def verify_client_post_handshake(self):
1330        if self._sslobj:
1331            return self._sslobj.verify_client_post_handshake()
1332        else:
1333            raise ValueError("No SSL wrapper around " + str(self))
1334
1335    def _real_close(self):
1336        self._sslobj = None
1337        super()._real_close()
1338
1339    @_sslcopydoc
1340    def do_handshake(self, block=False):
1341        self._check_connected()
1342        timeout = self.gettimeout()
1343        try:
1344            if timeout == 0.0 and block:
1345                self.settimeout(None)
1346            self._sslobj.do_handshake()
1347        finally:
1348            self.settimeout(timeout)
1349
1350    def _real_connect(self, addr, connect_ex):
1351        if self.server_side:
1352            raise ValueError("can't connect in server-side mode")
1353        # Here we assume that the socket is client-side, and not
1354        # connected at the time of the call.  We connect it, then wrap it.
1355        if self._connected or self._sslobj is not None:
1356            raise ValueError("attempt to connect already-connected SSLSocket!")
1357        self._sslobj = self.context._wrap_socket(
1358            self, False, self.server_hostname,
1359            owner=self, session=self._session
1360        )
1361        try:
1362            if connect_ex:
1363                rc = super().connect_ex(addr)
1364            else:
1365                rc = None
1366                super().connect(addr)
1367            if not rc:
1368                self._connected = True
1369                if self.do_handshake_on_connect:
1370                    self.do_handshake()
1371            return rc
1372        except (OSError, ValueError):
1373            self._sslobj = None
1374            raise
1375
1376    def connect(self, addr):
1377        """Connects to remote ADDR, and then wraps the connection in
1378        an SSL channel."""
1379        self._real_connect(addr, False)
1380
1381    def connect_ex(self, addr):
1382        """Connects to remote ADDR, and then wraps the connection in
1383        an SSL channel."""
1384        return self._real_connect(addr, True)
1385
1386    def accept(self):
1387        """Accepts a new connection from a remote client, and returns
1388        a tuple containing that new connection wrapped with a server-side
1389        SSL channel, and the address of the remote client."""
1390
1391        newsock, addr = super().accept()
1392        newsock = self.context.wrap_socket(newsock,
1393                    do_handshake_on_connect=self.do_handshake_on_connect,
1394                    suppress_ragged_eofs=self.suppress_ragged_eofs,
1395                    server_side=True)
1396        return newsock, addr
1397
1398    @_sslcopydoc
1399    def get_channel_binding(self, cb_type="tls-unique"):
1400        if self._sslobj is not None:
1401            return self._sslobj.get_channel_binding(cb_type)
1402        else:
1403            if cb_type not in CHANNEL_BINDING_TYPES:
1404                raise ValueError(
1405                    "{0} channel binding type not implemented".format(cb_type)
1406                )
1407            return None
1408
1409    @_sslcopydoc
1410    def version(self):
1411        if self._sslobj is not None:
1412            return self._sslobj.version()
1413        else:
1414            return None
1415
1416
1417# Python does not support forward declaration of types.
1418SSLContext.sslsocket_class = SSLSocket
1419SSLContext.sslobject_class = SSLObject
1420
1421
1422def wrap_socket(sock, keyfile=None, certfile=None,
1423                server_side=False, cert_reqs=CERT_NONE,
1424                ssl_version=PROTOCOL_TLS, ca_certs=None,
1425                do_handshake_on_connect=True,
1426                suppress_ragged_eofs=True,
1427                ciphers=None):
1428    warnings.warn(
1429        "ssl.wrap_socket() is deprecated, use SSLContext.wrap_socket()",
1430        category=DeprecationWarning,
1431        stacklevel=2
1432    )
1433    if server_side and not certfile:
1434        raise ValueError("certfile must be specified for server-side "
1435                         "operations")
1436    if keyfile and not certfile:
1437        raise ValueError("certfile must be specified")
1438    context = SSLContext(ssl_version)
1439    context.verify_mode = cert_reqs
1440    if ca_certs:
1441        context.load_verify_locations(ca_certs)
1442    if certfile:
1443        context.load_cert_chain(certfile, keyfile)
1444    if ciphers:
1445        context.set_ciphers(ciphers)
1446    return context.wrap_socket(
1447        sock=sock, server_side=server_side,
1448        do_handshake_on_connect=do_handshake_on_connect,
1449        suppress_ragged_eofs=suppress_ragged_eofs
1450    )
1451
1452# some utility functions
1453
1454def cert_time_to_seconds(cert_time):
1455    """Return the time in seconds since the Epoch, given the timestring
1456    representing the "notBefore" or "notAfter" date from a certificate
1457    in ``"%b %d %H:%M:%S %Y %Z"`` strptime format (C locale).
1458
1459    "notBefore" or "notAfter" dates must use UTC (RFC 5280).
1460
1461    Month is one of: Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec
1462    UTC should be specified as GMT (see ASN1_TIME_print())
1463    """
1464    from time import strptime
1465    from calendar import timegm
1466
1467    months = (
1468        "Jan","Feb","Mar","Apr","May","Jun",
1469        "Jul","Aug","Sep","Oct","Nov","Dec"
1470    )
1471    time_format = ' %d %H:%M:%S %Y GMT' # NOTE: no month, fixed GMT
1472    try:
1473        month_number = months.index(cert_time[:3].title()) + 1
1474    except ValueError:
1475        raise ValueError('time data %r does not match '
1476                         'format "%%b%s"' % (cert_time, time_format))
1477    else:
1478        # found valid month
1479        tt = strptime(cert_time[3:], time_format)
1480        # return an integer, the previous mktime()-based implementation
1481        # returned a float (fractional seconds are always zero here).
1482        return timegm((tt[0], month_number) + tt[2:6])
1483
1484PEM_HEADER = "-----BEGIN CERTIFICATE-----"
1485PEM_FOOTER = "-----END CERTIFICATE-----"
1486
1487def DER_cert_to_PEM_cert(der_cert_bytes):
1488    """Takes a certificate in binary DER format and returns the
1489    PEM version of it as a string."""
1490
1491    f = str(base64.standard_b64encode(der_cert_bytes), 'ASCII', 'strict')
1492    ss = [PEM_HEADER]
1493    ss += [f[i:i+64] for i in range(0, len(f), 64)]
1494    ss.append(PEM_FOOTER + '\n')
1495    return '\n'.join(ss)
1496
1497def PEM_cert_to_DER_cert(pem_cert_string):
1498    """Takes a certificate in ASCII PEM format and returns the
1499    DER-encoded version of it as a byte sequence"""
1500
1501    if not pem_cert_string.startswith(PEM_HEADER):
1502        raise ValueError("Invalid PEM encoding; must start with %s"
1503                         % PEM_HEADER)
1504    if not pem_cert_string.strip().endswith(PEM_FOOTER):
1505        raise ValueError("Invalid PEM encoding; must end with %s"
1506                         % PEM_FOOTER)
1507    d = pem_cert_string.strip()[len(PEM_HEADER):-len(PEM_FOOTER)]
1508    return base64.decodebytes(d.encode('ASCII', 'strict'))
1509
1510def get_server_certificate(addr, ssl_version=PROTOCOL_TLS_CLIENT,
1511                           ca_certs=None, timeout=_GLOBAL_DEFAULT_TIMEOUT):
1512    """Retrieve the certificate from the server at the specified address,
1513    and return it as a PEM-encoded string.
1514    If 'ca_certs' is specified, validate the server cert against it.
1515    If 'ssl_version' is specified, use it in the connection attempt.
1516    If 'timeout' is specified, use it in the connection attempt.
1517    """
1518
1519    host, port = addr
1520    if ca_certs is not None:
1521        cert_reqs = CERT_REQUIRED
1522    else:
1523        cert_reqs = CERT_NONE
1524    context = _create_stdlib_context(ssl_version,
1525                                     cert_reqs=cert_reqs,
1526                                     cafile=ca_certs)
1527    with create_connection(addr, timeout=timeout) as sock:
1528        with context.wrap_socket(sock, server_hostname=host) as sslsock:
1529            dercert = sslsock.getpeercert(True)
1530    return DER_cert_to_PEM_cert(dercert)
1531
1532def get_protocol_name(protocol_code):
1533    return _PROTOCOL_NAMES.get(protocol_code, '<unknown>')
1534