Skip to content
Projeler
Gruplar
Parçacıklar
Yardım
Yükleniyor...
Oturum aç / Kaydol
Gezinmeyi değiştir
C
cpython
Proje
Proje
Ayrıntılar
Etkinlik
Cycle Analytics
Depo (repository)
Depo (repository)
Dosyalar
Kayıtlar (commit)
Dallar (branch)
Etiketler
Katkıda bulunanlar
Grafik
Karşılaştır
Grafikler
Konular (issue)
0
Konular (issue)
0
Liste
Pano
Etiketler
Kilometre Taşları
Birleştirme (merge) Talepleri
0
Birleştirme (merge) Talepleri
0
CI / CD
CI / CD
İş akışları (pipeline)
İşler
Zamanlamalar
Grafikler
Paketler
Paketler
Wiki
Wiki
Parçacıklar
Parçacıklar
Üyeler
Üyeler
Collapse sidebar
Close sidebar
Etkinlik
Grafik
Grafikler
Yeni bir konu (issue) oluştur
İşler
Kayıtlar (commit)
Konu (issue) Panoları
Kenar çubuğunu aç
Batuhan Osman TASKAYA
cpython
Commits
60d230c7
Unverified
Kaydet (Commit)
60d230c7
authored
Eki 08, 2018
tarafından
Andrew Svetlov
Kaydeden (comit)
GitHub
Eki 08, 2018
Dosyalara gözat
Seçenekler
Dosyalara Gözat
İndir
Eposta Yamaları
Sade Fark
Extract tests for sock_*() functions into a separate file (GH-9761)
üst
0854b92c
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
238 additions
and
130 deletions
+238
-130
test_events.py
Lib/test/test_asyncio/test_events.py
+0
-130
test_sock_lowlevel.py
Lib/test/test_asyncio/test_sock_lowlevel.py
+238
-0
No files found.
Lib/test/test_asyncio/test_events.py
Dosyayı görüntüle @
60d230c7
...
...
@@ -419,108 +419,6 @@ class EventLoopTestsMixin:
r
.
close
()
self
.
assertEqual
(
read
,
data
)
def
_basetest_sock_client_ops
(
self
,
httpd
,
sock
):
if
not
isinstance
(
self
.
loop
,
proactor_events
.
BaseProactorEventLoop
):
# in debug mode, socket operations must fail
# if the socket is not in blocking mode
self
.
loop
.
set_debug
(
True
)
sock
.
setblocking
(
True
)
with
self
.
assertRaises
(
ValueError
):
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_connect
(
sock
,
httpd
.
address
))
with
self
.
assertRaises
(
ValueError
):
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_sendall
(
sock
,
b
'GET / HTTP/1.0
\r\n\r\n
'
))
with
self
.
assertRaises
(
ValueError
):
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_recv
(
sock
,
1024
))
with
self
.
assertRaises
(
ValueError
):
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_recv_into
(
sock
,
bytearray
()))
with
self
.
assertRaises
(
ValueError
):
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_accept
(
sock
))
# test in non-blocking mode
sock
.
setblocking
(
False
)
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_connect
(
sock
,
httpd
.
address
))
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_sendall
(
sock
,
b
'GET / HTTP/1.0
\r\n\r\n
'
))
data
=
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_recv
(
sock
,
1024
))
# consume data
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_recv
(
sock
,
1024
))
sock
.
close
()
self
.
assertTrue
(
data
.
startswith
(
b
'HTTP/1.0 200 OK'
))
def
_basetest_sock_recv_into
(
self
,
httpd
,
sock
):
# same as _basetest_sock_client_ops, but using sock_recv_into
sock
.
setblocking
(
False
)
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_connect
(
sock
,
httpd
.
address
))
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_sendall
(
sock
,
b
'GET / HTTP/1.0
\r\n\r\n
'
))
data
=
bytearray
(
1024
)
with
memoryview
(
data
)
as
buf
:
nbytes
=
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_recv_into
(
sock
,
buf
[:
1024
]))
# consume data
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_recv_into
(
sock
,
buf
[
nbytes
:]))
sock
.
close
()
self
.
assertTrue
(
data
.
startswith
(
b
'HTTP/1.0 200 OK'
))
def
test_sock_client_ops
(
self
):
with
test_utils
.
run_test_server
()
as
httpd
:
sock
=
socket
.
socket
()
self
.
_basetest_sock_client_ops
(
httpd
,
sock
)
sock
=
socket
.
socket
()
self
.
_basetest_sock_recv_into
(
httpd
,
sock
)
@support.skip_unless_bind_unix_socket
def
test_unix_sock_client_ops
(
self
):
with
test_utils
.
run_test_unix_server
()
as
httpd
:
sock
=
socket
.
socket
(
socket
.
AF_UNIX
)
self
.
_basetest_sock_client_ops
(
httpd
,
sock
)
sock
=
socket
.
socket
(
socket
.
AF_UNIX
)
self
.
_basetest_sock_recv_into
(
httpd
,
sock
)
def
test_sock_client_fail
(
self
):
# Make sure that we will get an unused port
address
=
None
try
:
s
=
socket
.
socket
()
s
.
bind
((
'127.0.0.1'
,
0
))
address
=
s
.
getsockname
()
finally
:
s
.
close
()
sock
=
socket
.
socket
()
sock
.
setblocking
(
False
)
with
self
.
assertRaises
(
ConnectionRefusedError
):
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_connect
(
sock
,
address
))
sock
.
close
()
def
test_sock_accept
(
self
):
listener
=
socket
.
socket
()
listener
.
setblocking
(
False
)
listener
.
bind
((
'127.0.0.1'
,
0
))
listener
.
listen
(
1
)
client
=
socket
.
socket
()
client
.
connect
(
listener
.
getsockname
())
f
=
self
.
loop
.
sock_accept
(
listener
)
conn
,
addr
=
self
.
loop
.
run_until_complete
(
f
)
self
.
assertEqual
(
conn
.
gettimeout
(),
0
)
self
.
assertEqual
(
addr
,
client
.
getsockname
())
self
.
assertEqual
(
client
.
getpeername
(),
listener
.
getsockname
())
client
.
close
()
conn
.
close
()
listener
.
close
()
@unittest.skipUnless
(
hasattr
(
signal
,
'SIGKILL'
),
'No SIGKILL'
)
def
test_add_signal_handler
(
self
):
caught
=
0
...
...
@@ -626,34 +524,6 @@ class EventLoopTestsMixin:
lambda
:
MyProto
(
loop
=
self
.
loop
),
httpd
.
address
)
self
.
_basetest_create_connection
(
conn_fut
,
check_sockname
)
def
test_create_connection_sock
(
self
):
with
test_utils
.
run_test_server
()
as
httpd
:
sock
=
None
infos
=
self
.
loop
.
run_until_complete
(
self
.
loop
.
getaddrinfo
(
*
httpd
.
address
,
type
=
socket
.
SOCK_STREAM
))
for
family
,
type
,
proto
,
cname
,
address
in
infos
:
try
:
sock
=
socket
.
socket
(
family
=
family
,
type
=
type
,
proto
=
proto
)
sock
.
setblocking
(
False
)
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_connect
(
sock
,
address
))
except
:
pass
else
:
break
else
:
assert
False
,
'Can not create socket.'
f
=
self
.
loop
.
create_connection
(
lambda
:
MyProto
(
loop
=
self
.
loop
),
sock
=
sock
)
tr
,
pr
=
self
.
loop
.
run_until_complete
(
f
)
self
.
assertIsInstance
(
tr
,
asyncio
.
Transport
)
self
.
assertIsInstance
(
pr
,
asyncio
.
Protocol
)
self
.
loop
.
run_until_complete
(
pr
.
done
)
self
.
assertGreater
(
pr
.
nbytes
,
0
)
tr
.
close
()
def
check_ssl_extra_info
(
self
,
client
,
check_sockname
=
True
,
peername
=
None
,
peercert
=
{}):
if
check_sockname
:
...
...
Lib/test/test_asyncio/test_sock_lowlevel.py
0 → 100644
Dosyayı görüntüle @
60d230c7
import
socket
import
asyncio
import
sys
from
asyncio
import
proactor_events
from
test.test_asyncio
import
utils
as
test_utils
from
test
import
support
class
MyProto
(
asyncio
.
Protocol
):
connected
=
None
done
=
None
def
__init__
(
self
,
loop
=
None
):
self
.
transport
=
None
self
.
state
=
'INITIAL'
self
.
nbytes
=
0
if
loop
is
not
None
:
self
.
connected
=
asyncio
.
Future
(
loop
=
loop
)
self
.
done
=
asyncio
.
Future
(
loop
=
loop
)
def
connection_made
(
self
,
transport
):
self
.
transport
=
transport
assert
self
.
state
==
'INITIAL'
,
self
.
state
self
.
state
=
'CONNECTED'
if
self
.
connected
:
self
.
connected
.
set_result
(
None
)
transport
.
write
(
b
'GET / HTTP/1.0
\r\n
Host: example.com
\r\n\r\n
'
)
def
data_received
(
self
,
data
):
assert
self
.
state
==
'CONNECTED'
,
self
.
state
self
.
nbytes
+=
len
(
data
)
def
eof_received
(
self
):
assert
self
.
state
==
'CONNECTED'
,
self
.
state
self
.
state
=
'EOF'
def
connection_lost
(
self
,
exc
):
assert
self
.
state
in
(
'CONNECTED'
,
'EOF'
),
self
.
state
self
.
state
=
'CLOSED'
if
self
.
done
:
self
.
done
.
set_result
(
None
)
class
BaseSockTestsMixin
:
def
create_event_loop
(
self
):
raise
NotImplementedError
def
setUp
(
self
):
self
.
loop
=
self
.
create_event_loop
()
self
.
set_event_loop
(
self
.
loop
)
super
()
.
setUp
()
def
tearDown
(
self
):
# just in case if we have transport close callbacks
if
not
self
.
loop
.
is_closed
():
test_utils
.
run_briefly
(
self
.
loop
)
self
.
doCleanups
()
support
.
gc_collect
()
super
()
.
tearDown
()
def
_basetest_sock_client_ops
(
self
,
httpd
,
sock
):
if
not
isinstance
(
self
.
loop
,
proactor_events
.
BaseProactorEventLoop
):
# in debug mode, socket operations must fail
# if the socket is not in blocking mode
self
.
loop
.
set_debug
(
True
)
sock
.
setblocking
(
True
)
with
self
.
assertRaises
(
ValueError
):
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_connect
(
sock
,
httpd
.
address
))
with
self
.
assertRaises
(
ValueError
):
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_sendall
(
sock
,
b
'GET / HTTP/1.0
\r\n\r\n
'
))
with
self
.
assertRaises
(
ValueError
):
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_recv
(
sock
,
1024
))
with
self
.
assertRaises
(
ValueError
):
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_recv_into
(
sock
,
bytearray
()))
with
self
.
assertRaises
(
ValueError
):
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_accept
(
sock
))
# test in non-blocking mode
sock
.
setblocking
(
False
)
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_connect
(
sock
,
httpd
.
address
))
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_sendall
(
sock
,
b
'GET / HTTP/1.0
\r\n\r\n
'
))
data
=
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_recv
(
sock
,
1024
))
# consume data
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_recv
(
sock
,
1024
))
sock
.
close
()
self
.
assertTrue
(
data
.
startswith
(
b
'HTTP/1.0 200 OK'
))
def
_basetest_sock_recv_into
(
self
,
httpd
,
sock
):
# same as _basetest_sock_client_ops, but using sock_recv_into
sock
.
setblocking
(
False
)
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_connect
(
sock
,
httpd
.
address
))
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_sendall
(
sock
,
b
'GET / HTTP/1.0
\r\n\r\n
'
))
data
=
bytearray
(
1024
)
with
memoryview
(
data
)
as
buf
:
nbytes
=
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_recv_into
(
sock
,
buf
[:
1024
]))
# consume data
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_recv_into
(
sock
,
buf
[
nbytes
:]))
sock
.
close
()
self
.
assertTrue
(
data
.
startswith
(
b
'HTTP/1.0 200 OK'
))
def
test_sock_client_ops
(
self
):
with
test_utils
.
run_test_server
()
as
httpd
:
sock
=
socket
.
socket
()
self
.
_basetest_sock_client_ops
(
httpd
,
sock
)
sock
=
socket
.
socket
()
self
.
_basetest_sock_recv_into
(
httpd
,
sock
)
@support.skip_unless_bind_unix_socket
def
test_unix_sock_client_ops
(
self
):
with
test_utils
.
run_test_unix_server
()
as
httpd
:
sock
=
socket
.
socket
(
socket
.
AF_UNIX
)
self
.
_basetest_sock_client_ops
(
httpd
,
sock
)
sock
=
socket
.
socket
(
socket
.
AF_UNIX
)
self
.
_basetest_sock_recv_into
(
httpd
,
sock
)
def
test_sock_client_fail
(
self
):
# Make sure that we will get an unused port
address
=
None
try
:
s
=
socket
.
socket
()
s
.
bind
((
'127.0.0.1'
,
0
))
address
=
s
.
getsockname
()
finally
:
s
.
close
()
sock
=
socket
.
socket
()
sock
.
setblocking
(
False
)
with
self
.
assertRaises
(
ConnectionRefusedError
):
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_connect
(
sock
,
address
))
sock
.
close
()
def
test_sock_accept
(
self
):
listener
=
socket
.
socket
()
listener
.
setblocking
(
False
)
listener
.
bind
((
'127.0.0.1'
,
0
))
listener
.
listen
(
1
)
client
=
socket
.
socket
()
client
.
connect
(
listener
.
getsockname
())
f
=
self
.
loop
.
sock_accept
(
listener
)
conn
,
addr
=
self
.
loop
.
run_until_complete
(
f
)
self
.
assertEqual
(
conn
.
gettimeout
(),
0
)
self
.
assertEqual
(
addr
,
client
.
getsockname
())
self
.
assertEqual
(
client
.
getpeername
(),
listener
.
getsockname
())
client
.
close
()
conn
.
close
()
listener
.
close
()
def
test_create_connection_sock
(
self
):
with
test_utils
.
run_test_server
()
as
httpd
:
sock
=
None
infos
=
self
.
loop
.
run_until_complete
(
self
.
loop
.
getaddrinfo
(
*
httpd
.
address
,
type
=
socket
.
SOCK_STREAM
))
for
family
,
type
,
proto
,
cname
,
address
in
infos
:
try
:
sock
=
socket
.
socket
(
family
=
family
,
type
=
type
,
proto
=
proto
)
sock
.
setblocking
(
False
)
self
.
loop
.
run_until_complete
(
self
.
loop
.
sock_connect
(
sock
,
address
))
except
BaseException
:
pass
else
:
break
else
:
assert
False
,
'Can not create socket.'
f
=
self
.
loop
.
create_connection
(
lambda
:
MyProto
(
loop
=
self
.
loop
),
sock
=
sock
)
tr
,
pr
=
self
.
loop
.
run_until_complete
(
f
)
self
.
assertIsInstance
(
tr
,
asyncio
.
Transport
)
self
.
assertIsInstance
(
pr
,
asyncio
.
Protocol
)
self
.
loop
.
run_until_complete
(
pr
.
done
)
self
.
assertGreater
(
pr
.
nbytes
,
0
)
tr
.
close
()
if
sys
.
platform
==
'win32'
:
class
SelectEventLoopTests
(
BaseSockTestsMixin
,
test_utils
.
TestCase
):
def
create_event_loop
(
self
):
return
asyncio
.
SelectorEventLoop
()
class
ProactorEventLoopTests
(
BaseSockTestsMixin
,
test_utils
.
TestCase
):
def
create_event_loop
(
self
):
return
asyncio
.
ProactorEventLoop
()
else
:
import
selectors
if
hasattr
(
selectors
,
'KqueueSelector'
):
class
KqueueEventLoopTests
(
BaseSockTestsMixin
,
test_utils
.
TestCase
):
def
create_event_loop
(
self
):
return
asyncio
.
SelectorEventLoop
(
selectors
.
KqueueSelector
())
if
hasattr
(
selectors
,
'EpollSelector'
):
class
EPollEventLoopTests
(
BaseSockTestsMixin
,
test_utils
.
TestCase
):
def
create_event_loop
(
self
):
return
asyncio
.
SelectorEventLoop
(
selectors
.
EpollSelector
())
if
hasattr
(
selectors
,
'PollSelector'
):
class
PollEventLoopTests
(
BaseSockTestsMixin
,
test_utils
.
TestCase
):
def
create_event_loop
(
self
):
return
asyncio
.
SelectorEventLoop
(
selectors
.
PollSelector
())
# Should always exist.
class
SelectEventLoopTests
(
BaseSockTestsMixin
,
test_utils
.
TestCase
):
def
create_event_loop
(
self
):
return
asyncio
.
SelectorEventLoop
(
selectors
.
SelectSelector
())
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment