test_httplib.py 35.2 KB
Newer Older
1
import errno
2
from http import client
3
import io
4
import os
5
import array
6
import socket
7

8 9
import unittest
TestCase = unittest.TestCase
10

11
from test import support
12

13 14 15 16 17 18 19 20
here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
# Root cert file (CA) for svn.python.org's cert
CACERT_svn_python_org = os.path.join(here, 'https_svn_python_org_root.pem')

21
HOST = support.HOST
Christian Heimes's avatar
Christian Heimes committed
22

23
class FakeSocket:
24 25
    def __init__(self, text, fileclass=io.BytesIO):
        if isinstance(text, str):
26
            text = text.encode("ascii")
27
        self.text = text
28
        self.fileclass = fileclass
Martin v. Löwis's avatar
Martin v. Löwis committed
29
        self.data = b''
30
        self.sendall_calls = 0
31

32
    def sendall(self, data):
33
        self.sendall_calls += 1
34
        self.data += data
35

36 37
    def makefile(self, mode, bufsize=None):
        if mode != 'r' and mode != 'rb':
38
            raise client.UnimplementedFileMode()
39 40
        return self.fileclass(self.text)

41 42 43 44 45 46 47 48 49
class EPipeSocket(FakeSocket):

    def __init__(self, text, pipe_trigger):
        # When sendall() is called with pipe_trigger, raise EPIPE.
        FakeSocket.__init__(self, text)
        self.pipe_trigger = pipe_trigger

    def sendall(self, data):
        if self.pipe_trigger in data:
50
            raise OSError(errno.EPIPE, "gotcha")
51 52 53 54 55
        self.data += data

    def close(self):
        pass

56 57
class NoEOFBytesIO(io.BytesIO):
    """Like BytesIO, but raises AssertionError on EOF.
58

59
    This is used below to test that http.client doesn't try to read
60 61 62
    more from the underlying file than it should.
    """
    def read(self, n=-1):
63
        data = io.BytesIO.read(self, n)
64
        if data == b'':
65 66 67 68
            raise AssertionError('caller tried to read past EOF')
        return data

    def readline(self, length=None):
69
        data = io.BytesIO.readline(self, length)
70
        if data == b'':
71 72
            raise AssertionError('caller tried to read past EOF')
        return data
73

74 75 76 77 78 79 80 81 82
class HeaderTests(TestCase):
    def test_auto_headers(self):
        # Some headers are added automatically, but should not be added by
        # .request() if they are explicitly set.

        class HeaderCountingBuffer(list):
            def __init__(self):
                self.count = {}
            def append(self, item):
83
                kv = item.split(b':')
84 85
                if len(kv) > 1:
                    # item is a 'Key: Value' header string
Martin v. Löwis's avatar
Martin v. Löwis committed
86
                    lcKey = kv[0].decode('ascii').lower()
87 88 89 90 91 92
                    self.count.setdefault(lcKey, 0)
                    self.count[lcKey] += 1
                list.append(self, item)

        for explicit_header in True, False:
            for header in 'Content-length', 'Host', 'Accept-encoding':
93
                conn = client.HTTPConnection('example.com')
94 95 96 97 98 99 100 101 102 103
                conn.sock = FakeSocket('blahblahblah')
                conn._buffer = HeaderCountingBuffer()

                body = 'spamspamspam'
                headers = {}
                if explicit_header:
                    headers[header] = str(len(body))
                conn.request('POST', '/', body, headers)
                self.assertEqual(conn._buffer.count[header.lower()], 1)

104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
    def test_content_length_0(self):

        class ContentLengthChecker(list):
            def __init__(self):
                list.__init__(self)
                self.content_length = None
            def append(self, item):
                kv = item.split(b':', 1)
                if len(kv) > 1 and kv[0].lower() == b'content-length':
                    self.content_length = kv[1].strip()
                list.append(self, item)

        # POST with empty body
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(None)
        conn._buffer = ContentLengthChecker()
        conn.request('POST', '/', '')
        self.assertEqual(conn._buffer.content_length, b'0',
                        'Header Content-Length not set')

        # PUT request with empty body
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(None)
        conn._buffer = ContentLengthChecker()
        conn.request('PUT', '/', '')
        self.assertEqual(conn._buffer.content_length, b'0',
                        'Header Content-Length not set')

132 133 134 135 136 137 138
    def test_putheader(self):
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(None)
        conn.putrequest('GET','/')
        conn.putheader('Content-length', 42)
        self.assertTrue(b'Content-length: 42' in conn._buffer)

139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
    def test_ipv6host_header(self):
        # Default host header on IPv6 transaction should wrapped by [] if
        # its actual IPv6 address
        expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
                   b'Accept-Encoding: identity\r\n\r\n'
        conn = client.HTTPConnection('[2001::]:81')
        sock = FakeSocket('')
        conn.sock = sock
        conn.request('GET', '/foo')
        self.assertTrue(sock.data.startswith(expected))

        expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
                   b'Accept-Encoding: identity\r\n\r\n'
        conn = client.HTTPConnection('[2001:102A::]')
        sock = FakeSocket('')
        conn.sock = sock
        conn.request('GET', '/foo')
        self.assertTrue(sock.data.startswith(expected))

158

159 160 161 162 163 164
class BasicTest(TestCase):
    def test_status_lines(self):
        # Test HTTP status lines

        body = "HTTP/1.1 200 Ok\r\n\r\nText"
        sock = FakeSocket(body)
165
        resp = client.HTTPResponse(sock)
166
        resp.begin()
167
        self.assertEqual(resp.read(), b"Text")
168
        self.assertTrue(resp.isclosed())
169 170 171
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)
172 173 174

        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
        sock = FakeSocket(body)
175 176
        resp = client.HTTPResponse(sock)
        self.assertRaises(client.BadStatusLine, resp.begin)
177

178 179
    def test_bad_status_repr(self):
        exc = client.BadStatusLine('')
180
        self.assertEqual(repr(exc), '''BadStatusLine("\'\'",)''')
181

182
    def test_partial_reads(self):
183
        # if we have a length, the system knows when to close itself
184 185 186
        # same behaviour than when we read the whole thing with read()
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
        sock = FakeSocket(body)
187
        resp = client.HTTPResponse(sock)
188 189 190 191 192
        resp.begin()
        self.assertEqual(resp.read(2), b'Te')
        self.assertFalse(resp.isclosed())
        self.assertEqual(resp.read(2), b'xt')
        self.assertTrue(resp.isclosed())
193 194 195
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)
196

197
    def test_partial_readintos(self):
198
        # if we have a length, the system knows when to close itself
199 200 201 202 203 204 205 206 207 208 209 210 211 212
        # same behaviour than when we read the whole thing with read()
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        b = bytearray(2)
        n = resp.readinto(b)
        self.assertEqual(n, 2)
        self.assertEqual(bytes(b), b'Te')
        self.assertFalse(resp.isclosed())
        n = resp.readinto(b)
        self.assertEqual(n, 2)
        self.assertEqual(bytes(b), b'xt')
        self.assertTrue(resp.isclosed())
213 214 215
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)
216

217 218 219 220 221 222 223 224 225 226 227 228
    def test_partial_reads_no_content_length(self):
        # when no length is present, the socket should be gracefully closed when
        # all data was read
        body = "HTTP/1.1 200 Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(2), b'Te')
        self.assertFalse(resp.isclosed())
        self.assertEqual(resp.read(2), b'xt')
        self.assertEqual(resp.read(1), b'')
        self.assertTrue(resp.isclosed())
229 230 231
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)
232

233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251
    def test_partial_readintos_no_content_length(self):
        # when no length is present, the socket should be gracefully closed when
        # all data was read
        body = "HTTP/1.1 200 Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        b = bytearray(2)
        n = resp.readinto(b)
        self.assertEqual(n, 2)
        self.assertEqual(bytes(b), b'Te')
        self.assertFalse(resp.isclosed())
        n = resp.readinto(b)
        self.assertEqual(n, 2)
        self.assertEqual(bytes(b), b'xt')
        n = resp.readinto(b)
        self.assertEqual(n, 0)
        self.assertTrue(resp.isclosed())

252 253 254 255 256 257 258 259 260 261 262 263 264
    def test_partial_reads_incomplete_body(self):
        # if the server shuts down the connection before the whole
        # content-length is delivered, the socket is gracefully closed
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(2), b'Te')
        self.assertFalse(resp.isclosed())
        self.assertEqual(resp.read(2), b'xt')
        self.assertEqual(resp.read(1), b'')
        self.assertTrue(resp.isclosed())

265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
    def test_partial_readintos_incomplete_body(self):
        # if the server shuts down the connection before the whole
        # content-length is delivered, the socket is gracefully closed
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        b = bytearray(2)
        n = resp.readinto(b)
        self.assertEqual(n, 2)
        self.assertEqual(bytes(b), b'Te')
        self.assertFalse(resp.isclosed())
        n = resp.readinto(b)
        self.assertEqual(n, 2)
        self.assertEqual(bytes(b), b'xt')
        n = resp.readinto(b)
        self.assertEqual(n, 0)
        self.assertTrue(resp.isclosed())
283 284 285
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)
286

287 288 289
    def test_host_port(self):
        # Check invalid host_port

290
        for hp in ("www.python.org:abc", "user:password@www.python.org"):
291
            self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
292

293 294
        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
                          "fe80::207:e9ff:fe9b", 8000),
295
                         ("www.python.org:80", "www.python.org", 80),
296
                         ("www.python.org:", "www.python.org", 80),
297
                         ("www.python.org", "www.python.org", 80),
298 299
                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
300
            c = client.HTTPConnection(hp)
301 302
            self.assertEqual(h, c.host)
            self.assertEqual(p, c.port)
303 304 305 306

    def test_response_headers(self):
        # test response with multiple message headers with the same field name.
        text = ('HTTP/1.1 200 OK\r\n'
307 308
                'Set-Cookie: Customer="WILE_E_COYOTE"; '
                'Version="1"; Path="/acme"\r\n'
309 310 311 312 313 314 315 316
                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
                ' Path="/acme"\r\n'
                '\r\n'
                'No body\r\n')
        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
               ', '
               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
        s = FakeSocket(text)
317
        r = client.HTTPResponse(s)
318 319
        r.begin()
        cookies = r.getheader("Set-Cookie")
320
        self.assertEqual(cookies, hdr)
321 322 323 324 325 326 327 328

    def test_read_head(self):
        # Test that the library doesn't attempt to read any data
        # from a HEAD request.  (Tickles SF bug #622042.)
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\n'
            'Content-Length: 14432\r\n'
            '\r\n',
329
            NoEOFBytesIO)
330
        resp = client.HTTPResponse(sock, method="HEAD")
331
        resp.begin()
332
        if resp.read():
333 334
            self.fail("Did not expect response from HEAD request")

335 336 337 338 339 340 341
    def test_readinto_head(self):
        # Test that the library doesn't attempt to read any data
        # from a HEAD request.  (Tickles SF bug #622042.)
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\n'
            'Content-Length: 14432\r\n'
            '\r\n',
342
            NoEOFBytesIO)
343 344 345 346 347 348 349
        resp = client.HTTPResponse(sock, method="HEAD")
        resp.begin()
        b = bytearray(5)
        if resp.readinto(b) != 0:
            self.fail("Did not expect response from HEAD request")
        self.assertEqual(bytes(b), b'\x00'*5)

350
    def test_send_file(self):
351 352
        expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
                    b'Accept-Encoding: identity\r\nContent-Length:')
353

354 355 356 357 358 359 360
        with open(__file__, 'rb') as body:
            conn = client.HTTPConnection('example.com')
            sock = FakeSocket(body)
            conn.sock = sock
            conn.request('GET', '/foo', body)
            self.assertTrue(sock.data.startswith(expected), '%r != %r' %
                    (sock.data[:len(expected)], expected))
361

362 363 364 365 366 367
    def test_send(self):
        expected = b'this is a test this is only a test'
        conn = client.HTTPConnection('example.com')
        sock = FakeSocket(None)
        conn.sock = sock
        conn.send(expected)
368
        self.assertEqual(expected, sock.data)
369 370
        sock.data = b''
        conn.send(array.array('b', expected))
371
        self.assertEqual(expected, sock.data)
372 373
        sock.data = b''
        conn.send(io.BytesIO(expected))
374
        self.assertEqual(expected, sock.data)
375

376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396
    def test_send_updating_file(self):
        def data():
            yield 'data'
            yield None
            yield 'data_two'

        class UpdatingFile():
            mode = 'r'
            d = data()
            def read(self, blocksize=-1):
                return self.d.__next__()

        expected = b'data'

        conn = client.HTTPConnection('example.com')
        sock = FakeSocket("")
        conn.sock = sock
        conn.send(UpdatingFile())
        self.assertEqual(sock.data, expected)


397 398 399 400 401 402 403 404 405 406 407 408 409 410
    def test_send_iter(self):
        expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
                   b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
                   b'\r\nonetwothree'

        def body():
            yield b"one"
            yield b"two"
            yield b"three"

        conn = client.HTTPConnection('example.com')
        sock = FakeSocket("")
        conn.sock = sock
        conn.request('GET', '/foo', body(), {'Content-Length': '11'})
411
        self.assertEqual(sock.data, expected)
412

413 414 415 416 417 418 419
    def test_send_type_error(self):
        # See: Issue #12676
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket('')
        with self.assertRaises(TypeError):
            conn.request('POST', 'test', conn)

420 421 422 423 424 425
    def test_chunked(self):
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello worl\r\n'
426 427 428 429 430 431
            '3\r\n'
            'd! \r\n'
            '8\r\n'
            'and now \r\n'
            '22\r\n'
            'for something completely different\r\n'
432
        )
433
        expected = b'hello world! and now for something completely different'
434
        sock = FakeSocket(chunked_start + '0\r\n')
435
        resp = client.HTTPResponse(sock, method="GET")
436
        resp.begin()
437
        self.assertEqual(resp.read(), expected)
438 439
        resp.close()

440 441 442 443 444 445 446 447
        # Various read sizes
        for n in range(1, 12):
            sock = FakeSocket(chunked_start + '0\r\n')
            resp = client.HTTPResponse(sock, method="GET")
            resp.begin()
            self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
            resp.close()

448 449
        for x in ('', 'foo\r\n'):
            sock = FakeSocket(chunked_start + x)
450
            resp = client.HTTPResponse(sock, method="GET")
451 452 453
            resp.begin()
            try:
                resp.read()
454
            except client.IncompleteRead as i:
455 456 457 458
                self.assertEqual(i.partial, expected)
                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
                self.assertEqual(repr(i), expected_message)
                self.assertEqual(str(i), expected_message)
459 460 461 462 463
            else:
                self.fail('IncompleteRead expected')
            finally:
                resp.close()

464 465 466 467 468 469
    def test_readinto_chunked(self):
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello worl\r\n'
470 471 472 473 474 475
            '3\r\n'
            'd! \r\n'
            '8\r\n'
            'and now \r\n'
            '22\r\n'
            'for something completely different\r\n'
476
        )
477 478 479 480
        expected = b'hello world! and now for something completely different'
        nexpected = len(expected)
        b = bytearray(128)

481 482 483 484
        sock = FakeSocket(chunked_start + '0\r\n')
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        n = resp.readinto(b)
485 486
        self.assertEqual(b[:nexpected], expected)
        self.assertEqual(n, nexpected)
487 488
        resp.close()

489 490 491 492 493 494 495 496 497 498 499 500 501
        # Various read sizes
        for n in range(1, 12):
            sock = FakeSocket(chunked_start + '0\r\n')
            resp = client.HTTPResponse(sock, method="GET")
            resp.begin()
            m = memoryview(b)
            i = resp.readinto(m[0:n])
            i += resp.readinto(m[i:n + i])
            i += resp.readinto(m[i:])
            self.assertEqual(b[:nexpected], expected)
            self.assertEqual(i, nexpected)
            resp.close()

502 503 504 505 506 507 508
        for x in ('', 'foo\r\n'):
            sock = FakeSocket(chunked_start + x)
            resp = client.HTTPResponse(sock, method="GET")
            resp.begin()
            try:
                n = resp.readinto(b)
            except client.IncompleteRead as i:
509 510 511 512
                self.assertEqual(i.partial, expected)
                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
                self.assertEqual(repr(i), expected_message)
                self.assertEqual(str(i), expected_message)
513 514 515 516 517
            else:
                self.fail('IncompleteRead expected')
            finally:
                resp.close()

518 519 520 521 522 523 524 525 526 527 528 529
    def test_chunked_head(self):
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello world\r\n'
            '1\r\n'
            'd\r\n'
        )
        sock = FakeSocket(chunked_start + '0\r\n')
        resp = client.HTTPResponse(sock, method="HEAD")
        resp.begin()
530 531 532
        self.assertEqual(resp.read(), b'')
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.reason, 'OK')
533
        self.assertTrue(resp.isclosed())
534 535 536
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)
537

538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556
    def test_readinto_chunked_head(self):
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello world\r\n'
            '1\r\n'
            'd\r\n'
        )
        sock = FakeSocket(chunked_start + '0\r\n')
        resp = client.HTTPResponse(sock, method="HEAD")
        resp.begin()
        b = bytearray(5)
        n = resp.readinto(b)
        self.assertEqual(n, 0)
        self.assertEqual(bytes(b), b'\x00'*5)
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.reason, 'OK')
        self.assertTrue(resp.isclosed())
557 558 559
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)
560

561
    def test_negative_content_length(self):
Jeremy Hylton's avatar
Jeremy Hylton committed
562 563
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
564
        resp = client.HTTPResponse(sock, method="GET")
565
        resp.begin()
566
        self.assertEqual(resp.read(), b'Hello\r\n')
567
        self.assertTrue(resp.isclosed())
568

569 570
    def test_incomplete_read(self):
        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
571
        resp = client.HTTPResponse(sock, method="GET")
572 573 574
        resp.begin()
        try:
            resp.read()
575
        except client.IncompleteRead as i:
576
            self.assertEqual(i.partial, b'Hello\r\n')
577 578 579 580
            self.assertEqual(repr(i),
                             "IncompleteRead(7 bytes read, 3 more expected)")
            self.assertEqual(str(i),
                             "IncompleteRead(7 bytes read, 3 more expected)")
581
            self.assertTrue(resp.isclosed())
582 583 584
        else:
            self.fail('IncompleteRead expected')

585 586 587 588 589 590 591 592
    def test_epipe(self):
        sock = EPipeSocket(
            "HTTP/1.0 401 Authorization Required\r\n"
            "Content-type: text/html\r\n"
            "WWW-Authenticate: Basic realm=\"example\"\r\n",
            b"Content-Length")
        conn = client.HTTPConnection("example.com")
        conn.sock = sock
593
        self.assertRaises(OSError,
594 595 596 597 598
                          lambda: conn.request("PUT", "/url", "body"))
        resp = conn.getresponse()
        self.assertEqual(401, resp.status)
        self.assertEqual("Basic realm=\"example\"",
                         resp.getheader("www-authenticate"))
599

600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626
    # Test lines overflowing the max line size (_MAXLINE in http.client)

    def test_overflowing_status_line(self):
        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
        resp = client.HTTPResponse(FakeSocket(body))
        self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)

    def test_overflowing_header_line(self):
        body = (
            'HTTP/1.1 200 OK\r\n'
            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
        )
        resp = client.HTTPResponse(FakeSocket(body))
        self.assertRaises(client.LineTooLong, resp.begin)

    def test_overflowing_chunked_line(self):
        body = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            + '0' * 65536 + 'a\r\n'
            'hello world\r\n'
            '0\r\n'
        )
        resp = client.HTTPResponse(FakeSocket(body))
        resp.begin()
        self.assertRaises(client.LineTooLong, resp.read)

627 628 629 630 631 632 633 634
    def test_early_eof(self):
        # Test httpresponse with no \r\n termination,
        body = "HTTP/1.1 200 Ok"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(), b'')
        self.assertTrue(resp.isclosed())
635 636 637
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)
638

639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660
    def test_delayed_ack_opt(self):
        # Test that Nagle/delayed_ack optimistaion works correctly.

        # For small payloads, it should coalesce the body with
        # headers, resulting in a single sendall() call
        conn = client.HTTPConnection('example.com')
        sock = FakeSocket(None)
        conn.sock = sock
        body = b'x' * (conn.mss - 1)
        conn.request('POST', '/', body)
        self.assertEqual(sock.sendall_calls, 1)

        # For large payloads, it should send the headers and
        # then the body, resulting in more than one sendall()
        # call
        conn = client.HTTPConnection('example.com')
        sock = FakeSocket(None)
        conn.sock = sock
        body = b'x' * conn.mss
        conn.request('POST', '/', body)
        self.assertGreater(sock.sendall_calls, 1)

661 662
class OfflineTest(TestCase):
    def test_responses(self):
663
        self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")
664

665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696

class SourceAddressTest(TestCase):
    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = support.bind_port(self.serv)
        self.source_port = support.find_unused_port()
        self.serv.listen(5)
        self.conn = None

    def tearDown(self):
        if self.conn:
            self.conn.close()
            self.conn = None
        self.serv.close()
        self.serv = None

    def testHTTPConnectionSourceAddress(self):
        self.conn = client.HTTPConnection(HOST, self.port,
                source_address=('', self.source_port))
        self.conn.connect()
        self.assertEqual(self.conn.sock.getsockname()[1], self.source_port)

    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
                     'http.client.HTTPSConnection not defined')
    def testHTTPSConnectionSourceAddress(self):
        self.conn = client.HTTPSConnection(HOST, self.port,
                source_address=('', self.source_port))
        # We don't test anything here other the constructor not barfing as
        # this code doesn't deal with setting up an active running SSL server
        # for an ssl_wrapped connect() to actually return from.


697
class TimeoutTest(TestCase):
Christian Heimes's avatar
Christian Heimes committed
698
    PORT = None
699 700 701

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
702
        TimeoutTest.PORT = support.bind_port(self.serv)
703 704 705 706 707 708 709
        self.serv.listen(5)

    def tearDown(self):
        self.serv.close()
        self.serv = None

    def testTimeoutAttribute(self):
710 711 712
        # This will prove that the timeout gets through HTTPConnection
        # and into the socket.

Georg Brandl's avatar
Georg Brandl committed
713
        # default -- use global socket timeout
714
        self.assertTrue(socket.getdefaulttimeout() is None)
Georg Brandl's avatar
Georg Brandl committed
715 716
        socket.setdefaulttimeout(30)
        try:
717
            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
Georg Brandl's avatar
Georg Brandl committed
718 719 720
            httpConn.connect()
        finally:
            socket.setdefaulttimeout(None)
721 722 723
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()

Georg Brandl's avatar
Georg Brandl committed
724
        # no timeout -- do not use global socket default
725
        self.assertTrue(socket.getdefaulttimeout() is None)
726 727
        socket.setdefaulttimeout(30)
        try:
728
            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
Christian Heimes's avatar
Christian Heimes committed
729
                                              timeout=None)
730 731
            httpConn.connect()
        finally:
Georg Brandl's avatar
Georg Brandl committed
732 733 734 735 736
            socket.setdefaulttimeout(None)
        self.assertEqual(httpConn.sock.gettimeout(), None)
        httpConn.close()

        # a value
737
        httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
Georg Brandl's avatar
Georg Brandl committed
738
        httpConn.connect()
739 740 741
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()

742 743 744 745 746 747 748 749 750

class HTTPSTest(TestCase):

    def setUp(self):
        if not hasattr(client, 'HTTPSConnection'):
            self.skipTest('ssl support required')

    def make_server(self, certfile):
        from test.ssl_servers import make_https_server
751
        return make_https_server(self, certfile=certfile)
752 753

    def test_attributes(self):
754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799
        # simple test to check it's storing the timeout
        h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
        self.assertEqual(h.timeout, 30)

    def _check_svn_python_org(self, resp):
        # Just a simple check that everything went fine
        server_string = resp.getheader('server')
        self.assertIn('Apache', server_string)

    def test_networked(self):
        # Default settings: no cert verification is done
        support.requires('network')
        with support.transient_internet('svn.python.org'):
            h = client.HTTPSConnection('svn.python.org', 443)
            h.request('GET', '/')
            resp = h.getresponse()
            self._check_svn_python_org(resp)

    def test_networked_good_cert(self):
        # We feed a CA cert that validates the server's cert
        import ssl
        support.requires('network')
        with support.transient_internet('svn.python.org'):
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(CACERT_svn_python_org)
            h = client.HTTPSConnection('svn.python.org', 443, context=context)
            h.request('GET', '/')
            resp = h.getresponse()
            self._check_svn_python_org(resp)

    def test_networked_bad_cert(self):
        # We feed a "CA" cert that is unrelated to the server's cert
        import ssl
        support.requires('network')
        with support.transient_internet('svn.python.org'):
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(CERT_localhost)
            h = client.HTTPSConnection('svn.python.org', 443, context=context)
            with self.assertRaises(ssl.SSLError):
                h.request('GET', '/')

    def test_local_good_hostname(self):
        # The (valid) cert validates the HTTP hostname
        import ssl
800
        server = self.make_server(CERT_localhost)
801 802 803 804 805 806 807
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(CERT_localhost)
        h = client.HTTPSConnection('localhost', server.port, context=context)
        h.request('GET', '/nonexistent')
        resp = h.getresponse()
        self.assertEqual(resp.status, 404)
808
        del server
809 810 811 812

    def test_local_bad_hostname(self):
        # The (valid) cert doesn't validate the HTTP hostname
        import ssl
813
        server = self.make_server(CERT_fakehostname)
814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(CERT_fakehostname)
        h = client.HTTPSConnection('localhost', server.port, context=context)
        with self.assertRaises(ssl.CertificateError):
            h.request('GET', '/')
        # Same with explicit check_hostname=True
        h = client.HTTPSConnection('localhost', server.port, context=context,
                                   check_hostname=True)
        with self.assertRaises(ssl.CertificateError):
            h.request('GET', '/')
        # With check_hostname=False, the mismatching is ignored
        h = client.HTTPSConnection('localhost', server.port, context=context,
                                   check_hostname=False)
        h.request('GET', '/nonexistent')
        resp = h.getresponse()
        self.assertEqual(resp.status, 404)
831
        del server
832

833 834
    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
                     'http.client.HTTPSConnection not available')
835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852
    def test_host_port(self):
        # Check invalid host_port

        for hp in ("www.python.org:abc", "user:password@www.python.org"):
            self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp)

        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
                          "fe80::207:e9ff:fe9b", 8000),
                         ("www.python.org:443", "www.python.org", 443),
                         ("www.python.org:", "www.python.org", 443),
                         ("www.python.org", "www.python.org", 443),
                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b",
                             443)):
            c = client.HTTPSConnection(hp)
            self.assertEqual(h, c.host)
            self.assertEqual(p, c.port)

853

854 855 856 857
class RequestBodyTest(TestCase):
    """Test cases where a request includes a message body."""

    def setUp(self):
858
        self.conn = client.HTTPConnection('example.com')
859
        self.conn.sock = self.sock = FakeSocket("")
860 861 862 863 864
        self.conn.sock = self.sock

    def get_headers_and_fp(self):
        f = io.BytesIO(self.sock.data)
        f.readline()  # read the request line
865
        message = client.parse_headers(f)
866 867 868 869 870 871 872 873 874 875 876 877 878 879 880
        return message, f

    def test_manual_content_length(self):
        # Set an incorrect content-length so that we can verify that
        # it will not be over-ridden by the library.
        self.conn.request("PUT", "/url", "body",
                          {"Content-Length": "42"})
        message, f = self.get_headers_and_fp()
        self.assertEqual("42", message.get("content-length"))
        self.assertEqual(4, len(f.read()))

    def test_ascii_body(self):
        self.conn.request("PUT", "/url", "body")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
881
        self.assertIsNone(message.get_charset())
882 883 884 885 886 887 888
        self.assertEqual("4", message.get("content-length"))
        self.assertEqual(b'body', f.read())

    def test_latin1_body(self):
        self.conn.request("PUT", "/url", "body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
889
        self.assertIsNone(message.get_charset())
890 891 892 893 894 895 896
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_bytes_body(self):
        self.conn.request("PUT", "/url", b"body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
897
        self.assertIsNone(message.get_charset())
898 899 900 901
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_file_body(self):
902
        self.addCleanup(support.unlink, support.TESTFN)
903 904 905 906 907 908
        with open(support.TESTFN, "w") as f:
            f.write("body")
        with open(support.TESTFN) as f:
            self.conn.request("PUT", "/url", f)
            message, f = self.get_headers_and_fp()
            self.assertEqual("text/plain", message.get_content_type())
909
            self.assertIsNone(message.get_charset())
910 911
            self.assertEqual("4", message.get("content-length"))
            self.assertEqual(b'body', f.read())
912 913

    def test_binary_file_body(self):
914
        self.addCleanup(support.unlink, support.TESTFN)
915 916 917 918 919 920
        with open(support.TESTFN, "wb") as f:
            f.write(b"body\xc1")
        with open(support.TESTFN, "rb") as f:
            self.conn.request("PUT", "/url", f)
            message, f = self.get_headers_and_fp()
            self.assertEqual("text/plain", message.get_content_type())
921
            self.assertIsNone(message.get_charset())
922 923
            self.assertEqual("5", message.get("content-length"))
            self.assertEqual(b'body\xc1', f.read())
924

925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960

class HTTPResponseTest(TestCase):

    def setUp(self):
        body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \
                second-value\r\n\r\nText"
        sock = FakeSocket(body)
        self.resp = client.HTTPResponse(sock)
        self.resp.begin()

    def test_getting_header(self):
        header = self.resp.getheader('My-Header')
        self.assertEqual(header, 'first-value, second-value')

        header = self.resp.getheader('My-Header', 'some default')
        self.assertEqual(header, 'first-value, second-value')

    def test_getting_nonexistent_header_with_string_default(self):
        header = self.resp.getheader('No-Such-Header', 'default-value')
        self.assertEqual(header, 'default-value')

    def test_getting_nonexistent_header_with_iterable_default(self):
        header = self.resp.getheader('No-Such-Header', ['default', 'values'])
        self.assertEqual(header, 'default, values')

        header = self.resp.getheader('No-Such-Header', ('default', 'values'))
        self.assertEqual(header, 'default, values')

    def test_getting_nonexistent_header_without_default(self):
        header = self.resp.getheader('No-Such-Header')
        self.assertEqual(header, None)

    def test_getting_header_defaultint(self):
        header = self.resp.getheader('No-Such-Header',default=42)
        self.assertEqual(header, 42)

961
def test_main(verbose=None):
962
    support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
963
                         HTTPSTest, RequestBodyTest, SourceAddressTest,
964
                         HTTPResponseTest)
965

966 967
if __name__ == '__main__':
    test_main()