1from test import support 2from test.support import import_helper 3from test.support import threading_helper 4 5# Skip tests if _multiprocessing wasn't built. 6import_helper.import_module('_multiprocessing') 7 8from test.support import hashlib_helper 9from test.support.script_helper import assert_python_ok 10 11import contextlib 12import itertools 13import logging 14from logging.handlers import QueueHandler 15import os 16import queue 17import signal 18import sys 19import threading 20import time 21import unittest 22import weakref 23from pickle import PicklingError 24 25from concurrent import futures 26from concurrent.futures._base import ( 27 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, 28 BrokenExecutor) 29from concurrent.futures.process import BrokenProcessPool, _check_system_limits 30 31import multiprocessing.process 32import multiprocessing.util 33import multiprocessing as mp 34 35 36if support.check_sanitizer(address=True, memory=True): 37 # bpo-46633: Skip the test because it is too slow when Python is built 38 # with ASAN/MSAN: between 5 and 20 minutes on GitHub Actions. 39 raise unittest.SkipTest("test too slow on ASAN/MSAN build") 40 41 42def create_future(state=PENDING, exception=None, result=None): 43 f = Future() 44 f._state = state 45 f._exception = exception 46 f._result = result 47 return f 48 49 50PENDING_FUTURE = create_future(state=PENDING) 51RUNNING_FUTURE = create_future(state=RUNNING) 52CANCELLED_FUTURE = create_future(state=CANCELLED) 53CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) 54EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError()) 55SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) 56 57INITIALIZER_STATUS = 'uninitialized' 58 59def mul(x, y): 60 return x * y 61 62def capture(*args, **kwargs): 63 return args, kwargs 64 65def sleep_and_raise(t): 66 time.sleep(t) 67 raise Exception('this is an exception') 68 69def sleep_and_print(t, msg): 70 time.sleep(t) 71 print(msg) 72 sys.stdout.flush() 73 74def init(x): 75 global INITIALIZER_STATUS 76 INITIALIZER_STATUS = x 77 78def get_init_status(): 79 return INITIALIZER_STATUS 80 81def init_fail(log_queue=None): 82 if log_queue is not None: 83 logger = logging.getLogger('concurrent.futures') 84 logger.addHandler(QueueHandler(log_queue)) 85 logger.setLevel('CRITICAL') 86 logger.propagate = False 87 time.sleep(0.1) # let some futures be scheduled 88 raise ValueError('error in initializer') 89 90 91class MyObject(object): 92 def my_method(self): 93 pass 94 95 96class EventfulGCObj(): 97 def __init__(self, mgr): 98 self.event = mgr.Event() 99 100 def __del__(self): 101 self.event.set() 102 103 104def make_dummy_object(_): 105 return MyObject() 106 107 108class BaseTestCase(unittest.TestCase): 109 def setUp(self): 110 self._thread_key = threading_helper.threading_setup() 111 112 def tearDown(self): 113 support.reap_children() 114 threading_helper.threading_cleanup(*self._thread_key) 115 116 117class ExecutorMixin: 118 worker_count = 5 119 executor_kwargs = {} 120 121 def setUp(self): 122 super().setUp() 123 124 self.t1 = time.monotonic() 125 if hasattr(self, "ctx"): 126 self.executor = self.executor_type( 127 max_workers=self.worker_count, 128 mp_context=self.get_context(), 129 **self.executor_kwargs) 130 else: 131 self.executor = self.executor_type( 132 max_workers=self.worker_count, 133 **self.executor_kwargs) 134 135 def tearDown(self): 136 self.executor.shutdown(wait=True) 137 self.executor = None 138 139 dt = time.monotonic() - self.t1 140 if support.verbose: 141 print("%.2fs" % dt, end=' ') 142 self.assertLess(dt, 300, "synchronization issue: test lasted too long") 143 144 super().tearDown() 145 146 def get_context(self): 147 return mp.get_context(self.ctx) 148 149 150class ThreadPoolMixin(ExecutorMixin): 151 executor_type = futures.ThreadPoolExecutor 152 153 154class ProcessPoolForkMixin(ExecutorMixin): 155 executor_type = futures.ProcessPoolExecutor 156 ctx = "fork" 157 158 def get_context(self): 159 try: 160 _check_system_limits() 161 except NotImplementedError: 162 self.skipTest("ProcessPoolExecutor unavailable on this system") 163 if sys.platform == "win32": 164 self.skipTest("require unix system") 165 return super().get_context() 166 167 168class ProcessPoolSpawnMixin(ExecutorMixin): 169 executor_type = futures.ProcessPoolExecutor 170 ctx = "spawn" 171 172 def get_context(self): 173 try: 174 _check_system_limits() 175 except NotImplementedError: 176 self.skipTest("ProcessPoolExecutor unavailable on this system") 177 return super().get_context() 178 179 180class ProcessPoolForkserverMixin(ExecutorMixin): 181 executor_type = futures.ProcessPoolExecutor 182 ctx = "forkserver" 183 184 def get_context(self): 185 try: 186 _check_system_limits() 187 except NotImplementedError: 188 self.skipTest("ProcessPoolExecutor unavailable on this system") 189 if sys.platform == "win32": 190 self.skipTest("require unix system") 191 return super().get_context() 192 193 194def create_executor_tests(mixin, bases=(BaseTestCase,), 195 executor_mixins=(ThreadPoolMixin, 196 ProcessPoolForkMixin, 197 ProcessPoolForkserverMixin, 198 ProcessPoolSpawnMixin)): 199 def strip_mixin(name): 200 if name.endswith(('Mixin', 'Tests')): 201 return name[:-5] 202 elif name.endswith('Test'): 203 return name[:-4] 204 else: 205 return name 206 207 for exe in executor_mixins: 208 name = ("%s%sTest" 209 % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__))) 210 cls = type(name, (mixin,) + (exe,) + bases, {}) 211 globals()[name] = cls 212 213 214class InitializerMixin(ExecutorMixin): 215 worker_count = 2 216 217 def setUp(self): 218 global INITIALIZER_STATUS 219 INITIALIZER_STATUS = 'uninitialized' 220 self.executor_kwargs = dict(initializer=init, 221 initargs=('initialized',)) 222 super().setUp() 223 224 def test_initializer(self): 225 futures = [self.executor.submit(get_init_status) 226 for _ in range(self.worker_count)] 227 228 for f in futures: 229 self.assertEqual(f.result(), 'initialized') 230 231 232class FailingInitializerMixin(ExecutorMixin): 233 worker_count = 2 234 235 def setUp(self): 236 if hasattr(self, "ctx"): 237 # Pass a queue to redirect the child's logging output 238 self.mp_context = self.get_context() 239 self.log_queue = self.mp_context.Queue() 240 self.executor_kwargs = dict(initializer=init_fail, 241 initargs=(self.log_queue,)) 242 else: 243 # In a thread pool, the child shares our logging setup 244 # (see _assert_logged()) 245 self.mp_context = None 246 self.log_queue = None 247 self.executor_kwargs = dict(initializer=init_fail) 248 super().setUp() 249 250 def test_initializer(self): 251 with self._assert_logged('ValueError: error in initializer'): 252 try: 253 future = self.executor.submit(get_init_status) 254 except BrokenExecutor: 255 # Perhaps the executor is already broken 256 pass 257 else: 258 with self.assertRaises(BrokenExecutor): 259 future.result() 260 # At some point, the executor should break 261 t1 = time.monotonic() 262 while not self.executor._broken: 263 if time.monotonic() - t1 > 5: 264 self.fail("executor not broken after 5 s.") 265 time.sleep(0.01) 266 # ... and from this point submit() is guaranteed to fail 267 with self.assertRaises(BrokenExecutor): 268 self.executor.submit(get_init_status) 269 270 @contextlib.contextmanager 271 def _assert_logged(self, msg): 272 if self.log_queue is not None: 273 yield 274 output = [] 275 try: 276 while True: 277 output.append(self.log_queue.get_nowait().getMessage()) 278 except queue.Empty: 279 pass 280 else: 281 with self.assertLogs('concurrent.futures', 'CRITICAL') as cm: 282 yield 283 output = cm.output 284 self.assertTrue(any(msg in line for line in output), 285 output) 286 287 288create_executor_tests(InitializerMixin) 289create_executor_tests(FailingInitializerMixin) 290 291 292class ExecutorShutdownTest: 293 def test_run_after_shutdown(self): 294 self.executor.shutdown() 295 self.assertRaises(RuntimeError, 296 self.executor.submit, 297 pow, 2, 5) 298 299 def test_interpreter_shutdown(self): 300 # Test the atexit hook for shutdown of worker threads and processes 301 rc, out, err = assert_python_ok('-c', """if 1: 302 from concurrent.futures import {executor_type} 303 from time import sleep 304 from test.test_concurrent_futures import sleep_and_print 305 if __name__ == "__main__": 306 context = '{context}' 307 if context == "": 308 t = {executor_type}(5) 309 else: 310 from multiprocessing import get_context 311 context = get_context(context) 312 t = {executor_type}(5, mp_context=context) 313 t.submit(sleep_and_print, 1.0, "apple") 314 """.format(executor_type=self.executor_type.__name__, 315 context=getattr(self, "ctx", ""))) 316 # Errors in atexit hooks don't change the process exit code, check 317 # stderr manually. 318 self.assertFalse(err) 319 self.assertEqual(out.strip(), b"apple") 320 321 def test_submit_after_interpreter_shutdown(self): 322 # Test the atexit hook for shutdown of worker threads and processes 323 rc, out, err = assert_python_ok('-c', """if 1: 324 import atexit 325 @atexit.register 326 def run_last(): 327 try: 328 t.submit(id, None) 329 except RuntimeError: 330 print("runtime-error") 331 raise 332 from concurrent.futures import {executor_type} 333 if __name__ == "__main__": 334 context = '{context}' 335 if not context: 336 t = {executor_type}(5) 337 else: 338 from multiprocessing import get_context 339 context = get_context(context) 340 t = {executor_type}(5, mp_context=context) 341 t.submit(id, 42).result() 342 """.format(executor_type=self.executor_type.__name__, 343 context=getattr(self, "ctx", ""))) 344 # Errors in atexit hooks don't change the process exit code, check 345 # stderr manually. 346 self.assertIn("RuntimeError: cannot schedule new futures", err.decode()) 347 self.assertEqual(out.strip(), b"runtime-error") 348 349 def test_hang_issue12364(self): 350 fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)] 351 self.executor.shutdown() 352 for f in fs: 353 f.result() 354 355 def test_cancel_futures(self): 356 assert self.worker_count <= 5, "test needs few workers" 357 fs = [self.executor.submit(time.sleep, .1) for _ in range(50)] 358 self.executor.shutdown(cancel_futures=True) 359 # We can't guarantee the exact number of cancellations, but we can 360 # guarantee that *some* were cancelled. With few workers, many of 361 # the submitted futures should have been cancelled. 362 cancelled = [fut for fut in fs if fut.cancelled()] 363 self.assertGreater(len(cancelled), 20) 364 365 # Ensure the other futures were able to finish. 366 # Use "not fut.cancelled()" instead of "fut.done()" to include futures 367 # that may have been left in a pending state. 368 others = [fut for fut in fs if not fut.cancelled()] 369 for fut in others: 370 self.assertTrue(fut.done(), msg=f"{fut._state=}") 371 self.assertIsNone(fut.exception()) 372 373 # Similar to the number of cancelled futures, we can't guarantee the 374 # exact number that completed. But, we can guarantee that at least 375 # one finished. 376 self.assertGreater(len(others), 0) 377 378 def test_hang_gh83386(self): 379 """shutdown(wait=False) doesn't hang at exit with running futures. 380 381 See https://github.com/python/cpython/issues/83386. 382 """ 383 if self.executor_type == futures.ProcessPoolExecutor: 384 raise unittest.SkipTest( 385 "Hangs, see https://github.com/python/cpython/issues/83386") 386 387 rc, out, err = assert_python_ok('-c', """if True: 388 from concurrent.futures import {executor_type} 389 from test.test_concurrent_futures import sleep_and_print 390 if __name__ == "__main__": 391 if {context!r}: multiprocessing.set_start_method({context!r}) 392 t = {executor_type}(max_workers=3) 393 t.submit(sleep_and_print, 1.0, "apple") 394 t.shutdown(wait=False) 395 """.format(executor_type=self.executor_type.__name__, 396 context=getattr(self, 'ctx', None))) 397 self.assertFalse(err) 398 self.assertEqual(out.strip(), b"apple") 399 400 def test_hang_gh94440(self): 401 """shutdown(wait=True) doesn't hang when a future was submitted and 402 quickly canceled right before shutdown. 403 404 See https://github.com/python/cpython/issues/94440. 405 """ 406 if not hasattr(signal, 'alarm'): 407 raise unittest.SkipTest( 408 "Tested platform does not support the alarm signal") 409 410 def timeout(_signum, _frame): 411 raise RuntimeError("timed out waiting for shutdown") 412 413 kwargs = {} 414 if getattr(self, 'ctx', None): 415 kwargs['mp_context'] = self.get_context() 416 executor = self.executor_type(max_workers=1, **kwargs) 417 executor.submit(int).result() 418 old_handler = signal.signal(signal.SIGALRM, timeout) 419 try: 420 signal.alarm(5) 421 executor.submit(int).cancel() 422 executor.shutdown(wait=True) 423 finally: 424 signal.alarm(0) 425 signal.signal(signal.SIGALRM, old_handler) 426 427 428class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase): 429 def test_threads_terminate(self): 430 def acquire_lock(lock): 431 lock.acquire() 432 433 sem = threading.Semaphore(0) 434 for i in range(3): 435 self.executor.submit(acquire_lock, sem) 436 self.assertEqual(len(self.executor._threads), 3) 437 for i in range(3): 438 sem.release() 439 self.executor.shutdown() 440 for t in self.executor._threads: 441 t.join() 442 443 def test_context_manager_shutdown(self): 444 with futures.ThreadPoolExecutor(max_workers=5) as e: 445 executor = e 446 self.assertEqual(list(e.map(abs, range(-5, 5))), 447 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) 448 449 for t in executor._threads: 450 t.join() 451 452 def test_del_shutdown(self): 453 executor = futures.ThreadPoolExecutor(max_workers=5) 454 res = executor.map(abs, range(-5, 5)) 455 threads = executor._threads 456 del executor 457 458 for t in threads: 459 t.join() 460 461 # Make sure the results were all computed before the 462 # executor got shutdown. 463 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 464 465 def test_shutdown_no_wait(self): 466 # Ensure that the executor cleans up the threads when calling 467 # shutdown with wait=False 468 executor = futures.ThreadPoolExecutor(max_workers=5) 469 res = executor.map(abs, range(-5, 5)) 470 threads = executor._threads 471 executor.shutdown(wait=False) 472 for t in threads: 473 t.join() 474 475 # Make sure the results were all computed before the 476 # executor got shutdown. 477 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 478 479 480 def test_thread_names_assigned(self): 481 executor = futures.ThreadPoolExecutor( 482 max_workers=5, thread_name_prefix='SpecialPool') 483 executor.map(abs, range(-5, 5)) 484 threads = executor._threads 485 del executor 486 support.gc_collect() # For PyPy or other GCs. 487 488 for t in threads: 489 self.assertRegex(t.name, r'^SpecialPool_[0-4]$') 490 t.join() 491 492 def test_thread_names_default(self): 493 executor = futures.ThreadPoolExecutor(max_workers=5) 494 executor.map(abs, range(-5, 5)) 495 threads = executor._threads 496 del executor 497 support.gc_collect() # For PyPy or other GCs. 498 499 for t in threads: 500 # Ensure that our default name is reasonably sane and unique when 501 # no thread_name_prefix was supplied. 502 self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$') 503 t.join() 504 505 def test_cancel_futures_wait_false(self): 506 # Can only be reliably tested for TPE, since PPE often hangs with 507 # `wait=False` (even without *cancel_futures*). 508 rc, out, err = assert_python_ok('-c', """if True: 509 from concurrent.futures import ThreadPoolExecutor 510 from test.test_concurrent_futures import sleep_and_print 511 if __name__ == "__main__": 512 t = ThreadPoolExecutor() 513 t.submit(sleep_and_print, .1, "apple") 514 t.shutdown(wait=False, cancel_futures=True) 515 """) 516 # Errors in atexit hooks don't change the process exit code, check 517 # stderr manually. 518 self.assertFalse(err) 519 self.assertEqual(out.strip(), b"apple") 520 521 522class ProcessPoolShutdownTest(ExecutorShutdownTest): 523 def test_processes_terminate(self): 524 def acquire_lock(lock): 525 lock.acquire() 526 527 mp_context = self.get_context() 528 if mp_context.get_start_method(allow_none=False) == "fork": 529 # fork pre-spawns, not on demand. 530 expected_num_processes = self.worker_count 531 else: 532 expected_num_processes = 3 533 534 sem = mp_context.Semaphore(0) 535 for _ in range(3): 536 self.executor.submit(acquire_lock, sem) 537 self.assertEqual(len(self.executor._processes), expected_num_processes) 538 for _ in range(3): 539 sem.release() 540 processes = self.executor._processes 541 self.executor.shutdown() 542 543 for p in processes.values(): 544 p.join() 545 546 def test_context_manager_shutdown(self): 547 with futures.ProcessPoolExecutor( 548 max_workers=5, mp_context=self.get_context()) as e: 549 processes = e._processes 550 self.assertEqual(list(e.map(abs, range(-5, 5))), 551 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) 552 553 for p in processes.values(): 554 p.join() 555 556 def test_del_shutdown(self): 557 executor = futures.ProcessPoolExecutor( 558 max_workers=5, mp_context=self.get_context()) 559 res = executor.map(abs, range(-5, 5)) 560 executor_manager_thread = executor._executor_manager_thread 561 processes = executor._processes 562 call_queue = executor._call_queue 563 executor_manager_thread = executor._executor_manager_thread 564 del executor 565 support.gc_collect() # For PyPy or other GCs. 566 567 # Make sure that all the executor resources were properly cleaned by 568 # the shutdown process 569 executor_manager_thread.join() 570 for p in processes.values(): 571 p.join() 572 call_queue.join_thread() 573 574 # Make sure the results were all computed before the 575 # executor got shutdown. 576 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 577 578 def test_shutdown_no_wait(self): 579 # Ensure that the executor cleans up the processes when calling 580 # shutdown with wait=False 581 executor = futures.ProcessPoolExecutor( 582 max_workers=5, mp_context=self.get_context()) 583 res = executor.map(abs, range(-5, 5)) 584 processes = executor._processes 585 call_queue = executor._call_queue 586 executor_manager_thread = executor._executor_manager_thread 587 executor.shutdown(wait=False) 588 589 # Make sure that all the executor resources were properly cleaned by 590 # the shutdown process 591 executor_manager_thread.join() 592 for p in processes.values(): 593 p.join() 594 call_queue.join_thread() 595 596 # Make sure the results were all computed before the executor got 597 # shutdown. 598 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 599 600 601create_executor_tests(ProcessPoolShutdownTest, 602 executor_mixins=(ProcessPoolForkMixin, 603 ProcessPoolForkserverMixin, 604 ProcessPoolSpawnMixin)) 605 606 607class WaitTests: 608 def test_20369(self): 609 # See https://bugs.python.org/issue20369 610 future = self.executor.submit(time.sleep, 1.5) 611 done, not_done = futures.wait([future, future], 612 return_when=futures.ALL_COMPLETED) 613 self.assertEqual({future}, done) 614 self.assertEqual(set(), not_done) 615 616 617 def test_first_completed(self): 618 future1 = self.executor.submit(mul, 21, 2) 619 future2 = self.executor.submit(time.sleep, 1.5) 620 621 done, not_done = futures.wait( 622 [CANCELLED_FUTURE, future1, future2], 623 return_when=futures.FIRST_COMPLETED) 624 625 self.assertEqual(set([future1]), done) 626 self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) 627 628 def test_first_completed_some_already_completed(self): 629 future1 = self.executor.submit(time.sleep, 1.5) 630 631 finished, pending = futures.wait( 632 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], 633 return_when=futures.FIRST_COMPLETED) 634 635 self.assertEqual( 636 set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), 637 finished) 638 self.assertEqual(set([future1]), pending) 639 640 def test_first_exception(self): 641 future1 = self.executor.submit(mul, 2, 21) 642 future2 = self.executor.submit(sleep_and_raise, 1.5) 643 future3 = self.executor.submit(time.sleep, 3) 644 645 finished, pending = futures.wait( 646 [future1, future2, future3], 647 return_when=futures.FIRST_EXCEPTION) 648 649 self.assertEqual(set([future1, future2]), finished) 650 self.assertEqual(set([future3]), pending) 651 652 def test_first_exception_some_already_complete(self): 653 future1 = self.executor.submit(divmod, 21, 0) 654 future2 = self.executor.submit(time.sleep, 1.5) 655 656 finished, pending = futures.wait( 657 [SUCCESSFUL_FUTURE, 658 CANCELLED_FUTURE, 659 CANCELLED_AND_NOTIFIED_FUTURE, 660 future1, future2], 661 return_when=futures.FIRST_EXCEPTION) 662 663 self.assertEqual(set([SUCCESSFUL_FUTURE, 664 CANCELLED_AND_NOTIFIED_FUTURE, 665 future1]), finished) 666 self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) 667 668 def test_first_exception_one_already_failed(self): 669 future1 = self.executor.submit(time.sleep, 2) 670 671 finished, pending = futures.wait( 672 [EXCEPTION_FUTURE, future1], 673 return_when=futures.FIRST_EXCEPTION) 674 675 self.assertEqual(set([EXCEPTION_FUTURE]), finished) 676 self.assertEqual(set([future1]), pending) 677 678 def test_all_completed(self): 679 future1 = self.executor.submit(divmod, 2, 0) 680 future2 = self.executor.submit(mul, 2, 21) 681 682 finished, pending = futures.wait( 683 [SUCCESSFUL_FUTURE, 684 CANCELLED_AND_NOTIFIED_FUTURE, 685 EXCEPTION_FUTURE, 686 future1, 687 future2], 688 return_when=futures.ALL_COMPLETED) 689 690 self.assertEqual(set([SUCCESSFUL_FUTURE, 691 CANCELLED_AND_NOTIFIED_FUTURE, 692 EXCEPTION_FUTURE, 693 future1, 694 future2]), finished) 695 self.assertEqual(set(), pending) 696 697 def test_timeout(self): 698 future1 = self.executor.submit(mul, 6, 7) 699 future2 = self.executor.submit(time.sleep, 6) 700 701 finished, pending = futures.wait( 702 [CANCELLED_AND_NOTIFIED_FUTURE, 703 EXCEPTION_FUTURE, 704 SUCCESSFUL_FUTURE, 705 future1, future2], 706 timeout=5, 707 return_when=futures.ALL_COMPLETED) 708 709 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, 710 EXCEPTION_FUTURE, 711 SUCCESSFUL_FUTURE, 712 future1]), finished) 713 self.assertEqual(set([future2]), pending) 714 715 716class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase): 717 718 def test_pending_calls_race(self): 719 # Issue #14406: multi-threaded race condition when waiting on all 720 # futures. 721 event = threading.Event() 722 def future_func(): 723 event.wait() 724 oldswitchinterval = sys.getswitchinterval() 725 sys.setswitchinterval(1e-6) 726 try: 727 fs = {self.executor.submit(future_func) for i in range(100)} 728 event.set() 729 futures.wait(fs, return_when=futures.ALL_COMPLETED) 730 finally: 731 sys.setswitchinterval(oldswitchinterval) 732 733 734create_executor_tests(WaitTests, 735 executor_mixins=(ProcessPoolForkMixin, 736 ProcessPoolForkserverMixin, 737 ProcessPoolSpawnMixin)) 738 739 740class AsCompletedTests: 741 # TODO([email protected]): Should have a test with a non-zero timeout. 742 def test_no_timeout(self): 743 future1 = self.executor.submit(mul, 2, 21) 744 future2 = self.executor.submit(mul, 7, 6) 745 746 completed = set(futures.as_completed( 747 [CANCELLED_AND_NOTIFIED_FUTURE, 748 EXCEPTION_FUTURE, 749 SUCCESSFUL_FUTURE, 750 future1, future2])) 751 self.assertEqual(set( 752 [CANCELLED_AND_NOTIFIED_FUTURE, 753 EXCEPTION_FUTURE, 754 SUCCESSFUL_FUTURE, 755 future1, future2]), 756 completed) 757 758 def test_zero_timeout(self): 759 future1 = self.executor.submit(time.sleep, 2) 760 completed_futures = set() 761 try: 762 for future in futures.as_completed( 763 [CANCELLED_AND_NOTIFIED_FUTURE, 764 EXCEPTION_FUTURE, 765 SUCCESSFUL_FUTURE, 766 future1], 767 timeout=0): 768 completed_futures.add(future) 769 except futures.TimeoutError: 770 pass 771 772 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, 773 EXCEPTION_FUTURE, 774 SUCCESSFUL_FUTURE]), 775 completed_futures) 776 777 def test_duplicate_futures(self): 778 # Issue 20367. Duplicate futures should not raise exceptions or give 779 # duplicate responses. 780 # Issue #31641: accept arbitrary iterables. 781 future1 = self.executor.submit(time.sleep, 2) 782 completed = [ 783 f for f in futures.as_completed(itertools.repeat(future1, 3)) 784 ] 785 self.assertEqual(len(completed), 1) 786 787 def test_free_reference_yielded_future(self): 788 # Issue #14406: Generator should not keep references 789 # to finished futures. 790 futures_list = [Future() for _ in range(8)] 791 futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED)) 792 futures_list.append(create_future(state=FINISHED, result=42)) 793 794 with self.assertRaises(futures.TimeoutError): 795 for future in futures.as_completed(futures_list, timeout=0): 796 futures_list.remove(future) 797 wr = weakref.ref(future) 798 del future 799 support.gc_collect() # For PyPy or other GCs. 800 self.assertIsNone(wr()) 801 802 futures_list[0].set_result("test") 803 for future in futures.as_completed(futures_list): 804 futures_list.remove(future) 805 wr = weakref.ref(future) 806 del future 807 support.gc_collect() # For PyPy or other GCs. 808 self.assertIsNone(wr()) 809 if futures_list: 810 futures_list[0].set_result("test") 811 812 def test_correct_timeout_exception_msg(self): 813 futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE, 814 RUNNING_FUTURE, SUCCESSFUL_FUTURE] 815 816 with self.assertRaises(futures.TimeoutError) as cm: 817 list(futures.as_completed(futures_list, timeout=0)) 818 819 self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished') 820 821 822create_executor_tests(AsCompletedTests) 823 824 825class ExecutorTest: 826 # Executor.shutdown() and context manager usage is tested by 827 # ExecutorShutdownTest. 828 def test_submit(self): 829 future = self.executor.submit(pow, 2, 8) 830 self.assertEqual(256, future.result()) 831 832 def test_submit_keyword(self): 833 future = self.executor.submit(mul, 2, y=8) 834 self.assertEqual(16, future.result()) 835 future = self.executor.submit(capture, 1, self=2, fn=3) 836 self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3})) 837 with self.assertRaises(TypeError): 838 self.executor.submit(fn=capture, arg=1) 839 with self.assertRaises(TypeError): 840 self.executor.submit(arg=1) 841 842 def test_map(self): 843 self.assertEqual( 844 list(self.executor.map(pow, range(10), range(10))), 845 list(map(pow, range(10), range(10)))) 846 847 self.assertEqual( 848 list(self.executor.map(pow, range(10), range(10), chunksize=3)), 849 list(map(pow, range(10), range(10)))) 850 851 def test_map_exception(self): 852 i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) 853 self.assertEqual(i.__next__(), (0, 1)) 854 self.assertEqual(i.__next__(), (0, 1)) 855 self.assertRaises(ZeroDivisionError, i.__next__) 856 857 def test_map_timeout(self): 858 results = [] 859 try: 860 for i in self.executor.map(time.sleep, 861 [0, 0, 6], 862 timeout=5): 863 results.append(i) 864 except futures.TimeoutError: 865 pass 866 else: 867 self.fail('expected TimeoutError') 868 869 self.assertEqual([None, None], results) 870 871 def test_shutdown_race_issue12456(self): 872 # Issue #12456: race condition at shutdown where trying to post a 873 # sentinel in the call queue blocks (the queue is full while processes 874 # have exited). 875 self.executor.map(str, [2] * (self.worker_count + 1)) 876 self.executor.shutdown() 877 878 @support.cpython_only 879 def test_no_stale_references(self): 880 # Issue #16284: check that the executors don't unnecessarily hang onto 881 # references. 882 my_object = MyObject() 883 my_object_collected = threading.Event() 884 my_object_callback = weakref.ref( 885 my_object, lambda obj: my_object_collected.set()) 886 # Deliberately discarding the future. 887 self.executor.submit(my_object.my_method) 888 del my_object 889 890 collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT) 891 self.assertTrue(collected, 892 "Stale reference not collected within timeout.") 893 894 def test_max_workers_negative(self): 895 for number in (0, -1): 896 with self.assertRaisesRegex(ValueError, 897 "max_workers must be greater " 898 "than 0"): 899 self.executor_type(max_workers=number) 900 901 def test_free_reference(self): 902 # Issue #14406: Result iterator should not keep an internal 903 # reference to result objects. 904 for obj in self.executor.map(make_dummy_object, range(10)): 905 wr = weakref.ref(obj) 906 del obj 907 support.gc_collect() # For PyPy or other GCs. 908 self.assertIsNone(wr()) 909 910 911class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase): 912 def test_map_submits_without_iteration(self): 913 """Tests verifying issue 11777.""" 914 finished = [] 915 def record_finished(n): 916 finished.append(n) 917 918 self.executor.map(record_finished, range(10)) 919 self.executor.shutdown(wait=True) 920 self.assertCountEqual(finished, range(10)) 921 922 def test_default_workers(self): 923 executor = self.executor_type() 924 expected = min(32, (os.cpu_count() or 1) + 4) 925 self.assertEqual(executor._max_workers, expected) 926 927 def test_saturation(self): 928 executor = self.executor_type(4) 929 def acquire_lock(lock): 930 lock.acquire() 931 932 sem = threading.Semaphore(0) 933 for i in range(15 * executor._max_workers): 934 executor.submit(acquire_lock, sem) 935 self.assertEqual(len(executor._threads), executor._max_workers) 936 for i in range(15 * executor._max_workers): 937 sem.release() 938 executor.shutdown(wait=True) 939 940 def test_idle_thread_reuse(self): 941 executor = self.executor_type() 942 executor.submit(mul, 21, 2).result() 943 executor.submit(mul, 6, 7).result() 944 executor.submit(mul, 3, 14).result() 945 self.assertEqual(len(executor._threads), 1) 946 executor.shutdown(wait=True) 947 948 @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork') 949 def test_hang_global_shutdown_lock(self): 950 # bpo-45021: _global_shutdown_lock should be reinitialized in the child 951 # process, otherwise it will never exit 952 def submit(pool): 953 pool.submit(submit, pool) 954 955 with futures.ThreadPoolExecutor(1) as pool: 956 pool.submit(submit, pool) 957 958 for _ in range(50): 959 with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers: 960 workers.submit(tuple) 961 962 def test_executor_map_current_future_cancel(self): 963 stop_event = threading.Event() 964 log = [] 965 966 def log_n_wait(ident): 967 log.append(f"{ident=} started") 968 try: 969 stop_event.wait() 970 finally: 971 log.append(f"{ident=} stopped") 972 973 with self.executor_type(max_workers=1) as pool: 974 # submit work to saturate the pool 975 fut = pool.submit(log_n_wait, ident="first") 976 try: 977 with contextlib.closing( 978 pool.map(log_n_wait, ["second", "third"], timeout=0) 979 ) as gen: 980 with self.assertRaises(TimeoutError): 981 next(gen) 982 finally: 983 stop_event.set() 984 fut.result() 985 # ident='second' is cancelled as a result of raising a TimeoutError 986 # ident='third' is cancelled because it remained in the collection of futures 987 self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"]) 988 989 990class ProcessPoolExecutorTest(ExecutorTest): 991 992 @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit') 993 def test_max_workers_too_large(self): 994 with self.assertRaisesRegex(ValueError, 995 "max_workers must be <= 61"): 996 futures.ProcessPoolExecutor(max_workers=62) 997 998 def test_killed_child(self): 999 # When a child process is abruptly terminated, the whole pool gets 1000 # "broken". 1001 futures = [self.executor.submit(time.sleep, 3)] 1002 # Get one of the processes, and terminate (kill) it 1003 p = next(iter(self.executor._processes.values())) 1004 p.terminate() 1005 for fut in futures: 1006 self.assertRaises(BrokenProcessPool, fut.result) 1007 # Submitting other jobs fails as well. 1008 self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) 1009 1010 def test_map_chunksize(self): 1011 def bad_map(): 1012 list(self.executor.map(pow, range(40), range(40), chunksize=-1)) 1013 1014 ref = list(map(pow, range(40), range(40))) 1015 self.assertEqual( 1016 list(self.executor.map(pow, range(40), range(40), chunksize=6)), 1017 ref) 1018 self.assertEqual( 1019 list(self.executor.map(pow, range(40), range(40), chunksize=50)), 1020 ref) 1021 self.assertEqual( 1022 list(self.executor.map(pow, range(40), range(40), chunksize=40)), 1023 ref) 1024 self.assertRaises(ValueError, bad_map) 1025 1026 @classmethod 1027 def _test_traceback(cls): 1028 raise RuntimeError(123) # some comment 1029 1030 def test_traceback(self): 1031 # We want ensure that the traceback from the child process is 1032 # contained in the traceback raised in the main process. 1033 future = self.executor.submit(self._test_traceback) 1034 with self.assertRaises(Exception) as cm: 1035 future.result() 1036 1037 exc = cm.exception 1038 self.assertIs(type(exc), RuntimeError) 1039 self.assertEqual(exc.args, (123,)) 1040 cause = exc.__cause__ 1041 self.assertIs(type(cause), futures.process._RemoteTraceback) 1042 self.assertIn('raise RuntimeError(123) # some comment', cause.tb) 1043 1044 with support.captured_stderr() as f1: 1045 try: 1046 raise exc 1047 except RuntimeError: 1048 sys.excepthook(*sys.exc_info()) 1049 self.assertIn('raise RuntimeError(123) # some comment', 1050 f1.getvalue()) 1051 1052 @hashlib_helper.requires_hashdigest('md5') 1053 def test_ressources_gced_in_workers(self): 1054 # Ensure that argument for a job are correctly gc-ed after the job 1055 # is finished 1056 mgr = self.get_context().Manager() 1057 obj = EventfulGCObj(mgr) 1058 future = self.executor.submit(id, obj) 1059 future.result() 1060 1061 self.assertTrue(obj.event.wait(timeout=1)) 1062 1063 # explicitly destroy the object to ensure that EventfulGCObj.__del__() 1064 # is called while manager is still running. 1065 obj = None 1066 support.gc_collect() 1067 1068 mgr.shutdown() 1069 mgr.join() 1070 1071 def test_saturation(self): 1072 executor = self.executor 1073 mp_context = self.get_context() 1074 sem = mp_context.Semaphore(0) 1075 job_count = 15 * executor._max_workers 1076 for _ in range(job_count): 1077 executor.submit(sem.acquire) 1078 self.assertEqual(len(executor._processes), executor._max_workers) 1079 for _ in range(job_count): 1080 sem.release() 1081 1082 def test_idle_process_reuse_one(self): 1083 executor = self.executor 1084 assert executor._max_workers >= 4 1085 if self.get_context().get_start_method(allow_none=False) == "fork": 1086 raise unittest.SkipTest("Incompatible with the fork start method.") 1087 executor.submit(mul, 21, 2).result() 1088 executor.submit(mul, 6, 7).result() 1089 executor.submit(mul, 3, 14).result() 1090 self.assertEqual(len(executor._processes), 1) 1091 1092 def test_idle_process_reuse_multiple(self): 1093 executor = self.executor 1094 assert executor._max_workers <= 5 1095 if self.get_context().get_start_method(allow_none=False) == "fork": 1096 raise unittest.SkipTest("Incompatible with the fork start method.") 1097 executor.submit(mul, 12, 7).result() 1098 executor.submit(mul, 33, 25) 1099 executor.submit(mul, 25, 26).result() 1100 executor.submit(mul, 18, 29) 1101 executor.submit(mul, 1, 2).result() 1102 executor.submit(mul, 0, 9) 1103 self.assertLessEqual(len(executor._processes), 3) 1104 executor.shutdown() 1105 1106 def test_max_tasks_per_child(self): 1107 context = self.get_context() 1108 if context.get_start_method(allow_none=False) == "fork": 1109 with self.assertRaises(ValueError): 1110 self.executor_type(1, mp_context=context, max_tasks_per_child=3) 1111 return 1112 # not using self.executor as we need to control construction. 1113 # arguably this could go in another class w/o that mixin. 1114 executor = self.executor_type( 1115 1, mp_context=context, max_tasks_per_child=3) 1116 f1 = executor.submit(os.getpid) 1117 original_pid = f1.result() 1118 # The worker pid remains the same as the worker could be reused 1119 f2 = executor.submit(os.getpid) 1120 self.assertEqual(f2.result(), original_pid) 1121 self.assertEqual(len(executor._processes), 1) 1122 f3 = executor.submit(os.getpid) 1123 self.assertEqual(f3.result(), original_pid) 1124 1125 # A new worker is spawned, with a statistically different pid, 1126 # while the previous was reaped. 1127 f4 = executor.submit(os.getpid) 1128 new_pid = f4.result() 1129 self.assertNotEqual(original_pid, new_pid) 1130 self.assertEqual(len(executor._processes), 1) 1131 1132 executor.shutdown() 1133 1134 def test_max_tasks_per_child_defaults_to_spawn_context(self): 1135 # not using self.executor as we need to control construction. 1136 # arguably this could go in another class w/o that mixin. 1137 executor = self.executor_type(1, max_tasks_per_child=3) 1138 self.assertEqual(executor._mp_context.get_start_method(), "spawn") 1139 1140 def test_max_tasks_early_shutdown(self): 1141 context = self.get_context() 1142 if context.get_start_method(allow_none=False) == "fork": 1143 raise unittest.SkipTest("Incompatible with the fork start method.") 1144 # not using self.executor as we need to control construction. 1145 # arguably this could go in another class w/o that mixin. 1146 executor = self.executor_type( 1147 3, mp_context=context, max_tasks_per_child=1) 1148 futures = [] 1149 for i in range(6): 1150 futures.append(executor.submit(mul, i, i)) 1151 executor.shutdown() 1152 for i, future in enumerate(futures): 1153 self.assertEqual(future.result(), mul(i, i)) 1154 1155 1156create_executor_tests(ProcessPoolExecutorTest, 1157 executor_mixins=(ProcessPoolForkMixin, 1158 ProcessPoolForkserverMixin, 1159 ProcessPoolSpawnMixin)) 1160 1161def _crash(delay=None): 1162 """Induces a segfault.""" 1163 if delay: 1164 time.sleep(delay) 1165 import faulthandler 1166 faulthandler.disable() 1167 faulthandler._sigsegv() 1168 1169 1170def _exit(): 1171 """Induces a sys exit with exitcode 1.""" 1172 sys.exit(1) 1173 1174 1175def _raise_error(Err): 1176 """Function that raises an Exception in process.""" 1177 raise Err() 1178 1179 1180def _raise_error_ignore_stderr(Err): 1181 """Function that raises an Exception in process and ignores stderr.""" 1182 import io 1183 sys.stderr = io.StringIO() 1184 raise Err() 1185 1186 1187def _return_instance(cls): 1188 """Function that returns a instance of cls.""" 1189 return cls() 1190 1191 1192class CrashAtPickle(object): 1193 """Bad object that triggers a segfault at pickling time.""" 1194 def __reduce__(self): 1195 _crash() 1196 1197 1198class CrashAtUnpickle(object): 1199 """Bad object that triggers a segfault at unpickling time.""" 1200 def __reduce__(self): 1201 return _crash, () 1202 1203 1204class ExitAtPickle(object): 1205 """Bad object that triggers a process exit at pickling time.""" 1206 def __reduce__(self): 1207 _exit() 1208 1209 1210class ExitAtUnpickle(object): 1211 """Bad object that triggers a process exit at unpickling time.""" 1212 def __reduce__(self): 1213 return _exit, () 1214 1215 1216class ErrorAtPickle(object): 1217 """Bad object that triggers an error at pickling time.""" 1218 def __reduce__(self): 1219 from pickle import PicklingError 1220 raise PicklingError("Error in pickle") 1221 1222 1223class ErrorAtUnpickle(object): 1224 """Bad object that triggers an error at unpickling time.""" 1225 def __reduce__(self): 1226 from pickle import UnpicklingError 1227 return _raise_error_ignore_stderr, (UnpicklingError, ) 1228 1229 1230class ExecutorDeadlockTest: 1231 TIMEOUT = support.SHORT_TIMEOUT 1232 1233 def _fail_on_deadlock(self, executor): 1234 # If we did not recover before TIMEOUT seconds, consider that the 1235 # executor is in a deadlock state and forcefully clean all its 1236 # composants. 1237 import faulthandler 1238 from tempfile import TemporaryFile 1239 with TemporaryFile(mode="w+") as f: 1240 faulthandler.dump_traceback(file=f) 1241 f.seek(0) 1242 tb = f.read() 1243 for p in executor._processes.values(): 1244 p.terminate() 1245 # This should be safe to call executor.shutdown here as all possible 1246 # deadlocks should have been broken. 1247 executor.shutdown(wait=True) 1248 print(f"\nTraceback:\n {tb}", file=sys.__stderr__) 1249 self.fail(f"Executor deadlock:\n\n{tb}") 1250 1251 1252 def _check_crash(self, error, func, *args, ignore_stderr=False): 1253 # test for deadlock caused by crashes in a pool 1254 self.executor.shutdown(wait=True) 1255 1256 executor = self.executor_type( 1257 max_workers=2, mp_context=self.get_context()) 1258 res = executor.submit(func, *args) 1259 1260 if ignore_stderr: 1261 cm = support.captured_stderr() 1262 else: 1263 cm = contextlib.nullcontext() 1264 1265 try: 1266 with self.assertRaises(error): 1267 with cm: 1268 res.result(timeout=self.TIMEOUT) 1269 except futures.TimeoutError: 1270 # If we did not recover before TIMEOUT seconds, 1271 # consider that the executor is in a deadlock state 1272 self._fail_on_deadlock(executor) 1273 executor.shutdown(wait=True) 1274 1275 def test_error_at_task_pickle(self): 1276 # Check problem occurring while pickling a task in 1277 # the task_handler thread 1278 self._check_crash(PicklingError, id, ErrorAtPickle()) 1279 1280 def test_exit_at_task_unpickle(self): 1281 # Check problem occurring while unpickling a task on workers 1282 self._check_crash(BrokenProcessPool, id, ExitAtUnpickle()) 1283 1284 def test_error_at_task_unpickle(self): 1285 # Check problem occurring while unpickling a task on workers 1286 self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle()) 1287 1288 def test_crash_at_task_unpickle(self): 1289 # Check problem occurring while unpickling a task on workers 1290 self._check_crash(BrokenProcessPool, id, CrashAtUnpickle()) 1291 1292 def test_crash_during_func_exec_on_worker(self): 1293 # Check problem occurring during func execution on workers 1294 self._check_crash(BrokenProcessPool, _crash) 1295 1296 def test_exit_during_func_exec_on_worker(self): 1297 # Check problem occurring during func execution on workers 1298 self._check_crash(SystemExit, _exit) 1299 1300 def test_error_during_func_exec_on_worker(self): 1301 # Check problem occurring during func execution on workers 1302 self._check_crash(RuntimeError, _raise_error, RuntimeError) 1303 1304 def test_crash_during_result_pickle_on_worker(self): 1305 # Check problem occurring while pickling a task result 1306 # on workers 1307 self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle) 1308 1309 def test_exit_during_result_pickle_on_worker(self): 1310 # Check problem occurring while pickling a task result 1311 # on workers 1312 self._check_crash(SystemExit, _return_instance, ExitAtPickle) 1313 1314 def test_error_during_result_pickle_on_worker(self): 1315 # Check problem occurring while pickling a task result 1316 # on workers 1317 self._check_crash(PicklingError, _return_instance, ErrorAtPickle) 1318 1319 def test_error_during_result_unpickle_in_result_handler(self): 1320 # Check problem occurring while unpickling a task in 1321 # the result_handler thread 1322 self._check_crash(BrokenProcessPool, 1323 _return_instance, ErrorAtUnpickle, 1324 ignore_stderr=True) 1325 1326 def test_exit_during_result_unpickle_in_result_handler(self): 1327 # Check problem occurring while unpickling a task in 1328 # the result_handler thread 1329 self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle) 1330 1331 def test_shutdown_deadlock(self): 1332 # Test that the pool calling shutdown do not cause deadlock 1333 # if a worker fails after the shutdown call. 1334 self.executor.shutdown(wait=True) 1335 with self.executor_type(max_workers=2, 1336 mp_context=self.get_context()) as executor: 1337 self.executor = executor # Allow clean up in fail_on_deadlock 1338 f = executor.submit(_crash, delay=.1) 1339 executor.shutdown(wait=True) 1340 with self.assertRaises(BrokenProcessPool): 1341 f.result() 1342 1343 def test_shutdown_deadlock_pickle(self): 1344 # Test that the pool calling shutdown with wait=False does not cause 1345 # a deadlock if a task fails at pickle after the shutdown call. 1346 # Reported in bpo-39104. 1347 self.executor.shutdown(wait=True) 1348 with self.executor_type(max_workers=2, 1349 mp_context=self.get_context()) as executor: 1350 self.executor = executor # Allow clean up in fail_on_deadlock 1351 1352 # Start the executor and get the executor_manager_thread to collect 1353 # the threads and avoid dangling thread that should be cleaned up 1354 # asynchronously. 1355 executor.submit(id, 42).result() 1356 executor_manager = executor._executor_manager_thread 1357 1358 # Submit a task that fails at pickle and shutdown the executor 1359 # without waiting 1360 f = executor.submit(id, ErrorAtPickle()) 1361 executor.shutdown(wait=False) 1362 with self.assertRaises(PicklingError): 1363 f.result() 1364 1365 # Make sure the executor is eventually shutdown and do not leave 1366 # dangling threads 1367 executor_manager.join() 1368 1369 1370create_executor_tests(ExecutorDeadlockTest, 1371 executor_mixins=(ProcessPoolForkMixin, 1372 ProcessPoolForkserverMixin, 1373 ProcessPoolSpawnMixin)) 1374 1375 1376class FutureTests(BaseTestCase): 1377 def test_done_callback_with_result(self): 1378 callback_result = None 1379 def fn(callback_future): 1380 nonlocal callback_result 1381 callback_result = callback_future.result() 1382 1383 f = Future() 1384 f.add_done_callback(fn) 1385 f.set_result(5) 1386 self.assertEqual(5, callback_result) 1387 1388 def test_done_callback_with_exception(self): 1389 callback_exception = None 1390 def fn(callback_future): 1391 nonlocal callback_exception 1392 callback_exception = callback_future.exception() 1393 1394 f = Future() 1395 f.add_done_callback(fn) 1396 f.set_exception(Exception('test')) 1397 self.assertEqual(('test',), callback_exception.args) 1398 1399 def test_done_callback_with_cancel(self): 1400 was_cancelled = None 1401 def fn(callback_future): 1402 nonlocal was_cancelled 1403 was_cancelled = callback_future.cancelled() 1404 1405 f = Future() 1406 f.add_done_callback(fn) 1407 self.assertTrue(f.cancel()) 1408 self.assertTrue(was_cancelled) 1409 1410 def test_done_callback_raises(self): 1411 with support.captured_stderr() as stderr: 1412 raising_was_called = False 1413 fn_was_called = False 1414 1415 def raising_fn(callback_future): 1416 nonlocal raising_was_called 1417 raising_was_called = True 1418 raise Exception('doh!') 1419 1420 def fn(callback_future): 1421 nonlocal fn_was_called 1422 fn_was_called = True 1423 1424 f = Future() 1425 f.add_done_callback(raising_fn) 1426 f.add_done_callback(fn) 1427 f.set_result(5) 1428 self.assertTrue(raising_was_called) 1429 self.assertTrue(fn_was_called) 1430 self.assertIn('Exception: doh!', stderr.getvalue()) 1431 1432 def test_done_callback_already_successful(self): 1433 callback_result = None 1434 def fn(callback_future): 1435 nonlocal callback_result 1436 callback_result = callback_future.result() 1437 1438 f = Future() 1439 f.set_result(5) 1440 f.add_done_callback(fn) 1441 self.assertEqual(5, callback_result) 1442 1443 def test_done_callback_already_failed(self): 1444 callback_exception = None 1445 def fn(callback_future): 1446 nonlocal callback_exception 1447 callback_exception = callback_future.exception() 1448 1449 f = Future() 1450 f.set_exception(Exception('test')) 1451 f.add_done_callback(fn) 1452 self.assertEqual(('test',), callback_exception.args) 1453 1454 def test_done_callback_already_cancelled(self): 1455 was_cancelled = None 1456 def fn(callback_future): 1457 nonlocal was_cancelled 1458 was_cancelled = callback_future.cancelled() 1459 1460 f = Future() 1461 self.assertTrue(f.cancel()) 1462 f.add_done_callback(fn) 1463 self.assertTrue(was_cancelled) 1464 1465 def test_done_callback_raises_already_succeeded(self): 1466 with support.captured_stderr() as stderr: 1467 def raising_fn(callback_future): 1468 raise Exception('doh!') 1469 1470 f = Future() 1471 1472 # Set the result first to simulate a future that runs instantly, 1473 # effectively allowing the callback to be run immediately. 1474 f.set_result(5) 1475 f.add_done_callback(raising_fn) 1476 1477 self.assertIn('exception calling callback for', stderr.getvalue()) 1478 self.assertIn('doh!', stderr.getvalue()) 1479 1480 1481 def test_repr(self): 1482 self.assertRegex(repr(PENDING_FUTURE), 1483 '<Future at 0x[0-9a-f]+ state=pending>') 1484 self.assertRegex(repr(RUNNING_FUTURE), 1485 '<Future at 0x[0-9a-f]+ state=running>') 1486 self.assertRegex(repr(CANCELLED_FUTURE), 1487 '<Future at 0x[0-9a-f]+ state=cancelled>') 1488 self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE), 1489 '<Future at 0x[0-9a-f]+ state=cancelled>') 1490 self.assertRegex( 1491 repr(EXCEPTION_FUTURE), 1492 '<Future at 0x[0-9a-f]+ state=finished raised OSError>') 1493 self.assertRegex( 1494 repr(SUCCESSFUL_FUTURE), 1495 '<Future at 0x[0-9a-f]+ state=finished returned int>') 1496 1497 1498 def test_cancel(self): 1499 f1 = create_future(state=PENDING) 1500 f2 = create_future(state=RUNNING) 1501 f3 = create_future(state=CANCELLED) 1502 f4 = create_future(state=CANCELLED_AND_NOTIFIED) 1503 f5 = create_future(state=FINISHED, exception=OSError()) 1504 f6 = create_future(state=FINISHED, result=5) 1505 1506 self.assertTrue(f1.cancel()) 1507 self.assertEqual(f1._state, CANCELLED) 1508 1509 self.assertFalse(f2.cancel()) 1510 self.assertEqual(f2._state, RUNNING) 1511 1512 self.assertTrue(f3.cancel()) 1513 self.assertEqual(f3._state, CANCELLED) 1514 1515 self.assertTrue(f4.cancel()) 1516 self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED) 1517 1518 self.assertFalse(f5.cancel()) 1519 self.assertEqual(f5._state, FINISHED) 1520 1521 self.assertFalse(f6.cancel()) 1522 self.assertEqual(f6._state, FINISHED) 1523 1524 def test_cancelled(self): 1525 self.assertFalse(PENDING_FUTURE.cancelled()) 1526 self.assertFalse(RUNNING_FUTURE.cancelled()) 1527 self.assertTrue(CANCELLED_FUTURE.cancelled()) 1528 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) 1529 self.assertFalse(EXCEPTION_FUTURE.cancelled()) 1530 self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) 1531 1532 def test_done(self): 1533 self.assertFalse(PENDING_FUTURE.done()) 1534 self.assertFalse(RUNNING_FUTURE.done()) 1535 self.assertTrue(CANCELLED_FUTURE.done()) 1536 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) 1537 self.assertTrue(EXCEPTION_FUTURE.done()) 1538 self.assertTrue(SUCCESSFUL_FUTURE.done()) 1539 1540 def test_running(self): 1541 self.assertFalse(PENDING_FUTURE.running()) 1542 self.assertTrue(RUNNING_FUTURE.running()) 1543 self.assertFalse(CANCELLED_FUTURE.running()) 1544 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) 1545 self.assertFalse(EXCEPTION_FUTURE.running()) 1546 self.assertFalse(SUCCESSFUL_FUTURE.running()) 1547 1548 def test_result_with_timeout(self): 1549 self.assertRaises(futures.TimeoutError, 1550 PENDING_FUTURE.result, timeout=0) 1551 self.assertRaises(futures.TimeoutError, 1552 RUNNING_FUTURE.result, timeout=0) 1553 self.assertRaises(futures.CancelledError, 1554 CANCELLED_FUTURE.result, timeout=0) 1555 self.assertRaises(futures.CancelledError, 1556 CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) 1557 self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0) 1558 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) 1559 1560 def test_result_with_success(self): 1561 # TODO([email protected]): This test is timing dependent. 1562 def notification(): 1563 # Wait until the main thread is waiting for the result. 1564 time.sleep(1) 1565 f1.set_result(42) 1566 1567 f1 = create_future(state=PENDING) 1568 t = threading.Thread(target=notification) 1569 t.start() 1570 1571 self.assertEqual(f1.result(timeout=5), 42) 1572 t.join() 1573 1574 def test_result_with_cancel(self): 1575 # TODO([email protected]): This test is timing dependent. 1576 def notification(): 1577 # Wait until the main thread is waiting for the result. 1578 time.sleep(1) 1579 f1.cancel() 1580 1581 f1 = create_future(state=PENDING) 1582 t = threading.Thread(target=notification) 1583 t.start() 1584 1585 self.assertRaises(futures.CancelledError, 1586 f1.result, timeout=support.SHORT_TIMEOUT) 1587 t.join() 1588 1589 def test_exception_with_timeout(self): 1590 self.assertRaises(futures.TimeoutError, 1591 PENDING_FUTURE.exception, timeout=0) 1592 self.assertRaises(futures.TimeoutError, 1593 RUNNING_FUTURE.exception, timeout=0) 1594 self.assertRaises(futures.CancelledError, 1595 CANCELLED_FUTURE.exception, timeout=0) 1596 self.assertRaises(futures.CancelledError, 1597 CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) 1598 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), 1599 OSError)) 1600 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) 1601 1602 def test_exception_with_success(self): 1603 def notification(): 1604 # Wait until the main thread is waiting for the exception. 1605 time.sleep(1) 1606 with f1._condition: 1607 f1._state = FINISHED 1608 f1._exception = OSError() 1609 f1._condition.notify_all() 1610 1611 f1 = create_future(state=PENDING) 1612 t = threading.Thread(target=notification) 1613 t.start() 1614 1615 self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError)) 1616 t.join() 1617 1618 def test_multiple_set_result(self): 1619 f = create_future(state=PENDING) 1620 f.set_result(1) 1621 1622 with self.assertRaisesRegex( 1623 futures.InvalidStateError, 1624 'FINISHED: <Future at 0x[0-9a-f]+ ' 1625 'state=finished returned int>' 1626 ): 1627 f.set_result(2) 1628 1629 self.assertTrue(f.done()) 1630 self.assertEqual(f.result(), 1) 1631 1632 def test_multiple_set_exception(self): 1633 f = create_future(state=PENDING) 1634 e = ValueError() 1635 f.set_exception(e) 1636 1637 with self.assertRaisesRegex( 1638 futures.InvalidStateError, 1639 'FINISHED: <Future at 0x[0-9a-f]+ ' 1640 'state=finished raised ValueError>' 1641 ): 1642 f.set_exception(Exception()) 1643 1644 self.assertEqual(f.exception(), e) 1645 1646 1647def setUpModule(): 1648 unittest.addModuleCleanup(multiprocessing.util._cleanup_tests) 1649 thread_info = threading_helper.threading_setup() 1650 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) 1651 1652 1653if __name__ == "__main__": 1654 unittest.main() 1655