1# -*- coding: utf-8 -*-
2# Test the support for SSL and sockets
3
4import sys
5import unittest
6from test import test_support as support
7from test.script_helper import assert_python_ok
8import asyncore
9import socket
10import select
11import time
12import datetime
13import gc
14import os
15import errno
16import pprint
17import shutil
18import urllib2
19import traceback
20import weakref
21import platform
22import functools
23from contextlib import closing
24
25ssl = support.import_module("ssl")
26
27PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
28HOST = support.HOST
29IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
30IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
31
32
33def data_file(*name):
34    return os.path.join(os.path.dirname(__file__), *name)
35
36# The custom key and certificate files used in test_ssl are generated
37# using Lib/test/make_ssl_certs.py.
38# Other certificates are simply fetched from the Internet servers they
39# are meant to authenticate.
40
41CERTFILE = data_file("keycert.pem")
42BYTES_CERTFILE = CERTFILE.encode(sys.getfilesystemencoding())
43ONLYCERT = data_file("ssl_cert.pem")
44ONLYKEY = data_file("ssl_key.pem")
45BYTES_ONLYCERT = ONLYCERT.encode(sys.getfilesystemencoding())
46BYTES_ONLYKEY = ONLYKEY.encode(sys.getfilesystemencoding())
47CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
48ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
49KEY_PASSWORD = "somepass"
50CAPATH = data_file("capath")
51BYTES_CAPATH = CAPATH.encode(sys.getfilesystemencoding())
52CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
53CAFILE_CACERT = data_file("capath", "5ed36f99.0")
54
55
56# empty CRL
57CRLFILE = data_file("revocation.crl")
58
59# Two keys and certs signed by the same CA (for SNI tests)
60SIGNED_CERTFILE = data_file("keycert3.pem")
61SIGNED_CERTFILE2 = data_file("keycert4.pem")
62SIGNING_CA = data_file("pycacert.pem")
63# cert with all kinds of subject alt names
64ALLSANFILE = data_file("allsans.pem")
65
66REMOTE_HOST = "self-signed.pythontest.net"
67REMOTE_ROOT_CERT = data_file("selfsigned_pythontestdotnet.pem")
68
69EMPTYCERT = data_file("nullcert.pem")
70BADCERT = data_file("badcert.pem")
71NONEXISTINGCERT = data_file("XXXnonexisting.pem")
72BADKEY = data_file("badkey.pem")
73NOKIACERT = data_file("nokia.pem")
74NULLBYTECERT = data_file("nullbytecert.pem")
75
76DHFILE = data_file("ffdh3072.pem")
77BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding())
78
79
80def handle_error(prefix):
81    exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
82    if support.verbose:
83        sys.stdout.write(prefix + exc_format)
84
85
86class BasicTests(unittest.TestCase):
87
88    def test_sslwrap_simple(self):
89        # A crude test for the legacy API
90        try:
91            ssl.sslwrap_simple(socket.socket(socket.AF_INET))
92        except IOError, e:
93            if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
94                pass
95            else:
96                raise
97        try:
98            ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock)
99        except IOError, e:
100            if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
101                pass
102            else:
103                raise
104
105
106def can_clear_options():
107    # 0.9.8m or higher
108    return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
109
110def no_sslv2_implies_sslv3_hello():
111    # 0.9.7h or higher
112    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
113
114def have_verify_flags():
115    # 0.9.8 or higher
116    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
117
118def utc_offset(): #NOTE: ignore issues like #1647654
119    # local time = utc time + utc offset
120    if time.daylight and time.localtime().tm_isdst > 0:
121        return -time.altzone  # seconds
122    return -time.timezone
123
124def asn1time(cert_time):
125    # Some versions of OpenSSL ignore seconds, see #18207
126    # 0.9.8.i
127    if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
128        fmt = "%b %d %H:%M:%S %Y GMT"
129        dt = datetime.datetime.strptime(cert_time, fmt)
130        dt = dt.replace(second=0)
131        cert_time = dt.strftime(fmt)
132        # %d adds leading zero but ASN1_TIME_print() uses leading space
133        if cert_time[4] == "0":
134            cert_time = cert_time[:4] + " " + cert_time[5:]
135
136    return cert_time
137
138# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
139def skip_if_broken_ubuntu_ssl(func):
140    if hasattr(ssl, 'PROTOCOL_SSLv2'):
141        @functools.wraps(func)
142        def f(*args, **kwargs):
143            try:
144                ssl.SSLContext(ssl.PROTOCOL_SSLv2)
145            except ssl.SSLError:
146                if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
147                    platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
148                    raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
149            return func(*args, **kwargs)
150        return f
151    else:
152        return func
153
154needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
155
156
157class BasicSocketTests(unittest.TestCase):
158
159    def test_constants(self):
160        ssl.CERT_NONE
161        ssl.CERT_OPTIONAL
162        ssl.CERT_REQUIRED
163        ssl.OP_CIPHER_SERVER_PREFERENCE
164        ssl.OP_SINGLE_DH_USE
165        if ssl.HAS_ECDH:
166            ssl.OP_SINGLE_ECDH_USE
167        if ssl.OPENSSL_VERSION_INFO >= (1, 0):
168            ssl.OP_NO_COMPRESSION
169        self.assertIn(ssl.HAS_SNI, {True, False})
170        self.assertIn(ssl.HAS_ECDH, {True, False})
171        ssl.OP_NO_SSLv2
172        ssl.OP_NO_SSLv3
173        ssl.OP_NO_TLSv1
174        ssl.OP_NO_TLSv1_3
175        if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
176            ssl.OP_NO_TLSv1_1
177            ssl.OP_NO_TLSv1_2
178
179    def test_random(self):
180        v = ssl.RAND_status()
181        if support.verbose:
182            sys.stdout.write("\n RAND_status is %d (%s)\n"
183                             % (v, (v and "sufficient randomness") or
184                                "insufficient randomness"))
185        if hasattr(ssl, 'RAND_egd'):
186            self.assertRaises(TypeError, ssl.RAND_egd, 1)
187            self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
188        ssl.RAND_add("this is a random string", 75.0)
189
190    def test_parse_cert(self):
191        # note that this uses an 'unofficial' function in _ssl.c,
192        # provided solely for this test, to exercise the certificate
193        # parsing code
194        p = ssl._ssl._test_decode_cert(CERTFILE)
195        if support.verbose:
196            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
197        self.assertEqual(p['issuer'],
198                         ((('countryName', 'XY'),),
199                          (('localityName', 'Castle Anthrax'),),
200                          (('organizationName', 'Python Software Foundation'),),
201                          (('commonName', 'localhost'),))
202                        )
203        # Note the next three asserts will fail if the keys are regenerated
204        self.assertEqual(p['notAfter'], asn1time('Jan 17 19:09:06 2028 GMT'))
205        self.assertEqual(p['notBefore'], asn1time('Jan 19 19:09:06 2018 GMT'))
206        self.assertEqual(p['serialNumber'], 'F9BA076D5B6ABD9B')
207        self.assertEqual(p['subject'],
208                         ((('countryName', 'XY'),),
209                          (('localityName', 'Castle Anthrax'),),
210                          (('organizationName', 'Python Software Foundation'),),
211                          (('commonName', 'localhost'),))
212                        )
213        self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
214        # Issue #13034: the subjectAltName in some certificates
215        # (notably projects.developer.nokia.com:443) wasn't parsed
216        p = ssl._ssl._test_decode_cert(NOKIACERT)
217        if support.verbose:
218            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
219        self.assertEqual(p['subjectAltName'],
220                         (('DNS', 'projects.developer.nokia.com'),
221                          ('DNS', 'projects.forum.nokia.com'))
222                        )
223        # extra OCSP and AIA fields
224        self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
225        self.assertEqual(p['caIssuers'],
226                         ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
227        self.assertEqual(p['crlDistributionPoints'],
228                         ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
229
230    def test_parse_cert_CVE_2013_4238(self):
231        p = ssl._ssl._test_decode_cert(NULLBYTECERT)
232        if support.verbose:
233            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
234        subject = ((('countryName', 'US'),),
235                   (('stateOrProvinceName', 'Oregon'),),
236                   (('localityName', 'Beaverton'),),
237                   (('organizationName', 'Python Software Foundation'),),
238                   (('organizationalUnitName', 'Python Core Development'),),
239                   (('commonName', 'null.python.org\x00example.org'),),
240                   (('emailAddress', '[email protected]'),))
241        self.assertEqual(p['subject'], subject)
242        self.assertEqual(p['issuer'], subject)
243        if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
244            san = (('DNS', 'altnull.python.org\x00example.com'),
245                   ('email', '[email protected]\[email protected]'),
246                   ('URI', 'http://null.python.org\x00http://example.org'),
247                   ('IP Address', '192.0.2.1'),
248                   ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
249        else:
250            # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
251            san = (('DNS', 'altnull.python.org\x00example.com'),
252                   ('email', '[email protected]\[email protected]'),
253                   ('URI', 'http://null.python.org\x00http://example.org'),
254                   ('IP Address', '192.0.2.1'),
255                   ('IP Address', '<invalid>'))
256
257        self.assertEqual(p['subjectAltName'], san)
258
259    def test_parse_all_sans(self):
260        p = ssl._ssl._test_decode_cert(ALLSANFILE)
261        self.assertEqual(p['subjectAltName'],
262            (
263                ('DNS', 'allsans'),
264                ('othername', '<unsupported>'),
265                ('othername', '<unsupported>'),
266                ('email', '[email protected]'),
267                ('DNS', 'www.example.org'),
268                ('DirName',
269                    ((('countryName', 'XY'),),
270                    (('localityName', 'Castle Anthrax'),),
271                    (('organizationName', 'Python Software Foundation'),),
272                    (('commonName', 'dirname example'),))),
273                ('URI', 'https://www.python.org/'),
274                ('IP Address', '127.0.0.1'),
275                ('IP Address', '0:0:0:0:0:0:0:1\n'),
276                ('Registered ID', '1.2.3.4.5')
277            )
278        )
279
280    def test_DER_to_PEM(self):
281        with open(CAFILE_CACERT, 'r') as f:
282            pem = f.read()
283        d1 = ssl.PEM_cert_to_DER_cert(pem)
284        p2 = ssl.DER_cert_to_PEM_cert(d1)
285        d2 = ssl.PEM_cert_to_DER_cert(p2)
286        self.assertEqual(d1, d2)
287        if not p2.startswith(ssl.PEM_HEADER + '\n'):
288            self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
289        if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
290            self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
291
292    def test_openssl_version(self):
293        n = ssl.OPENSSL_VERSION_NUMBER
294        t = ssl.OPENSSL_VERSION_INFO
295        s = ssl.OPENSSL_VERSION
296        self.assertIsInstance(n, (int, long))
297        self.assertIsInstance(t, tuple)
298        self.assertIsInstance(s, str)
299        # Some sanity checks follow
300        # >= 0.9
301        self.assertGreaterEqual(n, 0x900000)
302        # < 3.0
303        self.assertLess(n, 0x30000000)
304        major, minor, fix, patch, status = t
305        self.assertGreaterEqual(major, 0)
306        self.assertLess(major, 3)
307        self.assertGreaterEqual(minor, 0)
308        self.assertLess(minor, 256)
309        self.assertGreaterEqual(fix, 0)
310        self.assertLess(fix, 256)
311        self.assertGreaterEqual(patch, 0)
312        self.assertLessEqual(patch, 63)
313        self.assertGreaterEqual(status, 0)
314        self.assertLessEqual(status, 15)
315        # Version string as returned by {Open,Libre}SSL, the format might change
316        if IS_LIBRESSL:
317            self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
318                            (s, t, hex(n)))
319        else:
320            self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
321                            (s, t))
322
323    @support.cpython_only
324    def test_refcycle(self):
325        # Issue #7943: an SSL object doesn't create reference cycles with
326        # itself.
327        s = socket.socket(socket.AF_INET)
328        ss = ssl.wrap_socket(s)
329        wr = weakref.ref(ss)
330        del ss
331        self.assertEqual(wr(), None)
332
333    def test_wrapped_unconnected(self):
334        # Methods on an unconnected SSLSocket propagate the original
335        # socket.error raise by the underlying socket object.
336        s = socket.socket(socket.AF_INET)
337        with closing(ssl.wrap_socket(s)) as ss:
338            self.assertRaises(socket.error, ss.recv, 1)
339            self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
340            self.assertRaises(socket.error, ss.recvfrom, 1)
341            self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
342            self.assertRaises(socket.error, ss.send, b'x')
343            self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
344
345    def test_timeout(self):
346        # Issue #8524: when creating an SSL socket, the timeout of the
347        # original socket should be retained.
348        for timeout in (None, 0.0, 5.0):
349            s = socket.socket(socket.AF_INET)
350            s.settimeout(timeout)
351            with closing(ssl.wrap_socket(s)) as ss:
352                self.assertEqual(timeout, ss.gettimeout())
353
354    def test_errors(self):
355        sock = socket.socket()
356        self.assertRaisesRegexp(ValueError,
357                        "certfile must be specified",
358                        ssl.wrap_socket, sock, keyfile=CERTFILE)
359        self.assertRaisesRegexp(ValueError,
360                        "certfile must be specified for server-side operations",
361                        ssl.wrap_socket, sock, server_side=True)
362        self.assertRaisesRegexp(ValueError,
363                        "certfile must be specified for server-side operations",
364                        ssl.wrap_socket, sock, server_side=True, certfile="")
365        with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s:
366            self.assertRaisesRegexp(ValueError, "can't connect in server-side mode",
367                                    s.connect, (HOST, 8080))
368        with self.assertRaises(IOError) as cm:
369            with closing(socket.socket()) as sock:
370                ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
371        self.assertEqual(cm.exception.errno, errno.ENOENT)
372        with self.assertRaises(IOError) as cm:
373            with closing(socket.socket()) as sock:
374                ssl.wrap_socket(sock,
375                    certfile=CERTFILE, keyfile=NONEXISTINGCERT)
376        self.assertEqual(cm.exception.errno, errno.ENOENT)
377        with self.assertRaises(IOError) as cm:
378            with closing(socket.socket()) as sock:
379                ssl.wrap_socket(sock,
380                    certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
381        self.assertEqual(cm.exception.errno, errno.ENOENT)
382
383    def bad_cert_test(self, certfile):
384        """Check that trying to use the given client certificate fails"""
385        certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
386                                   certfile)
387        sock = socket.socket()
388        self.addCleanup(sock.close)
389        with self.assertRaises(ssl.SSLError):
390            ssl.wrap_socket(sock,
391                            certfile=certfile,
392                            ssl_version=ssl.PROTOCOL_TLSv1)
393
394    def test_empty_cert(self):
395        """Wrapping with an empty cert file"""
396        self.bad_cert_test("nullcert.pem")
397
398    def test_malformed_cert(self):
399        """Wrapping with a badly formatted certificate (syntax error)"""
400        self.bad_cert_test("badcert.pem")
401
402    def test_malformed_key(self):
403        """Wrapping with a badly formatted key (syntax error)"""
404        self.bad_cert_test("badkey.pem")
405
406    def test_match_hostname(self):
407        def ok(cert, hostname):
408            ssl.match_hostname(cert, hostname)
409        def fail(cert, hostname):
410            self.assertRaises(ssl.CertificateError,
411                              ssl.match_hostname, cert, hostname)
412
413        cert = {'subject': ((('commonName', 'example.com'),),)}
414        ok(cert, 'example.com')
415        ok(cert, 'ExAmple.cOm')
416        fail(cert, 'www.example.com')
417        fail(cert, '.example.com')
418        fail(cert, 'example.org')
419        fail(cert, 'exampleXcom')
420
421        cert = {'subject': ((('commonName', '*.a.com'),),)}
422        ok(cert, 'foo.a.com')
423        fail(cert, 'bar.foo.a.com')
424        fail(cert, 'a.com')
425        fail(cert, 'Xa.com')
426        fail(cert, '.a.com')
427
428        # only match one left-most wildcard
429        cert = {'subject': ((('commonName', 'f*.com'),),)}
430        ok(cert, 'foo.com')
431        ok(cert, 'f.com')
432        fail(cert, 'bar.com')
433        fail(cert, 'foo.a.com')
434        fail(cert, 'bar.foo.com')
435
436        # NULL bytes are bad, CVE-2013-4073
437        cert = {'subject': ((('commonName',
438                              'null.python.org\x00example.org'),),)}
439        ok(cert, 'null.python.org\x00example.org') # or raise an error?
440        fail(cert, 'example.org')
441        fail(cert, 'null.python.org')
442
443        # error cases with wildcards
444        cert = {'subject': ((('commonName', '*.*.a.com'),),)}
445        fail(cert, 'bar.foo.a.com')
446        fail(cert, 'a.com')
447        fail(cert, 'Xa.com')
448        fail(cert, '.a.com')
449
450        cert = {'subject': ((('commonName', 'a.*.com'),),)}
451        fail(cert, 'a.foo.com')
452        fail(cert, 'a..com')
453        fail(cert, 'a.com')
454
455        # wildcard doesn't match IDNA prefix 'xn--'
456        idna = u'püthon.python.org'.encode("idna").decode("ascii")
457        cert = {'subject': ((('commonName', idna),),)}
458        ok(cert, idna)
459        cert = {'subject': ((('commonName', 'x*.python.org'),),)}
460        fail(cert, idna)
461        cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
462        fail(cert, idna)
463
464        # wildcard in first fragment and  IDNA A-labels in sequent fragments
465        # are supported.
466        idna = u'www*.pythön.org'.encode("idna").decode("ascii")
467        cert = {'subject': ((('commonName', idna),),)}
468        ok(cert, u'www.pythön.org'.encode("idna").decode("ascii"))
469        ok(cert, u'www1.pythön.org'.encode("idna").decode("ascii"))
470        fail(cert, u'ftp.pythön.org'.encode("idna").decode("ascii"))
471        fail(cert, u'pythön.org'.encode("idna").decode("ascii"))
472
473        # Slightly fake real-world example
474        cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
475                'subject': ((('commonName', 'linuxfrz.org'),),),
476                'subjectAltName': (('DNS', 'linuxfr.org'),
477                                   ('DNS', 'linuxfr.com'),
478                                   ('othername', '<unsupported>'))}
479        ok(cert, 'linuxfr.org')
480        ok(cert, 'linuxfr.com')
481        # Not a "DNS" entry
482        fail(cert, '<unsupported>')
483        # When there is a subjectAltName, commonName isn't used
484        fail(cert, 'linuxfrz.org')
485
486        # A pristine real-world example
487        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
488                'subject': ((('countryName', 'US'),),
489                            (('stateOrProvinceName', 'California'),),
490                            (('localityName', 'Mountain View'),),
491                            (('organizationName', 'Google Inc'),),
492                            (('commonName', 'mail.google.com'),))}
493        ok(cert, 'mail.google.com')
494        fail(cert, 'gmail.com')
495        # Only commonName is considered
496        fail(cert, 'California')
497
498        # Neither commonName nor subjectAltName
499        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
500                'subject': ((('countryName', 'US'),),
501                            (('stateOrProvinceName', 'California'),),
502                            (('localityName', 'Mountain View'),),
503                            (('organizationName', 'Google Inc'),))}
504        fail(cert, 'mail.google.com')
505
506        # No DNS entry in subjectAltName but a commonName
507        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
508                'subject': ((('countryName', 'US'),),
509                            (('stateOrProvinceName', 'California'),),
510                            (('localityName', 'Mountain View'),),
511                            (('commonName', 'mail.google.com'),)),
512                'subjectAltName': (('othername', 'blabla'), )}
513        ok(cert, 'mail.google.com')
514
515        # No DNS entry subjectAltName and no commonName
516        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
517                'subject': ((('countryName', 'US'),),
518                            (('stateOrProvinceName', 'California'),),
519                            (('localityName', 'Mountain View'),),
520                            (('organizationName', 'Google Inc'),)),
521                'subjectAltName': (('othername', 'blabla'),)}
522        fail(cert, 'google.com')
523
524        # Empty cert / no cert
525        self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
526        self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
527
528        # Issue #17980: avoid denials of service by refusing more than one
529        # wildcard per fragment.
530        cert = {'subject': ((('commonName', 'a*b.com'),),)}
531        ok(cert, 'axxb.com')
532        cert = {'subject': ((('commonName', 'a*b.co*'),),)}
533        fail(cert, 'axxb.com')
534        cert = {'subject': ((('commonName', 'a*b*.com'),),)}
535        with self.assertRaises(ssl.CertificateError) as cm:
536            ssl.match_hostname(cert, 'axxbxxc.com')
537        self.assertIn("too many wildcards", str(cm.exception))
538
539    def test_server_side(self):
540        # server_hostname doesn't work for server sockets
541        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
542        with closing(socket.socket()) as sock:
543            self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
544                              server_hostname="some.hostname")
545
546    def test_unknown_channel_binding(self):
547        # should raise ValueError for unknown type
548        s = socket.socket(socket.AF_INET)
549        with closing(ssl.wrap_socket(s)) as ss:
550            with self.assertRaises(ValueError):
551                ss.get_channel_binding("unknown-type")
552
553    @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
554                         "'tls-unique' channel binding not available")
555    def test_tls_unique_channel_binding(self):
556        # unconnected should return None for known type
557        s = socket.socket(socket.AF_INET)
558        with closing(ssl.wrap_socket(s)) as ss:
559            self.assertIsNone(ss.get_channel_binding("tls-unique"))
560        # the same for server-side
561        s = socket.socket(socket.AF_INET)
562        with closing(ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)) as ss:
563            self.assertIsNone(ss.get_channel_binding("tls-unique"))
564
565    def test_get_default_verify_paths(self):
566        paths = ssl.get_default_verify_paths()
567        self.assertEqual(len(paths), 6)
568        self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
569
570        with support.EnvironmentVarGuard() as env:
571            env["SSL_CERT_DIR"] = CAPATH
572            env["SSL_CERT_FILE"] = CERTFILE
573            paths = ssl.get_default_verify_paths()
574            self.assertEqual(paths.cafile, CERTFILE)
575            self.assertEqual(paths.capath, CAPATH)
576
577    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
578    def test_enum_certificates(self):
579        self.assertTrue(ssl.enum_certificates("CA"))
580        self.assertTrue(ssl.enum_certificates("ROOT"))
581
582        self.assertRaises(TypeError, ssl.enum_certificates)
583        self.assertRaises(WindowsError, ssl.enum_certificates, "")
584
585        trust_oids = set()
586        for storename in ("CA", "ROOT"):
587            store = ssl.enum_certificates(storename)
588            self.assertIsInstance(store, list)
589            for element in store:
590                self.assertIsInstance(element, tuple)
591                self.assertEqual(len(element), 3)
592                cert, enc, trust = element
593                self.assertIsInstance(cert, bytes)
594                self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
595                self.assertIsInstance(trust, (set, bool))
596                if isinstance(trust, set):
597                    trust_oids.update(trust)
598
599        serverAuth = "1.3.6.1.5.5.7.3.1"
600        self.assertIn(serverAuth, trust_oids)
601
602    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
603    def test_enum_crls(self):
604        self.assertTrue(ssl.enum_crls("CA"))
605        self.assertRaises(TypeError, ssl.enum_crls)
606        self.assertRaises(WindowsError, ssl.enum_crls, "")
607
608        crls = ssl.enum_crls("CA")
609        self.assertIsInstance(crls, list)
610        for element in crls:
611            self.assertIsInstance(element, tuple)
612            self.assertEqual(len(element), 2)
613            self.assertIsInstance(element[0], bytes)
614            self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
615
616
617    def test_asn1object(self):
618        expected = (129, 'serverAuth', 'TLS Web Server Authentication',
619                    '1.3.6.1.5.5.7.3.1')
620
621        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
622        self.assertEqual(val, expected)
623        self.assertEqual(val.nid, 129)
624        self.assertEqual(val.shortname, 'serverAuth')
625        self.assertEqual(val.longname, 'TLS Web Server Authentication')
626        self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
627        self.assertIsInstance(val, ssl._ASN1Object)
628        self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
629
630        val = ssl._ASN1Object.fromnid(129)
631        self.assertEqual(val, expected)
632        self.assertIsInstance(val, ssl._ASN1Object)
633        self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
634        with self.assertRaisesRegexp(ValueError, "unknown NID 100000"):
635            ssl._ASN1Object.fromnid(100000)
636        for i in range(1000):
637            try:
638                obj = ssl._ASN1Object.fromnid(i)
639            except ValueError:
640                pass
641            else:
642                self.assertIsInstance(obj.nid, int)
643                self.assertIsInstance(obj.shortname, str)
644                self.assertIsInstance(obj.longname, str)
645                self.assertIsInstance(obj.oid, (str, type(None)))
646
647        val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
648        self.assertEqual(val, expected)
649        self.assertIsInstance(val, ssl._ASN1Object)
650        self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
651        self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
652                         expected)
653        with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"):
654            ssl._ASN1Object.fromname('serverauth')
655
656    def test_purpose_enum(self):
657        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
658        self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
659        self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
660        self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
661        self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
662        self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
663                              '1.3.6.1.5.5.7.3.1')
664
665        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
666        self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
667        self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
668        self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
669        self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
670        self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
671                              '1.3.6.1.5.5.7.3.2')
672
673    def test_unsupported_dtls(self):
674        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
675        self.addCleanup(s.close)
676        with self.assertRaises(NotImplementedError) as cx:
677            ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
678        self.assertEqual(str(cx.exception), "only stream sockets are supported")
679        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
680        with self.assertRaises(NotImplementedError) as cx:
681            ctx.wrap_socket(s)
682        self.assertEqual(str(cx.exception), "only stream sockets are supported")
683
684    def cert_time_ok(self, timestring, timestamp):
685        self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
686
687    def cert_time_fail(self, timestring):
688        with self.assertRaises(ValueError):
689            ssl.cert_time_to_seconds(timestring)
690
691    @unittest.skipUnless(utc_offset(),
692                         'local time needs to be different from UTC')
693    def test_cert_time_to_seconds_timezone(self):
694        # Issue #19940: ssl.cert_time_to_seconds() returns wrong
695        #               results if local timezone is not UTC
696        self.cert_time_ok("May  9 00:00:00 2007 GMT", 1178668800.0)
697        self.cert_time_ok("Jan  5 09:34:43 2018 GMT", 1515144883.0)
698
699    def test_cert_time_to_seconds(self):
700        timestring = "Jan  5 09:34:43 2018 GMT"
701        ts = 1515144883.0
702        self.cert_time_ok(timestring, ts)
703        # accept keyword parameter, assert its name
704        self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
705        # accept both %e and %d (space or zero generated by strftime)
706        self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
707        # case-insensitive
708        self.cert_time_ok("JaN  5 09:34:43 2018 GmT", ts)
709        self.cert_time_fail("Jan  5 09:34 2018 GMT")     # no seconds
710        self.cert_time_fail("Jan  5 09:34:43 2018")      # no GMT
711        self.cert_time_fail("Jan  5 09:34:43 2018 UTC")  # not GMT timezone
712        self.cert_time_fail("Jan 35 09:34:43 2018 GMT")  # invalid day
713        self.cert_time_fail("Jon  5 09:34:43 2018 GMT")  # invalid month
714        self.cert_time_fail("Jan  5 24:00:00 2018 GMT")  # invalid hour
715        self.cert_time_fail("Jan  5 09:60:43 2018 GMT")  # invalid minute
716
717        newyear_ts = 1230768000.0
718        # leap seconds
719        self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
720        # same timestamp
721        self.cert_time_ok("Jan  1 00:00:00 2009 GMT", newyear_ts)
722
723        self.cert_time_ok("Jan  5 09:34:59 2018 GMT", 1515144899)
724        #  allow 60th second (even if it is not a leap second)
725        self.cert_time_ok("Jan  5 09:34:60 2018 GMT", 1515144900)
726        #  allow 2nd leap second for compatibility with time.strptime()
727        self.cert_time_ok("Jan  5 09:34:61 2018 GMT", 1515144901)
728        self.cert_time_fail("Jan  5 09:34:62 2018 GMT")  # invalid seconds
729
730        # no special treatement for the special value:
731        #   99991231235959Z (rfc 5280)
732        self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
733
734    @support.run_with_locale('LC_ALL', '')
735    def test_cert_time_to_seconds_locale(self):
736        # `cert_time_to_seconds()` should be locale independent
737
738        def local_february_name():
739            return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
740
741        if local_february_name().lower() == 'feb':
742            self.skipTest("locale-specific month name needs to be "
743                          "different from C locale")
744
745        # locale-independent
746        self.cert_time_ok("Feb  9 00:00:00 2007 GMT", 1170979200.0)
747        self.cert_time_fail(local_february_name() + "  9 00:00:00 2007 GMT")
748
749
750class ContextTests(unittest.TestCase):
751
752    @skip_if_broken_ubuntu_ssl
753    def test_constructor(self):
754        for protocol in PROTOCOLS:
755            ssl.SSLContext(protocol)
756        self.assertRaises(TypeError, ssl.SSLContext)
757        self.assertRaises(ValueError, ssl.SSLContext, -1)
758        self.assertRaises(ValueError, ssl.SSLContext, 42)
759
760    @skip_if_broken_ubuntu_ssl
761    def test_protocol(self):
762        for proto in PROTOCOLS:
763            ctx = ssl.SSLContext(proto)
764            self.assertEqual(ctx.protocol, proto)
765
766    def test_ciphers(self):
767        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
768        ctx.set_ciphers("ALL")
769        ctx.set_ciphers("DEFAULT")
770        with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
771            ctx.set_ciphers("^$:,;?*'dorothyx")
772
773    @skip_if_broken_ubuntu_ssl
774    def test_options(self):
775        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
776        # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
777        default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
778        if not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0):
779            default |= ssl.OP_NO_COMPRESSION
780        self.assertEqual(default, ctx.options)
781        ctx.options |= ssl.OP_NO_TLSv1
782        self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
783        if can_clear_options():
784            ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
785            self.assertEqual(default, ctx.options)
786            ctx.options = 0
787            # Ubuntu has OP_NO_SSLv3 forced on by default
788            self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
789        else:
790            with self.assertRaises(ValueError):
791                ctx.options = 0
792
793    def test_verify_mode(self):
794        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
795        # Default value
796        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
797        ctx.verify_mode = ssl.CERT_OPTIONAL
798        self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
799        ctx.verify_mode = ssl.CERT_REQUIRED
800        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
801        ctx.verify_mode = ssl.CERT_NONE
802        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
803        with self.assertRaises(TypeError):
804            ctx.verify_mode = None
805        with self.assertRaises(ValueError):
806            ctx.verify_mode = 42
807
808    @unittest.skipUnless(have_verify_flags(),
809                         "verify_flags need OpenSSL > 0.9.8")
810    def test_verify_flags(self):
811        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
812        # default value
813        tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
814        self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
815        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
816        self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
817        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
818        self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
819        ctx.verify_flags = ssl.VERIFY_DEFAULT
820        self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
821        # supports any value
822        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
823        self.assertEqual(ctx.verify_flags,
824                         ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
825        with self.assertRaises(TypeError):
826            ctx.verify_flags = None
827
828    def test_load_cert_chain(self):
829        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
830        # Combined key and cert in a single file
831        ctx.load_cert_chain(CERTFILE, keyfile=None)
832        ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
833        self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
834        with self.assertRaises(IOError) as cm:
835            ctx.load_cert_chain(NONEXISTINGCERT)
836        self.assertEqual(cm.exception.errno, errno.ENOENT)
837        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
838            ctx.load_cert_chain(BADCERT)
839        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
840            ctx.load_cert_chain(EMPTYCERT)
841        # Separate key and cert
842        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
843        ctx.load_cert_chain(ONLYCERT, ONLYKEY)
844        ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
845        ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
846        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
847            ctx.load_cert_chain(ONLYCERT)
848        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
849            ctx.load_cert_chain(ONLYKEY)
850        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
851            ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
852        # Mismatching key and cert
853        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
854        with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"):
855            ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
856        # Password protected key and cert
857        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
858        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
859        ctx.load_cert_chain(CERTFILE_PROTECTED,
860                            password=bytearray(KEY_PASSWORD.encode()))
861        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
862        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
863        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
864                            bytearray(KEY_PASSWORD.encode()))
865        with self.assertRaisesRegexp(TypeError, "should be a string"):
866            ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
867        with self.assertRaises(ssl.SSLError):
868            ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
869        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
870            # openssl has a fixed limit on the password buffer.
871            # PEM_BUFSIZE is generally set to 1kb.
872            # Return a string larger than this.
873            ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
874        # Password callback
875        def getpass_unicode():
876            return KEY_PASSWORD
877        def getpass_bytes():
878            return KEY_PASSWORD.encode()
879        def getpass_bytearray():
880            return bytearray(KEY_PASSWORD.encode())
881        def getpass_badpass():
882            return "badpass"
883        def getpass_huge():
884            return b'a' * (1024 * 1024)
885        def getpass_bad_type():
886            return 9
887        def getpass_exception():
888            raise Exception('getpass error')
889        class GetPassCallable:
890            def __call__(self):
891                return KEY_PASSWORD
892            def getpass(self):
893                return KEY_PASSWORD
894        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
895        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
896        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
897        ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
898        ctx.load_cert_chain(CERTFILE_PROTECTED,
899                            password=GetPassCallable().getpass)
900        with self.assertRaises(ssl.SSLError):
901            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
902        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
903            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
904        with self.assertRaisesRegexp(TypeError, "must return a string"):
905            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
906        with self.assertRaisesRegexp(Exception, "getpass error"):
907            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
908        # Make sure the password function isn't called if it isn't needed
909        ctx.load_cert_chain(CERTFILE, password=getpass_exception)
910
911    def test_load_verify_locations(self):
912        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
913        ctx.load_verify_locations(CERTFILE)
914        ctx.load_verify_locations(cafile=CERTFILE, capath=None)
915        ctx.load_verify_locations(BYTES_CERTFILE)
916        ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
917        ctx.load_verify_locations(cafile=BYTES_CERTFILE.decode('utf-8'))
918        self.assertRaises(TypeError, ctx.load_verify_locations)
919        self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
920        with self.assertRaises(IOError) as cm:
921            ctx.load_verify_locations(NONEXISTINGCERT)
922        self.assertEqual(cm.exception.errno, errno.ENOENT)
923        with self.assertRaises(IOError):
924            ctx.load_verify_locations(u'')
925        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
926            ctx.load_verify_locations(BADCERT)
927        ctx.load_verify_locations(CERTFILE, CAPATH)
928        ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
929
930        # Issue #10989: crash if the second argument type is invalid
931        self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
932
933    def test_load_verify_cadata(self):
934        # test cadata
935        with open(CAFILE_CACERT) as f:
936            cacert_pem = f.read().decode("ascii")
937        cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
938        with open(CAFILE_NEURONIO) as f:
939            neuronio_pem = f.read().decode("ascii")
940        neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
941
942        # test PEM
943        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
944        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
945        ctx.load_verify_locations(cadata=cacert_pem)
946        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
947        ctx.load_verify_locations(cadata=neuronio_pem)
948        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
949        # cert already in hash table
950        ctx.load_verify_locations(cadata=neuronio_pem)
951        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
952
953        # combined
954        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
955        combined = "\n".join((cacert_pem, neuronio_pem))
956        ctx.load_verify_locations(cadata=combined)
957        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
958
959        # with junk around the certs
960        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
961        combined = ["head", cacert_pem, "other", neuronio_pem, "again",
962                    neuronio_pem, "tail"]
963        ctx.load_verify_locations(cadata="\n".join(combined))
964        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
965
966        # test DER
967        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
968        ctx.load_verify_locations(cadata=cacert_der)
969        ctx.load_verify_locations(cadata=neuronio_der)
970        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
971        # cert already in hash table
972        ctx.load_verify_locations(cadata=cacert_der)
973        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
974
975        # combined
976        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
977        combined = b"".join((cacert_der, neuronio_der))
978        ctx.load_verify_locations(cadata=combined)
979        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
980
981        # error cases
982        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
983        self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
984
985        with self.assertRaisesRegexp(ssl.SSLError, "no start line"):
986            ctx.load_verify_locations(cadata=u"broken")
987        with self.assertRaisesRegexp(ssl.SSLError, "not enough data"):
988            ctx.load_verify_locations(cadata=b"broken")
989
990
991    def test_load_dh_params(self):
992        filename = u'dhpäräm.pem'
993        fs_encoding = sys.getfilesystemencoding()
994        try:
995            filename.encode(fs_encoding)
996        except UnicodeEncodeError:
997            self.skipTest("filename %r cannot be encoded to the filesystem encoding %r" % (filename, fs_encoding))
998
999        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1000        ctx.load_dh_params(DHFILE)
1001        if os.name != 'nt':
1002            ctx.load_dh_params(BYTES_DHFILE)
1003        self.assertRaises(TypeError, ctx.load_dh_params)
1004        self.assertRaises(TypeError, ctx.load_dh_params, None)
1005        with self.assertRaises(IOError) as cm:
1006            ctx.load_dh_params(NONEXISTINGCERT)
1007        self.assertEqual(cm.exception.errno, errno.ENOENT)
1008        with self.assertRaises(ssl.SSLError) as cm:
1009            ctx.load_dh_params(CERTFILE)
1010        with support.temp_dir() as d:
1011            fname = os.path.join(d, filename)
1012            shutil.copy(DHFILE, fname)
1013            ctx.load_dh_params(fname)
1014
1015    @skip_if_broken_ubuntu_ssl
1016    def test_session_stats(self):
1017        for proto in PROTOCOLS:
1018            ctx = ssl.SSLContext(proto)
1019            self.assertEqual(ctx.session_stats(), {
1020                'number': 0,
1021                'connect': 0,
1022                'connect_good': 0,
1023                'connect_renegotiate': 0,
1024                'accept': 0,
1025                'accept_good': 0,
1026                'accept_renegotiate': 0,
1027                'hits': 0,
1028                'misses': 0,
1029                'timeouts': 0,
1030                'cache_full': 0,
1031            })
1032
1033    def test_set_default_verify_paths(self):
1034        # There's not much we can do to test that it acts as expected,
1035        # so just check it doesn't crash or raise an exception.
1036        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1037        ctx.set_default_verify_paths()
1038
1039    @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
1040    def test_set_ecdh_curve(self):
1041        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1042        ctx.set_ecdh_curve("prime256v1")
1043        ctx.set_ecdh_curve(b"prime256v1")
1044        self.assertRaises(TypeError, ctx.set_ecdh_curve)
1045        self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1046        self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1047        self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1048
1049    @needs_sni
1050    def test_sni_callback(self):
1051        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1052
1053        # set_servername_callback expects a callable, or None
1054        self.assertRaises(TypeError, ctx.set_servername_callback)
1055        self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1056        self.assertRaises(TypeError, ctx.set_servername_callback, "")
1057        self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1058
1059        def dummycallback(sock, servername, ctx):
1060            pass
1061        ctx.set_servername_callback(None)
1062        ctx.set_servername_callback(dummycallback)
1063
1064    @needs_sni
1065    def test_sni_callback_refcycle(self):
1066        # Reference cycles through the servername callback are detected
1067        # and cleared.
1068        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1069        def dummycallback(sock, servername, ctx, cycle=ctx):
1070            pass
1071        ctx.set_servername_callback(dummycallback)
1072        wr = weakref.ref(ctx)
1073        del ctx, dummycallback
1074        gc.collect()
1075        self.assertIs(wr(), None)
1076
1077    def test_cert_store_stats(self):
1078        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1079        self.assertEqual(ctx.cert_store_stats(),
1080            {'x509_ca': 0, 'crl': 0, 'x509': 0})
1081        ctx.load_cert_chain(CERTFILE)
1082        self.assertEqual(ctx.cert_store_stats(),
1083            {'x509_ca': 0, 'crl': 0, 'x509': 0})
1084        ctx.load_verify_locations(CERTFILE)
1085        self.assertEqual(ctx.cert_store_stats(),
1086            {'x509_ca': 0, 'crl': 0, 'x509': 1})
1087        ctx.load_verify_locations(CAFILE_CACERT)
1088        self.assertEqual(ctx.cert_store_stats(),
1089            {'x509_ca': 1, 'crl': 0, 'x509': 2})
1090
1091    def test_get_ca_certs(self):
1092        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1093        self.assertEqual(ctx.get_ca_certs(), [])
1094        # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1095        ctx.load_verify_locations(CERTFILE)
1096        self.assertEqual(ctx.get_ca_certs(), [])
1097        # but CAFILE_CACERT is a CA cert
1098        ctx.load_verify_locations(CAFILE_CACERT)
1099        self.assertEqual(ctx.get_ca_certs(),
1100            [{'issuer': ((('organizationName', 'Root CA'),),
1101                         (('organizationalUnitName', 'http://www.cacert.org'),),
1102                         (('commonName', 'CA Cert Signing Authority'),),
1103                         (('emailAddress', '[email protected]'),)),
1104              'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1105              'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1106              'serialNumber': '00',
1107              'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
1108              'subject': ((('organizationName', 'Root CA'),),
1109                          (('organizationalUnitName', 'http://www.cacert.org'),),
1110                          (('commonName', 'CA Cert Signing Authority'),),
1111                          (('emailAddress', '[email protected]'),)),
1112              'version': 3}])
1113
1114        with open(CAFILE_CACERT) as f:
1115            pem = f.read()
1116        der = ssl.PEM_cert_to_DER_cert(pem)
1117        self.assertEqual(ctx.get_ca_certs(True), [der])
1118
1119    def test_load_default_certs(self):
1120        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1121        ctx.load_default_certs()
1122
1123        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1124        ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1125        ctx.load_default_certs()
1126
1127        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1128        ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1129
1130        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1131        self.assertRaises(TypeError, ctx.load_default_certs, None)
1132        self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1133
1134    @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
1135    @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
1136    def test_load_default_certs_env(self):
1137        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1138        with support.EnvironmentVarGuard() as env:
1139            env["SSL_CERT_DIR"] = CAPATH
1140            env["SSL_CERT_FILE"] = CERTFILE
1141            ctx.load_default_certs()
1142            self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1143
1144    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
1145    def test_load_default_certs_env_windows(self):
1146        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1147        ctx.load_default_certs()
1148        stats = ctx.cert_store_stats()
1149
1150        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1151        with support.EnvironmentVarGuard() as env:
1152            env["SSL_CERT_DIR"] = CAPATH
1153            env["SSL_CERT_FILE"] = CERTFILE
1154            ctx.load_default_certs()
1155            stats["x509"] += 1
1156            self.assertEqual(ctx.cert_store_stats(), stats)
1157
1158    def test_create_default_context(self):
1159        ctx = ssl.create_default_context()
1160        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1161        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1162        self.assertTrue(ctx.check_hostname)
1163        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1164        self.assertEqual(
1165            ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1166            getattr(ssl, "OP_NO_COMPRESSION", 0),
1167        )
1168
1169        with open(SIGNING_CA) as f:
1170            cadata = f.read().decode("ascii")
1171        ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1172                                         cadata=cadata)
1173        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1174        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1175        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1176        self.assertEqual(
1177            ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1178            getattr(ssl, "OP_NO_COMPRESSION", 0),
1179        )
1180
1181        ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
1182        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1183        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1184        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1185        self.assertEqual(
1186            ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1187            getattr(ssl, "OP_NO_COMPRESSION", 0),
1188        )
1189        self.assertEqual(
1190            ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
1191            getattr(ssl, "OP_SINGLE_DH_USE", 0),
1192        )
1193        self.assertEqual(
1194            ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1195            getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1196        )
1197
1198    def test__create_stdlib_context(self):
1199        ctx = ssl._create_stdlib_context()
1200        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1201        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1202        self.assertFalse(ctx.check_hostname)
1203        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1204
1205        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1206        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1207        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1208        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1209
1210        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
1211                                         cert_reqs=ssl.CERT_REQUIRED,
1212                                         check_hostname=True)
1213        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1214        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1215        self.assertTrue(ctx.check_hostname)
1216        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1217
1218        ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1219        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1220        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1221        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1222
1223    def test__https_verify_certificates(self):
1224        # Unit test to check the contect factory mapping
1225        # The factories themselves are tested above
1226        # This test will fail by design if run under PYTHONHTTPSVERIFY=0
1227        # (as will various test_httplib tests)
1228
1229        # Uses a fresh SSL module to avoid affecting the real one
1230        local_ssl = support.import_fresh_module("ssl")
1231        # Certificate verification is enabled by default
1232        self.assertIs(local_ssl._create_default_https_context,
1233                      local_ssl.create_default_context)
1234        # Turn default verification off
1235        local_ssl._https_verify_certificates(enable=False)
1236        self.assertIs(local_ssl._create_default_https_context,
1237                      local_ssl._create_unverified_context)
1238        # And back on
1239        local_ssl._https_verify_certificates(enable=True)
1240        self.assertIs(local_ssl._create_default_https_context,
1241                      local_ssl.create_default_context)
1242        # The default behaviour is to enable
1243        local_ssl._https_verify_certificates(enable=False)
1244        local_ssl._https_verify_certificates()
1245        self.assertIs(local_ssl._create_default_https_context,
1246                      local_ssl.create_default_context)
1247
1248    def test__https_verify_envvar(self):
1249        # Unit test to check the PYTHONHTTPSVERIFY handling
1250        # Need to use a subprocess so it can still be run under -E
1251        https_is_verified = """import ssl, sys; \
1252            status = "Error: _create_default_https_context does not verify certs" \
1253                       if ssl._create_default_https_context is \
1254                          ssl._create_unverified_context \
1255                     else None; \
1256            sys.exit(status)"""
1257        https_is_not_verified = """import ssl, sys; \
1258            status = "Error: _create_default_https_context verifies certs" \
1259                       if ssl._create_default_https_context is \
1260                          ssl.create_default_context \
1261                     else None; \
1262            sys.exit(status)"""
1263        extra_env = {}
1264        # Omitting it leaves verification on
1265        assert_python_ok("-c", https_is_verified, **extra_env)
1266        # Setting it to zero turns verification off
1267        extra_env[ssl._https_verify_envvar] = "0"
1268        assert_python_ok("-c", https_is_not_verified, **extra_env)
1269        # Any other value should also leave it on
1270        for setting in ("", "1", "enabled", "foo"):
1271            extra_env[ssl._https_verify_envvar] = setting
1272            assert_python_ok("-c", https_is_verified, **extra_env)
1273
1274    def test_check_hostname(self):
1275        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1276        self.assertFalse(ctx.check_hostname)
1277
1278        # Requires CERT_REQUIRED or CERT_OPTIONAL
1279        with self.assertRaises(ValueError):
1280            ctx.check_hostname = True
1281        ctx.verify_mode = ssl.CERT_REQUIRED
1282        self.assertFalse(ctx.check_hostname)
1283        ctx.check_hostname = True
1284        self.assertTrue(ctx.check_hostname)
1285
1286        ctx.verify_mode = ssl.CERT_OPTIONAL
1287        ctx.check_hostname = True
1288        self.assertTrue(ctx.check_hostname)
1289
1290        # Cannot set CERT_NONE with check_hostname enabled
1291        with self.assertRaises(ValueError):
1292            ctx.verify_mode = ssl.CERT_NONE
1293        ctx.check_hostname = False
1294        self.assertFalse(ctx.check_hostname)
1295
1296
1297class SSLErrorTests(unittest.TestCase):
1298
1299    def test_str(self):
1300        # The str() of a SSLError doesn't include the errno
1301        e = ssl.SSLError(1, "foo")
1302        self.assertEqual(str(e), "foo")
1303        self.assertEqual(e.errno, 1)
1304        # Same for a subclass
1305        e = ssl.SSLZeroReturnError(1, "foo")
1306        self.assertEqual(str(e), "foo")
1307        self.assertEqual(e.errno, 1)
1308
1309    def test_lib_reason(self):
1310        # Test the library and reason attributes
1311        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1312        with self.assertRaises(ssl.SSLError) as cm:
1313            ctx.load_dh_params(CERTFILE)
1314        self.assertEqual(cm.exception.library, 'PEM')
1315        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1316        s = str(cm.exception)
1317        self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1318
1319    def test_subclass(self):
1320        # Check that the appropriate SSLError subclass is raised
1321        # (this only tests one of them)
1322        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1323        with closing(socket.socket()) as s:
1324            s.bind(("127.0.0.1", 0))
1325            s.listen(5)
1326            c = socket.socket()
1327            c.connect(s.getsockname())
1328            c.setblocking(False)
1329            with closing(ctx.wrap_socket(c, False, do_handshake_on_connect=False)) as c:
1330                with self.assertRaises(ssl.SSLWantReadError) as cm:
1331                    c.do_handshake()
1332                s = str(cm.exception)
1333                self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1334                # For compatibility
1335                self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1336
1337
1338class NetworkedTests(unittest.TestCase):
1339
1340    def test_connect(self):
1341        with support.transient_internet(REMOTE_HOST):
1342            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1343                                cert_reqs=ssl.CERT_NONE)
1344            try:
1345                s.connect((REMOTE_HOST, 443))
1346                self.assertEqual({}, s.getpeercert())
1347            finally:
1348                s.close()
1349
1350            # this should fail because we have no verification certs
1351            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1352                                cert_reqs=ssl.CERT_REQUIRED)
1353            self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1354                                   s.connect, (REMOTE_HOST, 443))
1355            s.close()
1356
1357            # this should succeed because we specify the root cert
1358            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1359                                cert_reqs=ssl.CERT_REQUIRED,
1360                                ca_certs=REMOTE_ROOT_CERT)
1361            try:
1362                s.connect((REMOTE_HOST, 443))
1363                self.assertTrue(s.getpeercert())
1364            finally:
1365                s.close()
1366
1367    def test_connect_ex(self):
1368        # Issue #11326: check connect_ex() implementation
1369        with support.transient_internet(REMOTE_HOST):
1370            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1371                                cert_reqs=ssl.CERT_REQUIRED,
1372                                ca_certs=REMOTE_ROOT_CERT)
1373            try:
1374                self.assertEqual(0, s.connect_ex((REMOTE_HOST, 443)))
1375                self.assertTrue(s.getpeercert())
1376            finally:
1377                s.close()
1378
1379    def test_non_blocking_connect_ex(self):
1380        # Issue #11326: non-blocking connect_ex() should allow handshake
1381        # to proceed after the socket gets ready.
1382        with support.transient_internet(REMOTE_HOST):
1383            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1384                                cert_reqs=ssl.CERT_REQUIRED,
1385                                ca_certs=REMOTE_ROOT_CERT,
1386                                do_handshake_on_connect=False)
1387            try:
1388                s.setblocking(False)
1389                rc = s.connect_ex((REMOTE_HOST, 443))
1390                # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1391                self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1392                # Wait for connect to finish
1393                select.select([], [s], [], 5.0)
1394                # Non-blocking handshake
1395                while True:
1396                    try:
1397                        s.do_handshake()
1398                        break
1399                    except ssl.SSLWantReadError:
1400                        select.select([s], [], [], 5.0)
1401                    except ssl.SSLWantWriteError:
1402                        select.select([], [s], [], 5.0)
1403                # SSL established
1404                self.assertTrue(s.getpeercert())
1405            finally:
1406                s.close()
1407
1408    def test_timeout_connect_ex(self):
1409        # Issue #12065: on a timeout, connect_ex() should return the original
1410        # errno (mimicking the behaviour of non-SSL sockets).
1411        with support.transient_internet(REMOTE_HOST):
1412            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1413                                cert_reqs=ssl.CERT_REQUIRED,
1414                                ca_certs=REMOTE_ROOT_CERT,
1415                                do_handshake_on_connect=False)
1416            try:
1417                s.settimeout(0.0000001)
1418                rc = s.connect_ex((REMOTE_HOST, 443))
1419                if rc == 0:
1420                    self.skipTest("REMOTE_HOST responded too quickly")
1421                self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1422            finally:
1423                s.close()
1424
1425    def test_connect_ex_error(self):
1426        with support.transient_internet(REMOTE_HOST):
1427            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1428                                cert_reqs=ssl.CERT_REQUIRED,
1429                                ca_certs=REMOTE_ROOT_CERT)
1430            try:
1431                rc = s.connect_ex((REMOTE_HOST, 444))
1432                # Issue #19919: Windows machines or VMs hosted on Windows
1433                # machines sometimes return EWOULDBLOCK.
1434                errors = (
1435                    errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
1436                    errno.EWOULDBLOCK,
1437                )
1438                self.assertIn(rc, errors)
1439            finally:
1440                s.close()
1441
1442    def test_connect_with_context(self):
1443        with support.transient_internet(REMOTE_HOST):
1444            # Same as test_connect, but with a separately created context
1445            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1446            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1447            s.connect((REMOTE_HOST, 443))
1448            try:
1449                self.assertEqual({}, s.getpeercert())
1450            finally:
1451                s.close()
1452            # Same with a server hostname
1453            s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1454                                server_hostname=REMOTE_HOST)
1455            s.connect((REMOTE_HOST, 443))
1456            s.close()
1457            # This should fail because we have no verification certs
1458            ctx.verify_mode = ssl.CERT_REQUIRED
1459            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1460            self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1461                                    s.connect, (REMOTE_HOST, 443))
1462            s.close()
1463            # This should succeed because we specify the root cert
1464            ctx.load_verify_locations(REMOTE_ROOT_CERT)
1465            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1466            s.connect((REMOTE_HOST, 443))
1467            try:
1468                cert = s.getpeercert()
1469                self.assertTrue(cert)
1470            finally:
1471                s.close()
1472
1473    def test_connect_capath(self):
1474        # Verify server certificates using the `capath` argument
1475        # NOTE: the subject hashing algorithm has been changed between
1476        # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1477        # contain both versions of each certificate (same content, different
1478        # filename) for this test to be portable across OpenSSL releases.
1479        with support.transient_internet(REMOTE_HOST):
1480            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1481            ctx.verify_mode = ssl.CERT_REQUIRED
1482            ctx.load_verify_locations(capath=CAPATH)
1483            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1484            s.connect((REMOTE_HOST, 443))
1485            try:
1486                cert = s.getpeercert()
1487                self.assertTrue(cert)
1488            finally:
1489                s.close()
1490            # Same with a bytes `capath` argument
1491            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1492            ctx.verify_mode = ssl.CERT_REQUIRED
1493            ctx.load_verify_locations(capath=BYTES_CAPATH)
1494            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1495            s.connect((REMOTE_HOST, 443))
1496            try:
1497                cert = s.getpeercert()
1498                self.assertTrue(cert)
1499            finally:
1500                s.close()
1501
1502    def test_connect_cadata(self):
1503        with open(REMOTE_ROOT_CERT) as f:
1504            pem = f.read().decode('ascii')
1505        der = ssl.PEM_cert_to_DER_cert(pem)
1506        with support.transient_internet(REMOTE_HOST):
1507            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1508            ctx.verify_mode = ssl.CERT_REQUIRED
1509            ctx.load_verify_locations(cadata=pem)
1510            with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1511                s.connect((REMOTE_HOST, 443))
1512                cert = s.getpeercert()
1513                self.assertTrue(cert)
1514
1515            # same with DER
1516            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1517            ctx.verify_mode = ssl.CERT_REQUIRED
1518            ctx.load_verify_locations(cadata=der)
1519            with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1520                s.connect((REMOTE_HOST, 443))
1521                cert = s.getpeercert()
1522                self.assertTrue(cert)
1523
1524    @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1525    def test_makefile_close(self):
1526        # Issue #5238: creating a file-like object with makefile() shouldn't
1527        # delay closing the underlying "real socket" (here tested with its
1528        # file descriptor, hence skipping the test under Windows).
1529        with support.transient_internet(REMOTE_HOST):
1530            ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1531            ss.connect((REMOTE_HOST, 443))
1532            fd = ss.fileno()
1533            f = ss.makefile()
1534            f.close()
1535            # The fd is still open
1536            os.read(fd, 0)
1537            # Closing the SSL socket should close the fd too
1538            ss.close()
1539            gc.collect()
1540            with self.assertRaises(OSError) as e:
1541                os.read(fd, 0)
1542            self.assertEqual(e.exception.errno, errno.EBADF)
1543
1544    def test_non_blocking_handshake(self):
1545        with support.transient_internet(REMOTE_HOST):
1546            s = socket.socket(socket.AF_INET)
1547            s.connect((REMOTE_HOST, 443))
1548            s.setblocking(False)
1549            s = ssl.wrap_socket(s,
1550                                cert_reqs=ssl.CERT_NONE,
1551                                do_handshake_on_connect=False)
1552            count = 0
1553            while True:
1554                try:
1555                    count += 1
1556                    s.do_handshake()
1557                    break
1558                except ssl.SSLWantReadError:
1559                    select.select([s], [], [])
1560                except ssl.SSLWantWriteError:
1561                    select.select([], [s], [])
1562            s.close()
1563            if support.verbose:
1564                sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
1565
1566    def test_get_server_certificate(self):
1567        def _test_get_server_certificate(host, port, cert=None):
1568            with support.transient_internet(host):
1569                pem = ssl.get_server_certificate((host, port))
1570                if not pem:
1571                    self.fail("No server certificate on %s:%s!" % (host, port))
1572
1573                try:
1574                    pem = ssl.get_server_certificate((host, port),
1575                                                     ca_certs=CERTFILE)
1576                except ssl.SSLError as x:
1577                    #should fail
1578                    if support.verbose:
1579                        sys.stdout.write("%s\n" % x)
1580                else:
1581                    self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1582                pem = ssl.get_server_certificate((host, port),
1583                                                 ca_certs=cert)
1584                if not pem:
1585                    self.fail("No server certificate on %s:%s!" % (host, port))
1586                if support.verbose:
1587                    sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1588
1589        _test_get_server_certificate(REMOTE_HOST, 443, REMOTE_ROOT_CERT)
1590        if support.IPV6_ENABLED:
1591            _test_get_server_certificate('ipv6.google.com', 443)
1592
1593    def test_ciphers(self):
1594        remote = (REMOTE_HOST, 443)
1595        with support.transient_internet(remote[0]):
1596            with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1597                                         cert_reqs=ssl.CERT_NONE, ciphers="ALL")) as s:
1598                s.connect(remote)
1599            with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1600                                         cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")) as s:
1601                s.connect(remote)
1602            # Error checking can happen at instantiation or when connecting
1603            with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
1604                with closing(socket.socket(socket.AF_INET)) as sock:
1605                    s = ssl.wrap_socket(sock,
1606                                        cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1607                    s.connect(remote)
1608
1609    def test_get_ca_certs_capath(self):
1610        # capath certs are loaded on request
1611        with support.transient_internet(REMOTE_HOST):
1612            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1613            ctx.verify_mode = ssl.CERT_REQUIRED
1614            ctx.load_verify_locations(capath=CAPATH)
1615            self.assertEqual(ctx.get_ca_certs(), [])
1616            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1617            s.connect((REMOTE_HOST, 443))
1618            try:
1619                cert = s.getpeercert()
1620                self.assertTrue(cert)
1621            finally:
1622                s.close()
1623            self.assertEqual(len(ctx.get_ca_certs()), 1)
1624
1625    @needs_sni
1626    def test_context_setget(self):
1627        # Check that the context of a connected socket can be replaced.
1628        with support.transient_internet(REMOTE_HOST):
1629            ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1630            ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1631            s = socket.socket(socket.AF_INET)
1632            with closing(ctx1.wrap_socket(s)) as ss:
1633                ss.connect((REMOTE_HOST, 443))
1634                self.assertIs(ss.context, ctx1)
1635                self.assertIs(ss._sslobj.context, ctx1)
1636                ss.context = ctx2
1637                self.assertIs(ss.context, ctx2)
1638                self.assertIs(ss._sslobj.context, ctx2)
1639
1640try:
1641    import threading
1642except ImportError:
1643    _have_threads = False
1644else:
1645    _have_threads = True
1646
1647    from test.ssl_servers import make_https_server
1648
1649    class ThreadedEchoServer(threading.Thread):
1650
1651        class ConnectionHandler(threading.Thread):
1652
1653            """A mildly complicated class, because we want it to work both
1654            with and without the SSL wrapper around the socket connection, so
1655            that we can test the STARTTLS functionality."""
1656
1657            def __init__(self, server, connsock, addr):
1658                self.server = server
1659                self.running = False
1660                self.sock = connsock
1661                self.addr = addr
1662                self.sock.setblocking(1)
1663                self.sslconn = None
1664                threading.Thread.__init__(self)
1665                self.daemon = True
1666
1667            def wrap_conn(self):
1668                try:
1669                    self.sslconn = self.server.context.wrap_socket(
1670                        self.sock, server_side=True)
1671                    self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
1672                    self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
1673                except socket.error as e:
1674                    # We treat ConnectionResetError as though it were an
1675                    # SSLError - OpenSSL on Ubuntu abruptly closes the
1676                    # connection when asked to use an unsupported protocol.
1677                    #
1678                    # XXX Various errors can have happened here, for example
1679                    # a mismatching protocol version, an invalid certificate,
1680                    # or a low-level bug. This should be made more discriminating.
1681                    if not isinstance(e, ssl.SSLError) and e.errno != errno.ECONNRESET:
1682                        raise
1683                    self.server.conn_errors.append(e)
1684                    if self.server.chatty:
1685                        handle_error("\n server:  bad connection attempt from " + repr(self.addr) + ":\n")
1686                    self.running = False
1687                    self.server.stop()
1688                    self.close()
1689                    return False
1690                else:
1691                    if self.server.context.verify_mode == ssl.CERT_REQUIRED:
1692                        cert = self.sslconn.getpeercert()
1693                        if support.verbose and self.server.chatty:
1694                            sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
1695                        cert_binary = self.sslconn.getpeercert(True)
1696                        if support.verbose and self.server.chatty:
1697                            sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
1698                    cipher = self.sslconn.cipher()
1699                    if support.verbose and self.server.chatty:
1700                        sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
1701                        sys.stdout.write(" server: selected protocol is now "
1702                                + str(self.sslconn.selected_npn_protocol()) + "\n")
1703                    return True
1704
1705            def read(self):
1706                if self.sslconn:
1707                    return self.sslconn.read()
1708                else:
1709                    return self.sock.recv(1024)
1710
1711            def write(self, bytes):
1712                if self.sslconn:
1713                    return self.sslconn.write(bytes)
1714                else:
1715                    return self.sock.send(bytes)
1716
1717            def close(self):
1718                if self.sslconn:
1719                    self.sslconn.close()
1720                else:
1721                    self.sock.close()
1722
1723            def run(self):
1724                self.running = True
1725                if not self.server.starttls_server:
1726                    if not self.wrap_conn():
1727                        return
1728                while self.running:
1729                    try:
1730                        msg = self.read()
1731                        stripped = msg.strip()
1732                        if not stripped:
1733                            # eof, so quit this handler
1734                            self.running = False
1735                            self.close()
1736                        elif stripped == b'over':
1737                            if support.verbose and self.server.connectionchatty:
1738                                sys.stdout.write(" server: client closed connection\n")
1739                            self.close()
1740                            return
1741                        elif (self.server.starttls_server and
1742                              stripped == b'STARTTLS'):
1743                            if support.verbose and self.server.connectionchatty:
1744                                sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
1745                            self.write(b"OK\n")
1746                            if not self.wrap_conn():
1747                                return
1748                        elif (self.server.starttls_server and self.sslconn
1749                              and stripped == b'ENDTLS'):
1750                            if support.verbose and self.server.connectionchatty:
1751                                sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
1752                            self.write(b"OK\n")
1753                            self.sock = self.sslconn.unwrap()
1754                            self.sslconn = None
1755                            if support.verbose and self.server.connectionchatty:
1756                                sys.stdout.write(" server: connection is now unencrypted...\n")
1757                        elif stripped == b'CB tls-unique':
1758                            if support.verbose and self.server.connectionchatty:
1759                                sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
1760                            data = self.sslconn.get_channel_binding("tls-unique")
1761                            self.write(repr(data).encode("us-ascii") + b"\n")
1762                        else:
1763                            if (support.verbose and
1764                                self.server.connectionchatty):
1765                                ctype = (self.sslconn and "encrypted") or "unencrypted"
1766                                sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
1767                                                 % (msg, ctype, msg.lower(), ctype))
1768                            self.write(msg.lower())
1769                    except ssl.SSLError:
1770                        if self.server.chatty:
1771                            handle_error("Test server failure:\n")
1772                        self.close()
1773                        self.running = False
1774                        # normally, we'd just stop here, but for the test
1775                        # harness, we want to stop the server
1776                        self.server.stop()
1777
1778        def __init__(self, certificate=None, ssl_version=None,
1779                     certreqs=None, cacerts=None,
1780                     chatty=True, connectionchatty=False, starttls_server=False,
1781                     npn_protocols=None, alpn_protocols=None,
1782                     ciphers=None, context=None):
1783            if context:
1784                self.context = context
1785            else:
1786                self.context = ssl.SSLContext(ssl_version
1787                                              if ssl_version is not None
1788                                              else ssl.PROTOCOL_TLS)
1789                self.context.verify_mode = (certreqs if certreqs is not None
1790                                            else ssl.CERT_NONE)
1791                if cacerts:
1792                    self.context.load_verify_locations(cacerts)
1793                if certificate:
1794                    self.context.load_cert_chain(certificate)
1795                if npn_protocols:
1796                    self.context.set_npn_protocols(npn_protocols)
1797                if alpn_protocols:
1798                    self.context.set_alpn_protocols(alpn_protocols)
1799                if ciphers:
1800                    self.context.set_ciphers(ciphers)
1801            self.chatty = chatty
1802            self.connectionchatty = connectionchatty
1803            self.starttls_server = starttls_server
1804            self.sock = socket.socket()
1805            self.port = support.bind_port(self.sock)
1806            self.flag = None
1807            self.active = False
1808            self.selected_npn_protocols = []
1809            self.selected_alpn_protocols = []
1810            self.conn_errors = []
1811            threading.Thread.__init__(self)
1812            self.daemon = True
1813
1814        def __enter__(self):
1815            self.start(threading.Event())
1816            self.flag.wait()
1817            return self
1818
1819        def __exit__(self, *args):
1820            self.stop()
1821            self.join()
1822
1823        def start(self, flag=None):
1824            self.flag = flag
1825            threading.Thread.start(self)
1826
1827        def run(self):
1828            self.sock.settimeout(0.05)
1829            self.sock.listen(5)
1830            self.active = True
1831            if self.flag:
1832                # signal an event
1833                self.flag.set()
1834            while self.active:
1835                try:
1836                    newconn, connaddr = self.sock.accept()
1837                    if support.verbose and self.chatty:
1838                        sys.stdout.write(' server:  new connection from '
1839                                         + repr(connaddr) + '\n')
1840                    handler = self.ConnectionHandler(self, newconn, connaddr)
1841                    handler.start()
1842                    handler.join()
1843                except socket.timeout:
1844                    pass
1845                except KeyboardInterrupt:
1846                    self.stop()
1847            self.sock.close()
1848
1849        def stop(self):
1850            self.active = False
1851
1852    class AsyncoreEchoServer(threading.Thread):
1853
1854        class EchoServer(asyncore.dispatcher):
1855
1856            class ConnectionHandler(asyncore.dispatcher_with_send):
1857
1858                def __init__(self, conn, certfile):
1859                    self.socket = ssl.wrap_socket(conn, server_side=True,
1860                                                  certfile=certfile,
1861                                                  do_handshake_on_connect=False)
1862                    asyncore.dispatcher_with_send.__init__(self, self.socket)
1863                    self._ssl_accepting = True
1864                    self._do_ssl_handshake()
1865
1866                def readable(self):
1867                    if isinstance(self.socket, ssl.SSLSocket):
1868                        while self.socket.pending() > 0:
1869                            self.handle_read_event()
1870                    return True
1871
1872                def _do_ssl_handshake(self):
1873                    try:
1874                        self.socket.do_handshake()
1875                    except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
1876                        return
1877                    except ssl.SSLEOFError:
1878                        return self.handle_close()
1879                    except ssl.SSLError:
1880                        raise
1881                    except socket.error, err:
1882                        if err.args[0] == errno.ECONNABORTED:
1883                            return self.handle_close()
1884                    else:
1885                        self._ssl_accepting = False
1886
1887                def handle_read(self):
1888                    if self._ssl_accepting:
1889                        self._do_ssl_handshake()
1890                    else:
1891                        data = self.recv(1024)
1892                        if support.verbose:
1893                            sys.stdout.write(" server:  read %s from client\n" % repr(data))
1894                        if not data:
1895                            self.close()
1896                        else:
1897                            self.send(data.lower())
1898
1899                def handle_close(self):
1900                    self.close()
1901                    if support.verbose:
1902                        sys.stdout.write(" server:  closed connection %s\n" % self.socket)
1903
1904                def handle_error(self):
1905                    raise
1906
1907            def __init__(self, certfile):
1908                self.certfile = certfile
1909                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1910                self.port = support.bind_port(sock, '')
1911                asyncore.dispatcher.__init__(self, sock)
1912                self.listen(5)
1913
1914            def handle_accept(self):
1915                sock_obj, addr = self.accept()
1916                if support.verbose:
1917                    sys.stdout.write(" server:  new connection from %s:%s\n" %addr)
1918                self.ConnectionHandler(sock_obj, self.certfile)
1919
1920            def handle_error(self):
1921                raise
1922
1923        def __init__(self, certfile):
1924            self.flag = None
1925            self.active = False
1926            self.server = self.EchoServer(certfile)
1927            self.port = self.server.port
1928            threading.Thread.__init__(self)
1929            self.daemon = True
1930
1931        def __str__(self):
1932            return "<%s %s>" % (self.__class__.__name__, self.server)
1933
1934        def __enter__(self):
1935            self.start(threading.Event())
1936            self.flag.wait()
1937            return self
1938
1939        def __exit__(self, *args):
1940            if support.verbose:
1941                sys.stdout.write(" cleanup: stopping server.\n")
1942            self.stop()
1943            if support.verbose:
1944                sys.stdout.write(" cleanup: joining server thread.\n")
1945            self.join()
1946            if support.verbose:
1947                sys.stdout.write(" cleanup: successfully joined.\n")
1948            # make sure that ConnectionHandler is removed from socket_map
1949            asyncore.close_all(ignore_all=True)
1950
1951        def start(self, flag=None):
1952            self.flag = flag
1953            threading.Thread.start(self)
1954
1955        def run(self):
1956            self.active = True
1957            if self.flag:
1958                self.flag.set()
1959            while self.active:
1960                try:
1961                    asyncore.loop(1)
1962                except:
1963                    pass
1964
1965        def stop(self):
1966            self.active = False
1967            self.server.close()
1968
1969    def server_params_test(client_context, server_context, indata=b"FOO\n",
1970                           chatty=True, connectionchatty=False, sni_name=None):
1971        """
1972        Launch a server, connect a client to it and try various reads
1973        and writes.
1974        """
1975        stats = {}
1976        server = ThreadedEchoServer(context=server_context,
1977                                    chatty=chatty,
1978                                    connectionchatty=False)
1979        with server:
1980            with closing(client_context.wrap_socket(socket.socket(),
1981                    server_hostname=sni_name)) as s:
1982                s.connect((HOST, server.port))
1983                for arg in [indata, bytearray(indata), memoryview(indata)]:
1984                    if connectionchatty:
1985                        if support.verbose:
1986                            sys.stdout.write(
1987                                " client:  sending %r...\n" % indata)
1988                    s.write(arg)
1989                    outdata = s.read()
1990                    if connectionchatty:
1991                        if support.verbose:
1992                            sys.stdout.write(" client:  read %r\n" % outdata)
1993                    if outdata != indata.lower():
1994                        raise AssertionError(
1995                            "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
1996                            % (outdata[:20], len(outdata),
1997                               indata[:20].lower(), len(indata)))
1998                s.write(b"over\n")
1999                if connectionchatty:
2000                    if support.verbose:
2001                        sys.stdout.write(" client:  closing connection.\n")
2002                stats.update({
2003                    'compression': s.compression(),
2004                    'cipher': s.cipher(),
2005                    'peercert': s.getpeercert(),
2006                    'client_alpn_protocol': s.selected_alpn_protocol(),
2007                    'client_npn_protocol': s.selected_npn_protocol(),
2008                    'version': s.version(),
2009                })
2010                s.close()
2011            stats['server_alpn_protocols'] = server.selected_alpn_protocols
2012            stats['server_npn_protocols'] = server.selected_npn_protocols
2013        return stats
2014
2015    def try_protocol_combo(server_protocol, client_protocol, expect_success,
2016                           certsreqs=None, server_options=0, client_options=0):
2017        """
2018        Try to SSL-connect using *client_protocol* to *server_protocol*.
2019        If *expect_success* is true, assert that the connection succeeds,
2020        if it's false, assert that the connection fails.
2021        Also, if *expect_success* is a string, assert that it is the protocol
2022        version actually used by the connection.
2023        """
2024        if certsreqs is None:
2025            certsreqs = ssl.CERT_NONE
2026        certtype = {
2027            ssl.CERT_NONE: "CERT_NONE",
2028            ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2029            ssl.CERT_REQUIRED: "CERT_REQUIRED",
2030        }[certsreqs]
2031        if support.verbose:
2032            formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2033            sys.stdout.write(formatstr %
2034                             (ssl.get_protocol_name(client_protocol),
2035                              ssl.get_protocol_name(server_protocol),
2036                              certtype))
2037        client_context = ssl.SSLContext(client_protocol)
2038        client_context.options |= client_options
2039        server_context = ssl.SSLContext(server_protocol)
2040        server_context.options |= server_options
2041
2042        # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2043        # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2044        # starting from OpenSSL 1.0.0 (see issue #8322).
2045        if client_context.protocol == ssl.PROTOCOL_SSLv23:
2046            client_context.set_ciphers("ALL")
2047
2048        for ctx in (client_context, server_context):
2049            ctx.verify_mode = certsreqs
2050            ctx.load_cert_chain(CERTFILE)
2051            ctx.load_verify_locations(CERTFILE)
2052        try:
2053            stats = server_params_test(client_context, server_context,
2054                                       chatty=False, connectionchatty=False)
2055        # Protocol mismatch can result in either an SSLError, or a
2056        # "Connection reset by peer" error.
2057        except ssl.SSLError:
2058            if expect_success:
2059                raise
2060        except socket.error as e:
2061            if expect_success or e.errno != errno.ECONNRESET:
2062                raise
2063        else:
2064            if not expect_success:
2065                raise AssertionError(
2066                    "Client protocol %s succeeded with server protocol %s!"
2067                    % (ssl.get_protocol_name(client_protocol),
2068                       ssl.get_protocol_name(server_protocol)))
2069            elif (expect_success is not True
2070                  and expect_success != stats['version']):
2071                raise AssertionError("version mismatch: expected %r, got %r"
2072                                     % (expect_success, stats['version']))
2073
2074
2075    class ThreadedTests(unittest.TestCase):
2076
2077        @skip_if_broken_ubuntu_ssl
2078        def test_echo(self):
2079            """Basic test of an SSL client connecting to a server"""
2080            if support.verbose:
2081                sys.stdout.write("\n")
2082            for protocol in PROTOCOLS:
2083                context = ssl.SSLContext(protocol)
2084                context.load_cert_chain(CERTFILE)
2085                server_params_test(context, context,
2086                                   chatty=True, connectionchatty=True)
2087
2088        def test_getpeercert(self):
2089            if support.verbose:
2090                sys.stdout.write("\n")
2091            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2092            context.verify_mode = ssl.CERT_REQUIRED
2093            context.load_verify_locations(CERTFILE)
2094            context.load_cert_chain(CERTFILE)
2095            server = ThreadedEchoServer(context=context, chatty=False)
2096            with server:
2097                s = context.wrap_socket(socket.socket(),
2098                                        do_handshake_on_connect=False)
2099                s.connect((HOST, server.port))
2100                # getpeercert() raise ValueError while the handshake isn't
2101                # done.
2102                with self.assertRaises(ValueError):
2103                    s.getpeercert()
2104                s.do_handshake()
2105                cert = s.getpeercert()
2106                self.assertTrue(cert, "Can't get peer certificate.")
2107                cipher = s.cipher()
2108                if support.verbose:
2109                    sys.stdout.write(pprint.pformat(cert) + '\n')
2110                    sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2111                if 'subject' not in cert:
2112                    self.fail("No subject field in certificate: %s." %
2113                              pprint.pformat(cert))
2114                if ((('organizationName', 'Python Software Foundation'),)
2115                    not in cert['subject']):
2116                    self.fail(
2117                        "Missing or invalid 'organizationName' field in certificate subject; "
2118                        "should be 'Python Software Foundation'.")
2119                self.assertIn('notBefore', cert)
2120                self.assertIn('notAfter', cert)
2121                before = ssl.cert_time_to_seconds(cert['notBefore'])
2122                after = ssl.cert_time_to_seconds(cert['notAfter'])
2123                self.assertLess(before, after)
2124                s.close()
2125
2126        @unittest.skipUnless(have_verify_flags(),
2127                            "verify_flags need OpenSSL > 0.9.8")
2128        def test_crl_check(self):
2129            if support.verbose:
2130                sys.stdout.write("\n")
2131
2132            server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2133            server_context.load_cert_chain(SIGNED_CERTFILE)
2134
2135            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2136            context.verify_mode = ssl.CERT_REQUIRED
2137            context.load_verify_locations(SIGNING_CA)
2138            tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
2139            self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT | tf)
2140
2141            # VERIFY_DEFAULT should pass
2142            server = ThreadedEchoServer(context=server_context, chatty=True)
2143            with server:
2144                with closing(context.wrap_socket(socket.socket())) as s:
2145                    s.connect((HOST, server.port))
2146                    cert = s.getpeercert()
2147                    self.assertTrue(cert, "Can't get peer certificate.")
2148
2149            # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
2150            context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
2151
2152            server = ThreadedEchoServer(context=server_context, chatty=True)
2153            with server:
2154                with closing(context.wrap_socket(socket.socket())) as s:
2155                    with self.assertRaisesRegexp(ssl.SSLError,
2156                                                "certificate verify failed"):
2157                        s.connect((HOST, server.port))
2158
2159            # now load a CRL file. The CRL file is signed by the CA.
2160            context.load_verify_locations(CRLFILE)
2161
2162            server = ThreadedEchoServer(context=server_context, chatty=True)
2163            with server:
2164                with closing(context.wrap_socket(socket.socket())) as s:
2165                    s.connect((HOST, server.port))
2166                    cert = s.getpeercert()
2167                    self.assertTrue(cert, "Can't get peer certificate.")
2168
2169        def test_check_hostname(self):
2170            if support.verbose:
2171                sys.stdout.write("\n")
2172
2173            server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2174            server_context.load_cert_chain(SIGNED_CERTFILE)
2175
2176            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2177            context.verify_mode = ssl.CERT_REQUIRED
2178            context.check_hostname = True
2179            context.load_verify_locations(SIGNING_CA)
2180
2181            # correct hostname should verify
2182            server = ThreadedEchoServer(context=server_context, chatty=True)
2183            with server:
2184                with closing(context.wrap_socket(socket.socket(),
2185                                                 server_hostname="localhost")) as s:
2186                    s.connect((HOST, server.port))
2187                    cert = s.getpeercert()
2188                    self.assertTrue(cert, "Can't get peer certificate.")
2189
2190            # incorrect hostname should raise an exception
2191            server = ThreadedEchoServer(context=server_context, chatty=True)
2192            with server:
2193                with closing(context.wrap_socket(socket.socket(),
2194                                                 server_hostname="invalid")) as s:
2195                    with self.assertRaisesRegexp(ssl.CertificateError,
2196                                                "hostname 'invalid' doesn't match u?'localhost'"):
2197                        s.connect((HOST, server.port))
2198
2199            # missing server_hostname arg should cause an exception, too
2200            server = ThreadedEchoServer(context=server_context, chatty=True)
2201            with server:
2202                with closing(socket.socket()) as s:
2203                    with self.assertRaisesRegexp(ValueError,
2204                                                "check_hostname requires server_hostname"):
2205                        context.wrap_socket(s)
2206
2207        def test_wrong_cert(self):
2208            """Connecting when the server rejects the client's certificate
2209
2210            Launch a server with CERT_REQUIRED, and check that trying to
2211            connect to it with a wrong client certificate fails.
2212            """
2213            certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
2214                                       "keycert.pem")
2215            server = ThreadedEchoServer(SIGNED_CERTFILE,
2216                                        certreqs=ssl.CERT_REQUIRED,
2217                                        cacerts=SIGNING_CA, chatty=False,
2218                                        connectionchatty=False)
2219            with server, \
2220                    closing(socket.socket()) as sock, \
2221                    closing(ssl.wrap_socket(sock,
2222                                        certfile=certfile,
2223                                        ssl_version=ssl.PROTOCOL_TLSv1)) as s:
2224                try:
2225                    # Expect either an SSL error about the server rejecting
2226                    # the connection, or a low-level connection reset (which
2227                    # sometimes happens on Windows)
2228                    s.connect((HOST, server.port))
2229                except ssl.SSLError as e:
2230                    if support.verbose:
2231                        sys.stdout.write("\nSSLError is %r\n" % e)
2232                except socket.error as e:
2233                    if e.errno != errno.ECONNRESET:
2234                        raise
2235                    if support.verbose:
2236                        sys.stdout.write("\nsocket.error is %r\n" % e)
2237                else:
2238                    self.fail("Use of invalid cert should have failed!")
2239
2240        def test_rude_shutdown(self):
2241            """A brutal shutdown of an SSL server should raise an OSError
2242            in the client when attempting handshake.
2243            """
2244            listener_ready = threading.Event()
2245            listener_gone = threading.Event()
2246
2247            s = socket.socket()
2248            port = support.bind_port(s, HOST)
2249
2250            # `listener` runs in a thread.  It sits in an accept() until
2251            # the main thread connects.  Then it rudely closes the socket,
2252            # and sets Event `listener_gone` to let the main thread know
2253            # the socket is gone.
2254            def listener():
2255                s.listen(5)
2256                listener_ready.set()
2257                newsock, addr = s.accept()
2258                newsock.close()
2259                s.close()
2260                listener_gone.set()
2261
2262            def connector():
2263                listener_ready.wait()
2264                with closing(socket.socket()) as c:
2265                    c.connect((HOST, port))
2266                    listener_gone.wait()
2267                    try:
2268                        ssl_sock = ssl.wrap_socket(c)
2269                    except socket.error:
2270                        pass
2271                    else:
2272                        self.fail('connecting to closed SSL socket should have failed')
2273
2274            t = threading.Thread(target=listener)
2275            t.start()
2276            try:
2277                connector()
2278            finally:
2279                t.join()
2280
2281        @skip_if_broken_ubuntu_ssl
2282        @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2283                             "OpenSSL is compiled without SSLv2 support")
2284        def test_protocol_sslv2(self):
2285            """Connecting to an SSLv2 server with various client options"""
2286            if support.verbose:
2287                sys.stdout.write("\n")
2288            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2289            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2290            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
2291            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
2292            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2293            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
2294            # SSLv23 client with specific SSL options
2295            if no_sslv2_implies_sslv3_hello():
2296                # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2297                try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2298                                   client_options=ssl.OP_NO_SSLv2)
2299            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2300                               client_options=ssl.OP_NO_SSLv3)
2301            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2302                               client_options=ssl.OP_NO_TLSv1)
2303
2304        @skip_if_broken_ubuntu_ssl
2305        def test_protocol_sslv23(self):
2306            """Connecting to an SSLv23 server with various client options"""
2307            if support.verbose:
2308                sys.stdout.write("\n")
2309            if hasattr(ssl, 'PROTOCOL_SSLv2'):
2310                try:
2311                    try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
2312                except socket.error as x:
2313                    # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2314                    if support.verbose:
2315                        sys.stdout.write(
2316                            " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2317                            % str(x))
2318            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2319                try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False)
2320            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
2321            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1')
2322
2323            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2324                try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
2325            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
2326            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2327
2328            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2329                try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
2330            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
2331            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
2332
2333            # Server with specific SSL options
2334            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2335                try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False,
2336                               server_options=ssl.OP_NO_SSLv3)
2337            # Will choose TLSv1
2338            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True,
2339                               server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
2340            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False,
2341                               server_options=ssl.OP_NO_TLSv1)
2342
2343
2344        @skip_if_broken_ubuntu_ssl
2345        @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2346                             "OpenSSL is compiled without SSLv3 support")
2347        def test_protocol_sslv3(self):
2348            """Connecting to an SSLv3 server with various client options"""
2349            if support.verbose:
2350                sys.stdout.write("\n")
2351            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2352            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2353            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
2354            if hasattr(ssl, 'PROTOCOL_SSLv2'):
2355                try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
2356            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
2357                               client_options=ssl.OP_NO_SSLv3)
2358            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
2359            if no_sslv2_implies_sslv3_hello():
2360                # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2361                try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23,
2362                                   False, client_options=ssl.OP_NO_SSLv2)
2363
2364        @skip_if_broken_ubuntu_ssl
2365        def test_protocol_tlsv1(self):
2366            """Connecting to a TLSv1 server with various client options"""
2367            if support.verbose:
2368                sys.stdout.write("\n")
2369            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2370            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2371            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
2372            if hasattr(ssl, 'PROTOCOL_SSLv2'):
2373                try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
2374            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2375                try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
2376            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
2377                               client_options=ssl.OP_NO_TLSv1)
2378
2379        @skip_if_broken_ubuntu_ssl
2380        @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2381                             "TLS version 1.1 not supported.")
2382        def test_protocol_tlsv1_1(self):
2383            """Connecting to a TLSv1.1 server with various client options.
2384               Testing against older TLS versions."""
2385            if support.verbose:
2386                sys.stdout.write("\n")
2387            try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
2388            if hasattr(ssl, 'PROTOCOL_SSLv2'):
2389                try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
2390            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2391                try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
2392            try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
2393                               client_options=ssl.OP_NO_TLSv1_1)
2394
2395            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
2396            try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2397            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2398
2399
2400        @skip_if_broken_ubuntu_ssl
2401        @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2402                             "TLS version 1.2 not supported.")
2403        def test_protocol_tlsv1_2(self):
2404            """Connecting to a TLSv1.2 server with various client options.
2405               Testing against older TLS versions."""
2406            if support.verbose:
2407                sys.stdout.write("\n")
2408            try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
2409                               server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2410                               client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2411            if hasattr(ssl, 'PROTOCOL_SSLv2'):
2412                try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
2413            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2414                try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
2415            try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
2416                               client_options=ssl.OP_NO_TLSv1_2)
2417
2418            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
2419            try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2420            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2421            try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2422            try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2423
2424        def test_starttls(self):
2425            """Switching from clear text to encrypted and back again."""
2426            msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
2427
2428            server = ThreadedEchoServer(CERTFILE,
2429                                        ssl_version=ssl.PROTOCOL_TLSv1,
2430                                        starttls_server=True,
2431                                        chatty=True,
2432                                        connectionchatty=True)
2433            wrapped = False
2434            with server:
2435                s = socket.socket()
2436                s.setblocking(1)
2437                s.connect((HOST, server.port))
2438                if support.verbose:
2439                    sys.stdout.write("\n")
2440                for indata in msgs:
2441                    if support.verbose:
2442                        sys.stdout.write(
2443                            " client:  sending %r...\n" % indata)
2444                    if wrapped:
2445                        conn.write(indata)
2446                        outdata = conn.read()
2447                    else:
2448                        s.send(indata)
2449                        outdata = s.recv(1024)
2450                    msg = outdata.strip().lower()
2451                    if indata == b"STARTTLS" and msg.startswith(b"ok"):
2452                        # STARTTLS ok, switch to secure mode
2453                        if support.verbose:
2454                            sys.stdout.write(
2455                                " client:  read %r from server, starting TLS...\n"
2456                                % msg)
2457                        conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
2458                        wrapped = True
2459                    elif indata == b"ENDTLS" and msg.startswith(b"ok"):
2460                        # ENDTLS ok, switch back to clear text
2461                        if support.verbose:
2462                            sys.stdout.write(
2463                                " client:  read %r from server, ending TLS...\n"
2464                                % msg)
2465                        s = conn.unwrap()
2466                        wrapped = False
2467                    else:
2468                        if support.verbose:
2469                            sys.stdout.write(
2470                                " client:  read %r from server\n" % msg)
2471                if support.verbose:
2472                    sys.stdout.write(" client:  closing connection.\n")
2473                if wrapped:
2474                    conn.write(b"over\n")
2475                else:
2476                    s.send(b"over\n")
2477                if wrapped:
2478                    conn.close()
2479                else:
2480                    s.close()
2481
2482        def test_socketserver(self):
2483            """Using a SocketServer to create and manage SSL connections."""
2484            server = make_https_server(self, certfile=CERTFILE)
2485            # try to connect
2486            if support.verbose:
2487                sys.stdout.write('\n')
2488            with open(CERTFILE, 'rb') as f:
2489                d1 = f.read()
2490            d2 = ''
2491            # now fetch the same data from the HTTPS server
2492            url = 'https://localhost:%d/%s' % (
2493                server.port, os.path.split(CERTFILE)[1])
2494            context = ssl.create_default_context(cafile=CERTFILE)
2495            f = urllib2.urlopen(url, context=context)
2496            try:
2497                dlen = f.info().getheader("content-length")
2498                if dlen and (int(dlen) > 0):
2499                    d2 = f.read(int(dlen))
2500                    if support.verbose:
2501                        sys.stdout.write(
2502                            " client: read %d bytes from remote server '%s'\n"
2503                            % (len(d2), server))
2504            finally:
2505                f.close()
2506            self.assertEqual(d1, d2)
2507
2508        def test_asyncore_server(self):
2509            """Check the example asyncore integration."""
2510            if support.verbose:
2511                sys.stdout.write("\n")
2512
2513            indata = b"FOO\n"
2514            server = AsyncoreEchoServer(CERTFILE)
2515            with server:
2516                s = ssl.wrap_socket(socket.socket())
2517                s.connect(('127.0.0.1', server.port))
2518                if support.verbose:
2519                    sys.stdout.write(
2520                        " client:  sending %r...\n" % indata)
2521                s.write(indata)
2522                outdata = s.read()
2523                if support.verbose:
2524                    sys.stdout.write(" client:  read %r\n" % outdata)
2525                if outdata != indata.lower():
2526                    self.fail(
2527                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2528                        % (outdata[:20], len(outdata),
2529                           indata[:20].lower(), len(indata)))
2530                s.write(b"over\n")
2531                if support.verbose:
2532                    sys.stdout.write(" client:  closing connection.\n")
2533                s.close()
2534                if support.verbose:
2535                    sys.stdout.write(" client:  connection closed.\n")
2536
2537        def test_recv_send(self):
2538            """Test recv(), send() and friends."""
2539            if support.verbose:
2540                sys.stdout.write("\n")
2541
2542            server = ThreadedEchoServer(CERTFILE,
2543                                        certreqs=ssl.CERT_NONE,
2544                                        ssl_version=ssl.PROTOCOL_TLSv1,
2545                                        cacerts=CERTFILE,
2546                                        chatty=True,
2547                                        connectionchatty=False)
2548            with server:
2549                s = ssl.wrap_socket(socket.socket(),
2550                                    server_side=False,
2551                                    certfile=CERTFILE,
2552                                    ca_certs=CERTFILE,
2553                                    cert_reqs=ssl.CERT_NONE,
2554                                    ssl_version=ssl.PROTOCOL_TLSv1)
2555                s.connect((HOST, server.port))
2556                # helper methods for standardising recv* method signatures
2557                def _recv_into():
2558                    b = bytearray(b"\0"*100)
2559                    count = s.recv_into(b)
2560                    return b[:count]
2561
2562                def _recvfrom_into():
2563                    b = bytearray(b"\0"*100)
2564                    count, addr = s.recvfrom_into(b)
2565                    return b[:count]
2566
2567                # (name, method, whether to expect success, *args)
2568                send_methods = [
2569                    ('send', s.send, True, []),
2570                    ('sendto', s.sendto, False, ["some.address"]),
2571                    ('sendall', s.sendall, True, []),
2572                ]
2573                recv_methods = [
2574                    ('recv', s.recv, True, []),
2575                    ('recvfrom', s.recvfrom, False, ["some.address"]),
2576                    ('recv_into', _recv_into, True, []),
2577                    ('recvfrom_into', _recvfrom_into, False, []),
2578                ]
2579                data_prefix = u"PREFIX_"
2580
2581                for meth_name, send_meth, expect_success, args in send_methods:
2582                    indata = (data_prefix + meth_name).encode('ascii')
2583                    try:
2584                        send_meth(indata, *args)
2585                        outdata = s.read()
2586                        if outdata != indata.lower():
2587                            self.fail(
2588                                "While sending with <<{name:s}>> bad data "
2589                                "<<{outdata:r}>> ({nout:d}) received; "
2590                                "expected <<{indata:r}>> ({nin:d})\n".format(
2591                                    name=meth_name, outdata=outdata[:20],
2592                                    nout=len(outdata),
2593                                    indata=indata[:20], nin=len(indata)
2594                                )
2595                            )
2596                    except ValueError as e:
2597                        if expect_success:
2598                            self.fail(
2599                                "Failed to send with method <<{name:s}>>; "
2600                                "expected to succeed.\n".format(name=meth_name)
2601                            )
2602                        if not str(e).startswith(meth_name):
2603                            self.fail(
2604                                "Method <<{name:s}>> failed with unexpected "
2605                                "exception message: {exp:s}\n".format(
2606                                    name=meth_name, exp=e
2607                                )
2608                            )
2609
2610                for meth_name, recv_meth, expect_success, args in recv_methods:
2611                    indata = (data_prefix + meth_name).encode('ascii')
2612                    try:
2613                        s.send(indata)
2614                        outdata = recv_meth(*args)
2615                        if outdata != indata.lower():
2616                            self.fail(
2617                                "While receiving with <<{name:s}>> bad data "
2618                                "<<{outdata:r}>> ({nout:d}) received; "
2619                                "expected <<{indata:r}>> ({nin:d})\n".format(
2620                                    name=meth_name, outdata=outdata[:20],
2621                                    nout=len(outdata),
2622                                    indata=indata[:20], nin=len(indata)
2623                                )
2624                            )
2625                    except ValueError as e:
2626                        if expect_success:
2627                            self.fail(
2628                                "Failed to receive with method <<{name:s}>>; "
2629                                "expected to succeed.\n".format(name=meth_name)
2630                            )
2631                        if not str(e).startswith(meth_name):
2632                            self.fail(
2633                                "Method <<{name:s}>> failed with unexpected "
2634                                "exception message: {exp:s}\n".format(
2635                                    name=meth_name, exp=e
2636                                )
2637                            )
2638                        # consume data
2639                        s.read()
2640
2641                # read(-1, buffer) is supported, even though read(-1) is not
2642                data = b"data"
2643                s.send(data)
2644                buffer = bytearray(len(data))
2645                self.assertEqual(s.read(-1, buffer), len(data))
2646                self.assertEqual(buffer, data)
2647
2648                s.write(b"over\n")
2649
2650                self.assertRaises(ValueError, s.recv, -1)
2651                self.assertRaises(ValueError, s.read, -1)
2652
2653                s.close()
2654
2655        def test_recv_zero(self):
2656            server = ThreadedEchoServer(CERTFILE)
2657            server.__enter__()
2658            self.addCleanup(server.__exit__, None, None)
2659            s = socket.create_connection((HOST, server.port))
2660            self.addCleanup(s.close)
2661            s = ssl.wrap_socket(s, suppress_ragged_eofs=False)
2662            self.addCleanup(s.close)
2663
2664            # recv/read(0) should return no data
2665            s.send(b"data")
2666            self.assertEqual(s.recv(0), b"")
2667            self.assertEqual(s.read(0), b"")
2668            self.assertEqual(s.read(), b"data")
2669
2670            # Should not block if the other end sends no data
2671            s.setblocking(False)
2672            self.assertEqual(s.recv(0), b"")
2673            self.assertEqual(s.recv_into(bytearray()), 0)
2674
2675        def test_handshake_timeout(self):
2676            # Issue #5103: SSL handshake must respect the socket timeout
2677            server = socket.socket(socket.AF_INET)
2678            host = "127.0.0.1"
2679            port = support.bind_port(server)
2680            started = threading.Event()
2681            finish = False
2682
2683            def serve():
2684                server.listen(5)
2685                started.set()
2686                conns = []
2687                while not finish:
2688                    r, w, e = select.select([server], [], [], 0.1)
2689                    if server in r:
2690                        # Let the socket hang around rather than having
2691                        # it closed by garbage collection.
2692                        conns.append(server.accept()[0])
2693                for sock in conns:
2694                    sock.close()
2695
2696            t = threading.Thread(target=serve)
2697            t.start()
2698            started.wait()
2699
2700            try:
2701                try:
2702                    c = socket.socket(socket.AF_INET)
2703                    c.settimeout(0.2)
2704                    c.connect((host, port))
2705                    # Will attempt handshake and time out
2706                    self.assertRaisesRegexp(ssl.SSLError, "timed out",
2707                                            ssl.wrap_socket, c)
2708                finally:
2709                    c.close()
2710                try:
2711                    c = socket.socket(socket.AF_INET)
2712                    c = ssl.wrap_socket(c)
2713                    c.settimeout(0.2)
2714                    # Will attempt handshake and time out
2715                    self.assertRaisesRegexp(ssl.SSLError, "timed out",
2716                                            c.connect, (host, port))
2717                finally:
2718                    c.close()
2719            finally:
2720                finish = True
2721                t.join()
2722                server.close()
2723
2724        def test_server_accept(self):
2725            # Issue #16357: accept() on a SSLSocket created through
2726            # SSLContext.wrap_socket().
2727            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2728            context.verify_mode = ssl.CERT_REQUIRED
2729            context.load_verify_locations(CERTFILE)
2730            context.load_cert_chain(CERTFILE)
2731            server = socket.socket(socket.AF_INET)
2732            host = "127.0.0.1"
2733            port = support.bind_port(server)
2734            server = context.wrap_socket(server, server_side=True)
2735
2736            evt = threading.Event()
2737            remote = [None]
2738            peer = [None]
2739            def serve():
2740                server.listen(5)
2741                # Block on the accept and wait on the connection to close.
2742                evt.set()
2743                remote[0], peer[0] = server.accept()
2744                remote[0].recv(1)
2745
2746            t = threading.Thread(target=serve)
2747            t.start()
2748            # Client wait until server setup and perform a connect.
2749            evt.wait()
2750            client = context.wrap_socket(socket.socket())
2751            client.connect((host, port))
2752            client_addr = client.getsockname()
2753            client.close()
2754            t.join()
2755            remote[0].close()
2756            server.close()
2757            # Sanity checks.
2758            self.assertIsInstance(remote[0], ssl.SSLSocket)
2759            self.assertEqual(peer[0], client_addr)
2760
2761        def test_getpeercert_enotconn(self):
2762            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2763            with closing(context.wrap_socket(socket.socket())) as sock:
2764                with self.assertRaises(socket.error) as cm:
2765                    sock.getpeercert()
2766                self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2767
2768        def test_do_handshake_enotconn(self):
2769            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2770            with closing(context.wrap_socket(socket.socket())) as sock:
2771                with self.assertRaises(socket.error) as cm:
2772                    sock.do_handshake()
2773                self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2774
2775        def test_default_ciphers(self):
2776            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2777            try:
2778                # Force a set of weak ciphers on our client context
2779                context.set_ciphers("DES")
2780            except ssl.SSLError:
2781                self.skipTest("no DES cipher available")
2782            with ThreadedEchoServer(CERTFILE,
2783                                    ssl_version=ssl.PROTOCOL_SSLv23,
2784                                    chatty=False) as server:
2785                with closing(context.wrap_socket(socket.socket())) as s:
2786                    with self.assertRaises(ssl.SSLError):
2787                        s.connect((HOST, server.port))
2788            self.assertIn("no shared cipher", str(server.conn_errors[0]))
2789
2790        def test_version_basic(self):
2791            """
2792            Basic tests for SSLSocket.version().
2793            More tests are done in the test_protocol_*() methods.
2794            """
2795            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2796            with ThreadedEchoServer(CERTFILE,
2797                                    ssl_version=ssl.PROTOCOL_TLSv1,
2798                                    chatty=False) as server:
2799                with closing(context.wrap_socket(socket.socket())) as s:
2800                    self.assertIs(s.version(), None)
2801                    s.connect((HOST, server.port))
2802                    self.assertEqual(s.version(), 'TLSv1')
2803                self.assertIs(s.version(), None)
2804
2805        @unittest.skipUnless(ssl.HAS_TLSv1_3,
2806                             "test requires TLSv1.3 enabled OpenSSL")
2807        def test_tls1_3(self):
2808            context = ssl.SSLContext(ssl.PROTOCOL_TLS)
2809            context.load_cert_chain(CERTFILE)
2810            # disable all but TLS 1.3
2811            context.options |= (
2812                ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
2813            )
2814            with ThreadedEchoServer(context=context) as server:
2815                with context.wrap_socket(socket.socket()) as s:
2816                    s.connect((HOST, server.port))
2817                    self.assertIn(s.cipher()[0], [
2818                        'TLS13-AES-256-GCM-SHA384',
2819                        'TLS13-CHACHA20-POLY1305-SHA256',
2820                        'TLS13-AES-128-GCM-SHA256',
2821                    ])
2822
2823        @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
2824        def test_default_ecdh_curve(self):
2825            # Issue #21015: elliptic curve-based Diffie Hellman key exchange
2826            # should be enabled by default on SSL contexts.
2827            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2828            context.load_cert_chain(CERTFILE)
2829            # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
2830            # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
2831            # our default cipher list should prefer ECDH-based ciphers
2832            # automatically.
2833            if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
2834                context.set_ciphers("ECCdraft:ECDH")
2835            with ThreadedEchoServer(context=context) as server:
2836                with closing(context.wrap_socket(socket.socket())) as s:
2837                    s.connect((HOST, server.port))
2838                    self.assertIn("ECDH", s.cipher()[0])
2839
2840        @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
2841                             "'tls-unique' channel binding not available")
2842        def test_tls_unique_channel_binding(self):
2843            """Test tls-unique channel binding."""
2844            if support.verbose:
2845                sys.stdout.write("\n")
2846
2847            server = ThreadedEchoServer(CERTFILE,
2848                                        certreqs=ssl.CERT_NONE,
2849                                        ssl_version=ssl.PROTOCOL_TLSv1,
2850                                        cacerts=CERTFILE,
2851                                        chatty=True,
2852                                        connectionchatty=False)
2853            with server:
2854                s = ssl.wrap_socket(socket.socket(),
2855                                    server_side=False,
2856                                    certfile=CERTFILE,
2857                                    ca_certs=CERTFILE,
2858                                    cert_reqs=ssl.CERT_NONE,
2859                                    ssl_version=ssl.PROTOCOL_TLSv1)
2860                s.connect((HOST, server.port))
2861                # get the data
2862                cb_data = s.get_channel_binding("tls-unique")
2863                if support.verbose:
2864                    sys.stdout.write(" got channel binding data: {0!r}\n"
2865                                     .format(cb_data))
2866
2867                # check if it is sane
2868                self.assertIsNotNone(cb_data)
2869                self.assertEqual(len(cb_data), 12) # True for TLSv1
2870
2871                # and compare with the peers version
2872                s.write(b"CB tls-unique\n")
2873                peer_data_repr = s.read().strip()
2874                self.assertEqual(peer_data_repr,
2875                                 repr(cb_data).encode("us-ascii"))
2876                s.close()
2877
2878                # now, again
2879                s = ssl.wrap_socket(socket.socket(),
2880                                    server_side=False,
2881                                    certfile=CERTFILE,
2882                                    ca_certs=CERTFILE,
2883                                    cert_reqs=ssl.CERT_NONE,
2884                                    ssl_version=ssl.PROTOCOL_TLSv1)
2885                s.connect((HOST, server.port))
2886                new_cb_data = s.get_channel_binding("tls-unique")
2887                if support.verbose:
2888                    sys.stdout.write(" got another channel binding data: {0!r}\n"
2889                                     .format(new_cb_data))
2890                # is it really unique
2891                self.assertNotEqual(cb_data, new_cb_data)
2892                self.assertIsNotNone(cb_data)
2893                self.assertEqual(len(cb_data), 12) # True for TLSv1
2894                s.write(b"CB tls-unique\n")
2895                peer_data_repr = s.read().strip()
2896                self.assertEqual(peer_data_repr,
2897                                 repr(new_cb_data).encode("us-ascii"))
2898                s.close()
2899
2900        def test_compression(self):
2901            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2902            context.load_cert_chain(CERTFILE)
2903            stats = server_params_test(context, context,
2904                                       chatty=True, connectionchatty=True)
2905            if support.verbose:
2906                sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
2907            self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
2908
2909        @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
2910                             "ssl.OP_NO_COMPRESSION needed for this test")
2911        def test_compression_disabled(self):
2912            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2913            context.load_cert_chain(CERTFILE)
2914            context.options |= ssl.OP_NO_COMPRESSION
2915            stats = server_params_test(context, context,
2916                                       chatty=True, connectionchatty=True)
2917            self.assertIs(stats['compression'], None)
2918
2919        def test_dh_params(self):
2920            # Check we can get a connection with ephemeral Diffie-Hellman
2921            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2922            context.load_cert_chain(CERTFILE)
2923            context.load_dh_params(DHFILE)
2924            context.set_ciphers("kEDH")
2925            stats = server_params_test(context, context,
2926                                       chatty=True, connectionchatty=True)
2927            cipher = stats["cipher"][0]
2928            parts = cipher.split("-")
2929            if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
2930                self.fail("Non-DH cipher: " + cipher[0])
2931
2932        def test_selected_alpn_protocol(self):
2933            # selected_alpn_protocol() is None unless ALPN is used.
2934            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2935            context.load_cert_chain(CERTFILE)
2936            stats = server_params_test(context, context,
2937                                       chatty=True, connectionchatty=True)
2938            self.assertIs(stats['client_alpn_protocol'], None)
2939
2940        @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
2941        def test_selected_alpn_protocol_if_server_uses_alpn(self):
2942            # selected_alpn_protocol() is None unless ALPN is used by the client.
2943            client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2944            client_context.load_verify_locations(CERTFILE)
2945            server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2946            server_context.load_cert_chain(CERTFILE)
2947            server_context.set_alpn_protocols(['foo', 'bar'])
2948            stats = server_params_test(client_context, server_context,
2949                                       chatty=True, connectionchatty=True)
2950            self.assertIs(stats['client_alpn_protocol'], None)
2951
2952        @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
2953        def test_alpn_protocols(self):
2954            server_protocols = ['foo', 'bar', 'milkshake']
2955            protocol_tests = [
2956                (['foo', 'bar'], 'foo'),
2957                (['bar', 'foo'], 'foo'),
2958                (['milkshake'], 'milkshake'),
2959                (['http/3.0', 'http/4.0'], None)
2960            ]
2961            for client_protocols, expected in protocol_tests:
2962                server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
2963                server_context.load_cert_chain(CERTFILE)
2964                server_context.set_alpn_protocols(server_protocols)
2965                client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
2966                client_context.load_cert_chain(CERTFILE)
2967                client_context.set_alpn_protocols(client_protocols)
2968
2969                try:
2970                    stats = server_params_test(client_context,
2971                                               server_context,
2972                                               chatty=True,
2973                                               connectionchatty=True)
2974                except ssl.SSLError as e:
2975                    stats = e
2976
2977                if (expected is None and IS_OPENSSL_1_1
2978                        and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
2979                    # OpenSSL 1.1.0 to 1.1.0e raises handshake error
2980                    self.assertIsInstance(stats, ssl.SSLError)
2981                else:
2982                    msg = "failed trying %s (s) and %s (c).\n" \
2983                        "was expecting %s, but got %%s from the %%s" \
2984                            % (str(server_protocols), str(client_protocols),
2985                                str(expected))
2986                    client_result = stats['client_alpn_protocol']
2987                    self.assertEqual(client_result, expected,
2988                                     msg % (client_result, "client"))
2989                    server_result = stats['server_alpn_protocols'][-1] \
2990                        if len(stats['server_alpn_protocols']) else 'nothing'
2991                    self.assertEqual(server_result, expected,
2992                                     msg % (server_result, "server"))
2993
2994        def test_selected_npn_protocol(self):
2995            # selected_npn_protocol() is None unless NPN is used
2996            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2997            context.load_cert_chain(CERTFILE)
2998            stats = server_params_test(context, context,
2999                                       chatty=True, connectionchatty=True)
3000            self.assertIs(stats['client_npn_protocol'], None)
3001
3002        @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3003        def test_npn_protocols(self):
3004            server_protocols = ['http/1.1', 'spdy/2']
3005            protocol_tests = [
3006                (['http/1.1', 'spdy/2'], 'http/1.1'),
3007                (['spdy/2', 'http/1.1'], 'http/1.1'),
3008                (['spdy/2', 'test'], 'spdy/2'),
3009                (['abc', 'def'], 'abc')
3010            ]
3011            for client_protocols, expected in protocol_tests:
3012                server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3013                server_context.load_cert_chain(CERTFILE)
3014                server_context.set_npn_protocols(server_protocols)
3015                client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3016                client_context.load_cert_chain(CERTFILE)
3017                client_context.set_npn_protocols(client_protocols)
3018                stats = server_params_test(client_context, server_context,
3019                                           chatty=True, connectionchatty=True)
3020
3021                msg = "failed trying %s (s) and %s (c).\n" \
3022                      "was expecting %s, but got %%s from the %%s" \
3023                          % (str(server_protocols), str(client_protocols),
3024                             str(expected))
3025                client_result = stats['client_npn_protocol']
3026                self.assertEqual(client_result, expected, msg % (client_result, "client"))
3027                server_result = stats['server_npn_protocols'][-1] \
3028                    if len(stats['server_npn_protocols']) else 'nothing'
3029                self.assertEqual(server_result, expected, msg % (server_result, "server"))
3030
3031        def sni_contexts(self):
3032            server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3033            server_context.load_cert_chain(SIGNED_CERTFILE)
3034            other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3035            other_context.load_cert_chain(SIGNED_CERTFILE2)
3036            client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3037            client_context.verify_mode = ssl.CERT_REQUIRED
3038            client_context.load_verify_locations(SIGNING_CA)
3039            return server_context, other_context, client_context
3040
3041        def check_common_name(self, stats, name):
3042            cert = stats['peercert']
3043            self.assertIn((('commonName', name),), cert['subject'])
3044
3045        @needs_sni
3046        def test_sni_callback(self):
3047            calls = []
3048            server_context, other_context, client_context = self.sni_contexts()
3049
3050            def servername_cb(ssl_sock, server_name, initial_context):
3051                calls.append((server_name, initial_context))
3052                if server_name is not None:
3053                    ssl_sock.context = other_context
3054            server_context.set_servername_callback(servername_cb)
3055
3056            stats = server_params_test(client_context, server_context,
3057                                       chatty=True,
3058                                       sni_name='supermessage')
3059            # The hostname was fetched properly, and the certificate was
3060            # changed for the connection.
3061            self.assertEqual(calls, [("supermessage", server_context)])
3062            # CERTFILE4 was selected
3063            self.check_common_name(stats, 'fakehostname')
3064
3065            calls = []
3066            # The callback is called with server_name=None
3067            stats = server_params_test(client_context, server_context,
3068                                       chatty=True,
3069                                       sni_name=None)
3070            self.assertEqual(calls, [(None, server_context)])
3071            self.check_common_name(stats, 'localhost')
3072
3073            # Check disabling the callback
3074            calls = []
3075            server_context.set_servername_callback(None)
3076
3077            stats = server_params_test(client_context, server_context,
3078                                       chatty=True,
3079                                       sni_name='notfunny')
3080            # Certificate didn't change
3081            self.check_common_name(stats, 'localhost')
3082            self.assertEqual(calls, [])
3083
3084        @needs_sni
3085        def test_sni_callback_alert(self):
3086            # Returning a TLS alert is reflected to the connecting client
3087            server_context, other_context, client_context = self.sni_contexts()
3088
3089            def cb_returning_alert(ssl_sock, server_name, initial_context):
3090                return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3091            server_context.set_servername_callback(cb_returning_alert)
3092
3093            with self.assertRaises(ssl.SSLError) as cm:
3094                stats = server_params_test(client_context, server_context,
3095                                           chatty=False,
3096                                           sni_name='supermessage')
3097            self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
3098
3099        @needs_sni
3100        def test_sni_callback_raising(self):
3101            # Raising fails the connection with a TLS handshake failure alert.
3102            server_context, other_context, client_context = self.sni_contexts()
3103
3104            def cb_raising(ssl_sock, server_name, initial_context):
3105                1.0/0.0
3106            server_context.set_servername_callback(cb_raising)
3107
3108            with self.assertRaises(ssl.SSLError) as cm, \
3109                 support.captured_stderr() as stderr:
3110                stats = server_params_test(client_context, server_context,
3111                                           chatty=False,
3112                                           sni_name='supermessage')
3113            self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3114            self.assertIn("ZeroDivisionError", stderr.getvalue())
3115
3116        @needs_sni
3117        def test_sni_callback_wrong_return_type(self):
3118            # Returning the wrong return type terminates the TLS connection
3119            # with an internal error alert.
3120            server_context, other_context, client_context = self.sni_contexts()
3121
3122            def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3123                return "foo"
3124            server_context.set_servername_callback(cb_wrong_return_type)
3125
3126            with self.assertRaises(ssl.SSLError) as cm, \
3127                 support.captured_stderr() as stderr:
3128                stats = server_params_test(client_context, server_context,
3129                                           chatty=False,
3130                                           sni_name='supermessage')
3131            self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3132            self.assertIn("TypeError", stderr.getvalue())
3133
3134        def test_read_write_after_close_raises_valuerror(self):
3135            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3136            context.verify_mode = ssl.CERT_REQUIRED
3137            context.load_verify_locations(CERTFILE)
3138            context.load_cert_chain(CERTFILE)
3139            server = ThreadedEchoServer(context=context, chatty=False)
3140
3141            with server:
3142                s = context.wrap_socket(socket.socket())
3143                s.connect((HOST, server.port))
3144                s.close()
3145
3146                self.assertRaises(ValueError, s.read, 1024)
3147                self.assertRaises(ValueError, s.write, b'hello')
3148
3149
3150def test_main(verbose=False):
3151    if support.verbose:
3152        plats = {
3153            'Linux': platform.linux_distribution,
3154            'Mac': platform.mac_ver,
3155            'Windows': platform.win32_ver,
3156        }
3157        for name, func in plats.items():
3158            plat = func()
3159            if plat and plat[0]:
3160                plat = '%s %r' % (name, plat)
3161                break
3162        else:
3163            plat = repr(platform.platform())
3164        print("test_ssl: testing with %r %r" %
3165            (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
3166        print("          under %s" % plat)
3167        print("          HAS_SNI = %r" % ssl.HAS_SNI)
3168        print("          OP_ALL = 0x%8x" % ssl.OP_ALL)
3169        try:
3170            print("          OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
3171        except AttributeError:
3172            pass
3173
3174    for filename in [
3175        CERTFILE, REMOTE_ROOT_CERT, BYTES_CERTFILE,
3176        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
3177        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
3178        BADCERT, BADKEY, EMPTYCERT]:
3179        if not os.path.exists(filename):
3180            raise support.TestFailed("Can't read certificate file %r" % filename)
3181
3182    tests = [ContextTests, BasicTests, BasicSocketTests, SSLErrorTests]
3183
3184    if support.is_resource_enabled('network'):
3185        tests.append(NetworkedTests)
3186
3187    if _have_threads:
3188        thread_info = support.threading_setup()
3189        if thread_info:
3190            tests.append(ThreadedTests)
3191
3192    try:
3193        support.run_unittest(*tests)
3194    finally:
3195        if _have_threads:
3196            support.threading_cleanup(*thread_info)
3197
3198if __name__ == "__main__":
3199    test_main()
3200