Kaydet (Commit) a0c1ba60 authored tarafından Yury Selivanov's avatar Yury Selivanov

Issue #28544: Implement asyncio.Task in C.

This implementation provides additional 10-20% speed boost for
asyncio programs.

The patch also fixes _asynciomodule.c to use Arguments Clinic, and
makes '_schedule_callbacks' an overridable method (as it was in 3.5).
üst bbcb7992
......@@ -57,7 +57,7 @@ _FATAL_ERROR_IGNORE = (BrokenPipeError,
def _format_handle(handle):
cb = handle._callback
if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
if isinstance(getattr(cb, '__self__', None), tasks.Task):
# format the task
return repr(cb.__self__)
else:
......
__all__ = []
import concurrent.futures._base
import reprlib
from . import events
Error = concurrent.futures._base.Error
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError
class InvalidStateError(Error):
"""The operation is not allowed in this state."""
# States for Future.
_PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
_FINISHED = 'FINISHED'
def isfuture(obj):
"""Check for a Future.
This returns True when obj is a Future instance or is advertising
itself as duck-type compatible by setting _asyncio_future_blocking.
See comment in Future for more details.
"""
return getattr(obj, '_asyncio_future_blocking', None) is not None
def _format_callbacks(cb):
"""helper function for Future.__repr__"""
size = len(cb)
if not size:
cb = ''
def format_cb(callback):
return events._format_callback_source(callback, ())
if size == 1:
cb = format_cb(cb[0])
elif size == 2:
cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1]))
elif size > 2:
cb = '{}, <{} more>, {}'.format(format_cb(cb[0]),
size - 2,
format_cb(cb[-1]))
return 'cb=[%s]' % cb
def _future_repr_info(future):
# (Future) -> str
"""helper function for Future.__repr__"""
info = [future._state.lower()]
if future._state == _FINISHED:
if future._exception is not None:
info.append('exception={!r}'.format(future._exception))
else:
# use reprlib to limit the length of the output, especially
# for very long strings
result = reprlib.repr(future._result)
info.append('result={}'.format(result))
if future._callbacks:
info.append(_format_callbacks(future._callbacks))
if future._source_traceback:
frame = future._source_traceback[-1]
info.append('created at %s:%s' % (frame[0], frame[1]))
return info
import linecache
import traceback
from . import base_futures
from . import coroutines
def _task_repr_info(task):
info = base_futures._future_repr_info(task)
if task._must_cancel:
# replace status
info[0] = 'cancelling'
coro = coroutines._format_coroutine(task._coro)
info.insert(1, 'coro=<%s>' % coro)
if task._fut_waiter is not None:
info.insert(2, 'wait_for=%r' % task._fut_waiter)
return info
def _task_get_stack(task, limit):
frames = []
try:
# 'async def' coroutines
f = task._coro.cr_frame
except AttributeError:
f = task._coro.gi_frame
if f is not None:
while f is not None:
if limit is not None:
if limit <= 0:
break
limit -= 1
frames.append(f)
f = f.f_back
frames.reverse()
elif task._exception is not None:
tb = task._exception.__traceback__
while tb is not None:
if limit is not None:
if limit <= 0:
break
limit -= 1
frames.append(tb.tb_frame)
tb = tb.tb_next
return frames
def _task_print_stack(task, limit, file):
extracted_list = []
checked = set()
for f in task.get_stack(limit=limit):
lineno = f.f_lineno
co = f.f_code
filename = co.co_filename
name = co.co_name
if filename not in checked:
checked.add(filename)
linecache.checkcache(filename)
line = linecache.getline(filename, lineno, f.f_globals)
extracted_list.append((filename, lineno, name, line))
exc = task._exception
if not extracted_list:
print('No stack for %r' % task, file=file)
elif exc is not None:
print('Traceback for %r (most recent call last):' % task,
file=file)
else:
print('Stack for %r (most recent call last):' % task,
file=file)
traceback.print_list(extracted_list, file=file)
if exc is not None:
for line in traceback.format_exception_only(exc.__class__, exc):
print(line, file=file, end='')
......@@ -11,7 +11,7 @@ import types
from . import compat
from . import events
from . import futures
from . import base_futures
from .log import logger
......@@ -204,7 +204,7 @@ def coroutine(func):
@functools.wraps(func)
def coro(*args, **kw):
res = func(*args, **kw)
if (futures.isfuture(res) or inspect.isgenerator(res) or
if (base_futures.isfuture(res) or inspect.isgenerator(res) or
isinstance(res, CoroWrapper)):
res = yield from res
elif _AwaitableABC is not None:
......
"""A Future class similar to the one in PEP 3148."""
__all__ = ['CancelledError', 'TimeoutError',
'InvalidStateError',
'Future', 'wrap_future', 'isfuture'
]
__all__ = ['CancelledError', 'TimeoutError', 'InvalidStateError',
'Future', 'wrap_future', 'isfuture']
import concurrent.futures._base
import concurrent.futures
import logging
import reprlib
import sys
import traceback
from . import base_futures
from . import compat
from . import events
# States for Future.
_PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
_FINISHED = 'FINISHED'
Error = concurrent.futures._base.Error
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError
CancelledError = base_futures.CancelledError
InvalidStateError = base_futures.InvalidStateError
TimeoutError = base_futures.TimeoutError
isfuture = base_futures.isfuture
STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging
_PENDING = base_futures._PENDING
_CANCELLED = base_futures._CANCELLED
_FINISHED = base_futures._FINISHED
class InvalidStateError(Error):
"""The operation is not allowed in this state."""
STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging
class _TracebackLogger:
......@@ -110,56 +107,6 @@ class _TracebackLogger:
self.loop.call_exception_handler({'message': msg})
def isfuture(obj):
"""Check for a Future.
This returns True when obj is a Future instance or is advertising
itself as duck-type compatible by setting _asyncio_future_blocking.
See comment in Future for more details.
"""
return getattr(obj, '_asyncio_future_blocking', None) is not None
def _format_callbacks(cb):
"""helper function for Future.__repr__"""
size = len(cb)
if not size:
cb = ''
def format_cb(callback):
return events._format_callback_source(callback, ())
if size == 1:
cb = format_cb(cb[0])
elif size == 2:
cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1]))
elif size > 2:
cb = '{}, <{} more>, {}'.format(format_cb(cb[0]),
size-2,
format_cb(cb[-1]))
return 'cb=[%s]' % cb
def _future_repr_info(future):
# (Future) -> str
"""helper function for Future.__repr__"""
info = [future._state.lower()]
if future._state == _FINISHED:
if future._exception is not None:
info.append('exception={!r}'.format(future._exception))
else:
# use reprlib to limit the length of the output, especially
# for very long strings
result = reprlib.repr(future._result)
info.append('result={}'.format(result))
if future._callbacks:
info.append(_format_callbacks(future._callbacks))
if future._source_traceback:
frame = future._source_traceback[-1]
info.append('created at %s:%s' % (frame[0], frame[1]))
return info
class Future:
"""This class is *almost* compatible with concurrent.futures.Future.
......@@ -212,7 +159,7 @@ class Future:
if self._loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))
_repr_info = _future_repr_info
_repr_info = base_futures._future_repr_info
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, ' '.join(self._repr_info()))
......@@ -247,10 +194,10 @@ class Future:
if self._state != _PENDING:
return False
self._state = _CANCELLED
self.__schedule_callbacks()
self._schedule_callbacks()
return True
def __schedule_callbacks(self):
def _schedule_callbacks(self):
"""Internal: Ask the event loop to call all callbacks.
The callbacks are scheduled to be called as soon as possible. Also
......@@ -352,7 +299,7 @@ class Future:
raise InvalidStateError('{}: {!r}'.format(self._state, self))
self._result = result
self._state = _FINISHED
self.__schedule_callbacks()
self._schedule_callbacks()
def set_exception(self, exception):
"""Mark the future done and set an exception.
......@@ -369,7 +316,7 @@ class Future:
"and cannot be raised into a Future")
self._exception = exception
self._state = _FINISHED
self.__schedule_callbacks()
self._schedule_callbacks()
if compat.PY34:
self._log_traceback = True
else:
......
......@@ -9,11 +9,10 @@ __all__ = ['Task',
import concurrent.futures
import functools
import inspect
import linecache
import traceback
import warnings
import weakref
from . import base_tasks
from . import compat
from . import coroutines
from . import events
......@@ -93,18 +92,7 @@ class Task(futures.Future):
futures.Future.__del__(self)
def _repr_info(self):
info = super()._repr_info()
if self._must_cancel:
# replace status
info[0] = 'cancelling'
coro = coroutines._format_coroutine(self._coro)
info.insert(1, 'coro=<%s>' % coro)
if self._fut_waiter is not None:
info.insert(2, 'wait_for=%r' % self._fut_waiter)
return info
return base_tasks._task_repr_info(self)
def get_stack(self, *, limit=None):
"""Return the list of stack frames for this task's coroutine.
......@@ -127,31 +115,7 @@ class Task(futures.Future):
For reasons beyond our control, only one stack frame is
returned for a suspended coroutine.
"""
frames = []
try:
# 'async def' coroutines
f = self._coro.cr_frame
except AttributeError:
f = self._coro.gi_frame
if f is not None:
while f is not None:
if limit is not None:
if limit <= 0:
break
limit -= 1
frames.append(f)
f = f.f_back
frames.reverse()
elif self._exception is not None:
tb = self._exception.__traceback__
while tb is not None:
if limit is not None:
if limit <= 0:
break
limit -= 1
frames.append(tb.tb_frame)
tb = tb.tb_next
return frames
return base_tasks._task_get_stack(self, limit)
def print_stack(self, *, limit=None, file=None):
"""Print the stack or traceback for this task's coroutine.
......@@ -162,31 +126,7 @@ class Task(futures.Future):
to which the output is written; by default output is written
to sys.stderr.
"""
extracted_list = []
checked = set()
for f in self.get_stack(limit=limit):
lineno = f.f_lineno
co = f.f_code
filename = co.co_filename
name = co.co_name
if filename not in checked:
checked.add(filename)
linecache.checkcache(filename)
line = linecache.getline(filename, lineno, f.f_globals)
extracted_list.append((filename, lineno, name, line))
exc = self._exception
if not extracted_list:
print('No stack for %r' % self, file=file)
elif exc is not None:
print('Traceback for %r (most recent call last):' % self,
file=file)
else:
print('Stack for %r (most recent call last):' % self,
file=file)
traceback.print_list(extracted_list, file=file)
if exc is not None:
for line in traceback.format_exception_only(exc.__class__, exc):
print(line, file=file, end='')
return base_tasks._task_print_stack(self, limit, file)
def cancel(self):
"""Request that this task cancel itself.
......@@ -316,6 +256,18 @@ class Task(futures.Future):
self = None # Needed to break cycles when an exception occurs.
_PyTask = Task
try:
import _asyncio
except ImportError:
pass
else:
# _CTask is needed for tests.
Task = _CTask = _asyncio.Task
# wait() and as_completed() similar to those in PEP 3148.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
......
......@@ -85,6 +85,8 @@ Library
threadpool executor.
Initial patch by Hans Lawrenz.
- Issue #28544: Implement asyncio.Task in C.
Windows
-------
......
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment