1#!/usr/bin/env python3 2"""A set of unit tests for httplib2.py.""" 3 4__author__ = "Joe Gregorio ([email protected])" 5__copyright__ = "Copyright 2006, Joe Gregorio" 6__contributors__ = ["Mark Pilgrim"] 7__license__ = "MIT" 8__version__ = "0.2 ($Rev: 118 $)" 9 10import base64 11import http.client 12import httplib2 13import io 14import os 15import pickle 16import socket 17import ssl 18import sys 19import time 20import unittest 21import urllib.parse 22 23base = "http://bitworking.org/projects/httplib2/test/" 24cacheDirName = ".cache" 25 26 27class CredentialsTest(unittest.TestCase): 28 def test(self): 29 c = httplib2.Credentials() 30 c.add("joe", "password") 31 self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0]) 32 self.assertEqual(("joe", "password"), list(c.iter(""))[0]) 33 c.add("fred", "password2", "wellformedweb.org") 34 self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0]) 35 self.assertEqual(1, len(list(c.iter("bitworking.org")))) 36 self.assertEqual(2, len(list(c.iter("wellformedweb.org")))) 37 self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org"))) 38 c.clear() 39 self.assertEqual(0, len(list(c.iter("bitworking.org")))) 40 c.add("fred", "password2", "wellformedweb.org") 41 self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org"))) 42 self.assertEqual(0, len(list(c.iter("bitworking.org")))) 43 self.assertEqual(0, len(list(c.iter("")))) 44 45 46class ParserTest(unittest.TestCase): 47 def testFromStd66(self): 48 self.assertEqual( 49 ("http", "example.com", "", None, None), 50 httplib2.parse_uri("http://example.com"), 51 ) 52 self.assertEqual( 53 ("https", "example.com", "", None, None), 54 httplib2.parse_uri("https://example.com"), 55 ) 56 self.assertEqual( 57 ("https", "example.com:8080", "", None, None), 58 httplib2.parse_uri("https://example.com:8080"), 59 ) 60 self.assertEqual( 61 ("http", "example.com", "/", None, None), 62 httplib2.parse_uri("http://example.com/"), 63 ) 64 self.assertEqual( 65 ("http", "example.com", "/path", None, None), 66 httplib2.parse_uri("http://example.com/path"), 67 ) 68 self.assertEqual( 69 ("http", "example.com", "/path", "a=1&b=2", None), 70 httplib2.parse_uri("http://example.com/path?a=1&b=2"), 71 ) 72 self.assertEqual( 73 ("http", "example.com", "/path", "a=1&b=2", "fred"), 74 httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"), 75 ) 76 self.assertEqual( 77 ("http", "example.com", "/path", "a=1&b=2", "fred"), 78 httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"), 79 ) 80 81 82class UrlNormTest(unittest.TestCase): 83 def test(self): 84 self.assertEqual( 85 "http://example.org/", httplib2.urlnorm("http://example.org")[-1] 86 ) 87 self.assertEqual( 88 "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1] 89 ) 90 self.assertEqual( 91 "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1] 92 ) 93 self.assertEqual( 94 "http://example.org/mypath?a=b", 95 httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1], 96 ) 97 self.assertEqual( 98 "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1] 99 ) 100 self.assertEqual( 101 httplib2.urlnorm("http://localhost:80/"), 102 httplib2.urlnorm("HTTP://LOCALHOST:80"), 103 ) 104 try: 105 httplib2.urlnorm("/") 106 self.fail("Non-absolute URIs should raise an exception") 107 except httplib2.RelativeURIError: 108 pass 109 110 111class UrlSafenameTest(unittest.TestCase): 112 def test(self): 113 # Test that different URIs end up generating different safe names 114 self.assertEqual( 115 "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", 116 httplib2.safename("http://example.org/fred/?a=b"), 117 ) 118 self.assertEqual( 119 "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", 120 httplib2.safename("http://example.org/fred?/a=b"), 121 ) 122 self.assertEqual( 123 "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", 124 httplib2.safename("http://www.example.org/fred?/a=b"), 125 ) 126 self.assertEqual( 127 httplib2.safename(httplib2.urlnorm("http://www")[-1]), 128 httplib2.safename(httplib2.urlnorm("http://WWW")[-1]), 129 ) 130 self.assertEqual( 131 "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", 132 httplib2.safename("https://www.example.org/fred?/a=b"), 133 ) 134 self.assertNotEqual( 135 httplib2.safename("http://www"), httplib2.safename("https://www") 136 ) 137 # Test the max length limits 138 uri = "http://" + ("w" * 200) + ".org" 139 uri2 = "http://" + ("w" * 201) + ".org" 140 self.assertNotEqual(httplib2.safename(uri2), httplib2.safename(uri)) 141 # Max length should be 200 + 1 (",") + 32 142 self.assertEqual(233, len(httplib2.safename(uri2))) 143 self.assertEqual(233, len(httplib2.safename(uri))) 144 # Unicode 145 if sys.version_info >= (2, 3): 146 self.assertEqual( 147 "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", 148 httplib2.safename("http://\u2304.org/fred/?a=b"), 149 ) 150 151 152class _MyResponse(io.BytesIO): 153 def __init__(self, body, **kwargs): 154 io.BytesIO.__init__(self, body) 155 self.headers = kwargs 156 157 def items(self): 158 return self.headers.items() 159 160 def iteritems(self): 161 return iter(self.headers.items()) 162 163 164class _MyHTTPConnection(object): 165 "This class is just a mock of httplib.HTTPConnection used for testing" 166 167 def __init__( 168 self, 169 host, 170 port=None, 171 key_file=None, 172 cert_file=None, 173 strict=None, 174 timeout=None, 175 proxy_info=None, 176 ): 177 self.host = host 178 self.port = port 179 self.timeout = timeout 180 self.log = "" 181 self.sock = None 182 183 def set_debuglevel(self, level): 184 pass 185 186 def connect(self): 187 "Connect to a host on a given port." 188 pass 189 190 def close(self): 191 pass 192 193 def request(self, method, request_uri, body, headers): 194 pass 195 196 def getresponse(self): 197 return _MyResponse(b"the body", status="200") 198 199 200class _MyHTTPBadStatusConnection(object): 201 "Mock of httplib.HTTPConnection that raises BadStatusLine." 202 203 num_calls = 0 204 205 def __init__( 206 self, 207 host, 208 port=None, 209 key_file=None, 210 cert_file=None, 211 strict=None, 212 timeout=None, 213 proxy_info=None, 214 ): 215 self.host = host 216 self.port = port 217 self.timeout = timeout 218 self.log = "" 219 self.sock = None 220 _MyHTTPBadStatusConnection.num_calls = 0 221 222 def set_debuglevel(self, level): 223 pass 224 225 def connect(self): 226 pass 227 228 def close(self): 229 pass 230 231 def request(self, method, request_uri, body, headers): 232 pass 233 234 def getresponse(self): 235 _MyHTTPBadStatusConnection.num_calls += 1 236 raise http.client.BadStatusLine("") 237 238 239class HttpTest(unittest.TestCase): 240 def setUp(self): 241 if os.path.exists(cacheDirName): 242 [ 243 os.remove(os.path.join(cacheDirName, file)) 244 for file in os.listdir(cacheDirName) 245 ] 246 self.http = httplib2.Http(cacheDirName) 247 self.http.clear_credentials() 248 249 def testIPv6NoSSL(self): 250 try: 251 self.http.request("http://[::1]/") 252 except socket.gaierror: 253 self.fail("should get the address family right for IPv6") 254 except socket.error: 255 # Even if IPv6 isn't installed on a machine it should just raise socket.error 256 pass 257 258 def testIPv6SSL(self): 259 try: 260 self.http.request("https://[::1]/") 261 except socket.gaierror: 262 self.fail("should get the address family right for IPv6") 263 except socket.error: 264 # Even if IPv6 isn't installed on a machine it should just raise socket.error 265 pass 266 267 def testConnectionType(self): 268 self.http.force_exception_to_status_code = False 269 response, content = self.http.request( 270 "http://bitworking.org", connection_type=_MyHTTPConnection 271 ) 272 self.assertEqual(response["content-location"], "http://bitworking.org") 273 self.assertEqual(content, b"the body") 274 275 def testBadStatusLineRetry(self): 276 old_retries = httplib2.RETRIES 277 httplib2.RETRIES = 1 278 self.http.force_exception_to_status_code = False 279 try: 280 response, content = self.http.request( 281 "http://bitworking.org", connection_type=_MyHTTPBadStatusConnection 282 ) 283 except http.client.BadStatusLine: 284 self.assertEqual(2, _MyHTTPBadStatusConnection.num_calls) 285 httplib2.RETRIES = old_retries 286 287 def testGetUnknownServer(self): 288 self.http.force_exception_to_status_code = False 289 try: 290 self.http.request("http://fred.bitworking.org/") 291 self.fail( 292 "An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server." 293 ) 294 except httplib2.ServerNotFoundError: 295 pass 296 297 # Now test with exceptions turned off 298 self.http.force_exception_to_status_code = True 299 300 (response, content) = self.http.request("http://fred.bitworking.org/") 301 self.assertEqual(response["content-type"], "text/plain") 302 self.assertTrue(content.startswith(b"Unable to find")) 303 self.assertEqual(response.status, 400) 304 305 def testGetConnectionRefused(self): 306 self.http.force_exception_to_status_code = False 307 try: 308 self.http.request("http://localhost:7777/") 309 self.fail("An socket.error exception must be thrown on Connection Refused.") 310 except socket.error: 311 pass 312 313 # Now test with exceptions turned off 314 self.http.force_exception_to_status_code = True 315 316 (response, content) = self.http.request("http://localhost:7777/") 317 self.assertEqual(response["content-type"], "text/plain") 318 self.assertTrue(b"Connection refused" in content) 319 self.assertEqual(response.status, 400) 320 321 def testGetIRI(self): 322 if sys.version_info >= (2, 3): 323 uri = urllib.parse.urljoin( 324 base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}" 325 ) 326 (response, content) = self.http.request(uri, "GET") 327 d = self.reflector(content) 328 self.assertTrue("QUERY_STRING" in d) 329 self.assertTrue(d["QUERY_STRING"].find("%D0%82") > 0) 330 331 def testGetIsDefaultMethod(self): 332 # Test that GET is the default method 333 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi") 334 (response, content) = self.http.request(uri) 335 self.assertEqual(response["x-method"], "GET") 336 337 def testDifferentMethods(self): 338 # Test that all methods can be used 339 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi") 340 for method in ["GET", "PUT", "DELETE", "POST"]: 341 (response, content) = self.http.request(uri, method, body=b" ") 342 self.assertEqual(response["x-method"], method) 343 344 def testHeadRead(self): 345 # Test that we don't try to read the response of a HEAD request 346 # since httplib blocks response.read() for HEAD requests. 347 # Oddly enough this doesn't appear as a problem when doing HEAD requests 348 # against Apache servers. 349 uri = "http://www.google.com/" 350 (response, content) = self.http.request(uri, "HEAD") 351 self.assertEqual(response.status, 200) 352 self.assertEqual(content, b"") 353 354 def testGetNoCache(self): 355 # Test that can do a GET w/o the cache turned on. 356 http = httplib2.Http() 357 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 358 (response, content) = http.request(uri, "GET") 359 self.assertEqual(response.status, 200) 360 self.assertEqual(response.previous, None) 361 362 def testGetOnlyIfCachedCacheHit(self): 363 # Test that can do a GET with cache and 'only-if-cached' 364 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 365 (response, content) = self.http.request(uri, "GET") 366 (response, content) = self.http.request( 367 uri, "GET", headers={"cache-control": "only-if-cached"} 368 ) 369 self.assertEqual(response.fromcache, True) 370 self.assertEqual(response.status, 200) 371 372 def testGetOnlyIfCachedCacheMiss(self): 373 # Test that can do a GET with no cache with 'only-if-cached' 374 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 375 (response, content) = self.http.request( 376 uri, "GET", headers={"cache-control": "only-if-cached"} 377 ) 378 self.assertEqual(response.fromcache, False) 379 self.assertEqual(response.status, 504) 380 381 def testGetOnlyIfCachedNoCacheAtAll(self): 382 # Test that can do a GET with no cache with 'only-if-cached' 383 # Of course, there might be an intermediary beyond us 384 # that responds to the 'only-if-cached', so this 385 # test can't really be guaranteed to pass. 386 http = httplib2.Http() 387 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 388 (response, content) = http.request( 389 uri, "GET", headers={"cache-control": "only-if-cached"} 390 ) 391 self.assertEqual(response.fromcache, False) 392 self.assertEqual(response.status, 504) 393 394 def testUserAgent(self): 395 # Test that we provide a default user-agent 396 uri = urllib.parse.urljoin(base, "user-agent/test.cgi") 397 (response, content) = self.http.request(uri, "GET") 398 self.assertEqual(response.status, 200) 399 self.assertTrue(content.startswith(b"Python-httplib2/")) 400 401 def testUserAgentNonDefault(self): 402 # Test that the default user-agent can be over-ridden 403 404 uri = urllib.parse.urljoin(base, "user-agent/test.cgi") 405 (response, content) = self.http.request( 406 uri, "GET", headers={"User-Agent": "fred/1.0"} 407 ) 408 self.assertEqual(response.status, 200) 409 self.assertTrue(content.startswith(b"fred/1.0")) 410 411 def testGet300WithLocation(self): 412 # Test the we automatically follow 300 redirects if a Location: header is provided 413 uri = urllib.parse.urljoin(base, "300/with-location-header.asis") 414 (response, content) = self.http.request(uri, "GET") 415 self.assertEqual(response.status, 200) 416 self.assertEqual(content, b"This is the final destination.\n") 417 self.assertEqual(response.previous.status, 300) 418 self.assertEqual(response.previous.fromcache, False) 419 420 # Confirm that the intermediate 300 is not cached 421 (response, content) = self.http.request(uri, "GET") 422 self.assertEqual(response.status, 200) 423 self.assertEqual(content, b"This is the final destination.\n") 424 self.assertEqual(response.previous.status, 300) 425 self.assertEqual(response.previous.fromcache, False) 426 427 def testGet300WithLocationNoRedirect(self): 428 # Test the we automatically follow 300 redirects if a Location: header is provided 429 self.http.follow_redirects = False 430 uri = urllib.parse.urljoin(base, "300/with-location-header.asis") 431 (response, content) = self.http.request(uri, "GET") 432 self.assertEqual(response.status, 300) 433 434 def testGet300WithoutLocation(self): 435 # Not giving a Location: header in a 300 response is acceptable 436 # In which case we just return the 300 response 437 uri = urllib.parse.urljoin(base, "300/without-location-header.asis") 438 (response, content) = self.http.request(uri, "GET") 439 self.assertEqual(response.status, 300) 440 self.assertTrue(response["content-type"].startswith("text/html")) 441 self.assertEqual(response.previous, None) 442 443 def testGet301(self): 444 # Test that we automatically follow 301 redirects 445 # and that we cache the 301 response 446 uri = urllib.parse.urljoin(base, "301/onestep.asis") 447 destination = urllib.parse.urljoin(base, "302/final-destination.txt") 448 (response, content) = self.http.request(uri, "GET") 449 self.assertEqual(response.status, 200) 450 self.assertTrue("content-location" in response) 451 self.assertEqual(response["content-location"], destination) 452 self.assertEqual(content, b"This is the final destination.\n") 453 self.assertEqual(response.previous.status, 301) 454 self.assertEqual(response.previous.fromcache, False) 455 456 (response, content) = self.http.request(uri, "GET") 457 self.assertEqual(response.status, 200) 458 self.assertEqual(response["content-location"], destination) 459 self.assertEqual(content, b"This is the final destination.\n") 460 self.assertEqual(response.previous.status, 301) 461 self.assertEqual(response.previous.fromcache, True) 462 463 def testHead301(self): 464 # Test that we automatically follow 301 redirects 465 uri = urllib.parse.urljoin(base, "301/onestep.asis") 466 (response, content) = self.http.request(uri, "HEAD") 467 self.assertEqual(response.status, 200) 468 self.assertEqual(response.previous.status, 301) 469 self.assertEqual(response.previous.fromcache, False) 470 471 def testGet301NoRedirect(self): 472 # Test that we automatically follow 301 redirects 473 # and that we cache the 301 response 474 self.http.follow_redirects = False 475 uri = urllib.parse.urljoin(base, "301/onestep.asis") 476 destination = urllib.parse.urljoin(base, "302/final-destination.txt") 477 (response, content) = self.http.request(uri, "GET") 478 self.assertEqual(response.status, 301) 479 480 def testGet302(self): 481 # Test that we automatically follow 302 redirects 482 # and that we DO NOT cache the 302 response 483 uri = urllib.parse.urljoin(base, "302/onestep.asis") 484 destination = urllib.parse.urljoin(base, "302/final-destination.txt") 485 (response, content) = self.http.request(uri, "GET") 486 self.assertEqual(response.status, 200) 487 self.assertEqual(response["content-location"], destination) 488 self.assertEqual(content, b"This is the final destination.\n") 489 self.assertEqual(response.previous.status, 302) 490 self.assertEqual(response.previous.fromcache, False) 491 492 uri = urllib.parse.urljoin(base, "302/onestep.asis") 493 (response, content) = self.http.request(uri, "GET") 494 self.assertEqual(response.status, 200) 495 self.assertEqual(response.fromcache, True) 496 self.assertEqual(response["content-location"], destination) 497 self.assertEqual(content, b"This is the final destination.\n") 498 self.assertEqual(response.previous.status, 302) 499 self.assertEqual(response.previous.fromcache, False) 500 self.assertEqual(response.previous["content-location"], uri) 501 502 uri = urllib.parse.urljoin(base, "302/twostep.asis") 503 504 (response, content) = self.http.request(uri, "GET") 505 self.assertEqual(response.status, 200) 506 self.assertEqual(response.fromcache, True) 507 self.assertEqual(content, b"This is the final destination.\n") 508 self.assertEqual(response.previous.status, 302) 509 self.assertEqual(response.previous.fromcache, False) 510 511 def testGet302RedirectionLimit(self): 512 # Test that we can set a lower redirection limit 513 # and that we raise an exception when we exceed 514 # that limit. 515 self.http.force_exception_to_status_code = False 516 517 uri = urllib.parse.urljoin(base, "302/twostep.asis") 518 try: 519 (response, content) = self.http.request(uri, "GET", redirections=1) 520 self.fail("This should not happen") 521 except httplib2.RedirectLimit: 522 pass 523 except Exception as e: 524 self.fail("Threw wrong kind of exception ") 525 526 # Re-run the test with out the exceptions 527 self.http.force_exception_to_status_code = True 528 529 (response, content) = self.http.request(uri, "GET", redirections=1) 530 self.assertEqual(response.status, 500) 531 self.assertTrue(response.reason.startswith("Redirected more")) 532 self.assertEqual("302", response["status"]) 533 self.assertTrue(content.startswith(b"<html>")) 534 self.assertTrue(response.previous != None) 535 536 def testGet302NoLocation(self): 537 # Test that we throw an exception when we get 538 # a 302 with no Location: header. 539 self.http.force_exception_to_status_code = False 540 uri = urllib.parse.urljoin(base, "302/no-location.asis") 541 try: 542 (response, content) = self.http.request(uri, "GET") 543 self.fail("Should never reach here") 544 except httplib2.RedirectMissingLocation: 545 pass 546 except Exception as e: 547 self.fail("Threw wrong kind of exception ") 548 549 # Re-run the test with out the exceptions 550 self.http.force_exception_to_status_code = True 551 552 (response, content) = self.http.request(uri, "GET") 553 self.assertEqual(response.status, 500) 554 self.assertTrue(response.reason.startswith("Redirected but")) 555 self.assertEqual("302", response["status"]) 556 self.assertTrue(content.startswith(b"This is content")) 557 558 def testGet301ViaHttps(self): 559 # Google always redirects to http://google.com 560 (response, content) = self.http.request("https://code.google.com/apis/", "GET") 561 self.assertEqual(200, response.status) 562 self.assertEqual(301, response.previous.status) 563 564 def testGetViaHttps(self): 565 # Test that we can handle HTTPS 566 (response, content) = self.http.request("https://google.com/adsense/", "GET") 567 self.assertEqual(200, response.status) 568 569 def testGetViaHttpsSpecViolationOnLocation(self): 570 # Test that we follow redirects through HTTPS 571 # even if they violate the spec by including 572 # a relative Location: header instead of an 573 # absolute one. 574 (response, content) = self.http.request("https://google.com/adsense", "GET") 575 self.assertEqual(200, response.status) 576 self.assertNotEqual(None, response.previous) 577 578 def testGetViaHttpsKeyCert(self): 579 # At this point I can only test 580 # that the key and cert files are passed in 581 # correctly to httplib. It would be nice to have 582 # a real https endpoint to test against. 583 http = httplib2.Http(timeout=2) 584 585 http.add_certificate("akeyfile", "acertfile", "bitworking.org") 586 try: 587 (response, content) = http.request("https://bitworking.org", "GET") 588 except AttributeError: 589 self.assertEqual( 590 http.connections["https:bitworking.org"].key_file, "akeyfile" 591 ) 592 self.assertEqual( 593 http.connections["https:bitworking.org"].cert_file, "acertfile" 594 ) 595 except IOError: 596 # Skip on 3.2 597 pass 598 599 try: 600 (response, content) = http.request("https://notthere.bitworking.org", "GET") 601 except httplib2.ServerNotFoundError: 602 self.assertEqual( 603 http.connections["https:notthere.bitworking.org"].key_file, None 604 ) 605 self.assertEqual( 606 http.connections["https:notthere.bitworking.org"].cert_file, None 607 ) 608 except IOError: 609 # Skip on 3.2 610 pass 611 612 def testSslCertValidation(self): 613 # Test that we get an ssl.SSLError when specifying a non-existent CA 614 # certs file. 615 http = httplib2.Http(ca_certs="/nosuchfile") 616 self.assertRaises(IOError, http.request, "https://www.google.com/", "GET") 617 618 # Test that we get a SSLHandshakeError if we try to access 619 # https://www.google.com, using a CA cert file that doesn't contain 620 # the CA Google uses (i.e., simulating a cert that's not signed by a 621 # trusted CA). 622 other_ca_certs = os.path.join( 623 os.path.dirname(os.path.abspath(httplib2.__file__)), 624 "test", 625 "other_cacerts.txt", 626 ) 627 http = httplib2.Http(ca_certs=other_ca_certs) 628 self.assertRaises(ssl.SSLError, http.request, "https://www.google.com/", "GET") 629 630 def testSniHostnameValidation(self): 631 self.http.request("https://google.com/", method="GET") 632 633 def testGet303(self): 634 # Do a follow-up GET on a Location: header 635 # returned from a POST that gave a 303. 636 uri = urllib.parse.urljoin(base, "303/303.cgi") 637 (response, content) = self.http.request(uri, "POST", " ") 638 self.assertEqual(response.status, 200) 639 self.assertEqual(content, b"This is the final destination.\n") 640 self.assertEqual(response.previous.status, 303) 641 642 def testGet303NoRedirect(self): 643 # Do a follow-up GET on a Location: header 644 # returned from a POST that gave a 303. 645 self.http.follow_redirects = False 646 uri = urllib.parse.urljoin(base, "303/303.cgi") 647 (response, content) = self.http.request(uri, "POST", " ") 648 self.assertEqual(response.status, 303) 649 650 def test303ForDifferentMethods(self): 651 # Test that all methods can be used 652 uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi") 653 for (method, method_on_303) in [ 654 ("PUT", "GET"), 655 ("DELETE", "GET"), 656 ("POST", "GET"), 657 ("GET", "GET"), 658 ("HEAD", "GET"), 659 ]: 660 (response, content) = self.http.request(uri, method, body=b" ") 661 self.assertEqual(response["x-method"], method_on_303) 662 663 def testGet304(self): 664 # Test that we use ETags properly to validate our cache 665 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 666 (response, content) = self.http.request( 667 uri, "GET", headers={"accept-encoding": "identity"} 668 ) 669 self.assertNotEqual(response["etag"], "") 670 671 (response, content) = self.http.request( 672 uri, "GET", headers={"accept-encoding": "identity"} 673 ) 674 (response, content) = self.http.request( 675 uri, 676 "GET", 677 headers={"accept-encoding": "identity", "cache-control": "must-revalidate"}, 678 ) 679 self.assertEqual(response.status, 200) 680 self.assertEqual(response.fromcache, True) 681 682 cache_file_name = os.path.join( 683 cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]) 684 ) 685 f = open(cache_file_name, "r") 686 status_line = f.readline() 687 f.close() 688 689 self.assertTrue(status_line.startswith("status:")) 690 691 (response, content) = self.http.request( 692 uri, "HEAD", headers={"accept-encoding": "identity"} 693 ) 694 self.assertEqual(response.status, 200) 695 self.assertEqual(response.fromcache, True) 696 697 (response, content) = self.http.request( 698 uri, "GET", headers={"accept-encoding": "identity", "range": "bytes=0-0"} 699 ) 700 self.assertEqual(response.status, 206) 701 self.assertEqual(response.fromcache, False) 702 703 def testGetIgnoreEtag(self): 704 # Test that we can forcibly ignore ETags 705 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi") 706 (response, content) = self.http.request( 707 uri, "GET", headers={"accept-encoding": "identity"} 708 ) 709 self.assertNotEqual(response["etag"], "") 710 711 (response, content) = self.http.request( 712 uri, 713 "GET", 714 headers={"accept-encoding": "identity", "cache-control": "max-age=0"}, 715 ) 716 d = self.reflector(content) 717 self.assertTrue("HTTP_IF_NONE_MATCH" in d) 718 719 self.http.ignore_etag = True 720 (response, content) = self.http.request( 721 uri, 722 "GET", 723 headers={"accept-encoding": "identity", "cache-control": "max-age=0"}, 724 ) 725 d = self.reflector(content) 726 self.assertEqual(response.fromcache, False) 727 self.assertFalse("HTTP_IF_NONE_MATCH" in d) 728 729 def testOverrideEtag(self): 730 # Test that we can forcibly ignore ETags 731 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi") 732 (response, content) = self.http.request( 733 uri, "GET", headers={"accept-encoding": "identity"} 734 ) 735 self.assertNotEqual(response["etag"], "") 736 737 (response, content) = self.http.request( 738 uri, 739 "GET", 740 headers={"accept-encoding": "identity", "cache-control": "max-age=0"}, 741 ) 742 d = self.reflector(content) 743 self.assertTrue("HTTP_IF_NONE_MATCH" in d) 744 self.assertNotEqual(d["HTTP_IF_NONE_MATCH"], "fred") 745 746 (response, content) = self.http.request( 747 uri, 748 "GET", 749 headers={ 750 "accept-encoding": "identity", 751 "cache-control": "max-age=0", 752 "if-none-match": "fred", 753 }, 754 ) 755 d = self.reflector(content) 756 self.assertTrue("HTTP_IF_NONE_MATCH" in d) 757 self.assertEqual(d["HTTP_IF_NONE_MATCH"], "fred") 758 759 # MAP-commented this out because it consistently fails 760 # def testGet304EndToEnd(self): 761 # # Test that end to end headers get overwritten in the cache 762 # uri = urllib.parse.urljoin(base, "304/end2end.cgi") 763 # (response, content) = self.http.request(uri, "GET") 764 # self.assertNotEqual(response['etag'], "") 765 # old_date = response['date'] 766 # time.sleep(2) 767 # 768 # (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'}) 769 # # The response should be from the cache, but the Date: header should be updated. 770 # new_date = response['date'] 771 # self.assertNotEqual(new_date, old_date) 772 # self.assertEqual(response.status, 200) 773 # self.assertEqual(response.fromcache, True) 774 775 def testGet304LastModified(self): 776 # Test that we can still handle a 304 777 # by only using the last-modified cache validator. 778 uri = urllib.parse.urljoin( 779 base, "304/last-modified-only/last-modified-only.txt" 780 ) 781 (response, content) = self.http.request(uri, "GET") 782 783 self.assertNotEqual(response["last-modified"], "") 784 (response, content) = self.http.request(uri, "GET") 785 (response, content) = self.http.request(uri, "GET") 786 self.assertEqual(response.status, 200) 787 self.assertEqual(response.fromcache, True) 788 789 def testGet307(self): 790 # Test that we do follow 307 redirects but 791 # do not cache the 307 792 uri = urllib.parse.urljoin(base, "307/onestep.asis") 793 (response, content) = self.http.request(uri, "GET") 794 self.assertEqual(response.status, 200) 795 self.assertEqual(content, b"This is the final destination.\n") 796 self.assertEqual(response.previous.status, 307) 797 self.assertEqual(response.previous.fromcache, False) 798 799 (response, content) = self.http.request(uri, "GET") 800 self.assertEqual(response.status, 200) 801 self.assertEqual(response.fromcache, True) 802 self.assertEqual(content, b"This is the final destination.\n") 803 self.assertEqual(response.previous.status, 307) 804 self.assertEqual(response.previous.fromcache, False) 805 806 def testGet410(self): 807 # Test that we pass 410's through 808 uri = urllib.parse.urljoin(base, "410/410.asis") 809 (response, content) = self.http.request(uri, "GET") 810 self.assertEqual(response.status, 410) 811 812 def testVaryHeaderSimple(self): 813 """RFC 2616 13.6 When the cache receives a subsequent request whose Request-URI specifies one or more cache entries including a Vary header field, the cache MUST NOT use such a cache entry to construct a response to the new request unless all of the selecting request-headers present in the new request match the corresponding stored request-headers in the original request. 814 815 """ 816 # test that the vary header is sent 817 uri = urllib.parse.urljoin(base, "vary/accept.asis") 818 (response, content) = self.http.request( 819 uri, "GET", headers={"Accept": "text/plain"} 820 ) 821 self.assertEqual(response.status, 200) 822 self.assertTrue("vary" in response) 823 824 # get the resource again, from the cache since accept header in this 825 # request is the same as the request 826 (response, content) = self.http.request( 827 uri, "GET", headers={"Accept": "text/plain"} 828 ) 829 self.assertEqual(response.status, 200) 830 self.assertEqual(response.fromcache, True, msg="Should be from cache") 831 832 # get the resource again, not from cache since Accept headers does not match 833 (response, content) = self.http.request( 834 uri, "GET", headers={"Accept": "text/html"} 835 ) 836 self.assertEqual(response.status, 200) 837 self.assertEqual(response.fromcache, False, msg="Should not be from cache") 838 839 # get the resource again, without any Accept header, so again no match 840 (response, content) = self.http.request(uri, "GET") 841 self.assertEqual(response.status, 200) 842 self.assertEqual(response.fromcache, False, msg="Should not be from cache") 843 844 def testNoVary(self): 845 pass 846 # when there is no vary, a different Accept header (e.g.) should not 847 # impact if the cache is used 848 # test that the vary header is not sent 849 # uri = urllib.parse.urljoin(base, "vary/no-vary.asis") 850 # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'}) 851 # self.assertEqual(response.status, 200) 852 # self.assertFalse('vary' in response) 853 # 854 # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'}) 855 # self.assertEqual(response.status, 200) 856 # self.assertEqual(response.fromcache, True, msg="Should be from cache") 857 # 858 # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'}) 859 # self.assertEqual(response.status, 200) 860 # self.assertEqual(response.fromcache, True, msg="Should be from cache") 861 862 def testVaryHeaderDouble(self): 863 uri = urllib.parse.urljoin(base, "vary/accept-double.asis") 864 (response, content) = self.http.request( 865 uri, 866 "GET", 867 headers={ 868 "Accept": "text/plain", 869 "Accept-Language": "da, en-gb;q=0.8, en;q=0.7", 870 }, 871 ) 872 self.assertEqual(response.status, 200) 873 self.assertTrue("vary" in response) 874 875 # we are from cache 876 (response, content) = self.http.request( 877 uri, 878 "GET", 879 headers={ 880 "Accept": "text/plain", 881 "Accept-Language": "da, en-gb;q=0.8, en;q=0.7", 882 }, 883 ) 884 self.assertEqual(response.fromcache, True, msg="Should be from cache") 885 886 (response, content) = self.http.request( 887 uri, "GET", headers={"Accept": "text/plain"} 888 ) 889 self.assertEqual(response.status, 200) 890 self.assertEqual(response.fromcache, False) 891 892 # get the resource again, not from cache, varied headers don't match exact 893 (response, content) = self.http.request( 894 uri, "GET", headers={"Accept-Language": "da"} 895 ) 896 self.assertEqual(response.status, 200) 897 self.assertEqual(response.fromcache, False, msg="Should not be from cache") 898 899 def testVaryUnusedHeader(self): 900 # A header's value is not considered to vary if it's not used at all. 901 uri = urllib.parse.urljoin(base, "vary/unused-header.asis") 902 (response, content) = self.http.request( 903 uri, "GET", headers={"Accept": "text/plain"} 904 ) 905 self.assertEqual(response.status, 200) 906 self.assertTrue("vary" in response) 907 908 # we are from cache 909 (response, content) = self.http.request( 910 uri, "GET", headers={"Accept": "text/plain"} 911 ) 912 self.assertEqual(response.fromcache, True, msg="Should be from cache") 913 914 def testHeadGZip(self): 915 # Test that we don't try to decompress a HEAD response 916 uri = urllib.parse.urljoin(base, "gzip/final-destination.txt") 917 (response, content) = self.http.request(uri, "HEAD") 918 self.assertEqual(response.status, 200) 919 self.assertNotEqual(int(response["content-length"]), 0) 920 self.assertEqual(content, b"") 921 922 def testGetGZip(self): 923 # Test that we support gzip compression 924 uri = urllib.parse.urljoin(base, "gzip/final-destination.txt") 925 (response, content) = self.http.request(uri, "GET") 926 self.assertEqual(response.status, 200) 927 self.assertFalse("content-encoding" in response) 928 self.assertTrue("-content-encoding" in response) 929 self.assertEqual( 930 int(response["content-length"]), len(b"This is the final destination.\n") 931 ) 932 self.assertEqual(content, b"This is the final destination.\n") 933 934 def testPostAndGZipResponse(self): 935 uri = urllib.parse.urljoin(base, "gzip/post.cgi") 936 (response, content) = self.http.request(uri, "POST", body=" ") 937 self.assertEqual(response.status, 200) 938 self.assertFalse("content-encoding" in response) 939 self.assertTrue("-content-encoding" in response) 940 941 def testGetGZipFailure(self): 942 # Test that we raise a good exception when the gzip fails 943 self.http.force_exception_to_status_code = False 944 uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis") 945 try: 946 (response, content) = self.http.request(uri, "GET") 947 self.fail("Should never reach here") 948 except httplib2.FailedToDecompressContent: 949 pass 950 except Exception: 951 self.fail("Threw wrong kind of exception") 952 953 # Re-run the test with out the exceptions 954 self.http.force_exception_to_status_code = True 955 956 (response, content) = self.http.request(uri, "GET") 957 self.assertEqual(response.status, 500) 958 self.assertTrue(response.reason.startswith("Content purported")) 959 960 def testIndividualTimeout(self): 961 uri = urllib.parse.urljoin(base, "timeout/timeout.cgi") 962 http = httplib2.Http(timeout=1) 963 http.force_exception_to_status_code = True 964 965 (response, content) = http.request(uri) 966 self.assertEqual(response.status, 408) 967 self.assertTrue(response.reason.startswith("Request Timeout")) 968 self.assertTrue(content.startswith(b"Request Timeout")) 969 970 def testGetDeflate(self): 971 # Test that we support deflate compression 972 uri = urllib.parse.urljoin(base, "deflate/deflated.asis") 973 (response, content) = self.http.request(uri, "GET") 974 self.assertEqual(response.status, 200) 975 self.assertFalse("content-encoding" in response) 976 self.assertEqual( 977 int(response["content-length"]), len("This is the final destination.") 978 ) 979 self.assertEqual(content, b"This is the final destination.") 980 981 def testGetDeflateFailure(self): 982 # Test that we raise a good exception when the deflate fails 983 self.http.force_exception_to_status_code = False 984 985 uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis") 986 try: 987 (response, content) = self.http.request(uri, "GET") 988 self.fail("Should never reach here") 989 except httplib2.FailedToDecompressContent: 990 pass 991 except Exception: 992 self.fail("Threw wrong kind of exception") 993 994 # Re-run the test with out the exceptions 995 self.http.force_exception_to_status_code = True 996 997 (response, content) = self.http.request(uri, "GET") 998 self.assertEqual(response.status, 500) 999 self.assertTrue(response.reason.startswith("Content purported")) 1000 1001 def testGetDuplicateHeaders(self): 1002 # Test that duplicate headers get concatenated via ',' 1003 uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis") 1004 (response, content) = self.http.request(uri, "GET") 1005 self.assertEqual(response.status, 200) 1006 self.assertEqual(content, b"This is content\n") 1007 self.assertEqual( 1008 response["link"].split(",")[0], 1009 '<http://bitworking.org>; rel="home"; title="BitWorking"', 1010 ) 1011 1012 def testGetCacheControlNoCache(self): 1013 # Test Cache-Control: no-cache on requests 1014 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 1015 (response, content) = self.http.request( 1016 uri, "GET", headers={"accept-encoding": "identity"} 1017 ) 1018 self.assertNotEqual(response["etag"], "") 1019 (response, content) = self.http.request( 1020 uri, "GET", headers={"accept-encoding": "identity"} 1021 ) 1022 self.assertEqual(response.status, 200) 1023 self.assertEqual(response.fromcache, True) 1024 1025 (response, content) = self.http.request( 1026 uri, 1027 "GET", 1028 headers={"accept-encoding": "identity", "Cache-Control": "no-cache"}, 1029 ) 1030 self.assertEqual(response.status, 200) 1031 self.assertEqual(response.fromcache, False) 1032 1033 def testGetCacheControlPragmaNoCache(self): 1034 # Test Pragma: no-cache on requests 1035 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 1036 (response, content) = self.http.request( 1037 uri, "GET", headers={"accept-encoding": "identity"} 1038 ) 1039 self.assertNotEqual(response["etag"], "") 1040 (response, content) = self.http.request( 1041 uri, "GET", headers={"accept-encoding": "identity"} 1042 ) 1043 self.assertEqual(response.status, 200) 1044 self.assertEqual(response.fromcache, True) 1045 1046 (response, content) = self.http.request( 1047 uri, "GET", headers={"accept-encoding": "identity", "Pragma": "no-cache"} 1048 ) 1049 self.assertEqual(response.status, 200) 1050 self.assertEqual(response.fromcache, False) 1051 1052 def testGetCacheControlNoStoreRequest(self): 1053 # A no-store request means that the response should not be stored. 1054 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 1055 1056 (response, content) = self.http.request( 1057 uri, "GET", headers={"Cache-Control": "no-store"} 1058 ) 1059 self.assertEqual(response.status, 200) 1060 self.assertEqual(response.fromcache, False) 1061 1062 (response, content) = self.http.request( 1063 uri, "GET", headers={"Cache-Control": "no-store"} 1064 ) 1065 self.assertEqual(response.status, 200) 1066 self.assertEqual(response.fromcache, False) 1067 1068 def testGetCacheControlNoStoreResponse(self): 1069 # A no-store response means that the response should not be stored. 1070 uri = urllib.parse.urljoin(base, "no-store/no-store.asis") 1071 1072 (response, content) = self.http.request(uri, "GET") 1073 self.assertEqual(response.status, 200) 1074 self.assertEqual(response.fromcache, False) 1075 1076 (response, content) = self.http.request(uri, "GET") 1077 self.assertEqual(response.status, 200) 1078 self.assertEqual(response.fromcache, False) 1079 1080 def testGetCacheControlNoCacheNoStoreRequest(self): 1081 # Test that a no-store, no-cache clears the entry from the cache 1082 # even if it was cached previously. 1083 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 1084 1085 (response, content) = self.http.request(uri, "GET") 1086 (response, content) = self.http.request(uri, "GET") 1087 self.assertEqual(response.fromcache, True) 1088 (response, content) = self.http.request( 1089 uri, "GET", headers={"Cache-Control": "no-store, no-cache"} 1090 ) 1091 (response, content) = self.http.request( 1092 uri, "GET", headers={"Cache-Control": "no-store, no-cache"} 1093 ) 1094 self.assertEqual(response.status, 200) 1095 self.assertEqual(response.fromcache, False) 1096 1097 def testUpdateInvalidatesCache(self): 1098 # Test that calling PUT or DELETE on a 1099 # URI that is cache invalidates that cache. 1100 uri = urllib.parse.urljoin(base, "304/test_etag.txt") 1101 1102 (response, content) = self.http.request(uri, "GET") 1103 (response, content) = self.http.request(uri, "GET") 1104 self.assertEqual(response.fromcache, True) 1105 (response, content) = self.http.request(uri, "DELETE") 1106 self.assertEqual(response.status, 405) 1107 1108 (response, content) = self.http.request(uri, "GET") 1109 self.assertEqual(response.fromcache, False) 1110 1111 def testUpdateUsesCachedETag(self): 1112 # Test that we natively support http://www.w3.org/1999/04/Editing/ 1113 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi") 1114 1115 (response, content) = self.http.request(uri, "GET") 1116 self.assertEqual(response.status, 200) 1117 self.assertEqual(response.fromcache, False) 1118 (response, content) = self.http.request(uri, "GET") 1119 self.assertEqual(response.status, 200) 1120 self.assertEqual(response.fromcache, True) 1121 (response, content) = self.http.request(uri, "PUT", body="foo") 1122 self.assertEqual(response.status, 200) 1123 (response, content) = self.http.request(uri, "PUT", body="foo") 1124 self.assertEqual(response.status, 412) 1125 1126 def testUpdatePatchUsesCachedETag(self): 1127 # Test that we natively support http://www.w3.org/1999/04/Editing/ 1128 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi") 1129 1130 (response, content) = self.http.request(uri, "GET") 1131 self.assertEqual(response.status, 200) 1132 self.assertEqual(response.fromcache, False) 1133 (response, content) = self.http.request(uri, "GET") 1134 self.assertEqual(response.status, 200) 1135 self.assertEqual(response.fromcache, True) 1136 (response, content) = self.http.request(uri, "PATCH", body="foo") 1137 self.assertEqual(response.status, 200) 1138 (response, content) = self.http.request(uri, "PATCH", body="foo") 1139 self.assertEqual(response.status, 412) 1140 1141 def testUpdateUsesCachedETagAndOCMethod(self): 1142 # Test that we natively support http://www.w3.org/1999/04/Editing/ 1143 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi") 1144 1145 (response, content) = self.http.request(uri, "GET") 1146 self.assertEqual(response.status, 200) 1147 self.assertEqual(response.fromcache, False) 1148 (response, content) = self.http.request(uri, "GET") 1149 self.assertEqual(response.status, 200) 1150 self.assertEqual(response.fromcache, True) 1151 self.http.optimistic_concurrency_methods.append("DELETE") 1152 (response, content) = self.http.request(uri, "DELETE") 1153 self.assertEqual(response.status, 200) 1154 1155 def testUpdateUsesCachedETagOverridden(self): 1156 # Test that we natively support http://www.w3.org/1999/04/Editing/ 1157 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi") 1158 1159 (response, content) = self.http.request(uri, "GET") 1160 self.assertEqual(response.status, 200) 1161 self.assertEqual(response.fromcache, False) 1162 (response, content) = self.http.request(uri, "GET") 1163 self.assertEqual(response.status, 200) 1164 self.assertEqual(response.fromcache, True) 1165 (response, content) = self.http.request( 1166 uri, "PUT", body="foo", headers={"if-match": "fred"} 1167 ) 1168 self.assertEqual(response.status, 412) 1169 1170 def testBasicAuth(self): 1171 # Test Basic Authentication 1172 uri = urllib.parse.urljoin(base, "basic/file.txt") 1173 (response, content) = self.http.request(uri, "GET") 1174 self.assertEqual(response.status, 401) 1175 1176 uri = urllib.parse.urljoin(base, "basic/") 1177 (response, content) = self.http.request(uri, "GET") 1178 self.assertEqual(response.status, 401) 1179 1180 self.http.add_credentials("joe", "password") 1181 (response, content) = self.http.request(uri, "GET") 1182 self.assertEqual(response.status, 200) 1183 1184 uri = urllib.parse.urljoin(base, "basic/file.txt") 1185 (response, content) = self.http.request(uri, "GET") 1186 self.assertEqual(response.status, 200) 1187 1188 def testBasicAuthWithDomain(self): 1189 # Test Basic Authentication 1190 uri = urllib.parse.urljoin(base, "basic/file.txt") 1191 (response, content) = self.http.request(uri, "GET") 1192 self.assertEqual(response.status, 401) 1193 1194 uri = urllib.parse.urljoin(base, "basic/") 1195 (response, content) = self.http.request(uri, "GET") 1196 self.assertEqual(response.status, 401) 1197 1198 self.http.add_credentials("joe", "password", "example.org") 1199 (response, content) = self.http.request(uri, "GET") 1200 self.assertEqual(response.status, 401) 1201 1202 uri = urllib.parse.urljoin(base, "basic/file.txt") 1203 (response, content) = self.http.request(uri, "GET") 1204 self.assertEqual(response.status, 401) 1205 1206 domain = urllib.parse.urlparse(base)[1] 1207 self.http.add_credentials("joe", "password", domain) 1208 (response, content) = self.http.request(uri, "GET") 1209 self.assertEqual(response.status, 200) 1210 1211 uri = urllib.parse.urljoin(base, "basic/file.txt") 1212 (response, content) = self.http.request(uri, "GET") 1213 self.assertEqual(response.status, 200) 1214 1215 def testBasicAuthTwoDifferentCredentials(self): 1216 # Test Basic Authentication with multiple sets of credentials 1217 uri = urllib.parse.urljoin(base, "basic2/file.txt") 1218 (response, content) = self.http.request(uri, "GET") 1219 self.assertEqual(response.status, 401) 1220 1221 uri = urllib.parse.urljoin(base, "basic2/") 1222 (response, content) = self.http.request(uri, "GET") 1223 self.assertEqual(response.status, 401) 1224 1225 self.http.add_credentials("fred", "barney") 1226 (response, content) = self.http.request(uri, "GET") 1227 self.assertEqual(response.status, 200) 1228 1229 uri = urllib.parse.urljoin(base, "basic2/file.txt") 1230 (response, content) = self.http.request(uri, "GET") 1231 self.assertEqual(response.status, 200) 1232 1233 def testBasicAuthNested(self): 1234 # Test Basic Authentication with resources 1235 # that are nested 1236 uri = urllib.parse.urljoin(base, "basic-nested/") 1237 (response, content) = self.http.request(uri, "GET") 1238 self.assertEqual(response.status, 401) 1239 1240 uri = urllib.parse.urljoin(base, "basic-nested/subdir") 1241 (response, content) = self.http.request(uri, "GET") 1242 self.assertEqual(response.status, 401) 1243 1244 # Now add in credentials one at a time and test. 1245 self.http.add_credentials("joe", "password") 1246 1247 uri = urllib.parse.urljoin(base, "basic-nested/") 1248 (response, content) = self.http.request(uri, "GET") 1249 self.assertEqual(response.status, 200) 1250 1251 uri = urllib.parse.urljoin(base, "basic-nested/subdir") 1252 (response, content) = self.http.request(uri, "GET") 1253 self.assertEqual(response.status, 401) 1254 1255 self.http.add_credentials("fred", "barney") 1256 1257 uri = urllib.parse.urljoin(base, "basic-nested/") 1258 (response, content) = self.http.request(uri, "GET") 1259 self.assertEqual(response.status, 200) 1260 1261 uri = urllib.parse.urljoin(base, "basic-nested/subdir") 1262 (response, content) = self.http.request(uri, "GET") 1263 self.assertEqual(response.status, 200) 1264 1265 def testDigestAuth(self): 1266 # Test that we support Digest Authentication 1267 uri = urllib.parse.urljoin(base, "digest/") 1268 (response, content) = self.http.request(uri, "GET") 1269 self.assertEqual(response.status, 401) 1270 1271 self.http.add_credentials("joe", "password") 1272 (response, content) = self.http.request(uri, "GET") 1273 self.assertEqual(response.status, 200) 1274 1275 uri = urllib.parse.urljoin(base, "digest/file.txt") 1276 (response, content) = self.http.request(uri, "GET") 1277 1278 def testDigestAuthNextNonceAndNC(self): 1279 # Test that if the server sets nextnonce that we reset 1280 # the nonce count back to 1 1281 uri = urllib.parse.urljoin(base, "digest/file.txt") 1282 self.http.add_credentials("joe", "password") 1283 (response, content) = self.http.request( 1284 uri, "GET", headers={"cache-control": "no-cache"} 1285 ) 1286 info = httplib2._parse_www_authenticate(response, "authentication-info") 1287 self.assertEqual(response.status, 200) 1288 (response, content) = self.http.request( 1289 uri, "GET", headers={"cache-control": "no-cache"} 1290 ) 1291 info2 = httplib2._parse_www_authenticate(response, "authentication-info") 1292 self.assertEqual(response.status, 200) 1293 1294 if "nextnonce" in info: 1295 self.assertEqual(info2["nc"], 1) 1296 1297 def testDigestAuthStale(self): 1298 # Test that we can handle a nonce becoming stale 1299 uri = urllib.parse.urljoin(base, "digest-expire/file.txt") 1300 self.http.add_credentials("joe", "password") 1301 (response, content) = self.http.request( 1302 uri, "GET", headers={"cache-control": "no-cache"} 1303 ) 1304 info = httplib2._parse_www_authenticate(response, "authentication-info") 1305 self.assertEqual(response.status, 200) 1306 1307 time.sleep(3) 1308 # Sleep long enough that the nonce becomes stale 1309 1310 (response, content) = self.http.request( 1311 uri, "GET", headers={"cache-control": "no-cache"} 1312 ) 1313 self.assertFalse(response.fromcache) 1314 self.assertTrue(response._stale_digest) 1315 info3 = httplib2._parse_www_authenticate(response, "authentication-info") 1316 self.assertEqual(response.status, 200) 1317 1318 def reflector(self, content): 1319 return dict( 1320 [ 1321 tuple(x.split("=", 1)) 1322 for x in content.decode("utf-8").strip().split("\n") 1323 ] 1324 ) 1325 1326 def testReflector(self): 1327 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi") 1328 (response, content) = self.http.request(uri, "GET") 1329 d = self.reflector(content) 1330 self.assertTrue("HTTP_USER_AGENT" in d) 1331 1332 def testConnectionClose(self): 1333 uri = "http://www.google.com/" 1334 (response, content) = self.http.request(uri, "GET") 1335 for c in self.http.connections.values(): 1336 self.assertNotEqual(None, c.sock) 1337 (response, content) = self.http.request( 1338 uri, "GET", headers={"connection": "close"} 1339 ) 1340 for c in self.http.connections.values(): 1341 self.assertEqual(None, c.sock) 1342 1343 def testPickleHttp(self): 1344 pickled_http = pickle.dumps(self.http) 1345 new_http = pickle.loads(pickled_http) 1346 1347 self.assertEqual( 1348 sorted(new_http.__dict__.keys()), sorted(self.http.__dict__.keys()) 1349 ) 1350 for key in new_http.__dict__: 1351 if key in ("certificates", "credentials"): 1352 self.assertEqual( 1353 new_http.__dict__[key].credentials, 1354 self.http.__dict__[key].credentials, 1355 ) 1356 elif key == "cache": 1357 self.assertEqual( 1358 new_http.__dict__[key].cache, self.http.__dict__[key].cache 1359 ) 1360 else: 1361 self.assertEqual(new_http.__dict__[key], self.http.__dict__[key]) 1362 1363 def testPickleHttpWithConnection(self): 1364 self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection) 1365 pickled_http = pickle.dumps(self.http) 1366 new_http = pickle.loads(pickled_http) 1367 1368 self.assertEqual(list(self.http.connections.keys()), ["http:bitworking.org"]) 1369 self.assertEqual(new_http.connections, {}) 1370 1371 def testPickleCustomRequestHttp(self): 1372 def dummy_request(*args, **kwargs): 1373 return new_request(*args, **kwargs) 1374 1375 dummy_request.dummy_attr = "dummy_value" 1376 1377 self.http.request = dummy_request 1378 pickled_http = pickle.dumps(self.http) 1379 self.assertFalse(b"S'request'" in pickled_http) 1380 1381 1382try: 1383 import memcache 1384 1385 class HttpTestMemCached(HttpTest): 1386 def setUp(self): 1387 self.cache = memcache.Client(["127.0.0.1:11211"], debug=0) 1388 # self.cache = memcache.Client(['10.0.0.4:11211'], debug=1) 1389 self.http = httplib2.Http(self.cache) 1390 self.cache.flush_all() 1391 # Not exactly sure why the sleep is needed here, but 1392 # if not present then some unit tests that rely on caching 1393 # fail. Memcached seems to lose some sets immediately 1394 # after a flush_all if the set is to a value that 1395 # was previously cached. (Maybe the flush is handled async?) 1396 time.sleep(1) 1397 self.http.clear_credentials() 1398 1399 1400except: 1401 pass 1402 1403# ------------------------------------------------------------------------ 1404 1405 1406class HttpPrivateTest(unittest.TestCase): 1407 def testParseCacheControl(self): 1408 # Test that we can parse the Cache-Control header 1409 self.assertEqual({}, httplib2._parse_cache_control({})) 1410 self.assertEqual( 1411 {"no-cache": 1}, 1412 httplib2._parse_cache_control({"cache-control": " no-cache"}), 1413 ) 1414 cc = httplib2._parse_cache_control( 1415 {"cache-control": " no-cache, max-age = 7200"} 1416 ) 1417 self.assertEqual(cc["no-cache"], 1) 1418 self.assertEqual(cc["max-age"], "7200") 1419 cc = httplib2._parse_cache_control({"cache-control": " , "}) 1420 self.assertEqual(cc[""], 1) 1421 1422 try: 1423 cc = httplib2._parse_cache_control( 1424 {"cache-control": "Max-age=3600;post-check=1800,pre-check=3600"} 1425 ) 1426 self.assertTrue("max-age" in cc) 1427 except: 1428 self.fail("Should not throw exception") 1429 1430 def testNormalizeHeaders(self): 1431 # Test that we normalize headers to lowercase 1432 h = httplib2._normalize_headers({"Cache-Control": "no-cache", "Other": "Stuff"}) 1433 self.assertTrue("cache-control" in h) 1434 self.assertTrue("other" in h) 1435 self.assertEqual("Stuff", h["other"]) 1436 1437 def testConvertByteStr(self): 1438 with self.assertRaises(TypeError): 1439 httplib2._convert_byte_str(4) 1440 self.assertEqual("Hello World", httplib2._convert_byte_str(b"Hello World")) 1441 self.assertEqual("Bye World", httplib2._convert_byte_str("Bye World")) 1442 1443 def testExpirationModelTransparent(self): 1444 # Test that no-cache makes our request TRANSPARENT 1445 response_headers = {"cache-control": "max-age=7200"} 1446 request_headers = {"cache-control": "no-cache"} 1447 self.assertEqual( 1448 "TRANSPARENT", 1449 httplib2._entry_disposition(response_headers, request_headers), 1450 ) 1451 1452 def testMaxAgeNonNumeric(self): 1453 # Test that no-cache makes our request TRANSPARENT 1454 response_headers = {"cache-control": "max-age=fred, min-fresh=barney"} 1455 request_headers = {} 1456 self.assertEqual( 1457 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1458 ) 1459 1460 def testExpirationModelNoCacheResponse(self): 1461 # The date and expires point to an entry that should be 1462 # FRESH, but the no-cache over-rides that. 1463 now = time.time() 1464 response_headers = { 1465 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), 1466 "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)), 1467 "cache-control": "no-cache", 1468 } 1469 request_headers = {} 1470 self.assertEqual( 1471 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1472 ) 1473 1474 def testExpirationModelStaleRequestMustReval(self): 1475 # must-revalidate forces STALE 1476 self.assertEqual( 1477 "STALE", 1478 httplib2._entry_disposition({}, {"cache-control": "must-revalidate"}), 1479 ) 1480 1481 def testExpirationModelStaleResponseMustReval(self): 1482 # must-revalidate forces STALE 1483 self.assertEqual( 1484 "STALE", 1485 httplib2._entry_disposition({"cache-control": "must-revalidate"}, {}), 1486 ) 1487 1488 def testExpirationModelFresh(self): 1489 response_headers = { 1490 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()), 1491 "cache-control": "max-age=2", 1492 } 1493 request_headers = {} 1494 self.assertEqual( 1495 "FRESH", httplib2._entry_disposition(response_headers, request_headers) 1496 ) 1497 time.sleep(3) 1498 self.assertEqual( 1499 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1500 ) 1501 1502 def testExpirationMaxAge0(self): 1503 response_headers = { 1504 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()), 1505 "cache-control": "max-age=0", 1506 } 1507 request_headers = {} 1508 self.assertEqual( 1509 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1510 ) 1511 1512 def testExpirationModelDateAndExpires(self): 1513 now = time.time() 1514 response_headers = { 1515 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), 1516 "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)), 1517 } 1518 request_headers = {} 1519 self.assertEqual( 1520 "FRESH", httplib2._entry_disposition(response_headers, request_headers) 1521 ) 1522 time.sleep(3) 1523 self.assertEqual( 1524 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1525 ) 1526 1527 def testExpiresZero(self): 1528 now = time.time() 1529 response_headers = { 1530 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), 1531 "expires": "0", 1532 } 1533 request_headers = {} 1534 self.assertEqual( 1535 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1536 ) 1537 1538 def testExpirationModelDateOnly(self): 1539 now = time.time() 1540 response_headers = { 1541 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 3)) 1542 } 1543 request_headers = {} 1544 self.assertEqual( 1545 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1546 ) 1547 1548 def testExpirationModelOnlyIfCached(self): 1549 response_headers = {} 1550 request_headers = {"cache-control": "only-if-cached"} 1551 self.assertEqual( 1552 "FRESH", httplib2._entry_disposition(response_headers, request_headers) 1553 ) 1554 1555 def testExpirationModelMaxAgeBoth(self): 1556 now = time.time() 1557 response_headers = { 1558 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), 1559 "cache-control": "max-age=2", 1560 } 1561 request_headers = {"cache-control": "max-age=0"} 1562 self.assertEqual( 1563 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1564 ) 1565 1566 def testExpirationModelDateAndExpiresMinFresh1(self): 1567 now = time.time() 1568 response_headers = { 1569 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), 1570 "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)), 1571 } 1572 request_headers = {"cache-control": "min-fresh=2"} 1573 self.assertEqual( 1574 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1575 ) 1576 1577 def testExpirationModelDateAndExpiresMinFresh2(self): 1578 now = time.time() 1579 response_headers = { 1580 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), 1581 "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)), 1582 } 1583 request_headers = {"cache-control": "min-fresh=2"} 1584 self.assertEqual( 1585 "FRESH", httplib2._entry_disposition(response_headers, request_headers) 1586 ) 1587 1588 def testParseWWWAuthenticateEmpty(self): 1589 res = httplib2._parse_www_authenticate({}) 1590 self.assertEqual(len(list(res.keys())), 0) 1591 1592 def testParseWWWAuthenticate(self): 1593 # different uses of spaces around commas 1594 res = httplib2._parse_www_authenticate( 1595 { 1596 "www-authenticate": 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux' 1597 } 1598 ) 1599 self.assertEqual(len(list(res.keys())), 1) 1600 self.assertEqual(len(list(res["test"].keys())), 5) 1601 1602 # tokens with non-alphanum 1603 res = httplib2._parse_www_authenticate( 1604 {"www-authenticate": 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'} 1605 ) 1606 self.assertEqual(len(list(res.keys())), 1) 1607 self.assertEqual(len(list(res["t*!%#st"].keys())), 2) 1608 1609 # quoted string with quoted pairs 1610 res = httplib2._parse_www_authenticate( 1611 {"www-authenticate": 'Test realm="a \\"test\\" realm"'} 1612 ) 1613 self.assertEqual(len(list(res.keys())), 1) 1614 self.assertEqual(res["test"]["realm"], 'a "test" realm') 1615 1616 def testParseWWWAuthenticateStrict(self): 1617 httplib2.USE_WWW_AUTH_STRICT_PARSING = 1 1618 self.testParseWWWAuthenticate() 1619 httplib2.USE_WWW_AUTH_STRICT_PARSING = 0 1620 1621 def testParseWWWAuthenticateBasic(self): 1622 res = httplib2._parse_www_authenticate({"www-authenticate": 'Basic realm="me"'}) 1623 basic = res["basic"] 1624 self.assertEqual("me", basic["realm"]) 1625 1626 res = httplib2._parse_www_authenticate( 1627 {"www-authenticate": 'Basic realm="me", algorithm="MD5"'} 1628 ) 1629 basic = res["basic"] 1630 self.assertEqual("me", basic["realm"]) 1631 self.assertEqual("MD5", basic["algorithm"]) 1632 1633 res = httplib2._parse_www_authenticate( 1634 {"www-authenticate": 'Basic realm="me", algorithm=MD5'} 1635 ) 1636 basic = res["basic"] 1637 self.assertEqual("me", basic["realm"]) 1638 self.assertEqual("MD5", basic["algorithm"]) 1639 1640 def testParseWWWAuthenticateBasic2(self): 1641 res = httplib2._parse_www_authenticate( 1642 {"www-authenticate": 'Basic realm="me",other="fred" '} 1643 ) 1644 basic = res["basic"] 1645 self.assertEqual("me", basic["realm"]) 1646 self.assertEqual("fred", basic["other"]) 1647 1648 def testParseWWWAuthenticateBasic3(self): 1649 res = httplib2._parse_www_authenticate( 1650 {"www-authenticate": 'Basic REAlm="me" '} 1651 ) 1652 basic = res["basic"] 1653 self.assertEqual("me", basic["realm"]) 1654 1655 def testParseWWWAuthenticateDigest(self): 1656 res = httplib2._parse_www_authenticate( 1657 { 1658 "www-authenticate": 'Digest realm="[email protected]", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"' 1659 } 1660 ) 1661 digest = res["digest"] 1662 self.assertEqual("[email protected]", digest["realm"]) 1663 self.assertEqual("auth,auth-int", digest["qop"]) 1664 1665 def testParseWWWAuthenticateMultiple(self): 1666 res = httplib2._parse_www_authenticate( 1667 { 1668 "www-authenticate": 'Digest realm="[email protected]", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" ' 1669 } 1670 ) 1671 digest = res["digest"] 1672 self.assertEqual("[email protected]", digest["realm"]) 1673 self.assertEqual("auth,auth-int", digest["qop"]) 1674 self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"]) 1675 self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"]) 1676 basic = res["basic"] 1677 self.assertEqual("me", basic["realm"]) 1678 1679 def testParseWWWAuthenticateMultiple2(self): 1680 # Handle an added comma between challenges, which might get thrown in if the challenges were 1681 # originally sent in separate www-authenticate headers. 1682 res = httplib2._parse_www_authenticate( 1683 { 1684 "www-authenticate": 'Digest realm="[email protected]", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" ' 1685 } 1686 ) 1687 digest = res["digest"] 1688 self.assertEqual("[email protected]", digest["realm"]) 1689 self.assertEqual("auth,auth-int", digest["qop"]) 1690 self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"]) 1691 self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"]) 1692 basic = res["basic"] 1693 self.assertEqual("me", basic["realm"]) 1694 1695 def testParseWWWAuthenticateMultiple3(self): 1696 # Handle an added comma between challenges, which might get thrown in if the challenges were 1697 # originally sent in separate www-authenticate headers. 1698 res = httplib2._parse_www_authenticate( 1699 { 1700 "www-authenticate": 'Digest realm="[email protected]", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"' 1701 } 1702 ) 1703 digest = res["digest"] 1704 self.assertEqual("[email protected]", digest["realm"]) 1705 self.assertEqual("auth,auth-int", digest["qop"]) 1706 self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"]) 1707 self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"]) 1708 basic = res["basic"] 1709 self.assertEqual("me", basic["realm"]) 1710 wsse = res["wsse"] 1711 self.assertEqual("foo", wsse["realm"]) 1712 self.assertEqual("UsernameToken", wsse["profile"]) 1713 1714 def testParseWWWAuthenticateMultiple4(self): 1715 res = httplib2._parse_www_authenticate( 1716 { 1717 "www-authenticate": 'Digest realm="[email protected]", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"' 1718 } 1719 ) 1720 digest = res["digest"] 1721 self.assertEqual("[email protected]", digest["realm"]) 1722 self.assertEqual("\tauth,auth-int", digest["qop"]) 1723 self.assertEqual("(*)&^&$%#", digest["nonce"]) 1724 1725 def testParseWWWAuthenticateMoreQuoteCombos(self): 1726 res = httplib2._parse_www_authenticate( 1727 { 1728 "www-authenticate": 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true' 1729 } 1730 ) 1731 digest = res["digest"] 1732 self.assertEqual("myrealm", digest["realm"]) 1733 1734 def testParseWWWAuthenticateMalformed(self): 1735 try: 1736 res = httplib2._parse_www_authenticate( 1737 { 1738 "www-authenticate": 'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."' 1739 } 1740 ) 1741 self.fail("should raise an exception") 1742 except httplib2.MalformedHeader: 1743 pass 1744 1745 def testDigestObject(self): 1746 credentials = ("joe", "password") 1747 host = None 1748 request_uri = "/projects/httplib2/test/digest/" 1749 headers = {} 1750 response = { 1751 "www-authenticate": 'Digest realm="myrealm", ' 1752 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", ' 1753 'algorithm=MD5, qop="auth"' 1754 } 1755 content = b"" 1756 1757 d = httplib2.DigestAuthentication( 1758 credentials, host, request_uri, headers, response, content, None 1759 ) 1760 d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46") 1761 our_request = "authorization: %s" % headers["authorization"] 1762 working_request = ( 1763 'authorization: Digest username="joe", realm="myrealm", ' 1764 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",' 1765 ' uri="/projects/httplib2/test/digest/", algorithm=MD5, ' 1766 'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, ' 1767 'nc=00000001, cnonce="33033375ec278a46"' 1768 ) 1769 self.assertEqual(our_request, working_request) 1770 1771 def testDigestObjectWithOpaque(self): 1772 credentials = ("joe", "password") 1773 host = None 1774 request_uri = "/projects/httplib2/test/digest/" 1775 headers = {} 1776 response = { 1777 "www-authenticate": 'Digest realm="myrealm", ' 1778 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", ' 1779 'algorithm=MD5, qop="auth", opaque="atestopaque"' 1780 } 1781 content = "" 1782 1783 d = httplib2.DigestAuthentication( 1784 credentials, host, request_uri, headers, response, content, None 1785 ) 1786 d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46") 1787 our_request = "authorization: %s" % headers["authorization"] 1788 working_request = ( 1789 'authorization: Digest username="joe", realm="myrealm", ' 1790 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",' 1791 ' uri="/projects/httplib2/test/digest/", algorithm=MD5, ' 1792 'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, ' 1793 'nc=00000001, cnonce="33033375ec278a46", ' 1794 'opaque="atestopaque"' 1795 ) 1796 self.assertEqual(our_request, working_request) 1797 1798 def testDigestObjectStale(self): 1799 credentials = ("joe", "password") 1800 host = None 1801 request_uri = "/projects/httplib2/test/digest/" 1802 headers = {} 1803 response = httplib2.Response({}) 1804 response["www-authenticate"] = ( 1805 'Digest realm="myrealm", ' 1806 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",' 1807 ' algorithm=MD5, qop="auth", stale=true' 1808 ) 1809 response.status = 401 1810 content = b"" 1811 d = httplib2.DigestAuthentication( 1812 credentials, host, request_uri, headers, response, content, None 1813 ) 1814 # Returns true to force a retry 1815 self.assertTrue(d.response(response, content)) 1816 1817 def testDigestObjectAuthInfo(self): 1818 credentials = ("joe", "password") 1819 host = None 1820 request_uri = "/projects/httplib2/test/digest/" 1821 headers = {} 1822 response = httplib2.Response({}) 1823 response["www-authenticate"] = ( 1824 'Digest realm="myrealm", ' 1825 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",' 1826 ' algorithm=MD5, qop="auth", stale=true' 1827 ) 1828 response["authentication-info"] = 'nextnonce="fred"' 1829 content = b"" 1830 d = httplib2.DigestAuthentication( 1831 credentials, host, request_uri, headers, response, content, None 1832 ) 1833 # Returns true to force a retry 1834 self.assertFalse(d.response(response, content)) 1835 self.assertEqual("fred", d.challenge["nonce"]) 1836 self.assertEqual(1, d.challenge["nc"]) 1837 1838 def testWsseAlgorithm(self): 1839 digest = httplib2._wsse_username_token( 1840 "d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm" 1841 ) 1842 expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY=" 1843 self.assertEqual(expected, digest) 1844 1845 def testEnd2End(self): 1846 # one end to end header 1847 response = {"content-type": "application/atom+xml", "te": "deflate"} 1848 end2end = httplib2._get_end2end_headers(response) 1849 self.assertTrue("content-type" in end2end) 1850 self.assertTrue("te" not in end2end) 1851 self.assertTrue("connection" not in end2end) 1852 1853 # one end to end header that gets eliminated 1854 response = { 1855 "connection": "content-type", 1856 "content-type": "application/atom+xml", 1857 "te": "deflate", 1858 } 1859 end2end = httplib2._get_end2end_headers(response) 1860 self.assertTrue("content-type" not in end2end) 1861 self.assertTrue("te" not in end2end) 1862 self.assertTrue("connection" not in end2end) 1863 1864 # Degenerate case of no headers 1865 response = {} 1866 end2end = httplib2._get_end2end_headers(response) 1867 self.assertEqual(0, len(end2end)) 1868 1869 # Degenerate case of connection referrring to a header not passed in 1870 response = {"connection": "content-type"} 1871 end2end = httplib2._get_end2end_headers(response) 1872 self.assertEqual(0, len(end2end)) 1873 1874 1875class TestProxyInfo(unittest.TestCase): 1876 def setUp(self): 1877 self.orig_env = dict(os.environ) 1878 1879 def tearDown(self): 1880 os.environ.clear() 1881 os.environ.update(self.orig_env) 1882 1883 def test_from_url(self): 1884 pi = httplib2.proxy_info_from_url("http://myproxy.example.com") 1885 self.assertEqual(pi.proxy_host, "myproxy.example.com") 1886 self.assertEqual(pi.proxy_port, 80) 1887 self.assertEqual(pi.proxy_user, None) 1888 1889 def test_from_url_ident(self): 1890 pi = httplib2.proxy_info_from_url("http://zoidberg:fish@someproxy:99") 1891 self.assertEqual(pi.proxy_host, "someproxy") 1892 self.assertEqual(pi.proxy_port, 99) 1893 self.assertEqual(pi.proxy_user, "zoidberg") 1894 self.assertEqual(pi.proxy_pass, "fish") 1895 1896 def test_from_env(self): 1897 os.environ["http_proxy"] = "http://myproxy.example.com:8080" 1898 pi = httplib2.proxy_info_from_environment() 1899 self.assertEqual(pi.proxy_host, "myproxy.example.com") 1900 self.assertEqual(pi.proxy_port, 8080) 1901 1902 def test_from_env_no_proxy(self): 1903 os.environ["http_proxy"] = "http://myproxy.example.com:80" 1904 os.environ["https_proxy"] = "http://myproxy.example.com:81" 1905 pi = httplib2.proxy_info_from_environment("https") 1906 self.assertEqual(pi.proxy_host, "myproxy.example.com") 1907 self.assertEqual(pi.proxy_port, 81) 1908 1909 def test_from_env_none(self): 1910 os.environ.clear() 1911 pi = httplib2.proxy_info_from_environment() 1912 self.assertEqual(pi, None) 1913 1914 def test_proxy_headers(self): 1915 headers = {"key0": "val0", "key1": "val1"} 1916 pi = httplib2.ProxyInfo( 1917 httplib2.socks.PROXY_TYPE_HTTP, "localhost", 1234, proxy_headers=headers 1918 ) 1919 self.assertEqual(pi.proxy_headers, headers) 1920 1921 # regression: ensure that httplib2.HTTPConnectionWithTimeout initializes when proxy_info is not supplied 1922 def test_proxy_init(self): 1923 connection = httplib2.HTTPConnectionWithTimeout("www.google.com", 80) 1924 connection.request("GET", "/") 1925 connection.close() 1926 1927 1928if __name__ == "__main__": 1929 unittest.main() 1930