1#!/usr/bin/env python2.4 2"""A set of unit tests for httplib2.py.""" 3 4__author__ = "Joe Gregorio ([email protected])" 5__copyright__ = "Copyright 2006, Joe Gregorio" 6__contributors__ = [] 7__license__ = "MIT" 8__version__ = "0.1 ($Rev: 118 $)" 9 10import base64 11import httplib 12import httplib2 13import os 14import pickle 15import socket 16import StringIO 17import sys 18import time 19import unittest 20import urlparse 21 22try: 23 import ssl 24except ImportError: 25 pass 26 27# Python 2.3 support 28if not hasattr(unittest.TestCase, "assertTrue"): 29 unittest.TestCase.assertTrue = unittest.TestCase.failUnless 30 unittest.TestCase.assertFalse = unittest.TestCase.failIf 31 32# The test resources base uri 33base = "http://bitworking.org/projects/httplib2/test/" 34# base = 'http://localhost/projects/httplib2/test/' 35cacheDirName = ".cache" 36 37 38class CredentialsTest(unittest.TestCase): 39 def test(self): 40 c = httplib2.Credentials() 41 c.add("joe", "password") 42 self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0]) 43 self.assertEqual(("joe", "password"), list(c.iter(""))[0]) 44 c.add("fred", "password2", "wellformedweb.org") 45 self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0]) 46 self.assertEqual(1, len(list(c.iter("bitworking.org")))) 47 self.assertEqual(2, len(list(c.iter("wellformedweb.org")))) 48 self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org"))) 49 c.clear() 50 self.assertEqual(0, len(list(c.iter("bitworking.org")))) 51 c.add("fred", "password2", "wellformedweb.org") 52 self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org"))) 53 self.assertEqual(0, len(list(c.iter("bitworking.org")))) 54 self.assertEqual(0, len(list(c.iter("")))) 55 56 57class ParserTest(unittest.TestCase): 58 def testFromStd66(self): 59 self.assertEqual( 60 ("http", "example.com", "", None, None), 61 httplib2.parse_uri("http://example.com"), 62 ) 63 self.assertEqual( 64 ("https", "example.com", "", 
None, None), 65 httplib2.parse_uri("https://example.com"), 66 ) 67 self.assertEqual( 68 ("https", "example.com:8080", "", None, None), 69 httplib2.parse_uri("https://example.com:8080"), 70 ) 71 self.assertEqual( 72 ("http", "example.com", "/", None, None), 73 httplib2.parse_uri("http://example.com/"), 74 ) 75 self.assertEqual( 76 ("http", "example.com", "/path", None, None), 77 httplib2.parse_uri("http://example.com/path"), 78 ) 79 self.assertEqual( 80 ("http", "example.com", "/path", "a=1&b=2", None), 81 httplib2.parse_uri("http://example.com/path?a=1&b=2"), 82 ) 83 self.assertEqual( 84 ("http", "example.com", "/path", "a=1&b=2", "fred"), 85 httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"), 86 ) 87 self.assertEqual( 88 ("http", "example.com", "/path", "a=1&b=2", "fred"), 89 httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"), 90 ) 91 92 93class UrlNormTest(unittest.TestCase): 94 def test(self): 95 self.assertEqual( 96 "http://example.org/", httplib2.urlnorm("http://example.org")[-1] 97 ) 98 self.assertEqual( 99 "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1] 100 ) 101 self.assertEqual( 102 "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1] 103 ) 104 self.assertEqual( 105 "http://example.org/mypath?a=b", 106 httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1], 107 ) 108 self.assertEqual( 109 "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1] 110 ) 111 self.assertEqual( 112 httplib2.urlnorm("http://localhost:80/"), 113 httplib2.urlnorm("HTTP://LOCALHOST:80"), 114 ) 115 try: 116 httplib2.urlnorm("/") 117 self.fail("Non-absolute URIs should raise an exception") 118 except httplib2.RelativeURIError: 119 pass 120 121 122class UrlSafenameTest(unittest.TestCase): 123 def test(self): 124 # Test that different URIs end up generating different safe names 125 self.assertEqual( 126 "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", 127 httplib2.safename("http://example.org/fred/?a=b"), 128 ) 129 
self.assertEqual( 130 "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", 131 httplib2.safename("http://example.org/fred?/a=b"), 132 ) 133 self.assertEqual( 134 "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", 135 httplib2.safename("http://www.example.org/fred?/a=b"), 136 ) 137 self.assertEqual( 138 httplib2.safename(httplib2.urlnorm("http://www")[-1]), 139 httplib2.safename(httplib2.urlnorm("http://WWW")[-1]), 140 ) 141 self.assertEqual( 142 "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", 143 httplib2.safename("https://www.example.org/fred?/a=b"), 144 ) 145 self.assertNotEqual( 146 httplib2.safename("http://www"), httplib2.safename("https://www") 147 ) 148 # Test the max length limits 149 uri = "http://" + ("w" * 200) + ".org" 150 uri2 = "http://" + ("w" * 201) + ".org" 151 self.assertNotEqual(httplib2.safename(uri2), httplib2.safename(uri)) 152 # Max length should be 200 + 1 (",") + 32 153 self.assertEqual(233, len(httplib2.safename(uri2))) 154 self.assertEqual(233, len(httplib2.safename(uri))) 155 # Unicode 156 if sys.version_info >= (2, 3): 157 self.assertEqual( 158 "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", 159 httplib2.safename(u"http://\u2304.org/fred/?a=b"), 160 ) 161 162 163class _MyResponse(StringIO.StringIO): 164 def __init__(self, body, **kwargs): 165 StringIO.StringIO.__init__(self, body) 166 self.headers = kwargs 167 168 def iteritems(self): 169 return self.headers.iteritems() 170 171 172class _MyHTTPConnection(object): 173 "This class is just a mock of httplib.HTTPConnection used for testing" 174 175 def __init__( 176 self, 177 host, 178 port=None, 179 key_file=None, 180 cert_file=None, 181 strict=None, 182 timeout=None, 183 proxy_info=None, 184 ): 185 self.host = host 186 self.port = port 187 self.timeout = timeout 188 self.log = "" 189 self.sock = None 190 191 def set_debuglevel(self, level): 192 pass 193 194 def connect(self): 195 "Connect to a host on a given port." 
196 pass 197 198 def close(self): 199 pass 200 201 def request(self, method, request_uri, body, headers): 202 pass 203 204 def getresponse(self): 205 return _MyResponse("the body", status="200") 206 207 208class _MyHTTPBadStatusConnection(object): 209 "Mock of httplib.HTTPConnection that raises BadStatusLine." 210 211 num_calls = 0 212 213 def __init__( 214 self, 215 host, 216 port=None, 217 key_file=None, 218 cert_file=None, 219 strict=None, 220 timeout=None, 221 proxy_info=None, 222 ): 223 self.host = host 224 self.port = port 225 self.timeout = timeout 226 self.log = "" 227 self.sock = None 228 _MyHTTPBadStatusConnection.num_calls = 0 229 230 def set_debuglevel(self, level): 231 pass 232 233 def connect(self): 234 pass 235 236 def close(self): 237 pass 238 239 def request(self, method, request_uri, body, headers): 240 pass 241 242 def getresponse(self): 243 _MyHTTPBadStatusConnection.num_calls += 1 244 raise httplib.BadStatusLine("") 245 246 247class HttpTest(unittest.TestCase): 248 def setUp(self): 249 if os.path.exists(cacheDirName): 250 [ 251 os.remove(os.path.join(cacheDirName, file)) 252 for file in os.listdir(cacheDirName) 253 ] 254 255 if sys.version_info < (2, 6): 256 disable_cert_validation = True 257 else: 258 disable_cert_validation = False 259 self.http = httplib2.Http( 260 cacheDirName, disable_ssl_certificate_validation=disable_cert_validation 261 ) 262 self.http.clear_credentials() 263 264 def testIPv6NoSSL(self): 265 try: 266 self.http.request("http://[::1]/") 267 except socket.gaierror: 268 self.fail("should get the address family right for IPv6") 269 except socket.error: 270 # Even if IPv6 isn't installed on a machine it should just raise socket.error 271 pass 272 273 def testIPv6SSL(self): 274 try: 275 self.http.request("https://[::1]/") 276 except socket.gaierror: 277 self.fail("should get the address family right for IPv6") 278 except httplib2.CertificateHostnameMismatch: 279 # We connected and verified that the certificate doesn't match 280 
# the name. Good enough. 281 pass 282 except socket.error: 283 # Even if IPv6 isn't installed on a machine it should just raise socket.error 284 pass 285 286 def testConnectionType(self): 287 self.http.force_exception_to_status_code = False 288 response, content = self.http.request( 289 "http://bitworking.org", connection_type=_MyHTTPConnection 290 ) 291 self.assertEqual(response["content-location"], "http://bitworking.org") 292 self.assertEqual(content, "the body") 293 294 def testBadStatusLineRetry(self): 295 old_retries = httplib2.RETRIES 296 httplib2.RETRIES = 1 297 self.http.force_exception_to_status_code = False 298 try: 299 response, content = self.http.request( 300 "http://bitworking.org", connection_type=_MyHTTPBadStatusConnection 301 ) 302 except httplib.BadStatusLine: 303 self.assertEqual(2, _MyHTTPBadStatusConnection.num_calls) 304 httplib2.RETRIES = old_retries 305 306 def testGetUnknownServer(self): 307 self.http.force_exception_to_status_code = False 308 try: 309 self.http.request("http://fred.bitworking.org/") 310 self.fail( 311 "An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server." 
312 ) 313 except httplib2.ServerNotFoundError: 314 pass 315 316 # Now test with exceptions turned off 317 self.http.force_exception_to_status_code = True 318 319 (response, content) = self.http.request("http://fred.bitworking.org/") 320 self.assertEqual(response["content-type"], "text/plain") 321 self.assertTrue(content.startswith("Unable to find")) 322 self.assertEqual(response.status, 400) 323 324 def testGetConnectionRefused(self): 325 self.http.force_exception_to_status_code = False 326 try: 327 self.http.request("http://localhost:7777/") 328 self.fail("An socket.error exception must be thrown on Connection Refused.") 329 except socket.error: 330 pass 331 332 # Now test with exceptions turned off 333 self.http.force_exception_to_status_code = True 334 335 (response, content) = self.http.request("http://localhost:7777/") 336 self.assertEqual(response["content-type"], "text/plain") 337 self.assertTrue( 338 "Connection refused" in content or "actively refused" in content, 339 "Unexpected status %(content)s" % vars(), 340 ) 341 self.assertEqual(response.status, 400) 342 343 def testGetIRI(self): 344 if sys.version_info >= (2, 3): 345 uri = urlparse.urljoin( 346 base, u"reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}" 347 ) 348 (response, content) = self.http.request(uri, "GET") 349 d = self.reflector(content) 350 self.assertTrue("QUERY_STRING" in d) 351 self.assertTrue(d["QUERY_STRING"].find("%D0%82") > 0) 352 353 def testGetIsDefaultMethod(self): 354 # Test that GET is the default method 355 uri = urlparse.urljoin(base, "methods/method_reflector.cgi") 356 (response, content) = self.http.request(uri) 357 self.assertEqual(response["x-method"], "GET") 358 359 def testDifferentMethods(self): 360 # Test that all methods can be used 361 uri = urlparse.urljoin(base, "methods/method_reflector.cgi") 362 for method in ["GET", "PUT", "DELETE", "POST"]: 363 (response, content) = self.http.request(uri, method, body=" ") 364 self.assertEqual(response["x-method"], 
method) 365 366 def testHeadRead(self): 367 # Test that we don't try to read the response of a HEAD request 368 # since httplib blocks response.read() for HEAD requests. 369 # Oddly enough this doesn't appear as a problem when doing HEAD requests 370 # against Apache servers. 371 uri = "http://www.google.com/" 372 (response, content) = self.http.request(uri, "HEAD") 373 self.assertEqual(response.status, 200) 374 self.assertEqual(content, "") 375 376 def testGetNoCache(self): 377 # Test that can do a GET w/o the cache turned on. 378 http = httplib2.Http() 379 uri = urlparse.urljoin(base, "304/test_etag.txt") 380 (response, content) = http.request(uri, "GET") 381 self.assertEqual(response.status, 200) 382 self.assertEqual(response.previous, None) 383 384 def testGetOnlyIfCachedCacheHit(self): 385 # Test that can do a GET with cache and 'only-if-cached' 386 uri = urlparse.urljoin(base, "304/test_etag.txt") 387 (response, content) = self.http.request(uri, "GET") 388 (response, content) = self.http.request( 389 uri, "GET", headers={"cache-control": "only-if-cached"} 390 ) 391 self.assertEqual(response.fromcache, True) 392 self.assertEqual(response.status, 200) 393 394 def testGetOnlyIfCachedCacheMiss(self): 395 # Test that can do a GET with no cache with 'only-if-cached' 396 uri = urlparse.urljoin(base, "304/test_etag.txt") 397 (response, content) = self.http.request( 398 uri, "GET", headers={"cache-control": "only-if-cached"} 399 ) 400 self.assertEqual(response.fromcache, False) 401 self.assertEqual(response.status, 504) 402 403 def testGetOnlyIfCachedNoCacheAtAll(self): 404 # Test that can do a GET with no cache with 'only-if-cached' 405 # Of course, there might be an intermediary beyond us 406 # that responds to the 'only-if-cached', so this 407 # test can't really be guaranteed to pass. 
408 http = httplib2.Http() 409 uri = urlparse.urljoin(base, "304/test_etag.txt") 410 (response, content) = http.request( 411 uri, "GET", headers={"cache-control": "only-if-cached"} 412 ) 413 self.assertEqual(response.fromcache, False) 414 self.assertEqual(response.status, 504) 415 416 def testUserAgent(self): 417 # Test that we provide a default user-agent 418 uri = urlparse.urljoin(base, "user-agent/test.cgi") 419 (response, content) = self.http.request(uri, "GET") 420 self.assertEqual(response.status, 200) 421 self.assertTrue(content.startswith("Python-httplib2/")) 422 423 def testUserAgentNonDefault(self): 424 # Test that the default user-agent can be over-ridden 425 426 uri = urlparse.urljoin(base, "user-agent/test.cgi") 427 (response, content) = self.http.request( 428 uri, "GET", headers={"User-Agent": "fred/1.0"} 429 ) 430 self.assertEqual(response.status, 200) 431 self.assertTrue(content.startswith("fred/1.0")) 432 433 def testGet300WithLocation(self): 434 # Test the we automatically follow 300 redirects if a Location: header is provided 435 uri = urlparse.urljoin(base, "300/with-location-header.asis") 436 (response, content) = self.http.request(uri, "GET") 437 self.assertEqual(response.status, 200) 438 self.assertEqual(content, "This is the final destination.\n") 439 self.assertEqual(response.previous.status, 300) 440 self.assertEqual(response.previous.fromcache, False) 441 442 # Confirm that the intermediate 300 is not cached 443 (response, content) = self.http.request(uri, "GET") 444 self.assertEqual(response.status, 200) 445 self.assertEqual(content, "This is the final destination.\n") 446 self.assertEqual(response.previous.status, 300) 447 self.assertEqual(response.previous.fromcache, False) 448 449 def testGet300WithLocationNoRedirect(self): 450 # Test the we automatically follow 300 redirects if a Location: header is provided 451 self.http.follow_redirects = False 452 uri = urlparse.urljoin(base, "300/with-location-header.asis") 453 (response, 
content) = self.http.request(uri, "GET") 454 self.assertEqual(response.status, 300) 455 456 def testGet300WithoutLocation(self): 457 # Not giving a Location: header in a 300 response is acceptable 458 # In which case we just return the 300 response 459 uri = urlparse.urljoin(base, "300/without-location-header.asis") 460 (response, content) = self.http.request(uri, "GET") 461 self.assertEqual(response.status, 300) 462 self.assertTrue(response["content-type"].startswith("text/html")) 463 self.assertEqual(response.previous, None) 464 465 def testGet301(self): 466 # Test that we automatically follow 301 redirects 467 # and that we cache the 301 response 468 uri = urlparse.urljoin(base, "301/onestep.asis") 469 destination = urlparse.urljoin(base, "302/final-destination.txt") 470 (response, content) = self.http.request(uri, "GET") 471 self.assertEqual(response.status, 200) 472 self.assertTrue("content-location" in response) 473 self.assertEqual(response["content-location"], destination) 474 self.assertEqual(content, "This is the final destination.\n") 475 self.assertEqual(response.previous.status, 301) 476 self.assertEqual(response.previous.fromcache, False) 477 478 (response, content) = self.http.request(uri, "GET") 479 self.assertEqual(response.status, 200) 480 self.assertEqual(response["content-location"], destination) 481 self.assertEqual(content, "This is the final destination.\n") 482 self.assertEqual(response.previous.status, 301) 483 self.assertEqual(response.previous.fromcache, True) 484 485 def testHead301(self): 486 # Test that we automatically follow 301 redirects 487 uri = urlparse.urljoin(base, "301/onestep.asis") 488 destination = urlparse.urljoin(base, "302/final-destination.txt") 489 (response, content) = self.http.request(uri, "HEAD") 490 self.assertEqual(response.status, 200) 491 self.assertEqual(response.previous.status, 301) 492 self.assertEqual(response.previous.fromcache, False) 493 494 def testGet301NoRedirect(self): 495 # Test that we 
automatically follow 301 redirects 496 # and that we cache the 301 response 497 self.http.follow_redirects = False 498 uri = urlparse.urljoin(base, "301/onestep.asis") 499 destination = urlparse.urljoin(base, "302/final-destination.txt") 500 (response, content) = self.http.request(uri, "GET") 501 self.assertEqual(response.status, 301) 502 503 def testGet302(self): 504 # Test that we automatically follow 302 redirects 505 # and that we DO NOT cache the 302 response 506 uri = urlparse.urljoin(base, "302/onestep.asis") 507 destination = urlparse.urljoin(base, "302/final-destination.txt") 508 (response, content) = self.http.request(uri, "GET") 509 self.assertEqual(response.status, 200) 510 self.assertEqual(response["content-location"], destination) 511 self.assertEqual(content, "This is the final destination.\n") 512 self.assertEqual(response.previous.status, 302) 513 self.assertEqual(response.previous.fromcache, False) 514 515 uri = urlparse.urljoin(base, "302/onestep.asis") 516 (response, content) = self.http.request(uri, "GET") 517 self.assertEqual(response.status, 200) 518 self.assertEqual(response.fromcache, True) 519 self.assertEqual(response["content-location"], destination) 520 self.assertEqual(content, "This is the final destination.\n") 521 self.assertEqual(response.previous.status, 302) 522 self.assertEqual(response.previous.fromcache, False) 523 self.assertEqual(response.previous["content-location"], uri) 524 525 uri = urlparse.urljoin(base, "302/twostep.asis") 526 527 (response, content) = self.http.request(uri, "GET") 528 self.assertEqual(response.status, 200) 529 self.assertEqual(response.fromcache, True) 530 self.assertEqual(content, "This is the final destination.\n") 531 self.assertEqual(response.previous.status, 302) 532 self.assertEqual(response.previous.fromcache, False) 533 534 def testGet302RedirectionLimit(self): 535 # Test that we can set a lower redirection limit 536 # and that we raise an exception when we exceed 537 # that limit. 
538 self.http.force_exception_to_status_code = False 539 540 uri = urlparse.urljoin(base, "302/twostep.asis") 541 try: 542 (response, content) = self.http.request(uri, "GET", redirections=1) 543 self.fail("This should not happen") 544 except httplib2.RedirectLimit: 545 pass 546 except Exception as e: 547 self.fail("Threw wrong kind of exception ") 548 549 # Re-run the test with out the exceptions 550 self.http.force_exception_to_status_code = True 551 552 (response, content) = self.http.request(uri, "GET", redirections=1) 553 self.assertEqual(response.status, 500) 554 self.assertTrue(response.reason.startswith("Redirected more")) 555 self.assertEqual("302", response["status"]) 556 self.assertTrue(content.startswith("<html>")) 557 self.assertTrue(response.previous != None) 558 559 def testGet302NoLocation(self): 560 # Test that we throw an exception when we get 561 # a 302 with no Location: header. 562 self.http.force_exception_to_status_code = False 563 uri = urlparse.urljoin(base, "302/no-location.asis") 564 try: 565 (response, content) = self.http.request(uri, "GET") 566 self.fail("Should never reach here") 567 except httplib2.RedirectMissingLocation: 568 pass 569 except Exception as e: 570 self.fail("Threw wrong kind of exception ") 571 572 # Re-run the test with out the exceptions 573 self.http.force_exception_to_status_code = True 574 575 (response, content) = self.http.request(uri, "GET") 576 self.assertEqual(response.status, 500) 577 self.assertTrue(response.reason.startswith("Redirected but")) 578 self.assertEqual("302", response["status"]) 579 self.assertTrue(content.startswith("This is content")) 580 581 def testGet301ViaHttps(self): 582 # Google always redirects to https://www.google.com 583 (response, content) = self.http.request("https://code.google.com/apis/", "GET") 584 self.assertEqual(200, response.status) 585 self.assertEqual(301, response.previous.status) 586 587 def testGetViaHttps(self): 588 # Test that we can handle HTTPS 589 (response, 
content) = self.http.request( 590 "https://www.google.com/adsense/", "GET" 591 ) 592 self.assertEqual(200, response.status) 593 594 def testGetViaHttpsSpecViolationOnLocation(self): 595 # Test that we follow redirects through HTTPS 596 # even if they violate the spec by including 597 # a relative Location: header instead of an 598 # absolute one. 599 (response, content) = self.http.request("https://www.google.com/adsense", "GET") 600 self.assertEqual(200, response.status) 601 self.assertNotEqual(None, response.previous) 602 603 def testSslCertValidationDoubleDots(self): 604 pass 605 # No longer a valid test. 606 # if sys.version_info >= (2, 6): 607 # Test that we get match a double dot cert 608 # try: 609 # self.http.request("https://www.appspot.com/", "GET") 610 # except httplib2.CertificateHostnameMismatch: 611 # self.fail('cert with *.*.appspot.com should not raise an exception.') 612 613 def testSslHostnameValidation(self): 614 pass 615 # No longer a valid test. 616 # if sys.version_info >= (2, 6): 617 # The SSL server at google.com:443 returns a certificate for 618 # 'www.google.com', which results in a host name mismatch. 619 # Note that this test only works because the ssl module and httplib2 620 # do not support SNI; for requests specifying a server name of 621 # 'google.com' via SNI, a matching cert would be returned. 622 # self.assertRaises(httplib2.CertificateHostnameMismatch, 623 # self.http.request, "https://google.com/", "GET") 624 625 def testSslCertValidationWithoutSslModuleFails(self): 626 if sys.version_info < (2, 6): 627 http = httplib2.Http(disable_ssl_certificate_validation=False) 628 self.assertRaises( 629 httplib2.CertificateValidationUnsupported, 630 http.request, 631 "https://www.google.com/", 632 "GET", 633 ) 634 635 def testGetViaHttpsKeyCert(self): 636 # At this point I can only test 637 # that the key and cert files are passed in 638 # correctly to httplib. It would be nice to have 639 # a real https endpoint to test against. 
640 641 # bitworking.org presents an certificate for a non-matching host 642 # (*.webfaction.com), so we need to disable cert checking for this test. 643 http = httplib2.Http(timeout=2, disable_ssl_certificate_validation=True) 644 645 http.add_certificate("akeyfile", "acertfile", "bitworking.org") 646 try: 647 (response, content) = http.request("https://bitworking.org", "GET") 648 except: 649 pass 650 self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile") 651 self.assertEqual( 652 http.connections["https:bitworking.org"].cert_file, "acertfile" 653 ) 654 655 try: 656 (response, content) = http.request("https://notthere.bitworking.org", "GET") 657 except: 658 pass 659 self.assertEqual( 660 http.connections["https:notthere.bitworking.org"].key_file, None 661 ) 662 self.assertEqual( 663 http.connections["https:notthere.bitworking.org"].cert_file, None 664 ) 665 666 def testGet303(self): 667 # Do a follow-up GET on a Location: header 668 # returned from a POST that gave a 303. 669 uri = urlparse.urljoin(base, "303/303.cgi") 670 (response, content) = self.http.request(uri, "POST", " ") 671 self.assertEqual(response.status, 200) 672 self.assertEqual(content, "This is the final destination.\n") 673 self.assertEqual(response.previous.status, 303) 674 675 def testGet303NoRedirect(self): 676 # Do a follow-up GET on a Location: header 677 # returned from a POST that gave a 303. 
678 self.http.follow_redirects = False 679 uri = urlparse.urljoin(base, "303/303.cgi") 680 (response, content) = self.http.request(uri, "POST", " ") 681 self.assertEqual(response.status, 303) 682 683 def test303ForDifferentMethods(self): 684 # Test that all methods can be used 685 uri = urlparse.urljoin(base, "303/redirect-to-reflector.cgi") 686 for (method, method_on_303) in [ 687 ("PUT", "GET"), 688 ("DELETE", "GET"), 689 ("POST", "GET"), 690 ("GET", "GET"), 691 ("HEAD", "GET"), 692 ]: 693 (response, content) = self.http.request(uri, method, body=" ") 694 self.assertEqual(response["x-method"], method_on_303) 695 696 def test303AndForwardAuthorizationHeader(self): 697 # Test that all methods can be used 698 uri = urlparse.urljoin(base, "303/redirect-to-header-reflector.cgi") 699 headers = {"authorization": "Bearer foo"} 700 response, content = self.http.request(uri, "GET", body=" ", headers=headers) 701 # self.assertTrue('authorization' not in content) 702 self.http.follow_all_redirects = True 703 self.http.forward_authorization_headers = True 704 response, content = self.http.request(uri, "GET", body=" ", headers=headers) 705 # Oh, how I wish Apache didn't eat the Authorization header. 
706 # self.assertTrue('authorization' in content) 707 708 def testGet304(self): 709 # Test that we use ETags properly to validate our cache 710 uri = urlparse.urljoin(base, "304/test_etag.txt") 711 (response, content) = self.http.request( 712 uri, "GET", headers={"accept-encoding": "identity"} 713 ) 714 self.assertNotEqual(response["etag"], "") 715 716 (response, content) = self.http.request(uri, "GET") 717 (response, content) = self.http.request( 718 uri, "GET", headers={"cache-control": "must-revalidate"} 719 ) 720 self.assertEqual(response.status, 200) 721 self.assertEqual(response.fromcache, True) 722 723 cache_file_name = os.path.join( 724 cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]) 725 ) 726 f = open(cache_file_name, "r") 727 status_line = f.readline() 728 f.close() 729 730 self.assertTrue(status_line.startswith("status:")) 731 732 (response, content) = self.http.request(uri, "HEAD") 733 self.assertEqual(response.status, 200) 734 self.assertEqual(response.fromcache, True) 735 736 (response, content) = self.http.request( 737 uri, "GET", headers={"range": "bytes=0-0"} 738 ) 739 self.assertEqual(response.status, 206) 740 self.assertEqual(response.fromcache, False) 741 742 def testGetIgnoreEtag(self): 743 # Test that we can forcibly ignore ETags 744 uri = urlparse.urljoin(base, "reflector/reflector.cgi") 745 (response, content) = self.http.request( 746 uri, "GET", headers={"accept-encoding": "identity"} 747 ) 748 self.assertNotEqual(response["etag"], "") 749 750 (response, content) = self.http.request( 751 uri, 752 "GET", 753 headers={"accept-encoding": "identity", "cache-control": "max-age=0"}, 754 ) 755 d = self.reflector(content) 756 self.assertTrue("HTTP_IF_NONE_MATCH" in d) 757 758 self.http.ignore_etag = True 759 (response, content) = self.http.request( 760 uri, 761 "GET", 762 headers={"accept-encoding": "identity", "cache-control": "max-age=0"}, 763 ) 764 d = self.reflector(content) 765 self.assertEqual(response.fromcache, False) 766 
self.assertFalse("HTTP_IF_NONE_MATCH" in d) 767 768 def testOverrideEtag(self): 769 # Test that we can forcibly ignore ETags 770 uri = urlparse.urljoin(base, "reflector/reflector.cgi") 771 (response, content) = self.http.request( 772 uri, "GET", headers={"accept-encoding": "identity"} 773 ) 774 self.assertNotEqual(response["etag"], "") 775 776 (response, content) = self.http.request( 777 uri, 778 "GET", 779 headers={"accept-encoding": "identity", "cache-control": "max-age=0"}, 780 ) 781 d = self.reflector(content) 782 self.assertTrue("HTTP_IF_NONE_MATCH" in d) 783 self.assertNotEqual(d["HTTP_IF_NONE_MATCH"], "fred") 784 785 (response, content) = self.http.request( 786 uri, 787 "GET", 788 headers={ 789 "accept-encoding": "identity", 790 "cache-control": "max-age=0", 791 "if-none-match": "fred", 792 }, 793 ) 794 d = self.reflector(content) 795 self.assertTrue("HTTP_IF_NONE_MATCH" in d) 796 self.assertEqual(d["HTTP_IF_NONE_MATCH"], "fred") 797 798 # MAP-commented this out because it consistently fails 799 # def testGet304EndToEnd(self): 800 # # Test that end to end headers get overwritten in the cache 801 # uri = urlparse.urljoin(base, "304/end2end.cgi") 802 # (response, content) = self.http.request(uri, "GET") 803 # self.assertNotEqual(response['etag'], "") 804 # old_date = response['date'] 805 # time.sleep(2) 806 # 807 # (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'}) 808 # # The response should be from the cache, but the Date: header should be updated. 809 # new_date = response['date'] 810 # self.assertNotEqual(new_date, old_date) 811 # self.assertEqual(response.status, 200) 812 # self.assertEqual(response.fromcache, True) 813 814 def testGet304LastModified(self): 815 # Test that we can still handle a 304 816 # by only using the last-modified cache validator. 
817 uri = urlparse.urljoin(base, "304/last-modified-only/last-modified-only.txt") 818 (response, content) = self.http.request(uri, "GET") 819 820 self.assertNotEqual(response["last-modified"], "") 821 (response, content) = self.http.request(uri, "GET") 822 (response, content) = self.http.request(uri, "GET") 823 self.assertEqual(response.status, 200) 824 self.assertEqual(response.fromcache, True) 825 826 def testGet307(self): 827 # Test that we do follow 307 redirects but 828 # do not cache the 307 829 uri = urlparse.urljoin(base, "307/onestep.asis") 830 (response, content) = self.http.request(uri, "GET") 831 self.assertEqual(response.status, 200) 832 self.assertEqual(content, "This is the final destination.\n") 833 self.assertEqual(response.previous.status, 307) 834 self.assertEqual(response.previous.fromcache, False) 835 836 (response, content) = self.http.request(uri, "GET") 837 self.assertEqual(response.status, 200) 838 self.assertEqual(response.fromcache, True) 839 self.assertEqual(content, "This is the final destination.\n") 840 self.assertEqual(response.previous.status, 307) 841 self.assertEqual(response.previous.fromcache, False) 842 843 def testGet410(self): 844 # Test that we pass 410's through 845 uri = urlparse.urljoin(base, "410/410.asis") 846 (response, content) = self.http.request(uri, "GET") 847 self.assertEqual(response.status, 410) 848 849 def testVaryHeaderSimple(self): 850 """RFC 2616 13.6 When the cache receives a subsequent request whose Request-URI specifies one or more cache entries including a Vary header field, the cache MUST NOT use such a cache entry to construct a response to the new request unless all of the selecting request-headers present in the new request match the corresponding stored request-headers in the original request. 
851 852 """ 853 # test that the vary header is sent 854 uri = urlparse.urljoin(base, "vary/accept.asis") 855 (response, content) = self.http.request( 856 uri, "GET", headers={"Accept": "text/plain"} 857 ) 858 self.assertEqual(response.status, 200) 859 self.assertTrue("vary" in response) 860 861 # get the resource again, from the cache since accept header in this 862 # request is the same as the request 863 (response, content) = self.http.request( 864 uri, "GET", headers={"Accept": "text/plain"} 865 ) 866 self.assertEqual(response.status, 200) 867 self.assertEqual(response.fromcache, True, msg="Should be from cache") 868 869 # get the resource again, not from cache since Accept headers does not match 870 (response, content) = self.http.request( 871 uri, "GET", headers={"Accept": "text/html"} 872 ) 873 self.assertEqual(response.status, 200) 874 self.assertEqual(response.fromcache, False, msg="Should not be from cache") 875 876 # get the resource again, without any Accept header, so again no match 877 (response, content) = self.http.request(uri, "GET") 878 self.assertEqual(response.status, 200) 879 self.assertEqual(response.fromcache, False, msg="Should not be from cache") 880 881 def testNoVary(self): 882 pass 883 # when there is no vary, a different Accept header (e.g.) 
def testVaryHeaderDouble(self):
    # A Vary covering two request headers: both must match for a cache hit.
    uri = urlparse.urljoin(base, "vary/accept-double.asis")
    double_headers = {
        "Accept": "text/plain",
        "Accept-Language": "da, en-gb;q=0.8, en;q=0.7",
    }
    resp, data = self.http.request(uri, "GET", headers=double_headers)
    self.assertEqual(resp.status, 200)
    self.assertTrue("vary" in resp)

    # Identical selecting headers: served from cache.
    resp, data = self.http.request(uri, "GET", headers=double_headers)
    self.assertEqual(resp.fromcache, True, msg="Should be from cache")

    # Only one of the two varied headers supplied: no cache hit.
    resp, data = self.http.request(uri, "GET", headers={"Accept": "text/plain"})
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)

    # The other varied header alone does not match exactly either.
    resp, data = self.http.request(uri, "GET", headers={"Accept-Language": "da"})
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False, msg="Should not be from cache")

def testVaryUnusedHeader(self):
    # A header named by Vary but absent from both requests still matches.
    uri = urlparse.urljoin(base, "vary/unused-header.asis")
    resp, data = self.http.request(uri, "GET", headers={"Accept": "text/plain"})
    self.assertEqual(resp.status, 200)
    self.assertTrue("vary" in resp)

    # Same request again: the cached entry is usable.
    resp, data = self.http.request(uri, "GET", headers={"Accept": "text/plain"})
    self.assertEqual(resp.fromcache, True, msg="Should be from cache")
def testHeadGZip(self):
    # A HEAD response has no body, so no decompression must be attempted.
    uri = urlparse.urljoin(base, "gzip/final-destination.txt")
    resp, data = self.http.request(uri, "HEAD")
    self.assertEqual(resp.status, 200)
    self.assertNotEqual(int(resp["content-length"]), 0)
    self.assertEqual(data, "")

def testGetGZip(self):
    # gzip-encoded bodies are transparently decompressed; the original
    # encoding is recorded under the synthetic '-content-encoding' key.
    uri = urlparse.urljoin(base, "gzip/final-destination.txt")
    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertFalse("content-encoding" in resp)
    self.assertTrue("-content-encoding" in resp)
    self.assertEqual(
        int(resp["content-length"]), len("This is the final destination.\n")
    )
    self.assertEqual(data, "This is the final destination.\n")

def testPostAndGZipResponse(self):
    # gzip decompression also applies to POST responses.
    uri = urlparse.urljoin(base, "gzip/post.cgi")
    resp, data = self.http.request(uri, "POST", body=" ")
    self.assertEqual(resp.status, 200)
    self.assertFalse("content-encoding" in resp)
    self.assertTrue("-content-encoding" in resp)

def testGetGZipFailure(self):
    # A corrupt gzip body must surface as FailedToDecompressContent.
    self.http.force_exception_to_status_code = False
    uri = urlparse.urljoin(base, "gzip/failed-compression.asis")
    try:
        resp, data = self.http.request(uri, "GET")
        self.fail("Should never reach here")
    except httplib2.FailedToDecompressContent:
        pass
    except Exception:
        self.fail("Threw wrong kind of exception")

    # With exceptions mapped to status codes, the same failure
    # becomes a synthetic 500 response.
    self.http.force_exception_to_status_code = True
    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 500)
    self.assertTrue(resp.reason.startswith("Content purported"))
def testTimeout(self):
    """A request that exceeds the socket timeout yields a synthetic 408.

    Fixes two defects in the original:
      * a bare ``except:`` (which would also swallow KeyboardInterrupt /
        SystemExit) is narrowed to ``except Exception``;
      * ``socket.setdefaulttimeout(1)`` leaked the 1-second default into
        every subsequently created socket in the process — the previous
        default is now restored in a ``finally`` block.
    """
    self.http.force_exception_to_status_code = True
    uri = urlparse.urljoin(base, "timeout/timeout.cgi")
    try:
        import socket

        old_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(1)
    except Exception:
        # Don't run the test if we can't set the timeout
        return
    try:
        (response, content) = self.http.request(uri)
        self.assertEqual(response.status, 408)
        self.assertTrue(response.reason.startswith("Request Timeout"))
        self.assertTrue(content.startswith("Request Timeout"))
    finally:
        # Restore the process-wide default so later tests are unaffected.
        socket.setdefaulttimeout(old_timeout)

def testIndividualTimeout(self):
    # A per-Http-object timeout must work without touching the global default.
    uri = urlparse.urljoin(base, "timeout/timeout.cgi")
    http = httplib2.Http(timeout=1)
    http.force_exception_to_status_code = True

    (response, content) = http.request(uri)
    self.assertEqual(response.status, 408)
    self.assertTrue(response.reason.startswith("Request Timeout"))
    self.assertTrue(content.startswith("Request Timeout"))

def testHTTPSInitTimeout(self):
    # The timeout kwarg is stored on the HTTPS connection object.
    c = httplib2.HTTPSConnectionWithTimeout("localhost", 80, timeout=47)
    self.assertEqual(47, c.timeout)

def testGetDeflate(self):
    # deflate-encoded bodies are transparently decompressed.
    uri = urlparse.urljoin(base, "deflate/deflated.asis")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertFalse("content-encoding" in response)
    self.assertEqual(
        int(response["content-length"]), len("This is the final destination.")
    )
    self.assertEqual(content, "This is the final destination.")

def testGetDeflateFailure(self):
    # A corrupt deflate body must surface as FailedToDecompressContent.
    self.http.force_exception_to_status_code = False

    uri = urlparse.urljoin(base, "deflate/failed-compression.asis")
    try:
        (response, content) = self.http.request(uri, "GET")
        self.fail("Should never reach here")
    except httplib2.FailedToDecompressContent:
        pass
    except Exception:
        self.fail("Threw wrong kind of exception")

    # With exceptions mapped to status codes, we get a synthetic 500.
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
def testGetDuplicateHeaders(self):
    # Duplicate response headers are joined into one value with ','.
    uri = urlparse.urljoin(base, "duplicate-headers/multilink.asis")
    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(data, "This is content\n")
    first_link = resp["link"].split(",")[0]
    self.assertEqual(
        first_link,
        '<http://bitworking.org>; rel="home"; title="BitWorking"',
    )

def testGetCacheControlNoCache(self):
    # A request-side 'Cache-Control: no-cache' bypasses a fresh cache entry.
    uri = urlparse.urljoin(base, "304/test_etag.txt")
    resp, data = self.http.request(
        uri, "GET", headers={"accept-encoding": "identity"}
    )
    self.assertNotEqual(resp["etag"], "")
    resp, data = self.http.request(
        uri, "GET", headers={"accept-encoding": "identity"}
    )
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)

    resp, data = self.http.request(
        uri,
        "GET",
        headers={"accept-encoding": "identity", "Cache-Control": "no-cache"},
    )
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)

def testGetCacheControlPragmaNoCache(self):
    # The HTTP/1.0 'Pragma: no-cache' request header behaves the same way.
    uri = urlparse.urljoin(base, "304/test_etag.txt")
    resp, data = self.http.request(
        uri, "GET", headers={"accept-encoding": "identity"}
    )
    self.assertNotEqual(resp["etag"], "")
    resp, data = self.http.request(
        uri, "GET", headers={"accept-encoding": "identity"}
    )
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)

    resp, data = self.http.request(
        uri, "GET", headers={"accept-encoding": "identity", "Pragma": "no-cache"}
    )
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
def testGetCacheControlNoStoreRequest(self):
    # A request-side no-store directive prevents the response being cached.
    uri = urlparse.urljoin(base, "304/test_etag.txt")

    resp, data = self.http.request(
        uri, "GET", headers={"Cache-Control": "no-store"}
    )
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)

    resp, data = self.http.request(
        uri, "GET", headers={"Cache-Control": "no-store"}
    )
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)

def testGetCacheControlNoStoreResponse(self):
    # A response-side no-store directive likewise keeps it out of the cache.
    uri = urlparse.urljoin(base, "no-store/no-store.asis")

    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)

    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)

def testGetCacheControlNoCacheNoStoreRequest(self):
    # 'no-store, no-cache' evicts an entry that was cached previously.
    uri = urlparse.urljoin(base, "304/test_etag.txt")

    resp, data = self.http.request(uri, "GET")
    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.fromcache, True)
    resp, data = self.http.request(
        uri, "GET", headers={"Cache-Control": "no-store, no-cache"}
    )
    resp, data = self.http.request(
        uri, "GET", headers={"Cache-Control": "no-store, no-cache"}
    )
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
def testUpdateInvalidatesCache(self):
    # A PUT or DELETE against a cached URI must invalidate the cache entry,
    # even when the server rejects the method.
    uri = urlparse.urljoin(base, "304/test_etag.txt")

    resp, data = self.http.request(uri, "GET")
    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.fromcache, True)
    resp, data = self.http.request(uri, "DELETE")
    self.assertEqual(resp.status, 405)

    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.fromcache, False)

def testUpdateUsesCachedETag(self):
    # Native support for http://www.w3.org/1999/04/Editing/ : the cached
    # ETag is sent as If-Match, so a second PUT fails the precondition.
    uri = urlparse.urljoin(base, "conditional-updates/test.cgi")

    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)
    resp, data = self.http.request(uri, "PUT", body="foo")
    self.assertEqual(resp.status, 200)
    resp, data = self.http.request(uri, "PUT", body="foo")
    self.assertEqual(resp.status, 412)

def testUpdatePatchUsesCachedETag(self):
    # Same If-Match behaviour for the PATCH method.
    uri = urlparse.urljoin(base, "conditional-updates/test.cgi")

    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)
    resp, data = self.http.request(uri, "PATCH", body="foo")
    self.assertEqual(resp.status, 200)
    resp, data = self.http.request(uri, "PATCH", body="foo")
    self.assertEqual(resp.status, 412)
"conditional-updates/test.cgi") 1185 1186 (response, content) = self.http.request(uri, "GET") 1187 self.assertEqual(response.status, 200) 1188 self.assertEqual(response.fromcache, False) 1189 (response, content) = self.http.request(uri, "GET") 1190 self.assertEqual(response.status, 200) 1191 self.assertEqual(response.fromcache, True) 1192 (response, content) = self.http.request(uri, "PATCH", body="foo") 1193 self.assertEqual(response.status, 200) 1194 (response, content) = self.http.request(uri, "PATCH", body="foo") 1195 self.assertEqual(response.status, 412) 1196 1197 def testUpdateUsesCachedETagAndOCMethod(self): 1198 # Test that we natively support http://www.w3.org/1999/04/Editing/ 1199 uri = urlparse.urljoin(base, "conditional-updates/test.cgi") 1200 1201 (response, content) = self.http.request(uri, "GET") 1202 self.assertEqual(response.status, 200) 1203 self.assertEqual(response.fromcache, False) 1204 (response, content) = self.http.request(uri, "GET") 1205 self.assertEqual(response.status, 200) 1206 self.assertEqual(response.fromcache, True) 1207 self.http.optimistic_concurrency_methods.append("DELETE") 1208 (response, content) = self.http.request(uri, "DELETE") 1209 self.assertEqual(response.status, 200) 1210 1211 def testUpdateUsesCachedETagOverridden(self): 1212 # Test that we natively support http://www.w3.org/1999/04/Editing/ 1213 uri = urlparse.urljoin(base, "conditional-updates/test.cgi") 1214 1215 (response, content) = self.http.request(uri, "GET") 1216 self.assertEqual(response.status, 200) 1217 self.assertEqual(response.fromcache, False) 1218 (response, content) = self.http.request(uri, "GET") 1219 self.assertEqual(response.status, 200) 1220 self.assertEqual(response.fromcache, True) 1221 (response, content) = self.http.request( 1222 uri, "PUT", body="foo", headers={"if-match": "fred"} 1223 ) 1224 self.assertEqual(response.status, 412) 1225 1226 def testBasicAuth(self): 1227 # Test Basic Authentication 1228 uri = urlparse.urljoin(base, 
"basic/file.txt") 1229 (response, content) = self.http.request(uri, "GET") 1230 self.assertEqual(response.status, 401) 1231 1232 uri = urlparse.urljoin(base, "basic/") 1233 (response, content) = self.http.request(uri, "GET") 1234 self.assertEqual(response.status, 401) 1235 1236 self.http.add_credentials("joe", "password") 1237 (response, content) = self.http.request(uri, "GET") 1238 self.assertEqual(response.status, 200) 1239 1240 uri = urlparse.urljoin(base, "basic/file.txt") 1241 (response, content) = self.http.request(uri, "GET") 1242 self.assertEqual(response.status, 200) 1243 1244 def testBasicAuthWithDomain(self): 1245 # Test Basic Authentication 1246 uri = urlparse.urljoin(base, "basic/file.txt") 1247 (response, content) = self.http.request(uri, "GET") 1248 self.assertEqual(response.status, 401) 1249 1250 uri = urlparse.urljoin(base, "basic/") 1251 (response, content) = self.http.request(uri, "GET") 1252 self.assertEqual(response.status, 401) 1253 1254 self.http.add_credentials("joe", "password", "example.org") 1255 (response, content) = self.http.request(uri, "GET") 1256 self.assertEqual(response.status, 401) 1257 1258 uri = urlparse.urljoin(base, "basic/file.txt") 1259 (response, content) = self.http.request(uri, "GET") 1260 self.assertEqual(response.status, 401) 1261 1262 domain = urlparse.urlparse(base)[1] 1263 self.http.add_credentials("joe", "password", domain) 1264 (response, content) = self.http.request(uri, "GET") 1265 self.assertEqual(response.status, 200) 1266 1267 uri = urlparse.urljoin(base, "basic/file.txt") 1268 (response, content) = self.http.request(uri, "GET") 1269 self.assertEqual(response.status, 200) 1270 1271 def testBasicAuthTwoDifferentCredentials(self): 1272 # Test Basic Authentication with multiple sets of credentials 1273 uri = urlparse.urljoin(base, "basic2/file.txt") 1274 (response, content) = self.http.request(uri, "GET") 1275 self.assertEqual(response.status, 401) 1276 1277 uri = urlparse.urljoin(base, "basic2/") 1278 
def testBasicAuthNested(self):
    # Nested protected resources: each realm needs matching credentials,
    # added one at a time.
    uri = urlparse.urljoin(base, "basic-nested/")
    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 401)

    uri = urlparse.urljoin(base, "basic-nested/subdir")
    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 401)

    # First set of credentials unlocks only the outer resource.
    self.http.add_credentials("joe", "password")

    uri = urlparse.urljoin(base, "basic-nested/")
    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 200)

    uri = urlparse.urljoin(base, "basic-nested/subdir")
    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 401)

    # Second set unlocks the nested resource as well.
    self.http.add_credentials("fred", "barney")

    uri = urlparse.urljoin(base, "basic-nested/")
    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 200)

    uri = urlparse.urljoin(base, "basic-nested/subdir")
    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 200)

def testDigestAuth(self):
    # Digest authentication challenge/response round trip.
    uri = urlparse.urljoin(base, "digest/")
    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 401)

    self.http.add_credentials("joe", "password")
    resp, data = self.http.request(uri, "GET")
    self.assertEqual(resp.status, 200)

    uri = urlparse.urljoin(base, "digest/file.txt")
    resp, data = self.http.request(uri, "GET")
self.assertEqual(response.status, 200) 1330 1331 uri = urlparse.urljoin(base, "digest/file.txt") 1332 (response, content) = self.http.request(uri, "GET") 1333 1334 def testDigestAuthNextNonceAndNC(self): 1335 # Test that if the server sets nextnonce that we reset 1336 # the nonce count back to 1 1337 uri = urlparse.urljoin(base, "digest/file.txt") 1338 self.http.add_credentials("joe", "password") 1339 (response, content) = self.http.request( 1340 uri, "GET", headers={"cache-control": "no-cache"} 1341 ) 1342 info = httplib2._parse_www_authenticate(response, "authentication-info") 1343 self.assertEqual(response.status, 200) 1344 (response, content) = self.http.request( 1345 uri, "GET", headers={"cache-control": "no-cache"} 1346 ) 1347 info2 = httplib2._parse_www_authenticate(response, "authentication-info") 1348 self.assertEqual(response.status, 200) 1349 1350 if "nextnonce" in info: 1351 self.assertEqual(info2["nc"], 1) 1352 1353 def testDigestAuthStale(self): 1354 # Test that we can handle a nonce becoming stale 1355 uri = urlparse.urljoin(base, "digest-expire/file.txt") 1356 self.http.add_credentials("joe", "password") 1357 (response, content) = self.http.request( 1358 uri, "GET", headers={"cache-control": "no-cache"} 1359 ) 1360 info = httplib2._parse_www_authenticate(response, "authentication-info") 1361 self.assertEqual(response.status, 200) 1362 1363 time.sleep(3) 1364 # Sleep long enough that the nonce becomes stale 1365 1366 (response, content) = self.http.request( 1367 uri, "GET", headers={"cache-control": "no-cache"} 1368 ) 1369 self.assertFalse(response.fromcache) 1370 self.assertTrue(response._stale_digest) 1371 info3 = httplib2._parse_www_authenticate(response, "authentication-info") 1372 self.assertEqual(response.status, 200) 1373 1374 def reflector(self, content): 1375 return dict([tuple(x.split("=", 1)) for x in content.strip().split("\n")]) 1376 1377 def testReflector(self): 1378 uri = urlparse.urljoin(base, "reflector/reflector.cgi") 1379 
def testConnectionClose(self):
    # 'Connection: close' must actually drop the socket from the pool entry.
    uri = "http://www.google.com/"
    (response, content) = self.http.request(uri, "GET")
    for c in self.http.connections.values():
        self.assertNotEqual(None, c.sock)
    (response, content) = self.http.request(
        uri, "GET", headers={"connection": "close"}
    )
    for c in self.http.connections.values():
        self.assertEqual(None, c.sock)

def testPickleHttp(self):
    # An Http object must survive a pickle round trip with its state intact.
    pickled_http = pickle.dumps(self.http)
    new_http = pickle.loads(pickled_http)

    self.assertEqual(
        sorted(new_http.__dict__.keys()), sorted(self.http.__dict__.keys())
    )
    for key in new_http.__dict__:
        if key in ("certificates", "credentials"):
            self.assertEqual(
                new_http.__dict__[key].credentials,
                self.http.__dict__[key].credentials,
            )
        elif key == "cache":
            self.assertEqual(
                new_http.__dict__[key].cache, self.http.__dict__[key].cache
            )
        else:
            self.assertEqual(new_http.__dict__[key], self.http.__dict__[key])

def testPickleHttpWithConnection(self):
    # Live connections are not picklable and must be dropped on unpickle.
    self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
    pickled_http = pickle.dumps(self.http)
    new_http = pickle.loads(pickled_http)

    self.assertEqual(self.http.connections.keys(), ["http:bitworking.org"])
    self.assertEqual(new_http.connections, {})

def testPickleCustomRequestHttp(self):
    # A bound custom request callable must not leak into the pickle.
    def dummy_request(*args, **kwargs):
        return new_request(*args, **kwargs)

    dummy_request.dummy_attr = "dummy_value"

    self.http.request = dummy_request
    pickled_http = pickle.dumps(self.http)
    self.assertFalse("S'request'" in pickled_http)


try:
    import memcache

    class HttpTestMemCached(HttpTest):
        """Re-run the whole HttpTest suite against a memcached-backed cache."""

        def setUp(self):
            self.cache = memcache.Client(["127.0.0.1:11211"], debug=0)
            # self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
            self.http = httplib2.Http(self.cache)
            self.cache.flush_all()
            # Not exactly sure why the sleep is needed here, but
            # if not present then some unit tests that rely on caching
            # fail. Memcached seems to lose some sets immediately
            # after a flush_all if the set is to a value that
            # was previously cached. (Maybe the flush is handled async?)
            time.sleep(1)
            self.http.clear_credentials()


# Narrowed from a bare 'except: pass', which silently hid *any* error in
# the class definition above (NameError, typos, ...) — only a missing
# memcache client library should skip this test class.
except ImportError:
    pass
# ------------------------------------------------------------------------


class HttpPrivateTest(unittest.TestCase):
    """Unit tests for httplib2's private helper functions."""

    def testParseCacheControl(self):
        # Test that we can parse the Cache-Control header
        self.assertEqual({}, httplib2._parse_cache_control({}))
        self.assertEqual(
            {"no-cache": 1},
            httplib2._parse_cache_control({"cache-control": " no-cache"}),
        )
        cc = httplib2._parse_cache_control(
            {"cache-control": " no-cache, max-age = 7200"}
        )
        self.assertEqual(cc["no-cache"], 1)
        self.assertEqual(cc["max-age"], "7200")
        cc = httplib2._parse_cache_control({"cache-control": " , "})
        self.assertEqual(cc[""], 1)

        # A ';'-separated parameter list must not raise.  Two fixes over the
        # original: the bare 'except:' (which also trapped KeyboardInterrupt
        # and SystemExit) is narrowed to Exception, and the assertion is
        # moved out of the try block so a failing assertion is reported as
        # itself instead of as "Should not throw exception".
        try:
            cc = httplib2._parse_cache_control(
                {"cache-control": "Max-age=3600;post-check=1800,pre-check=3600"}
            )
        except Exception:
            self.fail("Should not throw exception")
        self.assertTrue("max-age" in cc)

    def testNormalizeHeaders(self):
        # Test that we normalize headers to lowercase
        h = httplib2._normalize_headers(
            {"Cache-Control": "no-cache", "Other": "Stuff"}
        )
        self.assertTrue("cache-control" in h)
        self.assertTrue("other" in h)
        self.assertEqual("Stuff", h["other"])

    def testExpirationModelTransparent(self):
        # A request-side no-cache makes the cache entry TRANSPARENT.
        response_headers = {"cache-control": "max-age=7200"}
        request_headers = {"cache-control": "no-cache"}
        self.assertEqual(
            "TRANSPARENT",
            httplib2._entry_disposition(response_headers, request_headers),
        )
def testMaxAgeNonNumeric(self):
    # A non-numeric max-age cannot prove freshness, so the entry is STALE.
    response_headers = {"cache-control": "max-age=fred, min-fresh=barney"}
    request_headers = {}
    self.assertEqual(
        "STALE", httplib2._entry_disposition(response_headers, request_headers)
    )

def testExpirationModelNoCacheResponse(self):
    # Date/Expires would make this FRESH, but response no-cache overrides.
    now = time.time()
    response_headers = {
        "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
        "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)),
        "cache-control": "no-cache",
    }
    request_headers = {}
    self.assertEqual(
        "STALE", httplib2._entry_disposition(response_headers, request_headers)
    )

def testExpirationModelStaleRequestMustReval(self):
    # A request-side must-revalidate forces STALE.
    self.assertEqual(
        "STALE",
        httplib2._entry_disposition({}, {"cache-control": "must-revalidate"}),
    )

def testExpirationModelStaleResponseMustReval(self):
    # A response-side must-revalidate forces STALE too.
    self.assertEqual(
        "STALE",
        httplib2._entry_disposition({"cache-control": "must-revalidate"}, {}),
    )

def testExpirationModelFresh(self):
    # Fresh within max-age, stale once the 2-second window has passed.
    response_headers = {
        "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
        "cache-control": "max-age=2",
    }
    request_headers = {}
    self.assertEqual(
        "FRESH", httplib2._entry_disposition(response_headers, request_headers)
    )
    time.sleep(3)
    self.assertEqual(
        "STALE", httplib2._entry_disposition(response_headers, request_headers)
    )

def testExpirationMaxAge0(self):
    # max-age=0 is immediately stale.
    response_headers = {
        "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
        "cache-control": "max-age=0",
    }
    request_headers = {}
    self.assertEqual(
        "STALE", httplib2._entry_disposition(response_headers, request_headers)
    )
"cache-control": "max-age=0", 1551 } 1552 request_headers = {} 1553 self.assertEqual( 1554 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1555 ) 1556 1557 def testExpirationModelDateAndExpires(self): 1558 now = time.time() 1559 response_headers = { 1560 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), 1561 "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)), 1562 } 1563 request_headers = {} 1564 self.assertEqual( 1565 "FRESH", httplib2._entry_disposition(response_headers, request_headers) 1566 ) 1567 time.sleep(3) 1568 self.assertEqual( 1569 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1570 ) 1571 1572 def testExpiresZero(self): 1573 now = time.time() 1574 response_headers = { 1575 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), 1576 "expires": "0", 1577 } 1578 request_headers = {} 1579 self.assertEqual( 1580 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1581 ) 1582 1583 def testExpirationModelDateOnly(self): 1584 now = time.time() 1585 response_headers = { 1586 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 3)) 1587 } 1588 request_headers = {} 1589 self.assertEqual( 1590 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1591 ) 1592 1593 def testExpirationModelOnlyIfCached(self): 1594 response_headers = {} 1595 request_headers = {"cache-control": "only-if-cached"} 1596 self.assertEqual( 1597 "FRESH", httplib2._entry_disposition(response_headers, request_headers) 1598 ) 1599 1600 def testExpirationModelMaxAgeBoth(self): 1601 now = time.time() 1602 response_headers = { 1603 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), 1604 "cache-control": "max-age=2", 1605 } 1606 request_headers = {"cache-control": "max-age=0"} 1607 self.assertEqual( 1608 "STALE", httplib2._entry_disposition(response_headers, request_headers) 1609 ) 1610 1611 def 
def testExpirationModelDateAndExpiresMinFresh2(self):
    # min-fresh=2 is satisfied when 4 seconds of freshness remain.
    now = time.time()
    response_headers = {
        "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
        "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)),
    }
    request_headers = {"cache-control": "min-fresh=2"}
    self.assertEqual(
        "FRESH", httplib2._entry_disposition(response_headers, request_headers)
    )

def testParseWWWAuthenticateEmpty(self):
    # No www-authenticate header parses to an empty dict.
    res = httplib2._parse_www_authenticate({})
    self.assertEqual(len(res.keys()), 0)

def testParseWWWAuthenticate(self):
    # different uses of spaces around commas
    res = httplib2._parse_www_authenticate(
        {
            "www-authenticate": 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'
        }
    )
    self.assertEqual(len(res.keys()), 1)
    self.assertEqual(len(res["test"].keys()), 5)

    # tokens with non-alphanum
    res = httplib2._parse_www_authenticate(
        {"www-authenticate": 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'}
    )
    self.assertEqual(len(res.keys()), 1)
    self.assertEqual(len(res["t*!%#st"].keys()), 2)

    # quoted string with quoted pairs
    res = httplib2._parse_www_authenticate(
        {"www-authenticate": 'Test realm="a \\"test\\" realm"'}
    )
    self.assertEqual(len(res.keys()), 1)
    self.assertEqual(res["test"]["realm"], 'a "test" realm')

def testParseWWWAuthenticateStrict(self):
    """Re-run the parser tests with strict RFC parsing enabled.

    Fix: the original set USE_WWW_AUTH_STRICT_PARSING back to 0 only on
    success, so one failing assertion left strict parsing switched on for
    every later test in the process.  The flag is now restored in a
    ``finally`` block.
    """
    httplib2.USE_WWW_AUTH_STRICT_PARSING = 1
    try:
        self.testParseWWWAuthenticate()
    finally:
        httplib2.USE_WWW_AUTH_STRICT_PARSING = 0
def testParseWWWAuthenticateBasic(self):
    # Basic challenges: quoted realm, then quoted and unquoted algorithm.
    res = httplib2._parse_www_authenticate({"www-authenticate": 'Basic realm="me"'})
    basic = res["basic"]
    self.assertEqual("me", basic["realm"])

    res = httplib2._parse_www_authenticate(
        {"www-authenticate": 'Basic realm="me", algorithm="MD5"'}
    )
    basic = res["basic"]
    self.assertEqual("me", basic["realm"])
    self.assertEqual("MD5", basic["algorithm"])

    res = httplib2._parse_www_authenticate(
        {"www-authenticate": 'Basic realm="me", algorithm=MD5'}
    )
    basic = res["basic"]
    self.assertEqual("me", basic["realm"])
    self.assertEqual("MD5", basic["algorithm"])

def testParseWWWAuthenticateBasic2(self):
    # Extra unknown parameters are preserved.
    res = httplib2._parse_www_authenticate(
        {"www-authenticate": 'Basic realm="me",other="fred" '}
    )
    basic = res["basic"]
    self.assertEqual("me", basic["realm"])
    self.assertEqual("fred", basic["other"])

def testParseWWWAuthenticateBasic3(self):
    # Parameter names are case-insensitive.
    res = httplib2._parse_www_authenticate(
        {"www-authenticate": 'Basic REAlm="me" '}
    )
    basic = res["basic"]
    self.assertEqual("me", basic["realm"])

def testParseWWWAuthenticateDigest(self):
    # A full Digest challenge parses into its parameters.
    res = httplib2._parse_www_authenticate(
        {
            "www-authenticate": 'Digest realm="[email protected]", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'
        }
    )
    digest = res["digest"]
    self.assertEqual("[email protected]", digest["realm"])
    self.assertEqual("auth,auth-int", digest["qop"])

def testParseWWWAuthenticateMultiple(self):
    # Two challenges in one header (no separating comma).
    res = httplib2._parse_www_authenticate(
        {
            "www-authenticate": 'Digest realm="[email protected]", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '
        }
    )
    digest = res["digest"]
    self.assertEqual("[email protected]", digest["realm"])
    self.assertEqual("auth,auth-int", digest["qop"])
    self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
    self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
    basic = res["basic"]
    self.assertEqual("me", basic["realm"])
def testParseWWWAuthenticateMultiple2(self):
    # A comma between challenges (as produced when separate
    # www-authenticate headers are joined) must also parse.
    res = httplib2._parse_www_authenticate(
        {
            "www-authenticate": 'Digest realm="[email protected]", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '
        }
    )
    digest = res["digest"]
    self.assertEqual("[email protected]", digest["realm"])
    self.assertEqual("auth,auth-int", digest["qop"])
    self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
    self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
    basic = res["basic"]
    self.assertEqual("me", basic["realm"])

def testParseWWWAuthenticateMultiple3(self):
    # Three comma-joined challenges: Digest, Basic and WSSE.
    res = httplib2._parse_www_authenticate(
        {
            "www-authenticate": 'Digest realm="[email protected]", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
        }
    )
    digest = res["digest"]
    self.assertEqual("[email protected]", digest["realm"])
    self.assertEqual("auth,auth-int", digest["qop"])
    self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
    self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
    basic = res["basic"]
    self.assertEqual("me", basic["realm"])
    wsse = res["wsse"]
    self.assertEqual("foo", wsse["realm"])
    self.assertEqual("UsernameToken", wsse["profile"])
def testParseWWWAuthenticateMultiple4(self):
    # Whitespace (including tabs) around '=' and punctuation-heavy
    # quoted values must survive parsing.
    res = httplib2._parse_www_authenticate(
        {
            "www-authenticate": 'Digest realm="[email protected]", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
        }
    )
    digest = res["digest"]
    self.assertEqual("[email protected]", digest["realm"])
    self.assertEqual("\tauth,auth-int", digest["qop"])
    self.assertEqual("(*)&^&$%#", digest["nonce"])

def testParseWWWAuthenticateMoreQuoteCombos(self):
    # Mixed quoted and unquoted parameters plus a bare 'stale=true'.
    res = httplib2._parse_www_authenticate(
        {
            "www-authenticate": 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
        }
    )
    digest = res["digest"]
    self.assertEqual("myrealm", digest["realm"])

def testParseWWWAuthenticateMalformed(self):
    # A challenge with bare quoted strings and no key=value pairs is
    # malformed and must raise MalformedHeader.
    try:
        httplib2._parse_www_authenticate(
            {
                "www-authenticate": 'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'
            }
        )
        self.fail("should raise an exception")
    except httplib2.MalformedHeader:
        pass
def testDigestObject(self): 1791 credentials = ("joe", "password") 1792 host = None 1793 request_uri = "/projects/httplib2/test/digest/" 1794 headers = {} 1795 response = { 1796 "www-authenticate": 'Digest realm="myrealm", ' 1797 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", ' 1798 'algorithm=MD5, qop="auth"' 1799 } 1800 content = "" 1801 1802 d = httplib2.DigestAuthentication( 1803 credentials, host, request_uri, headers, response, content, None 1804 ) 1805 d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46") 1806 our_request = "authorization: %s" % headers["authorization"] 1807 working_request = ( 1808 'authorization: Digest username="joe", realm="myrealm", ' 1809 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",' 1810 ' uri="/projects/httplib2/test/digest/", algorithm=MD5, ' 1811 'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, ' 1812 'nc=00000001, cnonce="33033375ec278a46"' 1813 ) 1814 self.assertEqual(our_request, working_request) 1815 1816 def testDigestObjectWithOpaque(self): 1817 credentials = ("joe", "password") 1818 host = None 1819 request_uri = "/projects/httplib2/test/digest/" 1820 headers = {} 1821 response = { 1822 "www-authenticate": 'Digest realm="myrealm", ' 1823 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", ' 1824 'algorithm=MD5, qop="auth", opaque="atestopaque"' 1825 } 1826 content = "" 1827 1828 d = httplib2.DigestAuthentication( 1829 credentials, host, request_uri, headers, response, content, None 1830 ) 1831 d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46") 1832 our_request = "authorization: %s" % headers["authorization"] 1833 working_request = ( 1834 'authorization: Digest username="joe", realm="myrealm", ' 1835 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",' 1836 ' uri="/projects/httplib2/test/digest/", algorithm=MD5, ' 1837 'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, ' 1838 'nc=00000001, 
cnonce="33033375ec278a46", ' 1839 'opaque="atestopaque"' 1840 ) 1841 self.assertEqual(our_request, working_request) 1842 1843 def testDigestObjectStale(self): 1844 credentials = ("joe", "password") 1845 host = None 1846 request_uri = "/projects/httplib2/test/digest/" 1847 headers = {} 1848 response = httplib2.Response({}) 1849 response["www-authenticate"] = ( 1850 'Digest realm="myrealm", ' 1851 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",' 1852 ' algorithm=MD5, qop="auth", stale=true' 1853 ) 1854 response.status = 401 1855 content = "" 1856 d = httplib2.DigestAuthentication( 1857 credentials, host, request_uri, headers, response, content, None 1858 ) 1859 # Returns true to force a retry 1860 self.assertTrue(d.response(response, content)) 1861 1862 def testDigestObjectAuthInfo(self): 1863 credentials = ("joe", "password") 1864 host = None 1865 request_uri = "/projects/httplib2/test/digest/" 1866 headers = {} 1867 response = httplib2.Response({}) 1868 response["www-authenticate"] = ( 1869 'Digest realm="myrealm", ' 1870 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",' 1871 ' algorithm=MD5, qop="auth", stale=true' 1872 ) 1873 response["authentication-info"] = 'nextnonce="fred"' 1874 content = "" 1875 d = httplib2.DigestAuthentication( 1876 credentials, host, request_uri, headers, response, content, None 1877 ) 1878 # Returns true to force a retry 1879 self.assertFalse(d.response(response, content)) 1880 self.assertEqual("fred", d.challenge["nonce"]) 1881 self.assertEqual(1, d.challenge["nc"]) 1882 1883 def testWsseAlgorithm(self): 1884 digest = httplib2._wsse_username_token( 1885 "d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm" 1886 ) 1887 expected = "quR/EWLAV4xLf9Zqyw4pDmfV9OY=" 1888 self.assertEqual(expected, digest) 1889 1890 def testEnd2End(self): 1891 # one end to end header 1892 response = {"content-type": "application/atom+xml", "te": "deflate"} 1893 end2end = httplib2._get_end2end_headers(response) 
1894 self.assertTrue("content-type" in end2end) 1895 self.assertTrue("te" not in end2end) 1896 self.assertTrue("connection" not in end2end) 1897 1898 # one end to end header that gets eliminated 1899 response = { 1900 "connection": "content-type", 1901 "content-type": "application/atom+xml", 1902 "te": "deflate", 1903 } 1904 end2end = httplib2._get_end2end_headers(response) 1905 self.assertTrue("content-type" not in end2end) 1906 self.assertTrue("te" not in end2end) 1907 self.assertTrue("connection" not in end2end) 1908 1909 # Degenerate case of no headers 1910 response = {} 1911 end2end = httplib2._get_end2end_headers(response) 1912 self.assertEquals(0, len(end2end)) 1913 1914 # Degenerate case of connection referrring to a header not passed in 1915 response = {"connection": "content-type"} 1916 end2end = httplib2._get_end2end_headers(response) 1917 self.assertEquals(0, len(end2end)) 1918 1919 1920class TestProxyInfo(unittest.TestCase): 1921 def setUp(self): 1922 self.orig_env = dict(os.environ) 1923 1924 def tearDown(self): 1925 os.environ.clear() 1926 os.environ.update(self.orig_env) 1927 1928 def test_from_url(self): 1929 pi = httplib2.proxy_info_from_url("http://myproxy.example.com") 1930 self.assertEquals(pi.proxy_host, "myproxy.example.com") 1931 self.assertEquals(pi.proxy_port, 80) 1932 self.assertEquals(pi.proxy_user, None) 1933 1934 def test_from_url_ident(self): 1935 pi = httplib2.proxy_info_from_url("http://zoidberg:fish@someproxy:99") 1936 self.assertEquals(pi.proxy_host, "someproxy") 1937 self.assertEquals(pi.proxy_port, 99) 1938 self.assertEquals(pi.proxy_user, "zoidberg") 1939 self.assertEquals(pi.proxy_pass, "fish") 1940 1941 def test_from_env(self): 1942 os.environ["http_proxy"] = "http://myproxy.example.com:8080" 1943 pi = httplib2.proxy_info_from_environment() 1944 self.assertEquals(pi.proxy_host, "myproxy.example.com") 1945 self.assertEquals(pi.proxy_port, 8080) 1946 self.assertEquals(pi.bypass_hosts, []) 1947 1948 def 
test_from_env_no_proxy(self): 1949 os.environ["http_proxy"] = "http://myproxy.example.com:80" 1950 os.environ["https_proxy"] = "http://myproxy.example.com:81" 1951 os.environ["no_proxy"] = "localhost,otherhost.domain.local" 1952 pi = httplib2.proxy_info_from_environment("https") 1953 self.assertEquals(pi.proxy_host, "myproxy.example.com") 1954 self.assertEquals(pi.proxy_port, 81) 1955 self.assertEquals(pi.bypass_hosts, ["localhost", "otherhost.domain.local"]) 1956 1957 def test_from_env_none(self): 1958 os.environ.clear() 1959 pi = httplib2.proxy_info_from_environment() 1960 self.assertEquals(pi, None) 1961 1962 def test_applies_to(self): 1963 os.environ["http_proxy"] = "http://myproxy.example.com:80" 1964 os.environ["https_proxy"] = "http://myproxy.example.com:81" 1965 os.environ["no_proxy"] = "localhost,otherhost.domain.local,example.com" 1966 pi = httplib2.proxy_info_from_environment() 1967 self.assertFalse(pi.applies_to("localhost")) 1968 self.assertTrue(pi.applies_to("www.google.com")) 1969 self.assertFalse(pi.applies_to("www.example.com")) 1970 1971 def test_no_proxy_star(self): 1972 os.environ["http_proxy"] = "http://myproxy.example.com:80" 1973 os.environ["NO_PROXY"] = "*" 1974 pi = httplib2.proxy_info_from_environment() 1975 for host in ("localhost", "169.254.38.192", "www.google.com"): 1976 self.assertFalse(pi.applies_to(host)) 1977 1978 def test_proxy_headers(self): 1979 headers = {"key0": "val0", "key1": "val1"} 1980 pi = httplib2.ProxyInfo( 1981 httplib2.socks.PROXY_TYPE_HTTP, "localhost", 1234, proxy_headers=headers 1982 ) 1983 self.assertEquals(pi.proxy_headers, headers) 1984 1985 1986if __name__ == "__main__": 1987 unittest.main() 1988