Unverified Kaydet (Commit) 36c2c044 authored tarafından Yury Selivanov's avatar Yury Selivanov Kaydeden (comit) GitHub

bpo-32355: Optimize asyncio.gather() (#4913)

üst a9d7e552
...@@ -139,11 +139,12 @@ def _ipaddr_info(host, port, family, type, proto): ...@@ -139,11 +139,12 @@ def _ipaddr_info(host, port, family, type, proto):
def _run_until_complete_cb(fut): def _run_until_complete_cb(fut):
exc = fut._exception if not fut.cancelled():
if isinstance(exc, BaseException) and not isinstance(exc, Exception): exc = fut.exception()
# Issue #22429: run_forever() already finished, no need to if isinstance(exc, BaseException) and not isinstance(exc, Exception):
# stop it. # Issue #22429: run_forever() already finished, no need to
return # stop it.
return
fut._loop.stop() fut._loop.stop()
......
...@@ -575,8 +575,7 @@ class _GatheringFuture(futures.Future): ...@@ -575,8 +575,7 @@ class _GatheringFuture(futures.Future):
def gather(*coros_or_futures, loop=None, return_exceptions=False): def gather(*coros_or_futures, loop=None, return_exceptions=False):
"""Return a future aggregating results from the given coroutines """Return a future aggregating results from the given coroutines/futures.
or futures.
Coroutines will be wrapped in a future and scheduled in the event Coroutines will be wrapped in a future and scheduled in the event
loop. They will not necessarily be scheduled in the same order as loop. They will not necessarily be scheduled in the same order as
...@@ -605,56 +604,76 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): ...@@ -605,56 +604,76 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
outer.set_result([]) outer.set_result([])
return outer return outer
arg_to_fut = {} def _done_callback(fut):
for arg in set(coros_or_futures):
if not futures.isfuture(arg):
fut = ensure_future(arg, loop=loop)
if loop is None:
loop = fut._loop
# The caller cannot control this future, the "destroy pending task"
# warning should not be emitted.
fut._log_destroy_pending = False
else:
fut = arg
if loop is None:
loop = fut._loop
elif fut._loop is not loop:
raise ValueError("futures are tied to different event loops")
arg_to_fut[arg] = fut
children = [arg_to_fut[arg] for arg in coros_or_futures]
nchildren = len(children)
outer = _GatheringFuture(children, loop=loop)
nfinished = 0
results = [None] * nchildren
def _done_callback(i, fut):
nonlocal nfinished nonlocal nfinished
nfinished += 1
if outer.done(): if outer.done():
if not fut.cancelled(): if not fut.cancelled():
# Mark exception retrieved. # Mark exception retrieved.
fut.exception() fut.exception()
return return
if fut.cancelled(): if not return_exceptions:
res = futures.CancelledError() if fut.cancelled():
if not return_exceptions: # Check if 'fut' is cancelled first, as
outer.set_exception(res) # 'fut.exception()' will *raise* a CancelledError
return # instead of returning it.
elif fut._exception is not None: exc = futures.CancelledError()
res = fut.exception() # Mark exception retrieved. outer.set_exception(exc)
if not return_exceptions:
outer.set_exception(res)
return return
else: else:
res = fut._result exc = fut.exception()
results[i] = res if exc is not None:
nfinished += 1 outer.set_exception(exc)
if nfinished == nchildren: return
if nfinished == nfuts:
# All futures are done; create a list of results
# and set it to the 'outer' future.
results = []
for fut in children:
if fut.cancelled():
# Check if 'fut' is cancelled first, as
# 'fut.exception()' will *raise* a CancelledError
# instead of returning it.
res = futures.CancelledError()
else:
res = fut.exception()
if res is None:
res = fut.result()
results.append(res)
outer.set_result(results) outer.set_result(results)
for i, fut in enumerate(children): arg_to_fut = {}
fut.add_done_callback(functools.partial(_done_callback, i)) children = []
nfuts = 0
nfinished = 0
for arg in coros_or_futures:
if arg not in arg_to_fut:
fut = ensure_future(arg, loop=loop)
if loop is None:
loop = fut._loop
if fut is not arg:
# 'arg' was not a Future, therefore, 'fut' is a new
# Future created specifically for 'arg'. Since the caller
# can't control it, disable the "destroy pending task"
# warning.
fut._log_destroy_pending = False
nfuts += 1
arg_to_fut[arg] = fut
fut.add_done_callback(_done_callback)
else:
# There's a duplicate Future object in coros_or_futures.
fut = arg_to_fut[arg]
children.append(fut)
outer = _GatheringFuture(children, loop=loop)
return outer return outer
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment