Kaydet (Commit) 4aae276e authored tarafından Antoine Pitrou's avatar Antoine Pitrou

Issue #11271: concurrent.futures.Executor.map() now takes a *chunksize*

argument to allow batching of tasks in child processes and improve
performance of ProcessPoolExecutor.  Patch by Dan O'Reilly.
üst e4f47088
......@@ -38,7 +38,7 @@ Executor Objects
future = executor.submit(pow, 323, 1235)
print(future.result())
.. method:: map(func, *iterables, timeout=None)
.. method:: map(func, *iterables, timeout=None, chunksize=1)
Equivalent to :func:`map(func, *iterables) <map>` except *func* is executed
asynchronously and several calls to *func* may be made concurrently. The
......@@ -48,7 +48,16 @@ Executor Objects
*timeout* can be an int or a float. If *timeout* is not specified or
``None``, there is no limit to the wait time. If a call raises an
exception, then that exception will be raised when its value is
retrieved from the iterator.
retrieved from the iterator. When using :class:`ProcessPoolExecutor`, this
method chops *iterables* into a number of chunks which it submits to the
pool as separate tasks. The (approximate) size of these chunks can be
specified by setting *chunksize* to a positive integer. For very long
iterables, using a large value for *chunksize* can significantly improve
performance compared to the default size of 1. With :class:`ThreadPoolExecutor`,
*chunksize* has no effect.
.. versionchanged:: 3.5
Added the *chunksize* argument.
.. method:: shutdown(wait=True)
......
......@@ -520,7 +520,7 @@ class Executor(object):
"""
raise NotImplementedError()
def map(self, fn, *iterables, timeout=None):
def map(self, fn, *iterables, timeout=None, chunksize=1):
"""Returns a iterator equivalent to map(fn, iter).
Args:
......@@ -528,6 +528,10 @@ class Executor(object):
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
chunksize: The size of the chunks the iterable will be broken into
before being passed to a child process. This argument is only
used by ProcessPoolExecutor; it is ignored by
ThreadPoolExecutor.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
......
......@@ -55,6 +55,8 @@ from multiprocessing import SimpleQueue
from multiprocessing.connection import wait
import threading
import weakref
from functools import partial
import itertools
# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
......@@ -108,6 +110,26 @@ class _CallItem(object):
self.args = args
self.kwargs = kwargs
def _get_chunks(*iterables, chunksize):
""" Iterates over zip()ed iterables in chunks. """
it = zip(*iterables)
while True:
chunk = tuple(itertools.islice(it, chunksize))
if not chunk:
return
yield chunk
def _process_chunk(fn, chunk):
""" Processes a chunk of an iterable passed to map.
Runs the function passed to map() on a chunk of the
iterable passed to map.
This function is run in a separate process.
"""
return [fn(*args) for args in chunk]
def _process_worker(call_queue, result_queue):
"""Evaluates calls from call_queue and places the results in result_queue.
......@@ -411,6 +433,35 @@ class ProcessPoolExecutor(_base.Executor):
return f
submit.__doc__ = _base.Executor.submit.__doc__
def map(self, fn, *iterables, timeout=None, chunksize=1):
"""Returns a iterator equivalent to map(fn, iter).
Args:
fn: A callable that will take as many arguments as there are
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
chunksize: If greater than one, the iterables will be chopped into
chunks of size chunksize and submitted to the process pool.
If set to one, the items in the list will be sent one at a time.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
be evaluated out-of-order.
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
if chunksize < 1:
raise ValueError("chunksize must be >= 1.")
results = super().map(partial(_process_chunk, fn),
_get_chunks(*iterables, chunksize=chunksize),
timeout=timeout)
return itertools.chain.from_iterable(results)
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown_thread = True
......
......@@ -464,6 +464,22 @@ class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, unittest.TestCase)
# Submitting other jobs fails as well.
self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
def test_map_chunksize(self):
def bad_map():
list(self.executor.map(pow, range(40), range(40), chunksize=-1))
ref = list(map(pow, range(40), range(40)))
self.assertEqual(
list(self.executor.map(pow, range(40), range(40), chunksize=6)),
ref)
self.assertEqual(
list(self.executor.map(pow, range(40), range(40), chunksize=50)),
ref)
self.assertEqual(
list(self.executor.map(pow, range(40), range(40), chunksize=40)),
ref)
self.assertRaises(ValueError, bad_map)
class FutureTests(unittest.TestCase):
def test_done_callback_with_result(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment