import errno
import unittest
from test import support
from test.support import os_helper
from test.support import socket_helper
from test.support import ResourceDenied
from test.test_urllib2 import sanepathname2url

import os
import socket
import urllib.error
import urllib.request
import sys

support.requires("network")


def _retry_thrice(func, exc, *args, **kwargs):
    """Call func(*args, **kwargs), retrying up to three times on exc.

    Returns the result of the first successful call; if every attempt
    raises, the last exception caught is re-raised.
    """
    last_exc = None
    for _attempt in range(3):
        try:
            return func(*args, **kwargs)
        except exc as err:
            last_exc = err
    raise last_exc


def _wrap_with_retry_thrice(func, exc):
    """Return a wrapper that calls func through _retry_thrice(exc)."""
    def retrying(*args, **kwargs):
        return _retry_thrice(func, exc, *args, **kwargs)
    return retrying


# Connecting to remote hosts is flaky.  Make it more robust by retrying
# the connection several times.
_urlopen_with_retry = _wrap_with_retry_thrice(urllib.request.urlopen,
                                              urllib.error.URLError)


class TransientResource(object):
    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes."""

    def __init__(self, exc, **kwargs):
        # exc: exception class to match; kwargs: attribute values the
        # raised exception instance must carry (e.g. errno=ETIMEDOUT).
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        if type_ is None:
            # No exception in flight -- nothing to translate.
            return
        if not issubclass(self.exc, type_):
            # Unrelated exception type: let it propagate unchanged.
            return
        every_attr_matches = all(
            hasattr(value, name) and getattr(value, name) == wanted
            for name, wanted in self.attrs.items()
        )
        if every_attr_matches:
            raise ResourceDenied("an optional resource is not available")


# Context managers that raise ResourceDenied when various issues
# with the internet connection manifest themselves as exceptions.
# XXX deprecate these and use transient_internet() instead
# Pre-built guards: each turns a matching OSError raised inside its
# ``with`` block into ResourceDenied (see TransientResource above).
time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
# NOTE(review): identical to socket_peer_reset now that IOError is an
# alias of OSError; presumably kept for historical callers -- confirm.
ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)


class AuthTests(unittest.TestCase):
    """Tests urllib2 authentication features."""

## Disabled at the moment since there is no page under python.org which
## could be used to HTTP authentication.
#
#    def test_basic_auth(self):
#        import http.client
#
#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
#        test_hostport = "www.python.org"
#        test_realm = 'Test Realm'
#        test_user = 'test.test_urllib2net'
#        test_password = 'blah'
#
#        # failure
#        try:
#            _urlopen_with_retry(test_url)
#        except urllib2.HTTPError, exc:
#            self.assertEqual(exc.code, 401)
#        else:
#            self.fail("urlopen() should have failed with 401")
#
#        # success
#        auth_handler = urllib2.HTTPBasicAuthHandler()
#        auth_handler.add_password(test_realm, test_hostport,
#                                  test_user, test_password)
#        opener = urllib2.build_opener(auth_handler)
#        f = opener.open('http://localhost/')
#        response = _urlopen_with_retry("http://www.python.org/")
#
#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
#        # reasons, let's not implement it!  (it's already implemented for proxy
#        # specification strings (that is, URLs or authorities specifying a
#        # proxy), so we must keep that)
#        self.assertRaises(http.client.InvalidURL,
#                          urllib2.urlopen, "http://evil:[email protected]")


class CloseSocketTest(unittest.TestCase):
    """Check that closing an HTTP response closes the underlying socket."""

    def test_close(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

        # calling .close() on urllib2's response objects should close the
        # underlying socket
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            response = _urlopen_with_retry(url)
            sock = response.fp
            self.assertFalse(sock.closed)
            response.close()
            self.assertTrue(sock.closed)


class OtherNetworkTests(unittest.TestCase):
    """Smoke-test urlopen() against assorted real ftp:// and file:// URLs."""

    def setUp(self):
        if 0:  # for debugging
            import logging
            logger = logging.getLogger("test_urllib2net")
            logger.addHandler(logging.StreamHandler())

    # XXX The rest of these tests aren't very good -- they don't check much.
    # They do sometimes catch some major disasters, though.

    def test_ftp(self):
        # Testing the same URL twice exercises the caching in CacheFTPHandler
        urls = [
            'ftp://www.pythontest.net/README',
            'ftp://www.pythontest.net/README',
            ('ftp://www.pythontest.net/non-existent-file',
             None, urllib.error.URLError),
            ]
        self._test_urls(urls, self._extra_handlers())

    def test_file(self):
        # Write a temporary file and fetch it back through a file: URL;
        # also verify that a nonexistent path yields URLError.
        TESTFN = os_helper.TESTFN
        f = open(TESTFN, 'w')
        try:
            f.write('hi there\n')
            f.close()
            urls = [
                'file:' + sanepathname2url(os.path.abspath(TESTFN)),
                ('file:///nonsensename/etc/passwd', None,
                 urllib.error.URLError),
                ]
            self._test_urls(urls, self._extra_handlers(), retry=True)
        finally:
            os.remove(TESTFN)

        # A bare relative path is not a valid URL.
        self.assertRaises(ValueError, urllib.request.urlopen,'./relative_path/to/file')

    # XXX Following test depends on machine configurations that are internal
    # to CNRI.  Need to set up a public server with the right authentication
    # configuration for test purposes.

##     def test_cnri(self):
##         if socket.gethostname() == 'bitdiddle':
##             localhost = 'bitdiddle.cnri.reston.va.us'
##         elif socket.gethostname() == 'bitdiddle.concentric.net':
##             localhost = 'localhost'
##         else:
##             localhost = None
##         if localhost is not None:
##             urls = [
##                 'file://%s/etc/passwd' % localhost,
##                 'http://%s/simple/' % localhost,
##                 'http://%s/digest/' % localhost,
##                 'http://%s/not/found.h' % localhost,
##                 ]

##             bauth = HTTPBasicAuthHandler()
##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
##                                'password')
##             dauth = HTTPDigestAuthHandler()
##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
##                                'password')

##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])

    def test_urlwithfrag(self):
        # geturl() keeps the fragment of the requested URL.
        urlwith_frag = "http://www.pythontest.net/index.html#frag"
        with socket_helper.transient_internet(urlwith_frag):
            req = urllib.request.Request(urlwith_frag)
            res = urllib.request.urlopen(req)
            self.assertEqual(res.geturl(),
                             "http://www.pythontest.net/index.html#frag")

    def test_redirect_url_withfrag(self):
        # After a redirect, geturl() reports the final (redirected-to) URL.
        redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/"
        with socket_helper.transient_internet(redirect_url_with_frag):
            req = urllib.request.Request(redirect_url_with_frag)
            res = urllib.request.urlopen(req)
            self.assertEqual(res.geturl(),
                             "http://www.pythontest.net/elsewhere/#frag")

    def test_custom_headers(self):
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            opener = urllib.request.build_opener()
            request = urllib.request.Request(url)
            # Opening the request populates default headers (User-agent).
            self.assertFalse(request.header_items())
            opener.open(request)
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            # A caller-supplied User-Agent overrides the default one.
            request.add_header('User-Agent', 'Test-Agent')
            opener.open(request)
            self.assertEqual(request.get_header('User-agent'), 'Test-Agent')

    @unittest.skip('XXX: http://www.imdb.com is gone')
    def test_sites_no_connection_close(self):
        # Some sites do not send Connection: close header.
        # Verify that those work properly. (#issue12576)

        URL = 'http://www.imdb.com' # mangles Connection:close

        with socket_helper.transient_internet(URL):
            try:
                with urllib.request.urlopen(URL) as res:
                    pass
            except ValueError:
                self.fail("urlopen failed for site not sending \
Connection:close")
            else:
                self.assertTrue(res)

            req = urllib.request.urlopen(URL)
            res = req.read()
            self.assertTrue(res)

    def _test_urls(self, urls, handlers, retry=True):
        """Open every entry of *urls* through an opener built from *handlers*.

        Each entry is either a URL string, or a (url, req, expected_err)
        tuple when a specific error class is expected.  With retry=True
        each open is retried up to three times on URLError.
        """
        import time
        import logging
        debug = logging.getLogger("test_urllib2").debug

        urlopen = urllib.request.build_opener(*handlers).open
        if retry:
            urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError)

        for url in urls:
            with self.subTest(url=url):
                if isinstance(url, tuple):
                    url, req, expected_err = url
                else:
                    req = expected_err = None

                with socket_helper.transient_internet(url):
                    try:
                        f = urlopen(url, req, support.INTERNET_TIMEOUT)
                        # urllib.error.URLError is a subclass of OSError
                    except OSError as err:
                        if expected_err:
                            msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
                                   (expected_err, url, req, type(err), err))
                            self.assertIsInstance(err, expected_err, msg)
                        else:
                            raise
                    else:
                        try:
                            # Transient network hiccups while reading the
                            # body become ResourceDenied, not failures.
                            with time_out, \
                                 socket_peer_reset, \
                                 ioerror_peer_reset:
                                buf = f.read()
                                debug("read %d bytes" % len(buf))
                        except TimeoutError:
                            print("<timeout: %s>" % url, file=sys.stderr)
                        f.close()
            # Brief pause between URLs, presumably to avoid hammering the
            # remote host -- TODO confirm.
            time.sleep(0.1)

    def _extra_handlers(self):
        """Return extra opener handlers: a CacheFTPHandler with 1s timeout."""
        handlers = []

        cfh = urllib.request.CacheFTPHandler()
        self.addCleanup(cfh.clear_cache)
        cfh.setTimeout(1)
        handlers.append(cfh)

        return handlers


class TimeoutTest(unittest.TestCase):
    """Verify how urlopen()'s timeout argument reaches the real socket."""

    def setUp(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

    def test_http_basic(self):
        # No default timeout and none passed: socket has no timeout.
        self.assertIsNone(socket.getdefaulttimeout())
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url, timeout=None):
            u = _urlopen_with_retry(url)
            self.addCleanup(u.close)
            # Reach the raw socket through the buffered response wrappers.
            self.assertIsNone(u.fp.raw._sock.gettimeout())

    def test_http_default_timeout(self):
        # socket.setdefaulttimeout() is picked up when no timeout is passed.
        self.assertIsNone(socket.getdefaulttimeout())
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(url)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp.raw._sock.gettimeout(), 60)

    def test_http_no_timeout(self):
        # An explicit timeout=None overrides the global default timeout.
        self.assertIsNone(socket.getdefaulttimeout())
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(url, timeout=None)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertIsNone(u.fp.raw._sock.gettimeout())

    def test_http_timeout(self):
        # An explicit timeout value is applied to the socket.
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            u = _urlopen_with_retry(url, timeout=120)
            self.addCleanup(u.close)
            self.assertEqual(u.fp.raw._sock.gettimeout(), 120)

    # FTP host used by the ftp timeout tests below.
    FTP_HOST = 'ftp://www.pythontest.net/'

    def test_ftp_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with socket_helper.transient_internet(self.FTP_HOST, timeout=None):
            u = _urlopen_with_retry(self.FTP_HOST)
            self.addCleanup(u.close)
            # FTP responses add one more wrapper layer than HTTP.
            self.assertIsNone(u.fp.fp.raw._sock.gettimeout())

    def test_ftp_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with socket_helper.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(self.FTP_HOST)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)

    def test_ftp_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with socket_helper.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertIsNone(u.fp.fp.raw._sock.gettimeout())

    def test_ftp_timeout(self):
        with socket_helper.transient_internet(self.FTP_HOST):
            u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
            self.addCleanup(u.close)
            self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)


if __name__ == "__main__":
    unittest.main()