1import base64
2import datetime
3import decimal
4import sys
5import time
6import unittest
7from unittest import mock
8import xmlrpc.client as xmlrpclib
9import xmlrpc.server
10import http.client
11import http, http.server
12import socket
13import threading
14import re
15import io
16import contextlib
17from test import support
18from test.support import os_helper
19from test.support import socket_helper
20from test.support import threading_helper
21from test.support import ALWAYS_EQ, LARGEST, SMALLEST
22
# gzip is optional: when the import fails the name is bound to None instead,
# so the rest of the module can check for it.
try:
    import gzip
except ImportError:
    gzip = None

# NOTE(review): presumably skips the whole module on platforms without
# working sockets -- confirm against test.support.requires_working_socket.
support.requires_working_socket(module=True)
29
# Sample payload shared by the marshalling tests: a one-element list holding
# a struct with a mix of value types (strings, numbers, a nested list, binary
# data, a boolean and date/time values).  test_dump_load() expects it to
# compare equal to itself after a dumps()/loads() round trip.
alist = [{'astring': 'foo@bar.baz.spam',  # restored: was an e-mail obfuscation placeholder
          'afloat': 7283.43,
          'anint': 2**20,
          'ashortlong': 2,
          'anotherlist': ['.zyx.41'],
          # Binary payloads given as a wrapper, as bytes and as a bytearray.
          'abase64': xmlrpclib.Binary(b"my dog has fleas"),
          'b64bytes': b"my dog has fleas",
          'b64bytearray': bytearray(b"my dog has fleas"),
          'boolean': False,
          # Non-ASCII text both as a value and as part of a key.
          'unicode': '\u4000\u6000\u8000',
          'ukey\u4000': 'regular value',
          # The same instant built from a string, a time tuple and a datetime.
          'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
          'datetime2': xmlrpclib.DateTime(
                        (2005, 2, 10, 11, 41, 23, 0, 1, -1)),
          'datetime3': xmlrpclib.DateTime(
                        datetime.datetime(2005, 2, 10, 11, 41, 23)),
          }]
47
48class XMLRPCTestCase(unittest.TestCase):
49
50    def test_dump_load(self):
51        dump = xmlrpclib.dumps((alist,))
52        load = xmlrpclib.loads(dump)
53        self.assertEqual(alist, load[0][0])
54
55    def test_dump_bare_datetime(self):
56        # This checks that an unwrapped datetime.date object can be handled
57        # by the marshalling code.  This can't be done via test_dump_load()
58        # since with use_builtin_types set to 1 the unmarshaller would create
59        # datetime objects for the 'datetime[123]' keys as well
60        dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
61        self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
62        s = xmlrpclib.dumps((dt,))
63
64        result, m = xmlrpclib.loads(s, use_builtin_types=True)
65        (newdt,) = result
66        self.assertEqual(newdt, dt)
67        self.assertIs(type(newdt), datetime.datetime)
68        self.assertIsNone(m)
69
70        result, m = xmlrpclib.loads(s, use_builtin_types=False)
71        (newdt,) = result
72        self.assertEqual(newdt, dt)
73        self.assertIs(type(newdt), xmlrpclib.DateTime)
74        self.assertIsNone(m)
75
76        result, m = xmlrpclib.loads(s, use_datetime=True)
77        (newdt,) = result
78        self.assertEqual(newdt, dt)
79        self.assertIs(type(newdt), datetime.datetime)
80        self.assertIsNone(m)
81
82        result, m = xmlrpclib.loads(s, use_datetime=False)
83        (newdt,) = result
84        self.assertEqual(newdt, dt)
85        self.assertIs(type(newdt), xmlrpclib.DateTime)
86        self.assertIsNone(m)
87
88
89    def test_datetime_before_1900(self):
90        # same as before but with a date before 1900
91        dt = datetime.datetime(1,  2, 10, 11, 41, 23)
92        self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
93        s = xmlrpclib.dumps((dt,))
94
95        result, m = xmlrpclib.loads(s, use_builtin_types=True)
96        (newdt,) = result
97        self.assertEqual(newdt, dt)
98        self.assertIs(type(newdt), datetime.datetime)
99        self.assertIsNone(m)
100
101        result, m = xmlrpclib.loads(s, use_builtin_types=False)
102        (newdt,) = result
103        self.assertEqual(newdt, dt)
104        self.assertIs(type(newdt), xmlrpclib.DateTime)
105        self.assertIsNone(m)
106
107    def test_bug_1164912 (self):
108        d = xmlrpclib.DateTime()
109        ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
110                                            methodresponse=True))
111        self.assertIsInstance(new_d.value, str)
112
113        # Check that the output of dumps() is still an 8-bit string
114        s = xmlrpclib.dumps((new_d,), methodresponse=True)
115        self.assertIsInstance(s, str)
116
117    def test_newstyle_class(self):
118        class T(object):
119            pass
120        t = T()
121        t.x = 100
122        t.y = "Hello"
123        ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
124        self.assertEqual(t2, t.__dict__)
125
126    def test_dump_big_long(self):
127        self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))
128
129    def test_dump_bad_dict(self):
130        self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))
131
132    def test_dump_recursive_seq(self):
133        l = [1,2,3]
134        t = [3,4,5,l]
135        l.append(t)
136        self.assertRaises(TypeError, xmlrpclib.dumps, (l,))
137
138    def test_dump_recursive_dict(self):
139        d = {'1':1, '2':1}
140        t = {'3':3, 'd':d}
141        d['t'] = t
142        self.assertRaises(TypeError, xmlrpclib.dumps, (d,))
143
144    def test_dump_big_int(self):
145        if sys.maxsize > 2**31-1:
146            self.assertRaises(OverflowError, xmlrpclib.dumps,
147                              (int(2**34),))
148
149        xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
150        self.assertRaises(OverflowError, xmlrpclib.dumps,
151                          (xmlrpclib.MAXINT+1,))
152        self.assertRaises(OverflowError, xmlrpclib.dumps,
153                          (xmlrpclib.MININT-1,))
154
155        def dummy_write(s):
156            pass
157
158        m = xmlrpclib.Marshaller()
159        m.dump_int(xmlrpclib.MAXINT, dummy_write)
160        m.dump_int(xmlrpclib.MININT, dummy_write)
161        self.assertRaises(OverflowError, m.dump_int,
162                          xmlrpclib.MAXINT+1, dummy_write)
163        self.assertRaises(OverflowError, m.dump_int,
164                          xmlrpclib.MININT-1, dummy_write)
165
166    def test_dump_double(self):
167        xmlrpclib.dumps((float(2 ** 34),))
168        xmlrpclib.dumps((float(xmlrpclib.MAXINT),
169                         float(xmlrpclib.MININT)))
170        xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42),
171                         float(xmlrpclib.MININT - 42)))
172
173        def dummy_write(s):
174            pass
175
176        m = xmlrpclib.Marshaller()
177        m.dump_double(xmlrpclib.MAXINT, dummy_write)
178        m.dump_double(xmlrpclib.MININT, dummy_write)
179        m.dump_double(xmlrpclib.MAXINT + 42, dummy_write)
180        m.dump_double(xmlrpclib.MININT - 42, dummy_write)
181
182    def test_dump_none(self):
183        value = alist + [None]
184        arg1 = (alist + [None],)
185        strg = xmlrpclib.dumps(arg1, allow_none=True)
186        self.assertEqual(value,
187                          xmlrpclib.loads(strg)[0][0])
188        self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))
189
190    def test_dump_encoding(self):
191        value = {'key\u20ac\xa4':
192                 'value\u20ac\xa4'}
193        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
194        strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
195        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
196        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
197        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
198
199        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
200                               methodresponse=True)
201        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
202        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
203        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
204
205        methodname = 'method\u20ac\xa4'
206        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
207                               methodname=methodname)
208        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
209        self.assertEqual(xmlrpclib.loads(strg)[1], methodname)
210
211    def test_dump_bytes(self):
212        sample = b"my dog has fleas"
213        self.assertEqual(sample, xmlrpclib.Binary(sample))
214        for type_ in bytes, bytearray, xmlrpclib.Binary:
215            value = type_(sample)
216            s = xmlrpclib.dumps((value,))
217
218            result, m = xmlrpclib.loads(s, use_builtin_types=True)
219            (newvalue,) = result
220            self.assertEqual(newvalue, sample)
221            self.assertIs(type(newvalue), bytes)
222            self.assertIsNone(m)
223
224            result, m = xmlrpclib.loads(s, use_builtin_types=False)
225            (newvalue,) = result
226            self.assertEqual(newvalue, sample)
227            self.assertIs(type(newvalue), xmlrpclib.Binary)
228            self.assertIsNone(m)
229
230    def test_loads_unsupported(self):
231        ResponseError = xmlrpclib.ResponseError
232        data = '<params><param><value><spam/></value></param></params>'
233        self.assertRaises(ResponseError, xmlrpclib.loads, data)
234        data = ('<params><param><value><array>'
235                '<value><spam/></value>'
236                '</array></value></param></params>')
237        self.assertRaises(ResponseError, xmlrpclib.loads, data)
238        data = ('<params><param><value><struct>'
239                '<member><name>a</name><value><spam/></value></member>'
240                '<member><name>b</name><value><spam/></value></member>'
241                '</struct></value></param></params>')
242        self.assertRaises(ResponseError, xmlrpclib.loads, data)
243
244    def check_loads(self, s, value, **kwargs):
245        dump = '<params><param><value>%s</value></param></params>' % s
246        result, m = xmlrpclib.loads(dump, **kwargs)
247        (newvalue,) = result
248        self.assertEqual(newvalue, value)
249        self.assertIs(type(newvalue), type(value))
250        self.assertIsNone(m)
251
252    def test_load_standard_types(self):
253        check = self.check_loads
254        check('string', 'string')
255        check('<string>string</string>', 'string')
256        check('<string>�������������� string</string>', '�������������� string')
257        check('<int>2056183947</int>', 2056183947)
258        check('<int>-2056183947</int>', -2056183947)
259        check('<i4>2056183947</i4>', 2056183947)
260        check('<double>46093.78125</double>', 46093.78125)
261        check('<boolean>0</boolean>', False)
262        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
263              xmlrpclib.Binary(b'\x00byte string\xff'))
264        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
265              b'\x00byte string\xff', use_builtin_types=True)
266        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
267              xmlrpclib.DateTime('20050210T11:41:23'))
268        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
269              datetime.datetime(2005, 2, 10, 11, 41, 23),
270              use_builtin_types=True)
271        check('<array><data>'
272              '<value><int>1</int></value><value><int>2</int></value>'
273              '</data></array>', [1, 2])
274        check('<struct>'
275              '<member><name>b</name><value><int>2</int></value></member>'
276              '<member><name>a</name><value><int>1</int></value></member>'
277              '</struct>', {'a': 1, 'b': 2})
278
279    def test_load_extension_types(self):
280        check = self.check_loads
281        check('<nil/>', None)
282        check('<ex:nil/>', None)
283        check('<i1>205</i1>', 205)
284        check('<i2>20561</i2>', 20561)
285        check('<i8>9876543210</i8>', 9876543210)
286        check('<biginteger>98765432100123456789</biginteger>',
287              98765432100123456789)
288        check('<float>93.78125</float>', 93.78125)
289        check('<bigdecimal>9876543210.0123456789</bigdecimal>',
290              decimal.Decimal('9876543210.0123456789'))
291
292    def test_limit_int(self):
293        check = self.check_loads
294        maxdigits = 5000
295        with support.adjust_int_max_str_digits(maxdigits):
296            s = '1' * (maxdigits + 1)
297            with self.assertRaises(ValueError):
298                check(f'<int>{s}</int>', None)
299            with self.assertRaises(ValueError):
300                check(f'<biginteger>{s}</biginteger>', None)
301
302    def test_get_host_info(self):
303        # see bug #3613, this raised a TypeError
304        transp = xmlrpc.client.Transport()
305        self.assertEqual(transp.get_host_info("[email protected]"),
306                          ('host.tld',
307                           [('Authorization', 'Basic dXNlcg==')], {}))
308
309    def test_ssl_presence(self):
310        try:
311            import ssl
312        except ImportError:
313            has_ssl = False
314        else:
315            has_ssl = True
316        try:
317            xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
318        except NotImplementedError:
319            self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
320        except OSError:
321            self.assertTrue(has_ssl)
322
323    def test_keepalive_disconnect(self):
324        class RequestHandler(http.server.BaseHTTPRequestHandler):
325            protocol_version = "HTTP/1.1"
326            handled = False
327
328            def do_POST(self):
329                length = int(self.headers.get("Content-Length"))
330                self.rfile.read(length)
331                if self.handled:
332                    self.close_connection = True
333                    return
334                response = xmlrpclib.dumps((5,), methodresponse=True)
335                response = response.encode()
336                self.send_response(http.HTTPStatus.OK)
337                self.send_header("Content-Length", len(response))
338                self.end_headers()
339                self.wfile.write(response)
340                self.handled = True
341                self.close_connection = False
342
343            def log_message(self, format, *args):
344                # don't clobber sys.stderr
345                pass
346
347        def run_server():
348            server.socket.settimeout(float(1))  # Don't hang if client fails
349            server.handle_request()  # First request and attempt at second
350            server.handle_request()  # Retried second request
351
352        server = http.server.HTTPServer((socket_helper.HOST, 0), RequestHandler)
353        self.addCleanup(server.server_close)
354        thread = threading.Thread(target=run_server)
355        thread.start()
356        self.addCleanup(thread.join)
357        url = "http://{}:{}/".format(*server.server_address)
358        with xmlrpclib.ServerProxy(url) as p:
359            self.assertEqual(p.method(), 5)
360            self.assertEqual(p.method(), 5)
361
362
class SimpleXMLRPCDispatcherTestCase(unittest.TestCase):
    class DispatchExc(Exception):
        """Raised inside the dispatched functions when checking for
        chained exceptions"""

    def _check_unchained(self, exc, expected_args):
        # The exception raised by the dispatched callable must reach the
        # caller untouched: same args, and nothing chained onto it.
        self.assertEqual(exc.args, expected_args)
        self.assertIsNone(exc.__cause__)
        self.assertIsNone(exc.__context__)

    def _check_unsupported(self, disp):
        # Dispatching 'method' must fail with an error that mentions it.
        with self.assertRaisesRegex(Exception, 'method'):
            disp._dispatch('method', ('param',))

    def test_call_registered_func(self):
        """Calls explicitly registered function"""
        # The function is registered under its own __name__.
        call_args = 1, 2, 3

        def dispatched_func(*params):
            raise self.DispatchExc(params)

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_function(dispatched_func)
        with self.assertRaises(self.DispatchExc) as caught:
            disp._dispatch('dispatched_func', call_args)
        self._check_unchained(caught.exception, (call_args,))

    def test_call_instance_func(self):
        """Calls a registered instance attribute as a function"""
        # The method is looked up by name on the registered instance.
        call_args = 1, 2, 3

        class DispatchedClass:
            def dispatched_func(self, *params):
                raise SimpleXMLRPCDispatcherTestCase.DispatchExc(params)

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_instance(DispatchedClass())
        with self.assertRaises(self.DispatchExc) as caught:
            disp._dispatch('dispatched_func', call_args)
        self._check_unchained(caught.exception, (call_args,))

    def test_call_dispatch_func(self):
        """Calls the registered instance's `_dispatch` function"""
        # The instance's own _dispatch() receives the method name and params.
        method_name = 'method'
        call_args = 1, 2, 3

        class TestInstance:
            def _dispatch(self, method, params):
                raise SimpleXMLRPCDispatcherTestCase.DispatchExc(
                    method, params)

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_instance(TestInstance())
        with self.assertRaises(self.DispatchExc) as caught:
            disp._dispatch(method_name, call_args)
        self._check_unchained(caught.exception, (method_name, call_args))

    def test_registered_func_is_none(self):
        """Calls explicitly registered function which is None"""

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_function(None, name='method')
        self._check_unsupported(disp)

    def test_instance_has_no_func(self):
        """Attempts to call nonexistent function on a registered instance"""

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_instance(object())
        self._check_unsupported(disp)

    def test_cannot_locate_func(self):
        """Calls a function that the dispatcher cannot locate"""

        self._check_unsupported(xmlrpc.server.SimpleXMLRPCDispatcher())
448
449
class HelperTestCase(unittest.TestCase):
    def test_escape(self):
        # escape() must replace each XML-special character with its entity.
        cases = (
            ("a&b", "a&amp;b"),
            ("a<b", "a&lt;b"),
            ("a>b", "a&gt;b"),
        )
        for raw, escaped in cases:
            self.assertEqual(xmlrpclib.escape(raw), escaped)
455
class FaultTestCase(unittest.TestCase):
    def test_repr(self):
        # repr() and str() of a Fault share one fixed format.
        fault = xmlrpclib.Fault(42, 'Test Fault')
        self.assertEqual(repr(fault), "<Fault 42: 'Test Fault'>")
        self.assertEqual(repr(fault), str(fault))

    def test_dump_fault(self):
        fault = xmlrpclib.Fault(42, 'Test Fault')

        # As an ordinary parameter a Fault is loaded back as a plain struct.
        as_param = xmlrpclib.dumps((fault,))
        (loaded,), method = xmlrpclib.loads(as_param)
        self.assertEqual(loaded,
                         {'faultCode': 42, 'faultString': 'Test Fault'})
        self.assertEqual(method, None)

        # Marshalled as a fault document it is re-raised on load.
        as_fault = xmlrpclib.Marshaller().dumps(fault)
        with self.assertRaises(xmlrpclib.Fault):
            xmlrpclib.loads(as_fault)

    def test_dotted_attribute(self):
        # Names starting with an underscore must be refused with
        # AttributeError, while a public attribute resolves normally.
        with self.assertRaises(AttributeError):
            xmlrpc.server.resolve_dotted_attribute(str, '__add')
        resolved = xmlrpc.server.resolve_dotted_attribute(str, 'title')
        self.assertTrue(resolved)
478
479class DateTimeTestCase(unittest.TestCase):
480    def test_default(self):
481        with mock.patch('time.localtime') as localtime_mock:
482            time_struct = time.struct_time(
483                [2013, 7, 15, 0, 24, 49, 0, 196, 0])
484            localtime_mock.return_value = time_struct
485            localtime = time.localtime()
486            t = xmlrpclib.DateTime()
487            self.assertEqual(str(t),
488                             time.strftime("%Y%m%dT%H:%M:%S", localtime))
489
490    def test_time(self):
491        d = 1181399930.036952
492        t = xmlrpclib.DateTime(d)
493        self.assertEqual(str(t),
494                         time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))
495
496    def test_time_tuple(self):
497        d = (2007,6,9,10,38,50,5,160,0)
498        t = xmlrpclib.DateTime(d)
499        self.assertEqual(str(t), '20070609T10:38:50')
500
501    def test_time_struct(self):
502        d = time.localtime(1181399930.036952)
503        t = xmlrpclib.DateTime(d)
504        self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d))
505
506    def test_datetime_datetime(self):
507        d = datetime.datetime(2007,1,2,3,4,5)
508        t = xmlrpclib.DateTime(d)
509        self.assertEqual(str(t), '20070102T03:04:05')
510
511    def test_repr(self):
512        d = datetime.datetime(2007,1,2,3,4,5)
513        t = xmlrpclib.DateTime(d)
514        val ="<DateTime '20070102T03:04:05' at %#x>" % id(t)
515        self.assertEqual(repr(t), val)
516
517    def test_decode(self):
518        d = ' 20070908T07:11:13  '
519        t1 = xmlrpclib.DateTime()
520        t1.decode(d)
521        tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
522        self.assertEqual(t1, tref)
523
524        t2 = xmlrpclib._datetime(d)
525        self.assertEqual(t2, tref)
526
527    def test_comparison(self):
528        now = datetime.datetime.now()
529        dtime = xmlrpclib.DateTime(now.timetuple())
530
531        # datetime vs. DateTime
532        self.assertTrue(dtime == now)
533        self.assertTrue(now == dtime)
534        then = now + datetime.timedelta(seconds=4)
535        self.assertTrue(then >= dtime)
536        self.assertTrue(dtime < then)
537
538        # str vs. DateTime
539        dstr = now.strftime("%Y%m%dT%H:%M:%S")
540        self.assertTrue(dtime == dstr)
541        self.assertTrue(dstr == dtime)
542        dtime_then = xmlrpclib.DateTime(then.timetuple())
543        self.assertTrue(dtime_then >= dstr)
544        self.assertTrue(dstr < dtime_then)
545
546        # some other types
547        dbytes = dstr.encode('ascii')
548        dtuple = now.timetuple()
549        self.assertFalse(dtime == 1970)
550        self.assertTrue(dtime != dbytes)
551        self.assertFalse(dtime == bytearray(dbytes))
552        self.assertTrue(dtime != dtuple)
553        with self.assertRaises(TypeError):
554            dtime < float(1970)
555        with self.assertRaises(TypeError):
556            dtime > dbytes
557        with self.assertRaises(TypeError):
558            dtime <= bytearray(dbytes)
559        with self.assertRaises(TypeError):
560            dtime >= dtuple
561
562        self.assertTrue(dtime == ALWAYS_EQ)
563        self.assertFalse(dtime != ALWAYS_EQ)
564        self.assertTrue(dtime < LARGEST)
565        self.assertFalse(dtime > LARGEST)
566        self.assertTrue(dtime <= LARGEST)
567        self.assertFalse(dtime >= LARGEST)
568        self.assertFalse(dtime < SMALLEST)
569        self.assertTrue(dtime > SMALLEST)
570        self.assertFalse(dtime <= SMALLEST)
571        self.assertTrue(dtime >= SMALLEST)
572
573
class BinaryTestCase(unittest.TestCase):

    # XXX What should str(Binary(b"\xff")) return?  I'm choosing "\xff"
    # for now (i.e. interpreting the binary data as Latin-1-encoded
    # text).  But this feels very unsatisfactory.  Perhaps we should
    # only define repr(), and return r"Binary(b'\xff')" instead?

    def test_default(self):
        # A Binary created without data stringifies to the empty string.
        self.assertEqual(str(xmlrpclib.Binary()), '')

    def test_string(self):
        # str() interprets the payload as Latin-1 text.
        payload = b'\x01\x02\x03abc123\xff\xfe'
        wrapped = xmlrpclib.Binary(payload)
        self.assertEqual(str(wrapped), payload.decode("latin-1"))

    def test_decode(self):
        # Base64 input must decode to the original payload, both through
        # Binary.decode() and through the _binary() helper.
        payload = b'\x01\x02\x03abc123\xff\xfe'
        encoded = base64.encodebytes(payload)
        as_text = payload.decode("latin-1")

        via_method = xmlrpclib.Binary()
        via_method.decode(encoded)
        self.assertEqual(str(via_method), as_text)

        via_helper = xmlrpclib._binary(encoded)
        self.assertEqual(str(via_helper), as_text)
599
600
# Address, port and base URL of the test server.  They start out as None and
# are rebound (via ``global``) by the server helper functions in this file;
# the tests reach the server through URL.
ADDR = PORT = URL = None
602
# The evt is set twice: once when the server is ready to serve and once more
# when it has been shut down.  The caller must clear the event after the
# first set in order to observe the second one.
def http_server(evt, numrequests, requestHandler=None, encoding=None):
    """Run a SimpleXMLRPCServer on localhost for up to *numrequests* requests.

    Meant to be used as a thread target.  Publishes the bound address in the
    module globals ADDR, PORT and URL, then sets *evt*.  On the way out the
    listening socket is closed, PORT is reset to None and *evt* is set again.

    *requestHandler* defaults to SimpleXMLRPCRequestHandler when falsy;
    *encoding* is passed through to the server.
    """
    global ADDR, PORT, URL

    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

        class Fixture:
            @staticmethod
            def getData():
                return '42'

    class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer):
        def get_request(self):
            # Force every accepted connection into blocking mode.  On Linux,
            # socket attributes are not inherited like they are on *BSD and
            # Windows, so set the mode explicitly for consistency.
            conn, peer = self.socket.accept()
            conn.setblocking(True)
            return conn, peer

    handler_class = requestHandler or xmlrpc.server.SimpleXMLRPCRequestHandler
    server = MyXMLRPCServer(("localhost", 0), handler_class,
                            encoding=encoding,
                            logRequests=False, bind_and_activate=False)
    try:
        server.server_bind()
        ADDR, PORT = server.socket.getsockname()
        # Connect to the IP address directly.  This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on vista which
        # supports AF_INET6.  The server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        server.server_activate()

        # Everything the tests may call.
        server.register_introspection_functions()
        server.register_multicall_functions()
        server.register_function(pow)
        server.register_function(lambda x: x, 'têšt')

        @server.register_function
        def my_function():
            '''This is my function'''
            return True

        @server.register_function(name='add')
        def _(x, y):
            return x + y

        server.register_instance(TestInstanceClass(), allow_dotted_names=True)
        evt.set()

        # handle up to 'numrequests' requests
        for _request in range(numrequests):
            server.handle_request()

    except TimeoutError:
        pass
    finally:
        server.socket.close()
        PORT = None
        evt.set()
def http_multi_server(evt, numrequests, requestHandler=None):
    """Run a MultiPathXMLRPCServer on localhost for up to *numrequests* requests.

    Meant to be used as a thread target.  A separate dispatcher is installed
    for each entry of ``paths`` below, plus a deliberately broken one on
    "/is/broken".  The bound address is published in the module globals ADDR,
    PORT and URL before *evt* is set for the first time.  On the way out the
    listening socket is closed, PORT is reset to None and *evt* is set again,
    so callers must clear the event in between.

    *requestHandler* defaults to SimpleXMLRPCRequestHandler when falsy.
    """
    global ADDR, PORT, URL

    class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer):
        def get_request(self):
            # Ensure the accepted socket is always blocking.  On Linux,
            # socket attributes are not inherited like they are on *BSD and
            # Windows.
            s, port = self.socket.accept()
            s.setblocking(True)
            return s, port

    if not requestHandler:
        requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
    class MyRequestHandler(requestHandler):
        # An empty list removes the handler's default path restriction.
        # NOTE(review): confirm against SimpleXMLRPCRequestHandler.is_rpc_path_valid.
        rpc_paths = []

    class BrokenDispatcher:
        # Stand-in dispatcher whose marshalled dispatch always fails.
        def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
            raise RuntimeError("broken dispatcher")

    serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
                          logRequests=False, bind_and_activate=False)
    try:
        # Timeout and bind happen inside the try block (as in http_server)
        # so the socket is closed by the finally clause if either one fails.
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        #connect to IP address directly.  This avoids socket.create_connection()
        #trying to connect to "localhost" using all address families, which
        #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
        #on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()
        paths = [
            "/foo", "/foo/bar",
            "/foo?k=v", "/foo#frag", "/foo?k=v#frag",
            "", "/", "/RPC2", "?k=v", "#frag",
        ]
        for path in paths:
            d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher())
            d.register_introspection_functions()
            d.register_multicall_functions()
            # 'test' returns the path of the dispatcher that served the call;
            # the default argument binds the current loop value.
            d.register_function(lambda p=path: p, 'test')
        serv.get_dispatcher(paths[0]).register_function(pow)
        serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
        serv.add_dispatcher("/is/broken", BrokenDispatcher())
        evt.set()

        # handle up to 'numrequests' requests
        while numrequests > 0:
            serv.handle_request()
            numrequests -= 1

    except TimeoutError:
        # The listening socket timed out waiting for a request: stop serving.
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()
739
# This function prevents errors like:
#    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
def is_unavailable_exception(e):
    '''Return True if *e* is the product of a server-side exception caused
       by the 'temporarily unavailable' response sometimes given by
       operations on non-blocking sockets, and False otherwise.

       *e* is normally a ProtocolError.  An exception without ``errcode``
       or ``headers`` attributes (such as an OSError) is judged by its
       message instead.'''

    # sometimes we get a -1 error code and/or empty headers
    try:
        if e.errcode == -1 or e.headers is None:
            return True
        exc_mess = e.headers.get('X-exception')
    except AttributeError:
        # Not a ProtocolError (e.g. an OSError): fall back to its message.
        exc_mess = str(e)

    # Always answer with a real bool instead of falling off the end with None.
    return bool(exc_mess) and 'temporarily unavailable' in exc_mess.lower()
758
def make_request_and_skipIf(condition, reason):
    """Return a decorator that skips a server test when *condition* is true.

    If we skip the test, we have to make a request because the server
    created in setUp blocks expecting one to come in.  So the replacement
    test first calls ``my_function`` on the server at URL, ignoring
    'temporarily unavailable' failures, and then raises unittest.SkipTest
    with *reason*.

    When *condition* is false the decorated function is returned unchanged.
    """
    if not condition:
        return lambda func: func

    # Local import: functools is not among this file's module-level imports.
    import functools

    def decorator(func):
        # Keep the name and docstring of the decorated test on the wrapper.
        @functools.wraps(func)
        def make_request_and_skip(self):
            try:
                xmlrpclib.ServerProxy(URL).my_function()
            except (xmlrpclib.ProtocolError, OSError) as e:
                if not is_unavailable_exception(e):
                    raise
            raise unittest.SkipTest(reason)
        return make_request_and_skip
    return decorator
774
class BaseServerTestCase(unittest.TestCase):
    # Knobs for subclasses: the request handler class passed to the server
    # function (None selects its default), how many requests the server
    # handles before exiting, and the server function run in the thread.
    requestHandler = None
    request_count = 1
    threadFunc = staticmethod(http_server)

    def setUp(self):
        # Turn traceback reporting on for the duration of the test.
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        # Run the server in a background thread; it is joined on cleanup.
        self.evt = threading.Event()
        server_thread = threading.Thread(
            target=self.threadFunc,
            args=(self.evt, self.request_count, self.requestHandler))
        server_thread.start()
        self.addCleanup(server_thread.join)

        # Block until the server signals readiness, then re-arm the event so
        # that tearDown() can wait for the shutdown signal.
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # Block until the server thread signals that it has shut down.
        self.evt.wait()

        # Turn traceback reporting back off.
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
801
class SimpleServerTestCase(BaseServerTestCase):
    def _fail_unless_unavailable(self, exc):
        # Shared handling for a ProtocolError/OSError raised while talking
        # to the server: failures due to non-blocking socket 'unavailable'
        # errors are ignored; anything else fails the test, with the
        # exception's headers (if it has any) added to the test output.
        if not is_unavailable_exception(exc):
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))

    def test_simple1(self):
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.pow(6, 8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            self._fail_unless_unavailable(exc)

    def test_nonascii(self):
        first = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
        second = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.add(first, second), first + second)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            self._fail_unless_unavailable(exc)

    def test_client_encoding(self):
        first = '\u20ac'
        second = '\xa4'

        try:
            proxy = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
            self.assertEqual(proxy.add(first, second), first + second)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            self._fail_unless_unavailable(exc)

    def test_nonascii_methodname(self):
        try:
            proxy = xmlrpclib.ServerProxy(URL, encoding='ascii')
            self.assertEqual(proxy.têšt(42), 42)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            self._fail_unless_unavailable(exc)

    def test_404(self):
        # A POST sent with http.client to a path the server does not serve
        # should come back with a 404 status and a 'Not Found' reason.
        connection = http.client.HTTPConnection(ADDR, PORT)
        with contextlib.closing(connection):
            connection.request('POST', '/this-is-not-valid')
            reply = connection.getresponse()

        self.assertEqual(reply.status, 404)
        self.assertEqual(reply.reason, 'Not Found')

    def test_introspection1(self):
        expected_methods = {
            'pow', 'div', 'my_function', 'add', 'têšt',
            'system.listMethods', 'system.methodHelp',
            'system.methodSignature', 'system.multicall',
            'Fixture',
        }
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            listed = proxy.system.listMethods()
            self.assertEqual(set(listed), expected_methods)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            self._fail_unless_unavailable(exc)

    def test_introspection2(self):
        try:
            # test _methodHelp()
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.system.methodHelp('div'),
                             'This is the div function')
        except (xmlrpclib.ProtocolError, OSError) as exc:
            self._fail_unless_unavailable(exc)

    @make_request_and_skipIf(sys.flags.optimize >= 2,
                     "Docstrings are omitted with -O2 and above")
    def test_introspection3(self):
        try:
            # test native doc
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.system.methodHelp('my_function'),
                             'This is my function')
        except (xmlrpclib.ProtocolError, OSError) as exc:
            self._fail_unless_unavailable(exc)

    def test_introspection4(self):
        # the SimpleXMLRPCServer doesn't support signatures, but
        # at least check that we can try making the call
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.system.methodSignature('div'),
                             'signatures not supported')
        except (xmlrpclib.ProtocolError, OSError) as exc:
            self._fail_unless_unavailable(exc)

    def test_multicall(self):
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            batch = xmlrpclib.MultiCall(proxy)
            batch.add(2, 3)
            batch.pow(6, 8)
            batch.div(127, 42)
            total, power, quotient = batch()
            self.assertEqual(total, 2+3)
            self.assertEqual(power, 6**8)
            self.assertEqual(quotient, 127//42)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            self._fail_unless_unavailable(exc)

    def test_non_existing_multicall(self):
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            batch = xmlrpclib.MultiCall(proxy)
            batch.this_is_not_exists()
            outcome = batch()

            # outcome.results is expected to hold a single fault struct
            # with 'faultCode' and 'faultString' entries describing the
            # unsupported method.
            fault = outcome.results[0]
            self.assertEqual(fault['faultCode'], 1)
            self.assertEqual(fault['faultString'],
                '<class \'Exception\'>:method "this_is_not_exists" '
                'is not supported')
        except (xmlrpclib.ProtocolError, OSError) as exc:
            self._fail_unless_unavailable(exc)

    def test_dotted_attribute(self):
        # Raises an AttributeError because private methods are not allowed.
        with self.assertRaises(AttributeError):
            xmlrpc.server.resolve_dotted_attribute(str, '__add')

        self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
        # Get the test to run faster by sending a request with test_simple1.
        # This avoids waiting for the socket timeout.
        self.test_simple1()

    def test_allow_dotted_names_true(self):
        # XXX also need allow_dotted_names_false test.
        proxy = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        self.assertEqual(proxy.Fixture.getData(), '42')

    def test_unicode_host(self):
        proxy = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        self.assertEqual(proxy.add("a", "\xe9"), "a\xe9")

    def test_partial_post(self):
        # Check that a partial POST doesn't make the server loop: issue #14001.
        request = ('POST /RPC2 HTTP/1.0\r\n'
                   'Content-Length: 100\r\n\r\n'
                   'bye HTTP/1.1\r\n'
                   f'Host: {ADDR}:{PORT}\r\n'
                   'Accept-Encoding: identity\r\n'
                   'Content-Length: 0\r\n\r\n')
        with contextlib.closing(socket.create_connection((ADDR, PORT))) as conn:
            conn.send(request.encode('ascii'))

    def test_context_manager(self):
        # A connection exists while inside the with block and is gone
        # again once the block has been left.
        with xmlrpclib.ServerProxy(URL) as proxy:
            proxy.add(2, 3)
            self.assertNotEqual(proxy('transport')._connection,
                                (None, None))
        self.assertEqual(proxy('transport')._connection,
                         (None, None))

    def test_context_manager_method_error(self):
        # The connection must also be gone when the with block is left
        # through a Fault raised by the remote call.
        with contextlib.suppress(xmlrpclib.Fault):
            with xmlrpclib.ServerProxy(URL) as proxy:
                proxy.add(2, "a")
        self.assertEqual(proxy('transport')._connection,
                         (None, None))
999
1000
class SimpleServerEncodingTestCase(BaseServerTestCase):
    @staticmethod
    def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
        # The encoding argument is not used: the server is always started
        # with 'iso-8859-15'.
        http_server(evt, numrequests, requestHandler, 'iso-8859-15')

    def test_server_encoding(self):
        first, second = '\u20ac', '\xa4'

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.add(first, second), first + second)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            # Ignore failures due to non-blocking socket unavailable errors.
            if is_unavailable_exception(exc):
                return
            # Protocol error; provide additional information in test output.
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))
1019
1020
class MultiPathServerTestCase(BaseServerTestCase):
    threadFunc = staticmethod(http_multi_server)
    request_count = 2

    def _proxy(self, suffix=""):
        # Build a proxy for the server URL with *suffix* appended to it.
        return xmlrpclib.ServerProxy(URL + suffix)

    def test_path1(self):
        # Under /foo, pow() is served while add() produces a fault.
        proxy = self._proxy("/foo")
        self.assertEqual(proxy.pow(6, 8), 6**8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)

    def test_path2(self):
        # Under /foo/bar, add() is served while pow() produces a fault.
        proxy = self._proxy("/foo/bar")
        self.assertEqual(proxy.add(6, 8), 6+8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.pow(6, 8)

    def test_path3(self):
        proxy = self._proxy("/is/broken")
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)

    def test_invalid_path(self):
        proxy = self._proxy("/invalid")
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)

    # The remaining tests call the remote test() method and check the
    # string it returns against the path part of the proxy's URL.

    def test_path_query_fragment(self):
        self.assertEqual(self._proxy("/foo?k=v#frag").test(), "/foo?k=v#frag")

    def test_path_fragment(self):
        self.assertEqual(self._proxy("/foo#frag").test(), "/foo#frag")

    def test_path_query(self):
        self.assertEqual(self._proxy("/foo?k=v").test(), "/foo?k=v")

    def test_empty_path(self):
        self.assertEqual(self._proxy().test(), "/RPC2")

    def test_root_path(self):
        self.assertEqual(self._proxy("/").test(), "/")

    def test_empty_path_query(self):
        self.assertEqual(self._proxy("?k=v").test(), "?k=v")

    def test_empty_path_fragment(self):
        self.assertEqual(self._proxy("#frag").test(), "#frag")
1069
1070
# Base class for the keep-alive test cases below: it installs an HTTP/1.1
# request handler that logs the request lines it handles in a class variable.
class BaseKeepaliveServerTestCase(BaseServerTestCase):
    # A request handler that supports keep-alive and logs requests into a
    # class variable.
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'
        myRequests = []

        def handle(self):
            # Open a new entry in the log for this handle() call and
            # remember its index for handle_one_request().
            log = self.myRequests
            log.append([])
            self.reqidx = len(log) - 1
            return self.parentClass.handle(self)

        def handle_one_request(self):
            # Let the parent class do the work, then record the raw
            # request line under this handler's log entry.
            outcome = self.parentClass.handle_one_request(self)
            self.myRequests[self.reqidx].append(self.raw_requestline)
            return outcome

    requestHandler = RequestHandler

    def setUp(self):
        # Start every test with an empty request log.
        self.RequestHandler.myRequests = []
        return super().setUp()
1094
1095#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
1096#does indeed serve subsequent requests on the same connection
class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
    def test_two(self):
        proxy = xmlrpclib.ServerProxy(URL)
        # Do three requests through the same proxy, then close it.
        for _ in range(3):
            self.assertEqual(proxy.pow(6, 8), 6**8)
        proxy("close")()

        request_log = self.RequestHandler.myRequests

        # They should have all been handled by a single request handler.
        self.assertEqual(len(request_log), 1)

        # Check that we did at least two (the third may be pending append
        # due to thread scheduling).
        self.assertGreaterEqual(len(request_log[-1]), 2)
1112
1113
1114#test special attribute access on the serverproxy, through the __call__
1115#function.
class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    # Ask for two keepalive requests to be handled.
    request_count = 2

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(URL)
        # Two rounds of three requests, each round ended by closing the
        # proxy; the close after the first round should trigger a new
        # keep-alive request for the second one.
        for _round in range(2):
            for _ in range(3):
                self.assertEqual(proxy.pow(6, 8), 6**8)
            proxy("close")()

        # There should have been two request handlers, each having logged
        # at least two complete requests.
        request_log = self.RequestHandler.myRequests
        self.assertEqual(len(request_log), 2)
        self.assertGreaterEqual(len(request_log[-1]), 2)
        self.assertGreaterEqual(len(request_log[-2]), 2)

    def test_transport(self):
        proxy = xmlrpclib.ServerProxy(URL)
        # Do some requests, closing through the transport in between.
        self.assertEqual(proxy.pow(6, 8), 6**8)
        proxy("transport").close()  # same as proxy("close")(), really.
        self.assertEqual(proxy.pow(6, 8), 6**8)
        proxy("close")()
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
1147
1148#A test case that verifies that gzip encoding works in both directions
1149#(for a request and the response)
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipServerTestCase(BaseServerTestCase):
    # A request handler that speaks HTTP/1.1 and stores the Content-Length
    # of the last POST request it received in a class variable.
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'

        def do_POST(self):
            # Remember the size of the last request on the class so the
            # test (running in another thread) can read it back.
            self.__class__.content_length = int(self.headers["content-length"])
            return self.parentClass.do_POST(self)
    requestHandler = RequestHandler

    class Transport(xmlrpclib.Transport):
        # Custom transport: stores the response length for our perusal and
        # can optionally announce a gzip body that is not actually gzipped.
        fake_gzip = False

        def parse_response(self, response):
            self.response_length = int(response.getheader("content-length", 0))
            return xmlrpclib.Transport.parse_response(self, response)

        def send_content(self, connection, body):
            if self.fake_gzip:
                # Add a lone gzip header to induce a decode error remotely.
                connection.putheader("Content-Encoding", "gzip")
            return xmlrpclib.Transport.send_content(self, connection, body)

    def test_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None  # no request encoding
        p = xmlrpclib.ServerProxy(URL, transport=t)
        self.assertEqual(p.pow(6, 8), 6**8)
        a = self.RequestHandler.content_length
        t.encode_threshold = 0  # turn on request encoding
        self.assertEqual(p.pow(6, 8), 6**8)
        b = self.RequestHandler.content_length
        # The encoded request must be shorter than the plain one.
        self.assertGreater(a, b)
        p("close")()

    def test_bad_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        t.fake_gzip = True
        p = xmlrpclib.ServerProxy(URL, transport=t)
        # The bogus Content-Encoding header should be answered with a 400.
        with self.assertRaisesRegex(xmlrpclib.ProtocolError,
                                    re.compile(r"\b400\b")):
            p.pow(6, 8)
        p("close")()

    def test_gzip_response(self):
        t = self.Transport()
        p = xmlrpclib.ServerProxy(URL, transport=t)
        old = self.requestHandler.encode_threshold
        try:
            self.requestHandler.encode_threshold = None  # no encoding
            self.assertEqual(p.pow(6, 8), 6**8)
            a = t.response_length
            self.requestHandler.encode_threshold = 0  # always encode
            self.assertEqual(p.pow(6, 8), 6**8)
            p("close")()
            b = t.response_length
        finally:
            # Restore the handler's threshold even if a request or an
            # assertion above failed, so the change cannot leak into
            # other tests using this handler class.
            self.requestHandler.encode_threshold = old
        # The encoded response must be shorter than the plain one.
        self.assertGreater(a, b)
1216
1217
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipUtilTestCase(unittest.TestCase):

    def test_gzip_decode_limit(self):
        limit = 20 * 1024 * 1024

        # A payload of exactly `limit` zero bytes round-trips intact.
        at_limit = xmlrpclib.gzip_encode(bytes(limit))
        self.assertEqual(len(xmlrpclib.gzip_decode(at_limit)), limit)

        # One byte more and decoding is refused...
        over_limit = xmlrpclib.gzip_encode(bytes(limit + 1))
        with self.assertRaisesRegex(ValueError,
                                    "max gzipped payload length exceeded"):
            xmlrpclib.gzip_decode(over_limit)

        # ...while the same payload must decode without raising when
        # max_decode=-1 is passed.
        xmlrpclib.gzip_decode(over_limit, max_decode=-1)
1236
1237
class HeadersServerTestCase(BaseServerTestCase):
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        # Headers of the most recent POST request, stored on the class.
        test_headers = None

        def do_POST(self):
            type(self).test_headers = self.headers
            return super().do_POST()
    requestHandler = RequestHandler

    # Headers every request is expected to carry besides the additional
    # ones configured on the proxy.
    standard_headers = [
        'Host', 'Accept-Encoding', 'Content-Type', 'User-Agent',
        'Content-Length']

    def setUp(self):
        # Forget the headers recorded by any previous test.
        self.RequestHandler.test_headers = None
        return super().setUp()

    def assertContainsAdditionalHeaders(self, headers, additional):
        # The header names must be exactly the standard ones plus the
        # additional ones, and each additional header must carry the
        # expected value.
        expected_keys = sorted([*self.standard_headers, *additional])
        self.assertListEqual(sorted(headers.keys()), expected_keys)

        for name, value in additional.items():
            self.assertEqual(headers.get(name), value)

    def _request_with(self, headers):
        # Make one request through a proxy configured with *headers* and
        # return the headers the request handler recorded.
        proxy = xmlrpclib.ServerProxy(URL, headers=headers)
        self.assertEqual(proxy.pow(6, 8), 6**8)
        return self.RequestHandler.test_headers

    def test_header(self):
        received = self._request_with([('X-Test', 'foo')])
        self.assertContainsAdditionalHeaders(received, {'X-Test': 'foo'})

    def test_header_many(self):
        received = self._request_with(
            [('X-Test', 'foo'), ('X-Test-Second', 'bar')])
        self.assertContainsAdditionalHeaders(
            received, {'X-Test': 'foo', 'X-Test-Second': 'bar'})

    def test_header_empty(self):
        received = self._request_with([])
        self.assertContainsAdditionalHeaders(received, {})

    def test_header_tuple(self):
        received = self._request_with((('X-Test', 'foo'),))
        self.assertContainsAdditionalHeaders(received, {'X-Test': 'foo'})

    def test_header_items(self):
        received = self._request_with({'X-Test': 'foo'}.items())
        self.assertContainsAdditionalHeaders(received, {'X-Test': 'foo'})
1297
1298
1299#Test special attributes of the ServerProxy object
class ServerProxyTestCase(unittest.TestCase):
    def setUp(self):
        super().setUp()
        # Any string in the correct URL format will do; its actual value
        # doesn't matter.
        self.url = 'http://fake.localhost'

    def test_close(self):
        # Calling the proxy's special 'close' attribute returns None.
        proxy = xmlrpclib.ServerProxy(self.url)
        close = proxy('close')
        self.assertIsNone(close())

    def test_transport(self):
        # The special 'transport' attribute is the transport we passed in.
        transport = xmlrpclib.Transport()
        proxy = xmlrpclib.ServerProxy(self.url, transport=transport)
        self.assertEqual(proxy('transport'), transport)
1315
1316
1317# This is a contrived way to make a failure occur on the server side
1318# in order to test the _send_traceback_header flag on the server
class FailingMessageClass(http.client.HTTPMessage):
    def get(self, key, failobj=None):
        # Report a bogus value for Content-Length, whatever the case of the
        # requested name; every other header is looked up normally (using
        # the lower-cased name).
        lowered = key.lower()
        if lowered != 'content-length':
            return super().get(lowered, failobj)
        return 'I am broken'
1325
1326
class FailingServerTestCase(unittest.TestCase):
    def setUp(self):
        # Run a server handling a single request in its own thread.
        ready = self.evt = threading.Event()
        server_thread = threading.Thread(target=http_server, args=(ready, 1))
        server_thread.start()
        self.addCleanup(server_thread.join)

        # Block until the server is ready, then reset the event so that
        # tearDown() can wait on it again.
        ready.wait()
        ready.clear()

    def tearDown(self):
        # Wait on the server thread to terminate.
        self.evt.wait()
        # Put back the flag and the message class the tests may have changed.
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = (
            http.client.HTTPMessage)

    def test_basic(self):
        # The flag is expected to be false by default.
        self.assertEqual(
            xmlrpc.server.SimpleXMLRPCServer._send_traceback_header, False)

        # Enable traceback reporting.
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        # Smoke test: a call that shouldn't fail.
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.pow(6, 8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            # Ignore failures due to non-blocking socket 'unavailable' errors.
            if is_unavailable_exception(exc):
                return
            # Protocol error; provide additional information in test output.
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))

    def test_fail_no_info(self):
        # Use the broken message class.
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6, 8)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            # Ignore failures due to non-blocking socket 'unavailable'
            # errors, and exceptions that carry no headers to inspect.
            if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
                return
            # The two server-side error headers shouldn't be sent back in
            # this case.
            self.assertIsNone(exc.headers.get("X-exception"))
            self.assertIsNone(exc.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')

    def test_fail_with_info(self):
        # Use the broken message class.
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Check that errors in the server send back exception/traceback
        # info when the flag is set.
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6, 8)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            # Ignore failures due to non-blocking socket 'unavailable'
            # errors, and exceptions that carry no headers to inspect.
            if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
                return
            # We should get error info in the response.
            expected_err = "invalid literal for int() with base 10: 'I am broken'"
            self.assertEqual(exc.headers.get("X-exception"), expected_err)
            self.assertIsNotNone(exc.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')
1403
1404
@contextlib.contextmanager
def captured_stdout(encoding='utf-8'):
    """Temporarily replace sys.stdout with an in-memory text stream.

    A variation on support.captured_stdout(): the stream yielded here is
    an io.TextIOWrapper (using *encoding*) around an io.BytesIO, so it has
    a `buffer` attribute.  The previous sys.stdout is put back on exit.
    """
    saved = sys.stdout
    stream = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
    sys.stdout = stream
    try:
        yield stream
    finally:
        sys.stdout = saved
1416
1417
class CGIHandlerTestCase(unittest.TestCase):
    def setUp(self):
        # A fresh CGI handler with no functions registered on it.
        self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()

    def tearDown(self):
        self.cgi = None

    def test_cgi_get(self):
        with os_helper.EnvironmentVarGuard() as env:
            env['REQUEST_METHOD'] = 'GET'
            # if the method is GET and no request_text is given, it runs
            # handle_get; capture what it writes to stdout
            with captured_stdout(encoding=self.cgi.encoding) as data_out:
                self.cgi.handle_request()

            # parse Status header
            data_out.seek(0)
            words = data_out.read().split()
            status = words[1]
            message = ' '.join(words[2:4])

            self.assertEqual(status, '400')
            self.assertEqual(message, 'Bad Request')


    def test_cgi_xmlrpc_response(self):
        data = """<?xml version='1.0'?>
        <methodCall>
            <methodName>test_method</methodName>
            <params>
                <param>
                    <value><string>foo</string></value>
                </param>
                <param>
                    <value><string>bar</string></value>
                </param>
            </params>
        </methodCall>
        """

        with os_helper.EnvironmentVarGuard() as env, \
             captured_stdout(encoding=self.cgi.encoding) as data_out, \
             support.captured_stdin() as data_in:
            data_in.write(data)
            data_in.seek(0)
            env['CONTENT_LENGTH'] = str(len(data))
            self.cgi.handle_request()
        data_out.seek(0)

        handle = data_out.read()

        # Locate the XML payload by its declaration instead of assuming a
        # fixed header length, which would silently depend on the number
        # of digits in the Content-Length value.
        xml_start = handle.find("<?xml")
        self.assertNotEqual(xml_start, -1, "no XML payload in CGI response")
        content = handle[xml_start:]

        # No test_method was registered on self.cgi, so the response is
        # expected to be a fault; getting one means our goal is achieved.
        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, content)

        # Also test the content-length returned by handle_request.
        # Using the same test method in order to avoid all the data
        # passing boilerplate code.
        # Test for bug: http://bugs.python.org/issue5040
        length_header = re.search(r'Content-Length: (\d+)', handle)
        self.assertIsNotNone(length_header,
                             "no Content-Length header in CGI response")
        self.assertEqual(int(length_header.group(1)), len(content))
1484
1485
class UseBuiltinTypesTestCase(unittest.TestCase):

    def test_use_builtin_types(self):
        # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which
        # makes binary data get dispatched as bytes instances and datetime
        # arguments as datetime.datetime instances.
        self.log = []
        expected_bytes = b"my dog has fleas"
        expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12)

        def foobar(*args):
            # Record whatever arguments the dispatcher hands over.
            self.log.extend(args)

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher(
            allow_none=True, encoding=None, use_builtin_types=True)
        dispatcher.register_function(foobar)
        request = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar')
        dispatcher._marshaled_dispatch(request)

        self.assertEqual(len(self.log), 2)
        got_bytes, got_date = self.log
        self.assertEqual(self.log, [expected_bytes, expected_date])
        self.assertIs(type(got_date), datetime.datetime)
        self.assertIs(type(got_bytes), bytes)

    def test_cgihandler_has_use_builtin_types_flag(self):
        cgi_handler = xmlrpc.server.CGIXMLRPCRequestHandler(
            use_builtin_types=True)
        self.assertTrue(cgi_handler.use_builtin_types)

    def test_xmlrpcserver_has_use_builtin_types_flag(self):
        server = xmlrpc.server.SimpleXMLRPCServer(
            ("localhost", 0), use_builtin_types=True)
        # Release the socket right away; only the attribute is of interest.
        server.server_close()
        self.assertTrue(server.use_builtin_types)
1517
1518
def setUpModule():
    # Run threading_helper.threading_setup() now and register
    # threading_cleanup(), called with the values it returned, as a
    # module-level cleanup.
    unittest.addModuleCleanup(
        threading_helper.threading_cleanup,
        *threading_helper.threading_setup())
1522
1523
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1526