Skip to content
Projeler
Gruplar
Parçacıklar
Yardım
Yükleniyor...
Oturum aç / Kaydol
Gezinmeyi değiştir
C
cpython
Proje
Proje
Ayrıntılar
Etkinlik
Cycle Analytics
Depo (repository)
Depo (repository)
Dosyalar
Kayıtlar (commit)
Dallar (branch)
Etiketler
Katkıda bulunanlar
Grafik
Karşılaştır
Grafikler
Konular (issue)
0
Konular (issue)
0
Liste
Pano
Etiketler
Kilometre Taşları
Birleştirme (merge) Talepleri
0
Birleştirme (merge) Talepleri
0
CI / CD
CI / CD
İş akışları (pipeline)
İşler
Zamanlamalar
Grafikler
Paketler
Paketler
Wiki
Wiki
Parçacıklar
Parçacıklar
Üyeler
Üyeler
Collapse sidebar
Close sidebar
Etkinlik
Grafik
Grafikler
Yeni bir konu (issue) oluştur
İşler
Kayıtlar (commit)
Konu (issue) Panoları
Kenar çubuğunu aç
Batuhan Osman TASKAYA
cpython
Commits
ff871226
Kaydet (Commit)
ff871226
authored
Haz 07, 2007
tarafından
Georg Brandl
Dosyalara gözat
Seçenekler
Dosyalara Gözat
İndir
Eposta Yamaları
Sade Fark
Patch #1667860: Fix UnboundLocalError in urllib2.
üst
04233ee5
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
302 additions
and
1 deletion
+302
-1
test_urllib2_localnet.py
Lib/test/test_urllib2_localnet.py
+301
-0
urllib2.py
Lib/urllib2.py
+1
-1
No files found.
Lib/test/test_urllib2_localnet.py
0 → 100644
Dosyayı görüntüle @
ff871226
#!/usr/bin/env python
import
sys
import
threading
import
urlparse
import
urllib2
import
BaseHTTPServer
import
unittest
import
hashlib
from
test
import
test_support
# Loopback http server infrastructure
class
LoopbackHttpServer
(
BaseHTTPServer
.
HTTPServer
):
"""HTTP server w/ a few modifications that make it useful for
loopback testing purposes.
"""
def
__init__
(
self
,
server_address
,
RequestHandlerClass
):
BaseHTTPServer
.
HTTPServer
.
__init__
(
self
,
server_address
,
RequestHandlerClass
)
# Set the timeout of our listening socket really low so
# that we can stop the server easily.
self
.
socket
.
settimeout
(
1.0
)
def
get_request
(
self
):
"""BaseHTTPServer method, overridden."""
request
,
client_address
=
self
.
socket
.
accept
()
# It's a loopback connection, so setting the timeout
# really low shouldn't affect anything, but should make
# deadlocks less likely to occur.
request
.
settimeout
(
10.0
)
return
(
request
,
client_address
)
class
LoopbackHttpServerThread
(
threading
.
Thread
):
"""Stoppable thread that runs a loopback http server."""
def
__init__
(
self
,
port
,
RequestHandlerClass
):
threading
.
Thread
.
__init__
(
self
)
self
.
_RequestHandlerClass
=
RequestHandlerClass
self
.
_stop
=
False
self
.
_port
=
port
self
.
_server_address
=
(
'127.0.0.1'
,
self
.
_port
)
self
.
ready
=
threading
.
Event
()
def
stop
(
self
):
"""Stops the webserver if it's currently running."""
# Set the stop flag.
self
.
_stop
=
True
self
.
join
()
def
run
(
self
):
protocol
=
"HTTP/1.0"
self
.
_RequestHandlerClass
.
protocol_version
=
protocol
httpd
=
LoopbackHttpServer
(
self
.
_server_address
,
self
.
_RequestHandlerClass
)
sa
=
httpd
.
socket
.
getsockname
()
#print "Serving HTTP on", sa[0], "port", sa[1], "..."
self
.
ready
.
set
()
while
not
self
.
_stop
:
httpd
.
handle_request
()
# Authentication infrastructure
class
DigestAuthHandler
:
"""Handler for performing digest authentication."""
def
__init__
(
self
):
self
.
_request_num
=
0
self
.
_nonces
=
[]
self
.
_users
=
{}
self
.
_realm_name
=
"Test Realm"
self
.
_qop
=
"auth"
def
set_qop
(
self
,
qop
):
self
.
_qop
=
qop
def
set_users
(
self
,
users
):
assert
isinstance
(
users
,
dict
)
self
.
_users
=
users
def
set_realm
(
self
,
realm
):
self
.
_realm_name
=
realm
def
_generate_nonce
(
self
):
self
.
_request_num
+=
1
nonce
=
hashlib
.
md5
(
str
(
self
.
_request_num
))
.
hexdigest
()
self
.
_nonces
.
append
(
nonce
)
return
nonce
def
_create_auth_dict
(
self
,
auth_str
):
first_space_index
=
auth_str
.
find
(
" "
)
auth_str
=
auth_str
[
first_space_index
+
1
:]
parts
=
auth_str
.
split
(
","
)
auth_dict
=
{}
for
part
in
parts
:
name
,
value
=
part
.
split
(
"="
)
name
=
name
.
strip
()
if
value
[
0
]
==
'"'
and
value
[
-
1
]
==
'"'
:
value
=
value
[
1
:
-
1
]
else
:
value
=
value
.
strip
()
auth_dict
[
name
]
=
value
return
auth_dict
def
_validate_auth
(
self
,
auth_dict
,
password
,
method
,
uri
):
final_dict
=
{}
final_dict
.
update
(
auth_dict
)
final_dict
[
"password"
]
=
password
final_dict
[
"method"
]
=
method
final_dict
[
"uri"
]
=
uri
HA1_str
=
"
%(username)
s:
%(realm)
s:
%(password)
s"
%
final_dict
HA1
=
hashlib
.
md5
(
HA1_str
)
.
hexdigest
()
HA2_str
=
"
%(method)
s:
%(uri)
s"
%
final_dict
HA2
=
hashlib
.
md5
(
HA2_str
)
.
hexdigest
()
final_dict
[
"HA1"
]
=
HA1
final_dict
[
"HA2"
]
=
HA2
response_str
=
"
%(HA1)
s:
%(nonce)
s:
%(nc)
s:"
\
"
%(cnonce)
s:
%(qop)
s:
%(HA2)
s"
%
final_dict
response
=
hashlib
.
md5
(
response_str
)
.
hexdigest
()
return
response
==
auth_dict
[
"response"
]
def
_return_auth_challenge
(
self
,
request_handler
):
request_handler
.
send_response
(
407
,
"Proxy Authentication Required"
)
request_handler
.
send_header
(
"Content-Type"
,
"text/html"
)
request_handler
.
send_header
(
'Proxy-Authenticate'
,
'Digest realm="
%
s", '
'qop="
%
s",'
'nonce="
%
s", '
%
\
(
self
.
_realm_name
,
self
.
_qop
,
self
.
_generate_nonce
()))
# XXX: Not sure if we're supposed to add this next header or
# not.
#request_handler.send_header('Connection', 'close')
request_handler
.
end_headers
()
request_handler
.
wfile
.
write
(
"Proxy Authentication Required."
)
return
False
def
handle_request
(
self
,
request_handler
):
"""Performs digest authentication on the given HTTP request
handler. Returns True if authentication was successful, False
otherwise.
If no users have been set, then digest auth is effectively
disabled and this method will always return True.
"""
if
len
(
self
.
_users
)
==
0
:
return
True
if
not
request_handler
.
headers
.
has_key
(
'Proxy-Authorization'
):
return
self
.
_return_auth_challenge
(
request_handler
)
else
:
auth_dict
=
self
.
_create_auth_dict
(
request_handler
.
headers
[
'Proxy-Authorization'
]
)
if
self
.
_users
.
has_key
(
auth_dict
[
"username"
]):
password
=
self
.
_users
[
auth_dict
[
"username"
]
]
else
:
return
self
.
_return_auth_challenge
(
request_handler
)
if
not
auth_dict
.
get
(
"nonce"
)
in
self
.
_nonces
:
return
self
.
_return_auth_challenge
(
request_handler
)
else
:
self
.
_nonces
.
remove
(
auth_dict
[
"nonce"
])
auth_validated
=
False
# MSIE uses short_path in its validation, but Python's
# urllib2 uses the full path, so we're going to see if
# either of them works here.
for
path
in
[
request_handler
.
path
,
request_handler
.
short_path
]:
if
self
.
_validate_auth
(
auth_dict
,
password
,
request_handler
.
command
,
path
):
auth_validated
=
True
if
not
auth_validated
:
return
self
.
_return_auth_challenge
(
request_handler
)
return
True
# Proxy test infrastructure
class
FakeProxyHandler
(
BaseHTTPServer
.
BaseHTTPRequestHandler
):
"""This is a 'fake proxy' that makes it look like the entire
internet has gone down due to a sudden zombie invasion. It main
utility is in providing us with authentication support for
testing.
"""
digest_auth_handler
=
DigestAuthHandler
()
def
log_message
(
self
,
format
,
*
args
):
# Uncomment the next line for debugging.
#sys.stderr.write(format % args)
pass
def
do_GET
(
self
):
(
scm
,
netloc
,
path
,
params
,
query
,
fragment
)
=
urlparse
.
urlparse
(
self
.
path
,
'http'
)
self
.
short_path
=
path
if
self
.
digest_auth_handler
.
handle_request
(
self
):
self
.
send_response
(
200
,
"OK"
)
self
.
send_header
(
"Content-Type"
,
"text/html"
)
self
.
end_headers
()
self
.
wfile
.
write
(
"You've reached
%
s!<BR>"
%
self
.
path
)
self
.
wfile
.
write
(
"Our apologies, but our server is down due to "
"a sudden zombie invasion."
)
# Test cases
class
ProxyAuthTests
(
unittest
.
TestCase
):
URL
=
"http://www.foo.com"
PORT
=
8080
USER
=
"tester"
PASSWD
=
"test123"
REALM
=
"TestRealm"
PROXY_URL
=
"http://127.0.0.1:
%
d"
%
PORT
def
setUp
(
self
):
FakeProxyHandler
.
digest_auth_handler
.
set_users
({
self
.
USER
:
self
.
PASSWD
})
FakeProxyHandler
.
digest_auth_handler
.
set_realm
(
self
.
REALM
)
self
.
server
=
LoopbackHttpServerThread
(
self
.
PORT
,
FakeProxyHandler
)
self
.
server
.
start
()
self
.
server
.
ready
.
wait
()
handler
=
urllib2
.
ProxyHandler
({
"http"
:
self
.
PROXY_URL
})
self
.
_digest_auth_handler
=
urllib2
.
ProxyDigestAuthHandler
()
self
.
opener
=
urllib2
.
build_opener
(
handler
,
self
.
_digest_auth_handler
)
def
tearDown
(
self
):
self
.
server
.
stop
()
def
test_proxy_with_bad_password_raises_httperror
(
self
):
self
.
_digest_auth_handler
.
add_password
(
self
.
REALM
,
self
.
URL
,
self
.
USER
,
self
.
PASSWD
+
"bad"
)
FakeProxyHandler
.
digest_auth_handler
.
set_qop
(
"auth"
)
self
.
assertRaises
(
urllib2
.
HTTPError
,
self
.
opener
.
open
,
self
.
URL
)
def
test_proxy_with_no_password_raises_httperror
(
self
):
FakeProxyHandler
.
digest_auth_handler
.
set_qop
(
"auth"
)
self
.
assertRaises
(
urllib2
.
HTTPError
,
self
.
opener
.
open
,
self
.
URL
)
def
test_proxy_qop_auth_works
(
self
):
self
.
_digest_auth_handler
.
add_password
(
self
.
REALM
,
self
.
URL
,
self
.
USER
,
self
.
PASSWD
)
FakeProxyHandler
.
digest_auth_handler
.
set_qop
(
"auth"
)
result
=
self
.
opener
.
open
(
self
.
URL
)
while
result
.
read
():
pass
result
.
close
()
def
test_proxy_qop_auth_int_works_or_throws_urlerror
(
self
):
self
.
_digest_auth_handler
.
add_password
(
self
.
REALM
,
self
.
URL
,
self
.
USER
,
self
.
PASSWD
)
FakeProxyHandler
.
digest_auth_handler
.
set_qop
(
"auth-int"
)
try
:
result
=
self
.
opener
.
open
(
self
.
URL
)
except
urllib2
.
URLError
:
# It's okay if we don't support auth-int, but we certainly
# shouldn't receive any kind of exception here other than
# a URLError.
result
=
None
if
result
:
while
result
.
read
():
pass
result
.
close
()
def
test_main
():
# We will NOT depend on the network resource flag
# (Lib/test/regrtest.py -u network) since all tests here are only
# localhost. However, if this is a bad rationale, then uncomment
# the next line.
#test_support.requires("network")
test_support
.
run_unittest
(
ProxyAuthTests
)
if
__name__
==
"__main__"
:
test_main
()
Lib/urllib2.py
Dosyayı görüntüle @
ff871226
...
...
@@ -949,7 +949,7 @@ class AbstractDigestAuthHandler:
respdig
=
KD
(
H
(
A1
),
"
%
s:
%
s"
%
(
nonce
,
H
(
A2
)))
else
:
# XXX handle auth-int.
pass
raise
URLError
(
"qop '
%
s' is not supported."
%
qop
)
# XXX should the partial digests be encoded too?
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment