Skip to content
Projeler
Gruplar
Parçacıklar
Yardım
Yükleniyor...
Oturum aç / Kaydol
Gezinmeyi değiştir
C
cpython
Proje
Proje
Ayrıntılar
Etkinlik
Cycle Analytics
Depo (repository)
Depo (repository)
Dosyalar
Kayıtlar (commit)
Dallar (branch)
Etiketler
Katkıda bulunanlar
Grafik
Karşılaştır
Grafikler
Konular (issue)
0
Konular (issue)
0
Liste
Pano
Etiketler
Kilometre Taşları
Birleştirme (merge) Talepleri
0
Birleştirme (merge) Talepleri
0
CI / CD
CI / CD
İş akışları (pipeline)
İşler
Zamanlamalar
Grafikler
Paketler
Paketler
Wiki
Wiki
Parçacıklar
Parçacıklar
Üyeler
Üyeler
Collapse sidebar
Close sidebar
Etkinlik
Grafik
Grafikler
Yeni bir konu (issue) oluştur
İşler
Kayıtlar (commit)
Konu (issue) Panoları
Kenar çubuğunu aç
Batuhan Osman TASKAYA
cpython
Commits
5dad1ff8
Kaydet (Commit)
5dad1ff8
authored
Eki 03, 2015
tarafından
Guido van Rossum
Dosyalara gözat
Seçenekler
Dosyalara Gözat
İndir
Sade Fark
Issue #25304: Add asyncio.run_coroutine_threadsafe(). By Vincent Michel. (Merge 3.5->3.6.)
üst
9846187b
0d9bef92
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
147 additions
and
19 deletions
+147
-19
futures.py
Lib/asyncio/futures.py
+58
-16
tasks.py
Lib/asyncio/tasks.py
+17
-1
test_futures.py
Lib/test/test_asyncio/test_futures.py
+0
-2
test_tasks.py
Lib/test/test_asyncio/test_tasks.py
+67
-0
ACKS
Misc/ACKS
+1
-0
NEWS
Misc/NEWS
+4
-0
No files found.
Lib/asyncio/futures.py
Dosyayı görüntüle @
5dad1ff8
...
@@ -390,22 +390,64 @@ class Future:
...
@@ -390,22 +390,64 @@ class Future:
__await__
=
__iter__
# make compatible with 'await' expression
__await__
=
__iter__
# make compatible with 'await' expression
def
wrap_future
(
fut
,
*
,
loop
=
None
):
def
_set_concurrent_future_state
(
concurrent
,
source
):
"""Wrap concurrent.futures.Future object."""
"""Copy state from a future to a concurrent.futures.Future."""
if
isinstance
(
fut
,
Future
):
assert
source
.
done
()
return
fut
if
source
.
cancelled
():
assert
isinstance
(
fut
,
concurrent
.
futures
.
Future
),
\
concurrent
.
cancel
()
'concurrent.futures.Future is expected, got {!r}'
.
format
(
fut
)
if
not
concurrent
.
set_running_or_notify_cancel
():
if
loop
is
None
:
return
loop
=
events
.
get_event_loop
()
exception
=
source
.
exception
()
new_future
=
Future
(
loop
=
loop
)
if
exception
is
not
None
:
concurrent
.
set_exception
(
exception
)
else
:
result
=
source
.
result
()
concurrent
.
set_result
(
result
)
def
_chain_future
(
source
,
destination
):
"""Chain two futures so that when one completes, so does the other.
The result (or exception) of source will be copied to destination.
If destination is cancelled, source gets cancelled too.
Compatible with both asyncio.Future and concurrent.futures.Future.
"""
if
not
isinstance
(
source
,
(
Future
,
concurrent
.
futures
.
Future
)):
raise
TypeError
(
'A future is required for source argument'
)
if
not
isinstance
(
destination
,
(
Future
,
concurrent
.
futures
.
Future
)):
raise
TypeError
(
'A future is required for destination argument'
)
source_loop
=
source
.
_loop
if
isinstance
(
source
,
Future
)
else
None
dest_loop
=
destination
.
_loop
if
isinstance
(
destination
,
Future
)
else
None
def
_set_state
(
future
,
other
):
if
isinstance
(
future
,
Future
):
future
.
_copy_state
(
other
)
else
:
_set_concurrent_future_state
(
future
,
other
)
def
_check_cancel_other
(
f
):
def
_call_check_cancel
(
destination
):
if
f
.
cancelled
():
if
destination
.
cancelled
():
fut
.
cancel
()
if
source_loop
is
None
or
source_loop
is
dest_loop
:
source
.
cancel
()
else
:
source_loop
.
call_soon_threadsafe
(
source
.
cancel
)
new_future
.
add_done_callback
(
_check_cancel_other
)
def
_call_set_state
(
source
):
fut
.
add_done_callback
(
if
dest_loop
is
None
or
dest_loop
is
source_loop
:
lambda
future
:
loop
.
call_soon_threadsafe
(
_set_state
(
destination
,
source
)
new_future
.
_copy_state
,
future
))
else
:
dest_loop
.
call_soon_threadsafe
(
_set_state
,
destination
,
source
)
destination
.
add_done_callback
(
_call_check_cancel
)
source
.
add_done_callback
(
_call_set_state
)
def
wrap_future
(
future
,
*
,
loop
=
None
):
"""Wrap concurrent.futures.Future object."""
if
isinstance
(
future
,
Future
):
return
future
assert
isinstance
(
future
,
concurrent
.
futures
.
Future
),
\
'concurrent.futures.Future is expected, got {!r}'
.
format
(
future
)
new_future
=
Future
(
loop
=
loop
)
_chain_future
(
future
,
new_future
)
return
new_future
return
new_future
Lib/asyncio/tasks.py
Dosyayı görüntüle @
5dad1ff8
...
@@ -3,7 +3,7 @@
...
@@ -3,7 +3,7 @@
__all__
=
[
'Task'
,
__all__
=
[
'Task'
,
'FIRST_COMPLETED'
,
'FIRST_EXCEPTION'
,
'ALL_COMPLETED'
,
'FIRST_COMPLETED'
,
'FIRST_EXCEPTION'
,
'ALL_COMPLETED'
,
'wait'
,
'wait_for'
,
'as_completed'
,
'sleep'
,
'async'
,
'wait'
,
'wait_for'
,
'as_completed'
,
'sleep'
,
'async'
,
'gather'
,
'shield'
,
'ensure_future'
,
'gather'
,
'shield'
,
'ensure_future'
,
'run_coroutine_threadsafe'
,
]
]
import
concurrent.futures
import
concurrent.futures
...
@@ -692,3 +692,19 @@ def shield(arg, *, loop=None):
...
@@ -692,3 +692,19 @@ def shield(arg, *, loop=None):
inner
.
add_done_callback
(
_done_callback
)
inner
.
add_done_callback
(
_done_callback
)
return
outer
return
outer
def
run_coroutine_threadsafe
(
coro
,
loop
):
"""Submit a coroutine object to a given event loop.
Return a concurrent.futures.Future to access the result.
"""
if
not
coroutines
.
iscoroutine
(
coro
):
raise
TypeError
(
'A coroutine object is required'
)
future
=
concurrent
.
futures
.
Future
()
def
callback
():
futures
.
_chain_future
(
ensure_future
(
coro
,
loop
=
loop
),
future
)
loop
.
call_soon_threadsafe
(
callback
)
return
future
Lib/test/test_asyncio/test_futures.py
Dosyayı görüntüle @
5dad1ff8
...
@@ -174,8 +174,6 @@ class FutureTests(test_utils.TestCase):
...
@@ -174,8 +174,6 @@ class FutureTests(test_utils.TestCase):
'<Future cancelled>'
)
'<Future cancelled>'
)
def
test_copy_state
(
self
):
def
test_copy_state
(
self
):
# Test the internal _copy_state method since it's being directly
# invoked in other modules.
f
=
asyncio
.
Future
(
loop
=
self
.
loop
)
f
=
asyncio
.
Future
(
loop
=
self
.
loop
)
f
.
set_result
(
10
)
f
.
set_result
(
10
)
...
...
Lib/test/test_asyncio/test_tasks.py
Dosyayı görüntüle @
5dad1ff8
...
@@ -2100,5 +2100,72 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
...
@@ -2100,5 +2100,72 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
self
.
assertIsInstance
(
f
.
exception
(),
RuntimeError
)
self
.
assertIsInstance
(
f
.
exception
(),
RuntimeError
)
class
RunCoroutineThreadsafeTests
(
test_utils
.
TestCase
):
"""Test case for futures.submit_to_loop."""
def
setUp
(
self
):
self
.
loop
=
self
.
new_test_loop
(
self
.
time_gen
)
def
time_gen
(
self
):
"""Handle the timer."""
yield
0
# second
yield
1
# second
@asyncio.coroutine
def
add
(
self
,
a
,
b
,
fail
=
False
,
cancel
=
False
):
"""Wait 1 second and return a + b."""
yield
from
asyncio
.
sleep
(
1
,
loop
=
self
.
loop
)
if
fail
:
raise
RuntimeError
(
"Fail!"
)
if
cancel
:
asyncio
.
tasks
.
Task
.
current_task
(
self
.
loop
)
.
cancel
()
yield
return
a
+
b
def
target
(
self
,
fail
=
False
,
cancel
=
False
,
timeout
=
None
):
"""Run add coroutine in the event loop."""
coro
=
self
.
add
(
1
,
2
,
fail
=
fail
,
cancel
=
cancel
)
future
=
asyncio
.
run_coroutine_threadsafe
(
coro
,
self
.
loop
)
try
:
return
future
.
result
(
timeout
)
finally
:
future
.
done
()
or
future
.
cancel
()
def
test_run_coroutine_threadsafe
(
self
):
"""Test coroutine submission from a thread to an event loop."""
future
=
self
.
loop
.
run_in_executor
(
None
,
self
.
target
)
result
=
self
.
loop
.
run_until_complete
(
future
)
self
.
assertEqual
(
result
,
3
)
def
test_run_coroutine_threadsafe_with_exception
(
self
):
"""Test coroutine submission from a thread to an event loop
when an exception is raised."""
future
=
self
.
loop
.
run_in_executor
(
None
,
self
.
target
,
True
)
with
self
.
assertRaises
(
RuntimeError
)
as
exc_context
:
self
.
loop
.
run_until_complete
(
future
)
self
.
assertIn
(
"Fail!"
,
exc_context
.
exception
.
args
)
def
test_run_coroutine_threadsafe_with_timeout
(
self
):
"""Test coroutine submission from a thread to an event loop
when a timeout is raised."""
callback
=
lambda
:
self
.
target
(
timeout
=
0
)
future
=
self
.
loop
.
run_in_executor
(
None
,
callback
)
with
self
.
assertRaises
(
asyncio
.
TimeoutError
):
self
.
loop
.
run_until_complete
(
future
)
# Clear the time generator and tasks
test_utils
.
run_briefly
(
self
.
loop
)
# Check that there's no pending task (add has been cancelled)
for
task
in
asyncio
.
Task
.
all_tasks
(
self
.
loop
):
self
.
assertTrue
(
task
.
done
())
def
test_run_coroutine_threadsafe_task_cancelled
(
self
):
"""Test coroutine submission from a tread to an event loop
when the task is cancelled."""
callback
=
lambda
:
self
.
target
(
cancel
=
True
)
future
=
self
.
loop
.
run_in_executor
(
None
,
callback
)
with
self
.
assertRaises
(
asyncio
.
CancelledError
):
self
.
loop
.
run_until_complete
(
future
)
if
__name__
==
'__main__'
:
if
__name__
==
'__main__'
:
unittest
.
main
()
unittest
.
main
()
Misc/ACKS
Dosyayı görüntüle @
5dad1ff8
...
@@ -958,6 +958,7 @@ Steven Miale
...
@@ -958,6 +958,7 @@ Steven Miale
Trent Mick
Trent Mick
Jason Michalski
Jason Michalski
Franck Michea
Franck Michea
Vincent Michel
Tom Middleton
Tom Middleton
Thomas Miedema
Thomas Miedema
Stan Mihai
Stan Mihai
...
...
Misc/NEWS
Dosyayı görüntüle @
5dad1ff8
...
@@ -40,6 +40,10 @@ Core and Builtins
...
@@ -40,6 +40,10 @@ Core and Builtins
Library
Library
-------
-------
- Issue #25304: Add asyncio.run_coroutine_threadsafe(). This lets you
submit a coroutine to a loop from another thread, returning a
concurrent.futures.Future. By Vincent Michel.
- Issue #25232: Fix CGIRequestHandler to split the query from the URL at the
- Issue #25232: Fix CGIRequestHandler to split the query from the URL at the
first question mark (?) rather than the last. Patch from Xiang Zhang.
first question mark (?) rather than the last. Patch from Xiang Zhang.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment