1# Wrapper module for _ssl, providing some additional facilities 2# implemented in Python. Written by Bill Janssen. 3 4"""This module provides some more Pythonic support for SSL. 5 6Object types: 7 8 SSLSocket -- subtype of socket.socket which does SSL over the socket 9 10Exceptions: 11 12 SSLError -- exception raised for I/O errors 13 14Functions: 15 16 cert_time_to_seconds -- convert time string used for certificate 17 notBefore and notAfter functions to integer 18 seconds past the Epoch (the time values 19 returned from time.time()) 20 21 get_server_certificate (addr, ssl_version, ca_certs, timeout) -- Retrieve the 22 certificate from the server at the specified 23 address and return it as a PEM-encoded string 24 25 26Integer constants: 27 28SSL_ERROR_ZERO_RETURN 29SSL_ERROR_WANT_READ 30SSL_ERROR_WANT_WRITE 31SSL_ERROR_WANT_X509_LOOKUP 32SSL_ERROR_SYSCALL 33SSL_ERROR_SSL 34SSL_ERROR_WANT_CONNECT 35 36SSL_ERROR_EOF 37SSL_ERROR_INVALID_ERROR_CODE 38 39The following group define certificate requirements that one side is 40allowing/requiring from the other side: 41 42CERT_NONE - no certificates from the other side are required (or will 43 be looked at if provided) 44CERT_OPTIONAL - certificates are not required, but if provided will be 45 validated, and if validation fails, the connection will 46 also fail 47CERT_REQUIRED - certificates are required, and will be validated, and 48 if validation fails, the connection will also fail 49 50The following constants identify various SSL protocol variants: 51 52PROTOCOL_SSLv2 53PROTOCOL_SSLv3 54PROTOCOL_SSLv23 55PROTOCOL_TLS 56PROTOCOL_TLS_CLIENT 57PROTOCOL_TLS_SERVER 58PROTOCOL_TLSv1 59PROTOCOL_TLSv1_1 60PROTOCOL_TLSv1_2 61 62The following constants identify various SSL alert message descriptions as per 63http://www.iana.org/assignments/tls-parameters/tls-parameters.xml#tls-parameters-6 64 65ALERT_DESCRIPTION_CLOSE_NOTIFY 66ALERT_DESCRIPTION_UNEXPECTED_MESSAGE 67ALERT_DESCRIPTION_BAD_RECORD_MAC 68ALERT_DESCRIPTION_RECORD_OVERFLOW 69ALERT_DESCRIPTION_DECOMPRESSION_FAILURE 70ALERT_DESCRIPTION_HANDSHAKE_FAILURE 71ALERT_DESCRIPTION_BAD_CERTIFICATE 72ALERT_DESCRIPTION_UNSUPPORTED_CERTIFICATE 73ALERT_DESCRIPTION_CERTIFICATE_REVOKED 74ALERT_DESCRIPTION_CERTIFICATE_EXPIRED 75ALERT_DESCRIPTION_CERTIFICATE_UNKNOWN 76ALERT_DESCRIPTION_ILLEGAL_PARAMETER 77ALERT_DESCRIPTION_UNKNOWN_CA 78ALERT_DESCRIPTION_ACCESS_DENIED 79ALERT_DESCRIPTION_DECODE_ERROR 80ALERT_DESCRIPTION_DECRYPT_ERROR 81ALERT_DESCRIPTION_PROTOCOL_VERSION 82ALERT_DESCRIPTION_INSUFFICIENT_SECURITY 83ALERT_DESCRIPTION_INTERNAL_ERROR 84ALERT_DESCRIPTION_USER_CANCELLED 85ALERT_DESCRIPTION_NO_RENEGOTIATION 86ALERT_DESCRIPTION_UNSUPPORTED_EXTENSION 87ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE 88ALERT_DESCRIPTION_UNRECOGNIZED_NAME 89ALERT_DESCRIPTION_BAD_CERTIFICATE_STATUS_RESPONSE 90ALERT_DESCRIPTION_BAD_CERTIFICATE_HASH_VALUE 91ALERT_DESCRIPTION_UNKNOWN_PSK_IDENTITY 92""" 93 94import sys 95import os 96from collections import namedtuple 97from enum import Enum as _Enum, IntEnum as _IntEnum, IntFlag as _IntFlag 98from enum import _simple_enum 99 100import _ssl # if we can't import it, let the error propagate 101 102from _ssl import OPENSSL_VERSION_NUMBER, OPENSSL_VERSION_INFO, OPENSSL_VERSION 103from _ssl import _SSLContext, MemoryBIO, SSLSession 104from _ssl import ( 105 SSLError, SSLZeroReturnError, SSLWantReadError, SSLWantWriteError, 106 SSLSyscallError, SSLEOFError, SSLCertVerificationError 107 ) 108from _ssl import txt2obj as _txt2obj, nid2obj as _nid2obj 109from _ssl import RAND_status, RAND_add, RAND_bytes, RAND_pseudo_bytes 110try: 111 from _ssl import RAND_egd 112except ImportError: 113 # LibreSSL does not provide RAND_egd 114 pass 115 116 117from _ssl import ( 118 HAS_SNI, HAS_ECDH, HAS_NPN, HAS_ALPN, HAS_SSLv2, HAS_SSLv3, HAS_TLSv1, 119 HAS_TLSv1_1, HAS_TLSv1_2, HAS_TLSv1_3 120) 121from _ssl import _DEFAULT_CIPHERS, _OPENSSL_API_VERSION 122 123_IntEnum._convert_( 124 '_SSLMethod', __name__, 125 lambda name: name.startswith('PROTOCOL_') and name != 'PROTOCOL_SSLv23', 126 source=_ssl) 127 128_IntFlag._convert_( 129 'Options', __name__, 130 lambda name: name.startswith('OP_'), 131 source=_ssl) 132 133_IntEnum._convert_( 134 'AlertDescription', __name__, 135 lambda name: name.startswith('ALERT_DESCRIPTION_'), 136 source=_ssl) 137 138_IntEnum._convert_( 139 'SSLErrorNumber', __name__, 140 lambda name: name.startswith('SSL_ERROR_'), 141 source=_ssl) 142 143_IntFlag._convert_( 144 'VerifyFlags', __name__, 145 lambda name: name.startswith('VERIFY_'), 146 source=_ssl) 147 148_IntEnum._convert_( 149 'VerifyMode', __name__, 150 lambda name: name.startswith('CERT_'), 151 source=_ssl) 152 153PROTOCOL_SSLv23 = _SSLMethod.PROTOCOL_SSLv23 = _SSLMethod.PROTOCOL_TLS 154_PROTOCOL_NAMES = {value: name for name, value in _SSLMethod.__members__.items()} 155 156_SSLv2_IF_EXISTS = getattr(_SSLMethod, 'PROTOCOL_SSLv2', None) 157 158 159@_simple_enum(_IntEnum) 160class TLSVersion: 161 MINIMUM_SUPPORTED = _ssl.PROTO_MINIMUM_SUPPORTED 162 SSLv3 = _ssl.PROTO_SSLv3 163 TLSv1 = _ssl.PROTO_TLSv1 164 TLSv1_1 = _ssl.PROTO_TLSv1_1 165 TLSv1_2 = _ssl.PROTO_TLSv1_2 166 TLSv1_3 = _ssl.PROTO_TLSv1_3 167 MAXIMUM_SUPPORTED = _ssl.PROTO_MAXIMUM_SUPPORTED 168 169 170@_simple_enum(_IntEnum) 171class _TLSContentType: 172 """Content types (record layer) 173 174 See RFC 8446, section B.1 175 """ 176 CHANGE_CIPHER_SPEC = 20 177 ALERT = 21 178 HANDSHAKE = 22 179 APPLICATION_DATA = 23 180 # pseudo content types 181 HEADER = 0x100 182 INNER_CONTENT_TYPE = 0x101 183 184 185@_simple_enum(_IntEnum) 186class _TLSAlertType: 187 """Alert types for TLSContentType.ALERT messages 188 189 See RFC 8466, section B.2 190 """ 191 CLOSE_NOTIFY = 0 192 UNEXPECTED_MESSAGE = 10 193 BAD_RECORD_MAC = 20 194 DECRYPTION_FAILED = 21 195 RECORD_OVERFLOW = 22 196 DECOMPRESSION_FAILURE = 30 197 HANDSHAKE_FAILURE = 40 198 NO_CERTIFICATE = 41 199 BAD_CERTIFICATE = 42 200 UNSUPPORTED_CERTIFICATE = 43 201 CERTIFICATE_REVOKED = 44 202 CERTIFICATE_EXPIRED = 45 203 CERTIFICATE_UNKNOWN = 46 204 ILLEGAL_PARAMETER = 47 205 UNKNOWN_CA = 48 206 ACCESS_DENIED = 49 207 DECODE_ERROR = 50 208 DECRYPT_ERROR = 51 209 EXPORT_RESTRICTION = 60 210 PROTOCOL_VERSION = 70 211 INSUFFICIENT_SECURITY = 71 212 INTERNAL_ERROR = 80 213 INAPPROPRIATE_FALLBACK = 86 214 USER_CANCELED = 90 215 NO_RENEGOTIATION = 100 216 MISSING_EXTENSION = 109 217 UNSUPPORTED_EXTENSION = 110 218 CERTIFICATE_UNOBTAINABLE = 111 219 UNRECOGNIZED_NAME = 112 220 BAD_CERTIFICATE_STATUS_RESPONSE = 113 221 BAD_CERTIFICATE_HASH_VALUE = 114 222 UNKNOWN_PSK_IDENTITY = 115 223 CERTIFICATE_REQUIRED = 116 224 NO_APPLICATION_PROTOCOL = 120 225 226 227@_simple_enum(_IntEnum) 228class _TLSMessageType: 229 """Message types (handshake protocol) 230 231 See RFC 8446, section B.3 232 """ 233 HELLO_REQUEST = 0 234 CLIENT_HELLO = 1 235 SERVER_HELLO = 2 236 HELLO_VERIFY_REQUEST = 3 237 NEWSESSION_TICKET = 4 238 END_OF_EARLY_DATA = 5 239 HELLO_RETRY_REQUEST = 6 240 ENCRYPTED_EXTENSIONS = 8 241 CERTIFICATE = 11 242 SERVER_KEY_EXCHANGE = 12 243 CERTIFICATE_REQUEST = 13 244 SERVER_DONE = 14 245 CERTIFICATE_VERIFY = 15 246 CLIENT_KEY_EXCHANGE = 16 247 FINISHED = 20 248 CERTIFICATE_URL = 21 249 CERTIFICATE_STATUS = 22 250 SUPPLEMENTAL_DATA = 23 251 KEY_UPDATE = 24 252 NEXT_PROTO = 67 253 MESSAGE_HASH = 254 254 CHANGE_CIPHER_SPEC = 0x0101 255 256 257if sys.platform == "win32": 258 from _ssl import enum_certificates, enum_crls 259 260from socket import socket, SOCK_STREAM, create_connection 261from socket import SOL_SOCKET, SO_TYPE, _GLOBAL_DEFAULT_TIMEOUT 262import socket as _socket 263import base64 # for DER-to-PEM translation 264import errno 265import warnings 266 267 268socket_error = OSError # keep that public name in module namespace 269 270CHANNEL_BINDING_TYPES = ['tls-unique'] 271 272HAS_NEVER_CHECK_COMMON_NAME = hasattr(_ssl, 'HOSTFLAG_NEVER_CHECK_SUBJECT') 273 274 275_RESTRICTED_SERVER_CIPHERS = _DEFAULT_CIPHERS 276 277CertificateError = SSLCertVerificationError 278 279 280def _dnsname_match(dn, hostname): 281 """Matching according to RFC 6125, section 6.4.3 282 283 - Hostnames are compared lower-case. 284 - For IDNA, both dn and hostname must be encoded as IDN A-label (ACE). 285 - Partial wildcards like 'www*.example.org', multiple wildcards, sole 286 wildcard or wildcards in labels other then the left-most label are not 287 supported and a CertificateError is raised. 288 - A wildcard must match at least one character. 289 """ 290 if not dn: 291 return False 292 293 wildcards = dn.count('*') 294 # speed up common case w/o wildcards 295 if not wildcards: 296 return dn.lower() == hostname.lower() 297 298 if wildcards > 1: 299 raise CertificateError( 300 "too many wildcards in certificate DNS name: {!r}.".format(dn)) 301 302 dn_leftmost, sep, dn_remainder = dn.partition('.') 303 304 if '*' in dn_remainder: 305 # Only match wildcard in leftmost segment. 306 raise CertificateError( 307 "wildcard can only be present in the leftmost label: " 308 "{!r}.".format(dn)) 309 310 if not sep: 311 # no right side 312 raise CertificateError( 313 "sole wildcard without additional labels are not support: " 314 "{!r}.".format(dn)) 315 316 if dn_leftmost != '*': 317 # no partial wildcard matching 318 raise CertificateError( 319 "partial wildcards in leftmost label are not supported: " 320 "{!r}.".format(dn)) 321 322 hostname_leftmost, sep, hostname_remainder = hostname.partition('.') 323 if not hostname_leftmost or not sep: 324 # wildcard must match at least one char 325 return False 326 return dn_remainder.lower() == hostname_remainder.lower() 327 328 329def _inet_paton(ipname): 330 """Try to convert an IP address to packed binary form 331 332 Supports IPv4 addresses on all platforms and IPv6 on platforms with IPv6 333 support. 334 """ 335 # inet_aton() also accepts strings like '1', '127.1', some also trailing 336 # data like '127.0.0.1 whatever'. 337 try: 338 addr = _socket.inet_aton(ipname) 339 except OSError: 340 # not an IPv4 address 341 pass 342 else: 343 if _socket.inet_ntoa(addr) == ipname: 344 # only accept injective ipnames 345 return addr 346 else: 347 # refuse for short IPv4 notation and additional trailing data 348 raise ValueError( 349 "{!r} is not a quad-dotted IPv4 address.".format(ipname) 350 ) 351 352 try: 353 return _socket.inet_pton(_socket.AF_INET6, ipname) 354 except OSError: 355 raise ValueError("{!r} is neither an IPv4 nor an IP6 " 356 "address.".format(ipname)) 357 except AttributeError: 358 # AF_INET6 not available 359 pass 360 361 raise ValueError("{!r} is not an IPv4 address.".format(ipname)) 362 363 364def _ipaddress_match(cert_ipaddress, host_ip): 365 """Exact matching of IP addresses. 366 367 RFC 6125 explicitly doesn't define an algorithm for this 368 (section 1.7.2 - "Out of Scope"). 369 """ 370 # OpenSSL may add a trailing newline to a subjectAltName's IP address, 371 # commonly with IPv6 addresses. Strip off trailing \n. 372 ip = _inet_paton(cert_ipaddress.rstrip()) 373 return ip == host_ip 374 375 376def match_hostname(cert, hostname): 377 """Verify that *cert* (in decoded format as returned by 378 SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 and RFC 6125 379 rules are followed. 380 381 The function matches IP addresses rather than dNSNames if hostname is a 382 valid ipaddress string. IPv4 addresses are supported on all platforms. 383 IPv6 addresses are supported on platforms with IPv6 support (AF_INET6 384 and inet_pton). 385 386 CertificateError is raised on failure. On success, the function 387 returns nothing. 388 """ 389 warnings.warn( 390 "ssl.match_hostname() is deprecated", 391 category=DeprecationWarning, 392 stacklevel=2 393 ) 394 if not cert: 395 raise ValueError("empty or no certificate, match_hostname needs a " 396 "SSL socket or SSL context with either " 397 "CERT_OPTIONAL or CERT_REQUIRED") 398 try: 399 host_ip = _inet_paton(hostname) 400 except ValueError: 401 # Not an IP address (common case) 402 host_ip = None 403 dnsnames = [] 404 san = cert.get('subjectAltName', ()) 405 for key, value in san: 406 if key == 'DNS': 407 if host_ip is None and _dnsname_match(value, hostname): 408 return 409 dnsnames.append(value) 410 elif key == 'IP Address': 411 if host_ip is not None and _ipaddress_match(value, host_ip): 412 return 413 dnsnames.append(value) 414 if not dnsnames: 415 # The subject is only checked when there is no dNSName entry 416 # in subjectAltName 417 for sub in cert.get('subject', ()): 418 for key, value in sub: 419 # XXX according to RFC 2818, the most specific Common Name 420 # must be used. 421 if key == 'commonName': 422 if _dnsname_match(value, hostname): 423 return 424 dnsnames.append(value) 425 if len(dnsnames) > 1: 426 raise CertificateError("hostname %r " 427 "doesn't match either of %s" 428 % (hostname, ', '.join(map(repr, dnsnames)))) 429 elif len(dnsnames) == 1: 430 raise CertificateError("hostname %r " 431 "doesn't match %r" 432 % (hostname, dnsnames[0])) 433 else: 434 raise CertificateError("no appropriate commonName or " 435 "subjectAltName fields were found") 436 437 438DefaultVerifyPaths = namedtuple("DefaultVerifyPaths", 439 "cafile capath openssl_cafile_env openssl_cafile openssl_capath_env " 440 "openssl_capath") 441 442def get_default_verify_paths(): 443 """Return paths to default cafile and capath. 444 """ 445 parts = _ssl.get_default_verify_paths() 446 447 # environment vars shadow paths 448 cafile = os.environ.get(parts[0], parts[1]) 449 capath = os.environ.get(parts[2], parts[3]) 450 451 return DefaultVerifyPaths(cafile if os.path.isfile(cafile) else None, 452 capath if os.path.isdir(capath) else None, 453 *parts) 454 455 456class _ASN1Object(namedtuple("_ASN1Object", "nid shortname longname oid")): 457 """ASN.1 object identifier lookup 458 """ 459 __slots__ = () 460 461 def __new__(cls, oid): 462 return super().__new__(cls, *_txt2obj(oid, name=False)) 463 464 @classmethod 465 def fromnid(cls, nid): 466 """Create _ASN1Object from OpenSSL numeric ID 467 """ 468 return super().__new__(cls, *_nid2obj(nid)) 469 470 @classmethod 471 def fromname(cls, name): 472 """Create _ASN1Object from short name, long name or OID 473 """ 474 return super().__new__(cls, *_txt2obj(name, name=True)) 475 476 477class Purpose(_ASN1Object, _Enum): 478 """SSLContext purpose flags with X509v3 Extended Key Usage objects 479 """ 480 SERVER_AUTH = '1.3.6.1.5.5.7.3.1' 481 CLIENT_AUTH = '1.3.6.1.5.5.7.3.2' 482 483 484class SSLContext(_SSLContext): 485 """An SSLContext holds various SSL-related configuration options and 486 data, such as certificates and possibly a private key.""" 487 _windows_cert_stores = ("CA", "ROOT") 488 489 sslsocket_class = None # SSLSocket is assigned later. 490 sslobject_class = None # SSLObject is assigned later. 491 492 def __new__(cls, protocol=None, *args, **kwargs): 493 if protocol is None: 494 warnings.warn( 495 "ssl.SSLContext() without protocol argument is deprecated.", 496 category=DeprecationWarning, 497 stacklevel=2 498 ) 499 protocol = PROTOCOL_TLS 500 self = _SSLContext.__new__(cls, protocol) 501 return self 502 503 def _encode_hostname(self, hostname): 504 if hostname is None: 505 return None 506 elif isinstance(hostname, str): 507 return hostname.encode('idna').decode('ascii') 508 else: 509 return hostname.decode('ascii') 510 511 def wrap_socket(self, sock, server_side=False, 512 do_handshake_on_connect=True, 513 suppress_ragged_eofs=True, 514 server_hostname=None, session=None): 515 # SSLSocket class handles server_hostname encoding before it calls 516 # ctx._wrap_socket() 517 return self.sslsocket_class._create( 518 sock=sock, 519 server_side=server_side, 520 do_handshake_on_connect=do_handshake_on_connect, 521 suppress_ragged_eofs=suppress_ragged_eofs, 522 server_hostname=server_hostname, 523 context=self, 524 session=session 525 ) 526 527 def wrap_bio(self, incoming, outgoing, server_side=False, 528 server_hostname=None, session=None): 529 # Need to encode server_hostname here because _wrap_bio() can only 530 # handle ASCII str. 531 return self.sslobject_class._create( 532 incoming, outgoing, server_side=server_side, 533 server_hostname=self._encode_hostname(server_hostname), 534 session=session, context=self, 535 ) 536 537 def set_npn_protocols(self, npn_protocols): 538 warnings.warn( 539 "ssl NPN is deprecated, use ALPN instead", 540 DeprecationWarning, 541 stacklevel=2 542 ) 543 protos = bytearray() 544 for protocol in npn_protocols: 545 b = bytes(protocol, 'ascii') 546 if len(b) == 0 or len(b) > 255: 547 raise SSLError('NPN protocols must be 1 to 255 in length') 548 protos.append(len(b)) 549 protos.extend(b) 550 551 self._set_npn_protocols(protos) 552 553 def set_servername_callback(self, server_name_callback): 554 if server_name_callback is None: 555 self.sni_callback = None 556 else: 557 if not callable(server_name_callback): 558 raise TypeError("not a callable object") 559 560 def shim_cb(sslobj, servername, sslctx): 561 servername = self._encode_hostname(servername) 562 return server_name_callback(sslobj, servername, sslctx) 563 564 self.sni_callback = shim_cb 565 566 def set_alpn_protocols(self, alpn_protocols): 567 protos = bytearray() 568 for protocol in alpn_protocols: 569 b = bytes(protocol, 'ascii') 570 if len(b) == 0 or len(b) > 255: 571 raise SSLError('ALPN protocols must be 1 to 255 in length') 572 protos.append(len(b)) 573 protos.extend(b) 574 575 self._set_alpn_protocols(protos) 576 577 def _load_windows_store_certs(self, storename, purpose): 578 certs = bytearray() 579 try: 580 for cert, encoding, trust in enum_certificates(storename): 581 # CA certs are never PKCS#7 encoded 582 if encoding == "x509_asn": 583 if trust is True or purpose.oid in trust: 584 certs.extend(cert) 585 except PermissionError: 586 warnings.warn("unable to enumerate Windows certificate store") 587 if certs: 588 self.load_verify_locations(cadata=certs) 589 return certs 590 591 def load_default_certs(self, purpose=Purpose.SERVER_AUTH): 592 if not isinstance(purpose, _ASN1Object): 593 raise TypeError(purpose) 594 if sys.platform == "win32": 595 for storename in self._windows_cert_stores: 596 self._load_windows_store_certs(storename, purpose) 597 self.set_default_verify_paths() 598 599 if hasattr(_SSLContext, 'minimum_version'): 600 @property 601 def minimum_version(self): 602 return TLSVersion(super().minimum_version) 603 604 @minimum_version.setter 605 def minimum_version(self, value): 606 if value == TLSVersion.SSLv3: 607 self.options &= ~Options.OP_NO_SSLv3 608 super(SSLContext, SSLContext).minimum_version.__set__(self, value) 609 610 @property 611 def maximum_version(self): 612 return TLSVersion(super().maximum_version) 613 614 @maximum_version.setter 615 def maximum_version(self, value): 616 super(SSLContext, SSLContext).maximum_version.__set__(self, value) 617 618 @property 619 def options(self): 620 return Options(super().options) 621 622 @options.setter 623 def options(self, value): 624 super(SSLContext, SSLContext).options.__set__(self, value) 625 626 if hasattr(_ssl, 'HOSTFLAG_NEVER_CHECK_SUBJECT'): 627 @property 628 def hostname_checks_common_name(self): 629 ncs = self._host_flags & _ssl.HOSTFLAG_NEVER_CHECK_SUBJECT 630 return ncs != _ssl.HOSTFLAG_NEVER_CHECK_SUBJECT 631 632 @hostname_checks_common_name.setter 633 def hostname_checks_common_name(self, value): 634 if value: 635 self._host_flags &= ~_ssl.HOSTFLAG_NEVER_CHECK_SUBJECT 636 else: 637 self._host_flags |= _ssl.HOSTFLAG_NEVER_CHECK_SUBJECT 638 else: 639 @property 640 def hostname_checks_common_name(self): 641 return True 642 643 @property 644 def _msg_callback(self): 645 """TLS message callback 646 647 The message callback provides a debugging hook to analyze TLS 648 connections. The callback is called for any TLS protocol message 649 (header, handshake, alert, and more), but not for application data. 650 Due to technical limitations, the callback can't be used to filter 651 traffic or to abort a connection. Any exception raised in the 652 callback is delayed until the handshake, read, or write operation 653 has been performed. 654 655 def msg_cb(conn, direction, version, content_type, msg_type, data): 656 pass 657 658 conn 659 :class:`SSLSocket` or :class:`SSLObject` instance 660 direction 661 ``read`` or ``write`` 662 version 663 :class:`TLSVersion` enum member or int for unknown version. For a 664 frame header, it's the header version. 665 content_type 666 :class:`_TLSContentType` enum member or int for unsupported 667 content type. 668 msg_type 669 Either a :class:`_TLSContentType` enum number for a header 670 message, a :class:`_TLSAlertType` enum member for an alert 671 message, a :class:`_TLSMessageType` enum member for other 672 messages, or int for unsupported message types. 673 data 674 Raw, decrypted message content as bytes 675 """ 676 inner = super()._msg_callback 677 if inner is not None: 678 return inner.user_function 679 else: 680 return None 681 682 @_msg_callback.setter 683 def _msg_callback(self, callback): 684 if callback is None: 685 super(SSLContext, SSLContext)._msg_callback.__set__(self, None) 686 return 687 688 if not hasattr(callback, '__call__'): 689 raise TypeError(f"{callback} is not callable.") 690 691 def inner(conn, direction, version, content_type, msg_type, data): 692 try: 693 version = TLSVersion(version) 694 except ValueError: 695 pass 696 697 try: 698 content_type = _TLSContentType(content_type) 699 except ValueError: 700 pass 701 702 if content_type == _TLSContentType.HEADER: 703 msg_enum = _TLSContentType 704 elif content_type == _TLSContentType.ALERT: 705 msg_enum = _TLSAlertType 706 else: 707 msg_enum = _TLSMessageType 708 try: 709 msg_type = msg_enum(msg_type) 710 except ValueError: 711 pass 712 713 return callback(conn, direction, version, 714 content_type, msg_type, data) 715 716 inner.user_function = callback 717 718 super(SSLContext, SSLContext)._msg_callback.__set__(self, inner) 719 720 @property 721 def protocol(self): 722 return _SSLMethod(super().protocol) 723 724 @property 725 def verify_flags(self): 726 return VerifyFlags(super().verify_flags) 727 728 @verify_flags.setter 729 def verify_flags(self, value): 730 super(SSLContext, SSLContext).verify_flags.__set__(self, value) 731 732 @property 733 def verify_mode(self): 734 value = super().verify_mode 735 try: 736 return VerifyMode(value) 737 except ValueError: 738 return value 739 740 @verify_mode.setter 741 def verify_mode(self, value): 742 super(SSLContext, SSLContext).verify_mode.__set__(self, value) 743 744 745def create_default_context(purpose=Purpose.SERVER_AUTH, *, cafile=None, 746 capath=None, cadata=None): 747 """Create a SSLContext object with default settings. 748 749 NOTE: The protocol and settings may change anytime without prior 750 deprecation. The values represent a fair balance between maximum 751 compatibility and security. 752 """ 753 if not isinstance(purpose, _ASN1Object): 754 raise TypeError(purpose) 755 756 # SSLContext sets OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_COMPRESSION, 757 # OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE and OP_SINGLE_ECDH_USE 758 # by default. 759 if purpose == Purpose.SERVER_AUTH: 760 # verify certs and host name in client mode 761 context = SSLContext(PROTOCOL_TLS_CLIENT) 762 context.verify_mode = CERT_REQUIRED 763 context.check_hostname = True 764 elif purpose == Purpose.CLIENT_AUTH: 765 context = SSLContext(PROTOCOL_TLS_SERVER) 766 else: 767 raise ValueError(purpose) 768 769 if cafile or capath or cadata: 770 context.load_verify_locations(cafile, capath, cadata) 771 elif context.verify_mode != CERT_NONE: 772 # no explicit cafile, capath or cadata but the verify mode is 773 # CERT_OPTIONAL or CERT_REQUIRED. Let's try to load default system 774 # root CA certificates for the given purpose. This may fail silently. 775 context.load_default_certs(purpose) 776 # OpenSSL 1.1.1 keylog file 777 if hasattr(context, 'keylog_filename'): 778 keylogfile = os.environ.get('SSLKEYLOGFILE') 779 if keylogfile and not sys.flags.ignore_environment: 780 context.keylog_filename = keylogfile 781 return context 782 783def _create_unverified_context(protocol=None, *, cert_reqs=CERT_NONE, 784 check_hostname=False, purpose=Purpose.SERVER_AUTH, 785 certfile=None, keyfile=None, 786 cafile=None, capath=None, cadata=None): 787 """Create a SSLContext object for Python stdlib modules 788 789 All Python stdlib modules shall use this function to create SSLContext 790 objects in order to keep common settings in one place. The configuration 791 is less restrict than create_default_context()'s to increase backward 792 compatibility. 793 """ 794 if not isinstance(purpose, _ASN1Object): 795 raise TypeError(purpose) 796 797 # SSLContext sets OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_COMPRESSION, 798 # OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE and OP_SINGLE_ECDH_USE 799 # by default. 800 if purpose == Purpose.SERVER_AUTH: 801 # verify certs and host name in client mode 802 if protocol is None: 803 protocol = PROTOCOL_TLS_CLIENT 804 elif purpose == Purpose.CLIENT_AUTH: 805 if protocol is None: 806 protocol = PROTOCOL_TLS_SERVER 807 else: 808 raise ValueError(purpose) 809 810 context = SSLContext(protocol) 811 context.check_hostname = check_hostname 812 if cert_reqs is not None: 813 context.verify_mode = cert_reqs 814 if check_hostname: 815 context.check_hostname = True 816 817 if keyfile and not certfile: 818 raise ValueError("certfile must be specified") 819 if certfile or keyfile: 820 context.load_cert_chain(certfile, keyfile) 821 822 # load CA root certs 823 if cafile or capath or cadata: 824 context.load_verify_locations(cafile, capath, cadata) 825 elif context.verify_mode != CERT_NONE: 826 # no explicit cafile, capath or cadata but the verify mode is 827 # CERT_OPTIONAL or CERT_REQUIRED. Let's try to load default system 828 # root CA certificates for the given purpose. This may fail silently. 829 context.load_default_certs(purpose) 830 # OpenSSL 1.1.1 keylog file 831 if hasattr(context, 'keylog_filename'): 832 keylogfile = os.environ.get('SSLKEYLOGFILE') 833 if keylogfile and not sys.flags.ignore_environment: 834 context.keylog_filename = keylogfile 835 return context 836 837# Used by http.client if no context is explicitly passed. 838_create_default_https_context = create_default_context 839 840 841# Backwards compatibility alias, even though it's not a public name. 842_create_stdlib_context = _create_unverified_context 843 844 845class SSLObject: 846 """This class implements an interface on top of a low-level SSL object as 847 implemented by OpenSSL. This object captures the state of an SSL connection 848 but does not provide any network IO itself. IO needs to be performed 849 through separate "BIO" objects which are OpenSSL's IO abstraction layer. 850 851 This class does not have a public constructor. Instances are returned by 852 ``SSLContext.wrap_bio``. This class is typically used by framework authors 853 that want to implement asynchronous IO for SSL through memory buffers. 854 855 When compared to ``SSLSocket``, this object lacks the following features: 856 857 * Any form of network IO, including methods such as ``recv`` and ``send``. 858 * The ``do_handshake_on_connect`` and ``suppress_ragged_eofs`` machinery. 859 """ 860 def __init__(self, *args, **kwargs): 861 raise TypeError( 862 f"{self.__class__.__name__} does not have a public " 863 f"constructor. Instances are returned by SSLContext.wrap_bio()." 864 ) 865 866 @classmethod 867 def _create(cls, incoming, outgoing, server_side=False, 868 server_hostname=None, session=None, context=None): 869 self = cls.__new__(cls) 870 sslobj = context._wrap_bio( 871 incoming, outgoing, server_side=server_side, 872 server_hostname=server_hostname, 873 owner=self, session=session 874 ) 875 self._sslobj = sslobj 876 return self 877 878 @property 879 def context(self): 880 """The SSLContext that is currently in use.""" 881 return self._sslobj.context 882 883 @context.setter 884 def context(self, ctx): 885 self._sslobj.context = ctx 886 887 @property 888 def session(self): 889 """The SSLSession for client socket.""" 890 return self._sslobj.session 891 892 @session.setter 893 def session(self, session): 894 self._sslobj.session = session 895 896 @property 897 def session_reused(self): 898 """Was the client session reused during handshake""" 899 return self._sslobj.session_reused 900 901 @property 902 def server_side(self): 903 """Whether this is a server-side socket.""" 904 return self._sslobj.server_side 905 906 @property 907 def server_hostname(self): 908 """The currently set server hostname (for SNI), or ``None`` if no 909 server hostname is set.""" 910 return self._sslobj.server_hostname 911 912 def read(self, len=1024, buffer=None): 913 """Read up to 'len' bytes from the SSL object and return them. 914 915 If 'buffer' is provided, read into this buffer and return the number of 916 bytes read. 917 """ 918 if buffer is not None: 919 v = self._sslobj.read(len, buffer) 920 else: 921 v = self._sslobj.read(len) 922 return v 923 924 def write(self, data): 925 """Write 'data' to the SSL object and return the number of bytes 926 written. 927 928 The 'data' argument must support the buffer interface. 929 """ 930 return self._sslobj.write(data) 931 932 def getpeercert(self, binary_form=False): 933 """Returns a formatted version of the data in the certificate provided 934 by the other end of the SSL channel. 935 936 Return None if no certificate was provided, {} if a certificate was 937 provided, but not validated. 938 """ 939 return self._sslobj.getpeercert(binary_form) 940 941 def selected_npn_protocol(self): 942 """Return the currently selected NPN protocol as a string, or ``None`` 943 if a next protocol was not negotiated or if NPN is not supported by one 944 of the peers.""" 945 warnings.warn( 946 "ssl NPN is deprecated, use ALPN instead", 947 DeprecationWarning, 948 stacklevel=2 949 ) 950 951 def selected_alpn_protocol(self): 952 """Return the currently selected ALPN protocol as a string, or ``None`` 953 if a next protocol was not negotiated or if ALPN is not supported by one 954 of the peers.""" 955 return self._sslobj.selected_alpn_protocol() 956 957 def cipher(self): 958 """Return the currently selected cipher as a 3-tuple ``(name, 959 ssl_version, secret_bits)``.""" 960 return self._sslobj.cipher() 961 962 def shared_ciphers(self): 963 """Return a list of ciphers shared by the client during the handshake or 964 None if this is not a valid server connection. 965 """ 966 return self._sslobj.shared_ciphers() 967 968 def compression(self): 969 """Return the current compression algorithm in use, or ``None`` if 970 compression was not negotiated or not supported by one of the peers.""" 971 return self._sslobj.compression() 972 973 def pending(self): 974 """Return the number of bytes that can be read immediately.""" 975 return self._sslobj.pending() 976 977 def do_handshake(self): 978 """Start the SSL/TLS handshake.""" 979 self._sslobj.do_handshake() 980 981 def unwrap(self): 982 """Start the SSL shutdown handshake.""" 983 return self._sslobj.shutdown() 984 985 def get_channel_binding(self, cb_type="tls-unique"): 986 """Get channel binding data for current connection. Raise ValueError 987 if the requested `cb_type` is not supported. Return bytes of the data 988 or None if the data is not available (e.g. before the handshake).""" 989 return self._sslobj.get_channel_binding(cb_type) 990 991 def version(self): 992 """Return a string identifying the protocol version used by the 993 current SSL channel. """ 994 return self._sslobj.version() 995 996 def verify_client_post_handshake(self): 997 return self._sslobj.verify_client_post_handshake() 998 999 1000def _sslcopydoc(func): 1001 """Copy docstring from SSLObject to SSLSocket""" 1002 func.__doc__ = getattr(SSLObject, func.__name__).__doc__ 1003 return func 1004 1005 1006class SSLSocket(socket): 1007 """This class implements a subtype of socket.socket that wraps 1008 the underlying OS socket in an SSL context when necessary, and 1009 provides read and write methods over that channel. """ 1010 1011 def __init__(self, *args, **kwargs): 1012 raise TypeError( 1013 f"{self.__class__.__name__} does not have a public " 1014 f"constructor. Instances are returned by " 1015 f"SSLContext.wrap_socket()." 1016 ) 1017 1018 @classmethod 1019 def _create(cls, sock, server_side=False, do_handshake_on_connect=True, 1020 suppress_ragged_eofs=True, server_hostname=None, 1021 context=None, session=None): 1022 if sock.getsockopt(SOL_SOCKET, SO_TYPE) != SOCK_STREAM: 1023 raise NotImplementedError("only stream sockets are supported") 1024 if server_side: 1025 if server_hostname: 1026 raise ValueError("server_hostname can only be specified " 1027 "in client mode") 1028 if session is not None: 1029 raise ValueError("session can only be specified in " 1030 "client mode") 1031 if context.check_hostname and not server_hostname: 1032 raise ValueError("check_hostname requires server_hostname") 1033 1034 kwargs = dict( 1035 family=sock.family, type=sock.type, proto=sock.proto, 1036 fileno=sock.fileno() 1037 ) 1038 self = cls.__new__(cls, **kwargs) 1039 super(SSLSocket, self).__init__(**kwargs) 1040 self.settimeout(sock.gettimeout()) 1041 sock.detach() 1042 1043 self._context = context 1044 self._session = session 1045 self._closed = False 1046 self._sslobj = None 1047 self.server_side = server_side 1048 self.server_hostname = context._encode_hostname(server_hostname) 1049 self.do_handshake_on_connect = do_handshake_on_connect 1050 self.suppress_ragged_eofs = suppress_ragged_eofs 1051 1052 # See if we are connected 1053 try: 1054 self.getpeername() 1055 except OSError as e: 1056 if e.errno != errno.ENOTCONN: 1057 raise 1058 connected = False 1059 else: 1060 connected = True 1061 1062 self._connected = connected 1063 if connected: 1064 # create the SSL object 1065 try: 1066 self._sslobj = self._context._wrap_socket( 1067 self, server_side, self.server_hostname, 1068 owner=self, session=self._session, 1069 ) 1070 if do_handshake_on_connect: 1071 timeout = self.gettimeout() 1072 if timeout == 0.0: 1073 # non-blocking 1074 raise ValueError("do_handshake_on_connect should not be specified for non-blocking sockets") 1075 self.do_handshake() 1076 except (OSError, ValueError): 1077 self.close() 1078 raise 1079 return self 1080 1081 @property 1082 @_sslcopydoc 1083 def context(self): 1084 return self._context 1085 1086 @context.setter 1087 def context(self, ctx): 1088 self._context = ctx 1089 self._sslobj.context = ctx 1090 1091 @property 1092 @_sslcopydoc 1093 def session(self): 1094 if self._sslobj is not None: 1095 return self._sslobj.session 1096 1097 @session.setter 1098 def session(self, session): 1099 self._session = session 1100 if self._sslobj is not None: 1101 self._sslobj.session = session 1102 1103 @property 1104 @_sslcopydoc 1105 def session_reused(self): 1106 if self._sslobj is not None: 1107 return self._sslobj.session_reused 1108 1109 def dup(self): 1110 raise NotImplementedError("Can't dup() %s instances" % 1111 self.__class__.__name__) 1112 1113 def _checkClosed(self, msg=None): 1114 # raise an exception here if you wish to check for spurious closes 1115 pass 1116 1117 def _check_connected(self): 1118 if not self._connected: 1119 # getpeername() will raise ENOTCONN if the socket is really 1120 # not connected; note that we can be connected even without 1121 # _connected being set, e.g. if connect() first returned 1122 # EAGAIN. 1123 self.getpeername() 1124 1125 def read(self, len=1024, buffer=None): 1126 """Read up to LEN bytes and return them. 1127 Return zero-length string on EOF.""" 1128 1129 self._checkClosed() 1130 if self._sslobj is None: 1131 raise ValueError("Read on closed or unwrapped SSL socket.") 1132 try: 1133 if buffer is not None: 1134 return self._sslobj.read(len, buffer) 1135 else: 1136 return self._sslobj.read(len) 1137 except SSLError as x: 1138 if x.args[0] == SSL_ERROR_EOF and self.suppress_ragged_eofs: 1139 if buffer is not None: 1140 return 0 1141 else: 1142 return b'' 1143 else: 1144 raise 1145 1146 def write(self, data): 1147 """Write DATA to the underlying SSL channel. Returns 1148 number of bytes of DATA actually transmitted.""" 1149 1150 self._checkClosed() 1151 if self._sslobj is None: 1152 raise ValueError("Write on closed or unwrapped SSL socket.") 1153 return self._sslobj.write(data) 1154 1155 @_sslcopydoc 1156 def getpeercert(self, binary_form=False): 1157 self._checkClosed() 1158 self._check_connected() 1159 return self._sslobj.getpeercert(binary_form) 1160 1161 @_sslcopydoc 1162 def selected_npn_protocol(self): 1163 self._checkClosed() 1164 warnings.warn( 1165 "ssl NPN is deprecated, use ALPN instead", 1166 DeprecationWarning, 1167 stacklevel=2 1168 ) 1169 return None 1170 1171 @_sslcopydoc 1172 def selected_alpn_protocol(self): 1173 self._checkClosed() 1174 if self._sslobj is None or not _ssl.HAS_ALPN: 1175 return None 1176 else: 1177 return self._sslobj.selected_alpn_protocol() 1178 1179 @_sslcopydoc 1180 def cipher(self): 1181 self._checkClosed() 1182 if self._sslobj is None: 1183 return None 1184 else: 1185 return self._sslobj.cipher() 1186 1187 @_sslcopydoc 1188 def shared_ciphers(self): 1189 self._checkClosed() 1190 if self._sslobj is None: 1191 return None 1192 else: 1193 return self._sslobj.shared_ciphers() 1194 1195 @_sslcopydoc 1196 def compression(self): 1197 self._checkClosed() 1198 if self._sslobj is None: 1199 return None 1200 else: 1201 return self._sslobj.compression() 1202 1203 def send(self, data, flags=0): 1204 self._checkClosed() 1205 if self._sslobj is not None: 1206 if flags != 0: 1207 raise ValueError( 1208 "non-zero flags not allowed in calls to send() on %s" % 1209 self.__class__) 1210 return self._sslobj.write(data) 1211 else: 1212 return super().send(data, flags) 1213 1214 def sendto(self, data, flags_or_addr, addr=None): 1215 self._checkClosed() 1216 if self._sslobj is not None: 1217 raise ValueError("sendto not allowed on instances of %s" % 1218 self.__class__) 1219 elif addr is None: 1220 return super().sendto(data, flags_or_addr) 1221 else: 1222 return super().sendto(data, flags_or_addr, addr) 1223 1224 def sendmsg(self, *args, **kwargs): 1225 # Ensure programs don't send data unencrypted if they try to 1226 # use this method. 1227 raise NotImplementedError("sendmsg not allowed on instances of %s" % 1228 self.__class__) 1229 1230 def sendall(self, data, flags=0): 1231 self._checkClosed() 1232 if self._sslobj is not None: 1233 if flags != 0: 1234 raise ValueError( 1235 "non-zero flags not allowed in calls to sendall() on %s" % 1236 self.__class__) 1237 count = 0 1238 with memoryview(data) as view, view.cast("B") as byte_view: 1239 amount = len(byte_view) 1240 while count < amount: 1241 v = self.send(byte_view[count:]) 1242 count += v 1243 else: 1244 return super().sendall(data, flags) 1245 1246 def sendfile(self, file, offset=0, count=None): 1247 """Send a file, possibly by using os.sendfile() if this is a 1248 clear-text socket. Return the total number of bytes sent. 1249 """ 1250 if self._sslobj is not None: 1251 return self._sendfile_use_send(file, offset, count) 1252 else: 1253 # os.sendfile() works with plain sockets only 1254 return super().sendfile(file, offset, count) 1255 1256 def recv(self, buflen=1024, flags=0): 1257 self._checkClosed() 1258 if self._sslobj is not None: 1259 if flags != 0: 1260 raise ValueError( 1261 "non-zero flags not allowed in calls to recv() on %s" % 1262 self.__class__) 1263 return self.read(buflen) 1264 else: 1265 return super().recv(buflen, flags) 1266 1267 def recv_into(self, buffer, nbytes=None, flags=0): 1268 self._checkClosed() 1269 if buffer and (nbytes is None): 1270 nbytes = len(buffer) 1271 elif nbytes is None: 1272 nbytes = 1024 1273 if self._sslobj is not None: 1274 if flags != 0: 1275 raise ValueError( 1276 "non-zero flags not allowed in calls to recv_into() on %s" % 1277 self.__class__) 1278 return self.read(nbytes, buffer) 1279 else: 1280 return super().recv_into(buffer, nbytes, flags) 1281 1282 def recvfrom(self, buflen=1024, flags=0): 1283 self._checkClosed() 1284 if self._sslobj is not None: 1285 raise ValueError("recvfrom not allowed on instances of %s" % 1286 self.__class__) 1287 else: 1288 return super().recvfrom(buflen, flags) 1289 1290 def recvfrom_into(self, buffer, nbytes=None, flags=0): 1291 self._checkClosed() 1292 if self._sslobj is not None: 1293 raise ValueError("recvfrom_into not allowed on instances of %s" % 1294 self.__class__) 1295 else: 1296 return super().recvfrom_into(buffer, nbytes, flags) 1297 1298 def recvmsg(self, *args, **kwargs): 1299 raise NotImplementedError("recvmsg not allowed on instances of %s" % 1300 self.__class__) 1301 1302 def recvmsg_into(self, *args, **kwargs): 1303 raise NotImplementedError("recvmsg_into not allowed on instances of " 1304 "%s" % self.__class__) 1305 1306 @_sslcopydoc 1307 def pending(self): 1308 self._checkClosed() 1309 if self._sslobj is not None: 1310 return self._sslobj.pending() 1311 else: 1312 return 0 1313 1314 def shutdown(self, how): 1315 self._checkClosed() 1316 self._sslobj = None 1317 super().shutdown(how) 1318 1319 @_sslcopydoc 1320 def unwrap(self): 1321 if self._sslobj: 1322 s = self._sslobj.shutdown() 1323 self._sslobj = None 1324 return s 1325 else: 1326 raise ValueError("No SSL wrapper around " + str(self)) 1327 1328 @_sslcopydoc 1329 def verify_client_post_handshake(self): 1330 if self._sslobj: 1331 return self._sslobj.verify_client_post_handshake() 1332 else: 1333 raise ValueError("No SSL wrapper around " + str(self)) 1334 1335 def _real_close(self): 1336 self._sslobj = None 1337 super()._real_close() 1338 1339 @_sslcopydoc 1340 def do_handshake(self, block=False): 1341 self._check_connected() 1342 timeout = self.gettimeout() 1343 try: 1344 if timeout == 0.0 and block: 1345 self.settimeout(None) 1346 self._sslobj.do_handshake() 1347 finally: 1348 self.settimeout(timeout) 1349 1350 def _real_connect(self, addr, connect_ex): 1351 if self.server_side: 1352 raise ValueError("can't connect in server-side mode") 1353 # Here we assume that the socket is client-side, and not 1354 # connected at the time of the call. We connect it, then wrap it. 1355 if self._connected or self._sslobj is not None: 1356 raise ValueError("attempt to connect already-connected SSLSocket!") 1357 self._sslobj = self.context._wrap_socket( 1358 self, False, self.server_hostname, 1359 owner=self, session=self._session 1360 ) 1361 try: 1362 if connect_ex: 1363 rc = super().connect_ex(addr) 1364 else: 1365 rc = None 1366 super().connect(addr) 1367 if not rc: 1368 self._connected = True 1369 if self.do_handshake_on_connect: 1370 self.do_handshake() 1371 return rc 1372 except (OSError, ValueError): 1373 self._sslobj = None 1374 raise 1375 1376 def connect(self, addr): 1377 """Connects to remote ADDR, and then wraps the connection in 1378 an SSL channel.""" 1379 self._real_connect(addr, False) 1380 1381 def connect_ex(self, addr): 1382 """Connects to remote ADDR, and then wraps the connection in 1383 an SSL channel.""" 1384 return self._real_connect(addr, True) 1385 1386 def accept(self): 1387 """Accepts a new connection from a remote client, and returns 1388 a tuple containing that new connection wrapped with a server-side 1389 SSL channel, and the address of the remote client.""" 1390 1391 newsock, addr = super().accept() 1392 newsock = self.context.wrap_socket(newsock, 1393 do_handshake_on_connect=self.do_handshake_on_connect, 1394 suppress_ragged_eofs=self.suppress_ragged_eofs, 1395 server_side=True) 1396 return newsock, addr 1397 1398 @_sslcopydoc 1399 def get_channel_binding(self, cb_type="tls-unique"): 1400 if self._sslobj is not None: 1401 return self._sslobj.get_channel_binding(cb_type) 1402 else: 1403 if cb_type not in CHANNEL_BINDING_TYPES: 1404 raise ValueError( 1405 "{0} channel binding type not implemented".format(cb_type) 1406 ) 1407 return None 1408 1409 @_sslcopydoc 1410 def version(self): 1411 if self._sslobj is not None: 1412 return self._sslobj.version() 1413 else: 1414 return None 1415 1416 1417# Python does not support forward declaration of types. 1418SSLContext.sslsocket_class = SSLSocket 1419SSLContext.sslobject_class = SSLObject 1420 1421 1422def wrap_socket(sock, keyfile=None, certfile=None, 1423 server_side=False, cert_reqs=CERT_NONE, 1424 ssl_version=PROTOCOL_TLS, ca_certs=None, 1425 do_handshake_on_connect=True, 1426 suppress_ragged_eofs=True, 1427 ciphers=None): 1428 warnings.warn( 1429 "ssl.wrap_socket() is deprecated, use SSLContext.wrap_socket()", 1430 category=DeprecationWarning, 1431 stacklevel=2 1432 ) 1433 if server_side and not certfile: 1434 raise ValueError("certfile must be specified for server-side " 1435 "operations") 1436 if keyfile and not certfile: 1437 raise ValueError("certfile must be specified") 1438 context = SSLContext(ssl_version) 1439 context.verify_mode = cert_reqs 1440 if ca_certs: 1441 context.load_verify_locations(ca_certs) 1442 if certfile: 1443 context.load_cert_chain(certfile, keyfile) 1444 if ciphers: 1445 context.set_ciphers(ciphers) 1446 return context.wrap_socket( 1447 sock=sock, server_side=server_side, 1448 do_handshake_on_connect=do_handshake_on_connect, 1449 suppress_ragged_eofs=suppress_ragged_eofs 1450 ) 1451 1452# some utility functions 1453 1454def cert_time_to_seconds(cert_time): 1455 """Return the time in seconds since the Epoch, given the timestring 1456 representing the "notBefore" or "notAfter" date from a certificate 1457 in ``"%b %d %H:%M:%S %Y %Z"`` strptime format (C locale). 1458 1459 "notBefore" or "notAfter" dates must use UTC (RFC 5280). 1460 1461 Month is one of: Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec 1462 UTC should be specified as GMT (see ASN1_TIME_print()) 1463 """ 1464 from time import strptime 1465 from calendar import timegm 1466 1467 months = ( 1468 "Jan","Feb","Mar","Apr","May","Jun", 1469 "Jul","Aug","Sep","Oct","Nov","Dec" 1470 ) 1471 time_format = ' %d %H:%M:%S %Y GMT' # NOTE: no month, fixed GMT 1472 try: 1473 month_number = months.index(cert_time[:3].title()) + 1 1474 except ValueError: 1475 raise ValueError('time data %r does not match ' 1476 'format "%%b%s"' % (cert_time, time_format)) 1477 else: 1478 # found valid month 1479 tt = strptime(cert_time[3:], time_format) 1480 # return an integer, the previous mktime()-based implementation 1481 # returned a float (fractional seconds are always zero here). 1482 return timegm((tt[0], month_number) + tt[2:6]) 1483 1484PEM_HEADER = "-----BEGIN CERTIFICATE-----" 1485PEM_FOOTER = "-----END CERTIFICATE-----" 1486 1487def DER_cert_to_PEM_cert(der_cert_bytes): 1488 """Takes a certificate in binary DER format and returns the 1489 PEM version of it as a string.""" 1490 1491 f = str(base64.standard_b64encode(der_cert_bytes), 'ASCII', 'strict') 1492 ss = [PEM_HEADER] 1493 ss += [f[i:i+64] for i in range(0, len(f), 64)] 1494 ss.append(PEM_FOOTER + '\n') 1495 return '\n'.join(ss) 1496 1497def PEM_cert_to_DER_cert(pem_cert_string): 1498 """Takes a certificate in ASCII PEM format and returns the 1499 DER-encoded version of it as a byte sequence""" 1500 1501 if not pem_cert_string.startswith(PEM_HEADER): 1502 raise ValueError("Invalid PEM encoding; must start with %s" 1503 % PEM_HEADER) 1504 if not pem_cert_string.strip().endswith(PEM_FOOTER): 1505 raise ValueError("Invalid PEM encoding; must end with %s" 1506 % PEM_FOOTER) 1507 d = pem_cert_string.strip()[len(PEM_HEADER):-len(PEM_FOOTER)] 1508 return base64.decodebytes(d.encode('ASCII', 'strict')) 1509 1510def get_server_certificate(addr, ssl_version=PROTOCOL_TLS_CLIENT, 1511 ca_certs=None, timeout=_GLOBAL_DEFAULT_TIMEOUT): 1512 """Retrieve the certificate from the server at the specified address, 1513 and return it as a PEM-encoded string. 1514 If 'ca_certs' is specified, validate the server cert against it. 1515 If 'ssl_version' is specified, use it in the connection attempt. 1516 If 'timeout' is specified, use it in the connection attempt. 1517 """ 1518 1519 host, port = addr 1520 if ca_certs is not None: 1521 cert_reqs = CERT_REQUIRED 1522 else: 1523 cert_reqs = CERT_NONE 1524 context = _create_stdlib_context(ssl_version, 1525 cert_reqs=cert_reqs, 1526 cafile=ca_certs) 1527 with create_connection(addr, timeout=timeout) as sock: 1528 with context.wrap_socket(sock, server_hostname=host) as sslsock: 1529 dercert = sslsock.getpeercert(True) 1530 return DER_cert_to_PEM_cert(dercert) 1531 1532def get_protocol_name(protocol_code): 1533 return _PROTOCOL_NAMES.get(protocol_code, '<unknown>') 1534