1import enum
2import errno
3from http import client, HTTPStatus
4import io
5import itertools
6import os
7import array
8import re
9import socket
10import threading
11import warnings
12
13import unittest
14from unittest import mock
15TestCase = unittest.TestCase
16
17from test import support
18from test.support import os_helper
19from test.support import socket_helper
20from test.support import warnings_helper
21
22support.requires_working_socket(module=True)
23
24here = os.path.dirname(__file__)
25# Self-signed cert file for 'localhost'
26CERT_localhost = os.path.join(here, 'keycert.pem')
27# Self-signed cert file for 'fakehostname'
28CERT_fakehostname = os.path.join(here, 'keycert2.pem')
29# Self-signed cert file for self-signed.pythontest.net
30CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem')
31
32# constants for testing chunked encoding
33chunked_start = (
34    'HTTP/1.1 200 OK\r\n'
35    'Transfer-Encoding: chunked\r\n\r\n'
36    'a\r\n'
37    'hello worl\r\n'
38    '3\r\n'
39    'd! \r\n'
40    '8\r\n'
41    'and now \r\n'
42    '22\r\n'
43    'for something completely different\r\n'
44)
45chunked_expected = b'hello world! and now for something completely different'
46chunk_extension = ";foo=bar"
47last_chunk = "0\r\n"
48last_chunk_extended = "0" + chunk_extension + "\r\n"
49trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
50chunked_end = "\r\n"
51
52HOST = socket_helper.HOST
53
54class FakeSocket:
55    def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
56        if isinstance(text, str):
57            text = text.encode("ascii")
58        self.text = text
59        self.fileclass = fileclass
60        self.data = b''
61        self.sendall_calls = 0
62        self.file_closed = False
63        self.host = host
64        self.port = port
65
66    def sendall(self, data):
67        self.sendall_calls += 1
68        self.data += data
69
70    def makefile(self, mode, bufsize=None):
71        if mode != 'r' and mode != 'rb':
72            raise client.UnimplementedFileMode()
73        # keep the file around so we can check how much was read from it
74        self.file = self.fileclass(self.text)
75        self.file.close = self.file_close #nerf close ()
76        return self.file
77
78    def file_close(self):
79        self.file_closed = True
80
81    def close(self):
82        pass
83
84    def setsockopt(self, level, optname, value):
85        pass
86
87class EPipeSocket(FakeSocket):
88
89    def __init__(self, text, pipe_trigger):
90        # When sendall() is called with pipe_trigger, raise EPIPE.
91        FakeSocket.__init__(self, text)
92        self.pipe_trigger = pipe_trigger
93
94    def sendall(self, data):
95        if self.pipe_trigger in data:
96            raise OSError(errno.EPIPE, "gotcha")
97        self.data += data
98
99    def close(self):
100        pass
101
102class NoEOFBytesIO(io.BytesIO):
103    """Like BytesIO, but raises AssertionError on EOF.
104
105    This is used below to test that http.client doesn't try to read
106    more from the underlying file than it should.
107    """
108    def read(self, n=-1):
109        data = io.BytesIO.read(self, n)
110        if data == b'':
111            raise AssertionError('caller tried to read past EOF')
112        return data
113
114    def readline(self, length=None):
115        data = io.BytesIO.readline(self, length)
116        if data == b'':
117            raise AssertionError('caller tried to read past EOF')
118        return data
119
120class FakeSocketHTTPConnection(client.HTTPConnection):
121    """HTTPConnection subclass using FakeSocket; counts connect() calls"""
122
123    def __init__(self, *args):
124        self.connections = 0
125        super().__init__('example.com')
126        self.fake_socket_args = args
127        self._create_connection = self.create_connection
128
129    def connect(self):
130        """Count the number of times connect() is invoked"""
131        self.connections += 1
132        return super().connect()
133
134    def create_connection(self, *pos, **kw):
135        return FakeSocket(*self.fake_socket_args)
136
137class HeaderTests(TestCase):
138    def test_auto_headers(self):
139        # Some headers are added automatically, but should not be added by
140        # .request() if they are explicitly set.
141
142        class HeaderCountingBuffer(list):
143            def __init__(self):
144                self.count = {}
145            def append(self, item):
146                kv = item.split(b':')
147                if len(kv) > 1:
148                    # item is a 'Key: Value' header string
149                    lcKey = kv[0].decode('ascii').lower()
150                    self.count.setdefault(lcKey, 0)
151                    self.count[lcKey] += 1
152                list.append(self, item)
153
154        for explicit_header in True, False:
155            for header in 'Content-length', 'Host', 'Accept-encoding':
156                conn = client.HTTPConnection('example.com')
157                conn.sock = FakeSocket('blahblahblah')
158                conn._buffer = HeaderCountingBuffer()
159
160                body = 'spamspamspam'
161                headers = {}
162                if explicit_header:
163                    headers[header] = str(len(body))
164                conn.request('POST', '/', body, headers)
165                self.assertEqual(conn._buffer.count[header.lower()], 1)
166
167    def test_content_length_0(self):
168
169        class ContentLengthChecker(list):
170            def __init__(self):
171                list.__init__(self)
172                self.content_length = None
173            def append(self, item):
174                kv = item.split(b':', 1)
175                if len(kv) > 1 and kv[0].lower() == b'content-length':
176                    self.content_length = kv[1].strip()
177                list.append(self, item)
178
179        # Here, we're testing that methods expecting a body get a
180        # content-length set to zero if the body is empty (either None or '')
181        bodies = (None, '')
182        methods_with_body = ('PUT', 'POST', 'PATCH')
183        for method, body in itertools.product(methods_with_body, bodies):
184            conn = client.HTTPConnection('example.com')
185            conn.sock = FakeSocket(None)
186            conn._buffer = ContentLengthChecker()
187            conn.request(method, '/', body)
188            self.assertEqual(
189                conn._buffer.content_length, b'0',
190                'Header Content-Length incorrect on {}'.format(method)
191            )
192
193        # For these methods, we make sure that content-length is not set when
194        # the body is None because it might cause unexpected behaviour on the
195        # server.
196        methods_without_body = (
197             'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
198        )
199        for method in methods_without_body:
200            conn = client.HTTPConnection('example.com')
201            conn.sock = FakeSocket(None)
202            conn._buffer = ContentLengthChecker()
203            conn.request(method, '/', None)
204            self.assertEqual(
205                conn._buffer.content_length, None,
206                'Header Content-Length set for empty body on {}'.format(method)
207            )
208
209        # If the body is set to '', that's considered to be "present but
210        # empty" rather than "missing", so content length would be set, even
211        # for methods that don't expect a body.
212        for method in methods_without_body:
213            conn = client.HTTPConnection('example.com')
214            conn.sock = FakeSocket(None)
215            conn._buffer = ContentLengthChecker()
216            conn.request(method, '/', '')
217            self.assertEqual(
218                conn._buffer.content_length, b'0',
219                'Header Content-Length incorrect on {}'.format(method)
220            )
221
222        # If the body is set, make sure Content-Length is set.
223        for method in itertools.chain(methods_without_body, methods_with_body):
224            conn = client.HTTPConnection('example.com')
225            conn.sock = FakeSocket(None)
226            conn._buffer = ContentLengthChecker()
227            conn.request(method, '/', ' ')
228            self.assertEqual(
229                conn._buffer.content_length, b'1',
230                'Header Content-Length incorrect on {}'.format(method)
231            )
232
233    def test_putheader(self):
234        conn = client.HTTPConnection('example.com')
235        conn.sock = FakeSocket(None)
236        conn.putrequest('GET','/')
237        conn.putheader('Content-length', 42)
238        self.assertIn(b'Content-length: 42', conn._buffer)
239
240        conn.putheader('Foo', ' bar ')
241        self.assertIn(b'Foo:  bar ', conn._buffer)
242        conn.putheader('Bar', '\tbaz\t')
243        self.assertIn(b'Bar: \tbaz\t', conn._buffer)
244        conn.putheader('Authorization', 'Bearer mytoken')
245        self.assertIn(b'Authorization: Bearer mytoken', conn._buffer)
246        conn.putheader('IterHeader', 'IterA', 'IterB')
247        self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer)
248        conn.putheader('LatinHeader', b'\xFF')
249        self.assertIn(b'LatinHeader: \xFF', conn._buffer)
250        conn.putheader('Utf8Header', b'\xc3\x80')
251        self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer)
252        conn.putheader('C1-Control', b'next\x85line')
253        self.assertIn(b'C1-Control: next\x85line', conn._buffer)
254        conn.putheader('Embedded-Fold-Space', 'is\r\n allowed')
255        self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer)
256        conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed')
257        self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer)
258        conn.putheader('Key Space', 'value')
259        self.assertIn(b'Key Space: value', conn._buffer)
260        conn.putheader('KeySpace ', 'value')
261        self.assertIn(b'KeySpace : value', conn._buffer)
262        conn.putheader(b'Nonbreak\xa0Space', 'value')
263        self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer)
264        conn.putheader(b'\xa0NonbreakSpace', 'value')
265        self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer)
266
267    def test_ipv6host_header(self):
268        # Default host header on IPv6 transaction should be wrapped by [] if
269        # it is an IPv6 address
270        expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
271                   b'Accept-Encoding: identity\r\n\r\n'
272        conn = client.HTTPConnection('[2001::]:81')
273        sock = FakeSocket('')
274        conn.sock = sock
275        conn.request('GET', '/foo')
276        self.assertTrue(sock.data.startswith(expected))
277
278        expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
279                   b'Accept-Encoding: identity\r\n\r\n'
280        conn = client.HTTPConnection('[2001:102A::]')
281        sock = FakeSocket('')
282        conn.sock = sock
283        conn.request('GET', '/foo')
284        self.assertTrue(sock.data.startswith(expected))
285
286    def test_malformed_headers_coped_with(self):
287        # Issue 19996
288        body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
289        sock = FakeSocket(body)
290        resp = client.HTTPResponse(sock)
291        resp.begin()
292
293        self.assertEqual(resp.getheader('First'), 'val')
294        self.assertEqual(resp.getheader('Second'), 'val')
295
296    def test_parse_all_octets(self):
297        # Ensure no valid header field octet breaks the parser
298        body = (
299            b'HTTP/1.1 200 OK\r\n'
300            b"!#$%&'*+-.^_`|~: value\r\n"  # Special token characters
301            b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n'
302            b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n'
303            b'obs-fold: text\r\n'
304            b' folded with space\r\n'
305            b'\tfolded with tab\r\n'
306            b'Content-Length: 0\r\n'
307            b'\r\n'
308        )
309        sock = FakeSocket(body)
310        resp = client.HTTPResponse(sock)
311        resp.begin()
312        self.assertEqual(resp.getheader('Content-Length'), '0')
313        self.assertEqual(resp.msg['Content-Length'], '0')
314        self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value')
315        self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value')
316        vchar = ''.join(map(chr, range(0x21, 0x7E + 1)))
317        self.assertEqual(resp.getheader('VCHAR'), vchar)
318        self.assertEqual(resp.msg['VCHAR'], vchar)
319        self.assertIsNotNone(resp.getheader('obs-text'))
320        self.assertIn('obs-text', resp.msg)
321        for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']):
322            self.assertTrue(folded.startswith('text'))
323            self.assertIn(' folded with space', folded)
324            self.assertTrue(folded.endswith('folded with tab'))
325
326    def test_invalid_headers(self):
327        conn = client.HTTPConnection('example.com')
328        conn.sock = FakeSocket('')
329        conn.putrequest('GET', '/')
330
331        # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
332        # longer allowed in header names
333        cases = (
334            (b'Invalid\r\nName', b'ValidValue'),
335            (b'Invalid\rName', b'ValidValue'),
336            (b'Invalid\nName', b'ValidValue'),
337            (b'\r\nInvalidName', b'ValidValue'),
338            (b'\rInvalidName', b'ValidValue'),
339            (b'\nInvalidName', b'ValidValue'),
340            (b' InvalidName', b'ValidValue'),
341            (b'\tInvalidName', b'ValidValue'),
342            (b'Invalid:Name', b'ValidValue'),
343            (b':InvalidName', b'ValidValue'),
344            (b'ValidName', b'Invalid\r\nValue'),
345            (b'ValidName', b'Invalid\rValue'),
346            (b'ValidName', b'Invalid\nValue'),
347            (b'ValidName', b'InvalidValue\r\n'),
348            (b'ValidName', b'InvalidValue\r'),
349            (b'ValidName', b'InvalidValue\n'),
350        )
351        for name, value in cases:
352            with self.subTest((name, value)):
353                with self.assertRaisesRegex(ValueError, 'Invalid header'):
354                    conn.putheader(name, value)
355
356    def test_headers_debuglevel(self):
357        body = (
358            b'HTTP/1.1 200 OK\r\n'
359            b'First: val\r\n'
360            b'Second: val1\r\n'
361            b'Second: val2\r\n'
362        )
363        sock = FakeSocket(body)
364        resp = client.HTTPResponse(sock, debuglevel=1)
365        with support.captured_stdout() as output:
366            resp.begin()
367        lines = output.getvalue().splitlines()
368        self.assertEqual(lines[0], "reply: 'HTTP/1.1 200 OK\\r\\n'")
369        self.assertEqual(lines[1], "header: First: val")
370        self.assertEqual(lines[2], "header: Second: val1")
371        self.assertEqual(lines[3], "header: Second: val2")
372
373
374class HttpMethodTests(TestCase):
375    def test_invalid_method_names(self):
376        methods = (
377            'GET\r',
378            'POST\n',
379            'PUT\n\r',
380            'POST\nValue',
381            'POST\nHOST:abc',
382            'GET\nrHost:abc\n',
383            'POST\rRemainder:\r',
384            'GET\rHOST:\n',
385            '\nPUT'
386        )
387
388        for method in methods:
389            with self.assertRaisesRegex(
390                    ValueError, "method can't contain control characters"):
391                conn = client.HTTPConnection('example.com')
392                conn.sock = FakeSocket(None)
393                conn.request(method=method, url="/")
394
395
396class TransferEncodingTest(TestCase):
397    expected_body = b"It's just a flesh wound"
398
399    def test_endheaders_chunked(self):
400        conn = client.HTTPConnection('example.com')
401        conn.sock = FakeSocket(b'')
402        conn.putrequest('POST', '/')
403        conn.endheaders(self._make_body(), encode_chunked=True)
404
405        _, _, body = self._parse_request(conn.sock.data)
406        body = self._parse_chunked(body)
407        self.assertEqual(body, self.expected_body)
408
409    def test_explicit_headers(self):
410        # explicit chunked
411        conn = client.HTTPConnection('example.com')
412        conn.sock = FakeSocket(b'')
413        # this shouldn't actually be automatically chunk-encoded because the
414        # calling code has explicitly stated that it's taking care of it
415        conn.request(
416            'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})
417
418        _, headers, body = self._parse_request(conn.sock.data)
419        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
420        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
421        self.assertEqual(body, self.expected_body)
422
423        # explicit chunked, string body
424        conn = client.HTTPConnection('example.com')
425        conn.sock = FakeSocket(b'')
426        conn.request(
427            'POST', '/', self.expected_body.decode('latin-1'),
428            {'Transfer-Encoding': 'chunked'})
429
430        _, headers, body = self._parse_request(conn.sock.data)
431        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
432        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
433        self.assertEqual(body, self.expected_body)
434
435        # User-specified TE, but request() does the chunk encoding
436        conn = client.HTTPConnection('example.com')
437        conn.sock = FakeSocket(b'')
438        conn.request('POST', '/',
439            headers={'Transfer-Encoding': 'gzip, chunked'},
440            encode_chunked=True,
441            body=self._make_body())
442        _, headers, body = self._parse_request(conn.sock.data)
443        self.assertNotIn('content-length', [k.lower() for k in headers])
444        self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
445        self.assertEqual(self._parse_chunked(body), self.expected_body)
446
447    def test_request(self):
448        for empty_lines in (False, True,):
449            conn = client.HTTPConnection('example.com')
450            conn.sock = FakeSocket(b'')
451            conn.request(
452                'POST', '/', self._make_body(empty_lines=empty_lines))
453
454            _, headers, body = self._parse_request(conn.sock.data)
455            body = self._parse_chunked(body)
456            self.assertEqual(body, self.expected_body)
457            self.assertEqual(headers['Transfer-Encoding'], 'chunked')
458
459            # Content-Length and Transfer-Encoding SHOULD not be sent in the
460            # same request
461            self.assertNotIn('content-length', [k.lower() for k in headers])
462
463    def test_empty_body(self):
464        # Zero-length iterable should be treated like any other iterable
465        conn = client.HTTPConnection('example.com')
466        conn.sock = FakeSocket(b'')
467        conn.request('POST', '/', ())
468        _, headers, body = self._parse_request(conn.sock.data)
469        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
470        self.assertNotIn('content-length', [k.lower() for k in headers])
471        self.assertEqual(body, b"0\r\n\r\n")
472
473    def _make_body(self, empty_lines=False):
474        lines = self.expected_body.split(b' ')
475        for idx, line in enumerate(lines):
476            # for testing handling empty lines
477            if empty_lines and idx % 2:
478                yield b''
479            if idx < len(lines) - 1:
480                yield line + b' '
481            else:
482                yield line
483
484    def _parse_request(self, data):
485        lines = data.split(b'\r\n')
486        request = lines[0]
487        headers = {}
488        n = 1
489        while n < len(lines) and len(lines[n]) > 0:
490            key, val = lines[n].split(b':')
491            key = key.decode('latin-1').strip()
492            headers[key] = val.decode('latin-1').strip()
493            n += 1
494
495        return request, headers, b'\r\n'.join(lines[n + 1:])
496
497    def _parse_chunked(self, data):
498        body = []
499        trailers = {}
500        n = 0
501        lines = data.split(b'\r\n')
502        # parse body
503        while True:
504            size, chunk = lines[n:n+2]
505            size = int(size, 16)
506
507            if size == 0:
508                n += 1
509                break
510
511            self.assertEqual(size, len(chunk))
512            body.append(chunk)
513
514            n += 2
515            # we /should/ hit the end chunk, but check against the size of
516            # lines so we're not stuck in an infinite loop should we get
517            # malformed data
518            if n > len(lines):
519                break
520
521        return b''.join(body)
522
523
524class BasicTest(TestCase):
525    def test_dir_with_added_behavior_on_status(self):
526        # see issue40084
527        self.assertTrue({'description', 'name', 'phrase', 'value'} <= set(dir(HTTPStatus(404))))
528
529    def test_simple_httpstatus(self):
530        class CheckedHTTPStatus(enum.IntEnum):
531            """HTTP status codes and reason phrases
532
533            Status codes from the following RFCs are all observed:
534
535                * RFC 7231: Hypertext Transfer Protocol (HTTP/1.1), obsoletes 2616
536                * RFC 6585: Additional HTTP Status Codes
537                * RFC 3229: Delta encoding in HTTP
538                * RFC 4918: HTTP Extensions for WebDAV, obsoletes 2518
539                * RFC 5842: Binding Extensions to WebDAV
540                * RFC 7238: Permanent Redirect
541                * RFC 2295: Transparent Content Negotiation in HTTP
542                * RFC 2774: An HTTP Extension Framework
543                * RFC 7725: An HTTP Status Code to Report Legal Obstacles
544                * RFC 7540: Hypertext Transfer Protocol Version 2 (HTTP/2)
545                * RFC 2324: Hyper Text Coffee Pot Control Protocol (HTCPCP/1.0)
546                * RFC 8297: An HTTP Status Code for Indicating Hints
547                * RFC 8470: Using Early Data in HTTP
548            """
549            def __new__(cls, value, phrase, description=''):
550                obj = int.__new__(cls, value)
551                obj._value_ = value
552
553                obj.phrase = phrase
554                obj.description = description
555                return obj
556            # informational
557            CONTINUE = 100, 'Continue', 'Request received, please continue'
558            SWITCHING_PROTOCOLS = (101, 'Switching Protocols',
559                    'Switching to new protocol; obey Upgrade header')
560            PROCESSING = 102, 'Processing'
561            EARLY_HINTS = 103, 'Early Hints'
562            # success
563            OK = 200, 'OK', 'Request fulfilled, document follows'
564            CREATED = 201, 'Created', 'Document created, URL follows'
565            ACCEPTED = (202, 'Accepted',
566                'Request accepted, processing continues off-line')
567            NON_AUTHORITATIVE_INFORMATION = (203,
568                'Non-Authoritative Information', 'Request fulfilled from cache')
569            NO_CONTENT = 204, 'No Content', 'Request fulfilled, nothing follows'
570            RESET_CONTENT = 205, 'Reset Content', 'Clear input form for further input'
571            PARTIAL_CONTENT = 206, 'Partial Content', 'Partial content follows'
572            MULTI_STATUS = 207, 'Multi-Status'
573            ALREADY_REPORTED = 208, 'Already Reported'
574            IM_USED = 226, 'IM Used'
575            # redirection
576            MULTIPLE_CHOICES = (300, 'Multiple Choices',
577                'Object has several resources -- see URI list')
578            MOVED_PERMANENTLY = (301, 'Moved Permanently',
579                'Object moved permanently -- see URI list')
580            FOUND = 302, 'Found', 'Object moved temporarily -- see URI list'
581            SEE_OTHER = 303, 'See Other', 'Object moved -- see Method and URL list'
582            NOT_MODIFIED = (304, 'Not Modified',
583                'Document has not changed since given time')
584            USE_PROXY = (305, 'Use Proxy',
585                'You must use proxy specified in Location to access this resource')
586            TEMPORARY_REDIRECT = (307, 'Temporary Redirect',
587                'Object moved temporarily -- see URI list')
588            PERMANENT_REDIRECT = (308, 'Permanent Redirect',
589                'Object moved permanently -- see URI list')
590            # client error
591            BAD_REQUEST = (400, 'Bad Request',
592                'Bad request syntax or unsupported method')
593            UNAUTHORIZED = (401, 'Unauthorized',
594                'No permission -- see authorization schemes')
595            PAYMENT_REQUIRED = (402, 'Payment Required',
596                'No payment -- see charging schemes')
597            FORBIDDEN = (403, 'Forbidden',
598                'Request forbidden -- authorization will not help')
599            NOT_FOUND = (404, 'Not Found',
600                'Nothing matches the given URI')
601            METHOD_NOT_ALLOWED = (405, 'Method Not Allowed',
602                'Specified method is invalid for this resource')
603            NOT_ACCEPTABLE = (406, 'Not Acceptable',
604                'URI not available in preferred format')
605            PROXY_AUTHENTICATION_REQUIRED = (407,
606                'Proxy Authentication Required',
607                'You must authenticate with this proxy before proceeding')
608            REQUEST_TIMEOUT = (408, 'Request Timeout',
609                'Request timed out; try again later')
610            CONFLICT = 409, 'Conflict', 'Request conflict'
611            GONE = (410, 'Gone',
612                'URI no longer exists and has been permanently removed')
613            LENGTH_REQUIRED = (411, 'Length Required',
614                'Client must specify Content-Length')
615            PRECONDITION_FAILED = (412, 'Precondition Failed',
616                'Precondition in headers is false')
617            REQUEST_ENTITY_TOO_LARGE = (413, 'Request Entity Too Large',
618                'Entity is too large')
619            REQUEST_URI_TOO_LONG = (414, 'Request-URI Too Long',
620                'URI is too long')
621            UNSUPPORTED_MEDIA_TYPE = (415, 'Unsupported Media Type',
622                'Entity body in unsupported format')
623            REQUESTED_RANGE_NOT_SATISFIABLE = (416,
624                'Requested Range Not Satisfiable',
625                'Cannot satisfy request range')
626            EXPECTATION_FAILED = (417, 'Expectation Failed',
627                'Expect condition could not be satisfied')
628            IM_A_TEAPOT = (418, 'I\'m a Teapot',
629                'Server refuses to brew coffee because it is a teapot.')
630            MISDIRECTED_REQUEST = (421, 'Misdirected Request',
631                'Server is not able to produce a response')
632            UNPROCESSABLE_ENTITY = 422, 'Unprocessable Entity'
633            LOCKED = 423, 'Locked'
634            FAILED_DEPENDENCY = 424, 'Failed Dependency'
635            TOO_EARLY = 425, 'Too Early'
636            UPGRADE_REQUIRED = 426, 'Upgrade Required'
637            PRECONDITION_REQUIRED = (428, 'Precondition Required',
638                'The origin server requires the request to be conditional')
639            TOO_MANY_REQUESTS = (429, 'Too Many Requests',
640                'The user has sent too many requests in '
641                'a given amount of time ("rate limiting")')
642            REQUEST_HEADER_FIELDS_TOO_LARGE = (431,
643                'Request Header Fields Too Large',
644                'The server is unwilling to process the request because its header '
645                'fields are too large')
646            UNAVAILABLE_FOR_LEGAL_REASONS = (451,
647                'Unavailable For Legal Reasons',
648                'The server is denying access to the '
649                'resource as a consequence of a legal demand')
650            # server errors
651            INTERNAL_SERVER_ERROR = (500, 'Internal Server Error',
652                'Server got itself in trouble')
653            NOT_IMPLEMENTED = (501, 'Not Implemented',
654                'Server does not support this operation')
655            BAD_GATEWAY = (502, 'Bad Gateway',
656                'Invalid responses from another server/proxy')
657            SERVICE_UNAVAILABLE = (503, 'Service Unavailable',
658                'The server cannot process the request due to a high load')
659            GATEWAY_TIMEOUT = (504, 'Gateway Timeout',
660                'The gateway server did not receive a timely response')
661            HTTP_VERSION_NOT_SUPPORTED = (505, 'HTTP Version Not Supported',
662                'Cannot fulfill request')
663            VARIANT_ALSO_NEGOTIATES = 506, 'Variant Also Negotiates'
664            INSUFFICIENT_STORAGE = 507, 'Insufficient Storage'
665            LOOP_DETECTED = 508, 'Loop Detected'
666            NOT_EXTENDED = 510, 'Not Extended'
667            NETWORK_AUTHENTICATION_REQUIRED = (511,
668                'Network Authentication Required',
669                'The client needs to authenticate to gain network access')
670        enum._test_simple_enum(CheckedHTTPStatus, HTTPStatus)
671
672
673    def test_status_lines(self):
674        # Test HTTP status lines
675
676        body = "HTTP/1.1 200 Ok\r\n\r\nText"
677        sock = FakeSocket(body)
678        resp = client.HTTPResponse(sock)
679        resp.begin()
680        self.assertEqual(resp.read(0), b'')  # Issue #20007
681        self.assertFalse(resp.isclosed())
682        self.assertFalse(resp.closed)
683        self.assertEqual(resp.read(), b"Text")
684        self.assertTrue(resp.isclosed())
685        self.assertFalse(resp.closed)
686        resp.close()
687        self.assertTrue(resp.closed)
688
689        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
690        sock = FakeSocket(body)
691        resp = client.HTTPResponse(sock)
692        self.assertRaises(client.BadStatusLine, resp.begin)
693
694    def test_bad_status_repr(self):
695        exc = client.BadStatusLine('')
696        self.assertEqual(repr(exc), '''BadStatusLine("''")''')
697
698    def test_partial_reads(self):
699        # if we have Content-Length, HTTPResponse knows when to close itself,
700        # the same behaviour as when we read the whole thing with read()
701        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
702        sock = FakeSocket(body)
703        resp = client.HTTPResponse(sock)
704        resp.begin()
705        self.assertEqual(resp.read(2), b'Te')
706        self.assertFalse(resp.isclosed())
707        self.assertEqual(resp.read(2), b'xt')
708        self.assertTrue(resp.isclosed())
709        self.assertFalse(resp.closed)
710        resp.close()
711        self.assertTrue(resp.closed)
712
713    def test_mixed_reads(self):
714        # readline() should update the remaining length, so that read() knows
715        # how much data is left and does not raise IncompleteRead
716        body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother"
717        sock = FakeSocket(body)
718        resp = client.HTTPResponse(sock)
719        resp.begin()
720        self.assertEqual(resp.readline(), b'Text\r\n')
721        self.assertFalse(resp.isclosed())
722        self.assertEqual(resp.read(), b'Another')
723        self.assertTrue(resp.isclosed())
724        self.assertFalse(resp.closed)
725        resp.close()
726        self.assertTrue(resp.closed)
727
728    def test_partial_readintos(self):
729        # if we have Content-Length, HTTPResponse knows when to close itself,
730        # the same behaviour as when we read the whole thing with read()
731        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
732        sock = FakeSocket(body)
733        resp = client.HTTPResponse(sock)
734        resp.begin()
735        b = bytearray(2)
736        n = resp.readinto(b)
737        self.assertEqual(n, 2)
738        self.assertEqual(bytes(b), b'Te')
739        self.assertFalse(resp.isclosed())
740        n = resp.readinto(b)
741        self.assertEqual(n, 2)
742        self.assertEqual(bytes(b), b'xt')
743        self.assertTrue(resp.isclosed())
744        self.assertFalse(resp.closed)
745        resp.close()
746        self.assertTrue(resp.closed)
747
748    def test_partial_reads_past_end(self):
749        # if we have Content-Length, clip reads to the end
750        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
751        sock = FakeSocket(body)
752        resp = client.HTTPResponse(sock)
753        resp.begin()
754        self.assertEqual(resp.read(10), b'Text')
755        self.assertTrue(resp.isclosed())
756        self.assertFalse(resp.closed)
757        resp.close()
758        self.assertTrue(resp.closed)
759
760    def test_partial_readintos_past_end(self):
761        # if we have Content-Length, clip readintos to the end
762        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
763        sock = FakeSocket(body)
764        resp = client.HTTPResponse(sock)
765        resp.begin()
766        b = bytearray(10)
767        n = resp.readinto(b)
768        self.assertEqual(n, 4)
769        self.assertEqual(bytes(b)[:4], b'Text')
770        self.assertTrue(resp.isclosed())
771        self.assertFalse(resp.closed)
772        resp.close()
773        self.assertTrue(resp.closed)
774
775    def test_partial_reads_no_content_length(self):
776        # when no length is present, the socket should be gracefully closed when
777        # all data was read
778        body = "HTTP/1.1 200 Ok\r\n\r\nText"
779        sock = FakeSocket(body)
780        resp = client.HTTPResponse(sock)
781        resp.begin()
782        self.assertEqual(resp.read(2), b'Te')
783        self.assertFalse(resp.isclosed())
784        self.assertEqual(resp.read(2), b'xt')
785        self.assertEqual(resp.read(1), b'')
786        self.assertTrue(resp.isclosed())
787        self.assertFalse(resp.closed)
788        resp.close()
789        self.assertTrue(resp.closed)
790
791    def test_partial_readintos_no_content_length(self):
792        # when no length is present, the socket should be gracefully closed when
793        # all data was read
794        body = "HTTP/1.1 200 Ok\r\n\r\nText"
795        sock = FakeSocket(body)
796        resp = client.HTTPResponse(sock)
797        resp.begin()
798        b = bytearray(2)
799        n = resp.readinto(b)
800        self.assertEqual(n, 2)
801        self.assertEqual(bytes(b), b'Te')
802        self.assertFalse(resp.isclosed())
803        n = resp.readinto(b)
804        self.assertEqual(n, 2)
805        self.assertEqual(bytes(b), b'xt')
806        n = resp.readinto(b)
807        self.assertEqual(n, 0)
808        self.assertTrue(resp.isclosed())
809
810    def test_partial_reads_incomplete_body(self):
811        # if the server shuts down the connection before the whole
812        # content-length is delivered, the socket is gracefully closed
813        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
814        sock = FakeSocket(body)
815        resp = client.HTTPResponse(sock)
816        resp.begin()
817        self.assertEqual(resp.read(2), b'Te')
818        self.assertFalse(resp.isclosed())
819        self.assertEqual(resp.read(2), b'xt')
820        self.assertEqual(resp.read(1), b'')
821        self.assertTrue(resp.isclosed())
822
823    def test_partial_readintos_incomplete_body(self):
824        # if the server shuts down the connection before the whole
825        # content-length is delivered, the socket is gracefully closed
826        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
827        sock = FakeSocket(body)
828        resp = client.HTTPResponse(sock)
829        resp.begin()
830        b = bytearray(2)
831        n = resp.readinto(b)
832        self.assertEqual(n, 2)
833        self.assertEqual(bytes(b), b'Te')
834        self.assertFalse(resp.isclosed())
835        n = resp.readinto(b)
836        self.assertEqual(n, 2)
837        self.assertEqual(bytes(b), b'xt')
838        n = resp.readinto(b)
839        self.assertEqual(n, 0)
840        self.assertTrue(resp.isclosed())
841        self.assertFalse(resp.closed)
842        resp.close()
843        self.assertTrue(resp.closed)
844
845    def test_host_port(self):
846        # Check invalid host_port
847
848        for hp in ("www.python.org:abc", "user:[email protected]"):
849            self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
850
851        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
852                          "fe80::207:e9ff:fe9b", 8000),
853                         ("www.python.org:80", "www.python.org", 80),
854                         ("www.python.org:", "www.python.org", 80),
855                         ("www.python.org", "www.python.org", 80),
856                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
857                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
858            c = client.HTTPConnection(hp)
859            self.assertEqual(h, c.host)
860            self.assertEqual(p, c.port)
861
862    def test_response_headers(self):
863        # test response with multiple message headers with the same field name.
864        text = ('HTTP/1.1 200 OK\r\n'
865                'Set-Cookie: Customer="WILE_E_COYOTE"; '
866                'Version="1"; Path="/acme"\r\n'
867                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
868                ' Path="/acme"\r\n'
869                '\r\n'
870                'No body\r\n')
871        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
872               ', '
873               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
874        s = FakeSocket(text)
875        r = client.HTTPResponse(s)
876        r.begin()
877        cookies = r.getheader("Set-Cookie")
878        self.assertEqual(cookies, hdr)
879
880    def test_read_head(self):
881        # Test that the library doesn't attempt to read any data
882        # from a HEAD request.  (Tickles SF bug #622042.)
883        sock = FakeSocket(
884            'HTTP/1.1 200 OK\r\n'
885            'Content-Length: 14432\r\n'
886            '\r\n',
887            NoEOFBytesIO)
888        resp = client.HTTPResponse(sock, method="HEAD")
889        resp.begin()
890        if resp.read():
891            self.fail("Did not expect response from HEAD request")
892
893    def test_readinto_head(self):
894        # Test that the library doesn't attempt to read any data
895        # from a HEAD request.  (Tickles SF bug #622042.)
896        sock = FakeSocket(
897            'HTTP/1.1 200 OK\r\n'
898            'Content-Length: 14432\r\n'
899            '\r\n',
900            NoEOFBytesIO)
901        resp = client.HTTPResponse(sock, method="HEAD")
902        resp.begin()
903        b = bytearray(5)
904        if resp.readinto(b) != 0:
905            self.fail("Did not expect response from HEAD request")
906        self.assertEqual(bytes(b), b'\x00'*5)
907
908    def test_too_many_headers(self):
909        headers = '\r\n'.join('Header%d: foo' % i
910                              for i in range(client._MAXHEADERS + 1)) + '\r\n'
911        text = ('HTTP/1.1 200 OK\r\n' + headers)
912        s = FakeSocket(text)
913        r = client.HTTPResponse(s)
914        self.assertRaisesRegex(client.HTTPException,
915                               r"got more than \d+ headers", r.begin)
916
917    def test_send_file(self):
918        expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
919                    b'Accept-Encoding: identity\r\n'
920                    b'Transfer-Encoding: chunked\r\n'
921                    b'\r\n')
922
923        with open(__file__, 'rb') as body:
924            conn = client.HTTPConnection('example.com')
925            sock = FakeSocket(body)
926            conn.sock = sock
927            conn.request('GET', '/foo', body)
928            self.assertTrue(sock.data.startswith(expected), '%r != %r' %
929                    (sock.data[:len(expected)], expected))
930
931    def test_send(self):
932        expected = b'this is a test this is only a test'
933        conn = client.HTTPConnection('example.com')
934        sock = FakeSocket(None)
935        conn.sock = sock
936        conn.send(expected)
937        self.assertEqual(expected, sock.data)
938        sock.data = b''
939        conn.send(array.array('b', expected))
940        self.assertEqual(expected, sock.data)
941        sock.data = b''
942        conn.send(io.BytesIO(expected))
943        self.assertEqual(expected, sock.data)
944
945    def test_send_updating_file(self):
946        def data():
947            yield 'data'
948            yield None
949            yield 'data_two'
950
951        class UpdatingFile(io.TextIOBase):
952            mode = 'r'
953            d = data()
954            def read(self, blocksize=-1):
955                return next(self.d)
956
957        expected = b'data'
958
959        conn = client.HTTPConnection('example.com')
960        sock = FakeSocket("")
961        conn.sock = sock
962        conn.send(UpdatingFile())
963        self.assertEqual(sock.data, expected)
964
965
966    def test_send_iter(self):
967        expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
968                   b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
969                   b'\r\nonetwothree'
970
971        def body():
972            yield b"one"
973            yield b"two"
974            yield b"three"
975
976        conn = client.HTTPConnection('example.com')
977        sock = FakeSocket("")
978        conn.sock = sock
979        conn.request('GET', '/foo', body(), {'Content-Length': '11'})
980        self.assertEqual(sock.data, expected)
981
982    def test_blocksize_request(self):
983        """Check that request() respects the configured block size."""
984        blocksize = 8  # For easy debugging.
985        conn = client.HTTPConnection('example.com', blocksize=blocksize)
986        sock = FakeSocket(None)
987        conn.sock = sock
988        expected = b"a" * blocksize + b"b"
989        conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"})
990        self.assertEqual(sock.sendall_calls, 3)
991        body = sock.data.split(b"\r\n\r\n", 1)[1]
992        self.assertEqual(body, expected)
993
994    def test_blocksize_send(self):
995        """Check that send() respects the configured block size."""
996        blocksize = 8  # For easy debugging.
997        conn = client.HTTPConnection('example.com', blocksize=blocksize)
998        sock = FakeSocket(None)
999        conn.sock = sock
1000        expected = b"a" * blocksize + b"b"
1001        conn.send(io.BytesIO(expected))
1002        self.assertEqual(sock.sendall_calls, 2)
1003        self.assertEqual(sock.data, expected)
1004
1005    def test_send_type_error(self):
1006        # See: Issue #12676
1007        conn = client.HTTPConnection('example.com')
1008        conn.sock = FakeSocket('')
1009        with self.assertRaises(TypeError):
1010            conn.request('POST', 'test', conn)
1011
1012    def test_chunked(self):
1013        expected = chunked_expected
1014        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
1015        resp = client.HTTPResponse(sock, method="GET")
1016        resp.begin()
1017        self.assertEqual(resp.read(), expected)
1018        resp.close()
1019
1020        # Various read sizes
1021        for n in range(1, 12):
1022            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
1023            resp = client.HTTPResponse(sock, method="GET")
1024            resp.begin()
1025            self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
1026            resp.close()
1027
1028        for x in ('', 'foo\r\n'):
1029            sock = FakeSocket(chunked_start + x)
1030            resp = client.HTTPResponse(sock, method="GET")
1031            resp.begin()
1032            try:
1033                resp.read()
1034            except client.IncompleteRead as i:
1035                self.assertEqual(i.partial, expected)
1036                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
1037                self.assertEqual(repr(i), expected_message)
1038                self.assertEqual(str(i), expected_message)
1039            else:
1040                self.fail('IncompleteRead expected')
1041            finally:
1042                resp.close()
1043
1044    def test_readinto_chunked(self):
1045
1046        expected = chunked_expected
1047        nexpected = len(expected)
1048        b = bytearray(128)
1049
1050        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
1051        resp = client.HTTPResponse(sock, method="GET")
1052        resp.begin()
1053        n = resp.readinto(b)
1054        self.assertEqual(b[:nexpected], expected)
1055        self.assertEqual(n, nexpected)
1056        resp.close()
1057
1058        # Various read sizes
1059        for n in range(1, 12):
1060            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
1061            resp = client.HTTPResponse(sock, method="GET")
1062            resp.begin()
1063            m = memoryview(b)
1064            i = resp.readinto(m[0:n])
1065            i += resp.readinto(m[i:n + i])
1066            i += resp.readinto(m[i:])
1067            self.assertEqual(b[:nexpected], expected)
1068            self.assertEqual(i, nexpected)
1069            resp.close()
1070
1071        for x in ('', 'foo\r\n'):
1072            sock = FakeSocket(chunked_start + x)
1073            resp = client.HTTPResponse(sock, method="GET")
1074            resp.begin()
1075            try:
1076                n = resp.readinto(b)
1077            except client.IncompleteRead as i:
1078                self.assertEqual(i.partial, expected)
1079                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
1080                self.assertEqual(repr(i), expected_message)
1081                self.assertEqual(str(i), expected_message)
1082            else:
1083                self.fail('IncompleteRead expected')
1084            finally:
1085                resp.close()
1086
1087    def test_chunked_head(self):
1088        chunked_start = (
1089            'HTTP/1.1 200 OK\r\n'
1090            'Transfer-Encoding: chunked\r\n\r\n'
1091            'a\r\n'
1092            'hello world\r\n'
1093            '1\r\n'
1094            'd\r\n'
1095        )
1096        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
1097        resp = client.HTTPResponse(sock, method="HEAD")
1098        resp.begin()
1099        self.assertEqual(resp.read(), b'')
1100        self.assertEqual(resp.status, 200)
1101        self.assertEqual(resp.reason, 'OK')
1102        self.assertTrue(resp.isclosed())
1103        self.assertFalse(resp.closed)
1104        resp.close()
1105        self.assertTrue(resp.closed)
1106
1107    def test_readinto_chunked_head(self):
1108        chunked_start = (
1109            'HTTP/1.1 200 OK\r\n'
1110            'Transfer-Encoding: chunked\r\n\r\n'
1111            'a\r\n'
1112            'hello world\r\n'
1113            '1\r\n'
1114            'd\r\n'
1115        )
1116        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
1117        resp = client.HTTPResponse(sock, method="HEAD")
1118        resp.begin()
1119        b = bytearray(5)
1120        n = resp.readinto(b)
1121        self.assertEqual(n, 0)
1122        self.assertEqual(bytes(b), b'\x00'*5)
1123        self.assertEqual(resp.status, 200)
1124        self.assertEqual(resp.reason, 'OK')
1125        self.assertTrue(resp.isclosed())
1126        self.assertFalse(resp.closed)
1127        resp.close()
1128        self.assertTrue(resp.closed)
1129
1130    def test_negative_content_length(self):
1131        sock = FakeSocket(
1132            'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
1133        resp = client.HTTPResponse(sock, method="GET")
1134        resp.begin()
1135        self.assertEqual(resp.read(), b'Hello\r\n')
1136        self.assertTrue(resp.isclosed())
1137
1138    def test_incomplete_read(self):
1139        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
1140        resp = client.HTTPResponse(sock, method="GET")
1141        resp.begin()
1142        try:
1143            resp.read()
1144        except client.IncompleteRead as i:
1145            self.assertEqual(i.partial, b'Hello\r\n')
1146            self.assertEqual(repr(i),
1147                             "IncompleteRead(7 bytes read, 3 more expected)")
1148            self.assertEqual(str(i),
1149                             "IncompleteRead(7 bytes read, 3 more expected)")
1150            self.assertTrue(resp.isclosed())
1151        else:
1152            self.fail('IncompleteRead expected')
1153
1154    def test_epipe(self):
1155        sock = EPipeSocket(
1156            "HTTP/1.0 401 Authorization Required\r\n"
1157            "Content-type: text/html\r\n"
1158            "WWW-Authenticate: Basic realm=\"example\"\r\n",
1159            b"Content-Length")
1160        conn = client.HTTPConnection("example.com")
1161        conn.sock = sock
1162        self.assertRaises(OSError,
1163                          lambda: conn.request("PUT", "/url", "body"))
1164        resp = conn.getresponse()
1165        self.assertEqual(401, resp.status)
1166        self.assertEqual("Basic realm=\"example\"",
1167                         resp.getheader("www-authenticate"))
1168
1169    # Test lines overflowing the max line size (_MAXLINE in http.client)
1170
1171    def test_overflowing_status_line(self):
1172        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
1173        resp = client.HTTPResponse(FakeSocket(body))
1174        self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)
1175
1176    def test_overflowing_header_line(self):
1177        body = (
1178            'HTTP/1.1 200 OK\r\n'
1179            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
1180        )
1181        resp = client.HTTPResponse(FakeSocket(body))
1182        self.assertRaises(client.LineTooLong, resp.begin)
1183
1184    def test_overflowing_header_limit_after_100(self):
1185        body = (
1186            'HTTP/1.1 100 OK\r\n'
1187            'r\n' * 32768
1188        )
1189        resp = client.HTTPResponse(FakeSocket(body))
1190        with self.assertRaises(client.HTTPException) as cm:
1191            resp.begin()
1192        # We must assert more because other reasonable errors that we
1193        # do not want can also be HTTPException derived.
1194        self.assertIn('got more than ', str(cm.exception))
1195        self.assertIn('headers', str(cm.exception))
1196
1197    def test_overflowing_chunked_line(self):
1198        body = (
1199            'HTTP/1.1 200 OK\r\n'
1200            'Transfer-Encoding: chunked\r\n\r\n'
1201            + '0' * 65536 + 'a\r\n'
1202            'hello world\r\n'
1203            '0\r\n'
1204            '\r\n'
1205        )
1206        resp = client.HTTPResponse(FakeSocket(body))
1207        resp.begin()
1208        self.assertRaises(client.LineTooLong, resp.read)
1209
1210    def test_early_eof(self):
1211        # Test httpresponse with no \r\n termination,
1212        body = "HTTP/1.1 200 Ok"
1213        sock = FakeSocket(body)
1214        resp = client.HTTPResponse(sock)
1215        resp.begin()
1216        self.assertEqual(resp.read(), b'')
1217        self.assertTrue(resp.isclosed())
1218        self.assertFalse(resp.closed)
1219        resp.close()
1220        self.assertTrue(resp.closed)
1221
1222    def test_error_leak(self):
1223        # Test that the socket is not leaked if getresponse() fails
1224        conn = client.HTTPConnection('example.com')
1225        response = None
1226        class Response(client.HTTPResponse):
1227            def __init__(self, *pos, **kw):
1228                nonlocal response
1229                response = self  # Avoid garbage collector closing the socket
1230                client.HTTPResponse.__init__(self, *pos, **kw)
1231        conn.response_class = Response
1232        conn.sock = FakeSocket('Invalid status line')
1233        conn.request('GET', '/')
1234        self.assertRaises(client.BadStatusLine, conn.getresponse)
1235        self.assertTrue(response.closed)
1236        self.assertTrue(conn.sock.file_closed)
1237
1238    def test_chunked_extension(self):
1239        extra = '3;foo=bar\r\n' + 'abc\r\n'
1240        expected = chunked_expected + b'abc'
1241
1242        sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
1243        resp = client.HTTPResponse(sock, method="GET")
1244        resp.begin()
1245        self.assertEqual(resp.read(), expected)
1246        resp.close()
1247
1248    def test_chunked_missing_end(self):
1249        """some servers may serve up a short chunked encoding stream"""
1250        expected = chunked_expected
1251        sock = FakeSocket(chunked_start + last_chunk)  #no terminating crlf
1252        resp = client.HTTPResponse(sock, method="GET")
1253        resp.begin()
1254        self.assertEqual(resp.read(), expected)
1255        resp.close()
1256
1257    def test_chunked_trailers(self):
1258        """See that trailers are read and ignored"""
1259        expected = chunked_expected
1260        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
1261        resp = client.HTTPResponse(sock, method="GET")
1262        resp.begin()
1263        self.assertEqual(resp.read(), expected)
1264        # we should have reached the end of the file
1265        self.assertEqual(sock.file.read(), b"") #we read to the end
1266        resp.close()
1267
1268    def test_chunked_sync(self):
1269        """Check that we don't read past the end of the chunked-encoding stream"""
1270        expected = chunked_expected
1271        extradata = "extradata"
1272        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
1273        resp = client.HTTPResponse(sock, method="GET")
1274        resp.begin()
1275        self.assertEqual(resp.read(), expected)
1276        # the file should now have our extradata ready to be read
1277        self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end
1278        resp.close()
1279
1280    def test_content_length_sync(self):
1281        """Check that we don't read past the end of the Content-Length stream"""
1282        extradata = b"extradata"
1283        expected = b"Hello123\r\n"
1284        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1285        resp = client.HTTPResponse(sock, method="GET")
1286        resp.begin()
1287        self.assertEqual(resp.read(), expected)
1288        # the file should now have our extradata ready to be read
1289        self.assertEqual(sock.file.read(), extradata) #we read to the end
1290        resp.close()
1291
1292    def test_readlines_content_length(self):
1293        extradata = b"extradata"
1294        expected = b"Hello123\r\n"
1295        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1296        resp = client.HTTPResponse(sock, method="GET")
1297        resp.begin()
1298        self.assertEqual(resp.readlines(2000), [expected])
1299        # the file should now have our extradata ready to be read
1300        self.assertEqual(sock.file.read(), extradata) #we read to the end
1301        resp.close()
1302
1303    def test_read1_content_length(self):
1304        extradata = b"extradata"
1305        expected = b"Hello123\r\n"
1306        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1307        resp = client.HTTPResponse(sock, method="GET")
1308        resp.begin()
1309        self.assertEqual(resp.read1(2000), expected)
1310        # the file should now have our extradata ready to be read
1311        self.assertEqual(sock.file.read(), extradata) #we read to the end
1312        resp.close()
1313
1314    def test_readline_bound_content_length(self):
1315        extradata = b"extradata"
1316        expected = b"Hello123\r\n"
1317        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1318        resp = client.HTTPResponse(sock, method="GET")
1319        resp.begin()
1320        self.assertEqual(resp.readline(10), expected)
1321        self.assertEqual(resp.readline(10), b"")
1322        # the file should now have our extradata ready to be read
1323        self.assertEqual(sock.file.read(), extradata) #we read to the end
1324        resp.close()
1325
1326    def test_read1_bound_content_length(self):
1327        extradata = b"extradata"
1328        expected = b"Hello123\r\n"
1329        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata)
1330        resp = client.HTTPResponse(sock, method="GET")
1331        resp.begin()
1332        self.assertEqual(resp.read1(20), expected*2)
1333        self.assertEqual(resp.read(), expected)
1334        # the file should now have our extradata ready to be read
1335        self.assertEqual(sock.file.read(), extradata) #we read to the end
1336        resp.close()
1337
1338    def test_response_fileno(self):
1339        # Make sure fd returned by fileno is valid.
1340        serv = socket.create_server((HOST, 0))
1341        self.addCleanup(serv.close)
1342
1343        result = None
1344        def run_server():
1345            [conn, address] = serv.accept()
1346            with conn, conn.makefile("rb") as reader:
1347                # Read the request header until a blank line
1348                while True:
1349                    line = reader.readline()
1350                    if not line.rstrip(b"\r\n"):
1351                        break
1352                conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n")
1353                nonlocal result
1354                result = reader.read()
1355
1356        thread = threading.Thread(target=run_server)
1357        thread.start()
1358        self.addCleanup(thread.join, float(1))
1359        conn = client.HTTPConnection(*serv.getsockname())
1360        conn.request("CONNECT", "dummy:1234")
1361        response = conn.getresponse()
1362        try:
1363            self.assertEqual(response.status, client.OK)
1364            s = socket.socket(fileno=response.fileno())
1365            try:
1366                s.sendall(b"proxied data\n")
1367            finally:
1368                s.detach()
1369        finally:
1370            response.close()
1371            conn.close()
1372        thread.join()
1373        self.assertEqual(result, b"proxied data\n")
1374
1375    def test_putrequest_override_domain_validation(self):
1376        """
1377        It should be possible to override the default validation
1378        behavior in putrequest (bpo-38216).
1379        """
1380        class UnsafeHTTPConnection(client.HTTPConnection):
1381            def _validate_path(self, url):
1382                pass
1383
1384        conn = UnsafeHTTPConnection('example.com')
1385        conn.sock = FakeSocket('')
1386        conn.putrequest('GET', '/\x00')
1387
1388    def test_putrequest_override_host_validation(self):
1389        class UnsafeHTTPConnection(client.HTTPConnection):
1390            def _validate_host(self, url):
1391                pass
1392
1393        conn = UnsafeHTTPConnection('example.com\r\n')
1394        conn.sock = FakeSocket('')
1395        # set skip_host so a ValueError is not raised upon adding the
1396        # invalid URL as the value of the "Host:" header
1397        conn.putrequest('GET', '/', skip_host=1)
1398
1399    def test_putrequest_override_encoding(self):
1400        """
1401        It should be possible to override the default encoding
1402        to transmit bytes in another encoding even if invalid
1403        (bpo-36274).
1404        """
1405        class UnsafeHTTPConnection(client.HTTPConnection):
1406            def _encode_request(self, str_url):
1407                return str_url.encode('utf-8')
1408
1409        conn = UnsafeHTTPConnection('example.com')
1410        conn.sock = FakeSocket('')
1411        conn.putrequest('GET', '/☃')
1412
1413
1414class ExtendedReadTest(TestCase):
1415    """
1416    Test peek(), read1(), readline()
1417    """
1418    lines = (
1419        'HTTP/1.1 200 OK\r\n'
1420        '\r\n'
1421        'hello world!\n'
1422        'and now \n'
1423        'for something completely different\n'
1424        'foo'
1425        )
1426    lines_expected = lines[lines.find('hello'):].encode("ascii")
1427    lines_chunked = (
1428        'HTTP/1.1 200 OK\r\n'
1429        'Transfer-Encoding: chunked\r\n\r\n'
1430        'a\r\n'
1431        'hello worl\r\n'
1432        '3\r\n'
1433        'd!\n\r\n'
1434        '9\r\n'
1435        'and now \n\r\n'
1436        '23\r\n'
1437        'for something completely different\n\r\n'
1438        '3\r\n'
1439        'foo\r\n'
1440        '0\r\n' # terminating chunk
1441        '\r\n'  # end of trailers
1442    )
1443
1444    def setUp(self):
1445        sock = FakeSocket(self.lines)
1446        resp = client.HTTPResponse(sock, method="GET")
1447        resp.begin()
1448        resp.fp = io.BufferedReader(resp.fp)
1449        self.resp = resp
1450
1451
1452
1453    def test_peek(self):
1454        resp = self.resp
1455        # patch up the buffered peek so that it returns not too much stuff
1456        oldpeek = resp.fp.peek
1457        def mypeek(n=-1):
1458            p = oldpeek(n)
1459            if n >= 0:
1460                return p[:n]
1461            return p[:10]
1462        resp.fp.peek = mypeek
1463
1464        all = []
1465        while True:
1466            # try a short peek
1467            p = resp.peek(3)
1468            if p:
1469                self.assertGreater(len(p), 0)
1470                # then unbounded peek
1471                p2 = resp.peek()
1472                self.assertGreaterEqual(len(p2), len(p))
1473                self.assertTrue(p2.startswith(p))
1474                next = resp.read(len(p2))
1475                self.assertEqual(next, p2)
1476            else:
1477                next = resp.read()
1478                self.assertFalse(next)
1479            all.append(next)
1480            if not next:
1481                break
1482        self.assertEqual(b"".join(all), self.lines_expected)
1483
1484    def test_readline(self):
1485        resp = self.resp
1486        self._verify_readline(self.resp.readline, self.lines_expected)
1487
1488    def _verify_readline(self, readline, expected):
1489        all = []
1490        while True:
1491            # short readlines
1492            line = readline(5)
1493            if line and line != b"foo":
1494                if len(line) < 5:
1495                    self.assertTrue(line.endswith(b"\n"))
1496            all.append(line)
1497            if not line:
1498                break
1499        self.assertEqual(b"".join(all), expected)
1500
1501    def test_read1(self):
1502        resp = self.resp
1503        def r():
1504            res = resp.read1(4)
1505            self.assertLessEqual(len(res), 4)
1506            return res
1507        readliner = Readliner(r)
1508        self._verify_readline(readliner.readline, self.lines_expected)
1509
1510    def test_read1_unbounded(self):
1511        resp = self.resp
1512        all = []
1513        while True:
1514            data = resp.read1()
1515            if not data:
1516                break
1517            all.append(data)
1518        self.assertEqual(b"".join(all), self.lines_expected)
1519
1520    def test_read1_bounded(self):
1521        resp = self.resp
1522        all = []
1523        while True:
1524            data = resp.read1(10)
1525            if not data:
1526                break
1527            self.assertLessEqual(len(data), 10)
1528            all.append(data)
1529        self.assertEqual(b"".join(all), self.lines_expected)
1530
1531    def test_read1_0(self):
1532        self.assertEqual(self.resp.read1(0), b"")
1533
1534    def test_peek_0(self):
1535        p = self.resp.peek(0)
1536        self.assertLessEqual(0, len(p))
1537
1538
1539class ExtendedReadTestChunked(ExtendedReadTest):
1540    """
1541    Test peek(), read1(), readline() in chunked mode
1542    """
1543    lines = (
1544        'HTTP/1.1 200 OK\r\n'
1545        'Transfer-Encoding: chunked\r\n\r\n'
1546        'a\r\n'
1547        'hello worl\r\n'
1548        '3\r\n'
1549        'd!\n\r\n'
1550        '9\r\n'
1551        'and now \n\r\n'
1552        '23\r\n'
1553        'for something completely different\n\r\n'
1554        '3\r\n'
1555        'foo\r\n'
1556        '0\r\n' # terminating chunk
1557        '\r\n'  # end of trailers
1558    )
1559
1560
1561class Readliner:
1562    """
1563    a simple readline class that uses an arbitrary read function and buffering
1564    """
1565    def __init__(self, readfunc):
1566        self.readfunc = readfunc
1567        self.remainder = b""
1568
1569    def readline(self, limit):
1570        data = []
1571        datalen = 0
1572        read = self.remainder
1573        try:
1574            while True:
1575                idx = read.find(b'\n')
1576                if idx != -1:
1577                    break
1578                if datalen + len(read) >= limit:
1579                    idx = limit - datalen - 1
1580                # read more data
1581                data.append(read)
1582                read = self.readfunc()
1583                if not read:
1584                    idx = 0 #eof condition
1585                    break
1586            idx += 1
1587            data.append(read[:idx])
1588            self.remainder = read[idx:]
1589            return b"".join(data)
1590        except:
1591            self.remainder = b"".join(data)
1592            raise
1593
1594
1595class OfflineTest(TestCase):
1596    def test_all(self):
1597        # Documented objects defined in the module should be in __all__
1598        expected = {"responses"}  # Allowlist documented dict() object
1599        # HTTPMessage, parse_headers(), and the HTTP status code constants are
1600        # intentionally omitted for simplicity
1601        denylist = {"HTTPMessage", "parse_headers"}
1602        for name in dir(client):
1603            if name.startswith("_") or name in denylist:
1604                continue
1605            module_object = getattr(client, name)
1606            if getattr(module_object, "__module__", None) == "http.client":
1607                expected.add(name)
1608        self.assertCountEqual(client.__all__, expected)
1609
1610    def test_responses(self):
1611        self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")
1612
1613    def test_client_constants(self):
1614        # Make sure we don't break backward compatibility with 3.4
1615        expected = [
1616            'CONTINUE',
1617            'SWITCHING_PROTOCOLS',
1618            'PROCESSING',
1619            'OK',
1620            'CREATED',
1621            'ACCEPTED',
1622            'NON_AUTHORITATIVE_INFORMATION',
1623            'NO_CONTENT',
1624            'RESET_CONTENT',
1625            'PARTIAL_CONTENT',
1626            'MULTI_STATUS',
1627            'IM_USED',
1628            'MULTIPLE_CHOICES',
1629            'MOVED_PERMANENTLY',
1630            'FOUND',
1631            'SEE_OTHER',
1632            'NOT_MODIFIED',
1633            'USE_PROXY',
1634            'TEMPORARY_REDIRECT',
1635            'BAD_REQUEST',
1636            'UNAUTHORIZED',
1637            'PAYMENT_REQUIRED',
1638            'FORBIDDEN',
1639            'NOT_FOUND',
1640            'METHOD_NOT_ALLOWED',
1641            'NOT_ACCEPTABLE',
1642            'PROXY_AUTHENTICATION_REQUIRED',
1643            'REQUEST_TIMEOUT',
1644            'CONFLICT',
1645            'GONE',
1646            'LENGTH_REQUIRED',
1647            'PRECONDITION_FAILED',
1648            'REQUEST_ENTITY_TOO_LARGE',
1649            'REQUEST_URI_TOO_LONG',
1650            'UNSUPPORTED_MEDIA_TYPE',
1651            'REQUESTED_RANGE_NOT_SATISFIABLE',
1652            'EXPECTATION_FAILED',
1653            'IM_A_TEAPOT',
1654            'MISDIRECTED_REQUEST',
1655            'UNPROCESSABLE_ENTITY',
1656            'LOCKED',
1657            'FAILED_DEPENDENCY',
1658            'UPGRADE_REQUIRED',
1659            'PRECONDITION_REQUIRED',
1660            'TOO_MANY_REQUESTS',
1661            'REQUEST_HEADER_FIELDS_TOO_LARGE',
1662            'UNAVAILABLE_FOR_LEGAL_REASONS',
1663            'INTERNAL_SERVER_ERROR',
1664            'NOT_IMPLEMENTED',
1665            'BAD_GATEWAY',
1666            'SERVICE_UNAVAILABLE',
1667            'GATEWAY_TIMEOUT',
1668            'HTTP_VERSION_NOT_SUPPORTED',
1669            'INSUFFICIENT_STORAGE',
1670            'NOT_EXTENDED',
1671            'NETWORK_AUTHENTICATION_REQUIRED',
1672            'EARLY_HINTS',
1673            'TOO_EARLY'
1674        ]
1675        for const in expected:
1676            with self.subTest(constant=const):
1677                self.assertTrue(hasattr(client, const))
1678
1679
1680class SourceAddressTest(TestCase):
1681    def setUp(self):
1682        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1683        self.port = socket_helper.bind_port(self.serv)
1684        self.source_port = socket_helper.find_unused_port()
1685        self.serv.listen()
1686        self.conn = None
1687
1688    def tearDown(self):
1689        if self.conn:
1690            self.conn.close()
1691            self.conn = None
1692        self.serv.close()
1693        self.serv = None
1694
1695    def testHTTPConnectionSourceAddress(self):
1696        self.conn = client.HTTPConnection(HOST, self.port,
1697                source_address=('', self.source_port))
1698        self.conn.connect()
1699        self.assertEqual(self.conn.sock.getsockname()[1], self.source_port)
1700
1701    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
1702                     'http.client.HTTPSConnection not defined')
1703    def testHTTPSConnectionSourceAddress(self):
1704        self.conn = client.HTTPSConnection(HOST, self.port,
1705                source_address=('', self.source_port))
1706        # We don't test anything here other than the constructor not barfing as
1707        # this code doesn't deal with setting up an active running SSL server
1708        # for an ssl_wrapped connect() to actually return from.
1709
1710
1711class TimeoutTest(TestCase):
1712    PORT = None
1713
1714    def setUp(self):
1715        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1716        TimeoutTest.PORT = socket_helper.bind_port(self.serv)
1717        self.serv.listen()
1718
1719    def tearDown(self):
1720        self.serv.close()
1721        self.serv = None
1722
1723    def testTimeoutAttribute(self):
1724        # This will prove that the timeout gets through HTTPConnection
1725        # and into the socket.
1726
1727        # default -- use global socket timeout
1728        self.assertIsNone(socket.getdefaulttimeout())
1729        socket.setdefaulttimeout(30)
1730        try:
1731            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
1732            httpConn.connect()
1733        finally:
1734            socket.setdefaulttimeout(None)
1735        self.assertEqual(httpConn.sock.gettimeout(), 30)
1736        httpConn.close()
1737
1738        # no timeout -- do not use global socket default
1739        self.assertIsNone(socket.getdefaulttimeout())
1740        socket.setdefaulttimeout(30)
1741        try:
1742            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
1743                                              timeout=None)
1744            httpConn.connect()
1745        finally:
1746            socket.setdefaulttimeout(None)
1747        self.assertEqual(httpConn.sock.gettimeout(), None)
1748        httpConn.close()
1749
1750        # a value
1751        httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
1752        httpConn.connect()
1753        self.assertEqual(httpConn.sock.gettimeout(), 30)
1754        httpConn.close()
1755
1756
1757class PersistenceTest(TestCase):
1758
1759    def test_reuse_reconnect(self):
1760        # Should reuse or reconnect depending on header from server
1761        tests = (
1762            ('1.0', '', False),
1763            ('1.0', 'Connection: keep-alive\r\n', True),
1764            ('1.1', '', True),
1765            ('1.1', 'Connection: close\r\n', False),
1766            ('1.0', 'Connection: keep-ALIVE\r\n', True),
1767            ('1.1', 'Connection: cloSE\r\n', False),
1768        )
1769        for version, header, reuse in tests:
1770            with self.subTest(version=version, header=header):
1771                msg = (
1772                    'HTTP/{} 200 OK\r\n'
1773                    '{}'
1774                    'Content-Length: 12\r\n'
1775                    '\r\n'
1776                    'Dummy body\r\n'
1777                ).format(version, header)
1778                conn = FakeSocketHTTPConnection(msg)
1779                self.assertIsNone(conn.sock)
1780                conn.request('GET', '/open-connection')
1781                with conn.getresponse() as response:
1782                    self.assertEqual(conn.sock is None, not reuse)
1783                    response.read()
1784                self.assertEqual(conn.sock is None, not reuse)
1785                self.assertEqual(conn.connections, 1)
1786                conn.request('GET', '/subsequent-request')
1787                self.assertEqual(conn.connections, 1 if reuse else 2)
1788
1789    def test_disconnected(self):
1790
1791        def make_reset_reader(text):
1792            """Return BufferedReader that raises ECONNRESET at EOF"""
1793            stream = io.BytesIO(text)
1794            def readinto(buffer):
1795                size = io.BytesIO.readinto(stream, buffer)
1796                if size == 0:
1797                    raise ConnectionResetError()
1798                return size
1799            stream.readinto = readinto
1800            return io.BufferedReader(stream)
1801
1802        tests = (
1803            (io.BytesIO, client.RemoteDisconnected),
1804            (make_reset_reader, ConnectionResetError),
1805        )
1806        for stream_factory, exception in tests:
1807            with self.subTest(exception=exception):
1808                conn = FakeSocketHTTPConnection(b'', stream_factory)
1809                conn.request('GET', '/eof-response')
1810                self.assertRaises(exception, conn.getresponse)
1811                self.assertIsNone(conn.sock)
1812                # HTTPConnection.connect() should be automatically invoked
1813                conn.request('GET', '/reconnect')
1814                self.assertEqual(conn.connections, 2)
1815
1816    def test_100_close(self):
1817        conn = FakeSocketHTTPConnection(
1818            b'HTTP/1.1 100 Continue\r\n'
1819            b'\r\n'
1820            # Missing final response
1821        )
1822        conn.request('GET', '/', headers={'Expect': '100-continue'})
1823        self.assertRaises(client.RemoteDisconnected, conn.getresponse)
1824        self.assertIsNone(conn.sock)
1825        conn.request('GET', '/reconnect')
1826        self.assertEqual(conn.connections, 2)
1827
1828
1829class HTTPSTest(TestCase):
1830
1831    def setUp(self):
1832        if not hasattr(client, 'HTTPSConnection'):
1833            self.skipTest('ssl support required')
1834
1835    def make_server(self, certfile):
1836        from test.ssl_servers import make_https_server
1837        return make_https_server(self, certfile=certfile)
1838
1839    def test_attributes(self):
1840        # simple test to check it's storing the timeout
1841        h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
1842        self.assertEqual(h.timeout, 30)
1843
1844    def test_networked(self):
1845        # Default settings: requires a valid cert from a trusted CA
1846        import ssl
1847        support.requires('network')
1848        with socket_helper.transient_internet('self-signed.pythontest.net'):
1849            h = client.HTTPSConnection('self-signed.pythontest.net', 443)
1850            with self.assertRaises(ssl.SSLError) as exc_info:
1851                h.request('GET', '/')
1852            self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
1853
1854    def test_networked_noverification(self):
1855        # Switch off cert verification
1856        import ssl
1857        support.requires('network')
1858        with socket_helper.transient_internet('self-signed.pythontest.net'):
1859            context = ssl._create_unverified_context()
1860            h = client.HTTPSConnection('self-signed.pythontest.net', 443,
1861                                       context=context)
1862            h.request('GET', '/')
1863            resp = h.getresponse()
1864            h.close()
1865            self.assertIn('nginx', resp.getheader('server'))
1866            resp.close()
1867
1868    @support.system_must_validate_cert
1869    def test_networked_trusted_by_default_cert(self):
1870        # Default settings: requires a valid cert from a trusted CA
1871        support.requires('network')
1872        with socket_helper.transient_internet('www.python.org'):
1873            h = client.HTTPSConnection('www.python.org', 443)
1874            h.request('GET', '/')
1875            resp = h.getresponse()
1876            content_type = resp.getheader('content-type')
1877            resp.close()
1878            h.close()
1879            self.assertIn('text/html', content_type)
1880
1881    def test_networked_good_cert(self):
1882        # We feed the server's cert as a validating cert
1883        import ssl
1884        support.requires('network')
1885        selfsigned_pythontestdotnet = 'self-signed.pythontest.net'
1886        with socket_helper.transient_internet(selfsigned_pythontestdotnet):
1887            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1888            self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
1889            self.assertEqual(context.check_hostname, True)
1890            context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
1891            try:
1892                h = client.HTTPSConnection(selfsigned_pythontestdotnet, 443,
1893                                           context=context)
1894                h.request('GET', '/')
1895                resp = h.getresponse()
1896            except ssl.SSLError as ssl_err:
1897                ssl_err_str = str(ssl_err)
1898                # In the error message of [SSL: CERTIFICATE_VERIFY_FAILED] on
1899                # modern Linux distros (Debian Buster, etc) default OpenSSL
1900                # configurations it'll fail saying "key too weak" until we
1901                # address https://bugs.python.org/issue36816 to use a proper
1902                # key size on self-signed.pythontest.net.
1903                if re.search(r'(?i)key.too.weak', ssl_err_str):
1904                    raise unittest.SkipTest(
1905                        f'Got {ssl_err_str} trying to connect '
1906                        f'to {selfsigned_pythontestdotnet}. '
1907                        'See https://bugs.python.org/issue36816.')
1908                raise
1909            server_string = resp.getheader('server')
1910            resp.close()
1911            h.close()
1912            self.assertIn('nginx', server_string)
1913
1914    def test_networked_bad_cert(self):
1915        # We feed a "CA" cert that is unrelated to the server's cert
1916        import ssl
1917        support.requires('network')
1918        with socket_helper.transient_internet('self-signed.pythontest.net'):
1919            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1920            context.load_verify_locations(CERT_localhost)
1921            h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
1922            with self.assertRaises(ssl.SSLError) as exc_info:
1923                h.request('GET', '/')
1924            self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
1925
1926    def test_local_unknown_cert(self):
1927        # The custom cert isn't known to the default trust bundle
1928        import ssl
1929        server = self.make_server(CERT_localhost)
1930        h = client.HTTPSConnection('localhost', server.port)
1931        with self.assertRaises(ssl.SSLError) as exc_info:
1932            h.request('GET', '/')
1933        self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
1934
1935    def test_local_good_hostname(self):
1936        # The (valid) cert validates the HTTP hostname
1937        import ssl
1938        server = self.make_server(CERT_localhost)
1939        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1940        context.load_verify_locations(CERT_localhost)
1941        h = client.HTTPSConnection('localhost', server.port, context=context)
1942        self.addCleanup(h.close)
1943        h.request('GET', '/nonexistent')
1944        resp = h.getresponse()
1945        self.addCleanup(resp.close)
1946        self.assertEqual(resp.status, 404)
1947
1948    def test_local_bad_hostname(self):
1949        # The (valid) cert doesn't validate the HTTP hostname
1950        import ssl
1951        server = self.make_server(CERT_fakehostname)
1952        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1953        context.load_verify_locations(CERT_fakehostname)
1954        h = client.HTTPSConnection('localhost', server.port, context=context)
1955        with self.assertRaises(ssl.CertificateError):
1956            h.request('GET', '/')
1957        # Same with explicit check_hostname=True
1958        with warnings_helper.check_warnings(('', DeprecationWarning)):
1959            h = client.HTTPSConnection('localhost', server.port,
1960                                       context=context, check_hostname=True)
1961        with self.assertRaises(ssl.CertificateError):
1962            h.request('GET', '/')
1963        # With check_hostname=False, the mismatching is ignored
1964        context.check_hostname = False
1965        with warnings_helper.check_warnings(('', DeprecationWarning)):
1966            h = client.HTTPSConnection('localhost', server.port,
1967                                       context=context, check_hostname=False)
1968        h.request('GET', '/nonexistent')
1969        resp = h.getresponse()
1970        resp.close()
1971        h.close()
1972        self.assertEqual(resp.status, 404)
1973        # The context's check_hostname setting is used if one isn't passed to
1974        # HTTPSConnection.
1975        context.check_hostname = False
1976        h = client.HTTPSConnection('localhost', server.port, context=context)
1977        h.request('GET', '/nonexistent')
1978        resp = h.getresponse()
1979        self.assertEqual(resp.status, 404)
1980        resp.close()
1981        h.close()
1982        # Passing check_hostname to HTTPSConnection should override the
1983        # context's setting.
1984        with warnings_helper.check_warnings(('', DeprecationWarning)):
1985            h = client.HTTPSConnection('localhost', server.port,
1986                                       context=context, check_hostname=True)
1987        with self.assertRaises(ssl.CertificateError):
1988            h.request('GET', '/')
1989
1990    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
1991                     'http.client.HTTPSConnection not available')
1992    def test_host_port(self):
1993        # Check invalid host_port
1994
1995        for hp in ("www.python.org:abc", "user:[email protected]"):
1996            self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp)
1997
1998        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
1999                          "fe80::207:e9ff:fe9b", 8000),
2000                         ("www.python.org:443", "www.python.org", 443),
2001                         ("www.python.org:", "www.python.org", 443),
2002                         ("www.python.org", "www.python.org", 443),
2003                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
2004                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b",
2005                             443)):
2006            c = client.HTTPSConnection(hp)
2007            self.assertEqual(h, c.host)
2008            self.assertEqual(p, c.port)
2009
2010    def test_tls13_pha(self):
2011        import ssl
2012        if not ssl.HAS_TLSv1_3:
2013            self.skipTest('TLS 1.3 support required')
2014        # just check status of PHA flag
2015        h = client.HTTPSConnection('localhost', 443)
2016        self.assertTrue(h._context.post_handshake_auth)
2017
2018        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2019        self.assertFalse(context.post_handshake_auth)
2020        h = client.HTTPSConnection('localhost', 443, context=context)
2021        self.assertIs(h._context, context)
2022        self.assertFalse(h._context.post_handshake_auth)
2023
2024        with warnings.catch_warnings():
2025            warnings.filterwarnings('ignore', 'key_file, cert_file and check_hostname are deprecated',
2026                                    DeprecationWarning)
2027            h = client.HTTPSConnection('localhost', 443, context=context,
2028                                       cert_file=CERT_localhost)
2029        self.assertTrue(h._context.post_handshake_auth)
2030
2031
2032class RequestBodyTest(TestCase):
2033    """Test cases where a request includes a message body."""
2034
2035    def setUp(self):
2036        self.conn = client.HTTPConnection('example.com')
2037        self.conn.sock = self.sock = FakeSocket("")
2038        self.conn.sock = self.sock
2039
2040    def get_headers_and_fp(self):
2041        f = io.BytesIO(self.sock.data)
2042        f.readline()  # read the request line
2043        message = client.parse_headers(f)
2044        return message, f
2045
2046    def test_list_body(self):
2047        # Note that no content-length is automatically calculated for
2048        # an iterable.  The request will fall back to send chunked
2049        # transfer encoding.
2050        cases = (
2051            ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
2052            ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
2053        )
2054        for body, expected in cases:
2055            with self.subTest(body):
2056                self.conn = client.HTTPConnection('example.com')
2057                self.conn.sock = self.sock = FakeSocket('')
2058
2059                self.conn.request('PUT', '/url', body)
2060                msg, f = self.get_headers_and_fp()
2061                self.assertNotIn('Content-Type', msg)
2062                self.assertNotIn('Content-Length', msg)
2063                self.assertEqual(msg.get('Transfer-Encoding'), 'chunked')
2064                self.assertEqual(expected, f.read())
2065
2066    def test_manual_content_length(self):
2067        # Set an incorrect content-length so that we can verify that
2068        # it will not be over-ridden by the library.
2069        self.conn.request("PUT", "/url", "body",
2070                          {"Content-Length": "42"})
2071        message, f = self.get_headers_and_fp()
2072        self.assertEqual("42", message.get("content-length"))
2073        self.assertEqual(4, len(f.read()))
2074
2075    def test_ascii_body(self):
2076        self.conn.request("PUT", "/url", "body")
2077        message, f = self.get_headers_and_fp()
2078        self.assertEqual("text/plain", message.get_content_type())
2079        self.assertIsNone(message.get_charset())
2080        self.assertEqual("4", message.get("content-length"))
2081        self.assertEqual(b'body', f.read())
2082
2083    def test_latin1_body(self):
2084        self.conn.request("PUT", "/url", "body\xc1")
2085        message, f = self.get_headers_and_fp()
2086        self.assertEqual("text/plain", message.get_content_type())
2087        self.assertIsNone(message.get_charset())
2088        self.assertEqual("5", message.get("content-length"))
2089        self.assertEqual(b'body\xc1', f.read())
2090
2091    def test_bytes_body(self):
2092        self.conn.request("PUT", "/url", b"body\xc1")
2093        message, f = self.get_headers_and_fp()
2094        self.assertEqual("text/plain", message.get_content_type())
2095        self.assertIsNone(message.get_charset())
2096        self.assertEqual("5", message.get("content-length"))
2097        self.assertEqual(b'body\xc1', f.read())
2098
2099    def test_text_file_body(self):
2100        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
2101        with open(os_helper.TESTFN, "w", encoding="utf-8") as f:
2102            f.write("body")
2103        with open(os_helper.TESTFN, encoding="utf-8") as f:
2104            self.conn.request("PUT", "/url", f)
2105            message, f = self.get_headers_and_fp()
2106            self.assertEqual("text/plain", message.get_content_type())
2107            self.assertIsNone(message.get_charset())
2108            # No content-length will be determined for files; the body
2109            # will be sent using chunked transfer encoding instead.
2110            self.assertIsNone(message.get("content-length"))
2111            self.assertEqual("chunked", message.get("transfer-encoding"))
2112            self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read())
2113
2114    def test_binary_file_body(self):
2115        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
2116        with open(os_helper.TESTFN, "wb") as f:
2117            f.write(b"body\xc1")
2118        with open(os_helper.TESTFN, "rb") as f:
2119            self.conn.request("PUT", "/url", f)
2120            message, f = self.get_headers_and_fp()
2121            self.assertEqual("text/plain", message.get_content_type())
2122            self.assertIsNone(message.get_charset())
2123            self.assertEqual("chunked", message.get("Transfer-Encoding"))
2124            self.assertNotIn("Content-Length", message)
2125            self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read())
2126
2127
2128class HTTPResponseTest(TestCase):
2129
2130    def setUp(self):
2131        body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \
2132                second-value\r\n\r\nText"
2133        sock = FakeSocket(body)
2134        self.resp = client.HTTPResponse(sock)
2135        self.resp.begin()
2136
2137    def test_getting_header(self):
2138        header = self.resp.getheader('My-Header')
2139        self.assertEqual(header, 'first-value, second-value')
2140
2141        header = self.resp.getheader('My-Header', 'some default')
2142        self.assertEqual(header, 'first-value, second-value')
2143
2144    def test_getting_nonexistent_header_with_string_default(self):
2145        header = self.resp.getheader('No-Such-Header', 'default-value')
2146        self.assertEqual(header, 'default-value')
2147
2148    def test_getting_nonexistent_header_with_iterable_default(self):
2149        header = self.resp.getheader('No-Such-Header', ['default', 'values'])
2150        self.assertEqual(header, 'default, values')
2151
2152        header = self.resp.getheader('No-Such-Header', ('default', 'values'))
2153        self.assertEqual(header, 'default, values')
2154
2155    def test_getting_nonexistent_header_without_default(self):
2156        header = self.resp.getheader('No-Such-Header')
2157        self.assertEqual(header, None)
2158
2159    def test_getting_header_defaultint(self):
2160        header = self.resp.getheader('No-Such-Header',default=42)
2161        self.assertEqual(header, 42)
2162
2163class TunnelTests(TestCase):
2164    def setUp(self):
2165        response_text = (
2166            'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT
2167            'HTTP/1.1 200 OK\r\n' # Reply to HEAD
2168            'Content-Length: 42\r\n\r\n'
2169        )
2170        self.host = 'proxy.com'
2171        self.conn = client.HTTPConnection(self.host)
2172        self.conn._create_connection = self._create_connection(response_text)
2173
2174    def tearDown(self):
2175        self.conn.close()
2176
2177    def _create_connection(self, response_text):
2178        def create_connection(address, timeout=None, source_address=None):
2179            return FakeSocket(response_text, host=address[0], port=address[1])
2180        return create_connection
2181
2182    def test_set_tunnel_host_port_headers(self):
2183        tunnel_host = 'destination.com'
2184        tunnel_port = 8888
2185        tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'}
2186        self.conn.set_tunnel(tunnel_host, port=tunnel_port,
2187                             headers=tunnel_headers)
2188        self.conn.request('HEAD', '/', '')
2189        self.assertEqual(self.conn.sock.host, self.host)
2190        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
2191        self.assertEqual(self.conn._tunnel_host, tunnel_host)
2192        self.assertEqual(self.conn._tunnel_port, tunnel_port)
2193        self.assertEqual(self.conn._tunnel_headers, tunnel_headers)
2194
2195    def test_disallow_set_tunnel_after_connect(self):
2196        # Once connected, we shouldn't be able to tunnel anymore
2197        self.conn.connect()
2198        self.assertRaises(RuntimeError, self.conn.set_tunnel,
2199                          'destination.com')
2200
2201    def test_connect_with_tunnel(self):
2202        self.conn.set_tunnel('destination.com')
2203        self.conn.request('HEAD', '/', '')
2204        self.assertEqual(self.conn.sock.host, self.host)
2205        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
2206        self.assertIn(b'CONNECT destination.com', self.conn.sock.data)
2207        # issue22095
2208        self.assertNotIn(b'Host: destination.com:None', self.conn.sock.data)
2209        self.assertIn(b'Host: destination.com', self.conn.sock.data)
2210
2211        # This test should be removed when CONNECT gets the HTTP/1.1 blessing
2212        self.assertNotIn(b'Host: proxy.com', self.conn.sock.data)
2213
2214    def test_tunnel_connect_single_send_connection_setup(self):
2215        """Regresstion test for https://bugs.python.org/issue43332."""
2216        with mock.patch.object(self.conn, 'send') as mock_send:
2217            self.conn.set_tunnel('destination.com')
2218            self.conn.connect()
2219            self.conn.request('GET', '/')
2220        mock_send.assert_called()
2221        # Likely 2, but this test only cares about the first.
2222        self.assertGreater(
2223                len(mock_send.mock_calls), 1,
2224                msg=f'unexpected number of send calls: {mock_send.mock_calls}')
2225        proxy_setup_data_sent = mock_send.mock_calls[0][1][0]
2226        self.assertIn(b'CONNECT destination.com', proxy_setup_data_sent)
2227        self.assertTrue(
2228                proxy_setup_data_sent.endswith(b'\r\n\r\n'),
2229                msg=f'unexpected proxy data sent {proxy_setup_data_sent!r}')
2230
2231    def test_connect_put_request(self):
2232        self.conn.set_tunnel('destination.com')
2233        self.conn.request('PUT', '/', '')
2234        self.assertEqual(self.conn.sock.host, self.host)
2235        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
2236        self.assertIn(b'CONNECT destination.com', self.conn.sock.data)
2237        self.assertIn(b'Host: destination.com', self.conn.sock.data)
2238
2239    def test_tunnel_debuglog(self):
2240        expected_header = 'X-Dummy: 1'
2241        response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header)
2242
2243        self.conn.set_debuglevel(1)
2244        self.conn._create_connection = self._create_connection(response_text)
2245        self.conn.set_tunnel('destination.com')
2246
2247        with support.captured_stdout() as output:
2248            self.conn.request('PUT', '/', '')
2249        lines = output.getvalue().splitlines()
2250        self.assertIn('header: {}'.format(expected_header), lines)
2251
2252    def test_tunnel_leak(self):
2253        sock = None
2254
2255        def _create_connection(address, timeout=None, source_address=None):
2256            nonlocal sock
2257            sock = FakeSocket(
2258                'HTTP/1.1 404 NOT FOUND\r\n\r\n',
2259                host=address[0],
2260                port=address[1],
2261            )
2262            return sock
2263
2264        self.conn._create_connection = _create_connection
2265        self.conn.set_tunnel('destination.com')
2266        exc = None
2267        try:
2268            self.conn.request('HEAD', '/', '')
2269        except OSError as e:
2270            # keeping a reference to exc keeps response alive in the traceback
2271            exc = e
2272        self.assertIsNotNone(exc)
2273        self.assertTrue(sock.file_closed)
2274
2275
2276if __name__ == '__main__':
2277    unittest.main(verbosity=2)
2278