1#!/usr/bin/env python3
2"""A set of unit tests for httplib2.py."""
3
4__author__ = "Joe Gregorio ([email protected])"
5__copyright__ = "Copyright 2006, Joe Gregorio"
6__contributors__ = ["Mark Pilgrim"]
7__license__ = "MIT"
8__version__ = "0.2 ($Rev: 118 $)"
9
10import base64
11import http.client
12import httplib2
13import io
14import os
15import pickle
16import socket
17import ssl
18import sys
19import time
20import unittest
21import urllib.parse
22
# Root URL that the network-backed tests join relative fixture paths onto.
base = "http://bitworking.org/projects/httplib2/test/"
# Directory name passed to httplib2.Http() as the cache location in HttpTest.setUp.
cacheDirName = ".cache"
25
26
class CredentialsTest(unittest.TestCase):
    """Exercise add(), iter() and clear() on httplib2.Credentials."""

    def test(self):
        creds = httplib2.Credentials()
        joe = ("joe", "password")
        fred = ("fred", "password2")

        # A credential added without a domain is expected back for any
        # domain, including the empty string.
        creds.add(*joe)
        self.assertEqual(joe, list(creds.iter("bitworking.org"))[0])
        self.assertEqual(joe, list(creds.iter(""))[0])

        # A credential bound to a domain must only show up for that domain.
        creds.add(*fred, "wellformedweb.org")
        self.assertEqual(joe, list(creds.iter("bitworking.org"))[0])
        self.assertEqual(1, len(list(creds.iter("bitworking.org"))))
        self.assertEqual(2, len(list(creds.iter("wellformedweb.org"))))
        self.assertIn(fred, list(creds.iter("wellformedweb.org")))

        # clear() must drop everything added so far.
        creds.clear()
        self.assertEqual(0, len(list(creds.iter("bitworking.org"))))

        # Only the domain-bound credential is added back after the clear.
        creds.add(*fred, "wellformedweb.org")
        self.assertIn(fred, list(creds.iter("wellformedweb.org")))
        self.assertEqual(0, len(list(creds.iter("bitworking.org"))))
        self.assertEqual(0, len(list(creds.iter(""))))
44
45
class ParserTest(unittest.TestCase):
    """Check how httplib2.parse_uri splits a URI into a five-element tuple."""

    def testFromStd66(self):
        # Each entry pairs an input URI with the exact tuple parse_uri is
        # expected to return for it. Components that are absent from the URI
        # are expected as None (query, fragment) or "" (path).
        cases = [
            ("http://example.com", ("http", "example.com", "", None, None)),
            ("https://example.com", ("https", "example.com", "", None, None)),
            (
                "https://example.com:8080",
                ("https", "example.com:8080", "", None, None),
            ),
            ("http://example.com/", ("http", "example.com", "/", None, None)),
            (
                "http://example.com/path",
                ("http", "example.com", "/path", None, None),
            ),
            (
                "http://example.com/path?a=1&b=2",
                ("http", "example.com", "/path", "a=1&b=2", None),
            ),
            (
                "http://example.com/path?a=1&b=2#fred",
                ("http", "example.com", "/path", "a=1&b=2", "fred"),
            ),
        ]
        # subTest keeps the remaining cases running after one mismatch and
        # names the offending URI in the failure report.
        for uri, expected in cases:
            with self.subTest(uri=uri):
                self.assertEqual(expected, httplib2.parse_uri(uri))
80
81
class UrlNormTest(unittest.TestCase):
    """Check the results of httplib2.urlnorm for absolute and relative URIs."""

    def test(self):
        def last_of_norm(uri):
            # Only the final element of urlnorm()'s result is compared here.
            return httplib2.urlnorm(uri)[-1]

        # A missing path becomes "/" and the host is lower-cased.
        self.assertEqual("http://example.org/", last_of_norm("http://example.org"))
        self.assertEqual("http://example.org/", last_of_norm("http://EXAMple.org"))
        self.assertEqual(
            "http://example.org/?=b", last_of_norm("http://EXAMple.org?=b")
        )
        self.assertEqual(
            "http://example.org/mypath?a=b",
            last_of_norm("http://EXAMple.org/mypath?a=b"),
        )
        # An explicit port is kept as written.
        self.assertEqual("http://localhost:80/", last_of_norm("http://localhost:80"))

        # The whole result must be case-insensitive in scheme and host.
        lower_case = httplib2.urlnorm("http://localhost:80/")
        upper_case = httplib2.urlnorm("HTTP://LOCALHOST:80")
        self.assertEqual(lower_case, upper_case)

        # A relative reference is rejected.
        try:
            httplib2.urlnorm("/")
        except httplib2.RelativeURIError:
            pass
        else:
            self.fail("Non-absolute URIs should raise an exception")
109
110
class UrlSafenameTest(unittest.TestCase):
    """Check the names that httplib2.safename derives from URIs."""

    def test(self):
        safename = httplib2.safename

        # Different URIs must end up with different safe names.
        self.assertEqual(
            "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f",
            safename("http://example.org/fred/?a=b"),
        )
        self.assertEqual(
            "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b",
            safename("http://example.org/fred?/a=b"),
        )
        self.assertEqual(
            "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968",
            safename("http://www.example.org/fred?/a=b"),
        )

        # Once normalized, host case must not matter.
        lower_host = safename(httplib2.urlnorm("http://www")[-1])
        upper_host = safename(httplib2.urlnorm("http://WWW")[-1])
        self.assertEqual(lower_host, upper_host)

        # The scheme takes part in the name.
        self.assertEqual(
            "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d",
            safename("https://www.example.org/fred?/a=b"),
        )
        self.assertNotEqual(safename("http://www"), safename("https://www"))

        # Length limit: names stay distinct and are capped at
        # 200 + 1 (",") + 32 characters.
        long_uri = "http://" + ("w" * 200) + ".org"
        longer_uri = "http://" + ("w" * 201) + ".org"
        self.assertNotEqual(safename(longer_uri), safename(long_uri))
        self.assertEqual(233, len(safename(longer_uri)))
        self.assertEqual(233, len(safename(long_uri)))

        # Unicode host name.
        if sys.version_info >= (2, 3):
            self.assertEqual(
                "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193",
                safename("http://\u2304.org/fred/?a=b"),
            )
150
151
152class _MyResponse(io.BytesIO):
153    def __init__(self, body, **kwargs):
154        io.BytesIO.__init__(self, body)
155        self.headers = kwargs
156
157    def items(self):
158        return self.headers.items()
159
160    def iteritems(self):
161        return iter(self.headers.items())
162
163
164class _MyHTTPConnection(object):
165    "This class is just a mock of httplib.HTTPConnection used for testing"
166
167    def __init__(
168        self,
169        host,
170        port=None,
171        key_file=None,
172        cert_file=None,
173        strict=None,
174        timeout=None,
175        proxy_info=None,
176    ):
177        self.host = host
178        self.port = port
179        self.timeout = timeout
180        self.log = ""
181        self.sock = None
182
183    def set_debuglevel(self, level):
184        pass
185
186    def connect(self):
187        "Connect to a host on a given port."
188        pass
189
190    def close(self):
191        pass
192
193    def request(self, method, request_uri, body, headers):
194        pass
195
196    def getresponse(self):
197        return _MyResponse(b"the body", status="200")
198
199
200class _MyHTTPBadStatusConnection(object):
201    "Mock of httplib.HTTPConnection that raises BadStatusLine."
202
203    num_calls = 0
204
205    def __init__(
206        self,
207        host,
208        port=None,
209        key_file=None,
210        cert_file=None,
211        strict=None,
212        timeout=None,
213        proxy_info=None,
214    ):
215        self.host = host
216        self.port = port
217        self.timeout = timeout
218        self.log = ""
219        self.sock = None
220        _MyHTTPBadStatusConnection.num_calls = 0
221
222    def set_debuglevel(self, level):
223        pass
224
225    def connect(self):
226        pass
227
228    def close(self):
229        pass
230
231    def request(self, method, request_uri, body, headers):
232        pass
233
234    def getresponse(self):
235        _MyHTTPBadStatusConnection.num_calls += 1
236        raise http.client.BadStatusLine("")
237
238
239class HttpTest(unittest.TestCase):
240    def setUp(self):
241        if os.path.exists(cacheDirName):
242            [
243                os.remove(os.path.join(cacheDirName, file))
244                for file in os.listdir(cacheDirName)
245            ]
246        self.http = httplib2.Http(cacheDirName)
247        self.http.clear_credentials()
248
249    def testIPv6NoSSL(self):
250        try:
251            self.http.request("http://[::1]/")
252        except socket.gaierror:
253            self.fail("should get the address family right for IPv6")
254        except socket.error:
255            # Even if IPv6 isn't installed on a machine it should just raise socket.error
256            pass
257
258    def testIPv6SSL(self):
259        try:
260            self.http.request("https://[::1]/")
261        except socket.gaierror:
262            self.fail("should get the address family right for IPv6")
263        except socket.error:
264            # Even if IPv6 isn't installed on a machine it should just raise socket.error
265            pass
266
267    def testConnectionType(self):
268        self.http.force_exception_to_status_code = False
269        response, content = self.http.request(
270            "http://bitworking.org", connection_type=_MyHTTPConnection
271        )
272        self.assertEqual(response["content-location"], "http://bitworking.org")
273        self.assertEqual(content, b"the body")
274
275    def testBadStatusLineRetry(self):
276        old_retries = httplib2.RETRIES
277        httplib2.RETRIES = 1
278        self.http.force_exception_to_status_code = False
279        try:
280            response, content = self.http.request(
281                "http://bitworking.org", connection_type=_MyHTTPBadStatusConnection
282            )
283        except http.client.BadStatusLine:
284            self.assertEqual(2, _MyHTTPBadStatusConnection.num_calls)
285        httplib2.RETRIES = old_retries
286
287    def testGetUnknownServer(self):
288        self.http.force_exception_to_status_code = False
289        try:
290            self.http.request("http://fred.bitworking.org/")
291            self.fail(
292                "An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server."
293            )
294        except httplib2.ServerNotFoundError:
295            pass
296
297        # Now test with exceptions turned off
298        self.http.force_exception_to_status_code = True
299
300        (response, content) = self.http.request("http://fred.bitworking.org/")
301        self.assertEqual(response["content-type"], "text/plain")
302        self.assertTrue(content.startswith(b"Unable to find"))
303        self.assertEqual(response.status, 400)
304
305    def testGetConnectionRefused(self):
306        self.http.force_exception_to_status_code = False
307        try:
308            self.http.request("http://localhost:7777/")
309            self.fail("An socket.error exception must be thrown on Connection Refused.")
310        except socket.error:
311            pass
312
313        # Now test with exceptions turned off
314        self.http.force_exception_to_status_code = True
315
316        (response, content) = self.http.request("http://localhost:7777/")
317        self.assertEqual(response["content-type"], "text/plain")
318        self.assertTrue(b"Connection refused" in content)
319        self.assertEqual(response.status, 400)
320
321    def testGetIRI(self):
322        if sys.version_info >= (2, 3):
323            uri = urllib.parse.urljoin(
324                base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}"
325            )
326            (response, content) = self.http.request(uri, "GET")
327            d = self.reflector(content)
328            self.assertTrue("QUERY_STRING" in d)
329            self.assertTrue(d["QUERY_STRING"].find("%D0%82") > 0)
330
331    def testGetIsDefaultMethod(self):
332        # Test that GET is the default method
333        uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
334        (response, content) = self.http.request(uri)
335        self.assertEqual(response["x-method"], "GET")
336
337    def testDifferentMethods(self):
338        # Test that all methods can be used
339        uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
340        for method in ["GET", "PUT", "DELETE", "POST"]:
341            (response, content) = self.http.request(uri, method, body=b" ")
342            self.assertEqual(response["x-method"], method)
343
344    def testHeadRead(self):
345        # Test that we don't try to read the response of a HEAD request
346        # since httplib blocks response.read() for HEAD requests.
347        # Oddly enough this doesn't appear as a problem when doing HEAD requests
348        # against Apache servers.
349        uri = "http://www.google.com/"
350        (response, content) = self.http.request(uri, "HEAD")
351        self.assertEqual(response.status, 200)
352        self.assertEqual(content, b"")
353
354    def testGetNoCache(self):
355        # Test that can do a GET w/o the cache turned on.
356        http = httplib2.Http()
357        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
358        (response, content) = http.request(uri, "GET")
359        self.assertEqual(response.status, 200)
360        self.assertEqual(response.previous, None)
361
362    def testGetOnlyIfCachedCacheHit(self):
363        # Test that can do a GET with cache and 'only-if-cached'
364        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
365        (response, content) = self.http.request(uri, "GET")
366        (response, content) = self.http.request(
367            uri, "GET", headers={"cache-control": "only-if-cached"}
368        )
369        self.assertEqual(response.fromcache, True)
370        self.assertEqual(response.status, 200)
371
372    def testGetOnlyIfCachedCacheMiss(self):
373        # Test that can do a GET with no cache with 'only-if-cached'
374        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
375        (response, content) = self.http.request(
376            uri, "GET", headers={"cache-control": "only-if-cached"}
377        )
378        self.assertEqual(response.fromcache, False)
379        self.assertEqual(response.status, 504)
380
381    def testGetOnlyIfCachedNoCacheAtAll(self):
382        # Test that can do a GET with no cache with 'only-if-cached'
383        # Of course, there might be an intermediary beyond us
384        # that responds to the 'only-if-cached', so this
385        # test can't really be guaranteed to pass.
386        http = httplib2.Http()
387        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
388        (response, content) = http.request(
389            uri, "GET", headers={"cache-control": "only-if-cached"}
390        )
391        self.assertEqual(response.fromcache, False)
392        self.assertEqual(response.status, 504)
393
394    def testUserAgent(self):
395        # Test that we provide a default user-agent
396        uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
397        (response, content) = self.http.request(uri, "GET")
398        self.assertEqual(response.status, 200)
399        self.assertTrue(content.startswith(b"Python-httplib2/"))
400
401    def testUserAgentNonDefault(self):
402        # Test that the default user-agent can be over-ridden
403
404        uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
405        (response, content) = self.http.request(
406            uri, "GET", headers={"User-Agent": "fred/1.0"}
407        )
408        self.assertEqual(response.status, 200)
409        self.assertTrue(content.startswith(b"fred/1.0"))
410
411    def testGet300WithLocation(self):
412        # Test the we automatically follow 300 redirects if a Location: header is provided
413        uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
414        (response, content) = self.http.request(uri, "GET")
415        self.assertEqual(response.status, 200)
416        self.assertEqual(content, b"This is the final destination.\n")
417        self.assertEqual(response.previous.status, 300)
418        self.assertEqual(response.previous.fromcache, False)
419
420        # Confirm that the intermediate 300 is not cached
421        (response, content) = self.http.request(uri, "GET")
422        self.assertEqual(response.status, 200)
423        self.assertEqual(content, b"This is the final destination.\n")
424        self.assertEqual(response.previous.status, 300)
425        self.assertEqual(response.previous.fromcache, False)
426
427    def testGet300WithLocationNoRedirect(self):
428        # Test the we automatically follow 300 redirects if a Location: header is provided
429        self.http.follow_redirects = False
430        uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
431        (response, content) = self.http.request(uri, "GET")
432        self.assertEqual(response.status, 300)
433
434    def testGet300WithoutLocation(self):
435        # Not giving a Location: header in a 300 response is acceptable
436        # In which case we just return the 300 response
437        uri = urllib.parse.urljoin(base, "300/without-location-header.asis")
438        (response, content) = self.http.request(uri, "GET")
439        self.assertEqual(response.status, 300)
440        self.assertTrue(response["content-type"].startswith("text/html"))
441        self.assertEqual(response.previous, None)
442
443    def testGet301(self):
444        # Test that we automatically follow 301 redirects
445        # and that we cache the 301 response
446        uri = urllib.parse.urljoin(base, "301/onestep.asis")
447        destination = urllib.parse.urljoin(base, "302/final-destination.txt")
448        (response, content) = self.http.request(uri, "GET")
449        self.assertEqual(response.status, 200)
450        self.assertTrue("content-location" in response)
451        self.assertEqual(response["content-location"], destination)
452        self.assertEqual(content, b"This is the final destination.\n")
453        self.assertEqual(response.previous.status, 301)
454        self.assertEqual(response.previous.fromcache, False)
455
456        (response, content) = self.http.request(uri, "GET")
457        self.assertEqual(response.status, 200)
458        self.assertEqual(response["content-location"], destination)
459        self.assertEqual(content, b"This is the final destination.\n")
460        self.assertEqual(response.previous.status, 301)
461        self.assertEqual(response.previous.fromcache, True)
462
463    def testHead301(self):
464        # Test that we automatically follow 301 redirects
465        uri = urllib.parse.urljoin(base, "301/onestep.asis")
466        (response, content) = self.http.request(uri, "HEAD")
467        self.assertEqual(response.status, 200)
468        self.assertEqual(response.previous.status, 301)
469        self.assertEqual(response.previous.fromcache, False)
470
471    def testGet301NoRedirect(self):
472        # Test that we automatically follow 301 redirects
473        # and that we cache the 301 response
474        self.http.follow_redirects = False
475        uri = urllib.parse.urljoin(base, "301/onestep.asis")
476        destination = urllib.parse.urljoin(base, "302/final-destination.txt")
477        (response, content) = self.http.request(uri, "GET")
478        self.assertEqual(response.status, 301)
479
480    def testGet302(self):
481        # Test that we automatically follow 302 redirects
482        # and that we DO NOT cache the 302 response
483        uri = urllib.parse.urljoin(base, "302/onestep.asis")
484        destination = urllib.parse.urljoin(base, "302/final-destination.txt")
485        (response, content) = self.http.request(uri, "GET")
486        self.assertEqual(response.status, 200)
487        self.assertEqual(response["content-location"], destination)
488        self.assertEqual(content, b"This is the final destination.\n")
489        self.assertEqual(response.previous.status, 302)
490        self.assertEqual(response.previous.fromcache, False)
491
492        uri = urllib.parse.urljoin(base, "302/onestep.asis")
493        (response, content) = self.http.request(uri, "GET")
494        self.assertEqual(response.status, 200)
495        self.assertEqual(response.fromcache, True)
496        self.assertEqual(response["content-location"], destination)
497        self.assertEqual(content, b"This is the final destination.\n")
498        self.assertEqual(response.previous.status, 302)
499        self.assertEqual(response.previous.fromcache, False)
500        self.assertEqual(response.previous["content-location"], uri)
501
502        uri = urllib.parse.urljoin(base, "302/twostep.asis")
503
504        (response, content) = self.http.request(uri, "GET")
505        self.assertEqual(response.status, 200)
506        self.assertEqual(response.fromcache, True)
507        self.assertEqual(content, b"This is the final destination.\n")
508        self.assertEqual(response.previous.status, 302)
509        self.assertEqual(response.previous.fromcache, False)
510
511    def testGet302RedirectionLimit(self):
512        # Test that we can set a lower redirection limit
513        # and that we raise an exception when we exceed
514        # that limit.
515        self.http.force_exception_to_status_code = False
516
517        uri = urllib.parse.urljoin(base, "302/twostep.asis")
518        try:
519            (response, content) = self.http.request(uri, "GET", redirections=1)
520            self.fail("This should not happen")
521        except httplib2.RedirectLimit:
522            pass
523        except Exception as e:
524            self.fail("Threw wrong kind of exception ")
525
526        # Re-run the test with out the exceptions
527        self.http.force_exception_to_status_code = True
528
529        (response, content) = self.http.request(uri, "GET", redirections=1)
530        self.assertEqual(response.status, 500)
531        self.assertTrue(response.reason.startswith("Redirected more"))
532        self.assertEqual("302", response["status"])
533        self.assertTrue(content.startswith(b"<html>"))
534        self.assertTrue(response.previous != None)
535
536    def testGet302NoLocation(self):
537        # Test that we throw an exception when we get
538        # a 302 with no Location: header.
539        self.http.force_exception_to_status_code = False
540        uri = urllib.parse.urljoin(base, "302/no-location.asis")
541        try:
542            (response, content) = self.http.request(uri, "GET")
543            self.fail("Should never reach here")
544        except httplib2.RedirectMissingLocation:
545            pass
546        except Exception as e:
547            self.fail("Threw wrong kind of exception ")
548
549        # Re-run the test with out the exceptions
550        self.http.force_exception_to_status_code = True
551
552        (response, content) = self.http.request(uri, "GET")
553        self.assertEqual(response.status, 500)
554        self.assertTrue(response.reason.startswith("Redirected but"))
555        self.assertEqual("302", response["status"])
556        self.assertTrue(content.startswith(b"This is content"))
557
558    def testGet301ViaHttps(self):
559        # Google always redirects to http://google.com
560        (response, content) = self.http.request("https://code.google.com/apis/", "GET")
561        self.assertEqual(200, response.status)
562        self.assertEqual(301, response.previous.status)
563
564    def testGetViaHttps(self):
565        # Test that we can handle HTTPS
566        (response, content) = self.http.request("https://google.com/adsense/", "GET")
567        self.assertEqual(200, response.status)
568
569    def testGetViaHttpsSpecViolationOnLocation(self):
570        # Test that we follow redirects through HTTPS
571        # even if they violate the spec by including
572        # a relative Location: header instead of an
573        # absolute one.
574        (response, content) = self.http.request("https://google.com/adsense", "GET")
575        self.assertEqual(200, response.status)
576        self.assertNotEqual(None, response.previous)
577
578    def testGetViaHttpsKeyCert(self):
579        #  At this point I can only test
580        #  that the key and cert files are passed in
581        #  correctly to httplib. It would be nice to have
582        #  a real https endpoint to test against.
583        http = httplib2.Http(timeout=2)
584
585        http.add_certificate("akeyfile", "acertfile", "bitworking.org")
586        try:
587            (response, content) = http.request("https://bitworking.org", "GET")
588        except AttributeError:
589            self.assertEqual(
590                http.connections["https:bitworking.org"].key_file, "akeyfile"
591            )
592            self.assertEqual(
593                http.connections["https:bitworking.org"].cert_file, "acertfile"
594            )
595        except IOError:
596            # Skip on 3.2
597            pass
598
599        try:
600            (response, content) = http.request("https://notthere.bitworking.org", "GET")
601        except httplib2.ServerNotFoundError:
602            self.assertEqual(
603                http.connections["https:notthere.bitworking.org"].key_file, None
604            )
605            self.assertEqual(
606                http.connections["https:notthere.bitworking.org"].cert_file, None
607            )
608        except IOError:
609            # Skip on 3.2
610            pass
611
612    def testSslCertValidation(self):
613        # Test that we get an ssl.SSLError when specifying a non-existent CA
614        # certs file.
615        http = httplib2.Http(ca_certs="/nosuchfile")
616        self.assertRaises(IOError, http.request, "https://www.google.com/", "GET")
617
618        # Test that we get a SSLHandshakeError if we try to access
619        # https://www.google.com, using a CA cert file that doesn't contain
620        # the CA Google uses (i.e., simulating a cert that's not signed by a
621        # trusted CA).
622        other_ca_certs = os.path.join(
623            os.path.dirname(os.path.abspath(httplib2.__file__)),
624            "test",
625            "other_cacerts.txt",
626        )
627        http = httplib2.Http(ca_certs=other_ca_certs)
628        self.assertRaises(ssl.SSLError, http.request, "https://www.google.com/", "GET")
629
630    def testSniHostnameValidation(self):
631        self.http.request("https://google.com/", method="GET")
632
633    def testGet303(self):
634        # Do a follow-up GET on a Location: header
635        # returned from a POST that gave a 303.
636        uri = urllib.parse.urljoin(base, "303/303.cgi")
637        (response, content) = self.http.request(uri, "POST", " ")
638        self.assertEqual(response.status, 200)
639        self.assertEqual(content, b"This is the final destination.\n")
640        self.assertEqual(response.previous.status, 303)
641
642    def testGet303NoRedirect(self):
643        # Do a follow-up GET on a Location: header
644        # returned from a POST that gave a 303.
645        self.http.follow_redirects = False
646        uri = urllib.parse.urljoin(base, "303/303.cgi")
647        (response, content) = self.http.request(uri, "POST", " ")
648        self.assertEqual(response.status, 303)
649
650    def test303ForDifferentMethods(self):
651        # Test that all methods can be used
652        uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
653        for (method, method_on_303) in [
654            ("PUT", "GET"),
655            ("DELETE", "GET"),
656            ("POST", "GET"),
657            ("GET", "GET"),
658            ("HEAD", "GET"),
659        ]:
660            (response, content) = self.http.request(uri, method, body=b" ")
661            self.assertEqual(response["x-method"], method_on_303)
662
663    def testGet304(self):
664        # Test that we use ETags properly to validate our cache
665        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
666        (response, content) = self.http.request(
667            uri, "GET", headers={"accept-encoding": "identity"}
668        )
669        self.assertNotEqual(response["etag"], "")
670
671        (response, content) = self.http.request(
672            uri, "GET", headers={"accept-encoding": "identity"}
673        )
674        (response, content) = self.http.request(
675            uri,
676            "GET",
677            headers={"accept-encoding": "identity", "cache-control": "must-revalidate"},
678        )
679        self.assertEqual(response.status, 200)
680        self.assertEqual(response.fromcache, True)
681
682        cache_file_name = os.path.join(
683            cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1])
684        )
685        f = open(cache_file_name, "r")
686        status_line = f.readline()
687        f.close()
688
689        self.assertTrue(status_line.startswith("status:"))
690
691        (response, content) = self.http.request(
692            uri, "HEAD", headers={"accept-encoding": "identity"}
693        )
694        self.assertEqual(response.status, 200)
695        self.assertEqual(response.fromcache, True)
696
697        (response, content) = self.http.request(
698            uri, "GET", headers={"accept-encoding": "identity", "range": "bytes=0-0"}
699        )
700        self.assertEqual(response.status, 206)
701        self.assertEqual(response.fromcache, False)
702
703    def testGetIgnoreEtag(self):
704        # Test that we can forcibly ignore ETags
705        uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
706        (response, content) = self.http.request(
707            uri, "GET", headers={"accept-encoding": "identity"}
708        )
709        self.assertNotEqual(response["etag"], "")
710
711        (response, content) = self.http.request(
712            uri,
713            "GET",
714            headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
715        )
716        d = self.reflector(content)
717        self.assertTrue("HTTP_IF_NONE_MATCH" in d)
718
719        self.http.ignore_etag = True
720        (response, content) = self.http.request(
721            uri,
722            "GET",
723            headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
724        )
725        d = self.reflector(content)
726        self.assertEqual(response.fromcache, False)
727        self.assertFalse("HTTP_IF_NONE_MATCH" in d)
728
729    def testOverrideEtag(self):
730        # Test that we can forcibly ignore ETags
731        uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
732        (response, content) = self.http.request(
733            uri, "GET", headers={"accept-encoding": "identity"}
734        )
735        self.assertNotEqual(response["etag"], "")
736
737        (response, content) = self.http.request(
738            uri,
739            "GET",
740            headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
741        )
742        d = self.reflector(content)
743        self.assertTrue("HTTP_IF_NONE_MATCH" in d)
744        self.assertNotEqual(d["HTTP_IF_NONE_MATCH"], "fred")
745
746        (response, content) = self.http.request(
747            uri,
748            "GET",
749            headers={
750                "accept-encoding": "identity",
751                "cache-control": "max-age=0",
752                "if-none-match": "fred",
753            },
754        )
755        d = self.reflector(content)
756        self.assertTrue("HTTP_IF_NONE_MATCH" in d)
757        self.assertEqual(d["HTTP_IF_NONE_MATCH"], "fred")
758
759    # MAP-commented this out because it consistently fails
760    #    def testGet304EndToEnd(self):
761    #       # Test that end to end headers get overwritten in the cache
762    #        uri = urllib.parse.urljoin(base, "304/end2end.cgi")
763    #        (response, content) = self.http.request(uri, "GET")
764    #        self.assertNotEqual(response['etag'], "")
765    #        old_date = response['date']
766    #        time.sleep(2)
767    #
768    #        (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
769    #        # The response should be from the cache, but the Date: header should be updated.
770    #        new_date = response['date']
771    #        self.assertNotEqual(new_date, old_date)
772    #        self.assertEqual(response.status, 200)
773    #        self.assertEqual(response.fromcache, True)
774
775    def testGet304LastModified(self):
776        # Test that we can still handle a 304
777        # by only using the last-modified cache validator.
778        uri = urllib.parse.urljoin(
779            base, "304/last-modified-only/last-modified-only.txt"
780        )
781        (response, content) = self.http.request(uri, "GET")
782
783        self.assertNotEqual(response["last-modified"], "")
784        (response, content) = self.http.request(uri, "GET")
785        (response, content) = self.http.request(uri, "GET")
786        self.assertEqual(response.status, 200)
787        self.assertEqual(response.fromcache, True)
788
789    def testGet307(self):
790        # Test that we do follow 307 redirects but
791        # do not cache the 307
792        uri = urllib.parse.urljoin(base, "307/onestep.asis")
793        (response, content) = self.http.request(uri, "GET")
794        self.assertEqual(response.status, 200)
795        self.assertEqual(content, b"This is the final destination.\n")
796        self.assertEqual(response.previous.status, 307)
797        self.assertEqual(response.previous.fromcache, False)
798
799        (response, content) = self.http.request(uri, "GET")
800        self.assertEqual(response.status, 200)
801        self.assertEqual(response.fromcache, True)
802        self.assertEqual(content, b"This is the final destination.\n")
803        self.assertEqual(response.previous.status, 307)
804        self.assertEqual(response.previous.fromcache, False)
805
806    def testGet410(self):
807        # Test that we pass 410's through
808        uri = urllib.parse.urljoin(base, "410/410.asis")
809        (response, content) = self.http.request(uri, "GET")
810        self.assertEqual(response.status, 410)
811
812    def testVaryHeaderSimple(self):
813        """RFC 2616 13.6 When the cache receives a subsequent request whose Request-URI specifies one or more cache entries including a Vary header field, the cache MUST NOT use such a cache entry to construct a response to the new request unless all of the selecting request-headers present in the new request match the corresponding stored request-headers in the original request.
814
815        """
816        # test that the vary header is sent
817        uri = urllib.parse.urljoin(base, "vary/accept.asis")
818        (response, content) = self.http.request(
819            uri, "GET", headers={"Accept": "text/plain"}
820        )
821        self.assertEqual(response.status, 200)
822        self.assertTrue("vary" in response)
823
824        # get the resource again, from the cache since accept header in this
825        # request is the same as the request
826        (response, content) = self.http.request(
827            uri, "GET", headers={"Accept": "text/plain"}
828        )
829        self.assertEqual(response.status, 200)
830        self.assertEqual(response.fromcache, True, msg="Should be from cache")
831
832        # get the resource again, not from cache since Accept headers does not match
833        (response, content) = self.http.request(
834            uri, "GET", headers={"Accept": "text/html"}
835        )
836        self.assertEqual(response.status, 200)
837        self.assertEqual(response.fromcache, False, msg="Should not be from cache")
838
839        # get the resource again, without any Accept header, so again no match
840        (response, content) = self.http.request(uri, "GET")
841        self.assertEqual(response.status, 200)
842        self.assertEqual(response.fromcache, False, msg="Should not be from cache")
843
    def testNoVary(self):
        # Placeholder: every check below is commented out, so this test
        # currently passes without issuing a single request.
        pass
        # Intended behaviour once re-enabled: when there is no vary, a
        # different Accept header (e.g.) should not impact if the cache is
        # used.
        # test that the vary header is not sent
        # uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
        # self.assertEqual(response.status, 200)
        # self.assertFalse('vary' in response)
        #
        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
        # self.assertEqual(response.status, 200)
        # self.assertEqual(response.fromcache, True, msg="Should be from cache")
        #
        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
        # self.assertEqual(response.status, 200)
        # self.assertEqual(response.fromcache, True, msg="Should be from cache")
861
862    def testVaryHeaderDouble(self):
863        uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
864        (response, content) = self.http.request(
865            uri,
866            "GET",
867            headers={
868                "Accept": "text/plain",
869                "Accept-Language": "da, en-gb;q=0.8, en;q=0.7",
870            },
871        )
872        self.assertEqual(response.status, 200)
873        self.assertTrue("vary" in response)
874
875        # we are from cache
876        (response, content) = self.http.request(
877            uri,
878            "GET",
879            headers={
880                "Accept": "text/plain",
881                "Accept-Language": "da, en-gb;q=0.8, en;q=0.7",
882            },
883        )
884        self.assertEqual(response.fromcache, True, msg="Should be from cache")
885
886        (response, content) = self.http.request(
887            uri, "GET", headers={"Accept": "text/plain"}
888        )
889        self.assertEqual(response.status, 200)
890        self.assertEqual(response.fromcache, False)
891
892        # get the resource again, not from cache, varied headers don't match exact
893        (response, content) = self.http.request(
894            uri, "GET", headers={"Accept-Language": "da"}
895        )
896        self.assertEqual(response.status, 200)
897        self.assertEqual(response.fromcache, False, msg="Should not be from cache")
898
899    def testVaryUnusedHeader(self):
900        # A header's value is not considered to vary if it's not used at all.
901        uri = urllib.parse.urljoin(base, "vary/unused-header.asis")
902        (response, content) = self.http.request(
903            uri, "GET", headers={"Accept": "text/plain"}
904        )
905        self.assertEqual(response.status, 200)
906        self.assertTrue("vary" in response)
907
908        # we are from cache
909        (response, content) = self.http.request(
910            uri, "GET", headers={"Accept": "text/plain"}
911        )
912        self.assertEqual(response.fromcache, True, msg="Should be from cache")
913
914    def testHeadGZip(self):
915        # Test that we don't try to decompress a HEAD response
916        uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
917        (response, content) = self.http.request(uri, "HEAD")
918        self.assertEqual(response.status, 200)
919        self.assertNotEqual(int(response["content-length"]), 0)
920        self.assertEqual(content, b"")
921
922    def testGetGZip(self):
923        # Test that we support gzip compression
924        uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
925        (response, content) = self.http.request(uri, "GET")
926        self.assertEqual(response.status, 200)
927        self.assertFalse("content-encoding" in response)
928        self.assertTrue("-content-encoding" in response)
929        self.assertEqual(
930            int(response["content-length"]), len(b"This is the final destination.\n")
931        )
932        self.assertEqual(content, b"This is the final destination.\n")
933
934    def testPostAndGZipResponse(self):
935        uri = urllib.parse.urljoin(base, "gzip/post.cgi")
936        (response, content) = self.http.request(uri, "POST", body=" ")
937        self.assertEqual(response.status, 200)
938        self.assertFalse("content-encoding" in response)
939        self.assertTrue("-content-encoding" in response)
940
941    def testGetGZipFailure(self):
942        # Test that we raise a good exception when the gzip fails
943        self.http.force_exception_to_status_code = False
944        uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
945        try:
946            (response, content) = self.http.request(uri, "GET")
947            self.fail("Should never reach here")
948        except httplib2.FailedToDecompressContent:
949            pass
950        except Exception:
951            self.fail("Threw wrong kind of exception")
952
953        # Re-run the test with out the exceptions
954        self.http.force_exception_to_status_code = True
955
956        (response, content) = self.http.request(uri, "GET")
957        self.assertEqual(response.status, 500)
958        self.assertTrue(response.reason.startswith("Content purported"))
959
960    def testIndividualTimeout(self):
961        uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
962        http = httplib2.Http(timeout=1)
963        http.force_exception_to_status_code = True
964
965        (response, content) = http.request(uri)
966        self.assertEqual(response.status, 408)
967        self.assertTrue(response.reason.startswith("Request Timeout"))
968        self.assertTrue(content.startswith(b"Request Timeout"))
969
970    def testGetDeflate(self):
971        # Test that we support deflate compression
972        uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
973        (response, content) = self.http.request(uri, "GET")
974        self.assertEqual(response.status, 200)
975        self.assertFalse("content-encoding" in response)
976        self.assertEqual(
977            int(response["content-length"]), len("This is the final destination.")
978        )
979        self.assertEqual(content, b"This is the final destination.")
980
981    def testGetDeflateFailure(self):
982        # Test that we raise a good exception when the deflate fails
983        self.http.force_exception_to_status_code = False
984
985        uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
986        try:
987            (response, content) = self.http.request(uri, "GET")
988            self.fail("Should never reach here")
989        except httplib2.FailedToDecompressContent:
990            pass
991        except Exception:
992            self.fail("Threw wrong kind of exception")
993
994        # Re-run the test with out the exceptions
995        self.http.force_exception_to_status_code = True
996
997        (response, content) = self.http.request(uri, "GET")
998        self.assertEqual(response.status, 500)
999        self.assertTrue(response.reason.startswith("Content purported"))
1000
1001    def testGetDuplicateHeaders(self):
1002        # Test that duplicate headers get concatenated via ','
1003        uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
1004        (response, content) = self.http.request(uri, "GET")
1005        self.assertEqual(response.status, 200)
1006        self.assertEqual(content, b"This is content\n")
1007        self.assertEqual(
1008            response["link"].split(",")[0],
1009            '<http://bitworking.org>; rel="home"; title="BitWorking"',
1010        )
1011
1012    def testGetCacheControlNoCache(self):
1013        # Test Cache-Control: no-cache on requests
1014        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
1015        (response, content) = self.http.request(
1016            uri, "GET", headers={"accept-encoding": "identity"}
1017        )
1018        self.assertNotEqual(response["etag"], "")
1019        (response, content) = self.http.request(
1020            uri, "GET", headers={"accept-encoding": "identity"}
1021        )
1022        self.assertEqual(response.status, 200)
1023        self.assertEqual(response.fromcache, True)
1024
1025        (response, content) = self.http.request(
1026            uri,
1027            "GET",
1028            headers={"accept-encoding": "identity", "Cache-Control": "no-cache"},
1029        )
1030        self.assertEqual(response.status, 200)
1031        self.assertEqual(response.fromcache, False)
1032
1033    def testGetCacheControlPragmaNoCache(self):
1034        # Test Pragma: no-cache on requests
1035        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
1036        (response, content) = self.http.request(
1037            uri, "GET", headers={"accept-encoding": "identity"}
1038        )
1039        self.assertNotEqual(response["etag"], "")
1040        (response, content) = self.http.request(
1041            uri, "GET", headers={"accept-encoding": "identity"}
1042        )
1043        self.assertEqual(response.status, 200)
1044        self.assertEqual(response.fromcache, True)
1045
1046        (response, content) = self.http.request(
1047            uri, "GET", headers={"accept-encoding": "identity", "Pragma": "no-cache"}
1048        )
1049        self.assertEqual(response.status, 200)
1050        self.assertEqual(response.fromcache, False)
1051
1052    def testGetCacheControlNoStoreRequest(self):
1053        # A no-store request means that the response should not be stored.
1054        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
1055
1056        (response, content) = self.http.request(
1057            uri, "GET", headers={"Cache-Control": "no-store"}
1058        )
1059        self.assertEqual(response.status, 200)
1060        self.assertEqual(response.fromcache, False)
1061
1062        (response, content) = self.http.request(
1063            uri, "GET", headers={"Cache-Control": "no-store"}
1064        )
1065        self.assertEqual(response.status, 200)
1066        self.assertEqual(response.fromcache, False)
1067
1068    def testGetCacheControlNoStoreResponse(self):
1069        # A no-store response means that the response should not be stored.
1070        uri = urllib.parse.urljoin(base, "no-store/no-store.asis")
1071
1072        (response, content) = self.http.request(uri, "GET")
1073        self.assertEqual(response.status, 200)
1074        self.assertEqual(response.fromcache, False)
1075
1076        (response, content) = self.http.request(uri, "GET")
1077        self.assertEqual(response.status, 200)
1078        self.assertEqual(response.fromcache, False)
1079
1080    def testGetCacheControlNoCacheNoStoreRequest(self):
1081        # Test that a no-store, no-cache clears the entry from the cache
1082        # even if it was cached previously.
1083        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
1084
1085        (response, content) = self.http.request(uri, "GET")
1086        (response, content) = self.http.request(uri, "GET")
1087        self.assertEqual(response.fromcache, True)
1088        (response, content) = self.http.request(
1089            uri, "GET", headers={"Cache-Control": "no-store, no-cache"}
1090        )
1091        (response, content) = self.http.request(
1092            uri, "GET", headers={"Cache-Control": "no-store, no-cache"}
1093        )
1094        self.assertEqual(response.status, 200)
1095        self.assertEqual(response.fromcache, False)
1096
1097    def testUpdateInvalidatesCache(self):
1098        # Test that calling PUT or DELETE on a
1099        # URI that is cache invalidates that cache.
1100        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
1101
1102        (response, content) = self.http.request(uri, "GET")
1103        (response, content) = self.http.request(uri, "GET")
1104        self.assertEqual(response.fromcache, True)
1105        (response, content) = self.http.request(uri, "DELETE")
1106        self.assertEqual(response.status, 405)
1107
1108        (response, content) = self.http.request(uri, "GET")
1109        self.assertEqual(response.fromcache, False)
1110
1111    def testUpdateUsesCachedETag(self):
1112        # Test that we natively support http://www.w3.org/1999/04/Editing/
1113        uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
1114
1115        (response, content) = self.http.request(uri, "GET")
1116        self.assertEqual(response.status, 200)
1117        self.assertEqual(response.fromcache, False)
1118        (response, content) = self.http.request(uri, "GET")
1119        self.assertEqual(response.status, 200)
1120        self.assertEqual(response.fromcache, True)
1121        (response, content) = self.http.request(uri, "PUT", body="foo")
1122        self.assertEqual(response.status, 200)
1123        (response, content) = self.http.request(uri, "PUT", body="foo")
1124        self.assertEqual(response.status, 412)
1125
1126    def testUpdatePatchUsesCachedETag(self):
1127        # Test that we natively support http://www.w3.org/1999/04/Editing/
1128        uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
1129
1130        (response, content) = self.http.request(uri, "GET")
1131        self.assertEqual(response.status, 200)
1132        self.assertEqual(response.fromcache, False)
1133        (response, content) = self.http.request(uri, "GET")
1134        self.assertEqual(response.status, 200)
1135        self.assertEqual(response.fromcache, True)
1136        (response, content) = self.http.request(uri, "PATCH", body="foo")
1137        self.assertEqual(response.status, 200)
1138        (response, content) = self.http.request(uri, "PATCH", body="foo")
1139        self.assertEqual(response.status, 412)
1140
1141    def testUpdateUsesCachedETagAndOCMethod(self):
1142        # Test that we natively support http://www.w3.org/1999/04/Editing/
1143        uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
1144
1145        (response, content) = self.http.request(uri, "GET")
1146        self.assertEqual(response.status, 200)
1147        self.assertEqual(response.fromcache, False)
1148        (response, content) = self.http.request(uri, "GET")
1149        self.assertEqual(response.status, 200)
1150        self.assertEqual(response.fromcache, True)
1151        self.http.optimistic_concurrency_methods.append("DELETE")
1152        (response, content) = self.http.request(uri, "DELETE")
1153        self.assertEqual(response.status, 200)
1154
1155    def testUpdateUsesCachedETagOverridden(self):
1156        # Test that we natively support http://www.w3.org/1999/04/Editing/
1157        uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
1158
1159        (response, content) = self.http.request(uri, "GET")
1160        self.assertEqual(response.status, 200)
1161        self.assertEqual(response.fromcache, False)
1162        (response, content) = self.http.request(uri, "GET")
1163        self.assertEqual(response.status, 200)
1164        self.assertEqual(response.fromcache, True)
1165        (response, content) = self.http.request(
1166            uri, "PUT", body="foo", headers={"if-match": "fred"}
1167        )
1168        self.assertEqual(response.status, 412)
1169
1170    def testBasicAuth(self):
1171        # Test Basic Authentication
1172        uri = urllib.parse.urljoin(base, "basic/file.txt")
1173        (response, content) = self.http.request(uri, "GET")
1174        self.assertEqual(response.status, 401)
1175
1176        uri = urllib.parse.urljoin(base, "basic/")
1177        (response, content) = self.http.request(uri, "GET")
1178        self.assertEqual(response.status, 401)
1179
1180        self.http.add_credentials("joe", "password")
1181        (response, content) = self.http.request(uri, "GET")
1182        self.assertEqual(response.status, 200)
1183
1184        uri = urllib.parse.urljoin(base, "basic/file.txt")
1185        (response, content) = self.http.request(uri, "GET")
1186        self.assertEqual(response.status, 200)
1187
1188    def testBasicAuthWithDomain(self):
1189        # Test Basic Authentication
1190        uri = urllib.parse.urljoin(base, "basic/file.txt")
1191        (response, content) = self.http.request(uri, "GET")
1192        self.assertEqual(response.status, 401)
1193
1194        uri = urllib.parse.urljoin(base, "basic/")
1195        (response, content) = self.http.request(uri, "GET")
1196        self.assertEqual(response.status, 401)
1197
1198        self.http.add_credentials("joe", "password", "example.org")
1199        (response, content) = self.http.request(uri, "GET")
1200        self.assertEqual(response.status, 401)
1201
1202        uri = urllib.parse.urljoin(base, "basic/file.txt")
1203        (response, content) = self.http.request(uri, "GET")
1204        self.assertEqual(response.status, 401)
1205
1206        domain = urllib.parse.urlparse(base)[1]
1207        self.http.add_credentials("joe", "password", domain)
1208        (response, content) = self.http.request(uri, "GET")
1209        self.assertEqual(response.status, 200)
1210
1211        uri = urllib.parse.urljoin(base, "basic/file.txt")
1212        (response, content) = self.http.request(uri, "GET")
1213        self.assertEqual(response.status, 200)
1214
1215    def testBasicAuthTwoDifferentCredentials(self):
1216        # Test Basic Authentication with multiple sets of credentials
1217        uri = urllib.parse.urljoin(base, "basic2/file.txt")
1218        (response, content) = self.http.request(uri, "GET")
1219        self.assertEqual(response.status, 401)
1220
1221        uri = urllib.parse.urljoin(base, "basic2/")
1222        (response, content) = self.http.request(uri, "GET")
1223        self.assertEqual(response.status, 401)
1224
1225        self.http.add_credentials("fred", "barney")
1226        (response, content) = self.http.request(uri, "GET")
1227        self.assertEqual(response.status, 200)
1228
1229        uri = urllib.parse.urljoin(base, "basic2/file.txt")
1230        (response, content) = self.http.request(uri, "GET")
1231        self.assertEqual(response.status, 200)
1232
1233    def testBasicAuthNested(self):
1234        # Test Basic Authentication with resources
1235        # that are nested
1236        uri = urllib.parse.urljoin(base, "basic-nested/")
1237        (response, content) = self.http.request(uri, "GET")
1238        self.assertEqual(response.status, 401)
1239
1240        uri = urllib.parse.urljoin(base, "basic-nested/subdir")
1241        (response, content) = self.http.request(uri, "GET")
1242        self.assertEqual(response.status, 401)
1243
1244        # Now add in credentials one at a time and test.
1245        self.http.add_credentials("joe", "password")
1246
1247        uri = urllib.parse.urljoin(base, "basic-nested/")
1248        (response, content) = self.http.request(uri, "GET")
1249        self.assertEqual(response.status, 200)
1250
1251        uri = urllib.parse.urljoin(base, "basic-nested/subdir")
1252        (response, content) = self.http.request(uri, "GET")
1253        self.assertEqual(response.status, 401)
1254
1255        self.http.add_credentials("fred", "barney")
1256
1257        uri = urllib.parse.urljoin(base, "basic-nested/")
1258        (response, content) = self.http.request(uri, "GET")
1259        self.assertEqual(response.status, 200)
1260
1261        uri = urllib.parse.urljoin(base, "basic-nested/subdir")
1262        (response, content) = self.http.request(uri, "GET")
1263        self.assertEqual(response.status, 200)
1264
1265    def testDigestAuth(self):
1266        # Test that we support Digest Authentication
1267        uri = urllib.parse.urljoin(base, "digest/")
1268        (response, content) = self.http.request(uri, "GET")
1269        self.assertEqual(response.status, 401)
1270
1271        self.http.add_credentials("joe", "password")
1272        (response, content) = self.http.request(uri, "GET")
1273        self.assertEqual(response.status, 200)
1274
1275        uri = urllib.parse.urljoin(base, "digest/file.txt")
1276        (response, content) = self.http.request(uri, "GET")
1277
1278    def testDigestAuthNextNonceAndNC(self):
1279        # Test that if the server sets nextnonce that we reset
1280        # the nonce count back to 1
1281        uri = urllib.parse.urljoin(base, "digest/file.txt")
1282        self.http.add_credentials("joe", "password")
1283        (response, content) = self.http.request(
1284            uri, "GET", headers={"cache-control": "no-cache"}
1285        )
1286        info = httplib2._parse_www_authenticate(response, "authentication-info")
1287        self.assertEqual(response.status, 200)
1288        (response, content) = self.http.request(
1289            uri, "GET", headers={"cache-control": "no-cache"}
1290        )
1291        info2 = httplib2._parse_www_authenticate(response, "authentication-info")
1292        self.assertEqual(response.status, 200)
1293
1294        if "nextnonce" in info:
1295            self.assertEqual(info2["nc"], 1)
1296
1297    def testDigestAuthStale(self):
1298        # Test that we can handle a nonce becoming stale
1299        uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
1300        self.http.add_credentials("joe", "password")
1301        (response, content) = self.http.request(
1302            uri, "GET", headers={"cache-control": "no-cache"}
1303        )
1304        info = httplib2._parse_www_authenticate(response, "authentication-info")
1305        self.assertEqual(response.status, 200)
1306
1307        time.sleep(3)
1308        # Sleep long enough that the nonce becomes stale
1309
1310        (response, content) = self.http.request(
1311            uri, "GET", headers={"cache-control": "no-cache"}
1312        )
1313        self.assertFalse(response.fromcache)
1314        self.assertTrue(response._stale_digest)
1315        info3 = httplib2._parse_www_authenticate(response, "authentication-info")
1316        self.assertEqual(response.status, 200)
1317
1318    def reflector(self, content):
1319        return dict(
1320            [
1321                tuple(x.split("=", 1))
1322                for x in content.decode("utf-8").strip().split("\n")
1323            ]
1324        )
1325
1326    def testReflector(self):
1327        uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
1328        (response, content) = self.http.request(uri, "GET")
1329        d = self.reflector(content)
1330        self.assertTrue("HTTP_USER_AGENT" in d)
1331
1332    def testConnectionClose(self):
1333        uri = "http://www.google.com/"
1334        (response, content) = self.http.request(uri, "GET")
1335        for c in self.http.connections.values():
1336            self.assertNotEqual(None, c.sock)
1337        (response, content) = self.http.request(
1338            uri, "GET", headers={"connection": "close"}
1339        )
1340        for c in self.http.connections.values():
1341            self.assertEqual(None, c.sock)
1342
1343    def testPickleHttp(self):
1344        pickled_http = pickle.dumps(self.http)
1345        new_http = pickle.loads(pickled_http)
1346
1347        self.assertEqual(
1348            sorted(new_http.__dict__.keys()), sorted(self.http.__dict__.keys())
1349        )
1350        for key in new_http.__dict__:
1351            if key in ("certificates", "credentials"):
1352                self.assertEqual(
1353                    new_http.__dict__[key].credentials,
1354                    self.http.__dict__[key].credentials,
1355                )
1356            elif key == "cache":
1357                self.assertEqual(
1358                    new_http.__dict__[key].cache, self.http.__dict__[key].cache
1359                )
1360            else:
1361                self.assertEqual(new_http.__dict__[key], self.http.__dict__[key])
1362
1363    def testPickleHttpWithConnection(self):
1364        self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
1365        pickled_http = pickle.dumps(self.http)
1366        new_http = pickle.loads(pickled_http)
1367
1368        self.assertEqual(list(self.http.connections.keys()), ["http:bitworking.org"])
1369        self.assertEqual(new_http.connections, {})
1370
1371    def testPickleCustomRequestHttp(self):
1372        def dummy_request(*args, **kwargs):
1373            return new_request(*args, **kwargs)
1374
1375        dummy_request.dummy_attr = "dummy_value"
1376
1377        self.http.request = dummy_request
1378        pickled_http = pickle.dumps(self.http)
1379        self.assertFalse(b"S'request'" in pickled_http)
1380
1381
try:
    import memcache
except ImportError:
    # The memcache client library is optional. Without it the
    # memcached-backed variant of the HTTP tests is simply not defined.
    # Only a failed import is tolerated; any other error propagates.
    pass
else:

    class HttpTestMemCached(HttpTest):
        # Runs the inherited HttpTest cases against an Http object whose
        # cache is a memcached client talking to 127.0.0.1:11211.

        def setUp(self):
            self.cache = memcache.Client(["127.0.0.1:11211"], debug=0)
            self.http = httplib2.Http(self.cache)
            self.cache.flush_all()
            # Not exactly sure why the sleep is needed here, but
            # if not present then some unit tests that rely on caching
            # fail. Memcached seems to lose some sets immediately
            # after a flush_all if the set is to a value that
            # was previously cached. (Maybe the flush is handled async?)
            time.sleep(1)
            self.http.clear_credentials()
1402
1403# ------------------------------------------------------------------------
1404
1405
1406class HttpPrivateTest(unittest.TestCase):
1407    def testParseCacheControl(self):
1408        # Test that we can parse the Cache-Control header
1409        self.assertEqual({}, httplib2._parse_cache_control({}))
1410        self.assertEqual(
1411            {"no-cache": 1},
1412            httplib2._parse_cache_control({"cache-control": " no-cache"}),
1413        )
1414        cc = httplib2._parse_cache_control(
1415            {"cache-control": " no-cache, max-age = 7200"}
1416        )
1417        self.assertEqual(cc["no-cache"], 1)
1418        self.assertEqual(cc["max-age"], "7200")
1419        cc = httplib2._parse_cache_control({"cache-control": " , "})
1420        self.assertEqual(cc[""], 1)
1421
1422        try:
1423            cc = httplib2._parse_cache_control(
1424                {"cache-control": "Max-age=3600;post-check=1800,pre-check=3600"}
1425            )
1426            self.assertTrue("max-age" in cc)
1427        except:
1428            self.fail("Should not throw exception")
1429
1430    def testNormalizeHeaders(self):
1431        # Test that we normalize headers to lowercase
1432        h = httplib2._normalize_headers({"Cache-Control": "no-cache", "Other": "Stuff"})
1433        self.assertTrue("cache-control" in h)
1434        self.assertTrue("other" in h)
1435        self.assertEqual("Stuff", h["other"])
1436
1437    def testConvertByteStr(self):
1438        with self.assertRaises(TypeError):
1439            httplib2._convert_byte_str(4)
1440        self.assertEqual("Hello World", httplib2._convert_byte_str(b"Hello World"))
1441        self.assertEqual("Bye World", httplib2._convert_byte_str("Bye World"))
1442
1443    def testExpirationModelTransparent(self):
1444        # Test that no-cache makes our request TRANSPARENT
1445        response_headers = {"cache-control": "max-age=7200"}
1446        request_headers = {"cache-control": "no-cache"}
1447        self.assertEqual(
1448            "TRANSPARENT",
1449            httplib2._entry_disposition(response_headers, request_headers),
1450        )
1451
1452    def testMaxAgeNonNumeric(self):
1453        # Test that no-cache makes our request TRANSPARENT
1454        response_headers = {"cache-control": "max-age=fred, min-fresh=barney"}
1455        request_headers = {}
1456        self.assertEqual(
1457            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1458        )
1459
1460    def testExpirationModelNoCacheResponse(self):
1461        # The date and expires point to an entry that should be
1462        # FRESH, but the no-cache over-rides that.
1463        now = time.time()
1464        response_headers = {
1465            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1466            "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)),
1467            "cache-control": "no-cache",
1468        }
1469        request_headers = {}
1470        self.assertEqual(
1471            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1472        )
1473
1474    def testExpirationModelStaleRequestMustReval(self):
1475        # must-revalidate forces STALE
1476        self.assertEqual(
1477            "STALE",
1478            httplib2._entry_disposition({}, {"cache-control": "must-revalidate"}),
1479        )
1480
1481    def testExpirationModelStaleResponseMustReval(self):
1482        # must-revalidate forces STALE
1483        self.assertEqual(
1484            "STALE",
1485            httplib2._entry_disposition({"cache-control": "must-revalidate"}, {}),
1486        )
1487
1488    def testExpirationModelFresh(self):
1489        response_headers = {
1490            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
1491            "cache-control": "max-age=2",
1492        }
1493        request_headers = {}
1494        self.assertEqual(
1495            "FRESH", httplib2._entry_disposition(response_headers, request_headers)
1496        )
1497        time.sleep(3)
1498        self.assertEqual(
1499            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1500        )
1501
1502    def testExpirationMaxAge0(self):
1503        response_headers = {
1504            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
1505            "cache-control": "max-age=0",
1506        }
1507        request_headers = {}
1508        self.assertEqual(
1509            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1510        )
1511
1512    def testExpirationModelDateAndExpires(self):
1513        now = time.time()
1514        response_headers = {
1515            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1516            "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)),
1517        }
1518        request_headers = {}
1519        self.assertEqual(
1520            "FRESH", httplib2._entry_disposition(response_headers, request_headers)
1521        )
1522        time.sleep(3)
1523        self.assertEqual(
1524            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1525        )
1526
1527    def testExpiresZero(self):
1528        now = time.time()
1529        response_headers = {
1530            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1531            "expires": "0",
1532        }
1533        request_headers = {}
1534        self.assertEqual(
1535            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1536        )
1537
1538    def testExpirationModelDateOnly(self):
1539        now = time.time()
1540        response_headers = {
1541            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 3))
1542        }
1543        request_headers = {}
1544        self.assertEqual(
1545            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1546        )
1547
1548    def testExpirationModelOnlyIfCached(self):
1549        response_headers = {}
1550        request_headers = {"cache-control": "only-if-cached"}
1551        self.assertEqual(
1552            "FRESH", httplib2._entry_disposition(response_headers, request_headers)
1553        )
1554
1555    def testExpirationModelMaxAgeBoth(self):
1556        now = time.time()
1557        response_headers = {
1558            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1559            "cache-control": "max-age=2",
1560        }
1561        request_headers = {"cache-control": "max-age=0"}
1562        self.assertEqual(
1563            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1564        )
1565
1566    def testExpirationModelDateAndExpiresMinFresh1(self):
1567        now = time.time()
1568        response_headers = {
1569            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1570            "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)),
1571        }
1572        request_headers = {"cache-control": "min-fresh=2"}
1573        self.assertEqual(
1574            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1575        )
1576
1577    def testExpirationModelDateAndExpiresMinFresh2(self):
1578        now = time.time()
1579        response_headers = {
1580            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1581            "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)),
1582        }
1583        request_headers = {"cache-control": "min-fresh=2"}
1584        self.assertEqual(
1585            "FRESH", httplib2._entry_disposition(response_headers, request_headers)
1586        )
1587
1588    def testParseWWWAuthenticateEmpty(self):
1589        res = httplib2._parse_www_authenticate({})
1590        self.assertEqual(len(list(res.keys())), 0)
1591
1592    def testParseWWWAuthenticate(self):
1593        # different uses of spaces around commas
1594        res = httplib2._parse_www_authenticate(
1595            {
1596                "www-authenticate": 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'
1597            }
1598        )
1599        self.assertEqual(len(list(res.keys())), 1)
1600        self.assertEqual(len(list(res["test"].keys())), 5)
1601
1602        # tokens with non-alphanum
1603        res = httplib2._parse_www_authenticate(
1604            {"www-authenticate": 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'}
1605        )
1606        self.assertEqual(len(list(res.keys())), 1)
1607        self.assertEqual(len(list(res["t*!%#st"].keys())), 2)
1608
1609        # quoted string with quoted pairs
1610        res = httplib2._parse_www_authenticate(
1611            {"www-authenticate": 'Test realm="a \\"test\\" realm"'}
1612        )
1613        self.assertEqual(len(list(res.keys())), 1)
1614        self.assertEqual(res["test"]["realm"], 'a "test" realm')
1615
1616    def testParseWWWAuthenticateStrict(self):
1617        httplib2.USE_WWW_AUTH_STRICT_PARSING = 1
1618        self.testParseWWWAuthenticate()
1619        httplib2.USE_WWW_AUTH_STRICT_PARSING = 0
1620
1621    def testParseWWWAuthenticateBasic(self):
1622        res = httplib2._parse_www_authenticate({"www-authenticate": 'Basic realm="me"'})
1623        basic = res["basic"]
1624        self.assertEqual("me", basic["realm"])
1625
1626        res = httplib2._parse_www_authenticate(
1627            {"www-authenticate": 'Basic realm="me", algorithm="MD5"'}
1628        )
1629        basic = res["basic"]
1630        self.assertEqual("me", basic["realm"])
1631        self.assertEqual("MD5", basic["algorithm"])
1632
1633        res = httplib2._parse_www_authenticate(
1634            {"www-authenticate": 'Basic realm="me", algorithm=MD5'}
1635        )
1636        basic = res["basic"]
1637        self.assertEqual("me", basic["realm"])
1638        self.assertEqual("MD5", basic["algorithm"])
1639
1640    def testParseWWWAuthenticateBasic2(self):
1641        res = httplib2._parse_www_authenticate(
1642            {"www-authenticate": 'Basic realm="me",other="fred" '}
1643        )
1644        basic = res["basic"]
1645        self.assertEqual("me", basic["realm"])
1646        self.assertEqual("fred", basic["other"])
1647
1648    def testParseWWWAuthenticateBasic3(self):
1649        res = httplib2._parse_www_authenticate(
1650            {"www-authenticate": 'Basic REAlm="me" '}
1651        )
1652        basic = res["basic"]
1653        self.assertEqual("me", basic["realm"])
1654
1655    def testParseWWWAuthenticateDigest(self):
1656        res = httplib2._parse_www_authenticate(
1657            {
1658                "www-authenticate": 'Digest realm="[email protected]", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'
1659            }
1660        )
1661        digest = res["digest"]
1662        self.assertEqual("[email protected]", digest["realm"])
1663        self.assertEqual("auth,auth-int", digest["qop"])
1664
1665    def testParseWWWAuthenticateMultiple(self):
1666        res = httplib2._parse_www_authenticate(
1667            {
1668                "www-authenticate": 'Digest realm="[email protected]", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '
1669            }
1670        )
1671        digest = res["digest"]
1672        self.assertEqual("[email protected]", digest["realm"])
1673        self.assertEqual("auth,auth-int", digest["qop"])
1674        self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
1675        self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
1676        basic = res["basic"]
1677        self.assertEqual("me", basic["realm"])
1678
1679    def testParseWWWAuthenticateMultiple2(self):
1680        # Handle an added comma between challenges, which might get thrown in if the challenges were
1681        # originally sent in separate www-authenticate headers.
1682        res = httplib2._parse_www_authenticate(
1683            {
1684                "www-authenticate": 'Digest realm="[email protected]", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '
1685            }
1686        )
1687        digest = res["digest"]
1688        self.assertEqual("[email protected]", digest["realm"])
1689        self.assertEqual("auth,auth-int", digest["qop"])
1690        self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
1691        self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
1692        basic = res["basic"]
1693        self.assertEqual("me", basic["realm"])
1694
1695    def testParseWWWAuthenticateMultiple3(self):
1696        # Handle an added comma between challenges, which might get thrown in if the challenges were
1697        # originally sent in separate www-authenticate headers.
1698        res = httplib2._parse_www_authenticate(
1699            {
1700                "www-authenticate": 'Digest realm="[email protected]", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
1701            }
1702        )
1703        digest = res["digest"]
1704        self.assertEqual("[email protected]", digest["realm"])
1705        self.assertEqual("auth,auth-int", digest["qop"])
1706        self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
1707        self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
1708        basic = res["basic"]
1709        self.assertEqual("me", basic["realm"])
1710        wsse = res["wsse"]
1711        self.assertEqual("foo", wsse["realm"])
1712        self.assertEqual("UsernameToken", wsse["profile"])
1713
1714    def testParseWWWAuthenticateMultiple4(self):
1715        res = httplib2._parse_www_authenticate(
1716            {
1717                "www-authenticate": 'Digest realm="[email protected]", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
1718            }
1719        )
1720        digest = res["digest"]
1721        self.assertEqual("[email protected]", digest["realm"])
1722        self.assertEqual("\tauth,auth-int", digest["qop"])
1723        self.assertEqual("(*)&^&$%#", digest["nonce"])
1724
1725    def testParseWWWAuthenticateMoreQuoteCombos(self):
1726        res = httplib2._parse_www_authenticate(
1727            {
1728                "www-authenticate": 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
1729            }
1730        )
1731        digest = res["digest"]
1732        self.assertEqual("myrealm", digest["realm"])
1733
1734    def testParseWWWAuthenticateMalformed(self):
1735        try:
1736            res = httplib2._parse_www_authenticate(
1737                {
1738                    "www-authenticate": 'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'
1739                }
1740            )
1741            self.fail("should raise an exception")
1742        except httplib2.MalformedHeader:
1743            pass
1744
1745    def testDigestObject(self):
1746        credentials = ("joe", "password")
1747        host = None
1748        request_uri = "/projects/httplib2/test/digest/"
1749        headers = {}
1750        response = {
1751            "www-authenticate": 'Digest realm="myrealm", '
1752            'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", '
1753            'algorithm=MD5, qop="auth"'
1754        }
1755        content = b""
1756
1757        d = httplib2.DigestAuthentication(
1758            credentials, host, request_uri, headers, response, content, None
1759        )
1760        d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
1761        our_request = "authorization: %s" % headers["authorization"]
1762        working_request = (
1763            'authorization: Digest username="joe", realm="myrealm", '
1764            'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
1765            ' uri="/projects/httplib2/test/digest/", algorithm=MD5, '
1766            'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, '
1767            'nc=00000001, cnonce="33033375ec278a46"'
1768        )
1769        self.assertEqual(our_request, working_request)
1770
1771    def testDigestObjectWithOpaque(self):
1772        credentials = ("joe", "password")
1773        host = None
1774        request_uri = "/projects/httplib2/test/digest/"
1775        headers = {}
1776        response = {
1777            "www-authenticate": 'Digest realm="myrealm", '
1778            'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", '
1779            'algorithm=MD5, qop="auth", opaque="atestopaque"'
1780        }
1781        content = ""
1782
1783        d = httplib2.DigestAuthentication(
1784            credentials, host, request_uri, headers, response, content, None
1785        )
1786        d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
1787        our_request = "authorization: %s" % headers["authorization"]
1788        working_request = (
1789            'authorization: Digest username="joe", realm="myrealm", '
1790            'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
1791            ' uri="/projects/httplib2/test/digest/", algorithm=MD5, '
1792            'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, '
1793            'nc=00000001, cnonce="33033375ec278a46", '
1794            'opaque="atestopaque"'
1795        )
1796        self.assertEqual(our_request, working_request)
1797
1798    def testDigestObjectStale(self):
1799        credentials = ("joe", "password")
1800        host = None
1801        request_uri = "/projects/httplib2/test/digest/"
1802        headers = {}
1803        response = httplib2.Response({})
1804        response["www-authenticate"] = (
1805            'Digest realm="myrealm", '
1806            'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
1807            ' algorithm=MD5, qop="auth", stale=true'
1808        )
1809        response.status = 401
1810        content = b""
1811        d = httplib2.DigestAuthentication(
1812            credentials, host, request_uri, headers, response, content, None
1813        )
1814        # Returns true to force a retry
1815        self.assertTrue(d.response(response, content))
1816
1817    def testDigestObjectAuthInfo(self):
1818        credentials = ("joe", "password")
1819        host = None
1820        request_uri = "/projects/httplib2/test/digest/"
1821        headers = {}
1822        response = httplib2.Response({})
1823        response["www-authenticate"] = (
1824            'Digest realm="myrealm", '
1825            'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
1826            ' algorithm=MD5, qop="auth", stale=true'
1827        )
1828        response["authentication-info"] = 'nextnonce="fred"'
1829        content = b""
1830        d = httplib2.DigestAuthentication(
1831            credentials, host, request_uri, headers, response, content, None
1832        )
1833        # Returns true to force a retry
1834        self.assertFalse(d.response(response, content))
1835        self.assertEqual("fred", d.challenge["nonce"])
1836        self.assertEqual(1, d.challenge["nc"])
1837
1838    def testWsseAlgorithm(self):
1839        digest = httplib2._wsse_username_token(
1840            "d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm"
1841        )
1842        expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
1843        self.assertEqual(expected, digest)
1844
1845    def testEnd2End(self):
1846        # one end to end header
1847        response = {"content-type": "application/atom+xml", "te": "deflate"}
1848        end2end = httplib2._get_end2end_headers(response)
1849        self.assertTrue("content-type" in end2end)
1850        self.assertTrue("te" not in end2end)
1851        self.assertTrue("connection" not in end2end)
1852
1853        # one end to end header that gets eliminated
1854        response = {
1855            "connection": "content-type",
1856            "content-type": "application/atom+xml",
1857            "te": "deflate",
1858        }
1859        end2end = httplib2._get_end2end_headers(response)
1860        self.assertTrue("content-type" not in end2end)
1861        self.assertTrue("te" not in end2end)
1862        self.assertTrue("connection" not in end2end)
1863
1864        # Degenerate case of no headers
1865        response = {}
1866        end2end = httplib2._get_end2end_headers(response)
1867        self.assertEqual(0, len(end2end))
1868
1869        # Degenerate case of connection referrring to a header not passed in
1870        response = {"connection": "content-type"}
1871        end2end = httplib2._get_end2end_headers(response)
1872        self.assertEqual(0, len(end2end))
1873
1874
class TestProxyInfo(unittest.TestCase):
    """Tests for building httplib2 proxy settings from URLs and the environment.

    Several tests modify os.environ; setUp takes a snapshot of it and
    tearDown restores that snapshot, so the tests do not affect each other.
    """

    def setUp(self):
        # Copy the environment so tearDown can put it back exactly.
        self.orig_env = dict(os.environ)

    def tearDown(self):
        # Drop whatever the test set, then restore the snapshot from setUp.
        os.environ.clear()
        os.environ.update(self.orig_env)

    def test_from_url(self):
        # A proxy URL without port or credentials: port 80 and no user.
        pi = httplib2.proxy_info_from_url("http://myproxy.example.com")
        self.assertEqual(pi.proxy_host, "myproxy.example.com")
        self.assertEqual(pi.proxy_port, 80)
        self.assertIsNone(pi.proxy_user)

    def test_from_url_ident(self):
        # user:password@host:port must be split into its four parts.
        pi = httplib2.proxy_info_from_url("http://zoidberg:fish@someproxy:99")
        self.assertEqual(pi.proxy_host, "someproxy")
        self.assertEqual(pi.proxy_port, 99)
        self.assertEqual(pi.proxy_user, "zoidberg")
        self.assertEqual(pi.proxy_pass, "fish")

    def test_from_env(self):
        # With no argument, the proxy is taken from http_proxy.
        os.environ["http_proxy"] = "http://myproxy.example.com:8080"
        pi = httplib2.proxy_info_from_environment()
        self.assertEqual(pi.proxy_host, "myproxy.example.com")
        self.assertEqual(pi.proxy_port, 8080)

    def test_from_env_no_proxy(self):
        # Despite the name, no no_proxy variable is set here. This checks
        # that asking for "https" picks https_proxy (port 81) rather than
        # http_proxy (port 80).
        os.environ["http_proxy"] = "http://myproxy.example.com:80"
        os.environ["https_proxy"] = "http://myproxy.example.com:81"
        pi = httplib2.proxy_info_from_environment("https")
        self.assertEqual(pi.proxy_host, "myproxy.example.com")
        self.assertEqual(pi.proxy_port, 81)

    def test_from_env_none(self):
        # An empty environment must give no proxy at all.
        os.environ.clear()
        pi = httplib2.proxy_info_from_environment()
        self.assertIsNone(pi)

    def test_proxy_headers(self):
        # proxy_headers passed to the constructor are exposed unchanged.
        headers = {"key0": "val0", "key1": "val1"}
        pi = httplib2.ProxyInfo(
            httplib2.socks.PROXY_TYPE_HTTP, "localhost", 1234, proxy_headers=headers
        )
        self.assertEqual(pi.proxy_headers, headers)

    # regression: ensure that httplib2.HTTPConnectionWithTimeout initializes when proxy_info is not supplied
    def test_proxy_init(self):
        # NOTE: this issues a real request to www.google.com:80, so it needs
        # network access.
        connection = httplib2.HTTPConnectionWithTimeout("www.google.com", 80)
        try:
            connection.request("GET", "/")
        finally:
            # Close the connection even when the request itself fails.
            connection.close()
1926
1927
# Run every test case defined in this module when the file is executed
# directly as a script.
if __name__ == "__main__":
    unittest.main()
1930