Unverified Kaydet (Commit) a70232f2 authored tarafından Yury Selivanov's avatar Yury Selivanov Kaydeden (comit) GitHub

bpo-32296: Implement asyncio.get_event_loop and _get_running_loop in C. (#4827)

asyncio.get_event_loop(), and, subsequently asyncio._get_running_loop()
are one of the most frequently executed functions in asyncio.  They also
can't be sped up by third-party event loops like uvloop.

When implemented in C they become 4x faster.
üst d5dda98f
......@@ -652,6 +652,7 @@ def get_running_loop():
This function is thread-specific.
"""
# NOTE: this function is implemented in C (see _asynciomodule.c)
loop = _get_running_loop()
if loop is None:
raise RuntimeError('no running event loop')
......@@ -664,6 +665,7 @@ def _get_running_loop():
This is a low-level function intended to be used by event loops.
This function is thread-specific.
"""
# NOTE: this function is implemented in C (see _asynciomodule.c)
running_loop, pid = _running_loop.loop_pid
if running_loop is not None and pid == os.getpid():
return running_loop
......@@ -675,6 +677,7 @@ def _set_running_loop(loop):
This is a low-level function intended to be used by event loops.
This function is thread-specific.
"""
# NOTE: this function is implemented in C (see _asynciomodule.c)
_running_loop.loop_pid = (loop, os.getpid())
......@@ -711,6 +714,7 @@ def get_event_loop():
If there is no running event loop set, the function will return
the result of `get_event_loop_policy().get_event_loop()` call.
"""
# NOTE: this function is implemented in C (see _asynciomodule.c)
current_loop = _get_running_loop()
if current_loop is not None:
return current_loop
......@@ -736,3 +740,26 @@ def set_child_watcher(watcher):
"""Equivalent to calling
get_event_loop_policy().set_child_watcher(watcher)."""
return get_event_loop_policy().set_child_watcher(watcher)
# Alias pure-Python implementations for testing purposes.
_py__get_running_loop = _get_running_loop
_py__set_running_loop = _set_running_loop
_py_get_running_loop = get_running_loop
_py_get_event_loop = get_event_loop
try:
# get_event_loop() is one of the most frequently called
# functions in asyncio. Pure Python implementation is
# about 4 times slower than C-accelerated.
from _asyncio import (_get_running_loop, _set_running_loop,
get_running_loop, get_event_loop)
except ImportError:
pass
else:
# Alias C implementations for testing purposes.
_c__get_running_loop = _get_running_loop
_c__set_running_loop = _set_running_loop
_c_get_running_loop = get_running_loop
_c_get_event_loop = get_event_loop
......@@ -27,6 +27,7 @@ if sys.platform != 'win32':
import asyncio
from asyncio import coroutines
from asyncio import events
from asyncio import proactor_events
from asyncio import selector_events
from test.test_asyncio import utils as test_utils
......@@ -2145,23 +2146,6 @@ else:
asyncio.set_child_watcher(None)
super().tearDown()
def test_get_event_loop_new_process(self):
# Issue bpo-32126: The multiprocessing module used by
# ProcessPoolExecutor is not functional when the
# multiprocessing.synchronize module cannot be imported.
support.import_module('multiprocessing.synchronize')
async def main():
pool = concurrent.futures.ProcessPoolExecutor()
result = await self.loop.run_in_executor(
pool, _test_get_event_loop_new_process__sub_proc)
pool.shutdown()
return result
self.unpatch_get_running_loop()
self.assertEqual(
self.loop.run_until_complete(main()),
'hello')
if hasattr(selectors, 'KqueueSelector'):
class KqueueEventLoopTests(UnixEventLoopTestsMixin,
......@@ -2722,17 +2706,95 @@ class PolicyTests(unittest.TestCase):
self.assertIs(policy, asyncio.get_event_loop_policy())
self.assertIsNot(policy, old_policy)
class GetEventLoopTestsMixin:
_get_running_loop_impl = None
_set_running_loop_impl = None
get_running_loop_impl = None
get_event_loop_impl = None
def setUp(self):
self._get_running_loop_saved = events._get_running_loop
self._set_running_loop_saved = events._set_running_loop
self.get_running_loop_saved = events.get_running_loop
self.get_event_loop_saved = events.get_event_loop
events._get_running_loop = type(self)._get_running_loop_impl
events._set_running_loop = type(self)._set_running_loop_impl
events.get_running_loop = type(self).get_running_loop_impl
events.get_event_loop = type(self).get_event_loop_impl
asyncio._get_running_loop = type(self)._get_running_loop_impl
asyncio._set_running_loop = type(self)._set_running_loop_impl
asyncio.get_running_loop = type(self).get_running_loop_impl
asyncio.get_event_loop = type(self).get_event_loop_impl
super().setUp()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
watcher = asyncio.SafeChildWatcher()
watcher.attach_loop(self.loop)
asyncio.set_child_watcher(watcher)
def tearDown(self):
try:
asyncio.set_child_watcher(None)
super().tearDown()
finally:
self.loop.close()
asyncio.set_event_loop(None)
events._get_running_loop = self._get_running_loop_saved
events._set_running_loop = self._set_running_loop_saved
events.get_running_loop = self.get_running_loop_saved
events.get_event_loop = self.get_event_loop_saved
asyncio._get_running_loop = self._get_running_loop_saved
asyncio._set_running_loop = self._set_running_loop_saved
asyncio.get_running_loop = self.get_running_loop_saved
asyncio.get_event_loop = self.get_event_loop_saved
if sys.platform != 'win32':
def test_get_event_loop_new_process(self):
# Issue bpo-32126: The multiprocessing module used by
# ProcessPoolExecutor is not functional when the
# multiprocessing.synchronize module cannot be imported.
support.import_module('multiprocessing.synchronize')
async def main():
pool = concurrent.futures.ProcessPoolExecutor()
result = await self.loop.run_in_executor(
pool, _test_get_event_loop_new_process__sub_proc)
pool.shutdown()
return result
self.assertEqual(
self.loop.run_until_complete(main()),
'hello')
def test_get_event_loop_returns_running_loop(self):
class TestError(Exception):
pass
class Policy(asyncio.DefaultEventLoopPolicy):
def get_event_loop(self):
raise NotImplementedError
loop = None
raise TestError
old_policy = asyncio.get_event_loop_policy()
try:
asyncio.set_event_loop_policy(Policy())
loop = asyncio.new_event_loop()
with self.assertRaises(TestError):
asyncio.get_event_loop()
asyncio.set_event_loop(None)
with self.assertRaises(TestError):
asyncio.get_event_loop()
with self.assertRaisesRegex(RuntimeError, 'no running'):
self.assertIs(asyncio.get_running_loop(), None)
self.assertIs(asyncio._get_running_loop(), None)
......@@ -2743,6 +2805,15 @@ class PolicyTests(unittest.TestCase):
self.assertIs(asyncio._get_running_loop(), loop)
loop.run_until_complete(func())
asyncio.set_event_loop(loop)
with self.assertRaises(TestError):
asyncio.get_event_loop()
asyncio.set_event_loop(None)
with self.assertRaises(TestError):
asyncio.get_event_loop()
finally:
asyncio.set_event_loop_policy(old_policy)
if loop is not None:
......@@ -2754,5 +2825,27 @@ class PolicyTests(unittest.TestCase):
self.assertIs(asyncio._get_running_loop(), None)
class TestPyGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
_get_running_loop_impl = events._py__get_running_loop
_set_running_loop_impl = events._py__set_running_loop
get_running_loop_impl = events._py_get_running_loop
get_event_loop_impl = events._py_get_event_loop
try:
import _asyncio # NoQA
except ImportError:
pass
else:
class TestCGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
_get_running_loop_impl = events._c__get_running_loop
_set_running_loop_impl = events._c__set_running_loop
get_running_loop_impl = events._c_get_running_loop
get_event_loop_impl = events._c_get_event_loop
if __name__ == '__main__':
unittest.main()
Implement asyncio._get_running_loop() and get_event_loop() in C. This makes
them 4x faster.
......@@ -9,9 +9,11 @@ module _asyncio
/* identifiers used from some functions */
_Py_IDENTIFIER(__asyncio_running_event_loop__);
_Py_IDENTIFIER(add_done_callback);
_Py_IDENTIFIER(call_soon);
_Py_IDENTIFIER(cancel);
_Py_IDENTIFIER(get_event_loop);
_Py_IDENTIFIER(send);
_Py_IDENTIFIER(throw);
_Py_IDENTIFIER(_step);
......@@ -23,7 +25,7 @@ _Py_IDENTIFIER(_wakeup);
static PyObject *all_tasks;
static PyObject *current_tasks;
static PyObject *traceback_extract_stack;
static PyObject *asyncio_get_event_loop;
static PyObject *asyncio_get_event_loop_policy;
static PyObject *asyncio_future_repr_info_func;
static PyObject *asyncio_task_repr_info_func;
static PyObject *asyncio_task_get_stack_func;
......@@ -31,6 +33,7 @@ static PyObject *asyncio_task_print_stack_func;
static PyObject *asyncio_InvalidStateError;
static PyObject *asyncio_CancelledError;
static PyObject *inspect_isgenerator;
static PyObject *os_getpid;
typedef enum {
......@@ -88,6 +91,134 @@ class _asyncio.Future "FutureObj *" "&Future_Type"
static PyObject* future_new_iter(PyObject *);
static inline int future_call_schedule_callbacks(FutureObj *);
static int
get_running_loop(PyObject **loop)
{
PyObject *ts_dict;
PyObject *running_tuple;
PyObject *running_loop;
PyObject *running_loop_pid;
PyObject *current_pid;
int same_pid;
ts_dict = PyThreadState_GetDict(); // borrowed
if (ts_dict == NULL) {
PyErr_SetString(
PyExc_RuntimeError, "thread-local storage is not available");
goto error;
}
running_tuple = _PyDict_GetItemId(
ts_dict, &PyId___asyncio_running_event_loop__); // borrowed
if (running_tuple == NULL) {
/* _PyDict_GetItemId doesn't set an error if key is not found */
goto not_found;
}
assert(PyTuple_CheckExact(running_tuple));
assert(PyTuple_Size(running_tuple) == 2);
running_loop = PyTuple_GET_ITEM(running_tuple, 0); // borrowed
running_loop_pid = PyTuple_GET_ITEM(running_tuple, 1); // borrowed
if (running_loop == Py_None) {
goto not_found;
}
current_pid = _PyObject_CallNoArg(os_getpid);
if (current_pid == NULL) {
goto error;
}
same_pid = PyObject_RichCompareBool(current_pid, running_loop_pid, Py_EQ);
Py_DECREF(current_pid);
if (same_pid == -1) {
goto error;
}
if (same_pid) {
// current_pid == running_loop_pid
goto found;
}
not_found:
*loop = NULL;
return 0;
found:
Py_INCREF(running_loop);
*loop = running_loop;
return 0;
error:
*loop = NULL;
return -1;
}
static int
set_running_loop(PyObject *loop)
{
PyObject *ts_dict;
PyObject *running_tuple;
PyObject *current_pid;
ts_dict = PyThreadState_GetDict(); // borrowed
if (ts_dict == NULL) {
PyErr_SetString(
PyExc_RuntimeError, "thread-local storage is not available");
return -1;
}
current_pid = _PyObject_CallNoArg(os_getpid);
if (current_pid == NULL) {
return -1;
}
running_tuple = PyTuple_New(2);
if (running_tuple == NULL) {
Py_DECREF(current_pid);
return -1;
}
Py_INCREF(loop);
PyTuple_SET_ITEM(running_tuple, 0, loop);
PyTuple_SET_ITEM(running_tuple, 1, current_pid); // borrowed
if (_PyDict_SetItemId(
ts_dict, &PyId___asyncio_running_event_loop__, running_tuple)) {
Py_DECREF(running_tuple); // will cleanup loop & current_pid
return -1;
}
Py_DECREF(running_tuple);
return 0;
}
static PyObject *
get_event_loop(void)
{
PyObject *loop;
PyObject *policy;
if (get_running_loop(&loop)) {
return NULL;
}
if (loop != NULL) {
return loop;
}
policy = _PyObject_CallNoArg(asyncio_get_event_loop_policy);
if (policy == NULL) {
return NULL;
}
loop = _PyObject_CallMethodId(policy, &PyId_get_event_loop, NULL);
Py_DECREF(policy);
return loop;
}
static int
future_schedule_callbacks(FutureObj *fut)
{
......@@ -140,7 +271,7 @@ future_init(FutureObj *fut, PyObject *loop)
_Py_IDENTIFIER(get_debug);
if (loop == Py_None) {
loop = _PyObject_CallNoArg(asyncio_get_event_loop);
loop = get_event_loop();
if (loop == NULL) {
return -1;
}
......@@ -1449,7 +1580,7 @@ _asyncio_Task_current_task_impl(PyTypeObject *type, PyObject *loop)
PyObject *res;
if (loop == Py_None) {
loop = _PyObject_CallNoArg(asyncio_get_event_loop);
loop = get_event_loop();
if (loop == NULL) {
return NULL;
}
......@@ -1536,7 +1667,7 @@ _asyncio_Task_all_tasks_impl(PyTypeObject *type, PyObject *loop)
PyObject *res;
if (loop == Py_None) {
loop = _PyObject_CallNoArg(asyncio_get_event_loop);
loop = get_event_loop();
if (loop == NULL) {
return NULL;
}
......@@ -2368,6 +2499,100 @@ task_wakeup(TaskObj *task, PyObject *o)
}
/*********************** Functions **************************/
/*[clinic input]
_asyncio._get_running_loop
Return the running event loop or None.
This is a low-level function intended to be used by event loops.
This function is thread-specific.
[clinic start generated code]*/
static PyObject *
_asyncio__get_running_loop_impl(PyObject *module)
/*[clinic end generated code: output=b4390af721411a0a input=0a21627e25a4bd43]*/
{
PyObject *loop;
if (get_running_loop(&loop)) {
return NULL;
}
if (loop == NULL) {
/* There's no currently running event loop */
Py_RETURN_NONE;
}
return loop;
}
/*[clinic input]
_asyncio._set_running_loop
loop: 'O'
/
Set the running event loop.
This is a low-level function intended to be used by event loops.
This function is thread-specific.
[clinic start generated code]*/
static PyObject *
_asyncio__set_running_loop(PyObject *module, PyObject *loop)
/*[clinic end generated code: output=ae56bf7a28ca189a input=4c9720233d606604]*/
{
if (set_running_loop(loop)) {
return NULL;
}
Py_RETURN_NONE;
}
/*[clinic input]
_asyncio.get_event_loop
Return an asyncio event loop.
When called from a coroutine or a callback (e.g. scheduled with
call_soon or similar API), this function will always return the
running event loop.
If there is no running event loop set, the function will return
the result of `get_event_loop_policy().get_event_loop()` call.
[clinic start generated code]*/
static PyObject *
_asyncio_get_event_loop_impl(PyObject *module)
/*[clinic end generated code: output=2a2d8b2f824c648b input=9364bf2916c8655d]*/
{
return get_event_loop();
}
/*[clinic input]
_asyncio.get_running_loop
Return the running event loop. Raise a RuntimeError if there is none.
This function is thread-specific.
[clinic start generated code]*/
static PyObject *
_asyncio_get_running_loop_impl(PyObject *module)
/*[clinic end generated code: output=c247b5f9e529530e input=2a3bf02ba39f173d]*/
{
PyObject *loop;
if (get_running_loop(&loop)) {
return NULL;
}
if (loop == NULL) {
/* There's no currently running event loop */
PyErr_SetString(
PyExc_RuntimeError, "no running event loop");
}
return loop;
}
/*********************** Module **************************/
......@@ -2377,7 +2602,7 @@ module_free(void *m)
Py_CLEAR(current_tasks);
Py_CLEAR(all_tasks);
Py_CLEAR(traceback_extract_stack);
Py_CLEAR(asyncio_get_event_loop);
Py_CLEAR(asyncio_get_event_loop_policy);
Py_CLEAR(asyncio_future_repr_info_func);
Py_CLEAR(asyncio_task_repr_info_func);
Py_CLEAR(asyncio_task_get_stack_func);
......@@ -2385,6 +2610,7 @@ module_free(void *m)
Py_CLEAR(asyncio_InvalidStateError);
Py_CLEAR(asyncio_CancelledError);
Py_CLEAR(inspect_isgenerator);
Py_CLEAR(os_getpid);
}
static int
......@@ -2407,7 +2633,7 @@ module_init(void)
}
WITH_MOD("asyncio.events")
GET_MOD_ATTR(asyncio_get_event_loop, "get_event_loop")
GET_MOD_ATTR(asyncio_get_event_loop_policy, "get_event_loop_policy")
WITH_MOD("asyncio.base_futures")
GET_MOD_ATTR(asyncio_future_repr_info_func, "_future_repr_info")
......@@ -2422,6 +2648,9 @@ module_init(void)
WITH_MOD("inspect")
GET_MOD_ATTR(inspect_isgenerator, "isgenerator")
WITH_MOD("os")
GET_MOD_ATTR(os_getpid, "getpid")
WITH_MOD("traceback")
GET_MOD_ATTR(traceback_extract_stack, "extract_stack")
......@@ -2452,12 +2681,20 @@ fail:
PyDoc_STRVAR(module_doc, "Accelerator module for asyncio");
static PyMethodDef asyncio_methods[] = {
_ASYNCIO_GET_EVENT_LOOP_METHODDEF
_ASYNCIO_GET_RUNNING_LOOP_METHODDEF
_ASYNCIO__GET_RUNNING_LOOP_METHODDEF
_ASYNCIO__SET_RUNNING_LOOP_METHODDEF
{NULL, NULL}
};
static struct PyModuleDef _asynciomodule = {
PyModuleDef_HEAD_INIT, /* m_base */
"_asyncio", /* m_name */
module_doc, /* m_doc */
-1, /* m_size */
NULL, /* m_methods */
asyncio_methods, /* m_methods */
NULL, /* m_slots */
NULL, /* m_traverse */
NULL, /* m_clear */
......
......@@ -517,4 +517,82 @@ _asyncio_Task__wakeup(TaskObj *self, PyObject **args, Py_ssize_t nargs, PyObject
exit:
return return_value;
}
/*[clinic end generated code: output=b92f9cd2b9fb37ef input=a9049054013a1b77]*/
PyDoc_STRVAR(_asyncio__get_running_loop__doc__,
"_get_running_loop($module, /)\n"
"--\n"
"\n"
"Return the running event loop or None.\n"
"\n"
"This is a low-level function intended to be used by event loops.\n"
"This function is thread-specific.");
#define _ASYNCIO__GET_RUNNING_LOOP_METHODDEF \
{"_get_running_loop", (PyCFunction)_asyncio__get_running_loop, METH_NOARGS, _asyncio__get_running_loop__doc__},
static PyObject *
_asyncio__get_running_loop_impl(PyObject *module);
static PyObject *
_asyncio__get_running_loop(PyObject *module, PyObject *Py_UNUSED(ignored))
{
return _asyncio__get_running_loop_impl(module);
}
PyDoc_STRVAR(_asyncio__set_running_loop__doc__,
"_set_running_loop($module, loop, /)\n"
"--\n"
"\n"
"Set the running event loop.\n"
"\n"
"This is a low-level function intended to be used by event loops.\n"
"This function is thread-specific.");
#define _ASYNCIO__SET_RUNNING_LOOP_METHODDEF \
{"_set_running_loop", (PyCFunction)_asyncio__set_running_loop, METH_O, _asyncio__set_running_loop__doc__},
PyDoc_STRVAR(_asyncio_get_event_loop__doc__,
"get_event_loop($module, /)\n"
"--\n"
"\n"
"Return an asyncio event loop.\n"
"\n"
"When called from a coroutine or a callback (e.g. scheduled with\n"
"call_soon or similar API), this function will always return the\n"
"running event loop.\n"
"\n"
"If there is no running event loop set, the function will return\n"
"the result of `get_event_loop_policy().get_event_loop()` call.");
#define _ASYNCIO_GET_EVENT_LOOP_METHODDEF \
{"get_event_loop", (PyCFunction)_asyncio_get_event_loop, METH_NOARGS, _asyncio_get_event_loop__doc__},
static PyObject *
_asyncio_get_event_loop_impl(PyObject *module);
static PyObject *
_asyncio_get_event_loop(PyObject *module, PyObject *Py_UNUSED(ignored))
{
return _asyncio_get_event_loop_impl(module);
}
PyDoc_STRVAR(_asyncio_get_running_loop__doc__,
"get_running_loop($module, /)\n"
"--\n"
"\n"
"Return the running event loop. Raise a RuntimeError if there is none.\n"
"\n"
"This function is thread-specific.");
#define _ASYNCIO_GET_RUNNING_LOOP_METHODDEF \
{"get_running_loop", (PyCFunction)_asyncio_get_running_loop, METH_NOARGS, _asyncio_get_running_loop__doc__},
static PyObject *
_asyncio_get_running_loop_impl(PyObject *module);
static PyObject *
_asyncio_get_running_loop(PyObject *module, PyObject *Py_UNUSED(ignored))
{
return _asyncio_get_running_loop_impl(module);
}
/*[clinic end generated code: output=d40b94e629571d48 input=a9049054013a1b77]*/
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment