# -*- coding: utf-8 -*-
# Test the support for SSL and sockets

import sys
import unittest
from test import test_support as support
from test.script_helper import assert_python_ok
import asyncore
import socket
import select
import time
import datetime
import gc
import os
import errno
import pprint
import shutil
import urllib2
import traceback
import weakref
import platform
import functools
from contextlib import closing

ssl = support.import_module("ssl")

PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)


def data_file(*name):
    """Return the path of a data file that lives next to this module.

    The positional *name* components are joined, in order, onto the
    directory that contains this file.
    """
    return os.path.join(os.path.dirname(__file__), *name)

# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.
40 41CERTFILE = data_file("keycert.pem") 42BYTES_CERTFILE = CERTFILE.encode(sys.getfilesystemencoding()) 43ONLYCERT = data_file("ssl_cert.pem") 44ONLYKEY = data_file("ssl_key.pem") 45BYTES_ONLYCERT = ONLYCERT.encode(sys.getfilesystemencoding()) 46BYTES_ONLYKEY = ONLYKEY.encode(sys.getfilesystemencoding()) 47CERTFILE_PROTECTED = data_file("keycert.passwd.pem") 48ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem") 49KEY_PASSWORD = "somepass" 50CAPATH = data_file("capath") 51BYTES_CAPATH = CAPATH.encode(sys.getfilesystemencoding()) 52CAFILE_NEURONIO = data_file("capath", "4e1295a3.0") 53CAFILE_CACERT = data_file("capath", "5ed36f99.0") 54 55 56# empty CRL 57CRLFILE = data_file("revocation.crl") 58 59# Two keys and certs signed by the same CA (for SNI tests) 60SIGNED_CERTFILE = data_file("keycert3.pem") 61SIGNED_CERTFILE2 = data_file("keycert4.pem") 62SIGNING_CA = data_file("pycacert.pem") 63# cert with all kinds of subject alt names 64ALLSANFILE = data_file("allsans.pem") 65 66REMOTE_HOST = "self-signed.pythontest.net" 67REMOTE_ROOT_CERT = data_file("selfsigned_pythontestdotnet.pem") 68 69EMPTYCERT = data_file("nullcert.pem") 70BADCERT = data_file("badcert.pem") 71NONEXISTINGCERT = data_file("XXXnonexisting.pem") 72BADKEY = data_file("badkey.pem") 73NOKIACERT = data_file("nokia.pem") 74NULLBYTECERT = data_file("nullbytecert.pem") 75 76DHFILE = data_file("ffdh3072.pem") 77BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding()) 78 79 80def handle_error(prefix): 81 exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) 82 if support.verbose: 83 sys.stdout.write(prefix + exc_format) 84 85 86class BasicTests(unittest.TestCase): 87 88 def test_sslwrap_simple(self): 89 # A crude test for the legacy API 90 try: 91 ssl.sslwrap_simple(socket.socket(socket.AF_INET)) 92 except IOError, e: 93 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that 94 pass 95 else: 96 raise 97 try: 98 
ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock) 99 except IOError, e: 100 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that 101 pass 102 else: 103 raise 104 105 106def can_clear_options(): 107 # 0.9.8m or higher 108 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15) 109 110def no_sslv2_implies_sslv3_hello(): 111 # 0.9.7h or higher 112 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15) 113 114def have_verify_flags(): 115 # 0.9.8 or higher 116 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15) 117 118def utc_offset(): #NOTE: ignore issues like #1647654 119 # local time = utc time + utc offset 120 if time.daylight and time.localtime().tm_isdst > 0: 121 return -time.altzone # seconds 122 return -time.timezone 123 124def asn1time(cert_time): 125 # Some versions of OpenSSL ignore seconds, see #18207 126 # 0.9.8.i 127 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15): 128 fmt = "%b %d %H:%M:%S %Y GMT" 129 dt = datetime.datetime.strptime(cert_time, fmt) 130 dt = dt.replace(second=0) 131 cert_time = dt.strftime(fmt) 132 # %d adds leading zero but ASN1_TIME_print() uses leading space 133 if cert_time[4] == "0": 134 cert_time = cert_time[:4] + " " + cert_time[5:] 135 136 return cert_time 137 138# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2 139def skip_if_broken_ubuntu_ssl(func): 140 if hasattr(ssl, 'PROTOCOL_SSLv2'): 141 @functools.wraps(func) 142 def f(*args, **kwargs): 143 try: 144 ssl.SSLContext(ssl.PROTOCOL_SSLv2) 145 except ssl.SSLError: 146 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and 147 platform.linux_distribution() == ('debian', 'squeeze/sid', '')): 148 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour") 149 return func(*args, **kwargs) 150 return f 151 else: 152 return func 153 154needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test") 155 156 157class BasicSocketTests(unittest.TestCase): 158 159 def test_constants(self): 160 
ssl.CERT_NONE 161 ssl.CERT_OPTIONAL 162 ssl.CERT_REQUIRED 163 ssl.OP_CIPHER_SERVER_PREFERENCE 164 ssl.OP_SINGLE_DH_USE 165 if ssl.HAS_ECDH: 166 ssl.OP_SINGLE_ECDH_USE 167 if ssl.OPENSSL_VERSION_INFO >= (1, 0): 168 ssl.OP_NO_COMPRESSION 169 self.assertIn(ssl.HAS_SNI, {True, False}) 170 self.assertIn(ssl.HAS_ECDH, {True, False}) 171 ssl.OP_NO_SSLv2 172 ssl.OP_NO_SSLv3 173 ssl.OP_NO_TLSv1 174 ssl.OP_NO_TLSv1_3 175 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1): 176 ssl.OP_NO_TLSv1_1 177 ssl.OP_NO_TLSv1_2 178 179 def test_random(self): 180 v = ssl.RAND_status() 181 if support.verbose: 182 sys.stdout.write("\n RAND_status is %d (%s)\n" 183 % (v, (v and "sufficient randomness") or 184 "insufficient randomness")) 185 if hasattr(ssl, 'RAND_egd'): 186 self.assertRaises(TypeError, ssl.RAND_egd, 1) 187 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1) 188 ssl.RAND_add("this is a random string", 75.0) 189 190 def test_parse_cert(self): 191 # note that this uses an 'unofficial' function in _ssl.c, 192 # provided solely for this test, to exercise the certificate 193 # parsing code 194 p = ssl._ssl._test_decode_cert(CERTFILE) 195 if support.verbose: 196 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 197 self.assertEqual(p['issuer'], 198 ((('countryName', 'XY'),), 199 (('localityName', 'Castle Anthrax'),), 200 (('organizationName', 'Python Software Foundation'),), 201 (('commonName', 'localhost'),)) 202 ) 203 # Note the next three asserts will fail if the keys are regenerated 204 self.assertEqual(p['notAfter'], asn1time('Jan 17 19:09:06 2028 GMT')) 205 self.assertEqual(p['notBefore'], asn1time('Jan 19 19:09:06 2018 GMT')) 206 self.assertEqual(p['serialNumber'], 'F9BA076D5B6ABD9B') 207 self.assertEqual(p['subject'], 208 ((('countryName', 'XY'),), 209 (('localityName', 'Castle Anthrax'),), 210 (('organizationName', 'Python Software Foundation'),), 211 (('commonName', 'localhost'),)) 212 ) 213 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),)) 214 # Issue #13034: the 
subjectAltName in some certificates 215 # (notably projects.developer.nokia.com:443) wasn't parsed 216 p = ssl._ssl._test_decode_cert(NOKIACERT) 217 if support.verbose: 218 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 219 self.assertEqual(p['subjectAltName'], 220 (('DNS', 'projects.developer.nokia.com'), 221 ('DNS', 'projects.forum.nokia.com')) 222 ) 223 # extra OCSP and AIA fields 224 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',)) 225 self.assertEqual(p['caIssuers'], 226 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',)) 227 self.assertEqual(p['crlDistributionPoints'], 228 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',)) 229 230 def test_parse_cert_CVE_2013_4238(self): 231 p = ssl._ssl._test_decode_cert(NULLBYTECERT) 232 if support.verbose: 233 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 234 subject = ((('countryName', 'US'),), 235 (('stateOrProvinceName', 'Oregon'),), 236 (('localityName', 'Beaverton'),), 237 (('organizationName', 'Python Software Foundation'),), 238 (('organizationalUnitName', 'Python Core Development'),), 239 (('commonName', 'null.python.org\x00example.org'),), 240 (('emailAddress', '[email protected]'),)) 241 self.assertEqual(p['subject'], subject) 242 self.assertEqual(p['issuer'], subject) 243 if ssl._OPENSSL_API_VERSION >= (0, 9, 8): 244 san = (('DNS', 'altnull.python.org\x00example.com'), 245 ('email', '[email protected]\[email protected]'), 246 ('URI', 'http://null.python.org\x00http://example.org'), 247 ('IP Address', '192.0.2.1'), 248 ('IP Address', '2001:DB8:0:0:0:0:0:1\n')) 249 else: 250 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName 251 san = (('DNS', 'altnull.python.org\x00example.com'), 252 ('email', '[email protected]\[email protected]'), 253 ('URI', 'http://null.python.org\x00http://example.org'), 254 ('IP Address', '192.0.2.1'), 255 ('IP Address', '<invalid>')) 256 257 self.assertEqual(p['subjectAltName'], san) 258 259 def test_parse_all_sans(self): 260 p = 
ssl._ssl._test_decode_cert(ALLSANFILE) 261 self.assertEqual(p['subjectAltName'], 262 ( 263 ('DNS', 'allsans'), 264 ('othername', '<unsupported>'), 265 ('othername', '<unsupported>'), 266 ('email', '[email protected]'), 267 ('DNS', 'www.example.org'), 268 ('DirName', 269 ((('countryName', 'XY'),), 270 (('localityName', 'Castle Anthrax'),), 271 (('organizationName', 'Python Software Foundation'),), 272 (('commonName', 'dirname example'),))), 273 ('URI', 'https://www.python.org/'), 274 ('IP Address', '127.0.0.1'), 275 ('IP Address', '0:0:0:0:0:0:0:1\n'), 276 ('Registered ID', '1.2.3.4.5') 277 ) 278 ) 279 280 def test_DER_to_PEM(self): 281 with open(CAFILE_CACERT, 'r') as f: 282 pem = f.read() 283 d1 = ssl.PEM_cert_to_DER_cert(pem) 284 p2 = ssl.DER_cert_to_PEM_cert(d1) 285 d2 = ssl.PEM_cert_to_DER_cert(p2) 286 self.assertEqual(d1, d2) 287 if not p2.startswith(ssl.PEM_HEADER + '\n'): 288 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2) 289 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'): 290 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2) 291 292 def test_openssl_version(self): 293 n = ssl.OPENSSL_VERSION_NUMBER 294 t = ssl.OPENSSL_VERSION_INFO 295 s = ssl.OPENSSL_VERSION 296 self.assertIsInstance(n, (int, long)) 297 self.assertIsInstance(t, tuple) 298 self.assertIsInstance(s, str) 299 # Some sanity checks follow 300 # >= 0.9 301 self.assertGreaterEqual(n, 0x900000) 302 # < 3.0 303 self.assertLess(n, 0x30000000) 304 major, minor, fix, patch, status = t 305 self.assertGreaterEqual(major, 0) 306 self.assertLess(major, 3) 307 self.assertGreaterEqual(minor, 0) 308 self.assertLess(minor, 256) 309 self.assertGreaterEqual(fix, 0) 310 self.assertLess(fix, 256) 311 self.assertGreaterEqual(patch, 0) 312 self.assertLessEqual(patch, 63) 313 self.assertGreaterEqual(status, 0) 314 self.assertLessEqual(status, 15) 315 # Version string as returned by {Open,Libre}SSL, the format might change 316 if IS_LIBRESSL: 317 
self.assertTrue(s.startswith("LibreSSL {:d}".format(major)), 318 (s, t, hex(n))) 319 else: 320 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)), 321 (s, t)) 322 323 @support.cpython_only 324 def test_refcycle(self): 325 # Issue #7943: an SSL object doesn't create reference cycles with 326 # itself. 327 s = socket.socket(socket.AF_INET) 328 ss = ssl.wrap_socket(s) 329 wr = weakref.ref(ss) 330 del ss 331 self.assertEqual(wr(), None) 332 333 def test_wrapped_unconnected(self): 334 # Methods on an unconnected SSLSocket propagate the original 335 # socket.error raise by the underlying socket object. 336 s = socket.socket(socket.AF_INET) 337 with closing(ssl.wrap_socket(s)) as ss: 338 self.assertRaises(socket.error, ss.recv, 1) 339 self.assertRaises(socket.error, ss.recv_into, bytearray(b'x')) 340 self.assertRaises(socket.error, ss.recvfrom, 1) 341 self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1) 342 self.assertRaises(socket.error, ss.send, b'x') 343 self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0)) 344 345 def test_timeout(self): 346 # Issue #8524: when creating an SSL socket, the timeout of the 347 # original socket should be retained. 
348 for timeout in (None, 0.0, 5.0): 349 s = socket.socket(socket.AF_INET) 350 s.settimeout(timeout) 351 with closing(ssl.wrap_socket(s)) as ss: 352 self.assertEqual(timeout, ss.gettimeout()) 353 354 def test_errors(self): 355 sock = socket.socket() 356 self.assertRaisesRegexp(ValueError, 357 "certfile must be specified", 358 ssl.wrap_socket, sock, keyfile=CERTFILE) 359 self.assertRaisesRegexp(ValueError, 360 "certfile must be specified for server-side operations", 361 ssl.wrap_socket, sock, server_side=True) 362 self.assertRaisesRegexp(ValueError, 363 "certfile must be specified for server-side operations", 364 ssl.wrap_socket, sock, server_side=True, certfile="") 365 with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s: 366 self.assertRaisesRegexp(ValueError, "can't connect in server-side mode", 367 s.connect, (HOST, 8080)) 368 with self.assertRaises(IOError) as cm: 369 with closing(socket.socket()) as sock: 370 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT) 371 self.assertEqual(cm.exception.errno, errno.ENOENT) 372 with self.assertRaises(IOError) as cm: 373 with closing(socket.socket()) as sock: 374 ssl.wrap_socket(sock, 375 certfile=CERTFILE, keyfile=NONEXISTINGCERT) 376 self.assertEqual(cm.exception.errno, errno.ENOENT) 377 with self.assertRaises(IOError) as cm: 378 with closing(socket.socket()) as sock: 379 ssl.wrap_socket(sock, 380 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT) 381 self.assertEqual(cm.exception.errno, errno.ENOENT) 382 383 def bad_cert_test(self, certfile): 384 """Check that trying to use the given client certificate fails""" 385 certfile = os.path.join(os.path.dirname(__file__) or os.curdir, 386 certfile) 387 sock = socket.socket() 388 self.addCleanup(sock.close) 389 with self.assertRaises(ssl.SSLError): 390 ssl.wrap_socket(sock, 391 certfile=certfile, 392 ssl_version=ssl.PROTOCOL_TLSv1) 393 394 def test_empty_cert(self): 395 """Wrapping with an empty cert file""" 396 self.bad_cert_test("nullcert.pem") 397 398 
def test_malformed_cert(self): 399 """Wrapping with a badly formatted certificate (syntax error)""" 400 self.bad_cert_test("badcert.pem") 401 402 def test_malformed_key(self): 403 """Wrapping with a badly formatted key (syntax error)""" 404 self.bad_cert_test("badkey.pem") 405 406 def test_match_hostname(self): 407 def ok(cert, hostname): 408 ssl.match_hostname(cert, hostname) 409 def fail(cert, hostname): 410 self.assertRaises(ssl.CertificateError, 411 ssl.match_hostname, cert, hostname) 412 413 cert = {'subject': ((('commonName', 'example.com'),),)} 414 ok(cert, 'example.com') 415 ok(cert, 'ExAmple.cOm') 416 fail(cert, 'www.example.com') 417 fail(cert, '.example.com') 418 fail(cert, 'example.org') 419 fail(cert, 'exampleXcom') 420 421 cert = {'subject': ((('commonName', '*.a.com'),),)} 422 ok(cert, 'foo.a.com') 423 fail(cert, 'bar.foo.a.com') 424 fail(cert, 'a.com') 425 fail(cert, 'Xa.com') 426 fail(cert, '.a.com') 427 428 # only match one left-most wildcard 429 cert = {'subject': ((('commonName', 'f*.com'),),)} 430 ok(cert, 'foo.com') 431 ok(cert, 'f.com') 432 fail(cert, 'bar.com') 433 fail(cert, 'foo.a.com') 434 fail(cert, 'bar.foo.com') 435 436 # NULL bytes are bad, CVE-2013-4073 437 cert = {'subject': ((('commonName', 438 'null.python.org\x00example.org'),),)} 439 ok(cert, 'null.python.org\x00example.org') # or raise an error? 
440 fail(cert, 'example.org') 441 fail(cert, 'null.python.org') 442 443 # error cases with wildcards 444 cert = {'subject': ((('commonName', '*.*.a.com'),),)} 445 fail(cert, 'bar.foo.a.com') 446 fail(cert, 'a.com') 447 fail(cert, 'Xa.com') 448 fail(cert, '.a.com') 449 450 cert = {'subject': ((('commonName', 'a.*.com'),),)} 451 fail(cert, 'a.foo.com') 452 fail(cert, 'a..com') 453 fail(cert, 'a.com') 454 455 # wildcard doesn't match IDNA prefix 'xn--' 456 idna = u'püthon.python.org'.encode("idna").decode("ascii") 457 cert = {'subject': ((('commonName', idna),),)} 458 ok(cert, idna) 459 cert = {'subject': ((('commonName', 'x*.python.org'),),)} 460 fail(cert, idna) 461 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)} 462 fail(cert, idna) 463 464 # wildcard in first fragment and IDNA A-labels in sequent fragments 465 # are supported. 466 idna = u'www*.pythön.org'.encode("idna").decode("ascii") 467 cert = {'subject': ((('commonName', idna),),)} 468 ok(cert, u'www.pythön.org'.encode("idna").decode("ascii")) 469 ok(cert, u'www1.pythön.org'.encode("idna").decode("ascii")) 470 fail(cert, u'ftp.pythön.org'.encode("idna").decode("ascii")) 471 fail(cert, u'pythön.org'.encode("idna").decode("ascii")) 472 473 # Slightly fake real-world example 474 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT', 475 'subject': ((('commonName', 'linuxfrz.org'),),), 476 'subjectAltName': (('DNS', 'linuxfr.org'), 477 ('DNS', 'linuxfr.com'), 478 ('othername', '<unsupported>'))} 479 ok(cert, 'linuxfr.org') 480 ok(cert, 'linuxfr.com') 481 # Not a "DNS" entry 482 fail(cert, '<unsupported>') 483 # When there is a subjectAltName, commonName isn't used 484 fail(cert, 'linuxfrz.org') 485 486 # A pristine real-world example 487 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT', 488 'subject': ((('countryName', 'US'),), 489 (('stateOrProvinceName', 'California'),), 490 (('localityName', 'Mountain View'),), 491 (('organizationName', 'Google Inc'),), 492 (('commonName', 'mail.google.com'),))} 493 
ok(cert, 'mail.google.com') 494 fail(cert, 'gmail.com') 495 # Only commonName is considered 496 fail(cert, 'California') 497 498 # Neither commonName nor subjectAltName 499 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT', 500 'subject': ((('countryName', 'US'),), 501 (('stateOrProvinceName', 'California'),), 502 (('localityName', 'Mountain View'),), 503 (('organizationName', 'Google Inc'),))} 504 fail(cert, 'mail.google.com') 505 506 # No DNS entry in subjectAltName but a commonName 507 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT', 508 'subject': ((('countryName', 'US'),), 509 (('stateOrProvinceName', 'California'),), 510 (('localityName', 'Mountain View'),), 511 (('commonName', 'mail.google.com'),)), 512 'subjectAltName': (('othername', 'blabla'), )} 513 ok(cert, 'mail.google.com') 514 515 # No DNS entry subjectAltName and no commonName 516 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT', 517 'subject': ((('countryName', 'US'),), 518 (('stateOrProvinceName', 'California'),), 519 (('localityName', 'Mountain View'),), 520 (('organizationName', 'Google Inc'),)), 521 'subjectAltName': (('othername', 'blabla'),)} 522 fail(cert, 'google.com') 523 524 # Empty cert / no cert 525 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com') 526 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com') 527 528 # Issue #17980: avoid denials of service by refusing more than one 529 # wildcard per fragment. 
530 cert = {'subject': ((('commonName', 'a*b.com'),),)} 531 ok(cert, 'axxb.com') 532 cert = {'subject': ((('commonName', 'a*b.co*'),),)} 533 fail(cert, 'axxb.com') 534 cert = {'subject': ((('commonName', 'a*b*.com'),),)} 535 with self.assertRaises(ssl.CertificateError) as cm: 536 ssl.match_hostname(cert, 'axxbxxc.com') 537 self.assertIn("too many wildcards", str(cm.exception)) 538 539 def test_server_side(self): 540 # server_hostname doesn't work for server sockets 541 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 542 with closing(socket.socket()) as sock: 543 self.assertRaises(ValueError, ctx.wrap_socket, sock, True, 544 server_hostname="some.hostname") 545 546 def test_unknown_channel_binding(self): 547 # should raise ValueError for unknown type 548 s = socket.socket(socket.AF_INET) 549 with closing(ssl.wrap_socket(s)) as ss: 550 with self.assertRaises(ValueError): 551 ss.get_channel_binding("unknown-type") 552 553 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES, 554 "'tls-unique' channel binding not available") 555 def test_tls_unique_channel_binding(self): 556 # unconnected should return None for known type 557 s = socket.socket(socket.AF_INET) 558 with closing(ssl.wrap_socket(s)) as ss: 559 self.assertIsNone(ss.get_channel_binding("tls-unique")) 560 # the same for server-side 561 s = socket.socket(socket.AF_INET) 562 with closing(ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)) as ss: 563 self.assertIsNone(ss.get_channel_binding("tls-unique")) 564 565 def test_get_default_verify_paths(self): 566 paths = ssl.get_default_verify_paths() 567 self.assertEqual(len(paths), 6) 568 self.assertIsInstance(paths, ssl.DefaultVerifyPaths) 569 570 with support.EnvironmentVarGuard() as env: 571 env["SSL_CERT_DIR"] = CAPATH 572 env["SSL_CERT_FILE"] = CERTFILE 573 paths = ssl.get_default_verify_paths() 574 self.assertEqual(paths.cafile, CERTFILE) 575 self.assertEqual(paths.capath, CAPATH) 576 577 @unittest.skipUnless(sys.platform == "win32", "Windows 
specific") 578 def test_enum_certificates(self): 579 self.assertTrue(ssl.enum_certificates("CA")) 580 self.assertTrue(ssl.enum_certificates("ROOT")) 581 582 self.assertRaises(TypeError, ssl.enum_certificates) 583 self.assertRaises(WindowsError, ssl.enum_certificates, "") 584 585 trust_oids = set() 586 for storename in ("CA", "ROOT"): 587 store = ssl.enum_certificates(storename) 588 self.assertIsInstance(store, list) 589 for element in store: 590 self.assertIsInstance(element, tuple) 591 self.assertEqual(len(element), 3) 592 cert, enc, trust = element 593 self.assertIsInstance(cert, bytes) 594 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"}) 595 self.assertIsInstance(trust, (set, bool)) 596 if isinstance(trust, set): 597 trust_oids.update(trust) 598 599 serverAuth = "1.3.6.1.5.5.7.3.1" 600 self.assertIn(serverAuth, trust_oids) 601 602 @unittest.skipUnless(sys.platform == "win32", "Windows specific") 603 def test_enum_crls(self): 604 self.assertTrue(ssl.enum_crls("CA")) 605 self.assertRaises(TypeError, ssl.enum_crls) 606 self.assertRaises(WindowsError, ssl.enum_crls, "") 607 608 crls = ssl.enum_crls("CA") 609 self.assertIsInstance(crls, list) 610 for element in crls: 611 self.assertIsInstance(element, tuple) 612 self.assertEqual(len(element), 2) 613 self.assertIsInstance(element[0], bytes) 614 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"}) 615 616 617 def test_asn1object(self): 618 expected = (129, 'serverAuth', 'TLS Web Server Authentication', 619 '1.3.6.1.5.5.7.3.1') 620 621 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1') 622 self.assertEqual(val, expected) 623 self.assertEqual(val.nid, 129) 624 self.assertEqual(val.shortname, 'serverAuth') 625 self.assertEqual(val.longname, 'TLS Web Server Authentication') 626 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1') 627 self.assertIsInstance(val, ssl._ASN1Object) 628 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth') 629 630 val = ssl._ASN1Object.fromnid(129) 631 self.assertEqual(val, expected) 632 
self.assertIsInstance(val, ssl._ASN1Object) 633 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1) 634 with self.assertRaisesRegexp(ValueError, "unknown NID 100000"): 635 ssl._ASN1Object.fromnid(100000) 636 for i in range(1000): 637 try: 638 obj = ssl._ASN1Object.fromnid(i) 639 except ValueError: 640 pass 641 else: 642 self.assertIsInstance(obj.nid, int) 643 self.assertIsInstance(obj.shortname, str) 644 self.assertIsInstance(obj.longname, str) 645 self.assertIsInstance(obj.oid, (str, type(None))) 646 647 val = ssl._ASN1Object.fromname('TLS Web Server Authentication') 648 self.assertEqual(val, expected) 649 self.assertIsInstance(val, ssl._ASN1Object) 650 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected) 651 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'), 652 expected) 653 with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"): 654 ssl._ASN1Object.fromname('serverauth') 655 656 def test_purpose_enum(self): 657 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1') 658 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object) 659 self.assertEqual(ssl.Purpose.SERVER_AUTH, val) 660 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129) 661 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth') 662 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid, 663 '1.3.6.1.5.5.7.3.1') 664 665 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2') 666 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object) 667 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val) 668 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130) 669 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth') 670 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid, 671 '1.3.6.1.5.5.7.3.2') 672 673 def test_unsupported_dtls(self): 674 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 675 self.addCleanup(s.close) 676 with self.assertRaises(NotImplementedError) as cx: 677 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE) 678 self.assertEqual(str(cx.exception), "only stream 
sockets are supported") 679 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 680 with self.assertRaises(NotImplementedError) as cx: 681 ctx.wrap_socket(s) 682 self.assertEqual(str(cx.exception), "only stream sockets are supported") 683 684 def cert_time_ok(self, timestring, timestamp): 685 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp) 686 687 def cert_time_fail(self, timestring): 688 with self.assertRaises(ValueError): 689 ssl.cert_time_to_seconds(timestring) 690 691 @unittest.skipUnless(utc_offset(), 692 'local time needs to be different from UTC') 693 def test_cert_time_to_seconds_timezone(self): 694 # Issue #19940: ssl.cert_time_to_seconds() returns wrong 695 # results if local timezone is not UTC 696 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0) 697 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0) 698 699 def test_cert_time_to_seconds(self): 700 timestring = "Jan 5 09:34:43 2018 GMT" 701 ts = 1515144883.0 702 self.cert_time_ok(timestring, ts) 703 # accept keyword parameter, assert its name 704 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts) 705 # accept both %e and %d (space or zero generated by strftime) 706 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts) 707 # case-insensitive 708 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts) 709 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds 710 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT 711 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone 712 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day 713 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month 714 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour 715 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute 716 717 newyear_ts = 1230768000.0 718 # leap seconds 719 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts) 720 # same timestamp 721 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts) 722 723 self.cert_time_ok("Jan 5 
09:34:59 2018 GMT", 1515144899) 724 # allow 60th second (even if it is not a leap second) 725 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900) 726 # allow 2nd leap second for compatibility with time.strptime() 727 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901) 728 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds 729 730 # no special treatement for the special value: 731 # 99991231235959Z (rfc 5280) 732 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0) 733 734 @support.run_with_locale('LC_ALL', '') 735 def test_cert_time_to_seconds_locale(self): 736 # `cert_time_to_seconds()` should be locale independent 737 738 def local_february_name(): 739 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0)) 740 741 if local_february_name().lower() == 'feb': 742 self.skipTest("locale-specific month name needs to be " 743 "different from C locale") 744 745 # locale-independent 746 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0) 747 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT") 748 749 750class ContextTests(unittest.TestCase): 751 752 @skip_if_broken_ubuntu_ssl 753 def test_constructor(self): 754 for protocol in PROTOCOLS: 755 ssl.SSLContext(protocol) 756 self.assertRaises(TypeError, ssl.SSLContext) 757 self.assertRaises(ValueError, ssl.SSLContext, -1) 758 self.assertRaises(ValueError, ssl.SSLContext, 42) 759 760 @skip_if_broken_ubuntu_ssl 761 def test_protocol(self): 762 for proto in PROTOCOLS: 763 ctx = ssl.SSLContext(proto) 764 self.assertEqual(ctx.protocol, proto) 765 766 def test_ciphers(self): 767 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 768 ctx.set_ciphers("ALL") 769 ctx.set_ciphers("DEFAULT") 770 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"): 771 ctx.set_ciphers("^$:,;?*'dorothyx") 772 773 @skip_if_broken_ubuntu_ssl 774 def test_options(self): 775 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 776 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value 777 default = 
(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3) 778 if not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0): 779 default |= ssl.OP_NO_COMPRESSION 780 self.assertEqual(default, ctx.options) 781 ctx.options |= ssl.OP_NO_TLSv1 782 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options) 783 if can_clear_options(): 784 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1) 785 self.assertEqual(default, ctx.options) 786 ctx.options = 0 787 # Ubuntu has OP_NO_SSLv3 forced on by default 788 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3) 789 else: 790 with self.assertRaises(ValueError): 791 ctx.options = 0 792 793 def test_verify_mode(self): 794 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 795 # Default value 796 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 797 ctx.verify_mode = ssl.CERT_OPTIONAL 798 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL) 799 ctx.verify_mode = ssl.CERT_REQUIRED 800 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 801 ctx.verify_mode = ssl.CERT_NONE 802 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 803 with self.assertRaises(TypeError): 804 ctx.verify_mode = None 805 with self.assertRaises(ValueError): 806 ctx.verify_mode = 42 807 808 @unittest.skipUnless(have_verify_flags(), 809 "verify_flags need OpenSSL > 0.9.8") 810 def test_verify_flags(self): 811 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 812 # default value 813 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0) 814 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf) 815 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF 816 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF) 817 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN 818 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN) 819 ctx.verify_flags = ssl.VERIFY_DEFAULT 820 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT) 821 # supports any value 822 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT 823 self.assertEqual(ctx.verify_flags, 824 ssl.VERIFY_CRL_CHECK_LEAF | 
ssl.VERIFY_X509_STRICT) 825 with self.assertRaises(TypeError): 826 ctx.verify_flags = None 827 828 def test_load_cert_chain(self): 829 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 830 # Combined key and cert in a single file 831 ctx.load_cert_chain(CERTFILE, keyfile=None) 832 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE) 833 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE) 834 with self.assertRaises(IOError) as cm: 835 ctx.load_cert_chain(NONEXISTINGCERT) 836 self.assertEqual(cm.exception.errno, errno.ENOENT) 837 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 838 ctx.load_cert_chain(BADCERT) 839 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 840 ctx.load_cert_chain(EMPTYCERT) 841 # Separate key and cert 842 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 843 ctx.load_cert_chain(ONLYCERT, ONLYKEY) 844 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY) 845 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY) 846 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 847 ctx.load_cert_chain(ONLYCERT) 848 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 849 ctx.load_cert_chain(ONLYKEY) 850 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 851 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT) 852 # Mismatching key and cert 853 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 854 with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"): 855 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY) 856 # Password protected key and cert 857 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD) 858 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode()) 859 ctx.load_cert_chain(CERTFILE_PROTECTED, 860 password=bytearray(KEY_PASSWORD.encode())) 861 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD) 862 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode()) 863 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, 864 bytearray(KEY_PASSWORD.encode())) 865 with 
self.assertRaisesRegexp(TypeError, "should be a string"): 866 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True) 867 with self.assertRaises(ssl.SSLError): 868 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass") 869 with self.assertRaisesRegexp(ValueError, "cannot be longer"): 870 # openssl has a fixed limit on the password buffer. 871 # PEM_BUFSIZE is generally set to 1kb. 872 # Return a string larger than this. 873 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400) 874 # Password callback 875 def getpass_unicode(): 876 return KEY_PASSWORD 877 def getpass_bytes(): 878 return KEY_PASSWORD.encode() 879 def getpass_bytearray(): 880 return bytearray(KEY_PASSWORD.encode()) 881 def getpass_badpass(): 882 return "badpass" 883 def getpass_huge(): 884 return b'a' * (1024 * 1024) 885 def getpass_bad_type(): 886 return 9 887 def getpass_exception(): 888 raise Exception('getpass error') 889 class GetPassCallable: 890 def __call__(self): 891 return KEY_PASSWORD 892 def getpass(self): 893 return KEY_PASSWORD 894 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode) 895 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes) 896 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray) 897 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable()) 898 ctx.load_cert_chain(CERTFILE_PROTECTED, 899 password=GetPassCallable().getpass) 900 with self.assertRaises(ssl.SSLError): 901 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass) 902 with self.assertRaisesRegexp(ValueError, "cannot be longer"): 903 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge) 904 with self.assertRaisesRegexp(TypeError, "must return a string"): 905 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type) 906 with self.assertRaisesRegexp(Exception, "getpass error"): 907 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception) 908 # Make sure the password function isn't called if it isn't needed 909 
def test_load_verify_cadata(self):
    """Check load_verify_locations(cadata=...) with PEM and DER input.

    Certificates are loaded one by one, concatenated, and (for PEM)
    surrounded by junk text; after each step the "x509_ca" counter of
    cert_store_stats() is compared with the expected number of distinct
    CA certificates.  Invalid cadata values must raise.
    """
    def read_pem(path):
        # Python 2: the file yields a byte string, decoded to unicode.
        with open(path) as pem_file:
            return pem_file.read().decode("ascii")

    def new_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    def check_ca_count(context, expected):
        self.assertEqual(context.cert_store_stats()["x509_ca"], expected)

    def load_and_check(context, cadata, expected):
        context.load_verify_locations(cadata=cadata)
        check_ca_count(context, expected)

    # test cadata
    cacert_pem = read_pem(CAFILE_CACERT)
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    neuronio_pem = read_pem(CAFILE_NEURONIO)
    neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

    # test PEM
    context = new_context()
    check_ca_count(context, 0)
    load_and_check(context, cacert_pem, 1)
    load_and_check(context, neuronio_pem, 2)
    # cert already in hash table
    load_and_check(context, neuronio_pem, 2)

    # combined
    load_and_check(new_context(),
                   "\n".join((cacert_pem, neuronio_pem)), 2)

    # with junk around the certs
    pieces = ["head", cacert_pem, "other", neuronio_pem, "again",
              neuronio_pem, "tail"]
    load_and_check(new_context(), "\n".join(pieces), 2)

    # test DER
    context = new_context()
    context.load_verify_locations(cadata=cacert_der)
    load_and_check(context, neuronio_der, 2)
    # cert already in hash table
    load_and_check(context, cacert_der, 2)

    # combined
    load_and_check(new_context(),
                   b"".join((cacert_der, neuronio_der)), 2)

    # error cases
    context = new_context()
    self.assertRaises(TypeError, context.load_verify_locations,
                      cadata=object)
    self.assertRaisesRegexp(ssl.SSLError, "no start line",
                            context.load_verify_locations,
                            cadata=u"broken")
    self.assertRaisesRegexp(ssl.SSLError, "not enough data",
                            context.load_verify_locations,
                            cadata=b"broken")
None) 1005 with self.assertRaises(IOError) as cm: 1006 ctx.load_dh_params(NONEXISTINGCERT) 1007 self.assertEqual(cm.exception.errno, errno.ENOENT) 1008 with self.assertRaises(ssl.SSLError) as cm: 1009 ctx.load_dh_params(CERTFILE) 1010 with support.temp_dir() as d: 1011 fname = os.path.join(d, filename) 1012 shutil.copy(DHFILE, fname) 1013 ctx.load_dh_params(fname) 1014 1015 @skip_if_broken_ubuntu_ssl 1016 def test_session_stats(self): 1017 for proto in PROTOCOLS: 1018 ctx = ssl.SSLContext(proto) 1019 self.assertEqual(ctx.session_stats(), { 1020 'number': 0, 1021 'connect': 0, 1022 'connect_good': 0, 1023 'connect_renegotiate': 0, 1024 'accept': 0, 1025 'accept_good': 0, 1026 'accept_renegotiate': 0, 1027 'hits': 0, 1028 'misses': 0, 1029 'timeouts': 0, 1030 'cache_full': 0, 1031 }) 1032 1033 def test_set_default_verify_paths(self): 1034 # There's not much we can do to test that it acts as expected, 1035 # so just check it doesn't crash or raise an exception. 1036 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1037 ctx.set_default_verify_paths() 1038 1039 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build") 1040 def test_set_ecdh_curve(self): 1041 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1042 ctx.set_ecdh_curve("prime256v1") 1043 ctx.set_ecdh_curve(b"prime256v1") 1044 self.assertRaises(TypeError, ctx.set_ecdh_curve) 1045 self.assertRaises(TypeError, ctx.set_ecdh_curve, None) 1046 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo") 1047 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo") 1048 1049 @needs_sni 1050 def test_sni_callback(self): 1051 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1052 1053 # set_servername_callback expects a callable, or None 1054 self.assertRaises(TypeError, ctx.set_servername_callback) 1055 self.assertRaises(TypeError, ctx.set_servername_callback, 4) 1056 self.assertRaises(TypeError, ctx.set_servername_callback, "") 1057 self.assertRaises(TypeError, ctx.set_servername_callback, ctx) 1058 1059 def dummycallback(sock, 
def test_get_ca_certs(self):
    """Check SSLContext.get_ca_certs() in decoded and DER form.

    An empty context and a context holding only a non-CA certificate
    must both report no CA certificates.  After loading the CAcert root,
    the decoded form must match the expected dictionary and the binary
    form must equal the DER encoding of the PEM file.
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    self.assertEqual(ctx.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    ctx.load_verify_locations(CERTFILE)
    self.assertEqual(ctx.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    ctx.load_verify_locations(CAFILE_CACERT)
    # The expected issuer and subject are the same distinguished name,
    # so it is spelled out once.  The e-mail address had been replaced
    # by an "[email protected]" placeholder, which can never match the
    # certificate contents.
    cacert_name = ((('organizationName', 'Root CA'),),
                   (('organizationalUnitName', 'http://www.cacert.org'),),
                   (('commonName', 'CA Cert Signing Authority'),),
                   (('emailAddress', 'support@cacert.org'),))
    self.assertEqual(ctx.get_ca_certs(),
        [{'issuer': cacert_name,
          'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
          'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
          'serialNumber': '00',
          'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
          'subject': cacert_name,
          'version': 3}])

    with open(CAFILE_CACERT) as f:
        pem = f.read()
    der = ssl.PEM_cert_to_DER_cert(pem)
    # binary_form=True returns the DER bytes instead of decoded dicts.
    self.assertEqual(ctx.get_ca_certs(True), [der])
ssl.PROTOCOL_SSLv23) 1161 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 1162 self.assertTrue(ctx.check_hostname) 1163 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1164 self.assertEqual( 1165 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0), 1166 getattr(ssl, "OP_NO_COMPRESSION", 0), 1167 ) 1168 1169 with open(SIGNING_CA) as f: 1170 cadata = f.read().decode("ascii") 1171 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH, 1172 cadata=cadata) 1173 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) 1174 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 1175 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1176 self.assertEqual( 1177 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0), 1178 getattr(ssl, "OP_NO_COMPRESSION", 0), 1179 ) 1180 1181 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) 1182 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) 1183 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 1184 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1185 self.assertEqual( 1186 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0), 1187 getattr(ssl, "OP_NO_COMPRESSION", 0), 1188 ) 1189 self.assertEqual( 1190 ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0), 1191 getattr(ssl, "OP_SINGLE_DH_USE", 0), 1192 ) 1193 self.assertEqual( 1194 ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0), 1195 getattr(ssl, "OP_SINGLE_ECDH_USE", 0), 1196 ) 1197 1198 def test__create_stdlib_context(self): 1199 ctx = ssl._create_stdlib_context() 1200 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) 1201 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 1202 self.assertFalse(ctx.check_hostname) 1203 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1204 1205 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1) 1206 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1) 1207 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 1208 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 
def test__https_verify_certificates(self):
    """Check the context factory mapping of _https_verify_certificates().

    The factories themselves are tested elsewhere in this file.  This
    test will fail by design if run under PYTHONHTTPSVERIFY=0 (as will
    various test_httplib tests).
    """
    # Uses a fresh SSL module to avoid affecting the real one
    fresh_ssl = support.import_fresh_module("ssl")
    verifying = fresh_ssl.create_default_context
    unverified = fresh_ssl._create_unverified_context
    set_verification = fresh_ssl._https_verify_certificates

    def assert_https_factory(expected):
        self.assertIs(fresh_ssl._create_default_https_context, expected)

    # Certificate verification is enabled by default
    assert_https_factory(verifying)
    # Turn default verification off
    set_verification(enable=False)
    assert_https_factory(unverified)
    # And back on
    set_verification(enable=True)
    assert_https_factory(verifying)
    # The default behaviour is to enable
    set_verification(enable=False)
    set_verification()
    assert_https_factory(verifying)
class SSLErrorTests(unittest.TestCase):
    """Tests for the string form and attributes of ssl.SSLError."""

    def test_str(self):
        """str() of an SSLError (or a subclass) omits the errno."""
        for error_type in (ssl.SSLError, ssl.SSLZeroReturnError):
            error = error_type(1, "foo")
            self.assertEqual(str(error), "foo")
            self.assertEqual(error.errno, 1)

    def test_lib_reason(self):
        """Check the library and reason attributes of a raised SSLError."""
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # CERTFILE is loaded as DH parameters here and must be rejected.
        with self.assertRaises(ssl.SSLError) as caught:
            context.load_dh_params(CERTFILE)
        error = caught.exception
        self.assertEqual(error.library, 'PEM')
        self.assertEqual(error.reason, 'NO_START_LINE')
        text = str(error)
        self.assertTrue(
            text.startswith("[PEM: NO_START_LINE] no start line"), text)

    def test_subclass(self):
        """Check that the appropriate SSLError subclass is raised.

        Only one subclass is exercised: a non-blocking handshake against
        a listener that never answers must raise SSLWantReadError.
        """
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with closing(socket.socket()) as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen(5)
            client = socket.socket()
            client.connect(listener.getsockname())
            client.setblocking(False)
            # Rebind the same name so no extra reference to the plain
            # socket outlives the wrapper.
            with closing(context.wrap_socket(
                    client, False, do_handshake_on_connect=False)) as client:
                with self.assertRaises(ssl.SSLWantReadError) as caught:
                    client.do_handshake()
                text = str(caught.exception)
                self.assertTrue(
                    text.startswith("The operation did not complete (read)"),
                    text)
                # For compatibility
                self.assertEqual(caught.exception.errno,
                                 ssl.SSL_ERROR_WANT_READ)
def test_connect_ex(self):
    """Issue #11326: check the connect_ex() implementation.

    A blocking connect_ex() to the remote host must return 0 and leave
    a peer certificate available on the socket.
    """
    with support.transient_internet(REMOTE_HOST):
        # The plain socket is created inline so that the wrapper holds
        # the only reference to it.
        with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
                                     cert_reqs=ssl.CERT_REQUIRED,
                                     ca_certs=REMOTE_ROOT_CERT)) as tls_sock:
            status = tls_sock.connect_ex((REMOTE_HOST, 443))
            self.assertEqual(0, status)
            self.assertTrue(tls_sock.getpeercert())
def test_connect_with_context(self):
    """Connect to the remote host through an explicit SSLContext.

    Four scenarios are run in order: no verification, no verification
    with a server hostname, CERT_REQUIRED without trusted certificates
    (must fail), and CERT_REQUIRED with the root certificate loaded
    (must succeed and yield a peer certificate).

    Every socket is wrapped in closing() so that it is released even
    when connect() or an assertion raises; previously such a failure
    leaked the socket.
    """
    remote = (REMOTE_HOST, 443)
    with support.transient_internet(REMOTE_HOST):
        # Connect without verification, using a separately created context
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
            s.connect(remote)
            self.assertEqual({}, s.getpeercert())
        # Same with a server hostname
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET),
                                     server_hostname=REMOTE_HOST)) as s:
            s.connect(remote)
        # This should fail because we have no verification certs
        ctx.verify_mode = ssl.CERT_REQUIRED
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
            self.assertRaisesRegexp(ssl.SSLError,
                                    "certificate verify failed",
                                    s.connect, remote)
        # This should succeed because we specify the root cert
        ctx.load_verify_locations(REMOTE_ROOT_CERT)
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
            s.connect(remote)
            cert = s.getpeercert()
            self.assertTrue(cert)
def test_non_blocking_handshake(self):
    """Drive a TLS handshake by hand on a non-blocking socket.

    The socket is connected in blocking mode, switched to non-blocking
    and wrapped without an automatic handshake.  do_handshake() is then
    retried, waiting with select() for readability or writability as
    requested by SSLWantReadError / SSLWantWriteError.

    The socket is closed in a finally clause so that a failing connect,
    wrap or handshake no longer leaks it.
    """
    with support.transient_internet(REMOTE_HOST):
        s = socket.socket(socket.AF_INET)
        try:
            s.connect((REMOTE_HOST, 443))
            s.setblocking(False)
            # Rebinding keeps a single name pointing at whichever object
            # currently owns the connection, so the finally clause
            # always closes the right one.
            s = ssl.wrap_socket(s,
                                cert_reqs=ssl.CERT_NONE,
                                do_handshake_on_connect=False)
            count = 0
            while True:
                try:
                    count += 1
                    s.do_handshake()
                    break
                except ssl.SSLWantReadError:
                    select.select([s], [], [])
                except ssl.SSLWantWriteError:
                    select.select([], [s], [])
        finally:
            s.close()
        if support.verbose:
            sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
def test_ciphers(self):
    """Check the `ciphers` argument of ssl.wrap_socket().

    The "ALL" and "DEFAULT" cipher strings must allow a connection to
    the remote host; a nonsensical cipher string must raise SSLError.
    """
    host = REMOTE_HOST
    remote = (host, 443)
    with support.transient_internet(host):
        for cipher_string in ("ALL", "DEFAULT"):
            with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
                                         cert_reqs=ssl.CERT_NONE,
                                         ciphers=cipher_string)) as conn:
                conn.connect(remote)
        # Error checking can happen at instantiation or when connecting
        with self.assertRaisesRegexp(ssl.SSLError,
                                     "No cipher can be selected"):
            with closing(socket.socket(socket.AF_INET)) as plain:
                conn = ssl.wrap_socket(plain,
                                       cert_reqs=ssl.CERT_NONE,
                                       ciphers="^$:,;?*'dorothyx")
                conn.connect(remote)
@needs_sni
def test_context_setget(self):
    """Check that the context of a connected socket can be replaced.

    Both the SSLSocket and its underlying _sslobj must report the
    original context after connecting and the new one after assignment.
    """
    with support.transient_internet(REMOTE_HOST):
        first_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        second_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        plain = socket.socket(socket.AF_INET)
        with closing(first_ctx.wrap_socket(plain)) as tls_sock:

            def assert_context(expected):
                self.assertIs(tls_sock.context, expected)
                self.assertIs(tls_sock._sslobj.context, expected)

            tls_sock.connect((REMOTE_HOST, 443))
            assert_context(first_ctx)
            tls_sock.context = second_ctx
            assert_context(second_ctx)
def read(self):
    """Return the next chunk of data from the client.

    Reads through the SSL wrapper when one is set; otherwise receives
    up to 1024 bytes from the plain socket.
    """
    tls = self.sslconn
    if not tls:
        return self.sock.recv(1024)
    return tls.read()

def write(self, bytes):
    """Send `bytes` to the client and return the transport's result.

    Writes through the SSL wrapper when one is set; otherwise sends on
    the plain socket.
    """
    tls = self.sslconn
    if not tls:
        return self.sock.send(bytes)
    return tls.write(bytes)

def close(self):
    """Close the SSL wrapper if one is set, else the plain socket."""
    channel = self.sslconn or self.sock
    channel.close()
self.server.connectionchatty: 1738 sys.stdout.write(" server: client closed connection\n") 1739 self.close() 1740 return 1741 elif (self.server.starttls_server and 1742 stripped == b'STARTTLS'): 1743 if support.verbose and self.server.connectionchatty: 1744 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n") 1745 self.write(b"OK\n") 1746 if not self.wrap_conn(): 1747 return 1748 elif (self.server.starttls_server and self.sslconn 1749 and stripped == b'ENDTLS'): 1750 if support.verbose and self.server.connectionchatty: 1751 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n") 1752 self.write(b"OK\n") 1753 self.sock = self.sslconn.unwrap() 1754 self.sslconn = None 1755 if support.verbose and self.server.connectionchatty: 1756 sys.stdout.write(" server: connection is now unencrypted...\n") 1757 elif stripped == b'CB tls-unique': 1758 if support.verbose and self.server.connectionchatty: 1759 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n") 1760 data = self.sslconn.get_channel_binding("tls-unique") 1761 self.write(repr(data).encode("us-ascii") + b"\n") 1762 else: 1763 if (support.verbose and 1764 self.server.connectionchatty): 1765 ctype = (self.sslconn and "encrypted") or "unencrypted" 1766 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n" 1767 % (msg, ctype, msg.lower(), ctype)) 1768 self.write(msg.lower()) 1769 except ssl.SSLError: 1770 if self.server.chatty: 1771 handle_error("Test server failure:\n") 1772 self.close() 1773 self.running = False 1774 # normally, we'd just stop here, but for the test 1775 # harness, we want to stop the server 1776 self.server.stop() 1777 1778 def __init__(self, certificate=None, ssl_version=None, 1779 certreqs=None, cacerts=None, 1780 chatty=True, connectionchatty=False, starttls_server=False, 1781 npn_protocols=None, alpn_protocols=None, 1782 ciphers=None, context=None): 1783 if context: 1784 self.context = context 1785 else: 1786 self.context 
def run(self):
    """Accept loop of the echo server thread.

    Starts listening with a short accept() timeout so the loop can
    notice stop() promptly, signals readiness through ``self.flag``
    when one was supplied to start(), and then accepts connections
    until ``self.active`` is cleared.  Every accepted connection is
    served to completion by a ConnectionHandler thread before the
    next accept() call.

    The listening socket is closed on every exit path, including an
    unexpected exception escaping the loop, so a failure here cannot
    leak the socket.
    """
    try:
        self.sock.settimeout(0.05)
        self.sock.listen(5)
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                # Connections are handled strictly one at a time.
                handler.join()
            except socket.timeout:
                # Nothing to accept within the timeout; re-check
                # self.active and try again.
                pass
            except KeyboardInterrupt:
                self.stop()
    finally:
        self.sock.close()
def _do_ssl_handshake(self):
    """Advance the non-blocking TLS handshake by one step.

    Outcomes:
    * handshake completed      -- ``self._ssl_accepting`` becomes False.
    * SSLWantRead/WantWrite    -- not finished yet; return and wait for
                                  the next readiness event.
    * SSLEOFError              -- peer went away; close the connection.
    * any other ssl.SSLError   -- re-raised unchanged.
    * socket.error ECONNABORTED -- close the connection; other socket
                                  errors are ignored and the handshake
                                  stays pending.
    """
    try:
        self.socket.do_handshake()
    except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
        return
    except ssl.SSLEOFError:
        return self.handle_close()
    except ssl.SSLError:
        # Must stay ahead of the socket.error clause: SSLError is a
        # socket.error subclass and would otherwise be swallowed there.
        raise
    except socket.error as err:
        if err.args[0] == errno.ECONNABORTED:
            return self.handle_close()
    else:
        self._ssl_accepting = False
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None):
    """
    Start a ThreadedEchoServer built from *server_context*, connect to it
    through a socket wrapped by *client_context* and echo *indata* three
    times (as bytes, bytearray and memoryview), checking each lower-cased
    reply.

    Returns a dict with the client-side connection details (compression,
    cipher, peer certificate, ALPN/NPN selection, protocol version) plus
    the ALPN/NPN protocols recorded by the server.  Raises AssertionError
    when a reply differs from ``indata.lower()``.
    """
    def trace(fmt, *args):
        # Client-side tracing needs both the per-call switch and -v.
        if connectionchatty:
            if support.verbose:
                sys.stdout.write(fmt % args)

    stats = {}
    echo_server = ThreadedEchoServer(context=server_context,
                                     chatty=chatty,
                                     connectionchatty=False)
    with echo_server:
        with closing(client_context.wrap_socket(
                socket.socket(), server_hostname=sni_name)) as s:
            s.connect((HOST, echo_server.port))
            payloads = [indata, bytearray(indata), memoryview(indata)]
            for payload in payloads:
                trace(" client: sending %r...\n", indata)
                s.write(payload)
                reply = s.read()
                trace(" client: read %r\n", reply)
                if reply == indata.lower():
                    continue
                raise AssertionError(
                    "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                    % (reply[:20], len(reply),
                       indata[:20].lower(), len(indata)))
            # Tell the server-side handler this conversation is finished.
            s.write(b"over\n")
            trace(" client: closing connection.\n")
            # Query the connection in a fixed order while it is still open.
            probes = (
                ('compression', s.compression),
                ('cipher', s.cipher),
                ('peercert', s.getpeercert),
                ('client_alpn_protocol', s.selected_alpn_protocol),
                ('client_npn_protocol', s.selected_npn_protocol),
                ('version', s.version),
            )
            for key, probe in probes:
                stats[key] = probe()
            s.close()
        stats['server_alpn_protocols'] = echo_server.selected_alpn_protocols
        stats['server_npn_protocols'] = echo_server.selected_npn_protocols
    return stats
2023 """ 2024 if certsreqs is None: 2025 certsreqs = ssl.CERT_NONE 2026 certtype = { 2027 ssl.CERT_NONE: "CERT_NONE", 2028 ssl.CERT_OPTIONAL: "CERT_OPTIONAL", 2029 ssl.CERT_REQUIRED: "CERT_REQUIRED", 2030 }[certsreqs] 2031 if support.verbose: 2032 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n" 2033 sys.stdout.write(formatstr % 2034 (ssl.get_protocol_name(client_protocol), 2035 ssl.get_protocol_name(server_protocol), 2036 certtype)) 2037 client_context = ssl.SSLContext(client_protocol) 2038 client_context.options |= client_options 2039 server_context = ssl.SSLContext(server_protocol) 2040 server_context.options |= server_options 2041 2042 # NOTE: we must enable "ALL" ciphers on the client, otherwise an 2043 # SSLv23 client will send an SSLv3 hello (rather than SSLv2) 2044 # starting from OpenSSL 1.0.0 (see issue #8322). 2045 if client_context.protocol == ssl.PROTOCOL_SSLv23: 2046 client_context.set_ciphers("ALL") 2047 2048 for ctx in (client_context, server_context): 2049 ctx.verify_mode = certsreqs 2050 ctx.load_cert_chain(CERTFILE) 2051 ctx.load_verify_locations(CERTFILE) 2052 try: 2053 stats = server_params_test(client_context, server_context, 2054 chatty=False, connectionchatty=False) 2055 # Protocol mismatch can result in either an SSLError, or a 2056 # "Connection reset by peer" error. 2057 except ssl.SSLError: 2058 if expect_success: 2059 raise 2060 except socket.error as e: 2061 if expect_success or e.errno != errno.ECONNRESET: 2062 raise 2063 else: 2064 if not expect_success: 2065 raise AssertionError( 2066 "Client protocol %s succeeded with server protocol %s!" 
def test_getpeercert(self):
    # getpeercert() must raise ValueError before the handshake and return
    # a certificate with the expected subject and validity fields after.
    if support.verbose:
        sys.stdout.write("\n")
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        # closing() shuts the client socket even when a check below
        # fails.  Without it the server-side handler keeps waiting in
        # read() and leaving ``with server`` (which joins the server
        # thread) could hang instead of reporting the failure.
        with closing(context.wrap_socket(
                socket.socket(), do_handshake_on_connect=False)) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            with self.assertRaises(ValueError):
                s.getpeercert()
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                    not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            # The validity window must be well-formed.
            before = ssl.cert_time_to_seconds(cert['notBefore'])
            after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(before, after)
def test_check_hostname(self):
    if support.verbose:
        sys.stdout.write("\n")

    srv_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    srv_ctx.load_cert_chain(SIGNED_CERTFILE)

    cli_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    cli_ctx.verify_mode = ssl.CERT_REQUIRED
    cli_ctx.check_hostname = True
    cli_ctx.load_verify_locations(SIGNING_CA)

    def fresh_server():
        # Every scenario below runs against its own server instance.
        return ThreadedEchoServer(context=srv_ctx, chatty=True)

    # Scenario 1: a server_hostname matching the certificate verifies.
    with fresh_server() as echo:
        with closing(cli_ctx.wrap_socket(
                socket.socket(), server_hostname="localhost")) as s:
            s.connect((HOST, echo.port))
            peer_cert = s.getpeercert()
            self.assertTrue(peer_cert, "Can't get peer certificate.")

    # Scenario 2: a mismatching server_hostname is rejected on connect.
    with fresh_server() as echo:
        with closing(cli_ctx.wrap_socket(
                socket.socket(), server_hostname="invalid")) as s:
            with self.assertRaisesRegexp(
                    ssl.CertificateError,
                    "hostname 'invalid' doesn't match u?'localhost'"):
                s.connect((HOST, echo.port))

    # Scenario 3: with check_hostname enabled, wrapping a socket without
    # any server_hostname is refused outright.
    with fresh_server():
        with closing(socket.socket()) as s:
            with self.assertRaisesRegexp(
                    ValueError,
                    "check_hostname requires server_hostname"):
                cli_ctx.wrap_socket(s)
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    ready = threading.Event()
    gone = threading.Event()

    s = socket.socket()
    port = support.bind_port(s, HOST)

    def listener():
        # Helper thread: accept a single connection, slam both the
        # accepted and the listening socket shut, then report via
        # ``gone`` that nothing is left on the server side.
        s.listen(5)
        ready.set()
        accepted = s.accept()[0]
        accepted.close()
        s.close()
        gone.set()

    helper = threading.Thread(target=listener)
    helper.start()
    try:
        # Main thread plays the client: connect once the listener is up,
        # wait for the rude close, then attempt the TLS handshake.
        ready.wait()
        with closing(socket.socket()) as c:
            c.connect((HOST, port))
            gone.wait()
            try:
                ssl.wrap_socket(c)
            except socket.error:
                handshake_failed = True
            else:
                handshake_failed = False
            if not handshake_failed:
                self.fail('connecting to closed SSL socket should have failed')
    finally:
        helper.join()
support.verbose: 2308 sys.stdout.write("\n") 2309 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2310 try: 2311 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) 2312 except socket.error as x: 2313 # this fails on some older versions of OpenSSL (0.9.7l, for instance) 2314 if support.verbose: 2315 sys.stdout.write( 2316 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n" 2317 % str(x)) 2318 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2319 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False) 2320 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True) 2321 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1') 2322 2323 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2324 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL) 2325 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL) 2326 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL) 2327 2328 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2329 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED) 2330 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED) 2331 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED) 2332 2333 # Server with specific SSL options 2334 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2335 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, 2336 server_options=ssl.OP_NO_SSLv3) 2337 # Will choose TLSv1 2338 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, 2339 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3) 2340 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False, 2341 server_options=ssl.OP_NO_TLSv1) 2342 2343 2344 @skip_if_broken_ubuntu_ssl 2345 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'), 2346 "OpenSSL is compiled without SSLv3 support") 2347 def test_protocol_sslv3(self): 2348 """Connecting to an SSLv3 server with various 
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tls1 = ssl.PROTOCOL_TLSv1
    # TLSv1 <-> TLSv1 must negotiate 'TLSv1' for every certificate
    # requirement level (None selects try_protocol_combo's default).
    for cert_reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tls1, tls1, 'TLSv1', cert_reqs)
    # Older protocol clients, where this build has them, must be refused.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tls1, getattr(ssl, legacy_name), False)
    # An SSLv23 client that is forbidden to speak TLSv1 cannot connect.
    try_protocol_combo(tls1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
2384 Testing against older TLS versions.""" 2385 if support.verbose: 2386 sys.stdout.write("\n") 2387 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1') 2388 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2389 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False) 2390 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2391 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False) 2392 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False, 2393 client_options=ssl.OP_NO_TLSv1_1) 2394 2395 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1') 2396 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False) 2397 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False) 2398 2399 2400 @skip_if_broken_ubuntu_ssl 2401 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"), 2402 "TLS version 1.2 not supported.") 2403 def test_protocol_tlsv1_2(self): 2404 """Connecting to a TLSv1.2 server with various client options. 2405 Testing against older TLS versions.""" 2406 if support.verbose: 2407 sys.stdout.write("\n") 2408 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2', 2409 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2, 2410 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,) 2411 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2412 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False) 2413 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2414 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False) 2415 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False, 2416 client_options=ssl.OP_NO_TLSv1_2) 2417 2418 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2') 2419 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False) 2420 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False) 2421 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False) 2422 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False) 2423 
def test_starttls(self):
    """Switching from clear text to encrypted and back again."""
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    wrapped = False
    conn = None
    with server:
        s = socket.socket()
        # The try/finally closes the client side on every exit path.  If
        # the exchange raised with the socket left open, the server-side
        # handler would keep waiting for data and leaving ``with server``
        # (which joins the server thread) could hang.
        try:
            s.setblocking(1)
            s.connect((HOST, server.port))
            if support.verbose:
                sys.stdout.write("\n")
            for indata in msgs:
                if support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                if wrapped:
                    conn.write(indata)
                    outdata = conn.read()
                else:
                    s.send(indata)
                    outdata = s.recv(1024)
                msg = outdata.strip().lower()
                if indata == b"STARTTLS" and msg.startswith(b"ok"):
                    # STARTTLS ok, switch to secure mode
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server, starting TLS...\n"
                            % msg)
                    conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
                    wrapped = True
                elif indata == b"ENDTLS" and msg.startswith(b"ok"):
                    # ENDTLS ok, switch back to clear text
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server, ending TLS...\n"
                            % msg)
                    s = conn.unwrap()
                    wrapped = False
                else:
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server\n" % msg)
            if support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            if wrapped:
                conn.write(b"over\n")
            else:
                s.send(b"over\n")
        finally:
            # While TLS is active the wrapper is closed first; the plain
            # socket is always closed so the descriptor is released.
            if wrapped:
                conn.close()
            s.close()
def test_recv_send(self):
    """Test recv(), send() and friends."""
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        # closing() releases the client socket even when a check fails,
        # so the server-side handler is not left waiting for data while
        # ``with server`` tries to join the server thread.
        with closing(ssl.wrap_socket(socket.socket(),
                                     server_side=False,
                                     certfile=CERTFILE,
                                     ca_certs=CERTFILE,
                                     cert_reqs=ssl.CERT_NONE,
                                     ssl_version=ssl.PROTOCOL_TLSv1)) as s:
            s.connect((HOST, server.port))

            # helper methods for standardising recv* method signatures
            def _recv_into():
                b = bytearray(b"\0"*100)
                count = s.recv_into(b)
                return b[:count]

            def _recvfrom_into():
                b = bytearray(b"\0"*100)
                count, addr = s.recvfrom_into(b)
                return b[:count]

            # (name, method, whether to expect success, *args)
            send_methods = [
                ('send', s.send, True, []),
                ('sendto', s.sendto, False, ["some.address"]),
                ('sendall', s.sendall, True, []),
            ]
            recv_methods = [
                ('recv', s.recv, True, []),
                ('recvfrom', s.recvfrom, False, ["some.address"]),
                ('recv_into', _recv_into, True, []),
                ('recvfrom_into', _recvfrom_into, False, []),
            ]
            data_prefix = u"PREFIX_"

            # NOTE: the failure messages use the ``!r`` / ``!s``
            # conversions.  ``{x:r}`` is not a valid format spec; it made
            # str.format() raise ValueError inside the ``try`` below,
            # which the ``except ValueError`` clause then misreported.
            for meth_name, send_meth, expect_success, args in send_methods:
                indata = (data_prefix + meth_name).encode('ascii')
                try:
                    send_meth(indata, *args)
                    outdata = s.read()
                    if outdata != indata.lower():
                        self.fail(
                            "While sending with <<{name:s}>> bad data "
                            "<<{outdata!r}>> ({nout:d}) received; "
                            "expected <<{indata!r}>> ({nin:d})\n".format(
                                name=meth_name, outdata=outdata[:20],
                                nout=len(outdata),
                                indata=indata[:20].lower(), nin=len(indata)
                            )
                        )
                except ValueError as e:
                    if expect_success:
                        self.fail(
                            "Failed to send with method <<{name:s}>>; "
                            "expected to succeed.\n".format(name=meth_name)
                        )
                    # Unsupported methods must name themselves in the
                    # error message.
                    if not str(e).startswith(meth_name):
                        self.fail(
                            "Method <<{name:s}>> failed with unexpected "
                            "exception message: {exp!s}\n".format(
                                name=meth_name, exp=e
                            )
                        )

            for meth_name, recv_meth, expect_success, args in recv_methods:
                indata = (data_prefix + meth_name).encode('ascii')
                try:
                    s.send(indata)
                    outdata = recv_meth(*args)
                    if outdata != indata.lower():
                        self.fail(
                            "While receiving with <<{name:s}>> bad data "
                            "<<{outdata!r}>> ({nout:d}) received; "
                            "expected <<{indata!r}>> ({nin:d})\n".format(
                                name=meth_name, outdata=outdata[:20],
                                nout=len(outdata),
                                indata=indata[:20].lower(), nin=len(indata)
                            )
                        )
                except ValueError as e:
                    if expect_success:
                        self.fail(
                            "Failed to receive with method <<{name:s}>>; "
                            "expected to succeed.\n".format(name=meth_name)
                        )
                    if not str(e).startswith(meth_name):
                        self.fail(
                            "Method <<{name:s}>> failed with unexpected "
                            "exception message: {exp!s}\n".format(
                                name=meth_name, exp=e
                            )
                        )
                    # consume data: the echo of the message sent above is
                    # still pending because the receive call was refused.
                    s.read()

            # read(-1, buffer) is supported, even though read(-1) is not
            data = b"data"
            s.send(data)
            buf = bytearray(len(data))
            self.assertEqual(s.read(-1, buf), len(data))
            self.assertEqual(buf, data)

            s.write(b"over\n")

            self.assertRaises(ValueError, s.recv, -1)
            self.assertRaises(ValueError, s.read, -1)
2687 while not finish: 2688 r, w, e = select.select([server], [], [], 0.1) 2689 if server in r: 2690 # Let the socket hang around rather than having 2691 # it closed by garbage collection. 2692 conns.append(server.accept()[0]) 2693 for sock in conns: 2694 sock.close() 2695 2696 t = threading.Thread(target=serve) 2697 t.start() 2698 started.wait() 2699 2700 try: 2701 try: 2702 c = socket.socket(socket.AF_INET) 2703 c.settimeout(0.2) 2704 c.connect((host, port)) 2705 # Will attempt handshake and time out 2706 self.assertRaisesRegexp(ssl.SSLError, "timed out", 2707 ssl.wrap_socket, c) 2708 finally: 2709 c.close() 2710 try: 2711 c = socket.socket(socket.AF_INET) 2712 c = ssl.wrap_socket(c) 2713 c.settimeout(0.2) 2714 # Will attempt handshake and time out 2715 self.assertRaisesRegexp(ssl.SSLError, "timed out", 2716 c.connect, (host, port)) 2717 finally: 2718 c.close() 2719 finally: 2720 finish = True 2721 t.join() 2722 server.close() 2723 2724 def test_server_accept(self): 2725 # Issue #16357: accept() on a SSLSocket created through 2726 # SSLContext.wrap_socket(). 2727 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 2728 context.verify_mode = ssl.CERT_REQUIRED 2729 context.load_verify_locations(CERTFILE) 2730 context.load_cert_chain(CERTFILE) 2731 server = socket.socket(socket.AF_INET) 2732 host = "127.0.0.1" 2733 port = support.bind_port(server) 2734 server = context.wrap_socket(server, server_side=True) 2735 2736 evt = threading.Event() 2737 remote = [None] 2738 peer = [None] 2739 def serve(): 2740 server.listen(5) 2741 # Block on the accept and wait on the connection to close. 2742 evt.set() 2743 remote[0], peer[0] = server.accept() 2744 remote[0].recv(1) 2745 2746 t = threading.Thread(target=serve) 2747 t.start() 2748 # Client wait until server setup and perform a connect. 
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped socket that was never connected must
    # raise socket.error with errno ENOTCONN.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with closing(context.wrap_socket(socket.socket())) as sock:
        with self.assertRaises(socket.error) as cm:
            sock.getpeercert()
        self.assertEqual(cm.exception.errno, errno.ENOTCONN)

def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped socket that was never connected must
    # raise socket.error with errno ENOTCONN.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with closing(context.wrap_socket(socket.socket())) as sock:
        with self.assertRaises(socket.error) as cm:
            sock.do_handshake()
        self.assertEqual(cm.exception.errno, errno.ENOTCONN)

def test_default_ciphers(self):
    # A client restricted to DES ciphers must be rejected by a server
    # that runs with the default cipher list.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Force a set of weak ciphers on our client context
        context.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_SSLv23,
                            chatty=False) as server:
        with closing(context.wrap_socket(socket.socket())) as s:
            with self.assertRaises(ssl.SSLError):
                s.connect((HOST, server.port))
    self.assertIn("no shared cipher", str(server.conn_errors[0]))

def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_TLSv1,
                            chatty=False) as server:
        with closing(context.wrap_socket(socket.socket())) as s:
            # No protocol version before the handshake ...
            self.assertIs(s.version(), None)
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'TLSv1')
        # ... and none again once the socket has been closed.
        self.assertIs(s.version(), None)

@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    # A context with every older TLS version disabled must negotiate
    # one of the TLS 1.3 cipher suites.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    # disable all but TLS 1.3
    context.options |= (
        ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    )
    with ThreadedEchoServer(context=context) as server:
        # Python 2 sockets are not context managers, so wrap the
        # SSLSocket in closing() like the other tests in this file.
        with closing(context.wrap_socket(socket.socket())) as s:
            s.connect((HOST, server.port))
            self.assertIn(s.cipher()[0], [
                # names reported by released OpenSSL 1.1.1 and later
                'TLS_AES_256_GCM_SHA384',
                'TLS_CHACHA20_POLY1305_SHA256',
                'TLS_AES_128_GCM_SHA256',
                # names reported by pre-release TLS 1.3 builds
                'TLS13-AES-256-GCM-SHA384',
                'TLS13-CHACHA20-POLY1305-SHA256',
                'TLS13-AES-128-GCM-SHA256',
            ])
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)

    def new_client():
        # Both rounds use an identically configured TLSv1 client socket.
        return ssl.wrap_socket(socket.socket(),
                               server_side=False,
                               certfile=CERTFILE,
                               ca_certs=CERTFILE,
                               cert_reqs=ssl.CERT_NONE,
                               ssl_version=ssl.PROTOCOL_TLSv1)

    with server:
        # closing() releases the socket even when an assertion fails.
        with closing(new_client()) as s:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(" got channel binding data: {0!r}\n"
                                 .format(cb_data))

            # check if it is sane
            self.assertIsNotNone(cb_data)
            self.assertEqual(len(cb_data), 12)  # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(cb_data).encode("us-ascii"))

        # now, again
        with closing(new_client()) as s:
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(" got another channel binding data: {0!r}\n"
                                 .format(new_cb_data))
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Sanity-check the data of THIS connection; the first
            # connection's data was already validated above.
            self.assertIsNotNone(new_cb_data)
            self.assertEqual(len(new_cb_data), 12)  # True for TLSv1
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(new_cb_data).encode("us-ascii"))

def test_compression(self):
    # The negotiated compression must be one of the known methods or None.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
    self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })

@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set, no compression may be negotiated.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    self.assertIs(stats['compression'], None)

def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    # stats["cipher"][0] is the cipher name.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole cipher name, not just its first character.
        self.fail("Non-DH cipher: " + cipher)
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    srv_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    srv_ctx.load_cert_chain(CERTFILE)
    srv_ctx.set_alpn_protocols(['foo', 'bar'])
    # The client context deliberately offers no ALPN protocols.
    cli_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    cli_ctx.load_verify_locations(CERTFILE)
    result = server_params_test(cli_ctx, srv_ctx,
                                chatty=True, connectionchatty=True)
    self.assertIs(result['client_alpn_protocol'], None)

@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # Each case pairs the protocols offered by the client with the
    # protocol both sides are expected to report afterwards.
    server_protocols = ['foo', 'bar', 'milkshake']
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]

    def alpn_context(offered):
        # Fresh TLSv1.2 context advertising the given ALPN protocols.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        ctx.load_cert_chain(CERTFILE)
        ctx.set_alpn_protocols(offered)
        return ctx

    # OpenSSL 1.1.0 to 1.1.0e raises handshake error
    handshake_error_expected = (
        IS_OPENSSL_1_1 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6))

    for client_protocols, expected in protocol_tests:
        server_context = alpn_context(server_protocols)
        client_context = alpn_context(client_protocols)

        try:
            stats = server_params_test(client_context,
                                       server_context,
                                       chatty=True,
                                       connectionchatty=True)
        except ssl.SSLError as e:
            # Keep the error so the check below can inspect it.
            stats = e

        if expected is None and handshake_error_expected:
            self.assertIsInstance(stats, ssl.SSLError)
            continue

        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
                  % (str(server_protocols), str(client_protocols),
                     str(expected))
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        seen_by_server = stats['server_alpn_protocols']
        if len(seen_by_server):
            server_result = seen_by_server[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))

def test_selected_npn_protocol(self):
    # selected_npn_protocol() is None unless NPN is used
    shared_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    shared_ctx.load_cert_chain(CERTFILE)
    result = server_params_test(shared_ctx, shared_ctx,
                                chatty=True, connectionchatty=True)
    self.assertIs(result['client_npn_protocol'], None)
def sni_contexts(self):
    # Returns (server_context, other_context, client_context): two
    # server-side contexts with different certificates, and a client
    # context that requires a certificate verified against SIGNING_CA.
    def tls1_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    server_context = tls1_context()
    server_context.load_cert_chain(SIGNED_CERTFILE)
    other_context = tls1_context()
    other_context.load_cert_chain(SIGNED_CERTFILE2)
    client_context = tls1_context()
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context

def check_common_name(self, stats, name):
    # Assert that the peer certificate recorded in stats has a subject
    # entry whose commonName equals name.
    wanted_rdn = (('commonName', name),)
    self.assertIn(wanted_rdn, stats['peercert']['subject'])

@needs_sni
def test_sni_callback(self):
    calls = []
    server_context, other_context, client_context = self.sni_contexts()

    def servername_cb(ssl_sock, server_name, initial_context):
        # Record every invocation; switch contexts when a name is sent.
        calls.append((server_name, initial_context))
        if server_name is not None:
            ssl_sock.context = other_context

    def handshake(sni_name):
        # One client/server round trip announcing the given SNI name.
        return server_params_test(client_context, server_context,
                                  chatty=True,
                                  sni_name=sni_name)

    server_context.set_servername_callback(servername_cb)

    stats = handshake('supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(calls, [("supermessage", server_context)])
    # other_context's certificate (SIGNED_CERTFILE2) was selected
    self.check_common_name(stats, 'fakehostname')

    del calls[:]
    # The callback is called with server_name=None
    stats = handshake(None)
    self.assertEqual(calls, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # Check disabling the callback
    del calls[:]
    server_context.set_servername_callback(None)

    stats = handshake('notfunny')
    # Certificate didn't change
    self.check_common_name(stats, 'localhost')
    self.assertEqual(calls, [])

@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert is reflected to the connecting client
    server_context, _, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as cm:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
def test_main(verbose=False):
    # Runs the test classes of this module. In verbose mode it first
    # prints the OpenSSL build and platform in use; it always checks
    # that the certificate fixtures exist before running anything.
    if support.verbose:
        plats = {
            'Linux': platform.linux_distribution,
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        for label, probe in plats.items():
            details = probe()
            if details and details[0]:
                plat = '%s %r' % (label, details)
                break
        else:
            # No platform-specific probe returned anything useful.
            plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            no_tls_1_1 = ssl.OP_NO_TLSv1_1
        except AttributeError:
            # This ssl build does not define the flag; nothing to print.
            pass
        else:
            print(" OP_NO_TLSv1_1 = 0x%8x" % no_tls_1_1)

    required_files = (
        CERTFILE, REMOTE_ROOT_CERT, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT,
    )
    for filename in required_files:
        if os.path.exists(filename):
            continue
        raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [ContextTests, BasicTests, BasicSocketTests, SSLErrorTests]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info:
            tests.append(ThreadedTests)

    try:
        support.run_unittest(*tests)
    finally:
        # thread_info is only bound when threads are available.
        if _have_threads:
            support.threading_cleanup(*thread_info)

if __name__ == "__main__":
    test_main()