:mod:`concurrent.futures` --- Launching parallel tasks
======================================================

.. module:: concurrent.futures
   :synopsis: Execute computations concurrently using threads or processes.

.. versionadded:: 3.2

**Source code:** :source:`Lib/concurrent/futures/thread.py`
and :source:`Lib/concurrent/futures/process.py`

--------------

The :mod:`concurrent.futures` module provides a high-level interface for
asynchronously executing callables.

The asynchronous execution can be performed with threads, using
:class:`ThreadPoolExecutor`, or separate processes, using
:class:`ProcessPoolExecutor`.  Both implement the same interface, which is
defined by the abstract :class:`Executor` class.


Executor Objects
----------------

.. class:: Executor

   An abstract class that provides methods to execute calls asynchronously.  It
   should not be used directly, but through its concrete subclasses.

    .. method:: submit(fn, *args, **kwargs)

       Schedules the callable, *fn*, to be executed as ``fn(*args, **kwargs)``
       and returns a :class:`Future` object representing the execution of the
       callable. ::

          from concurrent.futures import ThreadPoolExecutor

          with ThreadPoolExecutor(max_workers=1) as executor:
              future = executor.submit(pow, 323, 1235)
              print(future.result())

    .. method:: map(func, *iterables, timeout=None)

       Equivalent to ``map(func, *iterables)`` except *func* is executed
       asynchronously and several calls to *func* may be made concurrently.  The
       returned iterator raises a :exc:`TimeoutError` if :meth:`__next__()` is
       called and the result isn't available after *timeout* seconds from the
       original call to :meth:`Executor.map`. *timeout* can be an int or a
       float.  If *timeout* is not specified or ``None``, there is no limit to
       the wait time.  If a call raises an exception, then that exception will
       be raised when its value is retrieved from the iterator.
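
       As a minimal sketch of how results come back through the returned
       iterator (``slow_square`` is a hypothetical helper introduced only for
       this illustration, and the *timeout* value is arbitrary)::

          import time
          from concurrent.futures import ThreadPoolExecutor

          def slow_square(x):
              # Hypothetical helper that simulates a slow computation.
              time.sleep(0.1)
              return x * x

          with ThreadPoolExecutor(max_workers=4) as executor:
              # Results are yielded in the order of the inputs, even though
              # several calls may be running at the same time.
              results = list(executor.map(slow_square, range(5), timeout=10))

          print(results)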

    .. method:: shutdown(wait=True)

       Signal the executor that it should free any resources that it is using
       when the currently pending futures are done executing.  Calls to
       :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will
       raise :exc:`RuntimeError`.

       If *wait* is ``True`` then this method will not return until all the
       pending futures are done executing and the resources associated with the
       executor have been freed.  If *wait* is ``False`` then this method will
       return immediately and the resources associated with the executor will be
       freed when all pending futures are done executing.  Regardless of the
       value of *wait*, the entire Python program will not exit until all
       pending futures are done executing.

       You can avoid having to call this method explicitly if you use the
       :keyword:`with` statement, which will shut down the :class:`Executor`
       (waiting as if :meth:`Executor.shutdown` were called with *wait* set to
       ``True``)::

          import shutil
          from concurrent.futures import ThreadPoolExecutor

          with ThreadPoolExecutor(max_workers=4) as e:
              e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
              e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
              e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
              e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
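
       Calling :meth:`Executor.shutdown` explicitly, and the
       :exc:`RuntimeError` raised by a later :meth:`Executor.submit`, can be
       sketched as follows (the ``rejected`` flag is only an illustrative
       name)::

          from concurrent.futures import ThreadPoolExecutor

          executor = ThreadPoolExecutor(max_workers=1)
          future = executor.submit(pow, 2, 3)
          # With wait=True this returns only after pending futures finish.
          executor.shutdown(wait=True)

          try:
              executor.submit(pow, 2, 4)
          except RuntimeError:
              # Submitting after shutdown is rejected.
              rejected = True
          else:
              rejected = False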


ThreadPoolExecutor
------------------

:class:`ThreadPoolExecutor` is an :class:`Executor` subclass that uses a pool of
threads to execute calls asynchronously.

Deadlocks can occur when the callable associated with a :class:`Future` waits on
the results of another :class:`Future`.  For example::

   import time
   from concurrent.futures import ThreadPoolExecutor

   def wait_on_b():
       time.sleep(5)
       print(b.result()) # b will never complete because it is waiting on a.
       return 5

   def wait_on_a():
       time.sleep(5)
       print(a.result()) # a will never complete because it is waiting on b.
       return 6


   executor = ThreadPoolExecutor(max_workers=2)
   a = executor.submit(wait_on_b)
   b = executor.submit(wait_on_a)

And::

   from concurrent.futures import ThreadPoolExecutor

   def wait_on_future():
       f = executor.submit(pow, 5, 2)
       # This will never complete because there is only one worker thread and
       # it is executing this function.
       print(f.result())

   executor = ThreadPoolExecutor(max_workers=1)
   executor.submit(wait_on_future)


.. class:: ThreadPoolExecutor(max_workers)

   An :class:`Executor` subclass that uses a pool of at most *max_workers*
   threads to execute calls asynchronously.


.. _threadpoolexecutor-example:

ThreadPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import urllib.request

   URLS = ['http://www.foxnews.com/',
           'http://www.cnn.com/',
           'http://europe.wsj.com/',
           'http://www.bbc.co.uk/',
           'http://some-made-up-domain.com/']

   def load_url(url, timeout):
       return urllib.request.urlopen(url, timeout=timeout).read()

   with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
       future_to_url = dict((executor.submit(load_url, url, 60), url)
                            for url in URLS)

       for future in concurrent.futures.as_completed(future_to_url):
           url = future_to_url[future]
           if future.exception() is not None:
               print('%r generated an exception: %s' % (url,
                                                        future.exception()))
           else:
               print('%r page is %d bytes' % (url, len(future.result())))


ProcessPoolExecutor
-------------------

The :class:`ProcessPoolExecutor` class is an :class:`Executor` subclass that
uses a pool of processes to execute calls asynchronously.
:class:`ProcessPoolExecutor` uses the :mod:`multiprocessing` module, which
allows it to side-step the :term:`Global Interpreter Lock` but also means that
only picklable objects can be executed and returned.

Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.

.. class:: ProcessPoolExecutor(max_workers=None)

   An :class:`Executor` subclass that executes calls asynchronously using a pool
   of at most *max_workers* processes.  If *max_workers* is ``None`` or not
   given, it will default to the number of processors on the machine.

   .. versionchanged:: 3.3
      When one of the worker processes terminates abruptly, a
      :exc:`BrokenProcessPool` error is now raised.  Previously, behaviour
      was undefined but operations on the executor or its futures would often
      freeze or deadlock.


.. _processpoolexecutor-example:

ProcessPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import math

   PRIMES = [
       112272535095293,
       112582705942171,
       112272535095293,
       115280095190773,
       115797848077099,
       1099726899285419]

   def is_prime(n):
       # Handle the small cases first: 0 and 1 are not prime, 2 is.
       if n < 2:
           return False
       if n == 2:
           return True
       if n % 2 == 0:
           return False

       sqrt_n = int(math.floor(math.sqrt(n)))
       for i in range(3, sqrt_n + 1, 2):
           if n % i == 0:
               return False
       return True

   def main():
       with concurrent.futures.ProcessPoolExecutor() as executor:
           for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
               print('%d is prime: %s' % (number, prime))

   if __name__ == '__main__':
       main()


Future Objects
--------------

The :class:`Future` class encapsulates the asynchronous execution of a callable.
:class:`Future` instances are created by :meth:`Executor.submit`.

.. class:: Future

   Encapsulates the asynchronous execution of a callable.  :class:`Future`
   instances are created by :meth:`Executor.submit` and should not be created
   directly except for testing.

    .. method:: cancel()

       Attempt to cancel the call.  If the call is currently being executed and
       cannot be cancelled then the method will return ``False``, otherwise the
       call will be cancelled and the method will return ``True``.
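
       The two outcomes can be sketched with a single-worker pool, where one
       call is kept running while a second one is still queued.  The helper
       ``block`` and the two events are hypothetical names used only for this
       illustration::

          import threading
          from concurrent.futures import ThreadPoolExecutor

          started = threading.Event()
          release = threading.Event()

          def block():
              # Hypothetical task: report that it is running, then wait.
              started.set()
              release.wait()

          with ThreadPoolExecutor(max_workers=1) as executor:
              first = executor.submit(block)
              started.wait()                       # first is now executing
              second = executor.submit(pow, 2, 3)  # queued behind first
              running_cancelled = first.cancel()   # already running
              pending_cancelled = second.cancel()  # not started yet
              release.set()                        # let first finish

          print(running_cancelled, pending_cancelled)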

    .. method:: cancelled()

       Return ``True`` if the call was successfully cancelled.

    .. method:: running()

       Return ``True`` if the call is currently being executed and cannot be
       cancelled.

    .. method:: done()

       Return ``True`` if the call was successfully cancelled or finished
       running.

    .. method:: result(timeout=None)

       Return the value returned by the call. If the call hasn't yet completed
       then this method will wait up to *timeout* seconds.  If the call hasn't
       completed in *timeout* seconds, then a :exc:`TimeoutError` will be
       raised. *timeout* can be an int or float.  If *timeout* is not specified
       or ``None``, there is no limit to the wait time.

       If the future is cancelled before completing then :exc:`CancelledError`
       will be raised.

       If the call raised, this method will raise the same exception.

    .. method:: exception(timeout=None)

       Return the exception raised by the call.  If the call hasn't yet
       completed then this method will wait up to *timeout* seconds.  If the
       call hasn't completed in *timeout* seconds, then a :exc:`TimeoutError`
       will be raised.  *timeout* can be an int or float.  If *timeout* is not
       specified or ``None``, there is no limit to the wait time.

       If the future is cancelled before completing then :exc:`CancelledError`
       will be raised.

       If the call completed without raising, ``None`` is returned.
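
       A short sketch of how :meth:`result` and :meth:`exception` relate,
       using one call that succeeds and one that raises (the variable names
       are illustrative only)::

          from concurrent.futures import ThreadPoolExecutor

          with ThreadPoolExecutor(max_workers=2) as executor:
              ok = executor.submit(divmod, 7, 2)
              bad = executor.submit(divmod, 7, 0)

          # Leaving the with block waits for both calls to finish.
          ok_error = ok.exception()     # the call did not raise
          bad_error = bad.exception()   # the exception raised by the call

          try:
              bad.result()              # raises instead of returning a value
          except ZeroDivisionError as exc:
              raised = exc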

    .. method:: add_done_callback(fn)

       Attaches the callable *fn* to the future.  *fn* will be called, with the
       future as its only argument, when the future is cancelled or finishes
       running.

       Added callables are called in the order that they were added and are
       always called in a thread belonging to the process that added them.  If
       the callable raises an :exc:`Exception` subclass, it will be logged and
       ignored.  If the callable raises a :exc:`BaseException` subclass, the
       behavior is undefined.

       If the future has already completed or been cancelled, *fn* will be
       called immediately.
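
       For illustration, a callback that records the result of a finished
       future might look like this (``record`` and ``seen`` are hypothetical
       names)::

          from concurrent.futures import ThreadPoolExecutor

          seen = []

          def record(future):
              # Called with the finished future as its only argument.
              seen.append(future.result())

          with ThreadPoolExecutor(max_workers=1) as executor:
              future = executor.submit(pow, 2, 10)
              future.add_done_callback(record)

          print(seen)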

   The following :class:`Future` methods are meant for use in unit tests and
   :class:`Executor` implementations.

    .. method:: set_running_or_notify_cancel()

       This method should only be called by :class:`Executor` implementations
       before executing the work associated with the :class:`Future` and by unit
       tests.

       If the method returns ``False`` then the :class:`Future` was cancelled,
       i.e. :meth:`Future.cancel` was called and returned ``True``.  Any threads
       waiting on the :class:`Future` completing (i.e. through
       :func:`as_completed` or :func:`wait`) will be woken up.

       If the method returns ``True`` then the :class:`Future` was not cancelled
       and has been put in the running state, i.e. calls to
       :meth:`Future.running` will return ``True``.

       This method can only be called once and cannot be called after
       :meth:`Future.set_result` or :meth:`Future.set_exception` have been
       called.

    .. method:: set_result(result)

       Sets the result of the work associated with the :class:`Future` to
       *result*.

       This method should only be used by :class:`Executor` implementations and
       unit tests.

    .. method:: set_exception(exception)

       Sets the result of the work associated with the :class:`Future` to the
       :class:`Exception` *exception*.

       This method should only be used by :class:`Executor` implementations and
       unit tests.
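
       In a unit test, these methods can be combined to drive a
       :class:`Future` by hand.  The following is only a sketch of that
       sequence, not a description of how any particular executor is
       implemented::

          from concurrent.futures import Future

          future = Future()
          # Mimic what an executor does just before running the work.
          started = future.set_running_or_notify_cancel()
          was_running = future.running()
          # Mimic the work finishing with a value.
          future.set_result(42)

          print(started, was_running, future.result())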


Module Functions
----------------

.. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED)

   Wait for the :class:`Future` instances (possibly created by different
   :class:`Executor` instances) given by *fs* to complete.  Returns a named
   2-tuple of sets.  The first set, named ``done``, contains the futures that
   completed (finished or were cancelled) before the wait completed.  The second
   set, named ``not_done``, contains uncompleted futures.

   *timeout* can be used to control the maximum number of seconds to wait before
   returning.  *timeout* can be an int or float.  If *timeout* is not specified
   or ``None``, there is no limit to the wait time.

   *return_when* indicates when this function should return.  It must be one of
   the following constants:

   +-----------------------------+----------------------------------------+
   | Constant                    | Description                            |
   +=============================+========================================+
   | :const:`FIRST_COMPLETED`    | The function will return when any      |
   |                             | future finishes or is cancelled.       |
   +-----------------------------+----------------------------------------+
   | :const:`FIRST_EXCEPTION`    | The function will return when any      |
   |                             | future finishes by raising an          |
   |                             | exception.  If no future raises an     |
   |                             | exception then it is equivalent to     |
   |                             | :const:`ALL_COMPLETED`.                |
   +-----------------------------+----------------------------------------+
   | :const:`ALL_COMPLETED`      | The function will return when all      |
   |                             | futures finish or are cancelled.       |
   +-----------------------------+----------------------------------------+
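
   A minimal sketch of :const:`FIRST_COMPLETED`, assuming one call that
   finishes quickly and one that is deliberately held back by an event (the
   names ``release``, ``fast`` and ``slow`` are illustrative only)::

      import threading
      from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

      release = threading.Event()

      with ThreadPoolExecutor(max_workers=2) as executor:
          fast = executor.submit(pow, 2, 5)
          # This call blocks until the event is set below.
          slow = executor.submit(release.wait)
          finished = wait([fast, slow], timeout=10,
                          return_when=FIRST_COMPLETED)
          release.set()

      print(fast in finished.done, slow in finished.not_done)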

.. function:: as_completed(fs, timeout=None)

   Returns an iterator over the :class:`Future` instances (possibly created by
   different :class:`Executor` instances) given by *fs* that yields futures as
   they complete (finished or were cancelled).  Any futures that completed
   before :func:`as_completed` is called will be yielded first.  The returned
   iterator raises a :exc:`TimeoutError` if :meth:`__next__` is called and the
   result isn't available after *timeout* seconds from the original call to
   :func:`as_completed`.  *timeout* can be an int or float.  If *timeout* is not
   specified or ``None``, there is no limit to the wait time.
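
   As a short illustration, results can be collected as each future
   completes, keyed by the input that produced them (the ``squares``
   dictionary and the *timeout* value are arbitrary choices for this
   sketch)::

      from concurrent.futures import ThreadPoolExecutor, as_completed

      with ThreadPoolExecutor(max_workers=3) as executor:
          futures = {executor.submit(pow, base, 2): base
                     for base in (2, 3, 4)}
          squares = {}
          for future in as_completed(futures, timeout=10):
              # Futures arrive in completion order, which may differ from
              # the order in which they were submitted.
              squares[futures[future]] = future.result()

      print(sorted(squares.items()))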


.. seealso::

   :pep:`3148` -- futures - execute computations asynchronously
      The proposal which described this feature for inclusion in the Python
      standard library.


Exception classes
-----------------

.. exception:: BrokenProcessPool

   Derived from :exc:`RuntimeError`, this exception class is raised when
   one of the workers of a :class:`ProcessPoolExecutor` has terminated
   in a non-clean fashion (for example, if it was killed from the outside).

   .. versionadded:: 3.3