1import base64 2import datetime 3import decimal 4import sys 5import time 6import unittest 7from unittest import mock 8import xmlrpc.client as xmlrpclib 9import xmlrpc.server 10import http.client 11import http, http.server 12import socket 13import threading 14import re 15import io 16import contextlib 17from test import support 18from test.support import os_helper 19from test.support import socket_helper 20from test.support import threading_helper 21from test.support import ALWAYS_EQ, LARGEST, SMALLEST 22 23try: 24 import gzip 25except ImportError: 26 gzip = None 27 28support.requires_working_socket(module=True) 29 30alist = [{'astring': '[email protected]', 31 'afloat': 7283.43, 32 'anint': 2**20, 33 'ashortlong': 2, 34 'anotherlist': ['.zyx.41'], 35 'abase64': xmlrpclib.Binary(b"my dog has fleas"), 36 'b64bytes': b"my dog has fleas", 37 'b64bytearray': bytearray(b"my dog has fleas"), 38 'boolean': False, 39 'unicode': '\u4000\u6000\u8000', 40 'ukey\u4000': 'regular value', 41 'datetime1': xmlrpclib.DateTime('20050210T11:41:23'), 42 'datetime2': xmlrpclib.DateTime( 43 (2005, 2, 10, 11, 41, 23, 0, 1, -1)), 44 'datetime3': xmlrpclib.DateTime( 45 datetime.datetime(2005, 2, 10, 11, 41, 23)), 46 }] 47 48class XMLRPCTestCase(unittest.TestCase): 49 50 def test_dump_load(self): 51 dump = xmlrpclib.dumps((alist,)) 52 load = xmlrpclib.loads(dump) 53 self.assertEqual(alist, load[0][0]) 54 55 def test_dump_bare_datetime(self): 56 # This checks that an unwrapped datetime.date object can be handled 57 # by the marshalling code. 
This can't be done via test_dump_load() 58 # since with use_builtin_types set to 1 the unmarshaller would create 59 # datetime objects for the 'datetime[123]' keys as well 60 dt = datetime.datetime(2005, 2, 10, 11, 41, 23) 61 self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23')) 62 s = xmlrpclib.dumps((dt,)) 63 64 result, m = xmlrpclib.loads(s, use_builtin_types=True) 65 (newdt,) = result 66 self.assertEqual(newdt, dt) 67 self.assertIs(type(newdt), datetime.datetime) 68 self.assertIsNone(m) 69 70 result, m = xmlrpclib.loads(s, use_builtin_types=False) 71 (newdt,) = result 72 self.assertEqual(newdt, dt) 73 self.assertIs(type(newdt), xmlrpclib.DateTime) 74 self.assertIsNone(m) 75 76 result, m = xmlrpclib.loads(s, use_datetime=True) 77 (newdt,) = result 78 self.assertEqual(newdt, dt) 79 self.assertIs(type(newdt), datetime.datetime) 80 self.assertIsNone(m) 81 82 result, m = xmlrpclib.loads(s, use_datetime=False) 83 (newdt,) = result 84 self.assertEqual(newdt, dt) 85 self.assertIs(type(newdt), xmlrpclib.DateTime) 86 self.assertIsNone(m) 87 88 89 def test_datetime_before_1900(self): 90 # same as before but with a date before 1900 91 dt = datetime.datetime(1, 2, 10, 11, 41, 23) 92 self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23')) 93 s = xmlrpclib.dumps((dt,)) 94 95 result, m = xmlrpclib.loads(s, use_builtin_types=True) 96 (newdt,) = result 97 self.assertEqual(newdt, dt) 98 self.assertIs(type(newdt), datetime.datetime) 99 self.assertIsNone(m) 100 101 result, m = xmlrpclib.loads(s, use_builtin_types=False) 102 (newdt,) = result 103 self.assertEqual(newdt, dt) 104 self.assertIs(type(newdt), xmlrpclib.DateTime) 105 self.assertIsNone(m) 106 107 def test_bug_1164912 (self): 108 d = xmlrpclib.DateTime() 109 ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,), 110 methodresponse=True)) 111 self.assertIsInstance(new_d.value, str) 112 113 # Check that the output of dumps() is still an 8-bit string 114 s = xmlrpclib.dumps((new_d,), methodresponse=True) 115 
self.assertIsInstance(s, str) 116 117 def test_newstyle_class(self): 118 class T(object): 119 pass 120 t = T() 121 t.x = 100 122 t.y = "Hello" 123 ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,))) 124 self.assertEqual(t2, t.__dict__) 125 126 def test_dump_big_long(self): 127 self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,)) 128 129 def test_dump_bad_dict(self): 130 self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},)) 131 132 def test_dump_recursive_seq(self): 133 l = [1,2,3] 134 t = [3,4,5,l] 135 l.append(t) 136 self.assertRaises(TypeError, xmlrpclib.dumps, (l,)) 137 138 def test_dump_recursive_dict(self): 139 d = {'1':1, '2':1} 140 t = {'3':3, 'd':d} 141 d['t'] = t 142 self.assertRaises(TypeError, xmlrpclib.dumps, (d,)) 143 144 def test_dump_big_int(self): 145 if sys.maxsize > 2**31-1: 146 self.assertRaises(OverflowError, xmlrpclib.dumps, 147 (int(2**34),)) 148 149 xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT)) 150 self.assertRaises(OverflowError, xmlrpclib.dumps, 151 (xmlrpclib.MAXINT+1,)) 152 self.assertRaises(OverflowError, xmlrpclib.dumps, 153 (xmlrpclib.MININT-1,)) 154 155 def dummy_write(s): 156 pass 157 158 m = xmlrpclib.Marshaller() 159 m.dump_int(xmlrpclib.MAXINT, dummy_write) 160 m.dump_int(xmlrpclib.MININT, dummy_write) 161 self.assertRaises(OverflowError, m.dump_int, 162 xmlrpclib.MAXINT+1, dummy_write) 163 self.assertRaises(OverflowError, m.dump_int, 164 xmlrpclib.MININT-1, dummy_write) 165 166 def test_dump_double(self): 167 xmlrpclib.dumps((float(2 ** 34),)) 168 xmlrpclib.dumps((float(xmlrpclib.MAXINT), 169 float(xmlrpclib.MININT))) 170 xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42), 171 float(xmlrpclib.MININT - 42))) 172 173 def dummy_write(s): 174 pass 175 176 m = xmlrpclib.Marshaller() 177 m.dump_double(xmlrpclib.MAXINT, dummy_write) 178 m.dump_double(xmlrpclib.MININT, dummy_write) 179 m.dump_double(xmlrpclib.MAXINT + 42, dummy_write) 180 m.dump_double(xmlrpclib.MININT - 42, dummy_write) 181 182 def 
test_dump_none(self): 183 value = alist + [None] 184 arg1 = (alist + [None],) 185 strg = xmlrpclib.dumps(arg1, allow_none=True) 186 self.assertEqual(value, 187 xmlrpclib.loads(strg)[0][0]) 188 self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,)) 189 190 def test_dump_encoding(self): 191 value = {'key\u20ac\xa4': 192 'value\u20ac\xa4'} 193 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15') 194 strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg 195 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 196 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace') 197 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 198 199 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15', 200 methodresponse=True) 201 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 202 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace') 203 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 204 205 methodname = 'method\u20ac\xa4' 206 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15', 207 methodname=methodname) 208 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 209 self.assertEqual(xmlrpclib.loads(strg)[1], methodname) 210 211 def test_dump_bytes(self): 212 sample = b"my dog has fleas" 213 self.assertEqual(sample, xmlrpclib.Binary(sample)) 214 for type_ in bytes, bytearray, xmlrpclib.Binary: 215 value = type_(sample) 216 s = xmlrpclib.dumps((value,)) 217 218 result, m = xmlrpclib.loads(s, use_builtin_types=True) 219 (newvalue,) = result 220 self.assertEqual(newvalue, sample) 221 self.assertIs(type(newvalue), bytes) 222 self.assertIsNone(m) 223 224 result, m = xmlrpclib.loads(s, use_builtin_types=False) 225 (newvalue,) = result 226 self.assertEqual(newvalue, sample) 227 self.assertIs(type(newvalue), xmlrpclib.Binary) 228 self.assertIsNone(m) 229 230 def test_loads_unsupported(self): 231 ResponseError = xmlrpclib.ResponseError 232 data = '<params><param><value><spam/></value></param></params>' 233 self.assertRaises(ResponseError, xmlrpclib.loads, data) 234 
data = ('<params><param><value><array>' 235 '<value><spam/></value>' 236 '</array></value></param></params>') 237 self.assertRaises(ResponseError, xmlrpclib.loads, data) 238 data = ('<params><param><value><struct>' 239 '<member><name>a</name><value><spam/></value></member>' 240 '<member><name>b</name><value><spam/></value></member>' 241 '</struct></value></param></params>') 242 self.assertRaises(ResponseError, xmlrpclib.loads, data) 243 244 def check_loads(self, s, value, **kwargs): 245 dump = '<params><param><value>%s</value></param></params>' % s 246 result, m = xmlrpclib.loads(dump, **kwargs) 247 (newvalue,) = result 248 self.assertEqual(newvalue, value) 249 self.assertIs(type(newvalue), type(value)) 250 self.assertIsNone(m) 251 252 def test_load_standard_types(self): 253 check = self.check_loads 254 check('string', 'string') 255 check('<string>string</string>', 'string') 256 check('<string> string</string>', ' string') 257 check('<int>2056183947</int>', 2056183947) 258 check('<int>-2056183947</int>', -2056183947) 259 check('<i4>2056183947</i4>', 2056183947) 260 check('<double>46093.78125</double>', 46093.78125) 261 check('<boolean>0</boolean>', False) 262 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 263 xmlrpclib.Binary(b'\x00byte string\xff')) 264 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 265 b'\x00byte string\xff', use_builtin_types=True) 266 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 267 xmlrpclib.DateTime('20050210T11:41:23')) 268 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 269 datetime.datetime(2005, 2, 10, 11, 41, 23), 270 use_builtin_types=True) 271 check('<array><data>' 272 '<value><int>1</int></value><value><int>2</int></value>' 273 '</data></array>', [1, 2]) 274 check('<struct>' 275 '<member><name>b</name><value><int>2</int></value></member>' 276 '<member><name>a</name><value><int>1</int></value></member>' 277 '</struct>', {'a': 1, 'b': 2}) 278 279 def test_load_extension_types(self): 280 check = 
self.check_loads 281 check('<nil/>', None) 282 check('<ex:nil/>', None) 283 check('<i1>205</i1>', 205) 284 check('<i2>20561</i2>', 20561) 285 check('<i8>9876543210</i8>', 9876543210) 286 check('<biginteger>98765432100123456789</biginteger>', 287 98765432100123456789) 288 check('<float>93.78125</float>', 93.78125) 289 check('<bigdecimal>9876543210.0123456789</bigdecimal>', 290 decimal.Decimal('9876543210.0123456789')) 291 292 def test_limit_int(self): 293 check = self.check_loads 294 maxdigits = 5000 295 with support.adjust_int_max_str_digits(maxdigits): 296 s = '1' * (maxdigits + 1) 297 with self.assertRaises(ValueError): 298 check(f'<int>{s}</int>', None) 299 with self.assertRaises(ValueError): 300 check(f'<biginteger>{s}</biginteger>', None) 301 302 def test_get_host_info(self): 303 # see bug #3613, this raised a TypeError 304 transp = xmlrpc.client.Transport() 305 self.assertEqual(transp.get_host_info("[email protected]"), 306 ('host.tld', 307 [('Authorization', 'Basic dXNlcg==')], {})) 308 309 def test_ssl_presence(self): 310 try: 311 import ssl 312 except ImportError: 313 has_ssl = False 314 else: 315 has_ssl = True 316 try: 317 xmlrpc.client.ServerProxy('https://localhost:9999').bad_function() 318 except NotImplementedError: 319 self.assertFalse(has_ssl, "xmlrpc client's error with SSL support") 320 except OSError: 321 self.assertTrue(has_ssl) 322 323 def test_keepalive_disconnect(self): 324 class RequestHandler(http.server.BaseHTTPRequestHandler): 325 protocol_version = "HTTP/1.1" 326 handled = False 327 328 def do_POST(self): 329 length = int(self.headers.get("Content-Length")) 330 self.rfile.read(length) 331 if self.handled: 332 self.close_connection = True 333 return 334 response = xmlrpclib.dumps((5,), methodresponse=True) 335 response = response.encode() 336 self.send_response(http.HTTPStatus.OK) 337 self.send_header("Content-Length", len(response)) 338 self.end_headers() 339 self.wfile.write(response) 340 self.handled = True 341 self.close_connection = 
False 342 343 def log_message(self, format, *args): 344 # don't clobber sys.stderr 345 pass 346 347 def run_server(): 348 server.socket.settimeout(float(1)) # Don't hang if client fails 349 server.handle_request() # First request and attempt at second 350 server.handle_request() # Retried second request 351 352 server = http.server.HTTPServer((socket_helper.HOST, 0), RequestHandler) 353 self.addCleanup(server.server_close) 354 thread = threading.Thread(target=run_server) 355 thread.start() 356 self.addCleanup(thread.join) 357 url = "http://{}:{}/".format(*server.server_address) 358 with xmlrpclib.ServerProxy(url) as p: 359 self.assertEqual(p.method(), 5) 360 self.assertEqual(p.method(), 5) 361 362 363class SimpleXMLRPCDispatcherTestCase(unittest.TestCase): 364 class DispatchExc(Exception): 365 """Raised inside the dispatched functions when checking for 366 chained exceptions""" 367 368 def test_call_registered_func(self): 369 """Calls explicitly registered function""" 370 # Makes sure any exception raised inside the function has no other 371 # exception chained to it 372 373 exp_params = 1, 2, 3 374 375 def dispatched_func(*params): 376 raise self.DispatchExc(params) 377 378 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 379 dispatcher.register_function(dispatched_func) 380 with self.assertRaises(self.DispatchExc) as exc_ctx: 381 dispatcher._dispatch('dispatched_func', exp_params) 382 self.assertEqual(exc_ctx.exception.args, (exp_params,)) 383 self.assertIsNone(exc_ctx.exception.__cause__) 384 self.assertIsNone(exc_ctx.exception.__context__) 385 386 def test_call_instance_func(self): 387 """Calls a registered instance attribute as a function""" 388 # Makes sure any exception raised inside the function has no other 389 # exception chained to it 390 391 exp_params = 1, 2, 3 392 393 class DispatchedClass: 394 def dispatched_func(self, *params): 395 raise SimpleXMLRPCDispatcherTestCase.DispatchExc(params) 396 397 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 
398 dispatcher.register_instance(DispatchedClass()) 399 with self.assertRaises(self.DispatchExc) as exc_ctx: 400 dispatcher._dispatch('dispatched_func', exp_params) 401 self.assertEqual(exc_ctx.exception.args, (exp_params,)) 402 self.assertIsNone(exc_ctx.exception.__cause__) 403 self.assertIsNone(exc_ctx.exception.__context__) 404 405 def test_call_dispatch_func(self): 406 """Calls the registered instance's `_dispatch` function""" 407 # Makes sure any exception raised inside the function has no other 408 # exception chained to it 409 410 exp_method = 'method' 411 exp_params = 1, 2, 3 412 413 class TestInstance: 414 def _dispatch(self, method, params): 415 raise SimpleXMLRPCDispatcherTestCase.DispatchExc( 416 method, params) 417 418 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 419 dispatcher.register_instance(TestInstance()) 420 with self.assertRaises(self.DispatchExc) as exc_ctx: 421 dispatcher._dispatch(exp_method, exp_params) 422 self.assertEqual(exc_ctx.exception.args, (exp_method, exp_params)) 423 self.assertIsNone(exc_ctx.exception.__cause__) 424 self.assertIsNone(exc_ctx.exception.__context__) 425 426 def test_registered_func_is_none(self): 427 """Calls explicitly registered function which is None""" 428 429 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 430 dispatcher.register_function(None, name='method') 431 with self.assertRaisesRegex(Exception, 'method'): 432 dispatcher._dispatch('method', ('param',)) 433 434 def test_instance_has_no_func(self): 435 """Attempts to call nonexistent function on a registered instance""" 436 437 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 438 dispatcher.register_instance(object()) 439 with self.assertRaisesRegex(Exception, 'method'): 440 dispatcher._dispatch('method', ('param',)) 441 442 def test_cannot_locate_func(self): 443 """Calls a function that the dispatcher cannot locate""" 444 445 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 446 with self.assertRaisesRegex(Exception, 'method'): 447 
dispatcher._dispatch('method', ('param',)) 448 449 450class HelperTestCase(unittest.TestCase): 451 def test_escape(self): 452 self.assertEqual(xmlrpclib.escape("a&b"), "a&b") 453 self.assertEqual(xmlrpclib.escape("a<b"), "a<b") 454 self.assertEqual(xmlrpclib.escape("a>b"), "a>b") 455 456class FaultTestCase(unittest.TestCase): 457 def test_repr(self): 458 f = xmlrpclib.Fault(42, 'Test Fault') 459 self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>") 460 self.assertEqual(repr(f), str(f)) 461 462 def test_dump_fault(self): 463 f = xmlrpclib.Fault(42, 'Test Fault') 464 s = xmlrpclib.dumps((f,)) 465 (newf,), m = xmlrpclib.loads(s) 466 self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'}) 467 self.assertEqual(m, None) 468 469 s = xmlrpclib.Marshaller().dumps(f) 470 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s) 471 472 def test_dotted_attribute(self): 473 # this will raise AttributeError because code don't want us to use 474 # private methods 475 self.assertRaises(AttributeError, 476 xmlrpc.server.resolve_dotted_attribute, str, '__add') 477 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 478 479class DateTimeTestCase(unittest.TestCase): 480 def test_default(self): 481 with mock.patch('time.localtime') as localtime_mock: 482 time_struct = time.struct_time( 483 [2013, 7, 15, 0, 24, 49, 0, 196, 0]) 484 localtime_mock.return_value = time_struct 485 localtime = time.localtime() 486 t = xmlrpclib.DateTime() 487 self.assertEqual(str(t), 488 time.strftime("%Y%m%dT%H:%M:%S", localtime)) 489 490 def test_time(self): 491 d = 1181399930.036952 492 t = xmlrpclib.DateTime(d) 493 self.assertEqual(str(t), 494 time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d))) 495 496 def test_time_tuple(self): 497 d = (2007,6,9,10,38,50,5,160,0) 498 t = xmlrpclib.DateTime(d) 499 self.assertEqual(str(t), '20070609T10:38:50') 500 501 def test_time_struct(self): 502 d = time.localtime(1181399930.036952) 503 t = xmlrpclib.DateTime(d) 504 
self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d)) 505 506 def test_datetime_datetime(self): 507 d = datetime.datetime(2007,1,2,3,4,5) 508 t = xmlrpclib.DateTime(d) 509 self.assertEqual(str(t), '20070102T03:04:05') 510 511 def test_repr(self): 512 d = datetime.datetime(2007,1,2,3,4,5) 513 t = xmlrpclib.DateTime(d) 514 val ="<DateTime '20070102T03:04:05' at %#x>" % id(t) 515 self.assertEqual(repr(t), val) 516 517 def test_decode(self): 518 d = ' 20070908T07:11:13 ' 519 t1 = xmlrpclib.DateTime() 520 t1.decode(d) 521 tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13)) 522 self.assertEqual(t1, tref) 523 524 t2 = xmlrpclib._datetime(d) 525 self.assertEqual(t2, tref) 526 527 def test_comparison(self): 528 now = datetime.datetime.now() 529 dtime = xmlrpclib.DateTime(now.timetuple()) 530 531 # datetime vs. DateTime 532 self.assertTrue(dtime == now) 533 self.assertTrue(now == dtime) 534 then = now + datetime.timedelta(seconds=4) 535 self.assertTrue(then >= dtime) 536 self.assertTrue(dtime < then) 537 538 # str vs. 
DateTime 539 dstr = now.strftime("%Y%m%dT%H:%M:%S") 540 self.assertTrue(dtime == dstr) 541 self.assertTrue(dstr == dtime) 542 dtime_then = xmlrpclib.DateTime(then.timetuple()) 543 self.assertTrue(dtime_then >= dstr) 544 self.assertTrue(dstr < dtime_then) 545 546 # some other types 547 dbytes = dstr.encode('ascii') 548 dtuple = now.timetuple() 549 self.assertFalse(dtime == 1970) 550 self.assertTrue(dtime != dbytes) 551 self.assertFalse(dtime == bytearray(dbytes)) 552 self.assertTrue(dtime != dtuple) 553 with self.assertRaises(TypeError): 554 dtime < float(1970) 555 with self.assertRaises(TypeError): 556 dtime > dbytes 557 with self.assertRaises(TypeError): 558 dtime <= bytearray(dbytes) 559 with self.assertRaises(TypeError): 560 dtime >= dtuple 561 562 self.assertTrue(dtime == ALWAYS_EQ) 563 self.assertFalse(dtime != ALWAYS_EQ) 564 self.assertTrue(dtime < LARGEST) 565 self.assertFalse(dtime > LARGEST) 566 self.assertTrue(dtime <= LARGEST) 567 self.assertFalse(dtime >= LARGEST) 568 self.assertFalse(dtime < SMALLEST) 569 self.assertTrue(dtime > SMALLEST) 570 self.assertFalse(dtime <= SMALLEST) 571 self.assertTrue(dtime >= SMALLEST) 572 573 574class BinaryTestCase(unittest.TestCase): 575 576 # XXX What should str(Binary(b"\xff")) return? I'm choosing "\xff" 577 # for now (i.e. interpreting the binary data as Latin-1-encoded 578 # text). But this feels very unsatisfactory. Perhaps we should 579 # only define repr(), and return r"Binary(b'\xff')" instead? 
580 581 def test_default(self): 582 t = xmlrpclib.Binary() 583 self.assertEqual(str(t), '') 584 585 def test_string(self): 586 d = b'\x01\x02\x03abc123\xff\xfe' 587 t = xmlrpclib.Binary(d) 588 self.assertEqual(str(t), str(d, "latin-1")) 589 590 def test_decode(self): 591 d = b'\x01\x02\x03abc123\xff\xfe' 592 de = base64.encodebytes(d) 593 t1 = xmlrpclib.Binary() 594 t1.decode(de) 595 self.assertEqual(str(t1), str(d, "latin-1")) 596 597 t2 = xmlrpclib._binary(de) 598 self.assertEqual(str(t2), str(d, "latin-1")) 599 600 601ADDR = PORT = URL = None 602 603# The evt is set twice. First when the server is ready to serve. 604# Second when the server has been shutdown. The user must clear 605# the event after it has been set the first time to catch the second set. 606def http_server(evt, numrequests, requestHandler=None, encoding=None): 607 class TestInstanceClass: 608 def div(self, x, y): 609 return x // y 610 611 def _methodHelp(self, name): 612 if name == 'div': 613 return 'This is the div function' 614 615 class Fixture: 616 @staticmethod 617 def getData(): 618 return '42' 619 620 class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer): 621 def get_request(self): 622 # Ensure the socket is always non-blocking. On Linux, socket 623 # attributes are not inherited like they are on *BSD and Windows. 624 s, port = self.socket.accept() 625 s.setblocking(True) 626 return s, port 627 628 if not requestHandler: 629 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 630 serv = MyXMLRPCServer(("localhost", 0), requestHandler, 631 encoding=encoding, 632 logRequests=False, bind_and_activate=False) 633 try: 634 serv.server_bind() 635 global ADDR, PORT, URL 636 ADDR, PORT = serv.socket.getsockname() 637 #connect to IP address directly. This avoids socket.create_connection() 638 #trying to connect to "localhost" using all address families, which 639 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 640 #on AF_INET only. 
641 URL = "http://%s:%d"%(ADDR, PORT) 642 serv.server_activate() 643 serv.register_introspection_functions() 644 serv.register_multicall_functions() 645 serv.register_function(pow) 646 serv.register_function(lambda x: x, 'têšt') 647 @serv.register_function 648 def my_function(): 649 '''This is my function''' 650 return True 651 @serv.register_function(name='add') 652 def _(x, y): 653 return x + y 654 testInstance = TestInstanceClass() 655 serv.register_instance(testInstance, allow_dotted_names=True) 656 evt.set() 657 658 # handle up to 'numrequests' requests 659 while numrequests > 0: 660 serv.handle_request() 661 numrequests -= 1 662 663 except TimeoutError: 664 pass 665 finally: 666 serv.socket.close() 667 PORT = None 668 evt.set() 669 670def http_multi_server(evt, numrequests, requestHandler=None): 671 class TestInstanceClass: 672 def div(self, x, y): 673 return x // y 674 675 def _methodHelp(self, name): 676 if name == 'div': 677 return 'This is the div function' 678 679 def my_function(): 680 '''This is my function''' 681 return True 682 683 class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer): 684 def get_request(self): 685 # Ensure the socket is always non-blocking. On Linux, socket 686 # attributes are not inherited like they are on *BSD and Windows. 687 s, port = self.socket.accept() 688 s.setblocking(True) 689 return s, port 690 691 if not requestHandler: 692 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 693 class MyRequestHandler(requestHandler): 694 rpc_paths = [] 695 696 class BrokenDispatcher: 697 def _marshaled_dispatch(self, data, dispatch_method=None, path=None): 698 raise RuntimeError("broken dispatcher") 699 700 serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler, 701 logRequests=False, bind_and_activate=False) 702 serv.socket.settimeout(3) 703 serv.server_bind() 704 try: 705 global ADDR, PORT, URL 706 ADDR, PORT = serv.socket.getsockname() 707 #connect to IP address directly. 
This avoids socket.create_connection() 708 #trying to connect to "localhost" using all address families, which 709 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 710 #on AF_INET only. 711 URL = "http://%s:%d"%(ADDR, PORT) 712 serv.server_activate() 713 paths = [ 714 "/foo", "/foo/bar", 715 "/foo?k=v", "/foo#frag", "/foo?k=v#frag", 716 "", "/", "/RPC2", "?k=v", "#frag", 717 ] 718 for path in paths: 719 d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher()) 720 d.register_introspection_functions() 721 d.register_multicall_functions() 722 d.register_function(lambda p=path: p, 'test') 723 serv.get_dispatcher(paths[0]).register_function(pow) 724 serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add') 725 serv.add_dispatcher("/is/broken", BrokenDispatcher()) 726 evt.set() 727 728 # handle up to 'numrequests' requests 729 while numrequests > 0: 730 serv.handle_request() 731 numrequests -= 1 732 733 except TimeoutError: 734 pass 735 finally: 736 serv.socket.close() 737 PORT = None 738 evt.set() 739 740# This function prevents errors like: 741# <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error> 742def is_unavailable_exception(e): 743 '''Returns True if the given ProtocolError is the product of a server-side 744 exception caused by the 'temporarily unavailable' response sometimes 745 given by operations on non-blocking sockets.''' 746 747 # sometimes we get a -1 error code and/or empty headers 748 try: 749 if e.errcode == -1 or e.headers is None: 750 return True 751 exc_mess = e.headers.get('X-exception') 752 except AttributeError: 753 # Ignore OSErrors here. 754 exc_mess = str(e) 755 756 if exc_mess and 'temporarily unavailable' in exc_mess.lower(): 757 return True 758 759def make_request_and_skipIf(condition, reason): 760 # If we skip the test, we have to make a request because 761 # the server created in setUp blocks expecting one to come in. 
762 if not condition: 763 return lambda func: func 764 def decorator(func): 765 def make_request_and_skip(self): 766 try: 767 xmlrpclib.ServerProxy(URL).my_function() 768 except (xmlrpclib.ProtocolError, OSError) as e: 769 if not is_unavailable_exception(e): 770 raise 771 raise unittest.SkipTest(reason) 772 return make_request_and_skip 773 return decorator 774 775class BaseServerTestCase(unittest.TestCase): 776 requestHandler = None 777 request_count = 1 778 threadFunc = staticmethod(http_server) 779 780 def setUp(self): 781 # enable traceback reporting 782 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 783 784 self.evt = threading.Event() 785 # start server thread to handle requests 786 serv_args = (self.evt, self.request_count, self.requestHandler) 787 thread = threading.Thread(target=self.threadFunc, args=serv_args) 788 thread.start() 789 self.addCleanup(thread.join) 790 791 # wait for the server to be ready 792 self.evt.wait() 793 self.evt.clear() 794 795 def tearDown(self): 796 # wait on the server thread to terminate 797 self.evt.wait() 798 799 # disable traceback reporting 800 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 801 802class SimpleServerTestCase(BaseServerTestCase): 803 def test_simple1(self): 804 try: 805 p = xmlrpclib.ServerProxy(URL) 806 self.assertEqual(p.pow(6,8), 6**8) 807 except (xmlrpclib.ProtocolError, OSError) as e: 808 # ignore failures due to non-blocking socket 'unavailable' errors 809 if not is_unavailable_exception(e): 810 # protocol error; provide additional information in test output 811 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 812 813 def test_nonascii(self): 814 start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t' 815 end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n' 816 try: 817 p = xmlrpclib.ServerProxy(URL) 818 self.assertEqual(p.add(start_string, end_string), 819 start_string + end_string) 820 except (xmlrpclib.ProtocolError, OSError) as e: 821 # ignore failures due to 
non-blocking socket 'unavailable' errors 822 if not is_unavailable_exception(e): 823 # protocol error; provide additional information in test output 824 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 825 826 def test_client_encoding(self): 827 start_string = '\u20ac' 828 end_string = '\xa4' 829 830 try: 831 p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15') 832 self.assertEqual(p.add(start_string, end_string), 833 start_string + end_string) 834 except (xmlrpclib.ProtocolError, socket.error) as e: 835 # ignore failures due to non-blocking socket unavailable errors. 836 if not is_unavailable_exception(e): 837 # protocol error; provide additional information in test output 838 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 839 840 def test_nonascii_methodname(self): 841 try: 842 p = xmlrpclib.ServerProxy(URL, encoding='ascii') 843 self.assertEqual(p.têšt(42), 42) 844 except (xmlrpclib.ProtocolError, socket.error) as e: 845 # ignore failures due to non-blocking socket unavailable errors. 846 if not is_unavailable_exception(e): 847 # protocol error; provide additional information in test output 848 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 849 850 def test_404(self): 851 # send POST with http.client, it should return 404 header and 852 # 'Not Found' message. 
853 with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn: 854 conn.request('POST', '/this-is-not-valid') 855 response = conn.getresponse() 856 857 self.assertEqual(response.status, 404) 858 self.assertEqual(response.reason, 'Not Found') 859 860 def test_introspection1(self): 861 expected_methods = set(['pow', 'div', 'my_function', 'add', 'têšt', 862 'system.listMethods', 'system.methodHelp', 863 'system.methodSignature', 'system.multicall', 864 'Fixture']) 865 try: 866 p = xmlrpclib.ServerProxy(URL) 867 meth = p.system.listMethods() 868 self.assertEqual(set(meth), expected_methods) 869 except (xmlrpclib.ProtocolError, OSError) as e: 870 # ignore failures due to non-blocking socket 'unavailable' errors 871 if not is_unavailable_exception(e): 872 # protocol error; provide additional information in test output 873 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 874 875 876 def test_introspection2(self): 877 try: 878 # test _methodHelp() 879 p = xmlrpclib.ServerProxy(URL) 880 divhelp = p.system.methodHelp('div') 881 self.assertEqual(divhelp, 'This is the div function') 882 except (xmlrpclib.ProtocolError, OSError) as e: 883 # ignore failures due to non-blocking socket 'unavailable' errors 884 if not is_unavailable_exception(e): 885 # protocol error; provide additional information in test output 886 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 887 888 @make_request_and_skipIf(sys.flags.optimize >= 2, 889 "Docstrings are omitted with -O2 and above") 890 def test_introspection3(self): 891 try: 892 # test native doc 893 p = xmlrpclib.ServerProxy(URL) 894 myfunction = p.system.methodHelp('my_function') 895 self.assertEqual(myfunction, 'This is my function') 896 except (xmlrpclib.ProtocolError, OSError) as e: 897 # ignore failures due to non-blocking socket 'unavailable' errors 898 if not is_unavailable_exception(e): 899 # protocol error; provide additional information in test output 900 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 901 
# NOTE: the methods below continue the body of a server test-case class whose
# header precedes this section of the file.
    def test_introspection4(self):
        # the SimpleXMLRPCServer doesn't support signatures, but
        # at least check that we can try making the call
        try:
            p = xmlrpclib.ServerProxy(URL)
            divsig = p.system.methodSignature('div')
            self.assertEqual(divsig, 'signatures not supported')
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_multicall(self):
        # Queue three calls on a MultiCall and check each result after the
        # batch has been executed.
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2, 3)
            multicall.pow(6, 8)
            multicall.div(127, 42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2 + 3)
            self.assertEqual(pow_result, 6 ** 8)
            self.assertEqual(div_result, 127 // 42)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_non_existing_multicall(self):
        # A MultiCall entry naming an unknown method must come back as a
        # fault record rather than aborting the whole batch.
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains:
            # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
            #   'method "this_is_not_exists" is not supported'>}]

            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
                '<class \'Exception\'>:method "this_is_not_exists" '
                'is not supported')
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_dotted_attribute(self):
        # Raises an AttributeError because private methods are not allowed.
        self.assertRaises(AttributeError,
                          xmlrpc.server.resolve_dotted_attribute, str, '__add')

        self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
        # Get the test to run faster by sending a request with test_simple1.
        # This avoids waiting for the socket timeout.
        self.test_simple1()

    def test_allow_dotted_names_true(self):
        # XXX also need allow_dotted_names_false test.
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        data = server.Fixture.getData()
        self.assertEqual(data, '42')

    def test_unicode_host(self):
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        self.assertEqual(server.add("a", "\xe9"), "a\xe9")

    def test_partial_post(self):
        # Check that a partial POST doesn't make the server loop: issue #14001.
        # The first Content-Length header announces 100 bytes, but fewer
        # bytes than that follow the blank line.
        request = ('POST /RPC2 HTTP/1.0\r\n'
                   'Content-Length: 100\r\n\r\n'
                   'bye HTTP/1.1\r\n'
                   f'Host: {ADDR}:{PORT}\r\n'
                   'Accept-Encoding: identity\r\n'
                   'Content-Length: 0\r\n\r\n')
        with contextlib.closing(socket.create_connection((ADDR, PORT))) as conn:
            # sendall() so that the whole prepared request is written even if
            # a single send() call would only accept part of it.
            conn.sendall(request.encode('ascii'))

    def test_context_manager(self):
        # Inside the with-block the transport holds a connection; leaving the
        # block must reset it to (None, None).
        with xmlrpclib.ServerProxy(URL) as server:
            server.add(2, 3)
            self.assertNotEqual(server('transport')._connection,
                                (None, None))
        self.assertEqual(server('transport')._connection,
                         (None, None))

    def test_context_manager_method_error(self):
        # The connection must also be reset when the with-block is left
        # because the remote call raised a Fault.
        with contextlib.suppress(xmlrpclib.Fault):
            with xmlrpclib.ServerProxy(URL) as server:
                server.add(2, "a")
        self.assertEqual(server('transport')._connection,
                         (None, None))


class SimpleServerEncodingTestCase(BaseServerTestCase):
    @staticmethod
    def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
        # The 'encoding' argument is accepted but not used: the server is
        # always started with 'iso-8859-15'.
        http_server(evt, numrequests, requestHandler, 'iso-8859-15')

    def test_server_encoding(self):
        start_string = '\u20ac'
        end_string = '\xa4'

        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket unavailable errors.
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))


class MultiPathServerTestCase(BaseServerTestCase):
    threadFunc = staticmethod(http_multi_server)
    request_count = 2

    def test_path1(self):
        p = xmlrpclib.ServerProxy(URL+"/foo")
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)

    def test_path2(self):
        p = xmlrpclib.ServerProxy(URL+"/foo/bar")
        self.assertEqual(p.add(6,8), 6+8)
        self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8)

    def test_path3(self):
        p = xmlrpclib.ServerProxy(URL+"/is/broken")
        self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)

    def test_invalid_path(self):
        p = xmlrpclib.ServerProxy(URL+"/invalid")
        self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)

    def test_path_query_fragment(self):
        p = xmlrpclib.ServerProxy(URL+"/foo?k=v#frag")
        self.assertEqual(p.test(), "/foo?k=v#frag")

    def test_path_fragment(self):
        p = xmlrpclib.ServerProxy(URL+"/foo#frag")
        self.assertEqual(p.test(), "/foo#frag")

    def test_path_query(self):
        p = xmlrpclib.ServerProxy(URL+"/foo?k=v")
        self.assertEqual(p.test(), "/foo?k=v")

    def test_empty_path(self):
        p = xmlrpclib.ServerProxy(URL)
        self.assertEqual(p.test(), "/RPC2")

    def test_root_path(self):
        p = xmlrpclib.ServerProxy(URL + "/")
        self.assertEqual(p.test(), "/")

    def test_empty_path_query(self):
        p = xmlrpclib.ServerProxy(URL + "?k=v")
        self.assertEqual(p.test(), "?k=v")

    def test_empty_path_fragment(self):
        p = xmlrpclib.ServerProxy(URL + "#frag")
        self.assertEqual(p.test(), "#frag")


# Base class for test cases that verify that a server using the HTTP/1.1
# keep-alive mechanism does indeed serve subsequent requests on the same
# connection.
class BaseKeepaliveServerTestCase(BaseServerTestCase):
    # A request handler that speaks HTTP/1.1 and logs requests into a
    # class variable: one inner list per handle() call, holding the raw
    # request line seen by each handle_one_request() call.
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'
        myRequests = []

        def handle(self):
            self.myRequests.append([])
            self.reqidx = len(self.myRequests)-1
            return self.parentClass.handle(self)

        def handle_one_request(self):
            result = self.parentClass.handle_one_request(self)
            self.myRequests[self.reqidx].append(self.raw_requestline)
            return result

    requestHandler = RequestHandler

    def setUp(self):
        # clear request log
        self.RequestHandler.myRequests = []
        return BaseServerTestCase.setUp(self)


# A test case that verifies that a server using the HTTP/1.1 keep-alive
# mechanism does indeed serve subsequent requests on the same connection.
class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
    def test_two(self):
        p = xmlrpclib.ServerProxy(URL)
        # do three requests.
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")()

        # they should have all been handled by a single request handler
        self.assertEqual(len(self.RequestHandler.myRequests), 1)

        # check that we did at least two (the third may be pending append
        # due to thread scheduling)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)


# Test special attribute access on the ServerProxy, through the __call__
# function.
class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    # ask for two keepalive requests to be handled.
    request_count=2

    def test_close(self):
        p = xmlrpclib.ServerProxy(URL)
        # do some requests with close.
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")() # this should trigger a new keep-alive request
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")()

        # there should have been two request handlers, each having logged
        # at least two complete requests
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)

    def test_transport(self):
        p = xmlrpclib.ServerProxy(URL)
        # do some requests with close.
        self.assertEqual(p.pow(6,8), 6**8)
        p("transport").close() # same as above, really.
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")()
        self.assertEqual(len(self.RequestHandler.myRequests), 2)


# A test case that verifies that gzip encoding works in both directions
# (for a request and the response)
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipServerTestCase(BaseServerTestCase):
    # A request handler that speaks HTTP/1.1 and records the Content-Length
    # of the most recent POST in a class variable.
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'

        def do_POST(self):
            # store content of last request in class
            self.__class__.content_length = int(self.headers["content-length"])
            return self.parentClass.do_POST(self)

    requestHandler = RequestHandler

    class Transport(xmlrpclib.Transport):
        # custom transport, stores the response length for our perusal
        fake_gzip = False

        def parse_response(self, response):
            self.response_length=int(response.getheader("content-length", 0))
            return xmlrpclib.Transport.parse_response(self, response)

        def send_content(self, connection, body):
            if self.fake_gzip:
                # add a lone gzip header to induce decode error remotely
                connection.putheader("Content-Encoding", "gzip")
            return xmlrpclib.Transport.send_content(self, connection, body)

    def setUp(self):
        BaseServerTestCase.setUp(self)

    def test_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        p = xmlrpclib.ServerProxy(URL, transport=t)
        self.assertEqual(p.pow(6,8), 6**8)
        a = self.RequestHandler.content_length
        t.encode_threshold = 0 # turn on request encoding
        self.assertEqual(p.pow(6,8), 6**8)
        b = self.RequestHandler.content_length
        # the test expects the encoded request to be the shorter one
        self.assertGreater(a, b)
        p("close")()

    def test_bad_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        t.fake_gzip = True
        p = xmlrpclib.ServerProxy(URL, transport=t)
        cm = self.assertRaisesRegex(xmlrpclib.ProtocolError,
                                    re.compile(r"\b400\b"))
        with cm:
            p.pow(6, 8)
        p("close")()

    def test_gzip_response(self):
        t = self.Transport()
        p = xmlrpclib.ServerProxy(URL, transport=t)
        old = self.requestHandler.encode_threshold
        try:
            self.requestHandler.encode_threshold = None # no encoding
            self.assertEqual(p.pow(6,8), 6**8)
            a = t.response_length
            self.requestHandler.encode_threshold = 0 # always encode
            self.assertEqual(p.pow(6,8), 6**8)
            p("close")()
            b = t.response_length
        finally:
            # Restore the handler class attribute even when a call above
            # fails, so the override cannot leak into other tests.
            self.requestHandler.encode_threshold = old
        self.assertGreater(a, b)


@unittest.skipIf(gzip is None, 'requires gzip')
class GzipUtilTestCase(unittest.TestCase):

    def test_gzip_decode_limit(self):
        max_gzip_decode = 20 * 1024 * 1024
        # a payload of exactly the limit decodes
        data = b'\0' * max_gzip_decode
        encoded = xmlrpclib.gzip_encode(data)
        decoded = xmlrpclib.gzip_decode(encoded)
        self.assertEqual(len(decoded), max_gzip_decode)

        # one byte more than the limit is rejected ...
        data = b'\0' * (max_gzip_decode + 1)
        encoded = xmlrpclib.gzip_encode(data)

        with self.assertRaisesRegex(ValueError,
                                    "max gzipped payload length exceeded"):
            xmlrpclib.gzip_decode(encoded)

        # ... unless max_decode=-1 is passed
        xmlrpclib.gzip_decode(encoded, max_decode=-1)


class HeadersServerTestCase(BaseServerTestCase):
    # A request handler that keeps the headers of the most recent POST in a
    # class variable so the tests can inspect them.
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        test_headers = None

        def do_POST(self):
            self.__class__.test_headers = self.headers
            return super().do_POST()

    requestHandler = RequestHandler
    standard_headers = [
        'Host', 'Accept-Encoding', 'Content-Type', 'User-Agent',
        'Content-Length']

    def setUp(self):
        self.RequestHandler.test_headers = None
        return super().setUp()

    def assertContainsAdditionalHeaders(self, headers, additional):
        # The received header names must be exactly the standard ones plus
        # the additional ones, and each additional header must carry the
        # expected value.
        expected_keys = sorted(self.standard_headers + list(additional.keys()))
        self.assertListEqual(sorted(headers.keys()), expected_keys)

        for key, value in additional.items():
            self.assertEqual(headers.get(key), value)

    def test_header(self):
        p = xmlrpclib.ServerProxy(URL, headers=[('X-Test', 'foo')])
        self.assertEqual(p.pow(6, 8), 6**8)

        headers = self.RequestHandler.test_headers
        self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'})

    def test_header_many(self):
        p = xmlrpclib.ServerProxy(
            URL, headers=[('X-Test', 'foo'), ('X-Test-Second', 'bar')])
        self.assertEqual(p.pow(6, 8), 6**8)

        headers = self.RequestHandler.test_headers
        self.assertContainsAdditionalHeaders(
            headers, {'X-Test': 'foo', 'X-Test-Second': 'bar'})

    def test_header_empty(self):
        p = xmlrpclib.ServerProxy(URL, headers=[])
        self.assertEqual(p.pow(6, 8), 6**8)

        headers = self.RequestHandler.test_headers
        self.assertContainsAdditionalHeaders(headers, {})

    def test_header_tuple(self):
        p = xmlrpclib.ServerProxy(URL, headers=(('X-Test', 'foo'),))
        self.assertEqual(p.pow(6, 8), 6**8)

        headers = self.RequestHandler.test_headers
        self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'})

    def test_header_items(self):
        p = xmlrpclib.ServerProxy(URL, headers={'X-Test': 'foo'}.items())
        self.assertEqual(p.pow(6, 8), 6**8)

        headers = self.RequestHandler.test_headers
        self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'})


# Test special attributes of the ServerProxy object
class ServerProxyTestCase(unittest.TestCase):
    def setUp(self):
        unittest.TestCase.setUp(self)
        # Actual value of the URL doesn't matter if it is a string in
        # the correct format.
        self.url = 'http://fake.localhost'

    def test_close(self):
        p = xmlrpclib.ServerProxy(self.url)
        self.assertIsNone(p('close')())

    def test_transport(self):
        t = xmlrpclib.Transport()
        p = xmlrpclib.ServerProxy(self.url, transport=t)
        self.assertEqual(p('transport'), t)


# This is a contrived way to make a failure occur on the server side
# in order to test the _send_traceback_header flag on the server
class FailingMessageClass(http.client.HTTPMessage):
    def get(self, key, failobj=None):
        key = key.lower()
        if key == 'content-length':
            # not parseable as an integer
            return 'I am broken'
        return super().get(key, failobj)


class FailingServerTestCase(unittest.TestCase):
    def setUp(self):
        self.evt = threading.Event()
        # start server thread to handle requests
        serv_args = (self.evt, 1)
        thread = threading.Thread(target=http_server, args=serv_args)
        thread.start()
        self.addCleanup(thread.join)

        # wait for the server to be ready
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()
        # reset flag
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
        # reset message class
        default_class = http.client.HTTPMessage
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class

    def test_basic(self):
        # check that flag is false by default
        flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
        self.assertEqual(flagval, False)

        # enable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        # test a call that shouldn't fail just as a smoke test
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_fail_no_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # The two server-side error headers shouldn't be sent back in this case
                self.assertIsNone(e.headers.get("X-exception"))
                self.assertIsNone(e.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')

    def test_fail_with_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Check that errors in the server send back exception/traceback
        # info when flag is set
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # We should get error info in the response
                expected_err = "invalid literal for int() with base 10: 'I am broken'"
                self.assertEqual(e.headers.get("X-exception"), expected_err)
                self.assertIsNotNone(e.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')


@contextlib.contextmanager
def captured_stdout(encoding='utf-8'):
    """A variation on support.captured_stdout() which gives a text stream
    having a `buffer` attribute.
    """
    orig_stdout = sys.stdout
    sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
    try:
        yield sys.stdout
    finally:
        # always put the real stdout back, even if the body raised
        sys.stdout = orig_stdout


class CGIHandlerTestCase(unittest.TestCase):
    def setUp(self):
        self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()

    def tearDown(self):
        self.cgi = None

    def test_cgi_get(self):
        with os_helper.EnvironmentVarGuard() as env:
            env['REQUEST_METHOD'] = 'GET'
            # if the method is GET and no request_text is given, it runs handle_get
            # get sysout output
            with captured_stdout(encoding=self.cgi.encoding) as data_out:
                self.cgi.handle_request()

            # parse Status header
            data_out.seek(0)
            handle = data_out.read()
            # second whitespace-separated word is the status code; the two
            # words after it are the reason phrase
            words = handle.split()
            status = words[1]
            message = ' '.join(words[2:4])

            self.assertEqual(status, '400')
            self.assertEqual(message, 'Bad Request')

    def test_cgi_xmlrpc_response(self):
        data = """<?xml version='1.0'?>
        <methodCall>
            <methodName>test_method</methodName>
            <params>
                <param>
                    <value><string>foo</string></value>
                </param>
                <param>
                    <value><string>bar</string></value>
                </param>
            </params>
        </methodCall>
        """

        with os_helper.EnvironmentVarGuard() as env, \
             captured_stdout(encoding=self.cgi.encoding) as data_out, \
             support.captured_stdin() as data_in:
            data_in.write(data)
            data_in.seek(0)
            env['CONTENT_LENGTH'] = str(len(data))
            self.cgi.handle_request()
        data_out.seek(0)

        # will respond exception, if so, our goal is achieved ;)
        handle = data_out.read()

        # start with 44th char so as not to get http header, we just
        # need only xml
        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:])

        # Also test the content-length returned by handle_request.
        # Using the same test method in order to avoid all the data-passing
        # boilerplate code.
        # Test for bug: http://bugs.python.org/issue5040

        content = handle[handle.find("<?xml"):]

        self.assertEqual(
            int(re.search(r'Content-Length: (\d+)', handle).group(1)),
            len(content))


class UseBuiltinTypesTestCase(unittest.TestCase):

    def test_use_builtin_types(self):
        # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which
        # makes all dispatch of binary data as bytes instances, and all
        # dispatch of datetime argument as datetime.datetime instances.
        self.log = []
        expected_bytes = b"my dog has fleas"
        expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12)
        marshaled = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar')
        def foobar(*args):
            # record whatever arguments the dispatcher hands over
            self.log.extend(args)
        handler = xmlrpc.server.SimpleXMLRPCDispatcher(
            allow_none=True, encoding=None, use_builtin_types=True)
        handler.register_function(foobar)
        handler._marshaled_dispatch(marshaled)
        self.assertEqual(len(self.log), 2)
        mybytes, mydate = self.log
        self.assertEqual(self.log, [expected_bytes, expected_date])
        self.assertIs(type(mydate), datetime.datetime)
        self.assertIs(type(mybytes), bytes)

    def test_cgihandler_has_use_builtin_types_flag(self):
        handler = xmlrpc.server.CGIXMLRPCRequestHandler(use_builtin_types=True)
        self.assertTrue(handler.use_builtin_types)

    def test_xmlrpcserver_has_use_builtin_types_flag(self):
        server = xmlrpc.server.SimpleXMLRPCServer(("localhost", 0),
            use_builtin_types=True)
        server.server_close()
        self.assertTrue(server.use_builtin_types)


def setUpModule():
    # Register the threading cleanup with the values returned by
    # threading_setup() as a module-level cleanup.
    thread_info = threading_helper.threading_setup()
    unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)


if __name__ == "__main__":
    unittest.main()