1"""Unittests for the various HTTPServer modules. 2 3Written by Cody A.W. Somerville <[email protected]>, 4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest. 5""" 6from collections import OrderedDict 7from http.server import BaseHTTPRequestHandler, HTTPServer, \ 8 SimpleHTTPRequestHandler, CGIHTTPRequestHandler 9from http import server, HTTPStatus 10 11import os 12import socket 13import sys 14import re 15import base64 16import ntpath 17import pathlib 18import shutil 19import email.message 20import email.utils 21import html 22import http, http.client 23import urllib.parse 24import tempfile 25import time 26import datetime 27import threading 28from unittest import mock 29from io import BytesIO, StringIO 30 31import unittest 32from test import support 33from test.support import os_helper 34from test.support import threading_helper 35 36support.requires_working_socket(module=True) 37 38class NoLogRequestHandler: 39 def log_message(self, *args): 40 # don't write log messages to stderr 41 pass 42 43 def read(self, n=None): 44 return '' 45 46 47class TestServerThread(threading.Thread): 48 def __init__(self, test_object, request_handler): 49 threading.Thread.__init__(self) 50 self.request_handler = request_handler 51 self.test_object = test_object 52 53 def run(self): 54 self.server = HTTPServer(('localhost', 0), self.request_handler) 55 self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname() 56 self.test_object.server_started.set() 57 self.test_object = None 58 try: 59 self.server.serve_forever(0.05) 60 finally: 61 self.server.server_close() 62 63 def stop(self): 64 self.server.shutdown() 65 self.join() 66 67 68class BaseTestCase(unittest.TestCase): 69 def setUp(self): 70 self._threads = threading_helper.threading_setup() 71 os.environ = os_helper.EnvironmentVarGuard() 72 self.server_started = threading.Event() 73 self.thread = TestServerThread(self, self.request_handler) 74 self.thread.start() 75 self.server_started.wait() 76 77 def tearDown(self): 78 self.thread.stop() 79 self.thread = None 80 os.environ.__exit__() 81 threading_helper.threading_cleanup(*self._threads) 82 83 def request(self, uri, method='GET', body=None, headers={}): 84 self.connection = http.client.HTTPConnection(self.HOST, self.PORT) 85 self.connection.request(method, uri, body, headers) 86 return self.connection.getresponse() 87 88 89class BaseHTTPServerTestCase(BaseTestCase): 90 class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): 91 protocol_version = 'HTTP/1.1' 92 default_request_version = 'HTTP/1.1' 93 94 def do_TEST(self): 95 self.send_response(HTTPStatus.NO_CONTENT) 96 self.send_header('Content-Type', 'text/html') 97 self.send_header('Connection', 'close') 98 self.end_headers() 99 100 def do_KEEP(self): 101 self.send_response(HTTPStatus.NO_CONTENT) 102 self.send_header('Content-Type', 'text/html') 103 self.send_header('Connection', 'keep-alive') 104 self.end_headers() 105 106 def do_KEYERROR(self): 107 self.send_error(999) 108 109 def do_NOTFOUND(self): 110 self.send_error(HTTPStatus.NOT_FOUND) 111 112 def do_EXPLAINERROR(self): 113 self.send_error(999, "Short Message", 114 "This is a long \n explanation") 115 116 def do_CUSTOM(self): 117 self.send_response(999) 118 self.send_header('Content-Type', 'text/html') 119 self.send_header('Connection', 'close') 120 self.end_headers() 121 122 def do_LATINONEHEADER(self): 123 self.send_response(999) 124 self.send_header('X-Special', 'Dängerous Mind') 125 self.send_header('Connection', 'close') 126 self.end_headers() 127 body = self.headers['x-special-incoming'].encode('utf-8') 128 self.wfile.write(body) 129 130 def do_SEND_ERROR(self): 131 self.send_error(int(self.path[1:])) 132 133 def do_HEAD(self): 134 self.send_error(int(self.path[1:])) 135 136 def setUp(self): 137 BaseTestCase.setUp(self) 138 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 139 self.con.connect() 140 141 def test_command(self): 142 self.con.request('GET', '/') 143 res = self.con.getresponse() 144 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 145 146 def test_request_line_trimming(self): 147 self.con._http_vsn_str = 'HTTP/1.1\n' 148 self.con.putrequest('XYZBOGUS', '/') 149 self.con.endheaders() 150 res = self.con.getresponse() 151 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 152 153 def test_version_bogus(self): 154 self.con._http_vsn_str = 'FUBAR' 155 self.con.putrequest('GET', '/') 156 self.con.endheaders() 157 res = self.con.getresponse() 158 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 159 160 def test_version_digits(self): 161 self.con._http_vsn_str = 'HTTP/9.9.9' 162 self.con.putrequest('GET', '/') 163 self.con.endheaders() 164 res = self.con.getresponse() 165 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 166 167 def test_version_signs_and_underscores(self): 168 self.con._http_vsn_str = 'HTTP/-9_9_9.+9_9_9' 169 self.con.putrequest('GET', '/') 170 self.con.endheaders() 171 res = self.con.getresponse() 172 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 173 174 def test_major_version_number_too_long(self): 175 self.con._http_vsn_str = 'HTTP/909876543210.0' 176 self.con.putrequest('GET', '/') 177 self.con.endheaders() 178 res = self.con.getresponse() 179 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 180 181 def test_minor_version_number_too_long(self): 182 self.con._http_vsn_str = 'HTTP/1.909876543210' 183 self.con.putrequest('GET', '/') 184 self.con.endheaders() 185 res = self.con.getresponse() 186 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 187 188 def test_version_none_get(self): 189 self.con._http_vsn_str = '' 190 self.con.putrequest('GET', '/') 191 self.con.endheaders() 192 res = self.con.getresponse() 193 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 194 195 def test_version_none(self): 196 # Test that a valid method is rejected when not HTTP/1.x 197 self.con._http_vsn_str = '' 198 self.con.putrequest('CUSTOM', '/') 199 self.con.endheaders() 200 res = self.con.getresponse() 201 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 202 203 def test_version_invalid(self): 204 self.con._http_vsn = 99 205 self.con._http_vsn_str = 'HTTP/9.9' 206 self.con.putrequest('GET', '/') 207 self.con.endheaders() 208 res = self.con.getresponse() 209 self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED) 210 211 def test_send_blank(self): 212 self.con._http_vsn_str = '' 213 self.con.putrequest('', '') 214 self.con.endheaders() 215 res = self.con.getresponse() 216 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 217 218 def test_header_close(self): 219 self.con.putrequest('GET', '/') 220 self.con.putheader('Connection', 'close') 221 self.con.endheaders() 222 res = self.con.getresponse() 223 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 224 225 def test_header_keep_alive(self): 226 self.con._http_vsn_str = 'HTTP/1.1' 227 self.con.putrequest('GET', '/') 228 self.con.putheader('Connection', 'keep-alive') 229 self.con.endheaders() 230 res = self.con.getresponse() 231 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 232 233 def test_handler(self): 234 self.con.request('TEST', '/') 235 res = self.con.getresponse() 236 self.assertEqual(res.status, HTTPStatus.NO_CONTENT) 237 238 def test_return_header_keep_alive(self): 239 self.con.request('KEEP', '/') 240 res = self.con.getresponse() 241 self.assertEqual(res.getheader('Connection'), 'keep-alive') 242 self.con.request('TEST', '/') 243 self.addCleanup(self.con.close) 244 245 def test_internal_key_error(self): 246 self.con.request('KEYERROR', '/') 247 res = self.con.getresponse() 248 self.assertEqual(res.status, 999) 249 250 def test_return_custom_status(self): 251 self.con.request('CUSTOM', '/') 252 res = self.con.getresponse() 253 self.assertEqual(res.status, 999) 254 255 def test_return_explain_error(self): 256 self.con.request('EXPLAINERROR', '/') 257 res = self.con.getresponse() 258 self.assertEqual(res.status, 999) 259 self.assertTrue(int(res.getheader('Content-Length'))) 260 261 def test_latin1_header(self): 262 self.con.request('LATINONEHEADER', '/', headers={ 263 'X-Special-Incoming': 'Ärger mit Unicode' 264 }) 265 res = self.con.getresponse() 266 self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind') 267 self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8')) 268 269 def test_error_content_length(self): 270 # Issue #16088: standard error responses should have a content-length 271 self.con.request('NOTFOUND', '/') 272 res = self.con.getresponse() 273 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 274 275 data = res.read() 276 self.assertEqual(int(res.getheader('Content-Length')), len(data)) 277 278 def test_send_error(self): 279 allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED, 280 HTTPStatus.RESET_CONTENT) 281 for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED, 282 HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT, 283 HTTPStatus.SWITCHING_PROTOCOLS): 284 self.con.request('SEND_ERROR', '/{}'.format(code)) 285 res = self.con.getresponse() 286 self.assertEqual(code, res.status) 287 self.assertEqual(None, res.getheader('Content-Length')) 288 self.assertEqual(None, res.getheader('Content-Type')) 289 if code not in allow_transfer_encoding_codes: 290 self.assertEqual(None, res.getheader('Transfer-Encoding')) 291 292 data = res.read() 293 self.assertEqual(b'', data) 294 295 def test_head_via_send_error(self): 296 allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED, 297 HTTPStatus.RESET_CONTENT) 298 for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT, 299 HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT, 300 HTTPStatus.SWITCHING_PROTOCOLS): 301 self.con.request('HEAD', '/{}'.format(code)) 302 res = self.con.getresponse() 303 self.assertEqual(code, res.status) 304 if code == HTTPStatus.OK: 305 self.assertTrue(int(res.getheader('Content-Length')) > 0) 306 self.assertIn('text/html', res.getheader('Content-Type')) 307 else: 308 self.assertEqual(None, res.getheader('Content-Length')) 309 self.assertEqual(None, res.getheader('Content-Type')) 310 if code not in allow_transfer_encoding_codes: 311 self.assertEqual(None, res.getheader('Transfer-Encoding')) 312 313 data = res.read() 314 self.assertEqual(b'', data) 315 316 317class RequestHandlerLoggingTestCase(BaseTestCase): 318 class request_handler(BaseHTTPRequestHandler): 319 protocol_version = 'HTTP/1.1' 320 default_request_version = 'HTTP/1.1' 321 322 def do_GET(self): 323 self.send_response(HTTPStatus.OK) 324 self.end_headers() 325 326 def do_ERROR(self): 327 self.send_error(HTTPStatus.NOT_FOUND, 'File not found') 328 329 def test_get(self): 330 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 331 self.con.connect() 332 333 with support.captured_stderr() as err: 334 self.con.request('GET', '/') 335 self.con.getresponse() 336 337 self.assertTrue( 338 err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n')) 339 340 def test_err(self): 341 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 342 self.con.connect() 343 344 with support.captured_stderr() as err: 345 self.con.request('ERROR', '/') 346 self.con.getresponse() 347 348 lines = err.getvalue().split('\n') 349 self.assertTrue(lines[0].endswith('code 404, message File not found')) 350 self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -')) 351 352 353class SimpleHTTPServerTestCase(BaseTestCase): 354 class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler): 355 pass 356 357 def setUp(self): 358 super().setUp() 359 self.cwd = os.getcwd() 360 basetempdir = tempfile.gettempdir() 361 os.chdir(basetempdir) 362 self.data = b'We are the knights who say Ni!' 363 self.tempdir = tempfile.mkdtemp(dir=basetempdir) 364 self.tempdir_name = os.path.basename(self.tempdir) 365 self.base_url = '/' + self.tempdir_name 366 tempname = os.path.join(self.tempdir, 'test') 367 with open(tempname, 'wb') as temp: 368 temp.write(self.data) 369 temp.flush() 370 mtime = os.stat(tempname).st_mtime 371 # compute last modification datetime for browser cache tests 372 last_modif = datetime.datetime.fromtimestamp(mtime, 373 datetime.timezone.utc) 374 self.last_modif_datetime = last_modif.replace(microsecond=0) 375 self.last_modif_header = email.utils.formatdate( 376 last_modif.timestamp(), usegmt=True) 377 378 def tearDown(self): 379 try: 380 os.chdir(self.cwd) 381 try: 382 shutil.rmtree(self.tempdir) 383 except: 384 pass 385 finally: 386 super().tearDown() 387 388 def check_status_and_reason(self, response, status, data=None): 389 def close_conn(): 390 """Don't close reader yet so we can check if there was leftover 391 buffered input""" 392 nonlocal reader 393 reader = response.fp 394 response.fp = None 395 reader = None 396 response._close_conn = close_conn 397 398 body = response.read() 399 self.assertTrue(response) 400 self.assertEqual(response.status, status) 401 self.assertIsNotNone(response.reason) 402 if data: 403 self.assertEqual(data, body) 404 # Ensure the server has not set up a persistent connection, and has 405 # not sent any extra data 406 self.assertEqual(response.version, 10) 407 self.assertEqual(response.msg.get("Connection", "close"), "close") 408 self.assertEqual(reader.read(30), b'', 'Connection should be closed') 409 410 reader.close() 411 return body 412 413 @unittest.skipIf(sys.platform == 'darwin', 414 'undecodable name cannot always be decoded on macOS') 415 @unittest.skipIf(sys.platform == 'win32', 416 'undecodable name cannot be decoded on win32') 417 @unittest.skipUnless(os_helper.TESTFN_UNDECODABLE, 418 'need os_helper.TESTFN_UNDECODABLE') 419 def test_undecodable_filename(self): 420 enc = sys.getfilesystemencoding() 421 filename = os.fsdecode(os_helper.TESTFN_UNDECODABLE) + '.txt' 422 with open(os.path.join(self.tempdir, filename), 'wb') as f: 423 f.write(os_helper.TESTFN_UNDECODABLE) 424 response = self.request(self.base_url + '/') 425 if sys.platform == 'darwin': 426 # On Mac OS the HFS+ filesystem replaces bytes that aren't valid 427 # UTF-8 into a percent-encoded value. 428 for name in os.listdir(self.tempdir): 429 if name != 'test': # Ignore a filename created in setUp(). 430 filename = name 431 break 432 body = self.check_status_and_reason(response, HTTPStatus.OK) 433 quotedname = urllib.parse.quote(filename, errors='surrogatepass') 434 self.assertIn(('href="%s"' % quotedname) 435 .encode(enc, 'surrogateescape'), body) 436 self.assertIn(('>%s<' % html.escape(filename, quote=False)) 437 .encode(enc, 'surrogateescape'), body) 438 response = self.request(self.base_url + '/' + quotedname) 439 self.check_status_and_reason(response, HTTPStatus.OK, 440 data=os_helper.TESTFN_UNDECODABLE) 441 442 def test_undecodable_parameter(self): 443 # sanity check using a valid parameter 444 response = self.request(self.base_url + '/?x=123').read() 445 self.assertRegex(response, f'listing for {self.base_url}/\?x=123'.encode('latin1')) 446 # now the bogus encoding 447 response = self.request(self.base_url + '/?x=%bb').read() 448 self.assertRegex(response, f'listing for {self.base_url}/\?x=\xef\xbf\xbd'.encode('latin1')) 449 450 def test_get_dir_redirect_location_domain_injection_bug(self): 451 """Ensure //evil.co/..%2f../../X does not put //evil.co/ in Location. 452 453 //netloc/ in a Location header is a redirect to a new host. 454 https://github.com/python/cpython/issues/87389 455 456 This checks that a path resolving to a directory on our server cannot 457 resolve into a redirect to another server. 458 """ 459 os.mkdir(os.path.join(self.tempdir, 'existing_directory')) 460 url = f'/python.org/..%2f..%2f..%2f..%2f..%2f../%0a%0d/../{self.tempdir_name}/existing_directory' 461 expected_location = f'{url}/' # /python.org.../ single slash single prefix, trailing slash 462 # Canonicalizes to /tmp/tempdir_name/existing_directory which does 463 # exist and is a dir, triggering the 301 redirect logic. 464 response = self.request(url) 465 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 466 location = response.getheader('Location') 467 self.assertEqual(location, expected_location, msg='non-attack failed!') 468 469 # //python.org... multi-slash prefix, no trailing slash 470 attack_url = f'/{url}' 471 response = self.request(attack_url) 472 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 473 location = response.getheader('Location') 474 self.assertFalse(location.startswith('//'), msg=location) 475 self.assertEqual(location, expected_location, 476 msg='Expected Location header to start with a single / and ' 477 'end with a / as this is a directory redirect.') 478 479 # ///python.org... triple-slash prefix, no trailing slash 480 attack3_url = f'//{url}' 481 response = self.request(attack3_url) 482 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 483 self.assertEqual(response.getheader('Location'), expected_location) 484 485 # If the second word in the http request (Request-URI for the http 486 # method) is a full URI, we don't worry about it, as that'll be parsed 487 # and reassembled as a full URI within BaseHTTPRequestHandler.send_head 488 # so no errant scheme-less //netloc//evil.co/ domain mixup can happen. 489 attack_scheme_netloc_2slash_url = f'https://pypi.org/{url}' 490 expected_scheme_netloc_location = f'{attack_scheme_netloc_2slash_url}/' 491 response = self.request(attack_scheme_netloc_2slash_url) 492 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 493 location = response.getheader('Location') 494 # We're just ensuring that the scheme and domain make it through, if 495 # there are or aren't multiple slashes at the start of the path that 496 # follows that isn't important in this Location: header. 497 self.assertTrue(location.startswith('https://pypi.org/'), msg=location) 498 499 def test_get(self): 500 #constructs the path relative to the root directory of the HTTPServer 501 response = self.request(self.base_url + '/test') 502 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 503 # check for trailing "/" which should return 404. See Issue17324 504 response = self.request(self.base_url + '/test/') 505 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 506 response = self.request(self.base_url + '/') 507 self.check_status_and_reason(response, HTTPStatus.OK) 508 response = self.request(self.base_url) 509 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 510 self.assertEqual(response.getheader("Content-Length"), "0") 511 response = self.request(self.base_url + '/?hi=2') 512 self.check_status_and_reason(response, HTTPStatus.OK) 513 response = self.request(self.base_url + '?hi=1') 514 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 515 self.assertEqual(response.getheader("Location"), 516 self.base_url + "/?hi=1") 517 response = self.request('/ThisDoesNotExist') 518 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 519 response = self.request('/' + 'ThisDoesNotExist' + '/') 520 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 521 os.makedirs(os.path.join(self.tempdir, 'spam', 'index.html')) 522 response = self.request(self.base_url + '/spam/') 523 self.check_status_and_reason(response, HTTPStatus.OK) 524 525 data = b"Dummy index file\r\n" 526 with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f: 527 f.write(data) 528 response = self.request(self.base_url + '/') 529 self.check_status_and_reason(response, HTTPStatus.OK, data) 530 531 # chmod() doesn't work as expected on Windows, and filesystem 532 # permissions are ignored by root on Unix. 533 if os.name == 'posix' and os.geteuid() != 0: 534 os.chmod(self.tempdir, 0) 535 try: 536 response = self.request(self.base_url + '/') 537 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 538 finally: 539 os.chmod(self.tempdir, 0o755) 540 541 def test_head(self): 542 response = self.request( 543 self.base_url + '/test', method='HEAD') 544 self.check_status_and_reason(response, HTTPStatus.OK) 545 self.assertEqual(response.getheader('content-length'), 546 str(len(self.data))) 547 self.assertEqual(response.getheader('content-type'), 548 'application/octet-stream') 549 550 def test_browser_cache(self): 551 """Check that when a request to /test is sent with the request header 552 If-Modified-Since set to date of last modification, the server returns 553 status code 304, not 200 554 """ 555 headers = email.message.Message() 556 headers['If-Modified-Since'] = self.last_modif_header 557 response = self.request(self.base_url + '/test', headers=headers) 558 self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED) 559 560 # one hour after last modification : must return 304 561 new_dt = self.last_modif_datetime + datetime.timedelta(hours=1) 562 headers = email.message.Message() 563 headers['If-Modified-Since'] = email.utils.format_datetime(new_dt, 564 usegmt=True) 565 response = self.request(self.base_url + '/test', headers=headers) 566 self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED) 567 568 def test_browser_cache_file_changed(self): 569 # with If-Modified-Since earlier than Last-Modified, must return 200 570 dt = self.last_modif_datetime 571 # build datetime object : 365 days before last modification 572 old_dt = dt - datetime.timedelta(days=365) 573 headers = email.message.Message() 574 headers['If-Modified-Since'] = email.utils.format_datetime(old_dt, 575 usegmt=True) 576 response = self.request(self.base_url + '/test', headers=headers) 577 self.check_status_and_reason(response, HTTPStatus.OK) 578 579 def test_browser_cache_with_If_None_Match_header(self): 580 # if If-None-Match header is present, ignore If-Modified-Since 581 582 headers = email.message.Message() 583 headers['If-Modified-Since'] = self.last_modif_header 584 headers['If-None-Match'] = "*" 585 response = self.request(self.base_url + '/test', headers=headers) 586 self.check_status_and_reason(response, HTTPStatus.OK) 587 588 def test_invalid_requests(self): 589 response = self.request('/', method='FOO') 590 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 591 # requests must be case sensitive,so this should fail too 592 response = self.request('/', method='custom') 593 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 594 response = self.request('/', method='GETs') 595 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 596 597 def test_last_modified(self): 598 """Checks that the datetime returned in Last-Modified response header 599 is the actual datetime of last modification, rounded to the second 600 """ 601 response = self.request(self.base_url + '/test') 602 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 603 last_modif_header = response.headers['Last-modified'] 604 self.assertEqual(last_modif_header, self.last_modif_header) 605 606 def test_path_without_leading_slash(self): 607 response = self.request(self.tempdir_name + '/test') 608 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 609 response = self.request(self.tempdir_name + '/test/') 610 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 611 response = self.request(self.tempdir_name + '/') 612 self.check_status_and_reason(response, HTTPStatus.OK) 613 response = self.request(self.tempdir_name) 614 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 615 response = self.request(self.tempdir_name + '/?hi=2') 616 self.check_status_and_reason(response, HTTPStatus.OK) 617 response = self.request(self.tempdir_name + '?hi=1') 618 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 619 self.assertEqual(response.getheader("Location"), 620 self.tempdir_name + "/?hi=1") 621 622 def test_html_escape_filename(self): 623 filename = '<test&>.txt' 624 fullpath = os.path.join(self.tempdir, filename) 625 626 try: 627 open(fullpath, 'wb').close() 628 except OSError: 629 raise unittest.SkipTest('Can not create file %s on current file ' 630 'system' % filename) 631 632 try: 633 response = self.request(self.base_url + '/') 634 body = self.check_status_and_reason(response, HTTPStatus.OK) 635 enc = response.headers.get_content_charset() 636 finally: 637 os.unlink(fullpath) # avoid affecting test_undecodable_filename 638 639 self.assertIsNotNone(enc) 640 html_text = '>%s<' % html.escape(filename, quote=False) 641 self.assertIn(html_text.encode(enc), body) 642 643 644cgi_file1 = """\ 645#!%s 646 647print("Content-type: text/html") 648print() 649print("Hello World") 650""" 651 652cgi_file2 = """\ 653#!%s 654import os 655import sys 656import urllib.parse 657 658print("Content-type: text/html") 659print() 660 661content_length = int(os.environ["CONTENT_LENGTH"]) 662query_string = sys.stdin.buffer.read(content_length) 663params = {key.decode("utf-8"): val.decode("utf-8") 664 for key, val in urllib.parse.parse_qsl(query_string)} 665 666print("%%s, %%s, %%s" %% (params["spam"], params["eggs"], params["bacon"])) 667""" 668 669cgi_file4 = """\ 670#!%s 671import os 672 673print("Content-type: text/html") 674print() 675 676print(os.environ["%s"]) 677""" 678 679cgi_file6 = """\ 680#!%s 681import os 682 683print("X-ambv: was here") 684print("Content-type: text/html") 685print() 686print("<pre>") 687for k, v in os.environ.items(): 688 try: 689 k.encode('ascii') 690 v.encode('ascii') 691 except UnicodeEncodeError: 692 continue # see: BPO-44647 693 print(f"{k}={v}") 694print("</pre>") 695""" 696 697 698@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 699 "This test can't be run reliably as root (issue #13308).") 700class CGIHTTPServerTestCase(BaseTestCase): 701 class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler): 702 pass 703 704 linesep = os.linesep.encode('ascii') 705 706 def setUp(self): 707 BaseTestCase.setUp(self) 708 self.cwd = os.getcwd() 709 self.parent_dir = tempfile.mkdtemp() 710 self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin') 711 self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir') 712 self.sub_dir_1 = os.path.join(self.parent_dir, 'sub') 713 self.sub_dir_2 = os.path.join(self.sub_dir_1, 'dir') 714 self.cgi_dir_in_sub_dir = os.path.join(self.sub_dir_2, 'cgi-bin') 715 os.mkdir(self.cgi_dir) 716 os.mkdir(self.cgi_child_dir) 717 os.mkdir(self.sub_dir_1) 718 os.mkdir(self.sub_dir_2) 719 os.mkdir(self.cgi_dir_in_sub_dir) 720 self.nocgi_path = None 721 self.file1_path = None 722 self.file2_path = None 723 self.file3_path = None 724 self.file4_path = None 725 self.file5_path = None 726 727 # The shebang line should be pure ASCII: use symlink if possible. 728 # See issue #7668. 729 self._pythonexe_symlink = None 730 if os_helper.can_symlink(): 731 self.pythonexe = os.path.join(self.parent_dir, 'python') 732 self._pythonexe_symlink = support.PythonSymlink(self.pythonexe).__enter__() 733 else: 734 self.pythonexe = sys.executable 735 736 try: 737 # The python executable path is written as the first line of the 738 # CGI Python script. The encoding cookie cannot be used, and so the 739 # path should be encodable to the default script encoding (utf-8) 740 self.pythonexe.encode('utf-8') 741 except UnicodeEncodeError: 742 self.tearDown() 743 self.skipTest("Python executable path is not encodable to utf-8") 744 745 self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py') 746 with open(self.nocgi_path, 'w', encoding='utf-8') as fp: 747 fp.write(cgi_file1 % self.pythonexe) 748 os.chmod(self.nocgi_path, 0o777) 749 750 self.file1_path = os.path.join(self.cgi_dir, 'file1.py') 751 with open(self.file1_path, 'w', encoding='utf-8') as file1: 752 file1.write(cgi_file1 % self.pythonexe) 753 os.chmod(self.file1_path, 0o777) 754 755 self.file2_path = os.path.join(self.cgi_dir, 'file2.py') 756 with open(self.file2_path, 'w', encoding='utf-8') as file2: 757 file2.write(cgi_file2 % self.pythonexe) 758 os.chmod(self.file2_path, 0o777) 759 760 self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py') 761 with open(self.file3_path, 'w', encoding='utf-8') as file3: 762 file3.write(cgi_file1 % self.pythonexe) 763 os.chmod(self.file3_path, 0o777) 764 765 self.file4_path = os.path.join(self.cgi_dir, 'file4.py') 766 with open(self.file4_path, 'w', encoding='utf-8') as file4: 767 file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING')) 768 os.chmod(self.file4_path, 0o777) 769 770 self.file5_path = os.path.join(self.cgi_dir_in_sub_dir, 'file5.py') 771 with open(self.file5_path, 'w', encoding='utf-8') as file5: 772 file5.write(cgi_file1 % self.pythonexe) 773 os.chmod(self.file5_path, 0o777) 774 775 self.file6_path = os.path.join(self.cgi_dir, 'file6.py') 776 with open(self.file6_path, 'w', encoding='utf-8') as file6: 777 file6.write(cgi_file6 % self.pythonexe) 778 os.chmod(self.file6_path, 0o777) 779 780 os.chdir(self.parent_dir) 781 782 def tearDown(self): 783 try: 784 os.chdir(self.cwd) 785 if self._pythonexe_symlink: 786 self._pythonexe_symlink.__exit__(None, None, None) 787 if self.nocgi_path: 788 os.remove(self.nocgi_path) 789 if self.file1_path: 790 os.remove(self.file1_path) 791 if self.file2_path: 792 os.remove(self.file2_path) 793 if self.file3_path: 794 os.remove(self.file3_path) 795 if self.file4_path: 796 os.remove(self.file4_path) 797 if self.file5_path: 798 os.remove(self.file5_path) 799 if self.file6_path: 800 os.remove(self.file6_path) 801 os.rmdir(self.cgi_child_dir) 802 os.rmdir(self.cgi_dir) 803 os.rmdir(self.cgi_dir_in_sub_dir) 804 os.rmdir(self.sub_dir_2) 805 os.rmdir(self.sub_dir_1) 806 os.rmdir(self.parent_dir) 807 finally: 808 BaseTestCase.tearDown(self) 809 810 def test_url_collapse_path(self): 811 # verify tail is the last portion and head is the rest on proper urls 812 test_vectors = { 813 '': '//', 814 '..': IndexError, 815 '/.//..': IndexError, 816 '/': '//', 817 '//': '//', 818 '/\\': '//\\', 819 '/.//': '//', 820 'cgi-bin/file1.py': '/cgi-bin/file1.py', 821 '/cgi-bin/file1.py': '/cgi-bin/file1.py', 822 'a': '//a', 823 '/a': '//a', 824 '//a': '//a', 825 './a': '//a', 826 './C:/': '/C:/', 827 '/a/b': '/a/b', 828 '/a/b/': '/a/b/', 829 '/a/b/.': '/a/b/', 830 '/a/b/c/..': '/a/b/', 831 '/a/b/c/../d': '/a/b/d', 832 '/a/b/c/../d/e/../f': '/a/b/d/f', 833 '/a/b/c/../d/e/../../f': '/a/b/f', 834 '/a/b/c/../d/e/.././././..//f': '/a/b/f', 835 '../a/b/c/../d/e/.././././..//f': IndexError, 836 '/a/b/c/../d/e/../../../f': '/a/f', 837 '/a/b/c/../d/e/../../../../f': '//f', 838 '/a/b/c/../d/e/../../../../../f': IndexError, 839 '/a/b/c/../d/e/../../../../f/..': '//', 840 '/a/b/c/../d/e/../../../../f/../.': '//', 841 } 842 for path, expected in test_vectors.items(): 843 if isinstance(expected, type) and issubclass(expected, Exception): 844 self.assertRaises(expected, 845 server._url_collapse_path, path) 846 else: 847 actual = server._url_collapse_path(path) 848 self.assertEqual(expected, actual, 849 msg='path = %r\nGot: %r\nWanted: %r' % 850 (path, actual, expected)) 851 852 def test_headers_and_content(self): 853 res = self.request('/cgi-bin/file1.py') 854 self.assertEqual( 855 (res.read(), res.getheader('Content-type'), res.status), 856 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)) 857 858 def test_issue19435(self): 859 res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh') 860 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 861 862 def test_post(self): 863 params = urllib.parse.urlencode( 864 {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456}) 865 headers = {'Content-type' : 'application/x-www-form-urlencoded'} 866 res = self.request('/cgi-bin/file2.py', 'POST', params, headers) 867 868 self.assertEqual(res.read(), b'1, python, 123456' + self.linesep) 869 870 def test_invaliduri(self): 871 res = self.request('/cgi-bin/invalid') 872 res.read() 873 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 874 875 def test_authorization(self): 876 headers = {b'Authorization' : b'Basic ' + 877 base64.b64encode(b'username:pass')} 878 res = self.request('/cgi-bin/file1.py', 'GET', headers=headers) 879 self.assertEqual( 880 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 881 (res.read(), res.getheader('Content-type'), res.status)) 882 883 def test_no_leading_slash(self): 884 # http://bugs.python.org/issue2254 885 res = self.request('cgi-bin/file1.py') 886 self.assertEqual( 887 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 888 (res.read(), res.getheader('Content-type'), res.status)) 889 890 def test_os_environ_is_not_altered(self): 891 signature = "Test CGI Server" 892 os.environ['SERVER_SOFTWARE'] = signature 893 res = self.request('/cgi-bin/file1.py') 894 self.assertEqual( 895 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 896 (res.read(), res.getheader('Content-type'), res.status)) 897 self.assertEqual(os.environ['SERVER_SOFTWARE'], signature) 898 899 def test_urlquote_decoding_in_cgi_check(self): 900 res = self.request('/cgi-bin%2ffile1.py') 901 self.assertEqual( 902 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 903 (res.read(), res.getheader('Content-type'), res.status)) 904 905 def test_nested_cgi_path_issue21323(self): 906 res = self.request('/cgi-bin/child-dir/file3.py') 907 self.assertEqual( 908 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 909 (res.read(), res.getheader('Content-type'), res.status)) 910 911 def test_query_with_multiple_question_mark(self): 912 res = self.request('/cgi-bin/file4.py?a=b?c=d') 913 self.assertEqual( 914 (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK), 915 (res.read(), res.getheader('Content-type'), res.status)) 916 917 def test_query_with_continuous_slashes(self): 918 res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//') 919 self.assertEqual( 920 (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep, 921 'text/html', HTTPStatus.OK), 922 (res.read(), res.getheader('Content-type'), res.status)) 923 924 def test_cgi_path_in_sub_directories(self): 925 try: 926 CGIHTTPRequestHandler.cgi_directories.append('/sub/dir/cgi-bin') 927 res = self.request('/sub/dir/cgi-bin/file5.py') 928 self.assertEqual( 929 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 930 (res.read(), res.getheader('Content-type'), res.status)) 931 finally: 932 CGIHTTPRequestHandler.cgi_directories.remove('/sub/dir/cgi-bin') 933 934 def test_accept(self): 935 browser_accept = \ 936 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' 937 tests = ( 938 ((('Accept', browser_accept),), browser_accept), 939 ((), ''), 940 # Hack case to get two values for the one header 941 ((('Accept', 'text/html'), ('ACCEPT', 'text/plain')), 942 'text/html,text/plain'), 943 ) 944 for headers, expected in tests: 945 headers = OrderedDict(headers) 946 with self.subTest(headers): 947 res = self.request('/cgi-bin/file6.py', 'GET', headers=headers) 948 self.assertEqual(http.HTTPStatus.OK, res.status) 949 expected = f"HTTP_ACCEPT={expected}".encode('ascii') 950 self.assertIn(expected, res.read()) 951 952 953class SocketlessRequestHandler(SimpleHTTPRequestHandler): 954 def __init__(self, directory=None): 955 request = mock.Mock() 956 request.makefile.return_value = BytesIO() 957 super().__init__(request, None, None, directory=directory) 958 959 self.get_called = False 960 self.protocol_version = "HTTP/1.1" 961 962 def do_GET(self): 963 self.get_called = True 964 self.send_response(HTTPStatus.OK) 965 self.send_header('Content-Type', 'text/html') 966 self.end_headers() 967 self.wfile.write(b'<html><body>Data</body></html>\r\n') 968 969 def log_message(self, format, *args): 970 pass 971 972class RejectingSocketlessRequestHandler(SocketlessRequestHandler): 973 def handle_expect_100(self): 974 self.send_error(HTTPStatus.EXPECTATION_FAILED) 975 return False 976 977 978class AuditableBytesIO: 979 980 def __init__(self): 981 self.datas = [] 982 983 def write(self, data): 984 self.datas.append(data) 985 986 def getData(self): 987 return b''.join(self.datas) 988 989 @property 990 def numWrites(self): 991 return len(self.datas) 992 993 994class BaseHTTPRequestHandlerTestCase(unittest.TestCase): 995 """Test the functionality of the BaseHTTPServer. 996 997 Test the support for the Expect 100-continue header. 998 """ 999 1000 HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK') 1001 1002 def setUp (self): 1003 self.handler = SocketlessRequestHandler() 1004 1005 def send_typical_request(self, message): 1006 input = BytesIO(message) 1007 output = BytesIO() 1008 self.handler.rfile = input 1009 self.handler.wfile = output 1010 self.handler.handle_one_request() 1011 output.seek(0) 1012 return output.readlines() 1013 1014 def verify_get_called(self): 1015 self.assertTrue(self.handler.get_called) 1016 1017 def verify_expected_headers(self, headers): 1018 for fieldName in b'Server: ', b'Date: ', b'Content-Type: ': 1019 self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1) 1020 1021 def verify_http_server_response(self, response): 1022 match = self.HTTPResponseMatch.search(response) 1023 self.assertIsNotNone(match) 1024 1025 def test_unprintable_not_logged(self): 1026 # We call the method from the class directly as our Socketless 1027 # Handler subclass overrode it... nice for everything BUT this test. 1028 self.handler.client_address = ('127.0.0.1', 1337) 1029 log_message = BaseHTTPRequestHandler.log_message 1030 with mock.patch.object(sys, 'stderr', StringIO()) as fake_stderr: 1031 log_message(self.handler, '/foo') 1032 log_message(self.handler, '/\033bar\000\033') 1033 log_message(self.handler, '/spam %s.', 'a') 1034 log_message(self.handler, '/spam %s.', '\033\x7f\x9f\xa0beans') 1035 log_message(self.handler, '"GET /foo\\b"ar\007 HTTP/1.0"') 1036 stderr = fake_stderr.getvalue() 1037 self.assertNotIn('\033', stderr) # non-printable chars are caught. 1038 self.assertNotIn('\000', stderr) # non-printable chars are caught. 1039 lines = stderr.splitlines() 1040 self.assertIn('/foo', lines[0]) 1041 self.assertIn(r'/\x1bbar\x00\x1b', lines[1]) 1042 self.assertIn('/spam a.', lines[2]) 1043 self.assertIn('/spam \\x1b\\x7f\\x9f\xa0beans.', lines[3]) 1044 self.assertIn(r'"GET /foo\\b"ar\x07 HTTP/1.0"', lines[4]) 1045 1046 def test_http_1_1(self): 1047 result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n') 1048 self.verify_http_server_response(result[0]) 1049 self.verify_expected_headers(result[1:-1]) 1050 self.verify_get_called() 1051 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 1052 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 1053 self.assertEqual(self.handler.command, 'GET') 1054 self.assertEqual(self.handler.path, '/') 1055 self.assertEqual(self.handler.request_version, 'HTTP/1.1') 1056 self.assertSequenceEqual(self.handler.headers.items(), ()) 1057 1058 def test_http_1_0(self): 1059 result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n') 1060 self.verify_http_server_response(result[0]) 1061 self.verify_expected_headers(result[1:-1]) 1062 self.verify_get_called() 1063 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 1064 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0') 1065 self.assertEqual(self.handler.command, 'GET') 1066 self.assertEqual(self.handler.path, '/') 1067 self.assertEqual(self.handler.request_version, 'HTTP/1.0') 1068 self.assertSequenceEqual(self.handler.headers.items(), ()) 1069 1070 def test_http_0_9(self): 1071 result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n') 1072 self.assertEqual(len(result), 1) 1073 self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n') 1074 self.verify_get_called() 1075 1076 def test_extra_space(self): 1077 result = self.send_typical_request( 1078 b'GET /spaced out HTTP/1.1\r\n' 1079 b'Host: dummy\r\n' 1080 b'\r\n' 1081 ) 1082 self.assertTrue(result[0].startswith(b'HTTP/1.1 400 ')) 1083 self.verify_expected_headers(result[1:result.index(b'\r\n')]) 1084 self.assertFalse(self.handler.get_called) 1085 1086 def test_with_continue_1_0(self): 1087 result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n') 1088 self.verify_http_server_response(result[0]) 1089 self.verify_expected_headers(result[1:-1]) 1090 self.verify_get_called() 1091 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 1092 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0') 1093 self.assertEqual(self.handler.command, 'GET') 1094 self.assertEqual(self.handler.path, '/') 1095 self.assertEqual(self.handler.request_version, 'HTTP/1.0') 1096 headers = (("Expect", "100-continue"),) 1097 self.assertSequenceEqual(self.handler.headers.items(), headers) 1098 1099 def test_with_continue_1_1(self): 1100 result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') 1101 self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n') 1102 self.assertEqual(result[1], b'\r\n') 1103 self.assertEqual(result[2], b'HTTP/1.1 200 OK\r\n') 1104 self.verify_expected_headers(result[2:-1]) 1105 self.verify_get_called() 1106 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 1107 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 1108 self.assertEqual(self.handler.command, 'GET') 1109 self.assertEqual(self.handler.path, '/') 1110 self.assertEqual(self.handler.request_version, 'HTTP/1.1') 1111 headers = (("Expect", "100-continue"),) 1112 self.assertSequenceEqual(self.handler.headers.items(), headers) 1113 1114 def test_header_buffering_of_send_error(self): 1115 1116 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') 1117 output = AuditableBytesIO() 1118 handler = SocketlessRequestHandler() 1119 handler.rfile = input 1120 handler.wfile = output 1121 handler.request_version = 'HTTP/1.1' 1122 handler.requestline = '' 1123 handler.command = None 1124 1125 handler.send_error(418) 1126 self.assertEqual(output.numWrites, 2) 1127 1128 def test_header_buffering_of_send_response_only(self): 1129 1130 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') 1131 output = AuditableBytesIO() 1132 handler = SocketlessRequestHandler() 1133 handler.rfile = input 1134 handler.wfile = output 1135 handler.request_version = 'HTTP/1.1' 1136 1137 handler.send_response_only(418) 1138 self.assertEqual(output.numWrites, 0) 1139 handler.end_headers() 1140 self.assertEqual(output.numWrites, 1) 1141 1142 def test_header_buffering_of_send_header(self): 1143 1144 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') 1145 output = AuditableBytesIO() 1146 handler = SocketlessRequestHandler() 1147 handler.rfile = input 1148 handler.wfile = output 1149 handler.request_version = 'HTTP/1.1' 1150 1151 handler.send_header('Foo', 'foo') 1152 handler.send_header('bar', 'bar') 1153 self.assertEqual(output.numWrites, 0) 1154 handler.end_headers() 1155 self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n') 1156 self.assertEqual(output.numWrites, 1) 1157 1158 def test_header_unbuffered_when_continue(self): 1159 1160 def _readAndReseek(f): 1161 pos = f.tell() 1162 f.seek(0) 1163 data = f.read() 1164 f.seek(pos) 1165 return data 1166 1167 input = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') 1168 output = BytesIO() 1169 self.handler.rfile = input 1170 self.handler.wfile = output 1171 self.handler.request_version = 'HTTP/1.1' 1172 1173 self.handler.handle_one_request() 1174 self.assertNotEqual(_readAndReseek(output), b'') 1175 result = _readAndReseek(output).split(b'\r\n') 1176 self.assertEqual(result[0], b'HTTP/1.1 100 Continue') 1177 self.assertEqual(result[1], b'') 1178 self.assertEqual(result[2], b'HTTP/1.1 200 OK') 1179 1180 def test_with_continue_rejected(self): 1181 usual_handler = self.handler # Save to avoid breaking any subsequent tests. 1182 self.handler = RejectingSocketlessRequestHandler() 1183 result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') 1184 self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n') 1185 self.verify_expected_headers(result[1:-1]) 1186 # The expect handler should short circuit the usual get method by 1187 # returning false here, so get_called should be false 1188 self.assertFalse(self.handler.get_called) 1189 self.assertEqual(sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1) 1190 self.handler = usual_handler # Restore to avoid breaking any subsequent tests. 1191 1192 def test_request_length(self): 1193 # Issue #10714: huge request lines are discarded, to avoid Denial 1194 # of Service attacks. 1195 result = self.send_typical_request(b'GET ' + b'x' * 65537) 1196 self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n') 1197 self.assertFalse(self.handler.get_called) 1198 self.assertIsInstance(self.handler.requestline, str) 1199 1200 def test_header_length(self): 1201 # Issue #6791: same for headers 1202 result = self.send_typical_request( 1203 b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n') 1204 self.assertEqual(result[0], b'HTTP/1.1 431 Line too long\r\n') 1205 self.assertFalse(self.handler.get_called) 1206 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 1207 1208 def test_too_many_headers(self): 1209 result = self.send_typical_request( 1210 b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n') 1211 self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n') 1212 self.assertFalse(self.handler.get_called) 1213 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 1214 1215 def test_html_escape_on_error(self): 1216 result = self.send_typical_request( 1217 b'<script>alert("hello")</script> / HTTP/1.1') 1218 result = b''.join(result) 1219 text = '<script>alert("hello")</script>' 1220 self.assertIn(html.escape(text, quote=False).encode('ascii'), result) 1221 1222 def test_close_connection(self): 1223 # handle_one_request() should be repeatedly called until 1224 # it sets close_connection 1225 def handle_one_request(): 1226 self.handler.close_connection = next(close_values) 1227 self.handler.handle_one_request = handle_one_request 1228 1229 close_values = iter((True,)) 1230 self.handler.handle() 1231 self.assertRaises(StopIteration, next, close_values) 1232 1233 close_values = iter((False, False, True)) 1234 self.handler.handle() 1235 self.assertRaises(StopIteration, next, close_values) 1236 1237 def test_date_time_string(self): 1238 now = time.time() 1239 # this is the old code that formats the timestamp 1240 year, month, day, hh, mm, ss, wd, y, z = time.gmtime(now) 1241 expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % ( 1242 self.handler.weekdayname[wd], 1243 day, 1244 self.handler.monthname[month], 1245 year, hh, mm, ss 1246 ) 1247 self.assertEqual(self.handler.date_time_string(timestamp=now), expected) 1248 1249 1250class SimpleHTTPRequestHandlerTestCase(unittest.TestCase): 1251 """ Test url parsing """ 1252 def setUp(self): 1253 self.translated_1 = os.path.join(os.getcwd(), 'filename') 1254 self.translated_2 = os.path.join('foo', 'filename') 1255 self.translated_3 = os.path.join('bar', 'filename') 1256 self.handler_1 = SocketlessRequestHandler() 1257 self.handler_2 = SocketlessRequestHandler(directory='foo') 1258 self.handler_3 = SocketlessRequestHandler(directory=pathlib.PurePath('bar')) 1259 1260 def test_query_arguments(self): 1261 path = self.handler_1.translate_path('/filename') 1262 self.assertEqual(path, self.translated_1) 1263 path = self.handler_2.translate_path('/filename') 1264 self.assertEqual(path, self.translated_2) 1265 path = self.handler_3.translate_path('/filename') 1266 self.assertEqual(path, self.translated_3) 1267 1268 path = self.handler_1.translate_path('/filename?foo=bar') 1269 self.assertEqual(path, self.translated_1) 1270 path = self.handler_2.translate_path('/filename?foo=bar') 1271 self.assertEqual(path, self.translated_2) 1272 path = self.handler_3.translate_path('/filename?foo=bar') 1273 self.assertEqual(path, self.translated_3) 1274 1275 path = self.handler_1.translate_path('/filename?a=b&spam=eggs#zot') 1276 self.assertEqual(path, self.translated_1) 1277 path = self.handler_2.translate_path('/filename?a=b&spam=eggs#zot') 1278 self.assertEqual(path, self.translated_2) 1279 path = self.handler_3.translate_path('/filename?a=b&spam=eggs#zot') 1280 self.assertEqual(path, self.translated_3) 1281 1282 def test_start_with_double_slash(self): 1283 path = self.handler_1.translate_path('//filename') 1284 self.assertEqual(path, self.translated_1) 1285 path = self.handler_2.translate_path('//filename') 1286 self.assertEqual(path, self.translated_2) 1287 path = self.handler_3.translate_path('//filename') 1288 self.assertEqual(path, self.translated_3) 1289 1290 path = self.handler_1.translate_path('//filename?foo=bar') 1291 self.assertEqual(path, self.translated_1) 1292 path = self.handler_2.translate_path('//filename?foo=bar') 1293 self.assertEqual(path, self.translated_2) 1294 path = self.handler_3.translate_path('//filename?foo=bar') 1295 self.assertEqual(path, self.translated_3) 1296 1297 def test_windows_colon(self): 1298 with support.swap_attr(server.os, 'path', ntpath): 1299 path = self.handler_1.translate_path('c:c:c:foo/filename') 1300 path = path.replace(ntpath.sep, os.sep) 1301 self.assertEqual(path, self.translated_1) 1302 path = self.handler_2.translate_path('c:c:c:foo/filename') 1303 path = path.replace(ntpath.sep, os.sep) 1304 self.assertEqual(path, self.translated_2) 1305 path = self.handler_3.translate_path('c:c:c:foo/filename') 1306 path = path.replace(ntpath.sep, os.sep) 1307 self.assertEqual(path, self.translated_3) 1308 1309 path = self.handler_1.translate_path('\\c:../filename') 1310 path = path.replace(ntpath.sep, os.sep) 1311 self.assertEqual(path, self.translated_1) 1312 path = self.handler_2.translate_path('\\c:../filename') 1313 path = path.replace(ntpath.sep, os.sep) 1314 self.assertEqual(path, self.translated_2) 1315 path = self.handler_3.translate_path('\\c:../filename') 1316 path = path.replace(ntpath.sep, os.sep) 1317 self.assertEqual(path, self.translated_3) 1318 1319 path = self.handler_1.translate_path('c:\\c:..\\foo/filename') 1320 path = path.replace(ntpath.sep, os.sep) 1321 self.assertEqual(path, self.translated_1) 1322 path = self.handler_2.translate_path('c:\\c:..\\foo/filename') 1323 path = path.replace(ntpath.sep, os.sep) 1324 self.assertEqual(path, self.translated_2) 1325 path = self.handler_3.translate_path('c:\\c:..\\foo/filename') 1326 path = path.replace(ntpath.sep, os.sep) 1327 self.assertEqual(path, self.translated_3) 1328 1329 path = self.handler_1.translate_path('c:c:foo\\c:c:bar/filename') 1330 path = path.replace(ntpath.sep, os.sep) 1331 self.assertEqual(path, self.translated_1) 1332 path = self.handler_2.translate_path('c:c:foo\\c:c:bar/filename') 1333 path = path.replace(ntpath.sep, os.sep) 1334 self.assertEqual(path, self.translated_2) 1335 path = self.handler_3.translate_path('c:c:foo\\c:c:bar/filename') 1336 path = path.replace(ntpath.sep, os.sep) 1337 self.assertEqual(path, self.translated_3) 1338 1339 1340class MiscTestCase(unittest.TestCase): 1341 def test_all(self): 1342 expected = [] 1343 denylist = {'executable', 'nobody_uid', 'test'} 1344 for name in dir(server): 1345 if name.startswith('_') or name in denylist: 1346 continue 1347 module_object = getattr(server, name) 1348 if getattr(module_object, '__module__', None) == 'http.server': 1349 expected.append(name) 1350 self.assertCountEqual(server.__all__, expected) 1351 1352 1353class ScriptTestCase(unittest.TestCase): 1354 1355 def mock_server_class(self): 1356 return mock.MagicMock( 1357 return_value=mock.MagicMock( 1358 __enter__=mock.MagicMock( 1359 return_value=mock.MagicMock( 1360 socket=mock.MagicMock( 1361 getsockname=lambda: ('', 0), 1362 ), 1363 ), 1364 ), 1365 ), 1366 ) 1367 1368 @mock.patch('builtins.print') 1369 def test_server_test_unspec(self, _): 1370 mock_server = self.mock_server_class() 1371 server.test(ServerClass=mock_server, bind=None) 1372 self.assertIn( 1373 mock_server.address_family, 1374 (socket.AF_INET6, socket.AF_INET), 1375 ) 1376 1377 @mock.patch('builtins.print') 1378 def test_server_test_localhost(self, _): 1379 mock_server = self.mock_server_class() 1380 server.test(ServerClass=mock_server, bind="localhost") 1381 self.assertIn( 1382 mock_server.address_family, 1383 (socket.AF_INET6, socket.AF_INET), 1384 ) 1385 1386 ipv6_addrs = ( 1387 "::", 1388 "2001:0db8:85a3:0000:0000:8a2e:0370:7334", 1389 "::1", 1390 ) 1391 1392 ipv4_addrs = ( 1393 "0.0.0.0", 1394 "8.8.8.8", 1395 "127.0.0.1", 1396 ) 1397 1398 @mock.patch('builtins.print') 1399 def test_server_test_ipv6(self, _): 1400 for bind in self.ipv6_addrs: 1401 mock_server = self.mock_server_class() 1402 server.test(ServerClass=mock_server, bind=bind) 1403 self.assertEqual(mock_server.address_family, socket.AF_INET6) 1404 1405 @mock.patch('builtins.print') 1406 def test_server_test_ipv4(self, _): 1407 for bind in self.ipv4_addrs: 1408 mock_server = self.mock_server_class() 1409 server.test(ServerClass=mock_server, bind=bind) 1410 self.assertEqual(mock_server.address_family, socket.AF_INET) 1411 1412 1413def setUpModule(): 1414 unittest.addModuleCleanup(os.chdir, os.getcwd()) 1415 1416 1417if __name__ == '__main__': 1418 unittest.main() 1419