1:mod:`concurrent.futures` --- Launching parallel tasks 2====================================================== 3 4.. module:: concurrent.futures 5 :synopsis: Execute computations concurrently using threads or processes. 6 7.. versionadded:: 3.2 8 9**Source code:** :source:`Lib/concurrent/futures/thread.py` 10and :source:`Lib/concurrent/futures/process.py` 11 12-------------- 13 14The :mod:`concurrent.futures` module provides a high-level interface for 15asynchronously executing callables. 16 17The asynchronous execution can be performed with threads, using 18:class:`ThreadPoolExecutor`, or separate processes, using 19:class:`ProcessPoolExecutor`. Both implement the same interface, which is 20defined by the abstract :class:`Executor` class. 21 22.. include:: ../includes/wasm-notavail.rst 23 24Executor Objects 25---------------- 26 27.. class:: Executor 28 29 An abstract class that provides methods to execute calls asynchronously. It 30 should not be used directly, but through its concrete subclasses. 31 32 .. method:: submit(fn, /, *args, **kwargs) 33 34 Schedules the callable, *fn*, to be executed as ``fn(*args, **kwargs)`` 35 and returns a :class:`Future` object representing the execution of the 36 callable. :: 37 38 with ThreadPoolExecutor(max_workers=1) as executor: 39 future = executor.submit(pow, 323, 1235) 40 print(future.result()) 41 42 .. method:: map(func, *iterables, timeout=None, chunksize=1) 43 44 Similar to :func:`map(func, *iterables) <map>` except: 45 46 * the *iterables* are collected immediately rather than lazily; 47 48 * *func* is executed asynchronously and several calls to 49 *func* may be made concurrently. 50 51 The returned iterator raises a :exc:`TimeoutError` 52 if :meth:`~iterator.__next__` is called and the result isn't available 53 after *timeout* seconds from the original call to :meth:`Executor.map`. 54 *timeout* can be an int or a float. If *timeout* is not specified or 55 ``None``, there is no limit to the wait time. 56 57 If a *func* call raises an exception, then that exception will be 58 raised when its value is retrieved from the iterator. 59 60 When using :class:`ProcessPoolExecutor`, this method chops *iterables* 61 into a number of chunks which it submits to the pool as separate 62 tasks. The (approximate) size of these chunks can be specified by 63 setting *chunksize* to a positive integer. For very long iterables, 64 using a large value for *chunksize* can significantly improve 65 performance compared to the default size of 1. With 66 :class:`ThreadPoolExecutor`, *chunksize* has no effect. 67 68 .. versionchanged:: 3.5 69 Added the *chunksize* argument. 70 71 .. method:: shutdown(wait=True, *, cancel_futures=False) 72 73 Signal the executor that it should free any resources that it is using 74 when the currently pending futures are done executing. Calls to 75 :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will 76 raise :exc:`RuntimeError`. 77 78 If *wait* is ``True`` then this method will not return until all the 79 pending futures are done executing and the resources associated with the 80 executor have been freed. If *wait* is ``False`` then this method will 81 return immediately and the resources associated with the executor will be 82 freed when all pending futures are done executing. Regardless of the 83 value of *wait*, the entire Python program will not exit until all 84 pending futures are done executing. 85 86 If *cancel_futures* is ``True``, this method will cancel all pending 87 futures that the executor has not started running. Any futures that 88 are completed or running won't be cancelled, regardless of the value 89 of *cancel_futures*. 90 91 If both *cancel_futures* and *wait* are ``True``, all futures that the 92 executor has started running will be completed prior to this method 93 returning. The remaining futures are cancelled. 94 95 You can avoid having to call this method explicitly if you use the 96 :keyword:`with` statement, which will shutdown the :class:`Executor` 97 (waiting as if :meth:`Executor.shutdown` were called with *wait* set to 98 ``True``):: 99 100 import shutil 101 with ThreadPoolExecutor(max_workers=4) as e: 102 e.submit(shutil.copy, 'src1.txt', 'dest1.txt') 103 e.submit(shutil.copy, 'src2.txt', 'dest2.txt') 104 e.submit(shutil.copy, 'src3.txt', 'dest3.txt') 105 e.submit(shutil.copy, 'src4.txt', 'dest4.txt') 106 107 .. versionchanged:: 3.9 108 Added *cancel_futures*. 109 110 111ThreadPoolExecutor 112------------------ 113 114:class:`ThreadPoolExecutor` is an :class:`Executor` subclass that uses a pool of 115threads to execute calls asynchronously. 116 117Deadlocks can occur when the callable associated with a :class:`Future` waits on 118the results of another :class:`Future`. For example:: 119 120 import time 121 def wait_on_b(): 122 time.sleep(5) 123 print(b.result()) # b will never complete because it is waiting on a. 124 return 5 125 126 def wait_on_a(): 127 time.sleep(5) 128 print(a.result()) # a will never complete because it is waiting on b. 129 return 6 130 131 132 executor = ThreadPoolExecutor(max_workers=2) 133 a = executor.submit(wait_on_b) 134 b = executor.submit(wait_on_a) 135 136And:: 137 138 def wait_on_future(): 139 f = executor.submit(pow, 5, 2) 140 # This will never complete because there is only one worker thread and 141 # it is executing this function. 142 print(f.result()) 143 144 executor = ThreadPoolExecutor(max_workers=1) 145 executor.submit(wait_on_future) 146 147 148.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=()) 149 150 An :class:`Executor` subclass that uses a pool of at most *max_workers* 151 threads to execute calls asynchronously. 152 153 All threads enqueued to ``ThreadPoolExecutor`` will be joined before the 154 interpreter can exit. Note that the exit handler which does this is 155 executed *before* any exit handlers added using ``atexit``. This means 156 exceptions in the main thread must be caught and handled in order to 157 signal threads to exit gracefully. For this reason, it is recommended 158 that ``ThreadPoolExecutor`` not be used for long-running tasks. 159 160 *initializer* is an optional callable that is called at the start of 161 each worker thread; *initargs* is a tuple of arguments passed to the 162 initializer. Should *initializer* raise an exception, all currently 163 pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`, 164 as well as any attempt to submit more jobs to the pool. 165 166 .. versionchanged:: 3.5 167 If *max_workers* is ``None`` or 168 not given, it will default to the number of processors on the machine, 169 multiplied by ``5``, assuming that :class:`ThreadPoolExecutor` is often 170 used to overlap I/O instead of CPU work and the number of workers 171 should be higher than the number of workers 172 for :class:`ProcessPoolExecutor`. 173 174 .. versionadded:: 3.6 175 The *thread_name_prefix* argument was added to allow users to 176 control the :class:`threading.Thread` names for worker threads created by 177 the pool for easier debugging. 178 179 .. versionchanged:: 3.7 180 Added the *initializer* and *initargs* arguments. 181 182 .. versionchanged:: 3.8 183 Default value of *max_workers* is changed to ``min(32, os.cpu_count() + 4)``. 184 This default value preserves at least 5 workers for I/O bound tasks. 185 It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL. 186 And it avoids using very large resources implicitly on many-core machines. 187 188 ThreadPoolExecutor now reuses idle worker threads before starting 189 *max_workers* worker threads too. 190 191 192.. _threadpoolexecutor-example: 193 194ThreadPoolExecutor Example 195~~~~~~~~~~~~~~~~~~~~~~~~~~ 196:: 197 198 import concurrent.futures 199 import urllib.request 200 201 URLS = ['http://www.foxnews.com/', 202 'http://www.cnn.com/', 203 'http://europe.wsj.com/', 204 'http://www.bbc.co.uk/', 205 'http://nonexistant-subdomain.python.org/'] 206 207 # Retrieve a single page and report the URL and contents 208 def load_url(url, timeout): 209 with urllib.request.urlopen(url, timeout=timeout) as conn: 210 return conn.read() 211 212 # We can use a with statement to ensure threads are cleaned up promptly 213 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: 214 # Start the load operations and mark each future with its URL 215 future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} 216 for future in concurrent.futures.as_completed(future_to_url): 217 url = future_to_url[future] 218 try: 219 data = future.result() 220 except Exception as exc: 221 print('%r generated an exception: %s' % (url, exc)) 222 else: 223 print('%r page is %d bytes' % (url, len(data))) 224 225 226ProcessPoolExecutor 227------------------- 228 229The :class:`ProcessPoolExecutor` class is an :class:`Executor` subclass that 230uses a pool of processes to execute calls asynchronously. 231:class:`ProcessPoolExecutor` uses the :mod:`multiprocessing` module, which 232allows it to side-step the :term:`Global Interpreter Lock 233<global interpreter lock>` but also means that 234only picklable objects can be executed and returned. 235 236The ``__main__`` module must be importable by worker subprocesses. This means 237that :class:`ProcessPoolExecutor` will not work in the interactive interpreter. 238 239Calling :class:`Executor` or :class:`Future` methods from a callable submitted 240to a :class:`ProcessPoolExecutor` will result in deadlock. 241 242.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None) 243 244 An :class:`Executor` subclass that executes calls asynchronously using a pool 245 of at most *max_workers* processes. If *max_workers* is ``None`` or not 246 given, it will default to the number of processors on the machine. 247 If *max_workers* is less than or equal to ``0``, then a :exc:`ValueError` 248 will be raised. 249 On Windows, *max_workers* must be less than or equal to ``61``. If it is not 250 then :exc:`ValueError` will be raised. If *max_workers* is ``None``, then 251 the default chosen will be at most ``61``, even if more processors are 252 available. 253 *mp_context* can be a multiprocessing context or None. It will be used to 254 launch the workers. If *mp_context* is ``None`` or not given, the default 255 multiprocessing context is used. 256 257 *initializer* is an optional callable that is called at the start of 258 each worker process; *initargs* is a tuple of arguments passed to the 259 initializer. Should *initializer* raise an exception, all currently 260 pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`, 261 as well as any attempt to submit more jobs to the pool. 262 263 *max_tasks_per_child* is an optional argument that specifies the maximum 264 number of tasks a single process can execute before it will exit and be 265 replaced with a fresh worker process. By default *max_tasks_per_child* is 266 ``None`` which means worker processes will live as long as the pool. When 267 a max is specified, the "spawn" multiprocessing start method will be used by 268 default in absence of a *mp_context* parameter. This feature is incompatible 269 with the "fork" start method. 270 271 .. versionchanged:: 3.3 272 When one of the worker processes terminates abruptly, a 273 :exc:`BrokenProcessPool` error is now raised. Previously, behaviour 274 was undefined but operations on the executor or its futures would often 275 freeze or deadlock. 276 277 .. versionchanged:: 3.7 278 The *mp_context* argument was added to allow users to control the 279 start_method for worker processes created by the pool. 280 281 Added the *initializer* and *initargs* arguments. 282 283 .. versionchanged:: 3.11 284 The *max_tasks_per_child* argument was added to allow users to 285 control the lifetime of workers in the pool. 286 287 288.. _processpoolexecutor-example: 289 290ProcessPoolExecutor Example 291~~~~~~~~~~~~~~~~~~~~~~~~~~~ 292:: 293 294 import concurrent.futures 295 import math 296 297 PRIMES = [ 298 112272535095293, 299 112582705942171, 300 112272535095293, 301 115280095190773, 302 115797848077099, 303 1099726899285419] 304 305 def is_prime(n): 306 if n < 2: 307 return False 308 if n == 2: 309 return True 310 if n % 2 == 0: 311 return False 312 313 sqrt_n = int(math.floor(math.sqrt(n))) 314 for i in range(3, sqrt_n + 1, 2): 315 if n % i == 0: 316 return False 317 return True 318 319 def main(): 320 with concurrent.futures.ProcessPoolExecutor() as executor: 321 for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): 322 print('%d is prime: %s' % (number, prime)) 323 324 if __name__ == '__main__': 325 main() 326 327 328Future Objects 329-------------- 330 331The :class:`Future` class encapsulates the asynchronous execution of a callable. 332:class:`Future` instances are created by :meth:`Executor.submit`. 333 334.. class:: Future 335 336 Encapsulates the asynchronous execution of a callable. :class:`Future` 337 instances are created by :meth:`Executor.submit` and should not be created 338 directly except for testing. 339 340 .. method:: cancel() 341 342 Attempt to cancel the call. If the call is currently being executed or 343 finished running and cannot be cancelled then the method will return 344 ``False``, otherwise the call will be cancelled and the method will 345 return ``True``. 346 347 .. method:: cancelled() 348 349 Return ``True`` if the call was successfully cancelled. 350 351 .. method:: running() 352 353 Return ``True`` if the call is currently being executed and cannot be 354 cancelled. 355 356 .. method:: done() 357 358 Return ``True`` if the call was successfully cancelled or finished 359 running. 360 361 .. method:: result(timeout=None) 362 363 Return the value returned by the call. If the call hasn't yet completed 364 then this method will wait up to *timeout* seconds. If the call hasn't 365 completed in *timeout* seconds, then a 366 :exc:`TimeoutError` will be raised. *timeout* can be 367 an int or float. If *timeout* is not specified or ``None``, there is no 368 limit to the wait time. 369 370 If the future is cancelled before completing then :exc:`.CancelledError` 371 will be raised. 372 373 If the call raised an exception, this method will raise the same exception. 374 375 .. method:: exception(timeout=None) 376 377 Return the exception raised by the call. If the call hasn't yet 378 completed then this method will wait up to *timeout* seconds. If the 379 call hasn't completed in *timeout* seconds, then a 380 :exc:`TimeoutError` will be raised. *timeout* can be 381 an int or float. If *timeout* is not specified or ``None``, there is no 382 limit to the wait time. 383 384 If the future is cancelled before completing then :exc:`.CancelledError` 385 will be raised. 386 387 If the call completed without raising, ``None`` is returned. 388 389 .. method:: add_done_callback(fn) 390 391 Attaches the callable *fn* to the future. *fn* will be called, with the 392 future as its only argument, when the future is cancelled or finishes 393 running. 394 395 Added callables are called in the order that they were added and are 396 always called in a thread belonging to the process that added them. If 397 the callable raises an :exc:`Exception` subclass, it will be logged and 398 ignored. If the callable raises a :exc:`BaseException` subclass, the 399 behavior is undefined. 400 401 If the future has already completed or been cancelled, *fn* will be 402 called immediately. 403 404 The following :class:`Future` methods are meant for use in unit tests and 405 :class:`Executor` implementations. 406 407 .. method:: set_running_or_notify_cancel() 408 409 This method should only be called by :class:`Executor` implementations 410 before executing the work associated with the :class:`Future` and by unit 411 tests. 412 413 If the method returns ``False`` then the :class:`Future` was cancelled, 414 i.e. :meth:`Future.cancel` was called and returned ``True``. Any threads 415 waiting on the :class:`Future` completing (i.e. through 416 :func:`as_completed` or :func:`wait`) will be woken up. 417 418 If the method returns ``True`` then the :class:`Future` was not cancelled 419 and has been put in the running state, i.e. calls to 420 :meth:`Future.running` will return ``True``. 421 422 This method can only be called once and cannot be called after 423 :meth:`Future.set_result` or :meth:`Future.set_exception` have been 424 called. 425 426 .. method:: set_result(result) 427 428 Sets the result of the work associated with the :class:`Future` to 429 *result*. 430 431 This method should only be used by :class:`Executor` implementations and 432 unit tests. 433 434 .. versionchanged:: 3.8 435 This method raises 436 :exc:`concurrent.futures.InvalidStateError` if the :class:`Future` is 437 already done. 438 439 .. method:: set_exception(exception) 440 441 Sets the result of the work associated with the :class:`Future` to the 442 :class:`Exception` *exception*. 443 444 This method should only be used by :class:`Executor` implementations and 445 unit tests. 446 447 .. versionchanged:: 3.8 448 This method raises 449 :exc:`concurrent.futures.InvalidStateError` if the :class:`Future` is 450 already done. 451 452Module Functions 453---------------- 454 455.. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED) 456 457 Wait for the :class:`Future` instances (possibly created by different 458 :class:`Executor` instances) given by *fs* to complete. Duplicate futures 459 given to *fs* are removed and will be returned only once. Returns a named 460 2-tuple of sets. The first set, named ``done``, contains the futures that 461 completed (finished or cancelled futures) before the wait completed. The 462 second set, named ``not_done``, contains the futures that did not complete 463 (pending or running futures). 464 465 *timeout* can be used to control the maximum number of seconds to wait before 466 returning. *timeout* can be an int or float. If *timeout* is not specified 467 or ``None``, there is no limit to the wait time. 468 469 *return_when* indicates when this function should return. It must be one of 470 the following constants: 471 472 .. tabularcolumns:: |l|L| 473 474 +-----------------------------+----------------------------------------+ 475 | Constant | Description | 476 +=============================+========================================+ 477 | :const:`FIRST_COMPLETED` | The function will return when any | 478 | | future finishes or is cancelled. | 479 +-----------------------------+----------------------------------------+ 480 | :const:`FIRST_EXCEPTION` | The function will return when any | 481 | | future finishes by raising an | 482 | | exception. If no future raises an | 483 | | exception then it is equivalent to | 484 | | :const:`ALL_COMPLETED`. | 485 +-----------------------------+----------------------------------------+ 486 | :const:`ALL_COMPLETED` | The function will return when all | 487 | | futures finish or are cancelled. | 488 +-----------------------------+----------------------------------------+ 489 490.. function:: as_completed(fs, timeout=None) 491 492 Returns an iterator over the :class:`Future` instances (possibly created by 493 different :class:`Executor` instances) given by *fs* that yields futures as 494 they complete (finished or cancelled futures). Any futures given by *fs* that 495 are duplicated will be returned once. Any futures that completed before 496 :func:`as_completed` is called will be yielded first. The returned iterator 497 raises a :exc:`TimeoutError` if :meth:`~iterator.__next__` 498 is called and the result isn't available after *timeout* seconds from the 499 original call to :func:`as_completed`. *timeout* can be an int or float. If 500 *timeout* is not specified or ``None``, there is no limit to the wait time. 501 502 503.. seealso:: 504 505 :pep:`3148` -- futures - execute computations asynchronously 506 The proposal which described this feature for inclusion in the Python 507 standard library. 508 509 510Exception classes 511----------------- 512 513.. currentmodule:: concurrent.futures 514 515.. exception:: CancelledError 516 517 Raised when a future is cancelled. 518 519.. exception:: TimeoutError 520 521 A deprecated alias of :exc:`TimeoutError`, 522 raised when a future operation exceeds the given timeout. 523 524 .. versionchanged:: 3.11 525 526 This class was made an alias of :exc:`TimeoutError`. 527 528 529.. exception:: BrokenExecutor 530 531 Derived from :exc:`RuntimeError`, this exception class is raised 532 when an executor is broken for some reason, and cannot be used 533 to submit or execute new tasks. 534 535 .. versionadded:: 3.7 536 537.. exception:: InvalidStateError 538 539 Raised when an operation is performed on a future that is not allowed 540 in the current state. 541 542 .. versionadded:: 3.8 543 544.. currentmodule:: concurrent.futures.thread 545 546.. exception:: BrokenThreadPool 547 548 Derived from :exc:`~concurrent.futures.BrokenExecutor`, this exception 549 class is raised when one of the workers of a :class:`ThreadPoolExecutor` 550 has failed initializing. 551 552 .. versionadded:: 3.7 553 554.. currentmodule:: concurrent.futures.process 555 556.. exception:: BrokenProcessPool 557 558 Derived from :exc:`~concurrent.futures.BrokenExecutor` (formerly 559 :exc:`RuntimeError`), this exception class is raised when one of the 560 workers of a :class:`ProcessPoolExecutor` has terminated in a non-clean 561 fashion (for example, if it was killed from the outside). 562 563 .. versionadded:: 3.3 564