:mod:`concurrent.futures` --- Launching parallel tasks
======================================================

.. module:: concurrent.futures
   :synopsis: Execute computations concurrently using threads or processes.

.. versionadded:: 3.2

**Source code:** :source:`Lib/concurrent/futures/thread.py`
and :source:`Lib/concurrent/futures/process.py`

--------------

The :mod:`concurrent.futures` module provides a high-level interface for
asynchronously executing callables.

The asynchronous execution can be performed with threads, using
:class:`ThreadPoolExecutor`, or separate processes, using
:class:`ProcessPoolExecutor`.  Both implement the same interface, which is
defined by the abstract :class:`Executor` class.


Executor Objects
----------------

.. class:: Executor

   An abstract class that provides methods to execute calls asynchronously.  It
   should not be used directly, but through its concrete subclasses.

    .. method:: submit(fn, *args, **kwargs)

       Schedules the callable, *fn*, to be executed as ``fn(*args, **kwargs)``
       and returns a :class:`Future` object representing the execution of the
       callable. ::

          with ThreadPoolExecutor(max_workers=1) as executor:
              future = executor.submit(pow, 323, 1235)
              print(future.result())

    .. method:: map(func, *iterables, timeout=None, chunksize=1)

       Similar to :func:`map(func, *iterables) <map>` except:

       * the *iterables* are collected immediately rather than lazily;

       * *func* is executed asynchronously and several calls to
         *func* may be made concurrently.

       The returned iterator raises a :exc:`concurrent.futures.TimeoutError`
       if :meth:`~iterator.__next__` is called and the result isn't available
       after *timeout* seconds from the original call to :meth:`Executor.map`.
       *timeout* can be an int or a float.  If *timeout* is not specified or
       ``None``, there is no limit to the wait time.

       If a *func* call raises an exception, then that exception will be
       raised when its value is retrieved from the iterator.

       When using :class:`ProcessPoolExecutor`, this method chops *iterables*
       into a number of chunks which it submits to the pool as separate
       tasks.  The (approximate) size of these chunks can be specified by
       setting *chunksize* to a positive integer.  For very long iterables,
       using a large value for *chunksize* can significantly improve
       performance compared to the default size of 1.  With
       :class:`ThreadPoolExecutor`, *chunksize* has no effect.

       .. versionchanged:: 3.5
          Added the *chunksize* argument.

    .. method:: shutdown(wait=True)

       Signal the executor that it should free any resources that it is using
       when the currently pending futures are done executing.  Calls to
       :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will
       raise :exc:`RuntimeError`.

       If *wait* is ``True`` then this method will not return until all the
       pending futures are done executing and the resources associated with the
       executor have been freed.  If *wait* is ``False`` then this method will
       return immediately and the resources associated with the executor will be
       freed when all pending futures are done executing.  Regardless of the
       value of *wait*, the entire Python program will not exit until all
       pending futures are done executing.

       You can avoid having to call this method explicitly if you use the
       :keyword:`with` statement, which will shut down the :class:`Executor`
       (waiting as if :meth:`Executor.shutdown` were called with *wait* set to
       ``True``)::

          import shutil
          with ThreadPoolExecutor(max_workers=4) as e:
              e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
              e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
              e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
              e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
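
The behaviour of :meth:`Executor.map` and :meth:`Executor.shutdown` described
in this section can be sketched as follows.  This is a minimal illustration
under stated assumptions, not a reference implementation: the helpers
``slow_square`` and ``reciprocal`` are hypothetical and exist only for this
example.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    # Hypothetical helper: smaller inputs sleep longer, so the calls finish
    # out of order even though map() yields results in input order.
    time.sleep(0.01 * (3 - x))
    return x * x


def reciprocal(x):
    # Hypothetical helper that raises ZeroDivisionError for x == 0.
    return 1 / x


with ThreadPoolExecutor(max_workers=3) as executor:
    squares = list(executor.map(slow_square, [0, 1, 2]))

    results = executor.map(reciprocal, [1, 0, 4])
    first = next(results)
    try:
        # The exception surfaces only when the failing value is retrieved.
        next(results)
    except ZeroDivisionError:
        second_failed = True

# Leaving the with block shut the executor down, so new work is refused.
try:
    executor.submit(pow, 2, 3)
except RuntimeError:
    rejected_after_shutdown = True
```

Results come back in the order of the inputs, not in the order in which the
calls finished.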


ThreadPoolExecutor
------------------

:class:`ThreadPoolExecutor` is an :class:`Executor` subclass that uses a pool of
threads to execute calls asynchronously.

Deadlocks can occur when the callable associated with a :class:`Future` waits on
the results of another :class:`Future`.  For example::

   import time
   def wait_on_b():
       time.sleep(5)
       print(b.result())  # b will never complete because it is waiting on a.
       return 5

   def wait_on_a():
       time.sleep(5)
       print(a.result())  # a will never complete because it is waiting on b.
       return 6


   executor = ThreadPoolExecutor(max_workers=2)
   a = executor.submit(wait_on_b)
   b = executor.submit(wait_on_a)

And::

   def wait_on_future():
       f = executor.submit(pow, 5, 2)
       # This will never complete because there is only one worker thread and
       # it is executing this function.
       print(f.result())

   executor = ThreadPoolExecutor(max_workers=1)
   executor.submit(wait_on_future)
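
One possible way to avoid the single-worker deadlock is to make sure that no
worker blocks on a future served by the same pool.  The sketch below lets the
calling thread wait for the first result and pass it on.  The helpers
``square`` and ``report`` are hypothetical, and this is only one of several
restructurings that would work.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    # Hypothetical first stage.
    return pow(x, 2)


def report(value):
    # Hypothetical second stage: it receives a plain value, not a Future,
    # so it never has to wait on the pool that is running it.
    return "result: %d" % value


with ThreadPoolExecutor(max_workers=1) as executor:
    inner = executor.submit(square, 5)
    # The caller, not a worker thread, waits for the first stage.
    outer = executor.submit(report, inner.result())
    message = outer.result()
```

Because the wait happens outside the pool, one worker thread is enough to run
both stages.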


.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

   An :class:`Executor` subclass that uses a pool of at most *max_workers*
   threads to execute calls asynchronously.

   *initializer* is an optional callable that is called at the start of
   each worker thread; *initargs* is a tuple of arguments passed to the
   initializer.  Should *initializer* raise an exception, all currently
   pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`,
   as well as any attempt to submit more jobs to the pool.

   .. versionchanged:: 3.5
      If *max_workers* is ``None`` or
      not given, it will default to the number of processors on the machine,
      multiplied by ``5``, assuming that :class:`ThreadPoolExecutor` is often
      used to overlap I/O instead of CPU work and the number of workers
      should be higher than the number of workers
      for :class:`ProcessPoolExecutor`.

   .. versionadded:: 3.6
      The *thread_name_prefix* argument was added to allow users to
      control the :class:`threading.Thread` names for worker threads created by
      the pool for easier debugging.

   .. versionchanged:: 3.7
      Added the *initializer* and *initargs* arguments.
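
The *thread_name_prefix*, *initializer* and *initargs* arguments can be
combined as in the following sketch.  It assumes Python 3.7 or later.  The
names ``init_worker`` and ``describe`` and the ``"demo"`` prefix are
hypothetical, chosen only for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Per-thread storage that the initializer fills in for each worker.
_local = threading.local()


def init_worker(tag):
    # Runs once in every worker thread before it picks up any task.
    _local.tag = tag


def describe(x):
    # Returns the per-thread tag, the worker's thread name and a result.
    return (_local.tag, threading.current_thread().name, x * 2)


with ThreadPoolExecutor(max_workers=2,
                        thread_name_prefix="demo",
                        initializer=init_worker,
                        initargs=("ready",)) as pool:
    results = list(pool.map(describe, range(4)))
```

Every tuple in ``results`` carries the tag set by the initializer and a thread
name that starts with the chosen prefix, which can make log output easier to
attribute to the pool.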


.. _threadpoolexecutor-example:

ThreadPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import urllib.request

   URLS = ['http://www.foxnews.com/',
           'http://www.cnn.com/',
           'http://europe.wsj.com/',
           'http://www.bbc.co.uk/',
           'http://some-made-up-domain.com/']

   # Retrieve a single page and report the URL and contents
   def load_url(url, timeout):
       with urllib.request.urlopen(url, timeout=timeout) as conn:
           return conn.read()

   # We can use a with statement to ensure threads are cleaned up promptly
   with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
       # Start the load operations and mark each future with its URL
       future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
       for future in concurrent.futures.as_completed(future_to_url):
           url = future_to_url[future]
           try:
               data = future.result()
           except Exception as exc:
               print('%r generated an exception: %s' % (url, exc))
           else:
               print('%r page is %d bytes' % (url, len(data)))


ProcessPoolExecutor
-------------------

The :class:`ProcessPoolExecutor` class is an :class:`Executor` subclass that
uses a pool of processes to execute calls asynchronously.
:class:`ProcessPoolExecutor` uses the :mod:`multiprocessing` module, which
allows it to side-step the :term:`Global Interpreter Lock` but also means that
only picklable objects can be executed and returned.

The ``__main__`` module must be importable by worker subprocesses. This means
that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.

Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.

.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

   An :class:`Executor` subclass that executes calls asynchronously using a pool
   of at most *max_workers* processes.  If *max_workers* is ``None`` or not
   given, it will default to the number of processors on the machine.
   If *max_workers* is less than or equal to ``0``, then a :exc:`ValueError`
   will be raised.
   *mp_context* can be a multiprocessing context or ``None``. It will be used to
   launch the workers. If *mp_context* is ``None`` or not given, the default
   multiprocessing context is used.

   *initializer* is an optional callable that is called at the start of
   each worker process; *initargs* is a tuple of arguments passed to the
   initializer.  Should *initializer* raise an exception, all currently
   pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
   as well as any attempt to submit more jobs to the pool.

   .. versionchanged:: 3.3
      When one of the worker processes terminates abruptly, a
      :exc:`BrokenProcessPool` error is now raised.  Previously, behaviour
      was undefined but operations on the executor or its futures would often
      freeze or deadlock.

   .. versionchanged:: 3.7
      The *mp_context* argument was added to allow users to control the
      start_method for worker processes created by the pool.

      Added the *initializer* and *initargs* arguments.


.. _processpoolexecutor-example:

ProcessPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import math

   PRIMES = [
       112272535095293,
       112582705942171,
       112272535095293,
       115280095190773,
       115797848077099,
       1099726899285419]

   def is_prime(n):
       if n < 2:
           return False
       if n == 2:
           return True
       if n % 2 == 0:
           return False

       sqrt_n = int(math.floor(math.sqrt(n)))
       for i in range(3, sqrt_n + 1, 2):
           if n % i == 0:
               return False
       return True

   def main():
       with concurrent.futures.ProcessPoolExecutor() as executor:
           for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
               print('%d is prime: %s' % (number, prime))

   if __name__ == '__main__':
       main()


Future Objects
--------------

The :class:`Future` class encapsulates the asynchronous execution of a callable.
:class:`Future` instances are created by :meth:`Executor.submit`.

.. class:: Future

   Encapsulates the asynchronous execution of a callable.  :class:`Future`
   instances are created by :meth:`Executor.submit` and should not be created
   directly except for testing.

    .. method:: cancel()

       Attempt to cancel the call.  If the call is currently being executed or
       has finished running and cannot be cancelled, the method will return
       ``False``; otherwise the call will be cancelled and the method will
       return ``True``.

    .. method:: cancelled()

       Return ``True`` if the call was successfully cancelled.

    .. method:: running()

       Return ``True`` if the call is currently being executed and cannot be
       cancelled.

    .. method:: done()

       Return ``True`` if the call was successfully cancelled or finished
       running.

    .. method:: result(timeout=None)

       Return the value returned by the call. If the call hasn't yet completed
       then this method will wait up to *timeout* seconds.  If the call hasn't
       completed in *timeout* seconds, then a
       :exc:`concurrent.futures.TimeoutError` will be raised. *timeout* can be
       an int or float.  If *timeout* is not specified or ``None``, there is no
       limit to the wait time.

       If the future is cancelled before completing then :exc:`.CancelledError`
       will be raised.

       If the call raised an exception, this method will raise the same
       exception.

    .. method:: exception(timeout=None)

       Return the exception raised by the call.  If the call hasn't yet
       completed then this method will wait up to *timeout* seconds.  If the
       call hasn't completed in *timeout* seconds, then a
       :exc:`concurrent.futures.TimeoutError` will be raised.  *timeout* can be
       an int or float.  If *timeout* is not specified or ``None``, there is no
       limit to the wait time.

       If the future is cancelled before completing then :exc:`.CancelledError`
       will be raised.

       If the call completed without raising, ``None`` is returned.

    .. method:: add_done_callback(fn)

       Attaches the callable *fn* to the future.  *fn* will be called, with the
       future as its only argument, when the future is cancelled or finishes
       running.

       Added callables are called in the order that they were added and are
       always called in a thread belonging to the process that added them.  If
       the callable raises an :exc:`Exception` subclass, it will be logged and
       ignored.  If the callable raises a :exc:`BaseException` subclass, the
       behavior is undefined.

       If the future has already completed or been cancelled, *fn* will be
       called immediately.

   The following :class:`Future` methods are meant for use in unit tests and
   :class:`Executor` implementations.

    .. method:: set_running_or_notify_cancel()

       This method should only be called by :class:`Executor` implementations
       before executing the work associated with the :class:`Future` and by unit
       tests.

       If the method returns ``False`` then the :class:`Future` was cancelled,
       i.e. :meth:`Future.cancel` was called and returned ``True``.  Any threads
       waiting on the :class:`Future` completing (i.e. through
       :func:`as_completed` or :func:`wait`) will be woken up.

       If the method returns ``True`` then the :class:`Future` was not cancelled
       and has been put in the running state, i.e. calls to
       :meth:`Future.running` will return ``True``.

       This method can only be called once and cannot be called after
       :meth:`Future.set_result` or :meth:`Future.set_exception` have been
       called.

    .. method:: set_result(result)

       Sets the result of the work associated with the :class:`Future` to
       *result*.

       This method should only be used by :class:`Executor` implementations and
       unit tests.

       .. versionchanged:: 3.8
          This method raises
          :exc:`concurrent.futures.InvalidStateError` if the :class:`Future` is
          already done.

    .. method:: set_exception(exception)

       Sets the result of the work associated with the :class:`Future` to the
       :class:`Exception` *exception*.

       This method should only be used by :class:`Executor` implementations and
       unit tests.

       .. versionchanged:: 3.8
          This method raises
          :exc:`concurrent.futures.InvalidStateError` if the :class:`Future` is
          already done.
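
The state transitions documented for :class:`Future` can be traced by driving
a future by hand, as a unit test or an :class:`Executor` implementation might.
The following is a minimal sketch that assumes Python 3.8 or later (for
:exc:`InvalidStateError`); all variable names are illustrative only.

```python
from concurrent.futures import Future, InvalidStateError

seen = []

future = Future()
# The callback receives the future itself once it is done.
future.add_done_callback(lambda f: seen.append(f.result()))

# An executor would call this just before running the work item.
started = future.set_running_or_notify_cancel()  # True: now running
cancel_while_running = future.cancel()           # False: too late to cancel
future.set_result(42)                            # completes, runs callback

try:
    future.set_result(43)                        # already done
except InvalidStateError:
    second_set_rejected = True

# A future cancelled while still pending must not be run.
abandoned = Future()
cancelled = abandoned.cancel()                         # True
should_run = abandoned.set_running_or_notify_cancel()  # False
```

An executor that sees ``False`` from :meth:`Future.set_running_or_notify_cancel`
is expected to skip the work item rather than execute it.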

Module Functions
----------------

.. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED)

   Wait for the :class:`Future` instances (possibly created by different
   :class:`Executor` instances) given by *fs* to complete.  Returns a named
   2-tuple of sets.  The first set, named ``done``, contains the futures that
   completed (finished or were cancelled) before the wait completed.  The second
   set, named ``not_done``, contains the futures that did not complete
   (pending or running).

   *timeout* can be used to control the maximum number of seconds to wait before
   returning.  *timeout* can be an int or float.  If *timeout* is not specified
   or ``None``, there is no limit to the wait time.

   *return_when* indicates when this function should return.  It must be one of
   the following constants:

   .. tabularcolumns:: |l|L|

   +-----------------------------+----------------------------------------+
   | Constant                    | Description                            |
   +=============================+========================================+
   | :const:`FIRST_COMPLETED`    | The function will return when any      |
   |                             | future finishes or is cancelled.       |
   +-----------------------------+----------------------------------------+
   | :const:`FIRST_EXCEPTION`    | The function will return when any      |
   |                             | future finishes by raising an          |
   |                             | exception.  If no future raises an     |
   |                             | exception then it is equivalent to     |
   |                             | :const:`ALL_COMPLETED`.                |
   +-----------------------------+----------------------------------------+
   | :const:`ALL_COMPLETED`      | The function will return when all      |
   |                             | futures finish or are cancelled.       |
   +-----------------------------+----------------------------------------+
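
The difference between :const:`FIRST_COMPLETED` and :const:`ALL_COMPLETED`
can be sketched with one call that finishes at once and one that is held back
by a :class:`threading.Event`.  The event and the variable names are
assumptions made for this example only.

```python
import threading
from concurrent.futures import (
    ALL_COMPLETED, FIRST_COMPLETED, ThreadPoolExecutor, wait)

gate = threading.Event()

with ThreadPoolExecutor(max_workers=2) as executor:
    quick = executor.submit(pow, 2, 5)
    slow = executor.submit(gate.wait)  # blocks until the gate is opened

    # Returns as soon as one future is done; slow is still held back.
    first = wait([quick, slow], return_when=FIRST_COMPLETED)

    gate.set()
    # Returns only once every future has finished or been cancelled.
    everything = wait([quick, slow], return_when=ALL_COMPLETED)
```

Both return values are named tuples, so the two sets can be read as
``first.done`` and ``first.not_done``.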

.. function:: as_completed(fs, timeout=None)

   Returns an iterator over the :class:`Future` instances (possibly created by
   different :class:`Executor` instances) given by *fs* that yields futures as
   they complete (finished or cancelled). Any futures given by *fs* that
   are duplicated will be returned once. Any futures that completed before
   :func:`as_completed` is called will be yielded first.  The returned iterator
   raises a :exc:`concurrent.futures.TimeoutError` if :meth:`~iterator.__next__`
   is called and the result isn't available after *timeout* seconds from the
   original call to :func:`as_completed`.  *timeout* can be an int or float. If
   *timeout* is not specified or ``None``, there is no limit to the wait time.
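
The ordering and de-duplication rules of :func:`as_completed` can be
illustrated with a short sketch.  One call is held back by a
:class:`threading.Event` so that the order is predictable.  The event, the
five-second safety timeout and the variable names are assumptions made for
this example.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

gate = threading.Event()

with ThreadPoolExecutor(max_workers=2) as executor:
    gated = executor.submit(gate.wait)  # cannot finish until gate is set
    quick = executor.submit(pow, 2, 3)
    quick.result()                      # make sure quick is already done

    order = []
    # quick is passed twice but is yielded only once, and it comes first
    # because it completed before as_completed() was called.
    for future in as_completed([gated, quick, quick], timeout=5):
        order.append(future)
        gate.set()                      # now let the gated call finish
```

The loop body runs twice: first for ``quick``, then for ``gated``.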


.. seealso::

   :pep:`3148` -- futures - execute computations asynchronously
      The proposal which described this feature for inclusion in the Python
      standard library.


Exception classes
-----------------

.. currentmodule:: concurrent.futures

.. exception:: CancelledError

   Raised when a future is cancelled.

.. exception:: TimeoutError

   Raised when a future operation exceeds the given timeout.

.. exception:: BrokenExecutor

   Derived from :exc:`RuntimeError`, this exception class is raised
   when an executor is broken for some reason, and cannot be used
   to submit or execute new tasks.

   .. versionadded:: 3.7

.. exception:: InvalidStateError

   Raised when an operation is performed on a future that is not allowed
   in the current state.

   .. versionadded:: 3.8

.. currentmodule:: concurrent.futures.thread

.. exception:: BrokenThreadPool

   Derived from :exc:`~concurrent.futures.BrokenExecutor`, this exception
   class is raised when one of the workers of a :class:`ThreadPoolExecutor`
   has failed initializing.

   .. versionadded:: 3.7

.. currentmodule:: concurrent.futures.process

.. exception:: BrokenProcessPool

   Derived from :exc:`~concurrent.futures.BrokenExecutor` (formerly
   :exc:`RuntimeError`), this exception class is raised when one of the
   workers of a :class:`ProcessPoolExecutor` has terminated in a non-clean
   fashion (for example, if it was killed from the outside).

   .. versionadded:: 3.3
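
How some of these exceptions surface in practice can be sketched with a
single-worker thread pool.  The example assumes Python 3.7 or later.  The
event, the 0.05 second timeout and ``failing_initializer`` are hypothetical
choices made for illustration.

```python
import threading
from concurrent.futures import (
    BrokenExecutor, CancelledError, ThreadPoolExecutor, TimeoutError)
from concurrent.futures.thread import BrokenThreadPool

release = threading.Event()

with ThreadPoolExecutor(max_workers=1) as pool:
    blocked = pool.submit(release.wait)  # occupies the only worker
    queued = pool.submit(pow, 2, 10)     # stays pending behind it
    try:
        blocked.result(timeout=0.05)
    except TimeoutError:
        timed_out = True
    queued.cancel()                      # succeeds: the call never started
    release.set()

try:
    queued.result()
except CancelledError:
    saw_cancelled = True


def failing_initializer():
    # Hypothetical initializer that always fails.  The pool logs the
    # traceback and marks itself as broken.
    raise ValueError("worker setup failed")


with ThreadPoolExecutor(max_workers=1,
                        initializer=failing_initializer) as pool:
    doomed = pool.submit(pow, 2, 10)
    try:
        doomed.result()
    except BrokenExecutor as exc:
        # Catching the base class also covers BrokenProcessPool.
        broken_type = type(exc)
```

Catching :exc:`~concurrent.futures.BrokenExecutor` rather than the
pool-specific subclass keeps the handler valid if the executor type changes.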