test_httplib.py 28.2 KB
Newer Older
1
import errno
2
from http import client
3
import io
4
import os
5
import array
6
import socket
7

8 9
import unittest
TestCase = unittest.TestCase
10

11
from test import support
12

13 14 15 16 17 18 19 20
# Directory containing this test module; the certificate fixtures below are
# looked up relative to it.
here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
# Root cert file (CA) for svn.python.org's cert
CACERT_svn_python_org = os.path.join(here, 'https_svn_python_org_root.pem')

21
HOST = support.HOST
Christian Heimes's avatar
Christian Heimes committed
22

23
class FakeSocket:
    """Minimal in-memory stand-in for a socket.

    *text* is the canned data handed back through makefile(); a str is
    encoded to ASCII bytes first.  *fileclass* is the callable used to wrap
    that data into a file-like object.  Everything passed to sendall() is
    accumulated in the ``data`` attribute so tests can inspect what was sent.
    """

    def __init__(self, text, fileclass=io.BytesIO):
        if isinstance(text, str):
            text = text.encode("ascii")
        self.text = text
        self.fileclass = fileclass
        self.data = b''

    def sendall(self, data):
        # Record outgoing bytes instead of transmitting them.
        self.data += data

    def makefile(self, mode, bufsize=None):
        # Only read modes are supported; *bufsize* is accepted and ignored.
        if mode not in ('r', 'rb'):
            raise client.UnimplementedFileMode()
        return self.fileclass(self.text)

39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
class EPipeSocket(FakeSocket):
    """FakeSocket variant whose sendall() fails with EPIPE on a trigger.

    Any chunk passed to sendall() that contains *pipe_trigger* raises
    socket.error(errno.EPIPE, ...); every other chunk is recorded as usual.
    """

    def __init__(self, text, pipe_trigger):
        super().__init__(text)
        # Byte sequence whose presence in outgoing data provokes the error.
        self.pipe_trigger = pipe_trigger

    def sendall(self, data):
        triggered = self.pipe_trigger in data
        if not triggered:
            self.data += data
            return
        raise socket.error(errno.EPIPE, "gotcha")

    def close(self):
        # Nothing to release.
        return None

54
class NoEOFStringIO(io.BytesIO):
    """Like io.BytesIO, but raises AssertionError on EOF.

    This is used below to test that http.client doesn't try to read
    more from the underlying file than it should.
    """

    def read(self, n=-1):
        data = super().read(n)
        # An empty result means the caller asked for data that is not there.
        if data == b'':
            raise AssertionError('caller tried to read past EOF')
        return data

    def readline(self, length=None):
        data = super().readline(length)
        if data == b'':
            raise AssertionError('caller tried to read past EOF')
        return data
71

72 73 74 75 76 77 78 79 80
class HeaderTests(TestCase):
    """Tests for the headers HTTPConnection emits when sending a request."""

    def test_auto_headers(self):
        # Some headers are added automatically, but should not be added by
        # .request() if they are explicitly set.

        class HeaderCountingBuffer(list):
            # List that also counts, per lower-cased header name, how many
            # 'Key: Value' items were appended to it.
            def __init__(self):
                list.__init__(self)
                self.count = {}
            def append(self, item):
                kv = item.split(b':')
                if len(kv) > 1:
                    # item is a 'Key: Value' header string
                    lcKey = kv[0].decode('ascii').lower()
                    self.count.setdefault(lcKey, 0)
                    self.count[lcKey] += 1
                list.append(self, item)

        for explicit_header in True, False:
            for header in 'Content-length', 'Host', 'Accept-encoding':
                conn = client.HTTPConnection('example.com')
                conn.sock = FakeSocket('blahblahblah')
                conn._buffer = HeaderCountingBuffer()

                body = 'spamspamspam'
                headers = {}
                if explicit_header:
                    headers[header] = str(len(body))
                conn.request('POST', '/', body, headers)
                # Whether supplied explicitly or not, the header must be
                # emitted exactly once.
                self.assertEqual(conn._buffer.count[header.lower()], 1)

    def test_content_length_0(self):

        class ContentLengthChecker(list):
            # List that remembers the value of the last Content-Length
            # header appended to it.
            def __init__(self):
                list.__init__(self)
                self.content_length = None
            def append(self, item):
                kv = item.split(b':', 1)
                if len(kv) > 1 and kv[0].lower() == b'content-length':
                    self.content_length = kv[1].strip()
                list.append(self, item)

        # POST with empty body
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(None)
        conn._buffer = ContentLengthChecker()
        conn.request('POST', '/', '')
        self.assertEqual(conn._buffer.content_length, b'0',
                        'Header Content-Length not set')

        # PUT request with empty body
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(None)
        conn._buffer = ContentLengthChecker()
        conn.request('PUT', '/', '')
        self.assertEqual(conn._buffer.content_length, b'0',
                        'Header Content-Length not set')

    def test_putheader(self):
        # A non-string header value (here an int) must be accepted.
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(None)
        conn.putrequest('GET','/')
        conn.putheader('Content-length', 42)
        self.assertIn(b'Content-length: 42', conn._buffer)

    def test_ipv6host_header(self):
        # The default Host header of an IPv6 transaction should be wrapped
        # in [] if the host is an actual IPv6 address.
        expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
                   b'Accept-Encoding: identity\r\n\r\n'
        conn = client.HTTPConnection('[2001::]:81')
        sock = FakeSocket('')
        conn.sock = sock
        conn.request('GET', '/foo')
        self.assertTrue(sock.data.startswith(expected))

        expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
                   b'Accept-Encoding: identity\r\n\r\n'
        conn = client.HTTPConnection('[2001:102A::]')
        sock = FakeSocket('')
        conn.sock = sock
        conn.request('GET', '/foo')
        self.assertTrue(sock.data.startswith(expected))

156

157 158 159 160 161 162
class BasicTest(TestCase):
    """Drive HTTPResponse and HTTPConnection with canned FakeSocket data."""

    def test_status_lines(self):
        # Test HTTP status lines

        body = "HTTP/1.1 200 Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(), b"Text")
        self.assertTrue(resp.isclosed())
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)

        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        self.assertRaises(client.BadStatusLine, resp.begin)

    def test_bad_status_repr(self):
        exc = client.BadStatusLine('')
        # Python 3.7 dropped the trailing comma from the repr of
        # single-argument exceptions; accept either spelling.
        self.assertIn(repr(exc), ('''BadStatusLine("\'\'",)''',
                                  '''BadStatusLine("\'\'")'''))

    def test_partial_reads(self):
        # if we have a length, the system knows when to close itself
        # same behaviour as when we read the whole thing with read()
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(2), b'Te')
        self.assertFalse(resp.isclosed())
        self.assertEqual(resp.read(2), b'xt')
        self.assertTrue(resp.isclosed())
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)

    def test_partial_reads_no_content_length(self):
        # when no length is present, the socket should be gracefully closed when
        # all data was read
        body = "HTTP/1.1 200 Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(2), b'Te')
        self.assertFalse(resp.isclosed())
        self.assertEqual(resp.read(2), b'xt')
        self.assertEqual(resp.read(1), b'')
        self.assertTrue(resp.isclosed())
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)

    def test_partial_reads_incomplete_body(self):
        # if the server shuts down the connection before the whole
        # content-length is delivered, the socket is gracefully closed
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(2), b'Te')
        self.assertFalse(resp.isclosed())
        self.assertEqual(resp.read(2), b'xt')
        self.assertEqual(resp.read(1), b'')
        self.assertTrue(resp.isclosed())
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)

    def test_host_port(self):
        # Check invalid host_port

        for hp in ("www.python.org:abc", "user:password@www.python.org"):
            self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)

        # Each entry is (host_port argument, expected host, expected port).
        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
                          "fe80::207:e9ff:fe9b", 8000),
                         ("www.python.org:80", "www.python.org", 80),
                         ("www.python.org:", "www.python.org", 80),
                         ("www.python.org", "www.python.org", 80),
                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
            c = client.HTTPConnection(hp)
            self.assertEqual(h, c.host)
            self.assertEqual(p, c.port)

    def test_response_headers(self):
        # test response with multiple message headers with the same field name.
        text = ('HTTP/1.1 200 OK\r\n'
                'Set-Cookie: Customer="WILE_E_COYOTE"; '
                'Version="1"; Path="/acme"\r\n'
                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
                ' Path="/acme"\r\n'
                '\r\n'
                'No body\r\n')
        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
               ', '
               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
        s = FakeSocket(text)
        r = client.HTTPResponse(s)
        r.begin()
        cookies = r.getheader("Set-Cookie")
        self.assertEqual(cookies, hdr)

    def test_read_head(self):
        # Test that the library doesn't attempt to read any data
        # from a HEAD request.  (Tickles SF bug #622042.)
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\n'
            'Content-Length: 14432\r\n'
            '\r\n',
            NoEOFStringIO)
        resp = client.HTTPResponse(sock, method="HEAD")
        resp.begin()
        if resp.read():
            self.fail("Did not expect response from HEAD request")

    def test_send_file(self):
        expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
                    b'Accept-Encoding: identity\r\nContent-Length:')

        # This very source file serves as the request body.
        with open(__file__, 'rb') as body:
            conn = client.HTTPConnection('example.com')
            sock = FakeSocket(body)
            conn.sock = sock
            conn.request('GET', '/foo', body)
            self.assertTrue(sock.data.startswith(expected), '%r != %r' %
                    (sock.data[:len(expected)], expected))

    def test_send(self):
        # send() must accept bytes, an array and a file-like object alike.
        expected = b'this is a test this is only a test'
        conn = client.HTTPConnection('example.com')
        sock = FakeSocket(None)
        conn.sock = sock
        conn.send(expected)
        self.assertEqual(expected, sock.data)
        sock.data = b''
        conn.send(array.array('b', expected))
        self.assertEqual(expected, sock.data)
        sock.data = b''
        conn.send(io.BytesIO(expected))
        self.assertEqual(expected, sock.data)

    def test_send_iter(self):
        expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
                   b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
                   b'\r\nonetwothree'

        def body():
            yield b"one"
            yield b"two"
            yield b"three"

        conn = client.HTTPConnection('example.com')
        sock = FakeSocket("")
        conn.sock = sock
        conn.request('GET', '/foo', body(), {'Content-Length': '11'})
        self.assertEqual(sock.data, expected)

    def test_send_type_error(self):
        # See: Issue #12676
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket('')
        with self.assertRaises(TypeError):
            conn.request('POST', 'test', conn)

    def test_chunked(self):
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello worl\r\n'
            '1\r\n'
            'd\r\n'
        )
        sock = FakeSocket(chunked_start + '0\r\n')
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), b'hello world')
        resp.close()

        # A missing or malformed final chunk must surface as IncompleteRead
        # carrying the data received so far.
        for x in ('', 'foo\r\n'):
            sock = FakeSocket(chunked_start + x)
            resp = client.HTTPResponse(sock, method="GET")
            resp.begin()
            try:
                resp.read()
            except client.IncompleteRead as i:
                self.assertEqual(i.partial, b'hello world')
                self.assertEqual(repr(i),'IncompleteRead(11 bytes read)')
                self.assertEqual(str(i),'IncompleteRead(11 bytes read)')
            else:
                self.fail('IncompleteRead expected')
            finally:
                resp.close()

    def test_chunked_head(self):
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello world\r\n'
            '1\r\n'
            'd\r\n'
        )
        sock = FakeSocket(chunked_start + '0\r\n')
        resp = client.HTTPResponse(sock, method="HEAD")
        resp.begin()
        self.assertEqual(resp.read(), b'')
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.reason, 'OK')
        self.assertTrue(resp.isclosed())
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)

    def test_negative_content_length(self):
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), b'Hello\r\n')
        self.assertTrue(resp.isclosed())

    def test_incomplete_read(self):
        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        try:
            resp.read()
        except client.IncompleteRead as i:
            self.assertEqual(i.partial, b'Hello\r\n')
            self.assertEqual(repr(i),
                             "IncompleteRead(7 bytes read, 3 more expected)")
            self.assertEqual(str(i),
                             "IncompleteRead(7 bytes read, 3 more expected)")
            self.assertTrue(resp.isclosed())
        else:
            self.fail('IncompleteRead expected')

    def test_epipe(self):
        # The socket raises EPIPE once the Content-Length header is sent;
        # the response must still be readable afterwards.
        sock = EPipeSocket(
            "HTTP/1.0 401 Authorization Required\r\n"
            "Content-type: text/html\r\n"
            "WWW-Authenticate: Basic realm=\"example\"\r\n",
            b"Content-Length")
        conn = client.HTTPConnection("example.com")
        conn.sock = sock
        self.assertRaises(socket.error,
                          lambda: conn.request("PUT", "/url", "body"))
        resp = conn.getresponse()
        self.assertEqual(401, resp.status)
        self.assertEqual("Basic realm=\"example\"",
                         resp.getheader("www-authenticate"))

    # Test lines overflowing the max line size (_MAXLINE in http.client)

    def test_overflowing_status_line(self):
        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
        resp = client.HTTPResponse(FakeSocket(body))
        self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)

    def test_overflowing_header_line(self):
        body = (
            'HTTP/1.1 200 OK\r\n'
            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
        )
        resp = client.HTTPResponse(FakeSocket(body))
        self.assertRaises(client.LineTooLong, resp.begin)

    def test_overflowing_chunked_line(self):
        body = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            + '0' * 65536 + 'a\r\n'
            'hello world\r\n'
            '0\r\n'
        )
        resp = client.HTTPResponse(FakeSocket(body))
        resp.begin()
        self.assertRaises(client.LineTooLong, resp.read)

    def test_early_eof(self):
        # Test httpresponse with no \r\n termination,
        body = "HTTP/1.1 200 Ok"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(), b'')
        self.assertTrue(resp.isclosed())
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)
451

452 453
class OfflineTest(TestCase):
    """Checks that need neither a socket nor a fake one."""

    def test_responses(self):
        # The status-code table maps NOT_FOUND to its reason phrase.
        self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")
455

456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487

class SourceAddressTest(TestCase):
    """Check that the source_address argument is accepted and honoured."""

    def setUp(self):
        self.conn = None
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = listener
        self.port = support.bind_port(listener)
        self.source_port = support.find_unused_port()
        listener.listen(5)

    def tearDown(self):
        conn, self.conn = self.conn, None
        if conn:
            conn.close()
        self.serv.close()
        self.serv = None

    def testHTTPConnectionSourceAddress(self):
        # The connected socket must be bound to the requested source port.
        source = ('', self.source_port)
        self.conn = client.HTTPConnection(HOST, self.port,
                                          source_address=source)
        self.conn.connect()
        local_port = self.conn.sock.getsockname()[1]
        self.assertEqual(local_port, self.source_port)

    @unittest.skipUnless(hasattr(client, 'HTTPSConnection'),
                         'http.client.HTTPSConnection not defined')
    def testHTTPSConnectionSourceAddress(self):
        # Only the constructor is exercised: there is no running SSL server
        # here for an ssl-wrapped connect() to complete against.
        source = ('', self.source_port)
        self.conn = client.HTTPSConnection(HOST, self.port,
                                           source_address=source)


488
class TimeoutTest(TestCase):
    """Check that the timeout setting reaches the underlying socket."""

    # Port of the listening server socket; (re)assigned by every setUp().
    PORT = None

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        TimeoutTest.PORT = support.bind_port(self.serv)
        self.serv.listen(5)

    def tearDown(self):
        self.serv.close()
        self.serv = None

    def testTimeoutAttribute(self):
        # This will prove that the timeout gets through HTTPConnection
        # and into the socket.

        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
            httpConn.connect()
        finally:
            # Always restore the global default so other tests are unaffected.
            socket.setdefaulttimeout(None)
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()

        # no timeout -- do not use global socket default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
                                              timeout=None)
            httpConn.connect()
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(httpConn.sock.gettimeout())
        httpConn.close()

        # a value
        httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
        httpConn.connect()
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()

533 534 535 536 537 538 539 540 541 542

class HTTPSTest(TestCase):
    """Tests for HTTPSConnection; skipped when it is unavailable."""

    def setUp(self):
        if not hasattr(client, 'HTTPSConnection'):
            self.skipTest('ssl support required')

    def make_server(self, certfile):
        # Thin wrapper around test.ssl_servers.make_https_server; imported
        # lazily so the module still loads without ssl support.
        from test.ssl_servers import make_https_server
        return make_https_server(self, certfile)

    def test_attributes(self):
        # simple test to check it's storing the timeout
        h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
        self.assertEqual(h.timeout, 30)

    def _check_svn_python_org(self, resp):
        # Just a simple check that everything went fine
        server_string = resp.getheader('server')
        self.assertIn('Apache', server_string)

    def test_networked(self):
        # Default settings: no cert verification is done
        support.requires('network')
        with support.transient_internet('svn.python.org'):
            h = client.HTTPSConnection('svn.python.org', 443)
            self.addCleanup(h.close)
            h.request('GET', '/')
            resp = h.getresponse()
            self._check_svn_python_org(resp)

    def test_networked_good_cert(self):
        # We feed a CA cert that validates the server's cert
        import ssl
        support.requires('network')
        with support.transient_internet('svn.python.org'):
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(CACERT_svn_python_org)
            h = client.HTTPSConnection('svn.python.org', 443, context=context)
            self.addCleanup(h.close)
            h.request('GET', '/')
            resp = h.getresponse()
            self._check_svn_python_org(resp)

    def test_networked_bad_cert(self):
        # We feed a "CA" cert that is unrelated to the server's cert
        import ssl
        support.requires('network')
        with support.transient_internet('svn.python.org'):
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(CERT_localhost)
            h = client.HTTPSConnection('svn.python.org', 443, context=context)
            self.addCleanup(h.close)
            with self.assertRaises(ssl.SSLError):
                h.request('GET', '/')

    def test_local_good_hostname(self):
        # The (valid) cert validates the HTTP hostname
        import ssl
        server = self.make_server(CERT_localhost)
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(CERT_localhost)
        h = client.HTTPSConnection('localhost', server.port, context=context)
        self.addCleanup(h.close)
        h.request('GET', '/nonexistent')
        resp = h.getresponse()
        self.assertEqual(resp.status, 404)

    def test_local_bad_hostname(self):
        # The (valid) cert doesn't validate the HTTP hostname
        import ssl
        server = self.make_server(CERT_fakehostname)
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(CERT_fakehostname)
        h = client.HTTPSConnection('localhost', server.port, context=context)
        self.addCleanup(h.close)
        with self.assertRaises(ssl.CertificateError):
            h.request('GET', '/')
        # Same with explicit check_hostname=True
        h = client.HTTPSConnection('localhost', server.port, context=context,
                                   check_hostname=True)
        self.addCleanup(h.close)
        with self.assertRaises(ssl.CertificateError):
            h.request('GET', '/')
        # With check_hostname=False, the mismatching is ignored
        h = client.HTTPSConnection('localhost', server.port, context=context,
                                   check_hostname=False)
        self.addCleanup(h.close)
        h.request('GET', '/nonexistent')
        resp = h.getresponse()
        self.assertEqual(resp.status, 404)

    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
                     'http.client.HTTPSConnection not available')
    def test_host_port(self):
        # Check invalid host_port

        for hp in ("www.python.org:abc", "user:password@www.python.org"):
            self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp)

        # Each entry is (host_port argument, expected host, expected port).
        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
                          "fe80::207:e9ff:fe9b", 8000),
                         ("www.python.org:443", "www.python.org", 443),
                         ("www.python.org:", "www.python.org", 443),
                         ("www.python.org", "www.python.org", 443),
                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b",
                             443)):
            c = client.HTTPSConnection(hp)
            self.assertEqual(h, c.host)
            self.assertEqual(p, c.port)

644

645 646 647 648
class RequestBodyTest(TestCase):
    """Test cases where a request includes a message body."""

    def setUp(self):
        self.conn = client.HTTPConnection('example.com')
        self.conn.sock = self.sock = FakeSocket("")

    def get_headers_and_fp(self):
        # Re-parse what was written to the fake socket: return the parsed
        # header block and the stream positioned at the start of the body.
        f = io.BytesIO(self.sock.data)
        f.readline()  # read the request line
        message = client.parse_headers(f)
        return message, f

    def test_manual_content_length(self):
        # Set an incorrect content-length so that we can verify that
        # it will not be over-ridden by the library.
        self.conn.request("PUT", "/url", "body",
                          {"Content-Length": "42"})
        message, f = self.get_headers_and_fp()
        self.assertEqual("42", message.get("content-length"))
        self.assertEqual(4, len(f.read()))

    def test_ascii_body(self):
        self.conn.request("PUT", "/url", "body")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("4", message.get("content-length"))
        self.assertEqual(b'body', f.read())

    def test_latin1_body(self):
        self.conn.request("PUT", "/url", "body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_bytes_body(self):
        self.conn.request("PUT", "/url", b"body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_file_body(self):
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, "w") as f:
            f.write("body")
        with open(support.TESTFN) as body:
            self.conn.request("PUT", "/url", body)
            # Use a separate name for the parsed stream so the open file
            # object is not shadowed inside its own with-block.
            message, sent = self.get_headers_and_fp()
            self.assertEqual("text/plain", message.get_content_type())
            self.assertIsNone(message.get_charset())
            self.assertEqual("4", message.get("content-length"))
            self.assertEqual(b'body', sent.read())

    def test_binary_file_body(self):
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, "wb") as f:
            f.write(b"body\xc1")
        with open(support.TESTFN, "rb") as body:
            self.conn.request("PUT", "/url", body)
            message, sent = self.get_headers_and_fp()
            self.assertEqual("text/plain", message.get_content_type())
            self.assertIsNone(message.get_charset())
            self.assertEqual("5", message.get("content-length"))
            self.assertEqual(b'body\xc1', sent.read())
715

716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751

class HTTPResponseTest(TestCase):
    """Tests for HTTPResponse.getheader() on a response with a repeated header."""

    def setUp(self):
        # The canned response carries 'My-Header' twice.
        body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \
                second-value\r\n\r\nText"
        sock = FakeSocket(body)
        self.resp = client.HTTPResponse(sock)
        self.resp.begin()

    def test_getting_header(self):
        # Repeated headers come back joined with ', '; a default is ignored
        # when the header exists.
        header = self.resp.getheader('My-Header')
        self.assertEqual(header, 'first-value, second-value')

        header = self.resp.getheader('My-Header', 'some default')
        self.assertEqual(header, 'first-value, second-value')

    def test_getting_nonexistent_header_with_string_default(self):
        header = self.resp.getheader('No-Such-Header', 'default-value')
        self.assertEqual(header, 'default-value')

    def test_getting_nonexistent_header_with_iterable_default(self):
        header = self.resp.getheader('No-Such-Header', ['default', 'values'])
        self.assertEqual(header, 'default, values')

        header = self.resp.getheader('No-Such-Header', ('default', 'values'))
        self.assertEqual(header, 'default, values')

    def test_getting_nonexistent_header_without_default(self):
        header = self.resp.getheader('No-Such-Header')
        self.assertIsNone(header)

    def test_getting_header_defaultint(self):
        header = self.resp.getheader('No-Such-Header',default=42)
        self.assertEqual(header, 42)

752
def test_main(verbose=None):
    # Run every test class defined in this module through test.support;
    # *verbose* is accepted but not used here.
    support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
                         HTTPSTest, RequestBodyTest, SourceAddressTest,
                         HTTPResponseTest)
756

757 758
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()