test_httplib.py 15.3 KB
Newer Older
1
import errno
2
from http import client
3
import io
4
import array
5
import socket
6

7 8
import unittest
TestCase = unittest.TestCase
9

10
from test import support
11

12
HOST = support.HOST
Christian Heimes's avatar
Christian Heimes committed
13

14
class FakeSocket:
15 16
    def __init__(self, text, fileclass=io.BytesIO):
        if isinstance(text, str):
17
            text = text.encode("ascii")
18
        self.text = text
19
        self.fileclass = fileclass
Martin v. Löwis's avatar
Martin v. Löwis committed
20
        self.data = b''
21

22
    def sendall(self, data):
23
        self.data += data
24

25 26
    def makefile(self, mode, bufsize=None):
        if mode != 'r' and mode != 'rb':
27
            raise client.UnimplementedFileMode()
28 29
        return self.fileclass(self.text)

30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
class EPipeSocket(FakeSocket):

    def __init__(self, text, pipe_trigger):
        # When sendall() is called with pipe_trigger, raise EPIPE.
        FakeSocket.__init__(self, text)
        self.pipe_trigger = pipe_trigger

    def sendall(self, data):
        if self.pipe_trigger in data:
            raise socket.error(errno.EPIPE, "gotcha")
        self.data += data

    def close(self):
        pass

45
class NoEOFStringIO(io.BytesIO):
46 47
    """Like StringIO, but raises AssertionError on EOF.

48
    This is used below to test that http.client doesn't try to read
49 50 51
    more from the underlying file than it should.
    """
    def read(self, n=-1):
52
        data = io.BytesIO.read(self, n)
53
        if data == b'':
54 55 56 57
            raise AssertionError('caller tried to read past EOF')
        return data

    def readline(self, length=None):
58
        data = io.BytesIO.readline(self, length)
59
        if data == b'':
60 61
            raise AssertionError('caller tried to read past EOF')
        return data
62

63 64 65 66 67 68 69 70 71
class HeaderTests(TestCase):
    def test_auto_headers(self):
        # Some headers are added automatically, but should not be added by
        # .request() if they are explicitly set.

        class HeaderCountingBuffer(list):
            def __init__(self):
                self.count = {}
            def append(self, item):
72
                kv = item.split(b':')
73 74
                if len(kv) > 1:
                    # item is a 'Key: Value' header string
Martin v. Löwis's avatar
Martin v. Löwis committed
75
                    lcKey = kv[0].decode('ascii').lower()
76 77 78 79 80 81
                    self.count.setdefault(lcKey, 0)
                    self.count[lcKey] += 1
                list.append(self, item)

        for explicit_header in True, False:
            for header in 'Content-length', 'Host', 'Accept-encoding':
82
                conn = client.HTTPConnection('example.com')
83 84 85 86 87 88 89 90 91 92
                conn.sock = FakeSocket('blahblahblah')
                conn._buffer = HeaderCountingBuffer()

                body = 'spamspamspam'
                headers = {}
                if explicit_header:
                    headers[header] = str(len(body))
                conn.request('POST', '/', body, headers)
                self.assertEqual(conn._buffer.count[header.lower()], 1)

93 94 95 96 97 98
class BasicTest(TestCase):
    def test_status_lines(self):
        # Test HTTP status lines

        body = "HTTP/1.1 200 Ok\r\n\r\nText"
        sock = FakeSocket(body)
99
        resp = client.HTTPResponse(sock)
100
        resp.begin()
101
        self.assertEqual(resp.read(), b"Text")
102
        self.assertTrue(resp.isclosed())
103 104 105

        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
        sock = FakeSocket(body)
106 107
        resp = client.HTTPResponse(sock)
        self.assertRaises(client.BadStatusLine, resp.begin)
108

109 110 111 112 113
    def test_partial_reads(self):
        # if we have a lenght, the system knows when to close itself
        # same behaviour than when we read the whole thing with read()
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
        sock = FakeSocket(body)
114
        resp = client.HTTPResponse(sock)
115 116 117 118 119 120
        resp.begin()
        self.assertEqual(resp.read(2), b'Te')
        self.assertFalse(resp.isclosed())
        self.assertEqual(resp.read(2), b'xt')
        self.assertTrue(resp.isclosed())

121 122 123 124
    def test_host_port(self):
        # Check invalid host_port

        for hp in ("www.python.org:abc", "www.python.org:"):
125
            self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
126

127 128
        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
                          "fe80::207:e9ff:fe9b", 8000),
129 130 131
                         ("www.python.org:80", "www.python.org", 80),
                         ("www.python.org", "www.python.org", 80),
                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)):
132
            c = client.HTTPConnection(hp)
133 134
            self.assertEqual(h, c.host)
            self.assertEqual(p, c.port)
135 136 137 138

    def test_response_headers(self):
        # test response with multiple message headers with the same field name.
        text = ('HTTP/1.1 200 OK\r\n'
139 140
                'Set-Cookie: Customer="WILE_E_COYOTE"; '
                'Version="1"; Path="/acme"\r\n'
141 142 143 144 145 146 147 148
                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
                ' Path="/acme"\r\n'
                '\r\n'
                'No body\r\n')
        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
               ', '
               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
        s = FakeSocket(text)
149
        r = client.HTTPResponse(s)
150 151
        r.begin()
        cookies = r.getheader("Set-Cookie")
152
        self.assertEqual(cookies, hdr)
153 154 155 156 157 158 159 160 161

    def test_read_head(self):
        # Test that the library doesn't attempt to read any data
        # from a HEAD request.  (Tickles SF bug #622042.)
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\n'
            'Content-Length: 14432\r\n'
            '\r\n',
            NoEOFStringIO)
162
        resp = client.HTTPResponse(sock, method="HEAD")
163
        resp.begin()
164
        if resp.read():
165 166 167
            self.fail("Did not expect response from HEAD request")

    def test_send_file(self):
168 169
        expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
                    b'Accept-Encoding: identity\r\nContent-Length:')
170

171
        body = open(__file__, 'rb')
172
        conn = client.HTTPConnection('example.com')
173 174 175
        sock = FakeSocket(body)
        conn.sock = sock
        conn.request('GET', '/foo', body)
176 177
        self.assertTrue(sock.data.startswith(expected), '%r != %r' %
                (sock.data[:len(expected)], expected))
178

179 180 181 182 183 184 185 186 187 188 189 190 191 192
    def test_send(self):
        expected = b'this is a test this is only a test'
        conn = client.HTTPConnection('example.com')
        sock = FakeSocket(None)
        conn.sock = sock
        conn.send(expected)
        self.assertEquals(expected, sock.data)
        sock.data = b''
        conn.send(array.array('b', expected))
        self.assertEquals(expected, sock.data)
        sock.data = b''
        conn.send(io.BytesIO(expected))
        self.assertEquals(expected, sock.data)

193 194 195 196 197 198 199 200 201 202
    def test_chunked(self):
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello worl\r\n'
            '1\r\n'
            'd\r\n'
        )
        sock = FakeSocket(chunked_start + '0\r\n')
203
        resp = client.HTTPResponse(sock, method="GET")
204 205 206 207 208 209
        resp.begin()
        self.assertEquals(resp.read(), b'hello world')
        resp.close()

        for x in ('', 'foo\r\n'):
            sock = FakeSocket(chunked_start + x)
210
            resp = client.HTTPResponse(sock, method="GET")
211 212 213
            resp.begin()
            try:
                resp.read()
214
            except client.IncompleteRead as i:
215
                self.assertEquals(i.partial, b'hello world')
216 217
                self.assertEqual(repr(i),'IncompleteRead(11 bytes read)')
                self.assertEqual(str(i),'IncompleteRead(11 bytes read)')
218 219 220 221 222 223
            else:
                self.fail('IncompleteRead expected')
            finally:
                resp.close()

    def test_negative_content_length(self):
Jeremy Hylton's avatar
Jeremy Hylton committed
224 225
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
226
        resp = client.HTTPResponse(sock, method="GET")
227 228 229 230
        resp.begin()
        self.assertEquals(resp.read(), b'Hello\r\n')
        resp.close()

231 232
    def test_incomplete_read(self):
        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
233
        resp = client.HTTPResponse(sock, method="GET")
234 235 236
        resp.begin()
        try:
            resp.read()
237
        except client.IncompleteRead as i:
238 239 240 241 242 243 244 245 246 247
            self.assertEquals(i.partial, b'Hello\r\n')
            self.assertEqual(repr(i),
                             "IncompleteRead(7 bytes read, 3 more expected)")
            self.assertEqual(str(i),
                             "IncompleteRead(7 bytes read, 3 more expected)")
        else:
            self.fail('IncompleteRead expected')
        finally:
            resp.close()

248 249 250 251 252 253 254 255 256 257 258 259 260 261
    def test_epipe(self):
        sock = EPipeSocket(
            "HTTP/1.0 401 Authorization Required\r\n"
            "Content-type: text/html\r\n"
            "WWW-Authenticate: Basic realm=\"example\"\r\n",
            b"Content-Length")
        conn = client.HTTPConnection("example.com")
        conn.sock = sock
        self.assertRaises(socket.error,
                          lambda: conn.request("PUT", "/url", "body"))
        resp = conn.getresponse()
        self.assertEqual(401, resp.status)
        self.assertEqual("Basic realm=\"example\"",
                         resp.getheader("www-authenticate"))
262

263 264
class OfflineTest(TestCase):
    def test_responses(self):
265
        self.assertEquals(client.responses[client.NOT_FOUND], "Not Found")
266

267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298

class SourceAddressTest(TestCase):
    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = support.bind_port(self.serv)
        self.source_port = support.find_unused_port()
        self.serv.listen(5)
        self.conn = None

    def tearDown(self):
        if self.conn:
            self.conn.close()
            self.conn = None
        self.serv.close()
        self.serv = None

    def testHTTPConnectionSourceAddress(self):
        self.conn = client.HTTPConnection(HOST, self.port,
                source_address=('', self.source_port))
        self.conn.connect()
        self.assertEqual(self.conn.sock.getsockname()[1], self.source_port)

    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
                     'http.client.HTTPSConnection not defined')
    def testHTTPSConnectionSourceAddress(self):
        self.conn = client.HTTPSConnection(HOST, self.port,
                source_address=('', self.source_port))
        # We don't test anything here other the constructor not barfing as
        # this code doesn't deal with setting up an active running SSL server
        # for an ssl_wrapped connect() to actually return from.


299
class TimeoutTest(TestCase):
Christian Heimes's avatar
Christian Heimes committed
300
    PORT = None
301 302 303

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
304
        TimeoutTest.PORT = support.bind_port(self.serv)
305 306 307 308 309 310 311
        self.serv.listen(5)

    def tearDown(self):
        self.serv.close()
        self.serv = None

    def testTimeoutAttribute(self):
312 313 314
        # This will prove that the timeout gets through HTTPConnection
        # and into the socket.

Georg Brandl's avatar
Georg Brandl committed
315
        # default -- use global socket timeout
316
        self.assertTrue(socket.getdefaulttimeout() is None)
Georg Brandl's avatar
Georg Brandl committed
317 318
        socket.setdefaulttimeout(30)
        try:
319
            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
Georg Brandl's avatar
Georg Brandl committed
320 321 322
            httpConn.connect()
        finally:
            socket.setdefaulttimeout(None)
323 324 325
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()

Georg Brandl's avatar
Georg Brandl committed
326
        # no timeout -- do not use global socket default
327
        self.assertTrue(socket.getdefaulttimeout() is None)
328 329
        socket.setdefaulttimeout(30)
        try:
330
            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
Christian Heimes's avatar
Christian Heimes committed
331
                                              timeout=None)
332 333
            httpConn.connect()
        finally:
Georg Brandl's avatar
Georg Brandl committed
334 335 336 337 338
            socket.setdefaulttimeout(None)
        self.assertEqual(httpConn.sock.gettimeout(), None)
        httpConn.close()

        # a value
339
        httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
Georg Brandl's avatar
Georg Brandl committed
340
        httpConn.connect()
341 342 343
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()

344 345 346 347 348
class HTTPSTimeoutTest(TestCase):
# XXX Here should be tests for HTTPS, there isn't any right now!

    def test_attributes(self):
        # simple test to check it's storing it
349 350
        if hasattr(client, 'HTTPSConnection'):
            h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
351
            self.assertEqual(h.timeout, 30)
352

353 354 355 356
class RequestBodyTest(TestCase):
    """Test cases where a request includes a message body."""

    def setUp(self):
357
        self.conn = client.HTTPConnection('example.com')
358
        self.conn.sock = self.sock = FakeSocket("")
359 360 361 362 363
        self.conn.sock = self.sock

    def get_headers_and_fp(self):
        f = io.BytesIO(self.sock.data)
        f.readline()  # read the request line
364
        message = client.parse_headers(f)
365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423
        return message, f

    def test_manual_content_length(self):
        # Set an incorrect content-length so that we can verify that
        # it will not be over-ridden by the library.
        self.conn.request("PUT", "/url", "body",
                          {"Content-Length": "42"})
        message, f = self.get_headers_and_fp()
        self.assertEqual("42", message.get("content-length"))
        self.assertEqual(4, len(f.read()))

    def test_ascii_body(self):
        self.conn.request("PUT", "/url", "body")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertEqual(None, message.get_charset())
        self.assertEqual("4", message.get("content-length"))
        self.assertEqual(b'body', f.read())

    def test_latin1_body(self):
        self.conn.request("PUT", "/url", "body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertEqual(None, message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_bytes_body(self):
        self.conn.request("PUT", "/url", b"body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertEqual(None, message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_file_body(self):
        f = open(support.TESTFN, "w")
        f.write("body")
        f.close()
        f = open(support.TESTFN)
        self.conn.request("PUT", "/url", f)
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertEqual(None, message.get_charset())
        self.assertEqual("4", message.get("content-length"))
        self.assertEqual(b'body', f.read())

    def test_binary_file_body(self):
        f = open(support.TESTFN, "wb")
        f.write(b"body\xc1")
        f.close()
        f = open(support.TESTFN, "rb")
        self.conn.request("PUT", "/url", f)
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertEqual(None, message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

424
def test_main(verbose=None):
425
    support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
426
                         HTTPSTimeoutTest, RequestBodyTest, SourceAddressTest)
427

428 429
if __name__ == '__main__':
    test_main()