"""Tests for the http.client module."""

import errno
from http import client
import io
import os
import array
import socket

import unittest
TestCase = unittest.TestCase

from test import support

# Certificate files are looked up next to this test module.
here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
# Root cert file (CA) for svn.python.org's cert
CACERT_svn_python_org = os.path.join(here, 'https_svn_python_org_root.pem')

HOST = support.HOST


class FakeSocket:
    """In-memory stand-in for a socket.

    Bytes passed to sendall() accumulate in ``data``; makefile() returns a
    ``fileclass`` object wrapping the canned ``text``.
    """

    def __init__(self, text, fileclass=io.BytesIO):
        # str input is stored as ASCII bytes; anything else is kept as given.
        if isinstance(text, str):
            text = text.encode("ascii")
        self.text = text
        self.fileclass = fileclass
        self.data = b''

    def sendall(self, data):
        """Append ``data`` to everything sent so far."""
        self.data += data

    def makefile(self, mode, bufsize=None):
        """Return ``fileclass(text)``; only modes 'r' and 'rb' are accepted.

        ``bufsize`` is accepted but not used.  Any other mode raises
        client.UnimplementedFileMode.
        """
        if mode not in ('r', 'rb'):
            raise client.UnimplementedFileMode()
        return self.fileclass(self.text)


class EPipeSocket(FakeSocket):
    """FakeSocket whose sendall() fails with EPIPE on a chosen payload."""

    def __init__(self, text, pipe_trigger):
        # Any sendall() payload containing pipe_trigger raises EPIPE.
        super().__init__(text)
        self.pipe_trigger = pipe_trigger

    def sendall(self, data):
        if self.pipe_trigger not in data:
            self.data += data
            return
        raise socket.error(errno.EPIPE, "gotcha")

    def close(self):
        # Nothing to release.
        pass

class NoEOFStringIO(io.BytesIO):
    """Like BytesIO, but raises AssertionError on EOF.

    This is used below to test that http.client doesn't try to read
    more from the underlying file than it should.
    """

    def read(self, n=-1):
        """Read like BytesIO.read(); an empty result raises AssertionError."""
        data = super().read(n)
        if data == b'':
            raise AssertionError('caller tried to read past EOF')
        return data

    def readline(self, length=None):
        """Read like BytesIO.readline(); an empty result raises AssertionError."""
        data = super().readline(length)
        if data == b'':
            raise AssertionError('caller tried to read past EOF')
        return data


class HeaderTests(TestCase):
    """Checks on the header lines HTTPConnection buffers for a request."""

    def test_auto_headers(self):
        # Some headers are added automatically, but should not be added by
        # .request() if they are explicitly set.

        class HeaderCountingBuffer(list):
            """List that tallies appended header lines by lower-cased name."""

            def __init__(self):
                super().__init__()
                self.count = {}

            def append(self, item):
                kv = item.split(b':')
                if len(kv) > 1:
                    # item is a 'Key: Value' header string
                    lcKey = kv[0].decode('ascii').lower()
                    self.count[lcKey] = self.count.get(lcKey, 0) + 1
                list.append(self, item)

        for explicit_header in True, False:
            for header in 'Content-length', 'Host', 'Accept-encoding':
                conn = client.HTTPConnection('example.com')
                conn.sock = FakeSocket('blahblahblah')
                conn._buffer = HeaderCountingBuffer()

                body = 'spamspamspam'
                headers = {}
                if explicit_header:
                    headers[header] = str(len(body))
                conn.request('POST', '/', body, headers)
                # Each header must appear exactly once, whether supplied
                # explicitly or added automatically.
                self.assertEqual(conn._buffer.count[header.lower()], 1)

    def test_putheader(self):
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(None)
        conn.putrequest('GET','/')
        conn.putheader('Content-length', 42)
        self.assertIn(b'Content-length: 42', conn._buffer)


class BasicTest(TestCase):
    """Response parsing and request sending, driven through fake sockets."""

    def test_status_lines(self):
        # Test HTTP status lines

        body = "HTTP/1.1 200 Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(), b"Text")
        self.assertTrue(resp.isclosed())

        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        self.assertRaises(client.BadStatusLine, resp.begin)

    def test_bad_status_repr(self):
        exc = client.BadStatusLine('')
        # Exception reprs dropped the trailing comma for single-argument
        # exceptions in Python 3.7; accept either spelling.
        self.assertIn(repr(exc), ('BadStatusLine("\'\'",)',
                                  'BadStatusLine("\'\'")'))

    def test_partial_reads(self):
        # if we have a length, the system knows when to close itself
        # same behaviour as when we read the whole thing with read()
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(2), b'Te')
        self.assertFalse(resp.isclosed())
        self.assertEqual(resp.read(2), b'xt')
        self.assertTrue(resp.isclosed())

    def test_host_port(self):
        # Check invalid host_port

        for hp in ("www.python.org:abc", "www.python.org:"):
            self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)

        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
                          "fe80::207:e9ff:fe9b", 8000),
                         ("www.python.org:80", "www.python.org", 80),
                         ("www.python.org", "www.python.org", 80),
                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)):
            c = client.HTTPConnection(hp)
            self.assertEqual(h, c.host)
            self.assertEqual(p, c.port)

    def test_response_headers(self):
        # test response with multiple message headers with the same field name.
        text = ('HTTP/1.1 200 OK\r\n'
                'Set-Cookie: Customer="WILE_E_COYOTE"; '
                'Version="1"; Path="/acme"\r\n'
                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
                ' Path="/acme"\r\n'
                '\r\n'
                'No body\r\n')
        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
               ', '
               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
        s = FakeSocket(text)
        r = client.HTTPResponse(s)
        r.begin()
        cookies = r.getheader("Set-Cookie")
        self.assertEqual(cookies, hdr)

    def test_read_head(self):
        # Test that the library doesn't attempt to read any data
        # from a HEAD request.  (Tickles SF bug #622042.)
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\n'
            'Content-Length: 14432\r\n'
            '\r\n',
            NoEOFStringIO)
        resp = client.HTTPResponse(sock, method="HEAD")
        resp.begin()
        if resp.read():
            self.fail("Did not expect response from HEAD request")

    def test_send_file(self):
        expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
                    b'Accept-Encoding: identity\r\nContent-Length:')

        # The context manager closes the file even if request() raises.
        with open(__file__, 'rb') as body:
            conn = client.HTTPConnection('example.com')
            sock = FakeSocket(body)
            conn.sock = sock
            conn.request('GET', '/foo', body)
        self.assertTrue(sock.data.startswith(expected), '%r != %r' %
                (sock.data[:len(expected)], expected))

    def test_send(self):
        # send() must accept bytes, an array and a file-like object alike.
        expected = b'this is a test this is only a test'
        conn = client.HTTPConnection('example.com')
        sock = FakeSocket(None)
        conn.sock = sock
        conn.send(expected)
        self.assertEqual(expected, sock.data)
        sock.data = b''
        conn.send(array.array('b', expected))
        self.assertEqual(expected, sock.data)
        sock.data = b''
        conn.send(io.BytesIO(expected))
        self.assertEqual(expected, sock.data)

    def test_chunked(self):
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello worl\r\n'
            '1\r\n'
            'd\r\n'
        )
        sock = FakeSocket(chunked_start + '0\r\n')
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), b'hello world')
        resp.close()

        # A missing or malformed final chunk must surface as IncompleteRead
        # carrying the data read so far.
        for x in ('', 'foo\r\n'):
            sock = FakeSocket(chunked_start + x)
            resp = client.HTTPResponse(sock, method="GET")
            resp.begin()
            try:
                resp.read()
            except client.IncompleteRead as i:
                self.assertEqual(i.partial, b'hello world')
                self.assertEqual(repr(i),'IncompleteRead(11 bytes read)')
                self.assertEqual(str(i),'IncompleteRead(11 bytes read)')
            else:
                self.fail('IncompleteRead expected')
            finally:
                resp.close()

    def test_chunked_head(self):
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello world\r\n'
            '1\r\n'
            'd\r\n'
        )
        sock = FakeSocket(chunked_start + '0\r\n')
        resp = client.HTTPResponse(sock, method="HEAD")
        resp.begin()
        self.assertEqual(resp.read(), b'')
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.reason, 'OK')
        self.assertTrue(resp.isclosed())

    def test_negative_content_length(self):
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), b'Hello\r\n')
        resp.close()

    def test_incomplete_read(self):
        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        try:
            resp.read()
        except client.IncompleteRead as i:
            self.assertEqual(i.partial, b'Hello\r\n')
            self.assertEqual(repr(i),
                             "IncompleteRead(7 bytes read, 3 more expected)")
            self.assertEqual(str(i),
                             "IncompleteRead(7 bytes read, 3 more expected)")
        else:
            self.fail('IncompleteRead expected')
        finally:
            resp.close()

    def test_epipe(self):
        # The fake socket raises EPIPE once the Content-Length header is
        # sent; the canned response must still be readable afterwards.
        sock = EPipeSocket(
            "HTTP/1.0 401 Authorization Required\r\n"
            "Content-type: text/html\r\n"
            "WWW-Authenticate: Basic realm=\"example\"\r\n",
            b"Content-Length")
        conn = client.HTTPConnection("example.com")
        conn.sock = sock
        self.assertRaises(socket.error,
                          lambda: conn.request("PUT", "/url", "body"))
        resp = conn.getresponse()
        self.assertEqual(401, resp.status)
        self.assertEqual("Basic realm=\"example\"",
                         resp.getheader("www-authenticate"))


class OfflineTest(TestCase):
    """Checks that need neither a socket nor a network."""

    def test_responses(self):
        # The status-code table must map NOT_FOUND to its reason phrase.
        self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")


class SourceAddressTest(TestCase):
    """Connections constructed with an explicit source_address."""

    def setUp(self):
        # Listening socket for the client to connect to, plus a free port
        # to use as the client's source port.
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = listener
        self.port = support.bind_port(listener)
        self.source_port = support.find_unused_port()
        listener.listen(5)
        self.conn = None

    def tearDown(self):
        if self.conn:
            self.conn.close()
        self.conn = None
        self.serv.close()
        self.serv = None

    def testHTTPConnectionSourceAddress(self):
        source = ('', self.source_port)
        self.conn = client.HTTPConnection(HOST, self.port,
                                          source_address=source)
        self.conn.connect()
        # The connected socket's local port must be the requested one.
        local_port = self.conn.sock.getsockname()[1]
        self.assertEqual(local_port, self.source_port)

    @unittest.skipUnless(hasattr(client, 'HTTPSConnection'),
                         'http.client.HTTPSConnection not defined')
    def testHTTPSConnectionSourceAddress(self):
        source = ('', self.source_port)
        self.conn = client.HTTPSConnection(HOST, self.port,
                                           source_address=source)
        # Only the constructor is exercised: there is no running SSL server
        # here for an ssl-wrapped connect() to return from.


class TimeoutTest(TestCase):
    """Checks that HTTPConnection's timeout reaches the underlying socket."""

    # Port of the listening socket; assigned in setUp().
    PORT = None

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        TimeoutTest.PORT = support.bind_port(self.serv)
        self.serv.listen(5)

    def tearDown(self):
        self.serv.close()
        self.serv = None

    def testTimeoutAttribute(self):
        # This will prove that the timeout gets through HTTPConnection
        # and into the socket.

        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
            httpConn.connect()
        finally:
            # Always restore the global default for later tests.
            socket.setdefaulttimeout(None)
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()

        # no timeout -- do not use global socket default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
                                              timeout=None)
            httpConn.connect()
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(httpConn.sock.gettimeout())
        httpConn.close()

        # a value
        httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
        httpConn.connect()
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()


class HTTPSTest(TestCase):
    """HTTPSConnection tests; skipped when http.client lacks that class."""

    def setUp(self):
        if not hasattr(client, 'HTTPSConnection'):
            self.skipTest('ssl support required')

    def make_server(self, certfile):
        """Return a local HTTPS test server using ``certfile``."""
        # Imported lazily so the module still loads without ssl support.
        from test.ssl_servers import make_https_server
        return make_https_server(self, certfile)

    def test_attributes(self):
        # simple test to check it's storing the timeout
        # (no connection is made here; only the constructor runs)
        h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
        self.assertEqual(h.timeout, 30)

    def _check_svn_python_org(self, resp):
        # Just a simple check that everything went fine
        server_string = resp.getheader('server')
        self.assertIn('Apache', server_string)

    def test_networked(self):
        # Default settings: no cert verification is done
        support.requires('network')
        with support.transient_internet('svn.python.org'):
            h = client.HTTPSConnection('svn.python.org', 443)
            h.request('GET', '/')
            resp = h.getresponse()
            self._check_svn_python_org(resp)

    def test_networked_good_cert(self):
        # We feed a CA cert that validates the server's cert
        import ssl
        support.requires('network')
        with support.transient_internet('svn.python.org'):
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(CACERT_svn_python_org)
            h = client.HTTPSConnection('svn.python.org', 443, context=context)
            h.request('GET', '/')
            resp = h.getresponse()
            self._check_svn_python_org(resp)

    def test_networked_bad_cert(self):
        # We feed a "CA" cert that is unrelated to the server's cert
        import ssl
        support.requires('network')
        with support.transient_internet('svn.python.org'):
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(CERT_localhost)
            h = client.HTTPSConnection('svn.python.org', 443, context=context)
            with self.assertRaises(ssl.SSLError):
                h.request('GET', '/')

    def test_local_good_hostname(self):
        # The (valid) cert validates the HTTP hostname
        import ssl
        server = self.make_server(CERT_localhost)
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(CERT_localhost)
        h = client.HTTPSConnection('localhost', server.port, context=context)
        h.request('GET', '/nonexistent')
        resp = h.getresponse()
        self.assertEqual(resp.status, 404)

    def test_local_bad_hostname(self):
        # The (valid) cert doesn't validate the HTTP hostname
        import ssl
        server = self.make_server(CERT_fakehostname)
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(CERT_fakehostname)
        h = client.HTTPSConnection('localhost', server.port, context=context)
        with self.assertRaises(ssl.CertificateError):
            h.request('GET', '/')
        # Same with explicit check_hostname=True
        h = client.HTTPSConnection('localhost', server.port, context=context,
                                   check_hostname=True)
        with self.assertRaises(ssl.CertificateError):
            h.request('GET', '/')
        # With check_hostname=False, the mismatching is ignored
        h = client.HTTPSConnection('localhost', server.port, context=context,
                                   check_hostname=False)
        h.request('GET', '/nonexistent')
        resp = h.getresponse()
        self.assertEqual(resp.status, 404)


class RequestBodyTest(TestCase):
    """Test cases where a request includes a message body."""

    def setUp(self):
        self.conn = client.HTTPConnection('example.com')
        self.conn.sock = self.sock = FakeSocket("")

    def get_headers_and_fp(self):
        """Parse what was sent; return (headers, file positioned at the body)."""
        f = io.BytesIO(self.sock.data)
        f.readline()  # read the request line
        message = client.parse_headers(f)
        return message, f

    def test_manual_content_length(self):
        # Set an incorrect content-length so that we can verify that
        # it will not be over-ridden by the library.
        self.conn.request("PUT", "/url", "body",
                          {"Content-Length": "42"})
        message, f = self.get_headers_and_fp()
        self.assertEqual("42", message.get("content-length"))
        self.assertEqual(4, len(f.read()))

    def test_ascii_body(self):
        self.conn.request("PUT", "/url", "body")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertEqual(None, message.get_charset())
        self.assertEqual("4", message.get("content-length"))
        self.assertEqual(b'body', f.read())

    def test_latin1_body(self):
        self.conn.request("PUT", "/url", "body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertEqual(None, message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_bytes_body(self):
        self.conn.request("PUT", "/url", b"body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertEqual(None, message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_file_body(self):
        # Remove the scratch file afterwards, whatever the outcome.
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, "w") as f:
            f.write("body")
        with open(support.TESTFN) as f:
            self.conn.request("PUT", "/url", f)
        message, fp = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertEqual(None, message.get_charset())
        self.assertEqual("4", message.get("content-length"))
        self.assertEqual(b'body', fp.read())

    def test_binary_file_body(self):
        # Remove the scratch file afterwards, whatever the outcome.
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, "wb") as f:
            f.write(b"body\xc1")
        with open(support.TESTFN, "rb") as f:
            self.conn.request("PUT", "/url", f)
        message, fp = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertEqual(None, message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', fp.read())


class HTTPResponseTest(TestCase):
    """getheader() on a response that repeats one header field."""

    def setUp(self):
        # The canned response carries My-Header twice.
        body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \
                second-value\r\n\r\nText"
        self.resp = client.HTTPResponse(FakeSocket(body))
        self.resp.begin()

    def test_getting_header(self):
        joined = 'first-value, second-value'
        self.assertEqual(self.resp.getheader('My-Header'), joined)
        # A default is ignored when the header is present.
        self.assertEqual(self.resp.getheader('My-Header', 'some default'),
                         joined)

    def test_getting_nonexistent_header_with_string_default(self):
        value = self.resp.getheader('No-Such-Header', 'default-value')
        self.assertEqual(value, 'default-value')

    def test_getting_nonexistent_header_with_iterable_default(self):
        # A list default and a tuple default are both joined with ', '.
        for default in (['default', 'values'], ('default', 'values')):
            value = self.resp.getheader('No-Such-Header', default)
            self.assertEqual(value, 'default, values')

    def test_getting_nonexistent_header_without_default(self):
        value = self.resp.getheader('No-Such-Header')
        self.assertEqual(value, None)

    def test_getting_header_defaultint(self):
        value = self.resp.getheader('No-Such-Header',default=42)
        self.assertEqual(value, 42)

def test_main(verbose=None):
    """Run every test case class defined in this module.

    ``verbose`` is accepted but not used by this function.
    """
    support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
                         HTTPSTest, RequestBodyTest, SourceAddressTest,
                         HTTPResponseTest)


# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()