1"""Unittests for the various HTTPServer modules.
2
3Written by Cody A.W. Somerville <[email protected]>,
4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
5"""
6from collections import OrderedDict
7from http.server import BaseHTTPRequestHandler, HTTPServer, \
8     SimpleHTTPRequestHandler, CGIHTTPRequestHandler
9from http import server, HTTPStatus
10
11import os
12import socket
13import sys
14import re
15import base64
16import ntpath
17import pathlib
18import shutil
19import email.message
20import email.utils
21import html
22import http, http.client
23import urllib.parse
24import tempfile
25import time
26import datetime
27import threading
28from unittest import mock
29from io import BytesIO, StringIO
30
31import unittest
32from test import support
33from test.support import os_helper
34from test.support import threading_helper
35
36support.requires_working_socket(module=True)
37
38class NoLogRequestHandler:
39    def log_message(self, *args):
40        # don't write log messages to stderr
41        pass
42
43    def read(self, n=None):
44        return ''
45
46
47class TestServerThread(threading.Thread):
48    def __init__(self, test_object, request_handler):
49        threading.Thread.__init__(self)
50        self.request_handler = request_handler
51        self.test_object = test_object
52
53    def run(self):
54        self.server = HTTPServer(('localhost', 0), self.request_handler)
55        self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname()
56        self.test_object.server_started.set()
57        self.test_object = None
58        try:
59            self.server.serve_forever(0.05)
60        finally:
61            self.server.server_close()
62
63    def stop(self):
64        self.server.shutdown()
65        self.join()
66
67
68class BaseTestCase(unittest.TestCase):
69    def setUp(self):
70        self._threads = threading_helper.threading_setup()
71        os.environ = os_helper.EnvironmentVarGuard()
72        self.server_started = threading.Event()
73        self.thread = TestServerThread(self, self.request_handler)
74        self.thread.start()
75        self.server_started.wait()
76
77    def tearDown(self):
78        self.thread.stop()
79        self.thread = None
80        os.environ.__exit__()
81        threading_helper.threading_cleanup(*self._threads)
82
83    def request(self, uri, method='GET', body=None, headers={}):
84        self.connection = http.client.HTTPConnection(self.HOST, self.PORT)
85        self.connection.request(method, uri, body, headers)
86        return self.connection.getresponse()
87
88
89class BaseHTTPServerTestCase(BaseTestCase):
90    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
91        protocol_version = 'HTTP/1.1'
92        default_request_version = 'HTTP/1.1'
93
94        def do_TEST(self):
95            self.send_response(HTTPStatus.NO_CONTENT)
96            self.send_header('Content-Type', 'text/html')
97            self.send_header('Connection', 'close')
98            self.end_headers()
99
100        def do_KEEP(self):
101            self.send_response(HTTPStatus.NO_CONTENT)
102            self.send_header('Content-Type', 'text/html')
103            self.send_header('Connection', 'keep-alive')
104            self.end_headers()
105
106        def do_KEYERROR(self):
107            self.send_error(999)
108
109        def do_NOTFOUND(self):
110            self.send_error(HTTPStatus.NOT_FOUND)
111
112        def do_EXPLAINERROR(self):
113            self.send_error(999, "Short Message",
114                            "This is a long \n explanation")
115
116        def do_CUSTOM(self):
117            self.send_response(999)
118            self.send_header('Content-Type', 'text/html')
119            self.send_header('Connection', 'close')
120            self.end_headers()
121
122        def do_LATINONEHEADER(self):
123            self.send_response(999)
124            self.send_header('X-Special', 'Dängerous Mind')
125            self.send_header('Connection', 'close')
126            self.end_headers()
127            body = self.headers['x-special-incoming'].encode('utf-8')
128            self.wfile.write(body)
129
130        def do_SEND_ERROR(self):
131            self.send_error(int(self.path[1:]))
132
133        def do_HEAD(self):
134            self.send_error(int(self.path[1:]))
135
136    def setUp(self):
137        BaseTestCase.setUp(self)
138        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
139        self.con.connect()
140
141    def test_command(self):
142        self.con.request('GET', '/')
143        res = self.con.getresponse()
144        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
145
146    def test_request_line_trimming(self):
147        self.con._http_vsn_str = 'HTTP/1.1\n'
148        self.con.putrequest('XYZBOGUS', '/')
149        self.con.endheaders()
150        res = self.con.getresponse()
151        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
152
153    def test_version_bogus(self):
154        self.con._http_vsn_str = 'FUBAR'
155        self.con.putrequest('GET', '/')
156        self.con.endheaders()
157        res = self.con.getresponse()
158        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
159
160    def test_version_digits(self):
161        self.con._http_vsn_str = 'HTTP/9.9.9'
162        self.con.putrequest('GET', '/')
163        self.con.endheaders()
164        res = self.con.getresponse()
165        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
166
167    def test_version_signs_and_underscores(self):
168        self.con._http_vsn_str = 'HTTP/-9_9_9.+9_9_9'
169        self.con.putrequest('GET', '/')
170        self.con.endheaders()
171        res = self.con.getresponse()
172        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
173
174    def test_major_version_number_too_long(self):
175        self.con._http_vsn_str = 'HTTP/909876543210.0'
176        self.con.putrequest('GET', '/')
177        self.con.endheaders()
178        res = self.con.getresponse()
179        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
180
181    def test_minor_version_number_too_long(self):
182        self.con._http_vsn_str = 'HTTP/1.909876543210'
183        self.con.putrequest('GET', '/')
184        self.con.endheaders()
185        res = self.con.getresponse()
186        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
187
188    def test_version_none_get(self):
189        self.con._http_vsn_str = ''
190        self.con.putrequest('GET', '/')
191        self.con.endheaders()
192        res = self.con.getresponse()
193        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
194
195    def test_version_none(self):
196        # Test that a valid method is rejected when not HTTP/1.x
197        self.con._http_vsn_str = ''
198        self.con.putrequest('CUSTOM', '/')
199        self.con.endheaders()
200        res = self.con.getresponse()
201        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
202
203    def test_version_invalid(self):
204        self.con._http_vsn = 99
205        self.con._http_vsn_str = 'HTTP/9.9'
206        self.con.putrequest('GET', '/')
207        self.con.endheaders()
208        res = self.con.getresponse()
209        self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED)
210
211    def test_send_blank(self):
212        self.con._http_vsn_str = ''
213        self.con.putrequest('', '')
214        self.con.endheaders()
215        res = self.con.getresponse()
216        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
217
218    def test_header_close(self):
219        self.con.putrequest('GET', '/')
220        self.con.putheader('Connection', 'close')
221        self.con.endheaders()
222        res = self.con.getresponse()
223        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
224
225    def test_header_keep_alive(self):
226        self.con._http_vsn_str = 'HTTP/1.1'
227        self.con.putrequest('GET', '/')
228        self.con.putheader('Connection', 'keep-alive')
229        self.con.endheaders()
230        res = self.con.getresponse()
231        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
232
233    def test_handler(self):
234        self.con.request('TEST', '/')
235        res = self.con.getresponse()
236        self.assertEqual(res.status, HTTPStatus.NO_CONTENT)
237
238    def test_return_header_keep_alive(self):
239        self.con.request('KEEP', '/')
240        res = self.con.getresponse()
241        self.assertEqual(res.getheader('Connection'), 'keep-alive')
242        self.con.request('TEST', '/')
243        self.addCleanup(self.con.close)
244
245    def test_internal_key_error(self):
246        self.con.request('KEYERROR', '/')
247        res = self.con.getresponse()
248        self.assertEqual(res.status, 999)
249
250    def test_return_custom_status(self):
251        self.con.request('CUSTOM', '/')
252        res = self.con.getresponse()
253        self.assertEqual(res.status, 999)
254
255    def test_return_explain_error(self):
256        self.con.request('EXPLAINERROR', '/')
257        res = self.con.getresponse()
258        self.assertEqual(res.status, 999)
259        self.assertTrue(int(res.getheader('Content-Length')))
260
261    def test_latin1_header(self):
262        self.con.request('LATINONEHEADER', '/', headers={
263            'X-Special-Incoming':       'Ärger mit Unicode'
264        })
265        res = self.con.getresponse()
266        self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind')
267        self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8'))
268
269    def test_error_content_length(self):
270        # Issue #16088: standard error responses should have a content-length
271        self.con.request('NOTFOUND', '/')
272        res = self.con.getresponse()
273        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
274
275        data = res.read()
276        self.assertEqual(int(res.getheader('Content-Length')), len(data))
277
278    def test_send_error(self):
279        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
280                                         HTTPStatus.RESET_CONTENT)
281        for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED,
282                     HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT,
283                     HTTPStatus.SWITCHING_PROTOCOLS):
284            self.con.request('SEND_ERROR', '/{}'.format(code))
285            res = self.con.getresponse()
286            self.assertEqual(code, res.status)
287            self.assertEqual(None, res.getheader('Content-Length'))
288            self.assertEqual(None, res.getheader('Content-Type'))
289            if code not in allow_transfer_encoding_codes:
290                self.assertEqual(None, res.getheader('Transfer-Encoding'))
291
292            data = res.read()
293            self.assertEqual(b'', data)
294
295    def test_head_via_send_error(self):
296        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
297                                         HTTPStatus.RESET_CONTENT)
298        for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT,
299                     HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT,
300                     HTTPStatus.SWITCHING_PROTOCOLS):
301            self.con.request('HEAD', '/{}'.format(code))
302            res = self.con.getresponse()
303            self.assertEqual(code, res.status)
304            if code == HTTPStatus.OK:
305                self.assertTrue(int(res.getheader('Content-Length')) > 0)
306                self.assertIn('text/html', res.getheader('Content-Type'))
307            else:
308                self.assertEqual(None, res.getheader('Content-Length'))
309                self.assertEqual(None, res.getheader('Content-Type'))
310            if code not in allow_transfer_encoding_codes:
311                self.assertEqual(None, res.getheader('Transfer-Encoding'))
312
313            data = res.read()
314            self.assertEqual(b'', data)
315
316
317class RequestHandlerLoggingTestCase(BaseTestCase):
318    class request_handler(BaseHTTPRequestHandler):
319        protocol_version = 'HTTP/1.1'
320        default_request_version = 'HTTP/1.1'
321
322        def do_GET(self):
323            self.send_response(HTTPStatus.OK)
324            self.end_headers()
325
326        def do_ERROR(self):
327            self.send_error(HTTPStatus.NOT_FOUND, 'File not found')
328
329    def test_get(self):
330        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
331        self.con.connect()
332
333        with support.captured_stderr() as err:
334            self.con.request('GET', '/')
335            self.con.getresponse()
336
337        self.assertTrue(
338            err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n'))
339
340    def test_err(self):
341        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
342        self.con.connect()
343
344        with support.captured_stderr() as err:
345            self.con.request('ERROR', '/')
346            self.con.getresponse()
347
348        lines = err.getvalue().split('\n')
349        self.assertTrue(lines[0].endswith('code 404, message File not found'))
350        self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -'))
351
352
353class SimpleHTTPServerTestCase(BaseTestCase):
354    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
355        pass
356
357    def setUp(self):
358        super().setUp()
359        self.cwd = os.getcwd()
360        basetempdir = tempfile.gettempdir()
361        os.chdir(basetempdir)
362        self.data = b'We are the knights who say Ni!'
363        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
364        self.tempdir_name = os.path.basename(self.tempdir)
365        self.base_url = '/' + self.tempdir_name
366        tempname = os.path.join(self.tempdir, 'test')
367        with open(tempname, 'wb') as temp:
368            temp.write(self.data)
369            temp.flush()
370        mtime = os.stat(tempname).st_mtime
371        # compute last modification datetime for browser cache tests
372        last_modif = datetime.datetime.fromtimestamp(mtime,
373            datetime.timezone.utc)
374        self.last_modif_datetime = last_modif.replace(microsecond=0)
375        self.last_modif_header = email.utils.formatdate(
376            last_modif.timestamp(), usegmt=True)
377
378    def tearDown(self):
379        try:
380            os.chdir(self.cwd)
381            try:
382                shutil.rmtree(self.tempdir)
383            except:
384                pass
385        finally:
386            super().tearDown()
387
388    def check_status_and_reason(self, response, status, data=None):
389        def close_conn():
390            """Don't close reader yet so we can check if there was leftover
391            buffered input"""
392            nonlocal reader
393            reader = response.fp
394            response.fp = None
395        reader = None
396        response._close_conn = close_conn
397
398        body = response.read()
399        self.assertTrue(response)
400        self.assertEqual(response.status, status)
401        self.assertIsNotNone(response.reason)
402        if data:
403            self.assertEqual(data, body)
404        # Ensure the server has not set up a persistent connection, and has
405        # not sent any extra data
406        self.assertEqual(response.version, 10)
407        self.assertEqual(response.msg.get("Connection", "close"), "close")
408        self.assertEqual(reader.read(30), b'', 'Connection should be closed')
409
410        reader.close()
411        return body
412
413    @unittest.skipIf(sys.platform == 'darwin',
414                     'undecodable name cannot always be decoded on macOS')
415    @unittest.skipIf(sys.platform == 'win32',
416                     'undecodable name cannot be decoded on win32')
417    @unittest.skipUnless(os_helper.TESTFN_UNDECODABLE,
418                         'need os_helper.TESTFN_UNDECODABLE')
419    def test_undecodable_filename(self):
420        enc = sys.getfilesystemencoding()
421        filename = os.fsdecode(os_helper.TESTFN_UNDECODABLE) + '.txt'
422        with open(os.path.join(self.tempdir, filename), 'wb') as f:
423            f.write(os_helper.TESTFN_UNDECODABLE)
424        response = self.request(self.base_url + '/')
425        if sys.platform == 'darwin':
426            # On Mac OS the HFS+ filesystem replaces bytes that aren't valid
427            # UTF-8 into a percent-encoded value.
428            for name in os.listdir(self.tempdir):
429                if name != 'test': # Ignore a filename created in setUp().
430                    filename = name
431                    break
432        body = self.check_status_and_reason(response, HTTPStatus.OK)
433        quotedname = urllib.parse.quote(filename, errors='surrogatepass')
434        self.assertIn(('href="%s"' % quotedname)
435                      .encode(enc, 'surrogateescape'), body)
436        self.assertIn(('>%s<' % html.escape(filename, quote=False))
437                      .encode(enc, 'surrogateescape'), body)
438        response = self.request(self.base_url + '/' + quotedname)
439        self.check_status_and_reason(response, HTTPStatus.OK,
440                                     data=os_helper.TESTFN_UNDECODABLE)
441
442    def test_undecodable_parameter(self):
443        # sanity check using a valid parameter
444        response = self.request(self.base_url + '/?x=123').read()
445        self.assertRegex(response, f'listing for {self.base_url}/\?x=123'.encode('latin1'))
446        # now the bogus encoding
447        response = self.request(self.base_url + '/?x=%bb').read()
448        self.assertRegex(response, f'listing for {self.base_url}/\?x=\xef\xbf\xbd'.encode('latin1'))
449
450    def test_get_dir_redirect_location_domain_injection_bug(self):
451        """Ensure //evil.co/..%2f../../X does not put //evil.co/ in Location.
452
453        //netloc/ in a Location header is a redirect to a new host.
454        https://github.com/python/cpython/issues/87389
455
456        This checks that a path resolving to a directory on our server cannot
457        resolve into a redirect to another server.
458        """
459        os.mkdir(os.path.join(self.tempdir, 'existing_directory'))
460        url = f'/python.org/..%2f..%2f..%2f..%2f..%2f../%0a%0d/../{self.tempdir_name}/existing_directory'
461        expected_location = f'{url}/'  # /python.org.../ single slash single prefix, trailing slash
462        # Canonicalizes to /tmp/tempdir_name/existing_directory which does
463        # exist and is a dir, triggering the 301 redirect logic.
464        response = self.request(url)
465        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
466        location = response.getheader('Location')
467        self.assertEqual(location, expected_location, msg='non-attack failed!')
468
469        # //python.org... multi-slash prefix, no trailing slash
470        attack_url = f'/{url}'
471        response = self.request(attack_url)
472        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
473        location = response.getheader('Location')
474        self.assertFalse(location.startswith('//'), msg=location)
475        self.assertEqual(location, expected_location,
476                msg='Expected Location header to start with a single / and '
477                'end with a / as this is a directory redirect.')
478
479        # ///python.org... triple-slash prefix, no trailing slash
480        attack3_url = f'//{url}'
481        response = self.request(attack3_url)
482        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
483        self.assertEqual(response.getheader('Location'), expected_location)
484
485        # If the second word in the http request (Request-URI for the http
486        # method) is a full URI, we don't worry about it, as that'll be parsed
487        # and reassembled as a full URI within BaseHTTPRequestHandler.send_head
488        # so no errant scheme-less //netloc//evil.co/ domain mixup can happen.
489        attack_scheme_netloc_2slash_url = f'https://pypi.org/{url}'
490        expected_scheme_netloc_location = f'{attack_scheme_netloc_2slash_url}/'
491        response = self.request(attack_scheme_netloc_2slash_url)
492        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
493        location = response.getheader('Location')
494        # We're just ensuring that the scheme and domain make it through, if
495        # there are or aren't multiple slashes at the start of the path that
496        # follows that isn't important in this Location: header.
497        self.assertTrue(location.startswith('https://pypi.org/'), msg=location)
498
499    def test_get(self):
500        #constructs the path relative to the root directory of the HTTPServer
501        response = self.request(self.base_url + '/test')
502        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
503        # check for trailing "/" which should return 404. See Issue17324
504        response = self.request(self.base_url + '/test/')
505        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
506        response = self.request(self.base_url + '/')
507        self.check_status_and_reason(response, HTTPStatus.OK)
508        response = self.request(self.base_url)
509        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
510        self.assertEqual(response.getheader("Content-Length"), "0")
511        response = self.request(self.base_url + '/?hi=2')
512        self.check_status_and_reason(response, HTTPStatus.OK)
513        response = self.request(self.base_url + '?hi=1')
514        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
515        self.assertEqual(response.getheader("Location"),
516                         self.base_url + "/?hi=1")
517        response = self.request('/ThisDoesNotExist')
518        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
519        response = self.request('/' + 'ThisDoesNotExist' + '/')
520        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
521        os.makedirs(os.path.join(self.tempdir, 'spam', 'index.html'))
522        response = self.request(self.base_url + '/spam/')
523        self.check_status_and_reason(response, HTTPStatus.OK)
524
525        data = b"Dummy index file\r\n"
526        with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f:
527            f.write(data)
528        response = self.request(self.base_url + '/')
529        self.check_status_and_reason(response, HTTPStatus.OK, data)
530
531        # chmod() doesn't work as expected on Windows, and filesystem
532        # permissions are ignored by root on Unix.
533        if os.name == 'posix' and os.geteuid() != 0:
534            os.chmod(self.tempdir, 0)
535            try:
536                response = self.request(self.base_url + '/')
537                self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
538            finally:
539                os.chmod(self.tempdir, 0o755)
540
541    def test_head(self):
542        response = self.request(
543            self.base_url + '/test', method='HEAD')
544        self.check_status_and_reason(response, HTTPStatus.OK)
545        self.assertEqual(response.getheader('content-length'),
546                         str(len(self.data)))
547        self.assertEqual(response.getheader('content-type'),
548                         'application/octet-stream')
549
550    def test_browser_cache(self):
551        """Check that when a request to /test is sent with the request header
552        If-Modified-Since set to date of last modification, the server returns
553        status code 304, not 200
554        """
555        headers = email.message.Message()
556        headers['If-Modified-Since'] = self.last_modif_header
557        response = self.request(self.base_url + '/test', headers=headers)
558        self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED)
559
560        # one hour after last modification : must return 304
561        new_dt = self.last_modif_datetime + datetime.timedelta(hours=1)
562        headers = email.message.Message()
563        headers['If-Modified-Since'] = email.utils.format_datetime(new_dt,
564            usegmt=True)
565        response = self.request(self.base_url + '/test', headers=headers)
566        self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED)
567
568    def test_browser_cache_file_changed(self):
569        # with If-Modified-Since earlier than Last-Modified, must return 200
570        dt = self.last_modif_datetime
571        # build datetime object : 365 days before last modification
572        old_dt = dt - datetime.timedelta(days=365)
573        headers = email.message.Message()
574        headers['If-Modified-Since'] = email.utils.format_datetime(old_dt,
575            usegmt=True)
576        response = self.request(self.base_url + '/test', headers=headers)
577        self.check_status_and_reason(response, HTTPStatus.OK)
578
579    def test_browser_cache_with_If_None_Match_header(self):
580        # if If-None-Match header is present, ignore If-Modified-Since
581
582        headers = email.message.Message()
583        headers['If-Modified-Since'] = self.last_modif_header
584        headers['If-None-Match'] = "*"
585        response = self.request(self.base_url + '/test', headers=headers)
586        self.check_status_and_reason(response, HTTPStatus.OK)
587
588    def test_invalid_requests(self):
589        response = self.request('/', method='FOO')
590        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
591        # requests must be case sensitive,so this should fail too
592        response = self.request('/', method='custom')
593        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
594        response = self.request('/', method='GETs')
595        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
596
597    def test_last_modified(self):
598        """Checks that the datetime returned in Last-Modified response header
599        is the actual datetime of last modification, rounded to the second
600        """
601        response = self.request(self.base_url + '/test')
602        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
603        last_modif_header = response.headers['Last-modified']
604        self.assertEqual(last_modif_header, self.last_modif_header)
605
606    def test_path_without_leading_slash(self):
607        response = self.request(self.tempdir_name + '/test')
608        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
609        response = self.request(self.tempdir_name + '/test/')
610        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
611        response = self.request(self.tempdir_name + '/')
612        self.check_status_and_reason(response, HTTPStatus.OK)
613        response = self.request(self.tempdir_name)
614        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
615        response = self.request(self.tempdir_name + '/?hi=2')
616        self.check_status_and_reason(response, HTTPStatus.OK)
617        response = self.request(self.tempdir_name + '?hi=1')
618        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
619        self.assertEqual(response.getheader("Location"),
620                         self.tempdir_name + "/?hi=1")
621
622    def test_html_escape_filename(self):
623        filename = '<test&>.txt'
624        fullpath = os.path.join(self.tempdir, filename)
625
626        try:
627            open(fullpath, 'wb').close()
628        except OSError:
629            raise unittest.SkipTest('Can not create file %s on current file '
630                                    'system' % filename)
631
632        try:
633            response = self.request(self.base_url + '/')
634            body = self.check_status_and_reason(response, HTTPStatus.OK)
635            enc = response.headers.get_content_charset()
636        finally:
637            os.unlink(fullpath)  # avoid affecting test_undecodable_filename
638
639        self.assertIsNotNone(enc)
640        html_text = '>%s<' % html.escape(filename, quote=False)
641        self.assertIn(html_text.encode(enc), body)
642
643
644cgi_file1 = """\
645#!%s
646
647print("Content-type: text/html")
648print()
649print("Hello World")
650"""
651
652cgi_file2 = """\
653#!%s
654import os
655import sys
656import urllib.parse
657
658print("Content-type: text/html")
659print()
660
661content_length = int(os.environ["CONTENT_LENGTH"])
662query_string = sys.stdin.buffer.read(content_length)
663params = {key.decode("utf-8"): val.decode("utf-8")
664            for key, val in urllib.parse.parse_qsl(query_string)}
665
666print("%%s, %%s, %%s" %% (params["spam"], params["eggs"], params["bacon"]))
667"""
668
669cgi_file4 = """\
670#!%s
671import os
672
673print("Content-type: text/html")
674print()
675
676print(os.environ["%s"])
677"""
678
679cgi_file6 = """\
680#!%s
681import os
682
683print("X-ambv: was here")
684print("Content-type: text/html")
685print()
686print("<pre>")
687for k, v in os.environ.items():
688    try:
689        k.encode('ascii')
690        v.encode('ascii')
691    except UnicodeEncodeError:
692        continue  # see: BPO-44647
693    print(f"{k}={v}")
694print("</pre>")
695"""
696
697
698@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
699        "This test can't be run reliably as root (issue #13308).")
700class CGIHTTPServerTestCase(BaseTestCase):
701    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
702        pass
703
704    linesep = os.linesep.encode('ascii')
705
706    def setUp(self):
707        BaseTestCase.setUp(self)
708        self.cwd = os.getcwd()
709        self.parent_dir = tempfile.mkdtemp()
710        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
711        self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir')
712        self.sub_dir_1 = os.path.join(self.parent_dir, 'sub')
713        self.sub_dir_2 = os.path.join(self.sub_dir_1, 'dir')
714        self.cgi_dir_in_sub_dir = os.path.join(self.sub_dir_2, 'cgi-bin')
715        os.mkdir(self.cgi_dir)
716        os.mkdir(self.cgi_child_dir)
717        os.mkdir(self.sub_dir_1)
718        os.mkdir(self.sub_dir_2)
719        os.mkdir(self.cgi_dir_in_sub_dir)
720        self.nocgi_path = None
721        self.file1_path = None
722        self.file2_path = None
723        self.file3_path = None
724        self.file4_path = None
725        self.file5_path = None
726
727        # The shebang line should be pure ASCII: use symlink if possible.
728        # See issue #7668.
729        self._pythonexe_symlink = None
730        if os_helper.can_symlink():
731            self.pythonexe = os.path.join(self.parent_dir, 'python')
732            self._pythonexe_symlink = support.PythonSymlink(self.pythonexe).__enter__()
733        else:
734            self.pythonexe = sys.executable
735
736        try:
737            # The python executable path is written as the first line of the
738            # CGI Python script. The encoding cookie cannot be used, and so the
739            # path should be encodable to the default script encoding (utf-8)
740            self.pythonexe.encode('utf-8')
741        except UnicodeEncodeError:
742            self.tearDown()
743            self.skipTest("Python executable path is not encodable to utf-8")
744
745        self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py')
746        with open(self.nocgi_path, 'w', encoding='utf-8') as fp:
747            fp.write(cgi_file1 % self.pythonexe)
748        os.chmod(self.nocgi_path, 0o777)
749
750        self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
751        with open(self.file1_path, 'w', encoding='utf-8') as file1:
752            file1.write(cgi_file1 % self.pythonexe)
753        os.chmod(self.file1_path, 0o777)
754
755        self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
756        with open(self.file2_path, 'w', encoding='utf-8') as file2:
757            file2.write(cgi_file2 % self.pythonexe)
758        os.chmod(self.file2_path, 0o777)
759
760        self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py')
761        with open(self.file3_path, 'w', encoding='utf-8') as file3:
762            file3.write(cgi_file1 % self.pythonexe)
763        os.chmod(self.file3_path, 0o777)
764
765        self.file4_path = os.path.join(self.cgi_dir, 'file4.py')
766        with open(self.file4_path, 'w', encoding='utf-8') as file4:
767            file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING'))
768        os.chmod(self.file4_path, 0o777)
769
770        self.file5_path = os.path.join(self.cgi_dir_in_sub_dir, 'file5.py')
771        with open(self.file5_path, 'w', encoding='utf-8') as file5:
772            file5.write(cgi_file1 % self.pythonexe)
773        os.chmod(self.file5_path, 0o777)
774
775        self.file6_path = os.path.join(self.cgi_dir, 'file6.py')
776        with open(self.file6_path, 'w', encoding='utf-8') as file6:
777            file6.write(cgi_file6 % self.pythonexe)
778        os.chmod(self.file6_path, 0o777)
779
780        os.chdir(self.parent_dir)
781
782    def tearDown(self):
783        try:
784            os.chdir(self.cwd)
785            if self._pythonexe_symlink:
786                self._pythonexe_symlink.__exit__(None, None, None)
787            if self.nocgi_path:
788                os.remove(self.nocgi_path)
789            if self.file1_path:
790                os.remove(self.file1_path)
791            if self.file2_path:
792                os.remove(self.file2_path)
793            if self.file3_path:
794                os.remove(self.file3_path)
795            if self.file4_path:
796                os.remove(self.file4_path)
797            if self.file5_path:
798                os.remove(self.file5_path)
799            if self.file6_path:
800                os.remove(self.file6_path)
801            os.rmdir(self.cgi_child_dir)
802            os.rmdir(self.cgi_dir)
803            os.rmdir(self.cgi_dir_in_sub_dir)
804            os.rmdir(self.sub_dir_2)
805            os.rmdir(self.sub_dir_1)
806            os.rmdir(self.parent_dir)
807        finally:
808            BaseTestCase.tearDown(self)
809
810    def test_url_collapse_path(self):
811        # verify tail is the last portion and head is the rest on proper urls
812        test_vectors = {
813            '': '//',
814            '..': IndexError,
815            '/.//..': IndexError,
816            '/': '//',
817            '//': '//',
818            '/\\': '//\\',
819            '/.//': '//',
820            'cgi-bin/file1.py': '/cgi-bin/file1.py',
821            '/cgi-bin/file1.py': '/cgi-bin/file1.py',
822            'a': '//a',
823            '/a': '//a',
824            '//a': '//a',
825            './a': '//a',
826            './C:/': '/C:/',
827            '/a/b': '/a/b',
828            '/a/b/': '/a/b/',
829            '/a/b/.': '/a/b/',
830            '/a/b/c/..': '/a/b/',
831            '/a/b/c/../d': '/a/b/d',
832            '/a/b/c/../d/e/../f': '/a/b/d/f',
833            '/a/b/c/../d/e/../../f': '/a/b/f',
834            '/a/b/c/../d/e/.././././..//f': '/a/b/f',
835            '../a/b/c/../d/e/.././././..//f': IndexError,
836            '/a/b/c/../d/e/../../../f': '/a/f',
837            '/a/b/c/../d/e/../../../../f': '//f',
838            '/a/b/c/../d/e/../../../../../f': IndexError,
839            '/a/b/c/../d/e/../../../../f/..': '//',
840            '/a/b/c/../d/e/../../../../f/../.': '//',
841        }
842        for path, expected in test_vectors.items():
843            if isinstance(expected, type) and issubclass(expected, Exception):
844                self.assertRaises(expected,
845                                  server._url_collapse_path, path)
846            else:
847                actual = server._url_collapse_path(path)
848                self.assertEqual(expected, actual,
849                                 msg='path = %r\nGot:    %r\nWanted: %r' %
850                                 (path, actual, expected))
851
852    def test_headers_and_content(self):
853        res = self.request('/cgi-bin/file1.py')
854        self.assertEqual(
855            (res.read(), res.getheader('Content-type'), res.status),
856            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK))
857
858    def test_issue19435(self):
859        res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh')
860        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
861
862    def test_post(self):
863        params = urllib.parse.urlencode(
864            {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
865        headers = {'Content-type' : 'application/x-www-form-urlencoded'}
866        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
867
868        self.assertEqual(res.read(), b'1, python, 123456' + self.linesep)
869
870    def test_invaliduri(self):
871        res = self.request('/cgi-bin/invalid')
872        res.read()
873        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
874
875    def test_authorization(self):
876        headers = {b'Authorization' : b'Basic ' +
877                   base64.b64encode(b'username:pass')}
878        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
879        self.assertEqual(
880            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
881            (res.read(), res.getheader('Content-type'), res.status))
882
883    def test_no_leading_slash(self):
884        # http://bugs.python.org/issue2254
885        res = self.request('cgi-bin/file1.py')
886        self.assertEqual(
887            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
888            (res.read(), res.getheader('Content-type'), res.status))
889
890    def test_os_environ_is_not_altered(self):
891        signature = "Test CGI Server"
892        os.environ['SERVER_SOFTWARE'] = signature
893        res = self.request('/cgi-bin/file1.py')
894        self.assertEqual(
895            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
896            (res.read(), res.getheader('Content-type'), res.status))
897        self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
898
899    def test_urlquote_decoding_in_cgi_check(self):
900        res = self.request('/cgi-bin%2ffile1.py')
901        self.assertEqual(
902            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
903            (res.read(), res.getheader('Content-type'), res.status))
904
905    def test_nested_cgi_path_issue21323(self):
906        res = self.request('/cgi-bin/child-dir/file3.py')
907        self.assertEqual(
908            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
909            (res.read(), res.getheader('Content-type'), res.status))
910
911    def test_query_with_multiple_question_mark(self):
912        res = self.request('/cgi-bin/file4.py?a=b?c=d')
913        self.assertEqual(
914            (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK),
915            (res.read(), res.getheader('Content-type'), res.status))
916
917    def test_query_with_continuous_slashes(self):
918        res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//')
919        self.assertEqual(
920            (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep,
921             'text/html', HTTPStatus.OK),
922            (res.read(), res.getheader('Content-type'), res.status))
923
924    def test_cgi_path_in_sub_directories(self):
925        try:
926            CGIHTTPRequestHandler.cgi_directories.append('/sub/dir/cgi-bin')
927            res = self.request('/sub/dir/cgi-bin/file5.py')
928            self.assertEqual(
929                (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
930                (res.read(), res.getheader('Content-type'), res.status))
931        finally:
932            CGIHTTPRequestHandler.cgi_directories.remove('/sub/dir/cgi-bin')
933
934    def test_accept(self):
935        browser_accept = \
936                    'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
937        tests = (
938            ((('Accept', browser_accept),), browser_accept),
939            ((), ''),
940            # Hack case to get two values for the one header
941            ((('Accept', 'text/html'), ('ACCEPT', 'text/plain')),
942               'text/html,text/plain'),
943        )
944        for headers, expected in tests:
945            headers = OrderedDict(headers)
946            with self.subTest(headers):
947                res = self.request('/cgi-bin/file6.py', 'GET', headers=headers)
948                self.assertEqual(http.HTTPStatus.OK, res.status)
949                expected = f"HTTP_ACCEPT={expected}".encode('ascii')
950                self.assertIn(expected, res.read())
951
952
953class SocketlessRequestHandler(SimpleHTTPRequestHandler):
954    def __init__(self, directory=None):
955        request = mock.Mock()
956        request.makefile.return_value = BytesIO()
957        super().__init__(request, None, None, directory=directory)
958
959        self.get_called = False
960        self.protocol_version = "HTTP/1.1"
961
962    def do_GET(self):
963        self.get_called = True
964        self.send_response(HTTPStatus.OK)
965        self.send_header('Content-Type', 'text/html')
966        self.end_headers()
967        self.wfile.write(b'<html><body>Data</body></html>\r\n')
968
969    def log_message(self, format, *args):
970        pass
971
972class RejectingSocketlessRequestHandler(SocketlessRequestHandler):
973    def handle_expect_100(self):
974        self.send_error(HTTPStatus.EXPECTATION_FAILED)
975        return False
976
977
978class AuditableBytesIO:
979
980    def __init__(self):
981        self.datas = []
982
983    def write(self, data):
984        self.datas.append(data)
985
986    def getData(self):
987        return b''.join(self.datas)
988
989    @property
990    def numWrites(self):
991        return len(self.datas)
992
993
994class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
995    """Test the functionality of the BaseHTTPServer.
996
997       Test the support for the Expect 100-continue header.
998       """
999
1000    HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK')
1001
1002    def setUp (self):
1003        self.handler = SocketlessRequestHandler()
1004
1005    def send_typical_request(self, message):
1006        input = BytesIO(message)
1007        output = BytesIO()
1008        self.handler.rfile = input
1009        self.handler.wfile = output
1010        self.handler.handle_one_request()
1011        output.seek(0)
1012        return output.readlines()
1013
1014    def verify_get_called(self):
1015        self.assertTrue(self.handler.get_called)
1016
1017    def verify_expected_headers(self, headers):
1018        for fieldName in b'Server: ', b'Date: ', b'Content-Type: ':
1019            self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)
1020
1021    def verify_http_server_response(self, response):
1022        match = self.HTTPResponseMatch.search(response)
1023        self.assertIsNotNone(match)
1024
1025    def test_unprintable_not_logged(self):
1026        # We call the method from the class directly as our Socketless
1027        # Handler subclass overrode it... nice for everything BUT this test.
1028        self.handler.client_address = ('127.0.0.1', 1337)
1029        log_message = BaseHTTPRequestHandler.log_message
1030        with mock.patch.object(sys, 'stderr', StringIO()) as fake_stderr:
1031            log_message(self.handler, '/foo')
1032            log_message(self.handler, '/\033bar\000\033')
1033            log_message(self.handler, '/spam %s.', 'a')
1034            log_message(self.handler, '/spam %s.', '\033\x7f\x9f\xa0beans')
1035            log_message(self.handler, '"GET /foo\\b"ar\007 HTTP/1.0"')
1036        stderr = fake_stderr.getvalue()
1037        self.assertNotIn('\033', stderr)  # non-printable chars are caught.
1038        self.assertNotIn('\000', stderr)  # non-printable chars are caught.
1039        lines = stderr.splitlines()
1040        self.assertIn('/foo', lines[0])
1041        self.assertIn(r'/\x1bbar\x00\x1b', lines[1])
1042        self.assertIn('/spam a.', lines[2])
1043        self.assertIn('/spam \\x1b\\x7f\\x9f\xa0beans.', lines[3])
1044        self.assertIn(r'"GET /foo\\b"ar\x07 HTTP/1.0"', lines[4])
1045
1046    def test_http_1_1(self):
1047        result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n')
1048        self.verify_http_server_response(result[0])
1049        self.verify_expected_headers(result[1:-1])
1050        self.verify_get_called()
1051        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
1052        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
1053        self.assertEqual(self.handler.command, 'GET')
1054        self.assertEqual(self.handler.path, '/')
1055        self.assertEqual(self.handler.request_version, 'HTTP/1.1')
1056        self.assertSequenceEqual(self.handler.headers.items(), ())
1057
1058    def test_http_1_0(self):
1059        result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n')
1060        self.verify_http_server_response(result[0])
1061        self.verify_expected_headers(result[1:-1])
1062        self.verify_get_called()
1063        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
1064        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
1065        self.assertEqual(self.handler.command, 'GET')
1066        self.assertEqual(self.handler.path, '/')
1067        self.assertEqual(self.handler.request_version, 'HTTP/1.0')
1068        self.assertSequenceEqual(self.handler.headers.items(), ())
1069
1070    def test_http_0_9(self):
1071        result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n')
1072        self.assertEqual(len(result), 1)
1073        self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n')
1074        self.verify_get_called()
1075
1076    def test_extra_space(self):
1077        result = self.send_typical_request(
1078            b'GET /spaced out HTTP/1.1\r\n'
1079            b'Host: dummy\r\n'
1080            b'\r\n'
1081        )
1082        self.assertTrue(result[0].startswith(b'HTTP/1.1 400 '))
1083        self.verify_expected_headers(result[1:result.index(b'\r\n')])
1084        self.assertFalse(self.handler.get_called)
1085
1086    def test_with_continue_1_0(self):
1087        result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
1088        self.verify_http_server_response(result[0])
1089        self.verify_expected_headers(result[1:-1])
1090        self.verify_get_called()
1091        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
1092        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
1093        self.assertEqual(self.handler.command, 'GET')
1094        self.assertEqual(self.handler.path, '/')
1095        self.assertEqual(self.handler.request_version, 'HTTP/1.0')
1096        headers = (("Expect", "100-continue"),)
1097        self.assertSequenceEqual(self.handler.headers.items(), headers)
1098
1099    def test_with_continue_1_1(self):
1100        result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
1101        self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n')
1102        self.assertEqual(result[1], b'\r\n')
1103        self.assertEqual(result[2], b'HTTP/1.1 200 OK\r\n')
1104        self.verify_expected_headers(result[2:-1])
1105        self.verify_get_called()
1106        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
1107        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
1108        self.assertEqual(self.handler.command, 'GET')
1109        self.assertEqual(self.handler.path, '/')
1110        self.assertEqual(self.handler.request_version, 'HTTP/1.1')
1111        headers = (("Expect", "100-continue"),)
1112        self.assertSequenceEqual(self.handler.headers.items(), headers)
1113
1114    def test_header_buffering_of_send_error(self):
1115
1116        input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
1117        output = AuditableBytesIO()
1118        handler = SocketlessRequestHandler()
1119        handler.rfile = input
1120        handler.wfile = output
1121        handler.request_version = 'HTTP/1.1'
1122        handler.requestline = ''
1123        handler.command = None
1124
1125        handler.send_error(418)
1126        self.assertEqual(output.numWrites, 2)
1127
1128    def test_header_buffering_of_send_response_only(self):
1129
1130        input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
1131        output = AuditableBytesIO()
1132        handler = SocketlessRequestHandler()
1133        handler.rfile = input
1134        handler.wfile = output
1135        handler.request_version = 'HTTP/1.1'
1136
1137        handler.send_response_only(418)
1138        self.assertEqual(output.numWrites, 0)
1139        handler.end_headers()
1140        self.assertEqual(output.numWrites, 1)
1141
1142    def test_header_buffering_of_send_header(self):
1143
1144        input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
1145        output = AuditableBytesIO()
1146        handler = SocketlessRequestHandler()
1147        handler.rfile = input
1148        handler.wfile = output
1149        handler.request_version = 'HTTP/1.1'
1150
1151        handler.send_header('Foo', 'foo')
1152        handler.send_header('bar', 'bar')
1153        self.assertEqual(output.numWrites, 0)
1154        handler.end_headers()
1155        self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n')
1156        self.assertEqual(output.numWrites, 1)
1157
1158    def test_header_unbuffered_when_continue(self):
1159
1160        def _readAndReseek(f):
1161            pos = f.tell()
1162            f.seek(0)
1163            data = f.read()
1164            f.seek(pos)
1165            return data
1166
1167        input = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
1168        output = BytesIO()
1169        self.handler.rfile = input
1170        self.handler.wfile = output
1171        self.handler.request_version = 'HTTP/1.1'
1172
1173        self.handler.handle_one_request()
1174        self.assertNotEqual(_readAndReseek(output), b'')
1175        result = _readAndReseek(output).split(b'\r\n')
1176        self.assertEqual(result[0], b'HTTP/1.1 100 Continue')
1177        self.assertEqual(result[1], b'')
1178        self.assertEqual(result[2], b'HTTP/1.1 200 OK')
1179
1180    def test_with_continue_rejected(self):
1181        usual_handler = self.handler        # Save to avoid breaking any subsequent tests.
1182        self.handler = RejectingSocketlessRequestHandler()
1183        result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
1184        self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n')
1185        self.verify_expected_headers(result[1:-1])
1186        # The expect handler should short circuit the usual get method by
1187        # returning false here, so get_called should be false
1188        self.assertFalse(self.handler.get_called)
1189        self.assertEqual(sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1)
1190        self.handler = usual_handler        # Restore to avoid breaking any subsequent tests.
1191
1192    def test_request_length(self):
1193        # Issue #10714: huge request lines are discarded, to avoid Denial
1194        # of Service attacks.
1195        result = self.send_typical_request(b'GET ' + b'x' * 65537)
1196        self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
1197        self.assertFalse(self.handler.get_called)
1198        self.assertIsInstance(self.handler.requestline, str)
1199
1200    def test_header_length(self):
1201        # Issue #6791: same for headers
1202        result = self.send_typical_request(
1203            b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n')
1204        self.assertEqual(result[0], b'HTTP/1.1 431 Line too long\r\n')
1205        self.assertFalse(self.handler.get_called)
1206        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
1207
1208    def test_too_many_headers(self):
1209        result = self.send_typical_request(
1210            b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n')
1211        self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n')
1212        self.assertFalse(self.handler.get_called)
1213        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
1214
1215    def test_html_escape_on_error(self):
1216        result = self.send_typical_request(
1217            b'<script>alert("hello")</script> / HTTP/1.1')
1218        result = b''.join(result)
1219        text = '<script>alert("hello")</script>'
1220        self.assertIn(html.escape(text, quote=False).encode('ascii'), result)
1221
1222    def test_close_connection(self):
1223        # handle_one_request() should be repeatedly called until
1224        # it sets close_connection
1225        def handle_one_request():
1226            self.handler.close_connection = next(close_values)
1227        self.handler.handle_one_request = handle_one_request
1228
1229        close_values = iter((True,))
1230        self.handler.handle()
1231        self.assertRaises(StopIteration, next, close_values)
1232
1233        close_values = iter((False, False, True))
1234        self.handler.handle()
1235        self.assertRaises(StopIteration, next, close_values)
1236
1237    def test_date_time_string(self):
1238        now = time.time()
1239        # this is the old code that formats the timestamp
1240        year, month, day, hh, mm, ss, wd, y, z = time.gmtime(now)
1241        expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
1242            self.handler.weekdayname[wd],
1243            day,
1244            self.handler.monthname[month],
1245            year, hh, mm, ss
1246        )
1247        self.assertEqual(self.handler.date_time_string(timestamp=now), expected)
1248
1249
1250class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
1251    """ Test url parsing """
1252    def setUp(self):
1253        self.translated_1 = os.path.join(os.getcwd(), 'filename')
1254        self.translated_2 = os.path.join('foo', 'filename')
1255        self.translated_3 = os.path.join('bar', 'filename')
1256        self.handler_1 = SocketlessRequestHandler()
1257        self.handler_2 = SocketlessRequestHandler(directory='foo')
1258        self.handler_3 = SocketlessRequestHandler(directory=pathlib.PurePath('bar'))
1259
1260    def test_query_arguments(self):
1261        path = self.handler_1.translate_path('/filename')
1262        self.assertEqual(path, self.translated_1)
1263        path = self.handler_2.translate_path('/filename')
1264        self.assertEqual(path, self.translated_2)
1265        path = self.handler_3.translate_path('/filename')
1266        self.assertEqual(path, self.translated_3)
1267
1268        path = self.handler_1.translate_path('/filename?foo=bar')
1269        self.assertEqual(path, self.translated_1)
1270        path = self.handler_2.translate_path('/filename?foo=bar')
1271        self.assertEqual(path, self.translated_2)
1272        path = self.handler_3.translate_path('/filename?foo=bar')
1273        self.assertEqual(path, self.translated_3)
1274
1275        path = self.handler_1.translate_path('/filename?a=b&spam=eggs#zot')
1276        self.assertEqual(path, self.translated_1)
1277        path = self.handler_2.translate_path('/filename?a=b&spam=eggs#zot')
1278        self.assertEqual(path, self.translated_2)
1279        path = self.handler_3.translate_path('/filename?a=b&spam=eggs#zot')
1280        self.assertEqual(path, self.translated_3)
1281
1282    def test_start_with_double_slash(self):
1283        path = self.handler_1.translate_path('//filename')
1284        self.assertEqual(path, self.translated_1)
1285        path = self.handler_2.translate_path('//filename')
1286        self.assertEqual(path, self.translated_2)
1287        path = self.handler_3.translate_path('//filename')
1288        self.assertEqual(path, self.translated_3)
1289
1290        path = self.handler_1.translate_path('//filename?foo=bar')
1291        self.assertEqual(path, self.translated_1)
1292        path = self.handler_2.translate_path('//filename?foo=bar')
1293        self.assertEqual(path, self.translated_2)
1294        path = self.handler_3.translate_path('//filename?foo=bar')
1295        self.assertEqual(path, self.translated_3)
1296
1297    def test_windows_colon(self):
1298        with support.swap_attr(server.os, 'path', ntpath):
1299            path = self.handler_1.translate_path('c:c:c:foo/filename')
1300            path = path.replace(ntpath.sep, os.sep)
1301            self.assertEqual(path, self.translated_1)
1302            path = self.handler_2.translate_path('c:c:c:foo/filename')
1303            path = path.replace(ntpath.sep, os.sep)
1304            self.assertEqual(path, self.translated_2)
1305            path = self.handler_3.translate_path('c:c:c:foo/filename')
1306            path = path.replace(ntpath.sep, os.sep)
1307            self.assertEqual(path, self.translated_3)
1308
1309            path = self.handler_1.translate_path('\\c:../filename')
1310            path = path.replace(ntpath.sep, os.sep)
1311            self.assertEqual(path, self.translated_1)
1312            path = self.handler_2.translate_path('\\c:../filename')
1313            path = path.replace(ntpath.sep, os.sep)
1314            self.assertEqual(path, self.translated_2)
1315            path = self.handler_3.translate_path('\\c:../filename')
1316            path = path.replace(ntpath.sep, os.sep)
1317            self.assertEqual(path, self.translated_3)
1318
1319            path = self.handler_1.translate_path('c:\\c:..\\foo/filename')
1320            path = path.replace(ntpath.sep, os.sep)
1321            self.assertEqual(path, self.translated_1)
1322            path = self.handler_2.translate_path('c:\\c:..\\foo/filename')
1323            path = path.replace(ntpath.sep, os.sep)
1324            self.assertEqual(path, self.translated_2)
1325            path = self.handler_3.translate_path('c:\\c:..\\foo/filename')
1326            path = path.replace(ntpath.sep, os.sep)
1327            self.assertEqual(path, self.translated_3)
1328
1329            path = self.handler_1.translate_path('c:c:foo\\c:c:bar/filename')
1330            path = path.replace(ntpath.sep, os.sep)
1331            self.assertEqual(path, self.translated_1)
1332            path = self.handler_2.translate_path('c:c:foo\\c:c:bar/filename')
1333            path = path.replace(ntpath.sep, os.sep)
1334            self.assertEqual(path, self.translated_2)
1335            path = self.handler_3.translate_path('c:c:foo\\c:c:bar/filename')
1336            path = path.replace(ntpath.sep, os.sep)
1337            self.assertEqual(path, self.translated_3)
1338
1339
1340class MiscTestCase(unittest.TestCase):
1341    def test_all(self):
1342        expected = []
1343        denylist = {'executable', 'nobody_uid', 'test'}
1344        for name in dir(server):
1345            if name.startswith('_') or name in denylist:
1346                continue
1347            module_object = getattr(server, name)
1348            if getattr(module_object, '__module__', None) == 'http.server':
1349                expected.append(name)
1350        self.assertCountEqual(server.__all__, expected)
1351
1352
1353class ScriptTestCase(unittest.TestCase):
1354
1355    def mock_server_class(self):
1356        return mock.MagicMock(
1357            return_value=mock.MagicMock(
1358                __enter__=mock.MagicMock(
1359                    return_value=mock.MagicMock(
1360                        socket=mock.MagicMock(
1361                            getsockname=lambda: ('', 0),
1362                        ),
1363                    ),
1364                ),
1365            ),
1366        )
1367
1368    @mock.patch('builtins.print')
1369    def test_server_test_unspec(self, _):
1370        mock_server = self.mock_server_class()
1371        server.test(ServerClass=mock_server, bind=None)
1372        self.assertIn(
1373            mock_server.address_family,
1374            (socket.AF_INET6, socket.AF_INET),
1375        )
1376
1377    @mock.patch('builtins.print')
1378    def test_server_test_localhost(self, _):
1379        mock_server = self.mock_server_class()
1380        server.test(ServerClass=mock_server, bind="localhost")
1381        self.assertIn(
1382            mock_server.address_family,
1383            (socket.AF_INET6, socket.AF_INET),
1384        )
1385
1386    ipv6_addrs = (
1387        "::",
1388        "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
1389        "::1",
1390    )
1391
1392    ipv4_addrs = (
1393        "0.0.0.0",
1394        "8.8.8.8",
1395        "127.0.0.1",
1396    )
1397
1398    @mock.patch('builtins.print')
1399    def test_server_test_ipv6(self, _):
1400        for bind in self.ipv6_addrs:
1401            mock_server = self.mock_server_class()
1402            server.test(ServerClass=mock_server, bind=bind)
1403            self.assertEqual(mock_server.address_family, socket.AF_INET6)
1404
1405    @mock.patch('builtins.print')
1406    def test_server_test_ipv4(self, _):
1407        for bind in self.ipv4_addrs:
1408            mock_server = self.mock_server_class()
1409            server.test(ServerClass=mock_server, bind=bind)
1410            self.assertEqual(mock_server.address_family, socket.AF_INET)
1411
1412
1413def setUpModule():
1414    unittest.addModuleCleanup(os.chdir, os.getcwd())
1415
1416
1417if __name__ == '__main__':
1418    unittest.main()
1419