test_xmlrpc.py 47.2 KB
Newer Older
1
import base64
2
import datetime
3
import sys
4
import time
5
import unittest
6
from unittest import mock
7 8
import xmlrpc.client as xmlrpclib
import xmlrpc.server
9
import http.client
10
import http, http.server
11
import socket
12
import re
13 14
import io
import contextlib
15
from test import support
16

17 18
try:
    import gzip
19
except ImportError:
20
    gzip = None
21 22
try:
    import threading
23
except ImportError:
24 25
    threading = None

26 27
alist = [{'astring': 'foo@bar.baz.spam',
          'afloat': 7283.43,
28
          'anint': 2**20,
29
          'ashortlong': 2,
30
          'anotherlist': ['.zyx.41'],
31
          'abase64': xmlrpclib.Binary(b"my dog has fleas"),
32 33
          'b64bytes': b"my dog has fleas",
          'b64bytearray': bytearray(b"my dog has fleas"),
34
          'boolean': False,
35 36
          'unicode': '\u4000\u6000\u8000',
          'ukey\u4000': 'regular value',
37 38
          'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
          'datetime2': xmlrpclib.DateTime(
39
                        (2005, 2, 10, 11, 41, 23, 0, 1, -1)),
40
          'datetime3': xmlrpclib.DateTime(
41
                        datetime.datetime(2005, 2, 10, 11, 41, 23)),
42 43 44 45 46
          }]

class XMLRPCTestCase(unittest.TestCase):

    def test_dump_load(self):
47 48
        dump = xmlrpclib.dumps((alist,))
        load = xmlrpclib.loads(dump)
49
        self.assertEqual(alist, load[0][0])
50

51
    def test_dump_bare_datetime(self):
52 53
        # This checks that an unwrapped datetime.date object can be handled
        # by the marshalling code.  This can't be done via test_dump_load()
54
        # since with use_builtin_types set to 1 the unmarshaller would create
55
        # datetime objects for the 'datetime[123]' keys as well
56
        dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
57
        self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
58
        s = xmlrpclib.dumps((dt,))
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79

        result, m = xmlrpclib.loads(s, use_builtin_types=True)
        (newdt,) = result
        self.assertEqual(newdt, dt)
        self.assertIs(type(newdt), datetime.datetime)
        self.assertIsNone(m)

        result, m = xmlrpclib.loads(s, use_builtin_types=False)
        (newdt,) = result
        self.assertEqual(newdt, dt)
        self.assertIs(type(newdt), xmlrpclib.DateTime)
        self.assertIsNone(m)

        result, m = xmlrpclib.loads(s, use_datetime=True)
        (newdt,) = result
        self.assertEqual(newdt, dt)
        self.assertIs(type(newdt), datetime.datetime)
        self.assertIsNone(m)

        result, m = xmlrpclib.loads(s, use_datetime=False)
        (newdt,) = result
80
        self.assertEqual(newdt, dt)
81 82
        self.assertIs(type(newdt), xmlrpclib.DateTime)
        self.assertIsNone(m)
83

84

Christian Heimes's avatar
Christian Heimes committed
85
    def test_datetime_before_1900(self):
Christian Heimes's avatar
Christian Heimes committed
86
        # same as before but with a date before 1900
Christian Heimes's avatar
Christian Heimes committed
87
        dt = datetime.datetime(1,  2, 10, 11, 41, 23)
88
        self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
Christian Heimes's avatar
Christian Heimes committed
89
        s = xmlrpclib.dumps((dt,))
90 91 92

        result, m = xmlrpclib.loads(s, use_builtin_types=True)
        (newdt,) = result
93
        self.assertEqual(newdt, dt)
94 95
        self.assertIs(type(newdt), datetime.datetime)
        self.assertIsNone(m)
Christian Heimes's avatar
Christian Heimes committed
96

97 98 99 100 101
        result, m = xmlrpclib.loads(s, use_builtin_types=False)
        (newdt,) = result
        self.assertEqual(newdt, dt)
        self.assertIs(type(newdt), xmlrpclib.DateTime)
        self.assertIsNone(m)
Christian Heimes's avatar
Christian Heimes committed
102

103 104
    def test_bug_1164912 (self):
        d = xmlrpclib.DateTime()
Tim Peters's avatar
Tim Peters committed
105
        ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
106
                                            methodresponse=True))
107
        self.assertIsInstance(new_d.value, str)
108 109 110

        # Check that the output of dumps() is still an 8-bit string
        s = xmlrpclib.dumps((new_d,), methodresponse=True)
111
        self.assertIsInstance(s, str)
112

113 114 115 116 117 118 119
    def test_newstyle_class(self):
        class T(object):
            pass
        t = T()
        t.x = 100
        t.y = "Hello"
        ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
120
        self.assertEqual(t2, t.__dict__)
121

122
    def test_dump_big_long(self):
123
        self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))
124 125 126 127

    def test_dump_bad_dict(self):
        self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))

128 129 130 131 132 133 134 135 136 137 138 139
    def test_dump_recursive_seq(self):
        l = [1,2,3]
        t = [3,4,5,l]
        l.append(t)
        self.assertRaises(TypeError, xmlrpclib.dumps, (l,))

    def test_dump_recursive_dict(self):
        d = {'1':1, '2':1}
        t = {'3':3, 'd':d}
        d['t'] = t
        self.assertRaises(TypeError, xmlrpclib.dumps, (d,))

140
    def test_dump_big_int(self):
141
        if sys.maxsize > 2**31-1:
142
            self.assertRaises(OverflowError, xmlrpclib.dumps,
143
                              (int(2**34),))
144

145
        xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
146 147 148 149
        self.assertRaises(OverflowError, xmlrpclib.dumps,
                          (xmlrpclib.MAXINT+1,))
        self.assertRaises(OverflowError, xmlrpclib.dumps,
                          (xmlrpclib.MININT-1,))
150 151 152 153 154 155 156

        def dummy_write(s):
            pass

        m = xmlrpclib.Marshaller()
        m.dump_int(xmlrpclib.MAXINT, dummy_write)
        m.dump_int(xmlrpclib.MININT, dummy_write)
157 158 159 160
        self.assertRaises(OverflowError, m.dump_int,
                          xmlrpclib.MAXINT+1, dummy_write)
        self.assertRaises(OverflowError, m.dump_int,
                          xmlrpclib.MININT-1, dummy_write)
161

162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
    def test_dump_double(self):
        xmlrpclib.dumps((float(2 ** 34),))
        xmlrpclib.dumps((float(xmlrpclib.MAXINT),
                         float(xmlrpclib.MININT)))
        xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42),
                         float(xmlrpclib.MININT - 42)))

        def dummy_write(s):
            pass

        m = xmlrpclib.Marshaller()
        m.dump_double(xmlrpclib.MAXINT, dummy_write)
        m.dump_double(xmlrpclib.MININT, dummy_write)
        m.dump_double(xmlrpclib.MAXINT + 42, dummy_write)
        m.dump_double(xmlrpclib.MININT - 42, dummy_write)

178 179 180 181
    def test_dump_none(self):
        value = alist + [None]
        arg1 = (alist + [None],)
        strg = xmlrpclib.dumps(arg1, allow_none=True)
182
        self.assertEqual(value,
183 184 185
                          xmlrpclib.loads(strg)[0][0])
        self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))

186
    def test_dump_encoding(self):
187 188
        value = {'key\u20ac\xa4':
                 'value\u20ac\xa4'}
189 190 191
        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
        strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
192
        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
193 194 195 196 197
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)

        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
                               methodresponse=True)
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
198
        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
199 200
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)

201 202 203 204 205 206
        methodname = 'method\u20ac\xa4'
        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
                               methodname=methodname)
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        self.assertEqual(xmlrpclib.loads(strg)[1], methodname)

207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
    def test_dump_bytes(self):
        sample = b"my dog has fleas"
        self.assertEqual(sample, xmlrpclib.Binary(sample))
        for type_ in bytes, bytearray, xmlrpclib.Binary:
            value = type_(sample)
            s = xmlrpclib.dumps((value,))

            result, m = xmlrpclib.loads(s, use_builtin_types=True)
            (newvalue,) = result
            self.assertEqual(newvalue, sample)
            self.assertIs(type(newvalue), bytes)
            self.assertIsNone(m)

            result, m = xmlrpclib.loads(s, use_builtin_types=False)
            (newvalue,) = result
            self.assertEqual(newvalue, sample)
            self.assertIs(type(newvalue), xmlrpclib.Binary)
            self.assertIsNone(m)

226 227 228
    def test_get_host_info(self):
        # see bug #3613, this raised a TypeError
        transp = xmlrpc.client.Transport()
229
        self.assertEqual(transp.get_host_info("user@host.tld"),
230 231 232
                          ('host.tld',
                           [('Authorization', 'Basic dXNlcg==')], {}))

233 234
    def test_ssl_presence(self):
        try:
235
            import ssl
236 237
        except ImportError:
            has_ssl = False
238
        else:
239 240 241 242 243
            has_ssl = True
        try:
            xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
        except NotImplementedError:
            self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
244
        except OSError:
245
            self.assertTrue(has_ssl)
246

247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
    @unittest.skipUnless(threading, "Threading required for this test.")
    def test_keepalive_disconnect(self):
        class RequestHandler(http.server.BaseHTTPRequestHandler):
            protocol_version = "HTTP/1.1"
            handled = False

            def do_POST(self):
                length = int(self.headers.get("Content-Length"))
                self.rfile.read(length)
                if self.handled:
                    self.close_connection = True
                    return
                response = xmlrpclib.dumps((5,), methodresponse=True)
                response = response.encode()
                self.send_response(http.HTTPStatus.OK)
                self.send_header("Content-Length", len(response))
                self.end_headers()
                self.wfile.write(response)
                self.handled = True
                self.close_connection = False

        def run_server():
            server.socket.settimeout(float(1))  # Don't hang if client fails
            server.handle_request()  # First request and attempt at second
            server.handle_request()  # Retried second request

        server = http.server.HTTPServer((support.HOST, 0), RequestHandler)
        self.addCleanup(server.server_close)
        thread = threading.Thread(target=run_server)
        thread.start()
        self.addCleanup(thread.join)
        url = "http://{}:{}/".format(*server.server_address)
        with xmlrpclib.ServerProxy(url) as p:
            self.assertEqual(p.method(), 5)
            self.assertEqual(p.method(), 5)

283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
class HelperTestCase(unittest.TestCase):
    def test_escape(self):
        self.assertEqual(xmlrpclib.escape("a&b"), "a&amp;b")
        self.assertEqual(xmlrpclib.escape("a<b"), "a&lt;b")
        self.assertEqual(xmlrpclib.escape("a>b"), "a&gt;b")

class FaultTestCase(unittest.TestCase):
    def test_repr(self):
        f = xmlrpclib.Fault(42, 'Test Fault')
        self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>")
        self.assertEqual(repr(f), str(f))

    def test_dump_fault(self):
        f = xmlrpclib.Fault(42, 'Test Fault')
        s = xmlrpclib.dumps((f,))
        (newf,), m = xmlrpclib.loads(s)
299 300
        self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'})
        self.assertEqual(m, None)
301 302 303 304

        s = xmlrpclib.Marshaller().dumps(f)
        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s)

305
    def test_dotted_attribute(self):
306
        # this will raise AttributeError because code don't want us to use
307 308
        # private methods
        self.assertRaises(AttributeError,
309
                          xmlrpc.server.resolve_dotted_attribute, str, '__add')
310
        self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
311 312 313

class DateTimeTestCase(unittest.TestCase):
    def test_default(self):
314 315 316 317 318 319 320 321
        with mock.patch('time.localtime') as localtime_mock:
            time_struct = time.struct_time(
                [2013, 7, 15, 0, 24, 49, 0, 196, 0])
            localtime_mock.return_value = time_struct
            localtime = time.localtime()
            t = xmlrpclib.DateTime()
            self.assertEqual(str(t),
                             time.strftime("%Y%m%dT%H:%M:%S", localtime))
322 323 324 325

    def test_time(self):
        d = 1181399930.036952
        t = xmlrpclib.DateTime(d)
326 327
        self.assertEqual(str(t),
                         time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))
328 329 330 331 332 333 334 335 336

    def test_time_tuple(self):
        d = (2007,6,9,10,38,50,5,160,0)
        t = xmlrpclib.DateTime(d)
        self.assertEqual(str(t), '20070609T10:38:50')

    def test_time_struct(self):
        d = time.localtime(1181399930.036952)
        t = xmlrpclib.DateTime(d)
337
        self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d))
338 339 340 341 342 343 344 345 346

    def test_datetime_datetime(self):
        d = datetime.datetime(2007,1,2,3,4,5)
        t = xmlrpclib.DateTime(d)
        self.assertEqual(str(t), '20070102T03:04:05')

    def test_repr(self):
        d = datetime.datetime(2007,1,2,3,4,5)
        t = xmlrpclib.DateTime(d)
347
        val ="<DateTime '20070102T03:04:05' at %#x>" % id(t)
348 349 350 351 352 353 354 355 356 357
        self.assertEqual(repr(t), val)

    def test_decode(self):
        d = ' 20070908T07:11:13  '
        t1 = xmlrpclib.DateTime()
        t1.decode(d)
        tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
        self.assertEqual(t1, tref)

        t2 = xmlrpclib._datetime(d)
358
        self.assertEqual(t2, tref)
359

360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
    def test_comparison(self):
        now = datetime.datetime.now()
        dtime = xmlrpclib.DateTime(now.timetuple())

        # datetime vs. DateTime
        self.assertTrue(dtime == now)
        self.assertTrue(now == dtime)
        then = now + datetime.timedelta(seconds=4)
        self.assertTrue(then >= dtime)
        self.assertTrue(dtime < then)

        # str vs. DateTime
        dstr = now.strftime("%Y%m%dT%H:%M:%S")
        self.assertTrue(dtime == dstr)
        self.assertTrue(dstr == dtime)
        dtime_then = xmlrpclib.DateTime(then.timetuple())
        self.assertTrue(dtime_then >= dstr)
        self.assertTrue(dstr < dtime_then)

        # some other types
        dbytes = dstr.encode('ascii')
        dtuple = now.timetuple()
        with self.assertRaises(TypeError):
            dtime == 1970
        with self.assertRaises(TypeError):
            dtime != dbytes
        with self.assertRaises(TypeError):
            dtime == bytearray(dbytes)
        with self.assertRaises(TypeError):
            dtime != dtuple
        with self.assertRaises(TypeError):
            dtime < float(1970)
        with self.assertRaises(TypeError):
            dtime > dbytes
        with self.assertRaises(TypeError):
            dtime <= bytearray(dbytes)
        with self.assertRaises(TypeError):
            dtime >= dtuple

399
class BinaryTestCase(unittest.TestCase):
400 401 402 403 404 405

    # XXX What should str(Binary(b"\xff")) return?  I'm chosing "\xff"
    # for now (i.e. interpreting the binary data as Latin-1-encoded
    # text).  But this feels very unsatisfactory.  Perhaps we should
    # only define repr(), and return r"Binary(b'\xff')" instead?

406 407 408 409 410
    def test_default(self):
        t = xmlrpclib.Binary()
        self.assertEqual(str(t), '')

    def test_string(self):
411
        d = b'\x01\x02\x03abc123\xff\xfe'
412
        t = xmlrpclib.Binary(d)
413
        self.assertEqual(str(t), str(d, "latin-1"))
414 415

    def test_decode(self):
416
        d = b'\x01\x02\x03abc123\xff\xfe'
417
        de = base64.encodebytes(d)
418 419
        t1 = xmlrpclib.Binary()
        t1.decode(de)
420
        self.assertEqual(str(t1), str(d, "latin-1"))
421 422

        t2 = xmlrpclib._binary(de)
423
        self.assertEqual(str(t2), str(d, "latin-1"))
424 425


426
ADDR = PORT = URL = None
427

428 429 430
# The evt is set twice.  First when the server is ready to serve.
# Second when the server has been shutdown.  The user must clear
# the event after it has been set the first time to catch the second set.
431
def http_server(evt, numrequests, requestHandler=None, encoding=None):
432 433 434 435
    class TestInstanceClass:
        def div(self, x, y):
            return x // y

436 437 438 439
        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

440 441 442 443 444
        class Fixture:
            @staticmethod
            def getData():
                return '42'

445 446 447 448
    def my_function():
        '''This is my function'''
        return True

449
    class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer):
450 451 452 453 454 455 456
        def get_request(self):
            # Ensure the socket is always non-blocking.  On Linux, socket
            # attributes are not inherited like they are on *BSD and Windows.
            s, port = self.socket.accept()
            s.setblocking(True)
            return s, port

457 458 459
    if not requestHandler:
        requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
    serv = MyXMLRPCServer(("localhost", 0), requestHandler,
460
                          encoding=encoding,
461
                          logRequests=False, bind_and_activate=False)
462 463
    try:
        serv.server_bind()
464 465 466
        global ADDR, PORT, URL
        ADDR, PORT = serv.socket.getsockname()
        #connect to IP address directly.  This avoids socket.create_connection()
Ezio Melotti's avatar
Ezio Melotti committed
467
        #trying to connect to "localhost" using all address families, which
468 469 470
        #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
        #on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
471 472 473 474 475
        serv.server_activate()
        serv.register_introspection_functions()
        serv.register_multicall_functions()
        serv.register_function(pow)
        serv.register_function(lambda x,y: x+y, 'add')
476
        serv.register_function(lambda x: x, 'têšt')
477
        serv.register_function(my_function)
478 479
        testInstance = TestInstanceClass()
        serv.register_instance(testInstance, allow_dotted_names=True)
480
        evt.set()
481 482 483 484 485 486 487 488 489 490 491 492 493

        # handle up to 'numrequests' requests
        while numrequests > 0:
            serv.handle_request()
            numrequests -= 1

    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()

494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519
def http_multi_server(evt, numrequests, requestHandler=None):
    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

    def my_function():
        '''This is my function'''
        return True

    class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer):
        def get_request(self):
            # Ensure the socket is always non-blocking.  On Linux, socket
            # attributes are not inherited like they are on *BSD and Windows.
            s, port = self.socket.accept()
            s.setblocking(True)
            return s, port

    if not requestHandler:
        requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
    class MyRequestHandler(requestHandler):
        rpc_paths = []

520 521 522 523
    class BrokenDispatcher:
        def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
            raise RuntimeError("broken dispatcher")

524 525 526 527 528 529 530 531
    serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
                          logRequests=False, bind_and_activate=False)
    serv.socket.settimeout(3)
    serv.server_bind()
    try:
        global ADDR, PORT, URL
        ADDR, PORT = serv.socket.getsockname()
        #connect to IP address directly.  This avoids socket.create_connection()
Ezio Melotti's avatar
Ezio Melotti committed
532
        #trying to connect to "localhost" using all address families, which
533 534 535 536 537 538 539 540 541 542 543
        #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
        #on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()
        paths = ["/foo", "/foo/bar"]
        for path in paths:
            d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher())
            d.register_introspection_functions()
            d.register_multicall_functions()
        serv.get_dispatcher(paths[0]).register_function(pow)
        serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
544
        serv.add_dispatcher("/is/broken", BrokenDispatcher())
545 546 547 548 549 550 551 552 553 554 555 556 557 558
        evt.set()

        # handle up to 'numrequests' requests
        while numrequests > 0:
            serv.handle_request()
            numrequests -= 1

    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()

559 560 561 562 563 564 565 566
# This function prevents errors like:
#    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
def is_unavailable_exception(e):
    '''Returns True if the given ProtocolError is the product of a server-side
       exception caused by the 'temporarily unavailable' response sometimes
       given by operations on non-blocking sockets.'''

    # sometimes we get a -1 error code and/or empty headers
567 568 569 570 571
    try:
        if e.errcode == -1 or e.headers is None:
            return True
        exc_mess = e.headers.get('X-exception')
    except AttributeError:
572
        # Ignore OSErrors here.
573 574 575
        exc_mess = str(e)

    if exc_mess and 'temporarily unavailable' in exc_mess.lower():
576
        return True
577

578
def make_request_and_skipIf(condition, reason):
579
    # If we skip the test, we have to make a request because
580 581 582 583 584 585 586
    # the server created in setUp blocks expecting one to come in.
    if not condition:
        return lambda func: func
    def decorator(func):
        def make_request_and_skip(self):
            try:
                xmlrpclib.ServerProxy(URL).my_function()
587
            except (xmlrpclib.ProtocolError, OSError) as e:
588 589 590 591 592 593
                if not is_unavailable_exception(e):
                    raise
            raise unittest.SkipTest(reason)
        return make_request_and_skip
    return decorator

594
@unittest.skipUnless(threading, 'Threading required for this test.')
595 596
class BaseServerTestCase(unittest.TestCase):
    requestHandler = None
597
    request_count = 1
598
    threadFunc = staticmethod(http_server)
599

600
    def setUp(self):
601
        # enable traceback reporting
602
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
603

604
        self.evt = threading.Event()
605
        # start server thread to handle requests
606
        serv_args = (self.evt, self.request_count, self.requestHandler)
607
        threading.Thread(target=self.threadFunc, args=serv_args).start()
608

609 610 611
        # wait for the server to be ready
        self.evt.wait()
        self.evt.clear()
612 613 614

    def tearDown(self):
        # wait on the server thread to terminate
615
        self.evt.wait()
616

617
        # disable traceback reporting
618
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
619

620
class SimpleServerTestCase(BaseServerTestCase):
621
    def test_simple1(self):
622
        try:
623
            p = xmlrpclib.ServerProxy(URL)
624
            self.assertEqual(p.pow(6,8), 6**8)
625
        except (xmlrpclib.ProtocolError, OSError) as e:
626 627 628
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
629
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
630

631 632 633 634 635 636 637
    def test_nonascii(self):
        start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
        end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
638
        except (xmlrpclib.ProtocolError, OSError) as e:
639 640 641 642 643
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

644 645
    def test_client_encoding(self):
        start_string = '\u20ac'
646
        end_string = '\xa4'
647 648 649 650 651 652 653 654 655 656 657

        try:
            p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket unavailable errors.
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

658 659 660 661 662 663 664 665 666 667
    def test_nonascii_methodname(self):
        try:
            p = xmlrpclib.ServerProxy(URL, encoding='ascii')
            self.assertEqual(p.têšt(42), 42)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket unavailable errors.
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

668 669
    # [ch] The test 404 is causing lots of false alarms.
    def XXXtest_404(self):
670
        # send POST with http.client, it should return 404 header and
671
        # 'Not Found' message.
672
        conn = httplib.client.HTTPConnection(ADDR, PORT)
673
        conn.request('POST', '/this-is-not-valid')
674 675 676 677 678 679
        response = conn.getresponse()
        conn.close()

        self.assertEqual(response.status, 404)
        self.assertEqual(response.reason, 'Not Found')

680
    def test_introspection1(self):
681
        expected_methods = set(['pow', 'div', 'my_function', 'add', 'têšt',
682
                                'system.listMethods', 'system.methodHelp',
683 684
                                'system.methodSignature', 'system.multicall',
                                'Fixture'])
685
        try:
686
            p = xmlrpclib.ServerProxy(URL)
687 688
            meth = p.system.listMethods()
            self.assertEqual(set(meth), expected_methods)
689
        except (xmlrpclib.ProtocolError, OSError) as e:
690 691 692
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
693
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
694

695 696

    def test_introspection2(self):
697
        try:
698
            # test _methodHelp()
699
            p = xmlrpclib.ServerProxy(URL)
700 701
            divhelp = p.system.methodHelp('div')
            self.assertEqual(divhelp, 'This is the div function')
702
        except (xmlrpclib.ProtocolError, OSError) as e:
703 704 705
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
706
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
707

708 709
    @make_request_and_skipIf(sys.flags.optimize >= 2,
                     "Docstrings are omitted with -O2 and above")
710
    def test_introspection3(self):
711 712
        try:
            # test native doc
713
            p = xmlrpclib.ServerProxy(URL)
714 715
            myfunction = p.system.methodHelp('my_function')
            self.assertEqual(myfunction, 'This is my function')
716
        except (xmlrpclib.ProtocolError, OSError) as e:
717 718 719
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
720
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
721 722

    def test_introspection4(self):
723
        # the SimpleXMLRPCServer doesn't support signatures, but
724
        # at least check that we can try making the call
725
        try:
726
            p = xmlrpclib.ServerProxy(URL)
727 728
            divsig = p.system.methodSignature('div')
            self.assertEqual(divsig, 'signatures not supported')
729
        except (xmlrpclib.ProtocolError, OSError) as e:
730 731 732
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
733
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
734 735

    def test_multicall(self):
736
        try:
737
            p = xmlrpclib.ServerProxy(URL)
738 739 740 741 742 743 744 745
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)
746
        except (xmlrpclib.ProtocolError, OSError) as e:
747 748 749
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
750
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
751

752 753
    def test_non_existing_multicall(self):
        try:
754
            p = xmlrpclib.ServerProxy(URL)
755 756 757 758 759
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains;
760
            # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
761 762 763 764
            #   'method "this_is_not_exists" is not supported'>}]

            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
765
                '<class \'Exception\'>:method "this_is_not_exists" '
766
                'is not supported')
767
        except (xmlrpclib.ProtocolError, OSError) as e:
768 769 770
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
771
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
772

Christian Heimes's avatar
Christian Heimes committed
773 774 775
    def test_dotted_attribute(self):
        # Raises an AttributeError because private methods are not allowed.
        self.assertRaises(AttributeError,
776
                          xmlrpc.server.resolve_dotted_attribute, str, '__add')
Christian Heimes's avatar
Christian Heimes committed
777

778
        self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
Christian Heimes's avatar
Christian Heimes committed
779 780 781 782
        # Get the test to run faster by sending a request with test_simple1.
        # This avoids waiting for the socket timeout.
        self.test_simple1()

783 784 785 786 787 788
    def test_allow_dotted_names_true(self):
        # XXX also need allow_dotted_names_false test.
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        data = server.Fixture.getData()
        self.assertEqual(data, '42')

789 790 791 792
    def test_unicode_host(self):
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        self.assertEqual(server.add("a", "\xe9"), "a\xe9")

793 794 795 796 797 798
    def test_partial_post(self):
        # Check that a partial POST doesn't make the server loop: issue #14001.
        conn = http.client.HTTPConnection(ADDR, PORT)
        conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye')
        conn.close()

799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815
    def test_context_manager(self):
        with xmlrpclib.ServerProxy(URL) as server:
            server.add(2, 3)
            self.assertNotEqual(server('transport')._connection,
                                (None, None))
        self.assertEqual(server('transport')._connection,
                         (None, None))

    def test_context_manager_method_error(self):
        try:
            with xmlrpclib.ServerProxy(URL) as server:
                server.add(2, "a")
        except xmlrpclib.Fault:
            pass
        self.assertEqual(server('transport')._connection,
                         (None, None))

816

817 818 819 820 821 822 823
class SimpleServerEncodingTestCase(BaseServerTestCase):
    @staticmethod
    def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
        http_server(evt, numrequests, requestHandler, 'iso-8859-15')

    def test_server_encoding(self):
        start_string = '\u20ac'
824
        end_string = '\xa4'
825 826 827 828 829 830 831 832 833 834 835 836

        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket unavailable errors.
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))


837 838 839 840 841 842 843
class MultiPathServerTestCase(BaseServerTestCase):
    threadFunc = staticmethod(http_multi_server)
    request_count = 2
    def test_path1(self):
        p = xmlrpclib.ServerProxy(URL+"/foo")
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
844

845 846 847 848 849
    def test_path2(self):
        p = xmlrpclib.ServerProxy(URL+"/foo/bar")
        self.assertEqual(p.add(6,8), 6+8)
        self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8)

850 851 852 853
    def test_path3(self):
        p = xmlrpclib.ServerProxy(URL+"/is/broken")
        self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)

854 855
#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
#does indeed serve subsequent requests on the same connection
856
class BaseKeepaliveServerTestCase(BaseServerTestCase):
857 858 859 860 861 862 863 864
    #a request handler that supports keep-alive and logs requests into a
    #class variable
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'
        myRequests = []
        def handle(self):
            self.myRequests.append([])
865
            self.reqidx = len(self.myRequests)-1
866 867 868
            return self.parentClass.handle(self)
        def handle_one_request(self):
            result = self.parentClass.handle_one_request(self)
869
            self.myRequests[self.reqidx].append(self.raw_requestline)
870 871 872 873 874 875 876 877
            return result

    requestHandler = RequestHandler
    def setUp(self):
        #clear request log
        self.RequestHandler.myRequests = []
        return BaseServerTestCase.setUp(self)

878 879 880
#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
#does indeed serve subsequent requests on the same connection
class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
881 882
    def test_two(self):
        p = xmlrpclib.ServerProxy(URL)
883
        #do three requests.
884 885
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
886
        self.assertEqual(p.pow(6,8), 6**8)
887
        p("close")()
888 889

        #they should have all been handled by a single request handler
890
        self.assertEqual(len(self.RequestHandler.myRequests), 1)
891 892 893 894

        #check that we did at least two (the third may be pending append
        #due to thread scheduling)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
895

896

897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912
#test special attribute access on the serverproxy, through the __call__
#function.
class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    #ask for two keepalive requests to be handled.
    request_count=2

    def test_close(self):
        p = xmlrpclib.ServerProxy(URL)
        #do some requests with close.
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")() #this should trigger a new keep-alive request
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
913
        p("close")()
914 915 916 917 918 919 920

        #they should have all been two request handlers, each having logged at least
        #two complete requests
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)

921

922 923 924 925 926 927
    def test_transport(self):
        p = xmlrpclib.ServerProxy(URL)
        #do some requests with close.
        self.assertEqual(p.pow(6,8), 6**8)
        p("transport").close() #same as above, really.
        self.assertEqual(p.pow(6,8), 6**8)
928
        p("close")()
929 930
        self.assertEqual(len(self.RequestHandler.myRequests), 2)

931 932
#A test case that verifies that gzip encoding works in both directions
#(for a request and the response)
933
@unittest.skipIf(gzip is None, 'requires gzip')
934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959
class GzipServerTestCase(BaseServerTestCase):
    #a request handler that supports keep-alive and logs requests into a
    #class variable
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'

        def do_POST(self):
            #store content of last request in class
            self.__class__.content_length = int(self.headers["content-length"])
            return self.parentClass.do_POST(self)
    requestHandler = RequestHandler

    class Transport(xmlrpclib.Transport):
        #custom transport, stores the response length for our perusal
        fake_gzip = False
        def parse_response(self, response):
            self.response_length=int(response.getheader("content-length", 0))
            return xmlrpclib.Transport.parse_response(self, response)

        def send_content(self, connection, body):
            if self.fake_gzip:
                #add a lone gzip header to induce decode error remotely
                connection.putheader("Content-Encoding", "gzip")
            return xmlrpclib.Transport.send_content(self, connection, body)

960 961 962
    def setUp(self):
        BaseServerTestCase.setUp(self)

963 964 965 966 967 968 969 970 971 972
    def test_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        p = xmlrpclib.ServerProxy(URL, transport=t)
        self.assertEqual(p.pow(6,8), 6**8)
        a = self.RequestHandler.content_length
        t.encode_threshold = 0 #turn on request encoding
        self.assertEqual(p.pow(6,8), 6**8)
        b = self.RequestHandler.content_length
        self.assertTrue(a>b)
973
        p("close")()
974 975 976 977 978 979

    def test_bad_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        t.fake_gzip = True
        p = xmlrpclib.ServerProxy(URL, transport=t)
980 981
        cm = self.assertRaisesRegex(xmlrpclib.ProtocolError,
                                    re.compile(r"\b400\b"))
982 983
        with cm:
            p.pow(6, 8)
984
        p("close")()
985

986
    def test_gzip_response(self):
987 988 989 990 991 992 993 994
        t = self.Transport()
        p = xmlrpclib.ServerProxy(URL, transport=t)
        old = self.requestHandler.encode_threshold
        self.requestHandler.encode_threshold = None #no encoding
        self.assertEqual(p.pow(6,8), 6**8)
        a = t.response_length
        self.requestHandler.encode_threshold = 0 #always encode
        self.assertEqual(p.pow(6,8), 6**8)
995
        p("close")()
996 997 998 999
        b = t.response_length
        self.requestHandler.encode_threshold = old
        self.assertTrue(a>b)

1000

Benjamin Peterson's avatar
Benjamin Peterson committed
1001
@unittest.skipIf(gzip is None, 'requires gzip')
1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013
class GzipUtilTestCase(unittest.TestCase):

    def test_gzip_decode_limit(self):
        max_gzip_decode = 20 * 1024 * 1024
        data = b'\0' * max_gzip_decode
        encoded = xmlrpclib.gzip_encode(data)
        decoded = xmlrpclib.gzip_decode(encoded)
        self.assertEqual(len(decoded), max_gzip_decode)

        data = b'\0' * (max_gzip_decode + 1)
        encoded = xmlrpclib.gzip_encode(data)

1014
        with self.assertRaisesRegex(ValueError,
1015
                                    "max gzipped payload length exceeded"):
1016 1017 1018 1019 1020
            xmlrpclib.gzip_decode(encoded)

        xmlrpclib.gzip_decode(encoded, max_decode=-1)


1021 1022
#Test special attributes of the ServerProxy object
class ServerProxyTestCase(unittest.TestCase):
1023 1024 1025 1026 1027 1028 1029 1030 1031 1032
    def setUp(self):
        unittest.TestCase.setUp(self)
        if threading:
            self.url = URL
        else:
            # Without threading, http_server() and http_multi_server() will not
            # be executed and URL is still equal to None. 'http://' is a just
            # enough to choose the scheme (HTTP)
            self.url = 'http://'

1033
    def test_close(self):
1034
        p = xmlrpclib.ServerProxy(self.url)
1035 1036 1037 1038
        self.assertEqual(p('close')(), None)

    def test_transport(self):
        t = xmlrpclib.Transport()
1039
        p = xmlrpclib.ServerProxy(self.url, transport=t)
1040 1041
        self.assertEqual(p('transport'), t)

1042

1043 1044
# This is a contrived way to make a failure occur on the server side
# in order to test the _send_traceback_header flag on the server
1045 1046
class FailingMessageClass(http.client.HTTPMessage):
    def get(self, key, failobj=None):
1047 1048 1049
        key = key.lower()
        if key == 'content-length':
            return 'I am broken'
1050
        return super().get(key, failobj)
1051 1052


1053
@unittest.skipUnless(threading, 'Threading required for this test.')
1054 1055 1056 1057
class FailingServerTestCase(unittest.TestCase):
    def setUp(self):
        self.evt = threading.Event()
        # start server thread to handle requests
1058
        serv_args = (self.evt, 1)
1059 1060
        threading.Thread(target=http_server, args=serv_args).start()

1061 1062 1063
        # wait for the server to be ready
        self.evt.wait()
        self.evt.clear()
1064 1065 1066 1067 1068

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()
        # reset flag
1069
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
1070
        # reset message class
1071 1072
        default_class = http.client.HTTPMessage
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class
1073 1074 1075

    def test_basic(self):
        # check that flag is false by default
1076
        flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
1077 1078
        self.assertEqual(flagval, False)

1079
        # enable traceback reporting
1080
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
1081 1082 1083

        # test a call that shouldn't fail just as a smoke test
        try:
1084
            p = xmlrpclib.ServerProxy(URL)
1085
            self.assertEqual(p.pow(6,8), 6**8)
1086
        except (xmlrpclib.ProtocolError, OSError) as e:
1087 1088 1089
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
1090
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
1091 1092 1093

    def test_fail_no_info(self):
        # use the broken message class
1094
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
1095 1096

        try:
1097
            p = xmlrpclib.ServerProxy(URL)
1098
            p.pow(6,8)
1099
        except (xmlrpclib.ProtocolError, OSError) as e:
1100
            # ignore failures due to non-blocking socket 'unavailable' errors
1101
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
1102 1103 1104
                # The two server-side error headers shouldn't be sent back in this case
                self.assertTrue(e.headers.get("X-exception") is None)
                self.assertTrue(e.headers.get("X-traceback") is None)
1105 1106 1107 1108 1109
        else:
            self.fail('ProtocolError not raised')

    def test_fail_with_info(self):
        # use the broken message class
1110
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
1111 1112 1113

        # Check that errors in the server send back exception/traceback
        # info when flag is set
1114
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
1115 1116

        try:
1117
            p = xmlrpclib.ServerProxy(URL)
1118
            p.pow(6,8)
1119
        except (xmlrpclib.ProtocolError, OSError) as e:
1120
            # ignore failures due to non-blocking socket 'unavailable' errors
1121
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
1122 1123
                # We should get error info in the response
                expected_err = "invalid literal for int() with base 10: 'I am broken'"
1124 1125
                self.assertEqual(e.headers.get("X-exception"), expected_err)
                self.assertTrue(e.headers.get("X-traceback") is not None)
1126 1127 1128
        else:
            self.fail('ProtocolError not raised')

1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142

@contextlib.contextmanager
def captured_stdout(encoding='utf-8'):
    """A variation on support.captured_stdout() which gives a text stream
    having a `buffer` attribute.
    """
    orig_stdout = sys.stdout
    sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
    try:
        yield sys.stdout
    finally:
        sys.stdout = orig_stdout


1143 1144
class CGIHandlerTestCase(unittest.TestCase):
    def setUp(self):
1145
        self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()
1146 1147 1148 1149 1150

    def tearDown(self):
        self.cgi = None

    def test_cgi_get(self):
1151
        with support.EnvironmentVarGuard() as env:
1152
            env['REQUEST_METHOD'] = 'GET'
1153 1154
            # if the method is GET and no request_text is given, it runs handle_get
            # get sysout output
1155
            with captured_stdout(encoding=self.cgi.encoding) as data_out:
1156
                self.cgi.handle_request()
1157

1158
            # parse Status header
1159 1160
            data_out.seek(0)
            handle = data_out.read()
1161 1162
            status = handle.split()[1]
            message = ' '.join(handle.split()[2:4])
1163

1164 1165
            self.assertEqual(status, '400')
            self.assertEqual(message, 'Bad Request')
1166 1167 1168 1169


    def test_cgi_xmlrpc_response(self):
        data = """<?xml version='1.0'?>
1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183
        <methodCall>
            <methodName>test_method</methodName>
            <params>
                <param>
                    <value><string>foo</string></value>
                </param>
                <param>
                    <value><string>bar</string></value>
                </param>
            </params>
        </methodCall>
        """

        with support.EnvironmentVarGuard() as env, \
1184
             captured_stdout(encoding=self.cgi.encoding) as data_out, \
1185 1186 1187
             support.captured_stdin() as data_in:
            data_in.write(data)
            data_in.seek(0)
1188
            env['CONTENT_LENGTH'] = str(len(data))
1189
            self.cgi.handle_request()
1190
        data_out.seek(0)
1191 1192

        # will respond exception, if so, our goal is achieved ;)
1193
        handle = data_out.read()
1194

1195 1196
        # start with 44th char so as not to get http header, we just
        # need only xml
1197 1198
        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:])

1199 1200 1201 1202 1203 1204 1205
        # Also test the content-length returned  by handle_request
        # Using the same test method inorder to avoid all the datapassing
        # boilerplate code.
        # Test for bug: http://bugs.python.org/issue5040

        content = handle[handle.find("<?xml"):]

1206
        self.assertEqual(
1207 1208 1209 1210
            int(re.search('Content-Length: (\d+)', handle).group(1)),
            len(content))


1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243
class UseBuiltinTypesTestCase(unittest.TestCase):

    def test_use_builtin_types(self):
        # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which
        # makes all dispatch of binary data as bytes instances, and all
        # dispatch of datetime argument as datetime.datetime instances.
        self.log = []
        expected_bytes = b"my dog has fleas"
        expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12)
        marshaled = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar')
        def foobar(*args):
            self.log.extend(args)
        handler = xmlrpc.server.SimpleXMLRPCDispatcher(
            allow_none=True, encoding=None, use_builtin_types=True)
        handler.register_function(foobar)
        handler._marshaled_dispatch(marshaled)
        self.assertEqual(len(self.log), 2)
        mybytes, mydate = self.log
        self.assertEqual(self.log, [expected_bytes, expected_date])
        self.assertIs(type(mydate), datetime.datetime)
        self.assertIs(type(mybytes), bytes)

    def test_cgihandler_has_use_builtin_types_flag(self):
        handler = xmlrpc.server.CGIXMLRPCRequestHandler(use_builtin_types=True)
        self.assertTrue(handler.use_builtin_types)

    def test_xmlrpcserver_has_use_builtin_types_flag(self):
        server = xmlrpc.server.SimpleXMLRPCServer(("localhost", 0),
            use_builtin_types=True)
        server.server_close()
        self.assertTrue(server.use_builtin_types)


1244 1245 1246 1247
@support.reap_threads
def test_main():
    support.run_unittest(XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
            BinaryTestCase, FaultTestCase, UseBuiltinTypesTestCase,
1248 1249 1250
            SimpleServerTestCase, SimpleServerEncodingTestCase,
            KeepaliveServerTestCase1, KeepaliveServerTestCase2,
            GzipServerTestCase, GzipUtilTestCase,
1251 1252 1253 1254
            MultiPathServerTestCase, ServerProxyTestCase, FailingServerTestCase,
            CGIHandlerTestCase)


1255
if __name__ == "__main__":
1256
    test_main()