Skip to content
Projeler
Gruplar
Parçacıklar
Yardım
Yükleniyor...
Oturum aç / Kaydol
Gezinmeyi değiştir
C
cpython
Proje
Proje
Ayrıntılar
Etkinlik
Cycle Analytics
Depo (repository)
Depo (repository)
Dosyalar
Kayıtlar (commit)
Dallar (branch)
Etiketler
Katkıda bulunanlar
Grafik
Karşılaştır
Grafikler
Konular (issue)
0
Konular (issue)
0
Liste
Pano
Etiketler
Kilometre Taşları
Birleştirme (merge) Talepleri
0
Birleştirme (merge) Talepleri
0
CI / CD
CI / CD
İş akışları (pipeline)
İşler
Zamanlamalar
Grafikler
Paketler
Paketler
Wiki
Wiki
Parçacıklar
Parçacıklar
Üyeler
Üyeler
Collapse sidebar
Close sidebar
Etkinlik
Grafik
Grafikler
Yeni bir konu (issue) oluştur
İşler
Kayıtlar (commit)
Konu (issue) Panoları
Kenar çubuğunu aç
Batuhan Osman TASKAYA
cpython
Commits
daded802
Kaydet (Commit)
daded802
authored
Tem 14, 2014
tarafından
Victor Stinner
Dosyalara gözat
Seçenekler
Dosyalara Gözat
İndir
Sade Fark
Merge with Python 3.4
üst
56ded52f
acdb782a
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
181 additions
and
43 deletions
+181
-43
base_events.py
Lib/asyncio/base_events.py
+92
-39
base_subprocess.py
Lib/asyncio/base_subprocess.py
+39
-1
streams.py
Lib/asyncio/streams.py
+12
-0
subprocess.py
Lib/asyncio/subprocess.py
+26
-0
unix_events.py
Lib/asyncio/unix_events.py
+12
-1
test_base_events.py
Lib/test/test_asyncio/test_base_events.py
+0
-2
No files found.
Lib/asyncio/base_events.py
Dosyayı görüntüle @
daded802
"""Base implementation of event loop.
"""Base implementation of event loop.
The event loop can be broken up into a multiplexer (the part
The event loop can be broken up into a multiplexer (the part
responsible for notifying us of IO events) and the event loop proper,
responsible for notifying us of I
/
O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.
immediately or at a given time in the future.
...
@@ -50,6 +50,15 @@ def _format_handle(handle):
...
@@ -50,6 +50,15 @@ def _format_handle(handle):
return
str
(
handle
)
return
str
(
handle
)
def
_format_pipe
(
fd
):
if
fd
==
subprocess
.
PIPE
:
return
'<pipe>'
elif
fd
==
subprocess
.
STDOUT
:
return
'<stdout>'
else
:
return
repr
(
fd
)
class
_StopError
(
BaseException
):
class
_StopError
(
BaseException
):
"""Raised to stop the event loop."""
"""Raised to stop the event loop."""
...
@@ -70,7 +79,7 @@ def _check_resolved_address(sock, address):
...
@@ -70,7 +79,7 @@ def _check_resolved_address(sock, address):
type_mask
|=
socket
.
SOCK_NONBLOCK
type_mask
|=
socket
.
SOCK_NONBLOCK
if
hasattr
(
socket
,
'SOCK_CLOEXEC'
):
if
hasattr
(
socket
,
'SOCK_CLOEXEC'
):
type_mask
|=
socket
.
SOCK_CLOEXEC
type_mask
|=
socket
.
SOCK_CLOEXEC
# Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
# Use getaddrinfo(
flags=
AI_NUMERICHOST) to ensure that the address is
# already resolved.
# already resolved.
try
:
try
:
socket
.
getaddrinfo
(
host
,
port
,
socket
.
getaddrinfo
(
host
,
port
,
...
@@ -158,7 +167,8 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -158,7 +167,8 @@ class BaseEventLoop(events.AbstractEventLoop):
def
create_task
(
self
,
coro
):
def
create_task
(
self
,
coro
):
"""Schedule a coroutine object.
"""Schedule a coroutine object.
Return a task object."""
Return a task object.
"""
task
=
tasks
.
Task
(
coro
,
loop
=
self
)
task
=
tasks
.
Task
(
coro
,
loop
=
self
)
if
task
.
_source_traceback
:
if
task
.
_source_traceback
:
del
task
.
_source_traceback
[
-
1
]
del
task
.
_source_traceback
[
-
1
]
...
@@ -197,12 +207,13 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -197,12 +207,13 @@ class BaseEventLoop(events.AbstractEventLoop):
"""Create subprocess transport."""
"""Create subprocess transport."""
raise
NotImplementedError
raise
NotImplementedError
def
_read_from_self
(
self
):
"""XXX"""
raise
NotImplementedError
def
_write_to_self
(
self
):
def
_write_to_self
(
self
):
"""XXX"""
"""Write a byte to self-pipe, to wake up the event loop.
This may be called from a different thread.
The subclass is responsible for implementing the self-pipe.
"""
raise
NotImplementedError
raise
NotImplementedError
def
_process_events
(
self
,
event_list
):
def
_process_events
(
self
,
event_list
):
...
@@ -233,7 +244,7 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -233,7 +244,7 @@ class BaseEventLoop(events.AbstractEventLoop):
If the argument is a coroutine, it is wrapped in a Task.
If the argument is a coroutine, it is wrapped in a Task.
XXX TBD
: It would be disastrous to call run_until_complete()
WARNING
: It would be disastrous to call run_until_complete()
with the same coroutine twice -- it would wrap it in two
with the same coroutine twice -- it would wrap it in two
different Tasks and that can't be good.
different Tasks and that can't be good.
...
@@ -261,7 +272,7 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -261,7 +272,7 @@ class BaseEventLoop(events.AbstractEventLoop):
Every callback scheduled before stop() is called will run.
Every callback scheduled before stop() is called will run.
Callback scheduled after stop() is called won't. However,
Callback scheduled after stop() is called won't. However,
those callbacks will run if run() is called again later.
those callbacks will run if run
_*
() is called again later.
"""
"""
self
.
call_soon
(
_raise_stop_error
)
self
.
call_soon
(
_raise_stop_error
)
...
@@ -274,7 +285,7 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -274,7 +285,7 @@ class BaseEventLoop(events.AbstractEventLoop):
The event loop must not be running.
The event loop must not be running.
"""
"""
if
self
.
_running
:
if
self
.
_running
:
raise
RuntimeError
(
"
c
annot close a running event loop"
)
raise
RuntimeError
(
"
C
annot close a running event loop"
)
if
self
.
_closed
:
if
self
.
_closed
:
return
return
if
self
.
_debug
:
if
self
.
_debug
:
...
@@ -292,11 +303,16 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -292,11 +303,16 @@ class BaseEventLoop(events.AbstractEventLoop):
return
self
.
_closed
return
self
.
_closed
def
is_running
(
self
):
def
is_running
(
self
):
"""Returns
running status of event loop
."""
"""Returns
True if the event loop is running
."""
return
self
.
_running
return
self
.
_running
def
time
(
self
):
def
time
(
self
):
"""Return the time according to the event loop's clock."""
"""Return the time according to the event loop's clock.
This is a float expressed in seconds since an epoch, but the
epoch, precision, accuracy and drift are unspecified and may
differ per event loop.
"""
return
time
.
monotonic
()
return
time
.
monotonic
()
def
call_later
(
self
,
delay
,
callback
,
*
args
):
def
call_later
(
self
,
delay
,
callback
,
*
args
):
...
@@ -306,7 +322,7 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -306,7 +322,7 @@ class BaseEventLoop(events.AbstractEventLoop):
can be used to cancel the call.
can be used to cancel the call.
The delay can be an int or float, expressed in seconds. It is
The delay can be an int or float, expressed in seconds. It is
always
a relative
time.
always
relative to the current
time.
Each callback will be called exactly once. If two callbacks
Each callback will be called exactly once. If two callbacks
are scheduled for exactly the same time, it undefined which
are scheduled for exactly the same time, it undefined which
...
@@ -321,7 +337,10 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -321,7 +337,10 @@ class BaseEventLoop(events.AbstractEventLoop):
return
timer
return
timer
def
call_at
(
self
,
when
,
callback
,
*
args
):
def
call_at
(
self
,
when
,
callback
,
*
args
):
"""Like call_later(), but uses an absolute time."""
"""Like call_later(), but uses an absolute time.
Absolute time corresponds to the event loop's time() method.
"""
if
coroutines
.
iscoroutinefunction
(
callback
):
if
coroutines
.
iscoroutinefunction
(
callback
):
raise
TypeError
(
"coroutines cannot be used with call_at()"
)
raise
TypeError
(
"coroutines cannot be used with call_at()"
)
if
self
.
_debug
:
if
self
.
_debug
:
...
@@ -335,7 +354,7 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -335,7 +354,7 @@ class BaseEventLoop(events.AbstractEventLoop):
def
call_soon
(
self
,
callback
,
*
args
):
def
call_soon
(
self
,
callback
,
*
args
):
"""Arrange for a callback to be called as soon as possible.
"""Arrange for a callback to be called as soon as possible.
This operates as a FIFO queue
,
callbacks are called in the
This operates as a FIFO queue
:
callbacks are called in the
order in which they are registered. Each callback will be
order in which they are registered. Each callback will be
called exactly once.
called exactly once.
...
@@ -361,10 +380,10 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -361,10 +380,10 @@ class BaseEventLoop(events.AbstractEventLoop):
def
_assert_is_current_event_loop
(
self
):
def
_assert_is_current_event_loop
(
self
):
"""Asserts that this event loop is the current event loop.
"""Asserts that this event loop is the current event loop.
Non-threadsafe methods of this class make this assumption and will
Non-thread
-
safe methods of this class make this assumption and will
likely behave incorrectly when the assumption is violated.
likely behave incorrectly when the assumption is violated.
Should only be called when (self._debug == True). The caller is
Should only be called when (self._debug == True).
The caller is
responsible for checking this condition for performance reasons.
responsible for checking this condition for performance reasons.
"""
"""
try
:
try
:
...
@@ -373,11 +392,11 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -373,11 +392,11 @@ class BaseEventLoop(events.AbstractEventLoop):
return
return
if
current
is
not
self
:
if
current
is
not
self
:
raise
RuntimeError
(
raise
RuntimeError
(
"
non-thread
safe operation invoked on an event loop other "
"
Non-thread-
safe operation invoked on an event loop other "
"than the current one"
)
"than the current one"
)
def
call_soon_threadsafe
(
self
,
callback
,
*
args
):
def
call_soon_threadsafe
(
self
,
callback
,
*
args
):
"""Like call_soon(), but thread
safe."""
"""Like call_soon(), but thread
-
safe."""
handle
=
self
.
_call_soon
(
callback
,
args
,
check_loop
=
False
)
handle
=
self
.
_call_soon
(
callback
,
args
,
check_loop
=
False
)
if
handle
.
_source_traceback
:
if
handle
.
_source_traceback
:
del
handle
.
_source_traceback
[
-
1
]
del
handle
.
_source_traceback
[
-
1
]
...
@@ -386,7 +405,7 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -386,7 +405,7 @@ class BaseEventLoop(events.AbstractEventLoop):
def
run_in_executor
(
self
,
executor
,
callback
,
*
args
):
def
run_in_executor
(
self
,
executor
,
callback
,
*
args
):
if
coroutines
.
iscoroutinefunction
(
callback
):
if
coroutines
.
iscoroutinefunction
(
callback
):
raise
TypeError
(
"
c
oroutines cannot be used with run_in_executor()"
)
raise
TypeError
(
"
C
oroutines cannot be used with run_in_executor()"
)
if
isinstance
(
callback
,
events
.
Handle
):
if
isinstance
(
callback
,
events
.
Handle
):
assert
not
args
assert
not
args
assert
not
isinstance
(
callback
,
events
.
TimerHandle
)
assert
not
isinstance
(
callback
,
events
.
TimerHandle
)
...
@@ -416,13 +435,13 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -416,13 +435,13 @@ class BaseEventLoop(events.AbstractEventLoop):
if
flags
:
if
flags
:
msg
.
append
(
'flags=
%
r'
%
flags
)
msg
.
append
(
'flags=
%
r'
%
flags
)
msg
=
', '
.
join
(
msg
)
msg
=
', '
.
join
(
msg
)
logger
.
debug
(
'Get address
s
info
%
s'
,
msg
)
logger
.
debug
(
'Get address info
%
s'
,
msg
)
t0
=
self
.
time
()
t0
=
self
.
time
()
addrinfo
=
socket
.
getaddrinfo
(
host
,
port
,
family
,
type
,
proto
,
flags
)
addrinfo
=
socket
.
getaddrinfo
(
host
,
port
,
family
,
type
,
proto
,
flags
)
dt
=
self
.
time
()
-
t0
dt
=
self
.
time
()
-
t0
msg
=
(
'Getting address
s
info
%
s took
%.3
f ms:
%
r'
msg
=
(
'Getting address info
%
s took
%.3
f ms:
%
r'
%
(
msg
,
dt
*
1e3
,
addrinfo
))
%
(
msg
,
dt
*
1e3
,
addrinfo
))
if
dt
>=
self
.
slow_callback_duration
:
if
dt
>=
self
.
slow_callback_duration
:
logger
.
info
(
msg
)
logger
.
info
(
msg
)
...
@@ -559,8 +578,8 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -559,8 +578,8 @@ class BaseEventLoop(events.AbstractEventLoop):
transport
,
protocol
=
yield
from
self
.
_create_connection_transport
(
transport
,
protocol
=
yield
from
self
.
_create_connection_transport
(
sock
,
protocol_factory
,
ssl
,
server_hostname
)
sock
,
protocol_factory
,
ssl
,
server_hostname
)
if
self
.
_debug
:
if
self
.
_debug
:
logger
.
debug
(
"connected to
%
s:
%
r: (
%
r,
%
r)"
,
logger
.
debug
(
"
%
r
connected to
%
s:
%
r: (
%
r,
%
r)"
,
host
,
port
,
transport
,
protocol
)
sock
,
host
,
port
,
transport
,
protocol
)
return
transport
,
protocol
return
transport
,
protocol
@coroutine
@coroutine
...
@@ -589,7 +608,7 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -589,7 +608,7 @@ class BaseEventLoop(events.AbstractEventLoop):
raise
ValueError
(
'unexpected address family'
)
raise
ValueError
(
'unexpected address family'
)
addr_pairs_info
=
(((
family
,
proto
),
(
None
,
None
)),)
addr_pairs_info
=
(((
family
,
proto
),
(
None
,
None
)),)
else
:
else
:
# join address
s
by (family, protocol)
# join address by (family, protocol)
addr_infos
=
collections
.
OrderedDict
()
addr_infos
=
collections
.
OrderedDict
()
for
idx
,
addr
in
((
0
,
local_addr
),
(
1
,
remote_addr
)):
for
idx
,
addr
in
((
0
,
local_addr
),
(
1
,
remote_addr
)):
if
addr
is
not
None
:
if
addr
is
not
None
:
...
@@ -674,7 +693,7 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -674,7 +693,7 @@ class BaseEventLoop(events.AbstractEventLoop):
reuse_address
=
None
):
reuse_address
=
None
):
"""Create a TCP server bound to host and port.
"""Create a TCP server bound to host and port.
Return a
n
Server object which can be used to stop the service.
Return a Server object which can be used to stop the service.
This method is a coroutine.
This method is a coroutine.
"""
"""
...
@@ -731,8 +750,7 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -731,8 +750,7 @@ class BaseEventLoop(events.AbstractEventLoop):
sock
.
close
()
sock
.
close
()
else
:
else
:
if
sock
is
None
:
if
sock
is
None
:
raise
ValueError
(
raise
ValueError
(
'Neither host/port nor sock were specified'
)
'host and port was not specified and no sock specified'
)
sockets
=
[
sock
]
sockets
=
[
sock
]
server
=
Server
(
self
,
sockets
)
server
=
Server
(
self
,
sockets
)
...
@@ -750,6 +768,9 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -750,6 +768,9 @@ class BaseEventLoop(events.AbstractEventLoop):
waiter
=
futures
.
Future
(
loop
=
self
)
waiter
=
futures
.
Future
(
loop
=
self
)
transport
=
self
.
_make_read_pipe_transport
(
pipe
,
protocol
,
waiter
)
transport
=
self
.
_make_read_pipe_transport
(
pipe
,
protocol
,
waiter
)
yield
from
waiter
yield
from
waiter
if
self
.
_debug
:
logger
.
debug
(
'Read pipe
%
r connected: (
%
r,
%
r)'
,
pipe
.
fileno
(),
transport
,
protocol
)
return
transport
,
protocol
return
transport
,
protocol
@coroutine
@coroutine
...
@@ -758,8 +779,24 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -758,8 +779,24 @@ class BaseEventLoop(events.AbstractEventLoop):
waiter
=
futures
.
Future
(
loop
=
self
)
waiter
=
futures
.
Future
(
loop
=
self
)
transport
=
self
.
_make_write_pipe_transport
(
pipe
,
protocol
,
waiter
)
transport
=
self
.
_make_write_pipe_transport
(
pipe
,
protocol
,
waiter
)
yield
from
waiter
yield
from
waiter
if
self
.
_debug
:
logger
.
debug
(
'Write pipe
%
r connected: (
%
r,
%
r)'
,
pipe
.
fileno
(),
transport
,
protocol
)
return
transport
,
protocol
return
transport
,
protocol
def
_log_subprocess
(
self
,
msg
,
stdin
,
stdout
,
stderr
):
info
=
[
msg
]
if
stdin
is
not
None
:
info
.
append
(
'stdin=
%
s'
%
_format_pipe
(
stdin
))
if
stdout
is
not
None
and
stderr
==
subprocess
.
STDOUT
:
info
.
append
(
'stdout=stderr=
%
s'
%
_format_pipe
(
stdout
))
else
:
if
stdout
is
not
None
:
info
.
append
(
'stdout=
%
s'
%
_format_pipe
(
stdout
))
if
stderr
is
not
None
:
info
.
append
(
'stderr=
%
s'
%
_format_pipe
(
stderr
))
logger
.
debug
(
' '
.
join
(
info
))
@coroutine
@coroutine
def
subprocess_shell
(
self
,
protocol_factory
,
cmd
,
*
,
stdin
=
subprocess
.
PIPE
,
def
subprocess_shell
(
self
,
protocol_factory
,
cmd
,
*
,
stdin
=
subprocess
.
PIPE
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
PIPE
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
PIPE
,
...
@@ -774,8 +811,15 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -774,8 +811,15 @@ class BaseEventLoop(events.AbstractEventLoop):
if
bufsize
!=
0
:
if
bufsize
!=
0
:
raise
ValueError
(
"bufsize must be 0"
)
raise
ValueError
(
"bufsize must be 0"
)
protocol
=
protocol_factory
()
protocol
=
protocol_factory
()
if
self
.
_debug
:
# don't log parameters: they may contain sensitive information
# (password) and may be too long
debug_log
=
'run shell command
%
r'
%
cmd
self
.
_log_subprocess
(
debug_log
,
stdin
,
stdout
,
stderr
)
transport
=
yield
from
self
.
_make_subprocess_transport
(
transport
=
yield
from
self
.
_make_subprocess_transport
(
protocol
,
cmd
,
True
,
stdin
,
stdout
,
stderr
,
bufsize
,
**
kwargs
)
protocol
,
cmd
,
True
,
stdin
,
stdout
,
stderr
,
bufsize
,
**
kwargs
)
if
self
.
_debug
:
logger
.
info
(
'
%
s:
%
r'
%
(
debug_log
,
transport
))
return
transport
,
protocol
return
transport
,
protocol
@coroutine
@coroutine
...
@@ -796,9 +840,16 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -796,9 +840,16 @@ class BaseEventLoop(events.AbstractEventLoop):
"a bytes or text string, not
%
s"
"a bytes or text string, not
%
s"
%
type
(
arg
)
.
__name__
)
%
type
(
arg
)
.
__name__
)
protocol
=
protocol_factory
()
protocol
=
protocol_factory
()
if
self
.
_debug
:
# don't log parameters: they may contain sensitive information
# (password) and may be too long
debug_log
=
'execute program
%
r'
%
program
self
.
_log_subprocess
(
debug_log
,
stdin
,
stdout
,
stderr
)
transport
=
yield
from
self
.
_make_subprocess_transport
(
transport
=
yield
from
self
.
_make_subprocess_transport
(
protocol
,
popen_args
,
False
,
stdin
,
stdout
,
stderr
,
protocol
,
popen_args
,
False
,
stdin
,
stdout
,
stderr
,
bufsize
,
**
kwargs
)
bufsize
,
**
kwargs
)
if
self
.
_debug
:
logger
.
info
(
'
%
s:
%
r'
%
(
debug_log
,
transport
))
return
transport
,
protocol
return
transport
,
protocol
def
set_exception_handler
(
self
,
handler
):
def
set_exception_handler
(
self
,
handler
):
...
@@ -808,7 +859,7 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -808,7 +859,7 @@ class BaseEventLoop(events.AbstractEventLoop):
be set.
be set.
If handler is a callable object, it should have a
If handler is a callable object, it should have a
matching signature to
'(loop, context)', where 'loop'
signature matching
'(loop, context)', where 'loop'
will be a reference to the active event loop, 'context'
will be a reference to the active event loop, 'context'
will be a dict object (see `call_exception_handler()`
will be a dict object (see `call_exception_handler()`
documentation for details about context).
documentation for details about context).
...
@@ -825,7 +876,7 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -825,7 +876,7 @@ class BaseEventLoop(events.AbstractEventLoop):
handler is set, and can be called by a custom exception
handler is set, and can be called by a custom exception
handler that wants to defer to the default behavior.
handler that wants to defer to the default behavior.
context parameter has the same meaning as in
The
context parameter has the same meaning as in
`call_exception_handler()`.
`call_exception_handler()`.
"""
"""
message
=
context
.
get
(
'message'
)
message
=
context
.
get
(
'message'
)
...
@@ -854,10 +905,10 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -854,10 +905,10 @@ class BaseEventLoop(events.AbstractEventLoop):
logger
.
error
(
'
\n
'
.
join
(
log_lines
),
exc_info
=
exc_info
)
logger
.
error
(
'
\n
'
.
join
(
log_lines
),
exc_info
=
exc_info
)
def
call_exception_handler
(
self
,
context
):
def
call_exception_handler
(
self
,
context
):
"""Call the current event loop exception handler.
"""Call the current event loop's exception handler.
The context argument is a dict containing the following keys:
context is a dict object containing the following keys
(new keys maybe introduced later):
- 'message': Error message;
- 'message': Error message;
- 'exception' (optional): Exception object;
- 'exception' (optional): Exception object;
- 'future' (optional): Future instance;
- 'future' (optional): Future instance;
...
@@ -866,8 +917,10 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -866,8 +917,10 @@ class BaseEventLoop(events.AbstractEventLoop):
- 'transport' (optional): Transport instance;
- 'transport' (optional): Transport instance;
- 'socket' (optional): Socket instance.
- 'socket' (optional): Socket instance.
Note: this method should not be overloaded in subclassed
New keys maybe introduced in the future.
event loops. For any custom exception handling, use
Note: do not overload this method in an event loop subclass.
For custom exception handling, use the
`set_exception_handler()` method.
`set_exception_handler()` method.
"""
"""
if
self
.
_exception_handler
is
None
:
if
self
.
_exception_handler
is
None
:
...
@@ -892,7 +945,7 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -892,7 +945,7 @@ class BaseEventLoop(events.AbstractEventLoop):
'context'
:
context
,
'context'
:
context
,
})
})
except
Exception
:
except
Exception
:
# Guard 'default_exception_handler' in case it
'
s
# Guard 'default_exception_handler' in case it
i
s
# overloaded.
# overloaded.
logger
.
error
(
'Exception in default exception handler '
logger
.
error
(
'Exception in default exception handler '
'while handling an unexpected error '
'while handling an unexpected error '
...
@@ -900,7 +953,7 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -900,7 +953,7 @@ class BaseEventLoop(events.AbstractEventLoop):
exc_info
=
True
)
exc_info
=
True
)
def
_add_callback
(
self
,
handle
):
def
_add_callback
(
self
,
handle
):
"""Add a Handle to
ready or scheduled
."""
"""Add a Handle to
_scheduled (TimerHandle) or _ready
."""
assert
isinstance
(
handle
,
events
.
Handle
),
'A Handle is required here'
assert
isinstance
(
handle
,
events
.
Handle
),
'A Handle is required here'
if
handle
.
_cancelled
:
if
handle
.
_cancelled
:
return
return
...
@@ -971,7 +1024,7 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -971,7 +1024,7 @@ class BaseEventLoop(events.AbstractEventLoop):
# Note: We run all currently scheduled callbacks, but not any
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# they will be run the next time (after another I/O poll).
# Use an idiom that is threadsafe without using locks.
# Use an idiom that is thread
-
safe without using locks.
ntodo
=
len
(
self
.
_ready
)
ntodo
=
len
(
self
.
_ready
)
for
i
in
range
(
ntodo
):
for
i
in
range
(
ntodo
):
handle
=
self
.
_ready
.
popleft
()
handle
=
self
.
_ready
.
popleft
()
...
...
Lib/asyncio/base_subprocess.py
Dosyayı görüntüle @
daded802
...
@@ -4,6 +4,7 @@ import subprocess
...
@@ -4,6 +4,7 @@ import subprocess
from
.
import
protocols
from
.
import
protocols
from
.
import
transports
from
.
import
transports
from
.coroutines
import
coroutine
from
.coroutines
import
coroutine
from
.log
import
logger
class
BaseSubprocessTransport
(
transports
.
SubprocessTransport
):
class
BaseSubprocessTransport
(
transports
.
SubprocessTransport
):
...
@@ -14,6 +15,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
...
@@ -14,6 +15,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
super
()
.
__init__
(
extra
)
super
()
.
__init__
(
extra
)
self
.
_protocol
=
protocol
self
.
_protocol
=
protocol
self
.
_loop
=
loop
self
.
_loop
=
loop
self
.
_pid
=
None
self
.
_pipes
=
{}
self
.
_pipes
=
{}
if
stdin
==
subprocess
.
PIPE
:
if
stdin
==
subprocess
.
PIPE
:
...
@@ -27,7 +29,36 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
...
@@ -27,7 +29,36 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
self
.
_returncode
=
None
self
.
_returncode
=
None
self
.
_start
(
args
=
args
,
shell
=
shell
,
stdin
=
stdin
,
stdout
=
stdout
,
self
.
_start
(
args
=
args
,
shell
=
shell
,
stdin
=
stdin
,
stdout
=
stdout
,
stderr
=
stderr
,
bufsize
=
bufsize
,
**
kwargs
)
stderr
=
stderr
,
bufsize
=
bufsize
,
**
kwargs
)
self
.
_pid
=
self
.
_proc
.
pid
self
.
_extra
[
'subprocess'
]
=
self
.
_proc
self
.
_extra
[
'subprocess'
]
=
self
.
_proc
if
self
.
_loop
.
get_debug
():
if
isinstance
(
args
,
(
bytes
,
str
)):
program
=
args
else
:
program
=
args
[
0
]
logger
.
debug
(
'process
%
r created: pid
%
s'
,
program
,
self
.
_pid
)
def
__repr__
(
self
):
info
=
[
self
.
__class__
.
__name__
,
'pid=
%
s'
%
self
.
_pid
]
if
self
.
_returncode
is
not
None
:
info
.
append
(
'returncode=
%
s'
%
self
.
_returncode
)
stdin
=
self
.
_pipes
.
get
(
0
)
if
stdin
is
not
None
:
info
.
append
(
'stdin=
%
s'
%
stdin
.
pipe
)
stdout
=
self
.
_pipes
.
get
(
1
)
stderr
=
self
.
_pipes
.
get
(
2
)
if
stdout
is
not
None
and
stderr
is
stdout
:
info
.
append
(
'stdout=stderr=
%
s'
%
stdout
.
pipe
)
else
:
if
stdout
is
not
None
:
info
.
append
(
'stdout=
%
s'
%
stdout
.
pipe
)
if
stderr
is
not
None
:
info
.
append
(
'stderr=
%
s'
%
stderr
.
pipe
)
return
'<
%
s>'
%
' '
.
join
(
info
)
def
_start
(
self
,
args
,
shell
,
stdin
,
stdout
,
stderr
,
bufsize
,
**
kwargs
):
def
_start
(
self
,
args
,
shell
,
stdin
,
stdout
,
stderr
,
bufsize
,
**
kwargs
):
raise
NotImplementedError
raise
NotImplementedError
...
@@ -45,7 +76,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
...
@@ -45,7 +76,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
self
.
terminate
()
self
.
terminate
()
def
get_pid
(
self
):
def
get_pid
(
self
):
return
self
.
_p
roc
.
p
id
return
self
.
_pid
def
get_returncode
(
self
):
def
get_returncode
(
self
):
return
self
.
_returncode
return
self
.
_returncode
...
@@ -108,6 +139,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
...
@@ -108,6 +139,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
def
_process_exited
(
self
,
returncode
):
def
_process_exited
(
self
,
returncode
):
assert
returncode
is
not
None
,
returncode
assert
returncode
is
not
None
,
returncode
assert
self
.
_returncode
is
None
,
self
.
_returncode
assert
self
.
_returncode
is
None
,
self
.
_returncode
if
self
.
_loop
.
get_debug
():
logger
.
info
(
'
%
r exited with return code
%
r'
,
self
,
returncode
)
self
.
_returncode
=
returncode
self
.
_returncode
=
returncode
self
.
_call
(
self
.
_protocol
.
process_exited
)
self
.
_call
(
self
.
_protocol
.
process_exited
)
self
.
_try_finish
()
self
.
_try_finish
()
...
@@ -141,6 +175,10 @@ class WriteSubprocessPipeProto(protocols.BaseProtocol):
...
@@ -141,6 +175,10 @@ class WriteSubprocessPipeProto(protocols.BaseProtocol):
def
connection_made
(
self
,
transport
):
def
connection_made
(
self
,
transport
):
self
.
pipe
=
transport
self
.
pipe
=
transport
def
__repr__
(
self
):
return
(
'<
%
s fd=
%
s pipe=
%
r>'
%
(
self
.
__class__
.
__name__
,
self
.
fd
,
self
.
pipe
))
def
connection_lost
(
self
,
exc
):
def
connection_lost
(
self
,
exc
):
self
.
disconnected
=
True
self
.
disconnected
=
True
self
.
proc
.
_pipe_connection_lost
(
self
.
fd
,
exc
)
self
.
proc
.
_pipe_connection_lost
(
self
.
fd
,
exc
)
...
...
Lib/asyncio/streams.py
Dosyayı görüntüle @
daded802
...
@@ -15,6 +15,7 @@ from . import events
...
@@ -15,6 +15,7 @@ from . import events
from
.
import
futures
from
.
import
futures
from
.
import
protocols
from
.
import
protocols
from
.coroutines
import
coroutine
from
.coroutines
import
coroutine
from
.log
import
logger
_DEFAULT_LIMIT
=
2
**
16
_DEFAULT_LIMIT
=
2
**
16
...
@@ -153,10 +154,15 @@ class FlowControlMixin(protocols.Protocol):
...
@@ -153,10 +154,15 @@ class FlowControlMixin(protocols.Protocol):
def
pause_writing
(
self
):
def
pause_writing
(
self
):
assert
not
self
.
_paused
assert
not
self
.
_paused
self
.
_paused
=
True
self
.
_paused
=
True
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
"
%
r pauses writing"
,
self
)
def
resume_writing
(
self
):
def
resume_writing
(
self
):
assert
self
.
_paused
assert
self
.
_paused
self
.
_paused
=
False
self
.
_paused
=
False
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
"
%
r resumes writing"
,
self
)
waiter
=
self
.
_drain_waiter
waiter
=
self
.
_drain_waiter
if
waiter
is
not
None
:
if
waiter
is
not
None
:
self
.
_drain_waiter
=
None
self
.
_drain_waiter
=
None
...
@@ -244,6 +250,12 @@ class StreamWriter:
...
@@ -244,6 +250,12 @@ class StreamWriter:
self
.
_reader
=
reader
self
.
_reader
=
reader
self
.
_loop
=
loop
self
.
_loop
=
loop
def
__repr__
(
self
):
info
=
[
self
.
__class__
.
__name__
,
'transport=
%
r'
%
self
.
_transport
]
if
self
.
_reader
is
not
None
:
info
.
append
(
'reader=
%
r'
%
self
.
_reader
)
return
'<
%
s>'
%
' '
.
join
(
info
)
@property
@property
def
transport
(
self
):
def
transport
(
self
):
return
self
.
_transport
return
self
.
_transport
...
...
Lib/asyncio/subprocess.py
Dosyayı görüntüle @
daded802
...
@@ -9,6 +9,7 @@ from . import protocols
...
@@ -9,6 +9,7 @@ from . import protocols
from
.
import
streams
from
.
import
streams
from
.
import
tasks
from
.
import
tasks
from
.coroutines
import
coroutine
from
.coroutines
import
coroutine
from
.log
import
logger
PIPE
=
subprocess
.
PIPE
PIPE
=
subprocess
.
PIPE
...
@@ -28,6 +29,16 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
...
@@ -28,6 +29,16 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
self
.
_waiters
=
collections
.
deque
()
self
.
_waiters
=
collections
.
deque
()
self
.
_transport
=
None
self
.
_transport
=
None
def
__repr__
(
self
):
info
=
[
self
.
__class__
.
__name__
]
if
self
.
stdin
is
not
None
:
info
.
append
(
'stdin=
%
r'
%
self
.
stdin
)
if
self
.
stdout
is
not
None
:
info
.
append
(
'stdout=
%
r'
%
self
.
stdout
)
if
self
.
stderr
is
not
None
:
info
.
append
(
'stderr=
%
r'
%
self
.
stderr
)
return
'<
%
s>'
%
' '
.
join
(
info
)
def
connection_made
(
self
,
transport
):
def
connection_made
(
self
,
transport
):
self
.
_transport
=
transport
self
.
_transport
=
transport
if
transport
.
get_pipe_transport
(
1
):
if
transport
.
get_pipe_transport
(
1
):
...
@@ -91,6 +102,9 @@ class Process:
...
@@ -91,6 +102,9 @@ class Process:
self
.
stderr
=
protocol
.
stderr
self
.
stderr
=
protocol
.
stderr
self
.
pid
=
transport
.
get_pid
()
self
.
pid
=
transport
.
get_pid
()
def
__repr__
(
self
):
return
'<
%
s
%
s>'
%
(
self
.
__class__
.
__name__
,
self
.
pid
)
@property
@property
def
returncode
(
self
):
def
returncode
(
self
):
return
self
.
_transport
.
get_returncode
()
return
self
.
_transport
.
get_returncode
()
...
@@ -126,7 +140,13 @@ class Process:
...
@@ -126,7 +140,13 @@ class Process:
@coroutine
@coroutine
def
_feed_stdin
(
self
,
input
):
def
_feed_stdin
(
self
,
input
):
self
.
stdin
.
write
(
input
)
self
.
stdin
.
write
(
input
)
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
'
%
r communicate: feed stdin (
%
s bytes)'
,
self
,
len
(
input
))
yield
from
self
.
stdin
.
drain
()
yield
from
self
.
stdin
.
drain
()
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
'
%
r communicate: close stdin'
,
self
)
self
.
stdin
.
close
()
self
.
stdin
.
close
()
@coroutine
@coroutine
...
@@ -141,7 +161,13 @@ class Process:
...
@@ -141,7 +161,13 @@ class Process:
else
:
else
:
assert
fd
==
1
assert
fd
==
1
stream
=
self
.
stdout
stream
=
self
.
stdout
if
self
.
_loop
.
get_debug
():
name
=
'stdout'
if
fd
==
1
else
'stderr'
logger
.
debug
(
'
%
r communicate: read
%
s'
,
self
,
name
)
output
=
yield
from
stream
.
read
()
output
=
yield
from
stream
.
read
()
if
self
.
_loop
.
get_debug
():
name
=
'stdout'
if
fd
==
1
else
'stderr'
logger
.
debug
(
'
%
r communicate: close
%
s'
,
self
,
name
)
transport
.
close
()
transport
.
close
()
return
output
return
output
...
...
Lib/asyncio/unix_events.py
Dosyayı görüntüle @
daded802
...
@@ -565,7 +565,7 @@ class AbstractChildWatcher:
...
@@ -565,7 +565,7 @@ class AbstractChildWatcher:
process 'pid' terminates. Specifying another callback for the same
process 'pid' terminates. Specifying another callback for the same
process replaces the previous handler.
process replaces the previous handler.
Note: callback() must be thread-safe
Note: callback() must be thread-safe
.
"""
"""
raise
NotImplementedError
()
raise
NotImplementedError
()
...
@@ -721,6 +721,9 @@ class SafeChildWatcher(BaseChildWatcher):
...
@@ -721,6 +721,9 @@ class SafeChildWatcher(BaseChildWatcher):
return
return
returncode
=
self
.
_compute_returncode
(
status
)
returncode
=
self
.
_compute_returncode
(
status
)
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
'process
%
s exited with returncode
%
s'
,
expected_pid
,
returncode
)
try
:
try
:
callback
,
args
=
self
.
_callbacks
.
pop
(
pid
)
callback
,
args
=
self
.
_callbacks
.
pop
(
pid
)
...
@@ -818,8 +821,16 @@ class FastChildWatcher(BaseChildWatcher):
...
@@ -818,8 +821,16 @@ class FastChildWatcher(BaseChildWatcher):
if
self
.
_forks
:
if
self
.
_forks
:
# It may not be registered yet.
# It may not be registered yet.
self
.
_zombies
[
pid
]
=
returncode
self
.
_zombies
[
pid
]
=
returncode
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
'unknown process
%
s exited '
'with returncode
%
s'
,
pid
,
returncode
)
continue
continue
callback
=
None
callback
=
None
else
:
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
'process
%
s exited with returncode
%
s'
,
pid
,
returncode
)
if
callback
is
None
:
if
callback
is
None
:
logger
.
warning
(
logger
.
warning
(
...
...
Lib/test/test_asyncio/test_base_events.py
Dosyayı görüntüle @
daded802
...
@@ -43,8 +43,6 @@ class BaseEventLoopTests(test_utils.TestCase):
...
@@ -43,8 +43,6 @@ class BaseEventLoopTests(test_utils.TestCase):
NotImplementedError
,
self
.
loop
.
_process_events
,
[])
NotImplementedError
,
self
.
loop
.
_process_events
,
[])
self
.
assertRaises
(
self
.
assertRaises
(
NotImplementedError
,
self
.
loop
.
_write_to_self
)
NotImplementedError
,
self
.
loop
.
_write_to_self
)
self
.
assertRaises
(
NotImplementedError
,
self
.
loop
.
_read_from_self
)
self
.
assertRaises
(
self
.
assertRaises
(
NotImplementedError
,
NotImplementedError
,
self
.
loop
.
_make_read_pipe_transport
,
m
,
m
)
self
.
loop
.
_make_read_pipe_transport
,
m
,
m
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment