test_urllib2net.py 12.9 KB
Newer Older
1
#!/usr/bin/env python3
2 3

import unittest
4
from test import support
5
from test.test_urllib2 import sanepathname2url
6

7
import os
8
import socket
9 10
import urllib.error
import urllib.request
11
import sys
12 13 14 15
try:
    import ssl
except ImportError:
    ssl = None
16 17

TIMEOUT = 60  # seconds; passed as the timeout when _test_urls opens each URL
18

19

Georg Brandl's avatar
Georg Brandl committed
20
def _retry_thrice(func, exc, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, trying up to three times.

    Arguments:
        func: the callable to invoke.
        exc: exception class (or tuple of classes) that triggers a retry.
        *args, **kwargs: forwarded unchanged to ``func`` on every attempt.

    Returns:
        Whatever the first successful call to ``func`` returns.

    Raises:
        The exception from the third attempt if all three attempts raise
        ``exc``.  Any exception that is not an instance of ``exc``
        propagates immediately, without further attempts.
    """
    last_exc = None
    for _ in range(3):
        try:
            return func(*args, **kwargs)
        except exc as e:
            # The name bound by 'as' is deleted when the except clause
            # ends, so keep our own reference for the final re-raise.
            last_exc = e
    raise last_exc

Georg Brandl's avatar
Georg Brandl committed
31 32 33 34 35 36 37
def _wrap_with_retry_thrice(func, exc):
    """Build a callable that runs ``func`` through ``_retry_thrice``.

    The returned callable accepts any positional and keyword arguments and
    forwards them, together with ``func`` and ``exc``, to ``_retry_thrice``.
    """
    def wrapped(*call_args, **call_kwargs):
        outcome = _retry_thrice(func, exc, *call_args, **call_kwargs)
        return outcome

    return wrapped

# Connecting to remote hosts is flaky.  Make it more robust by retrying
# the connection several times.
38 39
# urlopen() variant shared by the tests; URLError is the retried exception.
_urlopen_with_retry = _wrap_with_retry_thrice(
    func=urllib.request.urlopen,
    exc=urllib.error.URLError,
)
40

41 42 43 44 45 46 47 48

class AuthTests(unittest.TestCase):
    """Tests urllib2 authentication features."""

## Disabled at the moment since there is no page under python.org which
## could be used to test HTTP authentication.
##
## NOTE(review): the disabled code below still uses Python 2 names and
## syntax (``urllib2``, ``except X, exc``); it has to be ported to
## urllib.request / urllib.error before it can be re-enabled.
#
#    def test_basic_auth(self):
#        import http.client
#
#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
#        test_hostport = "www.python.org"
#        test_realm = 'Test Realm'
#        test_user = 'test.test_urllib2net'
#        test_password = 'blah'
#
#        # failure
#        try:
#            _urlopen_with_retry(test_url)
#        except urllib2.HTTPError, exc:
#            self.assertEqual(exc.code, 401)
#        else:
#            self.fail("urlopen() should have failed with 401")
#
#        # success
#        auth_handler = urllib2.HTTPBasicAuthHandler()
#        auth_handler.add_password(test_realm, test_hostport,
#                                  test_user, test_password)
#        opener = urllib2.build_opener(auth_handler)
#        f = opener.open('http://localhost/')
#        response = _urlopen_with_retry("http://www.python.org/")
#
#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
#        # reasons, let's not implement it!  (it's already implemented for proxy
#        # specification strings (that is, URLs or authorities specifying a
#        # proxy), so we must keep that)
#        self.assertRaises(http.client.InvalidURL,
#                          urllib2.urlopen, "http://evil:thing@example.com")


81 82 83 84 85 86
class CloseSocketTest(unittest.TestCase):

    def test_close(self):
        # calling .close() on urllib2's response objects should close the
        # underlying socket

87
        response = _urlopen_with_retry("http://www.python.org/")
88
        sock = response.fp
89
        self.assertTrue(not sock.closed)
90
        response.close()
91
        self.assertTrue(sock.closed)
92

93 94 95 96 97 98 99 100 101 102 103 104
class OtherNetworkTests(unittest.TestCase):
    def setUp(self):
        if 0:  # for debugging
            import logging
            logger = logging.getLogger("test_urllib2net")
            logger.addHandler(logging.StreamHandler())

    # XXX The rest of these tests aren't very good -- they don't check much.
    # They do sometimes catch some major disasters, though.

    def test_ftp(self):
        urls = [
105
            'ftp://ftp.kernel.org/pub/linux/kernel/README',
106
            'ftp://ftp.kernel.org/pub/linux/kernel/non-existent-file',
107
            #'ftp://ftp.kernel.org/pub/leenox/kernel/test',
108 109 110 111 112 113
            'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC'
                '/research-reports/00README-Legal-Rules-Regs',
            ]
        self._test_urls(urls, self._extra_handlers())

    def test_file(self):
114
        TESTFN = support.TESTFN
115 116 117 118 119
        f = open(TESTFN, 'w')
        try:
            f.write('hi there\n')
            f.close()
            urls = [
120 121 122
                'file:' + sanepathname2url(os.path.abspath(TESTFN)),
                ('file:///nonsensename/etc/passwd', None,
                 urllib.error.URLError),
123
                ]
Georg Brandl's avatar
Georg Brandl committed
124
            self._test_urls(urls, self._extra_handlers(), retry=True)
125 126 127
        finally:
            os.remove(TESTFN)

128 129
        self.assertRaises(ValueError, urllib.request.urlopen,'./relative_path/to/file')

130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
    # XXX Following test depends on machine configurations that are internal
    # to CNRI.  Need to set up a public server with the right authentication
    # configuration for test purposes.

##     def test_cnri(self):
##         if socket.gethostname() == 'bitdiddle':
##             localhost = 'bitdiddle.cnri.reston.va.us'
##         elif socket.gethostname() == 'bitdiddle.concentric.net':
##             localhost = 'localhost'
##         else:
##             localhost = None
##         if localhost is not None:
##             urls = [
##                 'file://%s/etc/passwd' % localhost,
##                 'http://%s/simple/' % localhost,
##                 'http://%s/digest/' % localhost,
##                 'http://%s/not/found.h' % localhost,
##                 ]

##             bauth = HTTPBasicAuthHandler()
##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
##                                'password')
##             dauth = HTTPDigestAuthHandler()
##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
##                                'password')

##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])

158 159
    def test_urlwithfrag(self):
        urlwith_frag = "http://docs.python.org/glossary.html#glossary"
160 161 162 163
        with support.transient_internet(urlwith_frag):
            req = urllib.request.Request(urlwith_frag)
            res = urllib.request.urlopen(req)
            self.assertEqual(res.geturl(),
164
                    "http://docs.python.org/glossary.html#glossary")
165

166 167
    def test_custom_headers(self):
        url = "http://www.example.com"
168 169 170 171 172 173 174 175 176 177
        with support.transient_internet(url):
            opener = urllib.request.build_opener()
            request = urllib.request.Request(url)
            self.assertFalse(request.header_items())
            opener.open(request)
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent','Test-Agent')
            opener.open(request)
            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
178

179 180 181 182
    def test_sites_no_connection_close(self):
        # Some sites do not send Connection: close header.
        # Verify that those work properly. (#issue12576)

183 184 185 186 187 188 189 190 191 192 193 194 195 196
        URL = 'http://www.imdb.com' # mangles Connection:close

        with support.transient_internet(URL):
            try:
                with urllib.request.urlopen(URL) as res:
                    pass
            except ValueError as e:
                self.fail("urlopen failed for site not sending \
                           Connection:close")
            else:
                self.assertTrue(res)

            req = urllib.request.urlopen(URL)
            res = req.read()
197 198
            self.assertTrue(res)

Georg Brandl's avatar
Georg Brandl committed
199
    def _test_urls(self, urls, handlers, retry=True):
200 201 202 203
        import time
        import logging
        debug = logging.getLogger("test_urllib2").debug

204
        urlopen = urllib.request.build_opener(*handlers).open
Georg Brandl's avatar
Georg Brandl committed
205
        if retry:
206
            urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError)
207 208 209 210 211 212

        for url in urls:
            if isinstance(url, tuple):
                url, req, expected_err = url
            else:
                req = expected_err = None
213 214 215

            with support.transient_internet(url):
                debug(url)
216
                try:
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
                    f = urlopen(url, req, TIMEOUT)
                except EnvironmentError as err:
                    debug(err)
                    if expected_err:
                        msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
                               (expected_err, url, req, type(err), err))
                        self.assertIsInstance(err, expected_err, msg)
                except urllib.error.URLError as err:
                    if isinstance(err[0], socket.timeout):
                        print("<timeout: %s>" % url, file=sys.stderr)
                        continue
                    else:
                        raise
                else:
                    try:
                        with support.time_out, \
                             support.socket_peer_reset, \
                             support.ioerror_peer_reset:
                            buf = f.read()
                            debug("read %d bytes" % len(buf))
                    except socket.timeout:
                        print("<timeout: %s>" % url, file=sys.stderr)
                    f.close()
240 241 242 243 244 245
            debug("******** next url coming up...")
            time.sleep(0.1)

    def _extra_handlers(self):
        handlers = []

246
        cfh = urllib.request.CacheFTPHandler()
247
        self.addCleanup(cfh.clear_cache)
248 249 250 251 252
        cfh.setTimeout(1)
        handlers.append(cfh)

        return handlers

253

254 255
class TimeoutTest(unittest.TestCase):
    def test_http_basic(self):
Georg Brandl's avatar
Georg Brandl committed
256
        self.assertTrue(socket.getdefaulttimeout() is None)
257 258 259
        url = "http://www.python.org"
        with support.transient_internet(url, timeout=None):
            u = _urlopen_with_retry(url)
260
            self.addCleanup(u.close)
261
            self.assertTrue(u.fp.raw._sock.gettimeout() is None)
262

Georg Brandl's avatar
Georg Brandl committed
263 264
    def test_http_default_timeout(self):
        self.assertTrue(socket.getdefaulttimeout() is None)
265 266 267 268 269
        url = "http://www.python.org"
        with support.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(url)
270
                self.addCleanup(u.close)
271 272 273
            finally:
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp.raw._sock.gettimeout(), 60)
Georg Brandl's avatar
Georg Brandl committed
274 275 276

    def test_http_no_timeout(self):
        self.assertTrue(socket.getdefaulttimeout() is None)
277 278 279 280 281
        url = "http://www.python.org"
        with support.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(url, timeout=None)
282
                self.addCleanup(u.close)
283 284 285
            finally:
                socket.setdefaulttimeout(None)
            self.assertTrue(u.fp.raw._sock.gettimeout() is None)
286

Georg Brandl's avatar
Georg Brandl committed
287
    def test_http_timeout(self):
288 289 290
        url = "http://www.python.org"
        with support.transient_internet(url):
            u = _urlopen_with_retry(url, timeout=120)
291
            self.addCleanup(u.close)
292
            self.assertEqual(u.fp.raw._sock.gettimeout(), 120)
293

294
    FTP_HOST = "ftp://ftp.mirror.nl/pub/gnu/"
295

296
    def test_ftp_basic(self):
Georg Brandl's avatar
Georg Brandl committed
297
        self.assertTrue(socket.getdefaulttimeout() is None)
298 299
        with support.transient_internet(self.FTP_HOST, timeout=None):
            u = _urlopen_with_retry(self.FTP_HOST)
300
            self.addCleanup(u.close)
301
            self.assertTrue(u.fp.fp.raw._sock.gettimeout() is None)
302

Georg Brandl's avatar
Georg Brandl committed
303 304
    def test_ftp_default_timeout(self):
        self.assertTrue(socket.getdefaulttimeout() is None)
305 306 307 308
        with support.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(self.FTP_HOST)
309
                self.addCleanup(u.close)
310 311 312
            finally:
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
313

Georg Brandl's avatar
Georg Brandl committed
314 315
    def test_ftp_no_timeout(self):
        self.assertTrue(socket.getdefaulttimeout() is None)
316 317 318 319
        with support.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
320
                self.addCleanup(u.close)
321 322 323
            finally:
                socket.setdefaulttimeout(None)
            self.assertTrue(u.fp.fp.raw._sock.gettimeout() is None)
324

Georg Brandl's avatar
Georg Brandl committed
325
    def test_ftp_timeout(self):
326 327
        with support.transient_internet(self.FTP_HOST):
            u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
328
            self.addCleanup(u.close)
329
            self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
330

331

332 333 334 335
@unittest.skipUnless(ssl, "requires SSL support")
class HTTPSTests(unittest.TestCase):
    """HTTPS tests; skipped as a whole when the ssl module is unavailable."""

    def test_sni(self):
        self.skipTest("test disabled - test server needed")
        # Everything below is unreachable until a test server is available
        # and the "XXX" placeholders are replaced with its URL.
        #
        # Checks that Server Name Indication works, if supported by the
        # OpenSSL linked to.
        # The ssl module itself doesn't have server-side support for SNI,
        # so we rely on a third-party test site.
        expect_sni = ssl.HAS_SNI
        with support.transient_internet("XXX"):
            u = urllib.request.urlopen("XXX")
            self.addCleanup(u.close)
            # read() is used rather than readall() because it is available
            # on every response object urlopen() can return.
            contents = u.read()
            if expect_sni:
                self.assertIn(b"Great", contents)
                self.assertNotIn(b"Unfortunately", contents)
            else:
                self.assertNotIn(b"Great", contents)
                self.assertIn(b"Unfortunately", contents)


353
def test_main():
    """Require the "network" resource, then run every test class here."""
    support.requires("network")
    support.run_unittest(AuthTests,
                         HTTPSTests,
                         OtherNetworkTests,
                         CloseSocketTest,
                         TimeoutTest,
                         )
361 362 363

# Run the tests via test_main() when this file is executed as a script.
if __name__ == "__main__":
    test_main()