:mod:`concurrent.futures` --- Launching parallel tasks
======================================================

.. module:: concurrent.futures
   :synopsis: Execute computations concurrently using threads or processes.

.. versionadded:: 3.2

**Source code:** :source:`Lib/concurrent/futures/thread.py`
and :source:`Lib/concurrent/futures/process.py`

--------------

The :mod:`concurrent.futures` module provides a high-level interface for
asynchronously executing callables.

The asynchronous execution can be performed with threads, using
:class:`ThreadPoolExecutor`, or separate processes, using
:class:`ProcessPoolExecutor`.  Both implement the same interface, which is
defined by the abstract :class:`Executor` class.


Executor Objects
----------------

.. class:: Executor

   An abstract class that provides methods to execute calls asynchronously.  It
   should not be used directly, but through its concrete subclasses.

    .. method:: submit(fn, *args, **kwargs)

       Schedules the callable, *fn*, to be executed as ``fn(*args, **kwargs)``
       and returns a :class:`Future` object representing the execution of the
       callable. ::

          from concurrent.futures import ThreadPoolExecutor

          with ThreadPoolExecutor(max_workers=1) as executor:
              future = executor.submit(pow, 323, 1235)
              print(future.result())

    .. method:: map(func, *iterables, timeout=None)

       Equivalent to :func:`map(func, *iterables) <map>` except *func* is executed
       asynchronously and several calls to *func* may be made concurrently.  The
       returned iterator raises a :exc:`TimeoutError` if
       :meth:`~iterator.__next__` is called and the result isn't available
       after *timeout* seconds from the original call to :meth:`Executor.map`.
       *timeout* can be an int or a float.  If *timeout* is not specified or
       ``None``, there is no limit to the wait time.  If a call raises an
       exception, then that exception will be raised when its value is
       retrieved from the iterator.

    .. method:: shutdown(wait=True)

       Signal the executor that it should free any resources that it is using
       when the currently pending futures are done executing.  Calls to
       :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will
       raise :exc:`RuntimeError`.

       If *wait* is ``True`` then this method will not return until all the
       pending futures are done executing and the resources associated with the
       executor have been freed.  If *wait* is ``False`` then this method will
       return immediately and the resources associated with the executor will be
       freed when all pending futures are done executing.  Regardless of the
       value of *wait*, the entire Python program will not exit until all
       pending futures are done executing.

       You can avoid having to call this method explicitly if you use the
       :keyword:`with` statement, which will shut down the :class:`Executor`
       (waiting as if :meth:`Executor.shutdown` were called with *wait* set to
       ``True``)::

          import shutil
          from concurrent.futures import ThreadPoolExecutor

          with ThreadPoolExecutor(max_workers=4) as e:
              e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
              e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
              e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
              e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
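
The behaviour described for :meth:`Executor.map` and :meth:`Executor.shutdown`
can be illustrated with a minimal sketch.  The helper ``reciprocal`` and the
variable names are hypothetical and exist only for this illustration; the
sketch assumes a thread pool, but the same calls apply to any
:class:`Executor` subclass.

```python
from concurrent.futures import ThreadPoolExecutor


def reciprocal(x):
    # Hypothetical helper: raises ZeroDivisionError when x == 0.
    return 1 / x


with ThreadPoolExecutor(max_workers=2) as executor:
    # Results are yielded in the order of the inputs.
    ok_results = list(executor.map(reciprocal, [1, 2, 4]))

    # An exception raised by a call surfaces only when that call's value
    # is retrieved from the iterator.
    results_iter = executor.map(reciprocal, [1, 0, 4])
    first = next(results_iter)
    try:
        next(results_iter)
    except ZeroDivisionError as exc:
        error = exc

# Leaving the with block shuts the executor down, so further submissions
# are rejected with RuntimeError.
try:
    executor.submit(reciprocal, 1)
except RuntimeError:
    rejected = True
else:
    rejected = False
```

Retrieving results lazily, as with ``results_iter`` above, lets a caller decide
how to handle a failing call before consuming the remaining values.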


ThreadPoolExecutor
------------------

:class:`ThreadPoolExecutor` is an :class:`Executor` subclass that uses a pool of
threads to execute calls asynchronously.

Deadlocks can occur when the callable associated with a :class:`Future` waits on
the results of another :class:`Future`.  For example::

   import time
   from concurrent.futures import ThreadPoolExecutor

   def wait_on_b():
       time.sleep(5)
       print(b.result())  # b will never complete because it is waiting on a.
       return 5

   def wait_on_a():
       time.sleep(5)
       print(a.result())  # a will never complete because it is waiting on b.
       return 6


   executor = ThreadPoolExecutor(max_workers=2)
   a = executor.submit(wait_on_b)
   b = executor.submit(wait_on_a)

And::

   def wait_on_future():
       f = executor.submit(pow, 5, 2)
       # This will never complete because there is only one worker thread and
       # it is executing this function.
       print(f.result())

   executor = ThreadPoolExecutor(max_workers=1)
   executor.submit(wait_on_future)
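
One way to sidestep the single-worker deadlock above is to wait for
intermediate results in the calling thread instead of inside a submitted
callable.  The following is only a sketch of that idea; ``square`` and
``add_one`` are hypothetical helpers, and other designs (for example a larger
pool) may suit a given program better.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


def add_one(x):
    return x + 1


with ThreadPoolExecutor(max_workers=1) as executor:
    # The caller, not a worker, blocks on the first future, so the only
    # worker thread is never waiting on work it would have to run itself.
    squared = executor.submit(square, 5).result()
    total = executor.submit(add_one, squared).result()
```

Because no submitted callable calls ``result()`` on another future, the pool
size no longer affects whether the program can make progress.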


.. class:: ThreadPoolExecutor(max_workers)

   An :class:`Executor` subclass that uses a pool of at most *max_workers*
   threads to execute calls asynchronously.

.. _threadpoolexecutor-example:

ThreadPoolExecutor Example
127
~~~~~~~~~~~~~~~~~~~~~~~~~~
128 129
::

   import concurrent.futures
   import urllib.request

   URLS = ['http://www.foxnews.com/',
           'http://www.cnn.com/',
           'http://europe.wsj.com/',
           'http://www.bbc.co.uk/',
           'http://some-made-up-domain.com/']

   # Retrieve a single page and return its contents
   def load_url(url, timeout):
       with urllib.request.urlopen(url, timeout=timeout) as conn:
           return conn.read()

   # We can use a with statement to ensure threads are cleaned up promptly
   with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
       # Start the load operations and mark each future with its URL
       future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
       for future in concurrent.futures.as_completed(future_to_url):
           url = future_to_url[future]
           try:
               data = future.result()
           except Exception as exc:
               print('%r generated an exception: %s' % (url, exc))
           else:
               print('%r page is %d bytes' % (url, len(data)))


ProcessPoolExecutor
-------------------

The :class:`ProcessPoolExecutor` class is an :class:`Executor` subclass that
uses a pool of processes to execute calls asynchronously.
:class:`ProcessPoolExecutor` uses the :mod:`multiprocessing` module, which
allows it to side-step the :term:`Global Interpreter Lock` but also means that
only picklable objects can be executed and returned.

Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.

.. class:: ProcessPoolExecutor(max_workers=None)

   An :class:`Executor` subclass that executes calls asynchronously using a pool
   of at most *max_workers* processes.  If *max_workers* is ``None`` or not
   given, it will default to the number of processors on the machine.

   .. versionchanged:: 3.3
      When one of the worker processes terminates abruptly, a
      :exc:`BrokenProcessPool` error is now raised.  Previously, behaviour
      was undefined but operations on the executor or its futures would often
      freeze or deadlock.

.. _processpoolexecutor-example:

ProcessPoolExecutor Example
186
~~~~~~~~~~~~~~~~~~~~~~~~~~~
187 188
::

   import concurrent.futures
   import math

   PRIMES = [
       112272535095293,
       112582705942171,
       112272535095293,
       115280095190773,
       115797848077099,
       1099726899285419]

   def is_prime(n):
       # Handle the small cases that the odd-divisor loop below would miss.
       if n < 2:
           return False
       if n == 2:
           return True
       if n % 2 == 0:
           return False

       sqrt_n = int(math.floor(math.sqrt(n)))
       for i in range(3, sqrt_n + 1, 2):
           if n % i == 0:
               return False
       return True

   def main():
       with concurrent.futures.ProcessPoolExecutor() as executor:
           for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
               print('%d is prime: %s' % (number, prime))

   if __name__ == '__main__':
       main()
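
Since only picklable objects can be sent to and returned from a
:class:`ProcessPoolExecutor`, it can help to check a callable before
submitting it.  The sketch below assumes that :mod:`pickle` is the
serialization mechanism involved; ``is_picklable`` is a hypothetical helper
written for this illustration, not part of the module.

```python
import math
import pickle


def is_picklable(obj):
    # Hypothetical pre-check: try to serialize the object and report
    # whether that succeeded.
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# A function that can be looked up by name in an importable module pickles.
importable = is_picklable(math.sqrt)

# A lambda has no importable name, so it cannot be pickled.
anonymous = is_picklable(lambda n: n * 2)
```

This is why the example above defines ``is_prime`` at module level rather than
as a lambda or nested function.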


Future Objects
--------------

The :class:`Future` class encapsulates the asynchronous execution of a callable.
:class:`Future` instances are created by :meth:`Executor.submit`.

.. class:: Future

   Encapsulates the asynchronous execution of a callable.  :class:`Future`
   instances are created by :meth:`Executor.submit` and should not be created
   directly except for testing.

    .. method:: cancel()

       Attempt to cancel the call.  If the call is currently being executed or
       has finished running and cannot be cancelled, then the method will
       return ``False``; otherwise the call will be cancelled and the method
       will return ``True``.

    .. method:: cancelled()

       Return ``True`` if the call was successfully cancelled.

    .. method:: running()

       Return ``True`` if the call is currently being executed and cannot be
       cancelled.

    .. method:: done()

       Return ``True`` if the call was successfully cancelled or finished
       running.

    .. method:: result(timeout=None)

       Return the value returned by the call.  If the call hasn't yet completed
       then this method will wait up to *timeout* seconds.  If the call hasn't
       completed in *timeout* seconds, then a :exc:`TimeoutError` will be
       raised.  *timeout* can be an int or float.  If *timeout* is not specified
       or ``None``, there is no limit to the wait time.

       If the future is cancelled before completing then :exc:`CancelledError`
       will be raised.

       If the call raised, this method will raise the same exception.

    .. method:: exception(timeout=None)

       Return the exception raised by the call.  If the call hasn't yet
       completed then this method will wait up to *timeout* seconds.  If the
       call hasn't completed in *timeout* seconds, then a :exc:`TimeoutError`
       will be raised.  *timeout* can be an int or float.  If *timeout* is not
       specified or ``None``, there is no limit to the wait time.

       If the future is cancelled before completing then :exc:`CancelledError`
       will be raised.

       If the call completed without raising, ``None`` is returned.

    .. method:: add_done_callback(fn)

       Attaches the callable *fn* to the future.  *fn* will be called, with the
       future as its only argument, when the future is cancelled or finishes
       running.

       Added callables are called in the order that they were added and are
       always called in a thread belonging to the process that added them.  If
       the callable raises an :exc:`Exception` subclass, it will be logged and
       ignored.  If the callable raises a :exc:`BaseException` subclass, the
       behavior is undefined.

       If the future has already completed or been cancelled, *fn* will be
       called immediately.

   The following :class:`Future` methods are meant for use in unit tests and
   :class:`Executor` implementations.

    .. method:: set_running_or_notify_cancel()

       This method should only be called by :class:`Executor` implementations
       before executing the work associated with the :class:`Future` and by unit
       tests.

       If the method returns ``False`` then the :class:`Future` was cancelled,
       i.e. :meth:`Future.cancel` was called and returned ``True``.  Any threads
       waiting on the :class:`Future` completing (e.g. through
       :func:`as_completed` or :func:`wait`) will be woken up.

       If the method returns ``True`` then the :class:`Future` was not cancelled
       and has been put in the running state, i.e. calls to
       :meth:`Future.running` will return ``True``.

       This method can only be called once and cannot be called after
       :meth:`Future.set_result` or :meth:`Future.set_exception` have been
       called.

    .. method:: set_result(result)

       Sets the result of the work associated with the :class:`Future` to
       *result*.

       This method should only be used by :class:`Executor` implementations and
       unit tests.

    .. method:: set_exception(exception)

       Sets the result of the work associated with the :class:`Future` to the
       :class:`Exception` *exception*.

       This method should only be used by :class:`Executor` implementations and
       unit tests.
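
The state transitions described above can be exercised in a test-style sketch
that creates :class:`Future` objects directly, which the text permits for
testing only.  The variable names are illustrative, and ``TimeoutError`` here
is the name exported by :mod:`concurrent.futures`.

```python
from concurrent.futures import Future, TimeoutError

events = []

# A future that runs to completion.
pending = Future()
pending.add_done_callback(lambda f: events.append(("done", f.result())))
was_started = pending.set_running_or_notify_cancel()  # moves it to running
cancel_while_running = pending.cancel()               # too late to cancel
pending.set_result(42)                                # invokes the callback

# A future that is cancelled before it starts.
cancelled = Future()
cancel_ok = cancelled.cancel()
started_after_cancel = cancelled.set_running_or_notify_cancel()

# A future whose call raised.
failed = Future()
failed.set_running_or_notify_cancel()
failed.set_exception(ValueError("boom"))

# A future that never completes: result() gives up after the timeout.
never_done = Future()
try:
    never_done.result(timeout=0.01)
except TimeoutError:
    timed_out = True
else:
    timed_out = False
```

In ordinary code these setters are left to the executor; the sketch only shows
what callers of ``result()``, ``exception()`` and ``cancel()`` can expect to
observe.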


Module Functions
----------------

.. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED)

   Wait for the :class:`Future` instances (possibly created by different
   :class:`Executor` instances) given by *fs* to complete.  Returns a named
   2-tuple of sets.  The first set, named ``done``, contains the futures that
   completed (finished or were cancelled) before the wait completed.  The second
   set, named ``not_done``, contains uncompleted futures.

   *timeout* can be used to control the maximum number of seconds to wait before
   returning.  *timeout* can be an int or float.  If *timeout* is not specified
   or ``None``, there is no limit to the wait time.

   *return_when* indicates when this function should return.  It must be one of
   the following constants:

   .. tabularcolumns:: |l|L|

   +-----------------------------+----------------------------------------+
   | Constant                    | Description                            |
   +=============================+========================================+
   | :const:`FIRST_COMPLETED`    | The function will return when any      |
   |                             | future finishes or is cancelled.       |
   +-----------------------------+----------------------------------------+
   | :const:`FIRST_EXCEPTION`    | The function will return when any      |
   |                             | future finishes by raising an          |
   |                             | exception.  If no future raises an     |
   |                             | exception then it is equivalent to     |
   |                             | :const:`ALL_COMPLETED`.                |
   +-----------------------------+----------------------------------------+
   | :const:`ALL_COMPLETED`      | The function will return when all      |
   |                             | futures finish or are cancelled.       |
   +-----------------------------+----------------------------------------+
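
The difference between ``FIRST_COMPLETED`` and ``ALL_COMPLETED`` can be seen
in a small sketch.  A :class:`threading.Event` (an assumption of this
illustration, not something :func:`wait` requires) holds one task back so the
outcome does not depend on timing; ``quick`` and ``blocked`` are hypothetical
helpers.

```python
import threading
from concurrent.futures import (
    ALL_COMPLETED, FIRST_COMPLETED, ThreadPoolExecutor, wait)

release = threading.Event()


def quick():
    return "quick"


def blocked():
    # Held back until the main thread sets the event (with a safety timeout).
    release.wait(timeout=5)
    return "blocked"


with ThreadPoolExecutor(max_workers=2) as executor:
    fast = executor.submit(quick)
    slow = executor.submit(blocked)

    # Returns as soon as any future is done; slow is still held back.
    first = wait([fast, slow], return_when=FIRST_COMPLETED)
    first_done = set(first.done)
    first_pending = set(first.not_done)

    release.set()
    # Returns only once every future has finished or been cancelled.
    final = wait([fast, slow], return_when=ALL_COMPLETED)
```

The returned value is a named 2-tuple, so ``first.done`` and ``first.not_done``
can also be unpacked positionally as ``done, not_done = wait(...)``.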

.. function:: as_completed(fs, timeout=None)

   Returns an iterator over the :class:`Future` instances (possibly created by
   different :class:`Executor` instances) given by *fs* that yields futures as
   they complete (finished or were cancelled).  Any futures that completed
   before :func:`as_completed` is called will be yielded first.  The returned
   iterator raises a :exc:`TimeoutError` if :meth:`~iterator.__next__` is
   called and the result isn't available after *timeout* seconds from the
   original call to :func:`as_completed`.  *timeout* can be an int or float.
   If *timeout* is not specified or ``None``, there is no limit to the wait
   time.
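
As a rough illustration, the sketch below shows :func:`as_completed` yielding
futures in completion order rather than submission order.  The
:class:`threading.Event` and the helper functions are assumptions made for
this example so that the ordering is predictable.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

gate = threading.Event()


def wait_for_gate(label):
    # Held back until the gate is opened (with a safety timeout).
    gate.wait(timeout=5)
    return label


def immediate(label):
    return label


with ThreadPoolExecutor(max_workers=2) as executor:
    slow = executor.submit(wait_for_gate, "slow")
    fast = executor.submit(immediate, "fast")

    order = []
    for future in as_completed([slow, fast]):
        order.append(future.result())
        # Open the gate once the first completed future has been seen.
        gate.set()
```

Although ``slow`` was submitted first, it is yielded second because it
completes later.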


.. seealso::

   :pep:`3148` -- futures - execute computations asynchronously
      The proposal which described this feature for inclusion in the Python
      standard library.


Exception classes
-----------------

.. exception:: BrokenProcessPool

   Derived from :exc:`RuntimeError`, this exception class is raised when
   one of the workers of a :class:`ProcessPoolExecutor` has terminated
   in a non-clean fashion (for example, if it was killed from the outside).

   .. versionadded:: 3.3