1from test import support
2from test.support import import_helper
3from test.support import threading_helper
4
5# Skip tests if _multiprocessing wasn't built.
6import_helper.import_module('_multiprocessing')
7
8from test.support import hashlib_helper
9from test.support.script_helper import assert_python_ok
10
11import contextlib
12import itertools
13import logging
14from logging.handlers import QueueHandler
15import os
16import queue
17import signal
18import sys
19import threading
20import time
21import unittest
22import weakref
23from pickle import PicklingError
24
25from concurrent import futures
26from concurrent.futures._base import (
27    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
28    BrokenExecutor)
29from concurrent.futures.process import BrokenProcessPool, _check_system_limits
30
31import multiprocessing.process
32import multiprocessing.util
33import multiprocessing as mp
34
35
# Module-level guard: raising unittest.SkipTest here, at import time, skips
# the whole file when check_sanitizer() reports an address/memory sanitizer.
if support.check_sanitizer(address=True, memory=True):
    # bpo-46633: Skip the test because it is too slow when Python is built
    # with ASAN/MSAN: between 5 and 20 minutes on GitHub Actions.
    raise unittest.SkipTest("test too slow on ASAN/MSAN build")
40
41
def create_future(state=PENDING, exception=None, result=None):
    """Build a Future whose private state is forced to the given values.

    The ``_state``, ``_exception`` and ``_result`` attributes are assigned
    directly on a fresh Future, bypassing its public setter methods.
    """
    future = Future()
    future._state, future._exception, future._result = (
        state, exception, result)
    return future
48
49
# Pre-built futures, one per state, created with create_future() so the
# tests below can use them as fixtures without involving an executor.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)

# Written by init() and read back by get_init_status().
INITIALIZER_STATUS = 'uninitialized'
58
def mul(x, y):
    """Return ``x * y``."""
    product = x * y
    return product
61
def capture(*args, **kwargs):
    """Return the received positional and keyword arguments as a pair."""
    received = (args, kwargs)
    return received
64
def sleep_and_raise(t):
    """Sleep for *t* seconds, then raise a plain Exception."""
    time.sleep(t)
    error = Exception('this is an exception')
    raise error
68
def sleep_and_print(t, msg):
    """Sleep for *t* seconds, then print *msg* and flush standard output."""
    time.sleep(t)
    stream = sys.stdout
    print(msg, file=stream)
    # Flush explicitly so the text is not left sitting in a buffer.
    stream.flush()
73
def init(x):
    """Store *x* in the module-level INITIALIZER_STATUS.

    Passed to executors as their ``initializer`` by InitializerMixin.
    """
    global INITIALIZER_STATUS
    INITIALIZER_STATUS = x
77
def get_init_status():
    """Return the current value of the module-level INITIALIZER_STATUS."""
    status = INITIALIZER_STATUS
    return status
80
def init_fail(log_queue=None):
    """Initializer that always fails with ValueError after a short sleep.

    When *log_queue* is given, the 'concurrent.futures' logger is first
    redirected to it through a QueueHandler, set to CRITICAL and detached
    from its parent loggers.
    """
    if log_queue is not None:
        futures_logger = logging.getLogger('concurrent.futures')
        queue_handler = QueueHandler(log_queue)
        futures_logger.addHandler(queue_handler)
        futures_logger.setLevel('CRITICAL')
        futures_logger.propagate = False
    # let some futures be scheduled
    time.sleep(0.1)
    raise ValueError('error in initializer')
89
90
class MyObject:
    """Minimal placeholder class with a single no-op method."""

    def my_method(self):
        return None
94
95
class EventfulGCObj:
    """Object that sets an event created by *mgr* when it is finalized."""

    def __init__(self, mgr):
        # mgr only has to provide an Event() factory.
        event = mgr.Event()
        self.event = event

    def __del__(self):
        # Signal that the finalizer has run.
        self.event.set()
102
103
def make_dummy_object(_):
    """Return a new MyObject; the single argument is ignored."""
    dummy = MyObject()
    return dummy
106
107
class BaseTestCase(unittest.TestCase):
    """Common base: threading setup before, and cleanup after, every test."""

    def setUp(self):
        # Keep what threading_setup() returns; tearDown() hands it back.
        thread_key = threading_helper.threading_setup()
        self._thread_key = thread_key

    def tearDown(self):
        # Reap child processes first, then clean up threads using the key
        # saved by setUp().
        support.reap_children()
        saved_key = self._thread_key
        threading_helper.threading_cleanup(*saved_key)
115
116
class ExecutorMixin:
    """Mixin that creates ``self.executor`` in setUp and shuts it down after.

    Subclasses provide ``executor_type``; those that also define ``ctx``
    get an ``mp_context`` built by get_context() passed to the executor.
    """
    worker_count = 5
    executor_kwargs = {}

    def setUp(self):
        super().setUp()

        # Start timestamp; tearDown() measures the test duration from it.
        self.t1 = time.monotonic()
        context_kwargs = {}
        if hasattr(self, "ctx"):
            context_kwargs['mp_context'] = self.get_context()
        self.executor = self.executor_type(
            max_workers=self.worker_count,
            **context_kwargs,
            **self.executor_kwargs)

    def tearDown(self):
        self.executor.shutdown(wait=True)
        self.executor = None

        elapsed = time.monotonic() - self.t1
        if support.verbose:
            print("%.2fs" % elapsed, end=' ')
        self.assertLess(elapsed, 300,
                        "synchronization issue: test lasted too long")

        super().tearDown()

    def get_context(self):
        """Return the multiprocessing context named by ``self.ctx``."""
        start_method = self.ctx
        return mp.get_context(start_method)
148
149
class ThreadPoolMixin(ExecutorMixin):
    """ExecutorMixin variant whose executor is a ThreadPoolExecutor."""

    executor_type = futures.ThreadPoolExecutor
152
153
class ProcessPoolForkMixin(ExecutorMixin):
    """ExecutorMixin variant: ProcessPoolExecutor with the fork context."""

    executor_type = futures.ProcessPoolExecutor
    ctx = "fork"

    def get_context(self):
        """Return the context for ``ctx``, skipping the test if unusable."""
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        on_windows = sys.platform == "win32"
        if on_windows:
            self.skipTest("require unix system")
        return super().get_context()
166
167
class ProcessPoolSpawnMixin(ExecutorMixin):
    """ExecutorMixin variant: ProcessPoolExecutor with the spawn context."""

    executor_type = futures.ProcessPoolExecutor
    ctx = "spawn"

    def get_context(self):
        """Return the context for ``ctx``, skipping the test if unusable."""
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        context = super().get_context()
        return context
178
179
class ProcessPoolForkserverMixin(ExecutorMixin):
    """ExecutorMixin variant: ProcessPoolExecutor with forkserver context."""

    executor_type = futures.ProcessPoolExecutor
    ctx = "forkserver"

    def get_context(self):
        """Return the context for ``ctx``, skipping the test if unusable."""
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        on_windows = sys.platform == "win32"
        if on_windows:
            self.skipTest("require unix system")
        return super().get_context()
192
193
def create_executor_tests(mixin, bases=(BaseTestCase,),
                          executor_mixins=(ThreadPoolMixin,
                                           ProcessPoolForkMixin,
                                           ProcessPoolForkserverMixin,
                                           ProcessPoolSpawnMixin)):
    """Create one concrete test class per executor mixin.

    For every entry of *executor_mixins* a class inheriting from
    ``(mixin, executor_mixin) + bases`` is built and stored in this
    module's globals. Its name is the executor mixin name followed by the
    mixin name, each with a trailing 'Mixin', 'Tests' or 'Test' removed,
    followed by 'Test'.
    """
    def strip_mixin(name):
        # Remove the first suffix that matches, if any.
        for suffix in ('Mixin', 'Tests', 'Test'):
            if name.endswith(suffix):
                return name[:-len(suffix)]
        return name

    mixin_part = strip_mixin(mixin.__name__)
    for exe in executor_mixins:
        name = "%s%sTest" % (strip_mixin(exe.__name__), mixin_part)
        globals()[name] = type(name, (mixin, exe) + bases, {})
212
213
class InitializerMixin(ExecutorMixin):
    """Check that submitted tasks see the state set by the initializer."""

    worker_count = 2

    def setUp(self):
        # Reset the module-level flag before the executor is created.
        global INITIALIZER_STATUS
        INITIALIZER_STATUS = 'uninitialized'
        self.executor_kwargs = {'initializer': init,
                                'initargs': ('initialized',)}
        super().setUp()

    def test_initializer(self):
        submitted = [self.executor.submit(get_init_status)
                     for _ in range(self.worker_count)]

        for fut in submitted:
            self.assertEqual(fut.result(), 'initialized')
230
231
class FailingInitializerMixin(ExecutorMixin):
    """Check that an initializer which raises leaves the executor broken."""

    worker_count = 2

    def setUp(self):
        uses_processes = hasattr(self, "ctx")
        if uses_processes:
            # Pass a queue to redirect the child's logging output
            self.mp_context = self.get_context()
            self.log_queue = self.mp_context.Queue()
            self.executor_kwargs = {'initializer': init_fail,
                                    'initargs': (self.log_queue,)}
        else:
            # In a thread pool, the child shares our logging setup
            # (see _assert_logged())
            self.mp_context = None
            self.log_queue = None
            self.executor_kwargs = {'initializer': init_fail}
        super().setUp()

    def test_initializer(self):
        with self._assert_logged('ValueError: error in initializer'):
            try:
                future = self.executor.submit(get_init_status)
            except BrokenExecutor:
                # Perhaps the executor is already broken
                pass
            else:
                with self.assertRaises(BrokenExecutor):
                    future.result()
            # At some point, the executor should break
            started = time.monotonic()
            while not self.executor._broken:
                waited = time.monotonic() - started
                if waited > 5:
                    self.fail("executor not broken after 5 s.")
                time.sleep(0.01)
            # ... and from this point submit() is guaranteed to fail
            with self.assertRaises(BrokenExecutor):
                self.executor.submit(get_init_status)

    @contextlib.contextmanager
    def _assert_logged(self, msg):
        """Context manager asserting *msg* appears in the logged output."""
        if self.log_queue is not None:
            yield
            # Drain the records that were sent through the queue.
            output = []
            try:
                while True:
                    record = self.log_queue.get_nowait()
                    output.append(record.getMessage())
            except queue.Empty:
                pass
        else:
            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
                yield
            output = cm.output
        found = any(msg in line for line in output)
        self.assertTrue(found, output)
286
287
# Generate the concrete test classes for both initializer mixins, one per
# default executor mixin.
create_executor_tests(InitializerMixin)
create_executor_tests(FailingInitializerMixin)
290
291
class ExecutorShutdownTest:
    """Shutdown tests shared by the thread and process pool executors.

    Meant to be combined with an executor mixin: the tests read
    ``self.executor``, ``self.executor_type`` and ``self.worker_count``
    and, when a ``ctx`` attribute is present, ``self.get_context()``.
    """

    def test_run_after_shutdown(self):
        # submit() must be rejected once shutdown() has been called.
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_submit_after_interpreter_shutdown(self):
        # submit() called from an atexit handler must raise RuntimeError;
        # the handler prints a marker and re-raises so both stdout and
        # stderr can be checked.
        rc, out, err = assert_python_ok('-c', """if 1:
            import atexit
            @atexit.register
            def run_last():
                try:
                    t.submit(id, None)
                except RuntimeError:
                    print("runtime-error")
                    raise
            from concurrent.futures import {executor_type}
            if __name__ == "__main__":
                context = '{context}'
                if not context:
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                    t.submit(id, 42).result()
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
        self.assertEqual(out.strip(), b"runtime-error")

    def test_hang_issue12364(self):
        # Futures submitted before shutdown() must all remain retrievable.
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()

    def test_cancel_futures(self):
        assert self.worker_count <= 5, "test needs few workers"
        fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
        self.executor.shutdown(cancel_futures=True)
        # We can't guarantee the exact number of cancellations, but we can
        # guarantee that *some* were cancelled. With few workers, many of
        # the submitted futures should have been cancelled.
        cancelled = [fut for fut in fs if fut.cancelled()]
        self.assertGreater(len(cancelled), 20)

        # Ensure the other futures were able to finish.
        # Use "not fut.cancelled()" instead of "fut.done()" to include futures
        # that may have been left in a pending state.
        others = [fut for fut in fs if not fut.cancelled()]
        for fut in others:
            self.assertTrue(fut.done(), msg=f"{fut._state=}")
            self.assertIsNone(fut.exception())

        # Similar to the number of cancelled futures, we can't guarantee the
        # exact number that completed. But, we can guarantee that at least
        # one finished.
        self.assertGreater(len(others), 0)

    def test_hang_gh83386(self):
        """shutdown(wait=False) doesn't hang at exit with running futures.

        See https://github.com/python/cpython/issues/83386.
        """
        if self.executor_type == futures.ProcessPoolExecutor:
            raise unittest.SkipTest(
                "Hangs, see https://github.com/python/cpython/issues/83386")

        # The child script imports the module it uses on its start-method
        # line, so that line cannot fail with NameError when a context is
        # configured.
        rc, out, err = assert_python_ok('-c', """if True:
            import multiprocessing
            from concurrent.futures import {executor_type}
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                if {context!r}: multiprocessing.set_start_method({context!r})
                t = {executor_type}(max_workers=3)
                t.submit(sleep_and_print, 1.0, "apple")
                t.shutdown(wait=False)
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, 'ctx', None)))
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_hang_gh94440(self):
        """shutdown(wait=True) doesn't hang when a future was submitted and
        quickly canceled right before shutdown.

        See https://github.com/python/cpython/issues/94440.
        """
        if not hasattr(signal, 'alarm'):
            raise unittest.SkipTest(
                "Tested platform does not support the alarm signal")

        def timeout(_signum, _frame):
            raise RuntimeError("timed out waiting for shutdown")

        kwargs = {}
        if getattr(self, 'ctx', None):
            kwargs['mp_context'] = self.get_context()
        executor = self.executor_type(max_workers=1, **kwargs)
        executor.submit(int).result()
        old_handler = signal.signal(signal.SIGALRM, timeout)
        try:
            # The alarm turns a hang into a RuntimeError after 5 seconds.
            signal.alarm(5)
            executor.submit(int).cancel()
            executor.shutdown(wait=True)
        finally:
            # Cancel the pending alarm and restore the previous handler.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_handler)
426
427
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    """Shutdown tests that only apply to ThreadPoolExecutor."""

    def test_threads_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for i in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._threads), 3)
        for i in range(3):
            sem.release()
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        # Same collection step as the other tests that drop the executor.
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown. A unittest assertion is used so the check
        # is not stripped when running with -O.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the threads when calling
        # shutdown with wait=False
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        executor.shutdown(wait=False)
        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown. A unittest assertion is used so the check
        # is not stripped when running with -O.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            t.join()

    def test_cancel_futures_wait_false(self):
        # Can only be reliably tested for TPE, since PPE often hangs with
        # `wait=False` (even without *cancel_futures*).
        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import ThreadPoolExecutor
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                t = ThreadPoolExecutor()
                t.submit(sleep_and_print, .1, "apple")
                t.shutdown(wait=False, cancel_futures=True)
            """)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")
520
521
class ProcessPoolShutdownTest(ExecutorShutdownTest):
    """Shutdown tests that only apply to ProcessPoolExecutor.

    Combined with the process pool mixins by create_executor_tests().
    """

    def test_processes_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        mp_context = self.get_context()
        if mp_context.get_start_method(allow_none=False) == "fork":
            # fork pre-spawns, not on demand.
            expected_num_processes = self.worker_count
        else:
            expected_num_processes = 3

        sem = mp_context.Semaphore(0)
        for _ in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._processes), expected_num_processes)
        for _ in range(3):
            sem.release()
        # Keep a reference to the process table before shutting down so
        # the workers can still be joined afterwards.
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context()) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        processes = executor._processes
        call_queue = executor._call_queue
        executor_manager_thread = executor._executor_manager_thread
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the
        # executor got shutdown. A unittest assertion is used so the check
        # is not stripped when running with -O.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the processes when calling
        # shutdown with wait=False
        executor = futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        processes = executor._processes
        call_queue = executor._call_queue
        executor_manager_thread = executor._executor_manager_thread
        executor.shutdown(wait=False)

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the executor got
        # shutdown. A unittest assertion is used so the check is not
        # stripped when running with -O.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])
599
600
# Only the process pool mixins are used here; the thread pool counterpart,
# ThreadPoolShutdownTest, is written out as an explicit class above.
create_executor_tests(ProcessPoolShutdownTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
605
606
class WaitTests:
    """Tests for futures.wait(), to be combined with an executor mixin."""

    def test_20369(self):
        # See https://bugs.python.org/issue20369
        future = self.executor.submit(time.sleep, 1.5)
        done, not_done = futures.wait([future, future],
                                      return_when=futures.ALL_COMPLETED)
        self.assertEqual({future}, done)
        self.assertEqual(set(), not_done)

    def test_first_completed(self):
        quick = self.executor.submit(mul, 21, 2)
        slow = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
            [CANCELLED_FUTURE, quick, slow],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual({quick}, done)
        self.assertEqual({CANCELLED_FUTURE, slow}, not_done)

    def test_first_completed_some_already_completed(self):
        slow = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, slow],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE},
            finished)
        self.assertEqual({slow}, pending)

    def test_first_exception(self):
        succeeds = self.executor.submit(mul, 2, 21)
        fails = self.executor.submit(sleep_and_raise, 1.5)
        still_running = self.executor.submit(time.sleep, 3)

        finished, pending = futures.wait(
            [succeeds, fails, still_running],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({succeeds, fails}, finished)
        self.assertEqual({still_running}, pending)

    def test_first_exception_some_already_complete(self):
        fails = self.executor.submit(divmod, 21, 0)
        slow = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             fails, slow],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          fails}, finished)
        self.assertEqual({CANCELLED_FUTURE, slow}, pending)

    def test_first_exception_one_already_failed(self):
        slow = self.executor.submit(time.sleep, 2)

        finished, pending = futures.wait(
            [EXCEPTION_FUTURE, slow],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({EXCEPTION_FUTURE}, finished)
        self.assertEqual({slow}, pending)

    def test_all_completed(self):
        fails = self.executor.submit(divmod, 2, 0)
        succeeds = self.executor.submit(mul, 2, 21)
        everything = [SUCCESSFUL_FUTURE,
                      CANCELLED_AND_NOTIFIED_FUTURE,
                      EXCEPTION_FUTURE,
                      fails,
                      succeeds]

        finished, pending = futures.wait(
            everything,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual(set(everything), finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        quick = self.executor.submit(mul, 6, 7)
        # Sleeps longer than the timeout passed to wait() below.
        too_slow = self.executor.submit(time.sleep, 6)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             quick, too_slow],
            timeout=5,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual({CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE,
                          quick}, finished)
        self.assertEqual({too_slow}, pending)
714
715
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
    """WaitTests on a thread pool, plus one thread-specific race test."""

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        gate = threading.Event()

        def future_func():
            gate.wait()

        saved_interval = sys.getswitchinterval()
        sys.setswitchinterval(1e-6)
        try:
            fs = {self.executor.submit(future_func) for _ in range(100)}
            gate.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            # Always restore the interpreter's original switch interval.
            sys.setswitchinterval(saved_interval)
732
733
# Only the process pool mixins are used here; the thread pool counterpart,
# ThreadPoolWaitTests, is written out as an explicit class above.
create_executor_tests(WaitTests,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
738
739
class AsCompletedTests:
    """Tests for futures.as_completed(), combined with an executor mixin."""

    # TODO: Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)
        everything = [CANCELLED_AND_NOTIFIED_FUTURE,
                      EXCEPTION_FUTURE,
                      SUCCESSFUL_FUTURE,
                      future1, future2]

        completed = set(futures.as_completed(everything))
        self.assertEqual(set(everything), completed)

    def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        already_done = [CANCELLED_AND_NOTIFIED_FUTURE,
                        EXCEPTION_FUTURE,
                        SUCCESSFUL_FUTURE]
        completed_futures = set()
        try:
            for future in futures.as_completed(already_done + [future1],
                                               timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set(already_done), completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        future1 = self.executor.submit(time.sleep, 2)
        completed = list(futures.as_completed(itertools.repeat(future1, 3)))
        self.assertEqual(len(completed), 1)

    def test_free_reference_yielded_future(self):
        # Issue #14406: Generator should not keep references
        # to finished futures.
        futures_list = [Future() for _ in range(8)]
        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
        futures_list.append(create_future(state=FINISHED, result=42))

        with self.assertRaises(futures.TimeoutError):
            for future in futures.as_completed(futures_list, timeout=0):
                futures_list.remove(future)
                future_ref = weakref.ref(future)
                del future
                support.gc_collect()  # For PyPy or other GCs.
                self.assertIsNone(future_ref())

        futures_list[0].set_result("test")
        for future in futures.as_completed(futures_list):
            futures_list.remove(future)
            future_ref = weakref.ref(future)
            del future
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(future_ref())
            # Complete the next pending future so the iteration can go on.
            if futures_list:
                futures_list[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]

        with self.assertRaises(futures.TimeoutError) as cm:
            list(futures.as_completed(futures_list, timeout=0))

        message = str(cm.exception)
        self.assertEqual(message, '2 (of 4) futures unfinished')
820
821
# Generate AsCompletedTests classes for the default set of executor mixins.
create_executor_tests(AsCompletedTests)
823
824
class ExecutorTest:
    """Executor-agnostic tests shared by the thread and process pool suites.

    The methods rely on attributes supplied by the concrete test classes:
    ``self.executor``, ``self.executor_type`` and ``self.worker_count``.
    """
    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.

    def test_submit(self):
        # A submitted call runs and its return value lands on the future.
        fut = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, fut.result())

    def test_submit_keyword(self):
        # Keyword arguments are forwarded to the callable ...
        product = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, product.result())
        # ... including ones named like submit()'s own parameters.
        echoed = self.executor.submit(capture, 1, self=2, fn=3)
        self.assertEqual(echoed.result(), ((1,), {'self': 2, 'fn': 3}))
        # The callable itself has to be given positionally.
        with self.assertRaises(TypeError):
            self.executor.submit(fn=capture, arg=1)
        with self.assertRaises(TypeError):
            self.executor.submit(arg=1)

    def test_map(self):
        # Executor.map() must agree with the builtin map(), with and
        # without an explicit chunksize.
        expected = list(map(pow, range(10), range(10)))

        plain = list(self.executor.map(pow, range(10), range(10)))
        self.assertEqual(plain, expected)

        chunked = list(
            self.executor.map(pow, range(10), range(10), chunksize=3))
        self.assertEqual(chunked, expected)

    def test_map_exception(self):
        # Results before the failing call are delivered; the failure is
        # raised when its slot is reached.
        results = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(next(results), (0, 1))
        self.assertEqual(next(results), (0, 1))
        with self.assertRaises(ZeroDivisionError):
            next(results)

    def test_map_timeout(self):
        # The third call sleeps longer than the timeout: the first two
        # results are yielded, then iteration times out.
        collected = []
        timed_out = False
        try:
            for value in self.executor.map(time.sleep, [0, 0, 6], timeout=5):
                collected.append(value)
        except futures.TimeoutError:
            timed_out = True

        if not timed_out:
            self.fail('expected TimeoutError')

        self.assertEqual([None, None], collected)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        one_more_than_workers = [2] * (self.worker_count + 1)
        self.executor.map(str, one_more_than_workers)
        self.executor.shutdown()

    @support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        target = MyObject()
        target_collected = threading.Event()

        def on_collect(_dead_ref):
            target_collected.set()

        # The weak reference object must stay alive for its callback to run,
        # hence the otherwise unused local.
        watcher = weakref.ref(target, on_collect)
        # Deliberately discarding the future.
        self.executor.submit(target.my_method)
        del target

        was_collected = target_collected.wait(timeout=support.SHORT_TIMEOUT)
        self.assertTrue(was_collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        # Zero and negative worker counts are rejected at construction time.
        expected_message = "max_workers must be greater than 0"
        for bad_count in (0, -1):
            with self.assertRaisesRegex(ValueError, expected_message):
                self.executor_type(max_workers=bad_count)

    def test_free_reference(self):
        # Issue #14406: Result iterator should not keep an internal
        # reference to result objects.
        for item in self.executor.map(make_dummy_object, range(10)):
            item_ref = weakref.ref(item)
            del item
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(item_ref())
909
910
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
    """ThreadPoolExecutor-specific tests on top of the shared ExecutorTest suite.

    Executors created inside a test are always shut down, even when an
    assertion fails, so that a failing test cannot leave worker threads
    behind.
    """

    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []
        def record_finished(n):
            finished.append(n)

        # The iterator returned by map() is never consumed; every call must
        # nevertheless have run by the time shutdown() returns.
        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        # Without an explicit max_workers the pool size is derived from the
        # CPU count, capped at 32.
        with self.executor_type() as executor:
            expected = min(32, (os.cpu_count() or 1) + 4)
            self.assertEqual(executor._max_workers, expected)

    def test_saturation(self):
        # Submitting far more blocking jobs than workers must not create more
        # threads than max_workers.
        executor = self.executor_type(4)
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        job_count = 15 * executor._max_workers
        try:
            for _ in range(job_count):
                executor.submit(acquire_lock, sem)
            self.assertEqual(len(executor._threads), executor._max_workers)
        finally:
            # Always unblock the jobs and stop the pool: if the assertion
            # failed, the workers would otherwise stay blocked on the
            # semaphore forever.
            for _ in range(job_count):
                sem.release()
            executor.shutdown(wait=True)

    def test_idle_thread_reuse(self):
        # Sequential jobs, each awaited before the next is submitted, must be
        # served by a single reused thread.
        with self.executor_type() as executor:
            executor.submit(mul, 21, 2).result()
            executor.submit(mul, 6, 7).result()
            executor.submit(mul, 3, 14).result()
            self.assertEqual(len(executor._threads), 1)

    @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
    def test_hang_global_shutdown_lock(self):
        # bpo-45021: _global_shutdown_lock should be reinitialized in the child
        # process, otherwise it will never exit
        def submit(pool):
            # Keeps the thread pool permanently busy re-submitting itself.
            pool.submit(submit, pool)

        with futures.ThreadPoolExecutor(1) as pool:
            pool.submit(submit, pool)

            for _ in range(50):
                with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
                    workers.submit(tuple)

    def test_executor_map_current_future_cancel(self):
        stop_event = threading.Event()
        log = []

        def log_n_wait(ident):
            log.append(f"{ident=} started")
            try:
                stop_event.wait()
            finally:
                log.append(f"{ident=} stopped")

        with self.executor_type(max_workers=1) as pool:
            # submit work to saturate the pool
            fut = pool.submit(log_n_wait, ident="first")
            try:
                with contextlib.closing(
                    pool.map(log_n_wait, ["second", "third"], timeout=0)
                ) as gen:
                    # Use futures.TimeoutError like the rest of this file; it
                    # is the exception type map() documents.
                    with self.assertRaises(futures.TimeoutError):
                        next(gen)
            finally:
                # Release the first job whatever happened above, otherwise
                # leaving the ``with`` block would wait forever.
                stop_event.set()
            fut.result()
        # ident='second' is cancelled as a result of raising a TimeoutError
        # ident='third' is cancelled because it remained in the collection of futures
        self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
988
989
class ProcessPoolExecutorTest(ExecutorTest):
    """ProcessPoolExecutor-specific tests on top of the shared ExecutorTest suite.

    The class is combined with the fork, forkserver and spawn mixins by the
    create_executor_tests() call that follows it.  Resources created inside a
    test (managers, extra executors, blocked workers) are released in
    ``finally`` clauses so that a failing assertion cannot leak processes.
    """

    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
    def test_max_workers_too_large(self):
        with self.assertRaisesRegex(ValueError,
                                    "max_workers must be <= 61"):
            futures.ProcessPoolExecutor(max_workers=62)

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        # (Named ``pending`` so the local does not shadow the ``futures``
        # module.)
        pending = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in pending:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        # Chunk sizes smaller than, larger than and equal to the input length
        # must all give the same results as the builtin map().
        ref = list(map(pow, range(40), range(40)))
        for chunksize in (6, 50, 40):
            self.assertEqual(
                list(self.executor.map(pow, range(40), range(40),
                                       chunksize=chunksize)),
                ref)
        # A chunksize below 1 is rejected.
        with self.assertRaises(ValueError):
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

    @classmethod
    def _test_traceback(cls):
        # test_traceback() searches for the exact text of the next source
        # line, trailing comment included -- do not reformat it.
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want to ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        # The remote traceback must also show up when the exception is
        # reported through the regular excepthook.
        with support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      f1.getvalue())

    @hashlib_helper.requires_hashdigest('md5')
    def test_ressources_gced_in_workers(self):
        # Ensure that argument for a job are correctly gc-ed after the job
        # is finished
        mgr = self.get_context().Manager()
        obj = None
        try:
            obj = EventfulGCObj(mgr)
            future = self.executor.submit(id, obj)
            future.result()

            self.assertTrue(obj.event.wait(timeout=1))
        finally:
            # explicitly destroy the object to ensure that
            # EventfulGCObj.__del__() is called while manager is still
            # running, then always stop the manager so that a failing
            # assertion does not leak its process.
            obj = None
            support.gc_collect()

            mgr.shutdown()
            mgr.join()

    def test_saturation(self):
        # Submitting far more blocking jobs than workers must not spawn more
        # processes than max_workers.
        executor = self.executor
        mp_context = self.get_context()
        sem = mp_context.Semaphore(0)
        job_count = 15 * executor._max_workers
        try:
            for _ in range(job_count):
                executor.submit(sem.acquire)
            self.assertEqual(len(executor._processes), executor._max_workers)
        finally:
            # Unblock the workers even if the assertion failed; otherwise
            # they would stay blocked on the semaphore.
            for _ in range(job_count):
                sem.release()

    def test_idle_process_reuse_one(self):
        executor = self.executor
        # Use a unittest assertion: a bare ``assert`` is stripped under -O.
        self.assertGreaterEqual(executor._max_workers, 4)
        if self.get_context().get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        # Each job is awaited before the next is submitted, so one idle
        # worker can serve them all.
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._processes), 1)

    def test_idle_process_reuse_multiple(self):
        executor = self.executor
        # Use a unittest assertion: a bare ``assert`` is stripped under -O.
        self.assertLessEqual(executor._max_workers, 5)
        if self.get_context().get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        # Alternate awaited and un-awaited submissions.
        executor.submit(mul, 12, 7).result()
        executor.submit(mul, 33, 25)
        executor.submit(mul, 25, 26).result()
        executor.submit(mul, 18, 29)
        executor.submit(mul, 1, 2).result()
        executor.submit(mul, 0, 9)
        self.assertLessEqual(len(executor._processes), 3)
        executor.shutdown()

    def test_max_tasks_per_child(self):
        context = self.get_context()
        if context.get_start_method(allow_none=False) == "fork":
            # max_tasks_per_child is rejected with the fork start method.
            with self.assertRaises(ValueError):
                self.executor_type(1, mp_context=context, max_tasks_per_child=3)
            return
        # not using self.executor as we need to control construction.
        # arguably this could go in another class w/o that mixin.
        executor = self.executor_type(
                1, mp_context=context, max_tasks_per_child=3)
        try:
            f1 = executor.submit(os.getpid)
            original_pid = f1.result()
            # The worker pid remains the same as the worker could be reused
            f2 = executor.submit(os.getpid)
            self.assertEqual(f2.result(), original_pid)
            self.assertEqual(len(executor._processes), 1)
            f3 = executor.submit(os.getpid)
            self.assertEqual(f3.result(), original_pid)

            # A new worker is spawned, with a statistically different pid,
            # while the previous was reaped.
            f4 = executor.submit(os.getpid)
            new_pid = f4.result()
            self.assertNotEqual(original_pid, new_pid)
            self.assertEqual(len(executor._processes), 1)
        finally:
            # Stop the private executor even when an assertion failed.
            executor.shutdown()

    def test_max_tasks_per_child_defaults_to_spawn_context(self):
        # not using self.executor as we need to control construction.
        # arguably this could go in another class w/o that mixin.
        executor = self.executor_type(1, max_tasks_per_child=3)
        try:
            self.assertEqual(executor._mp_context.get_start_method(), "spawn")
        finally:
            # Release the executor's resources; it was previously left
            # without a shutdown() call.
            executor.shutdown()

    def test_max_tasks_early_shutdown(self):
        context = self.get_context()
        if context.get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        # not using self.executor as we need to control construction.
        # arguably this could go in another class w/o that mixin.
        executor = self.executor_type(
                3, mp_context=context, max_tasks_per_child=1)
        # (Named ``pending`` so the local does not shadow the ``futures``
        # module.)
        pending = [executor.submit(mul, i, i) for i in range(6)]
        # Shut down right after submitting: every job must still complete.
        executor.shutdown()
        for i, future in enumerate(pending):
            self.assertEqual(future.result(), mul(i, i))
1154
1155
# Register ProcessPoolExecutorTest once per multiprocessing start method
# (fork, forkserver, spawn).
# NOTE(review): create_executor_tests and the mixins are defined outside this
# excerpt; presumably one concrete TestCase is generated per mixin -- confirm.
create_executor_tests(ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
1160
def _crash(delay=None):
    """Crash the current process with a segfault, optionally after *delay* seconds."""
    import faulthandler

    if delay:
        time.sleep(delay)
    # Switch the fault handler off before deliberately raising SIGSEGV.
    faulthandler.disable()
    faulthandler._sigsegv()
1168
1169
def _exit():
    """Leave the interpreter by raising SystemExit with exit status 1."""
    raise SystemExit(1)
1173
1174
def _raise_error(Err):
    """Instantiate *Err* without arguments and raise the instance."""
    failure = Err()
    raise failure
1178
1179
def _raise_error_ignore_stderr(Err):
    """Replace sys.stderr with an in-memory buffer, then raise ``Err()``."""
    from io import StringIO

    # Anything written to stderr from now on ends up in the throwaway buffer.
    sys.stderr = StringIO()
    raise Err()
1185
1186
def _return_instance(cls):
    """Call *cls* with no arguments and return the new instance."""
    instance = cls()
    return instance
1190
1191
class CrashAtPickle:
    """Object whose pickling segfaults the pickling process."""

    def __reduce__(self):
        # Called while the object is being pickled; never returns.
        _crash()
1196
1197
class CrashAtUnpickle:
    """Object that pickles fine but segfaults the process that unpickles it."""

    def __reduce__(self):
        # The unpickler calls the returned callable with the returned args.
        reconstructor, ctor_args = _crash, ()
        return reconstructor, ctor_args
1202
1203
class ExitAtPickle:
    """Object whose pickling makes the pickling process exit."""

    def __reduce__(self):
        # Called while the object is being pickled; raises SystemExit.
        _exit()
1208
1209
class ExitAtUnpickle:
    """Object that pickles fine but makes the unpickling process exit."""

    def __reduce__(self):
        # The unpickler calls the returned callable with the returned args.
        reconstructor, ctor_args = _exit, ()
        return reconstructor, ctor_args
1214
1215
class ErrorAtPickle:
    """Object whose pickling fails with a PicklingError."""

    def __reduce__(self):
        import pickle

        raise pickle.PicklingError("Error in pickle")
1221
1222
class ErrorAtUnpickle:
    """Object that pickles fine but raises UnpicklingError when unpickled."""

    def __reduce__(self):
        import pickle

        # On unpickling, the helper is called with the exception class; it
        # silences stderr and raises an instance of that class.
        return _raise_error_ignore_stderr, (pickle.UnpicklingError,)
1228
1229
class ExecutorDeadlockTest:
    """Checks that a process pool fails cleanly instead of deadlocking.

    Each test makes a worker, or the (un)pickling of a task or result, fail
    in a particular way and expects the matching exception on the future
    within ``TIMEOUT`` seconds.
    """
    TIMEOUT = support.SHORT_TIMEOUT

    def _fail_on_deadlock(self, executor):
        """Tear down a stuck *executor* and fail the test with a thread dump."""
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        import tempfile

        # Capture the stack of every thread before anything gets killed.
        with tempfile.TemporaryFile(mode="w+") as dump_file:
            faulthandler.dump_traceback(file=dump_file)
            dump_file.seek(0)
            tb = dump_file.read()
        for worker in executor._processes.values():
            worker.terminate()
        # This should be safe to call executor.shutdown here as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")

    def _check_crash(self, error, func, *args, ignore_stderr=False):
        """Submit ``func(*args)`` to a fresh 2-worker pool and expect *error*.

        With *ignore_stderr* true, stderr is captured while waiting for the
        result.  A timeout is treated as a deadlock.
        """
        # test for deadlock caused by crashes in a pool
        self.executor.shutdown(wait=True)

        executor = self.executor_type(
            max_workers=2, mp_context=self.get_context())
        res = executor.submit(func, *args)

        stderr_guard = (support.captured_stderr() if ignore_stderr
                        else contextlib.nullcontext())

        try:
            with self.assertRaises(error), stderr_guard:
                res.result(timeout=self.TIMEOUT)
        except futures.TimeoutError:
            # If we did not recover before TIMEOUT seconds,
            # consider that the executor is in a deadlock state
            self._fail_on_deadlock(executor)
        executor.shutdown(wait=True)

    def test_error_at_task_pickle(self):
        # Check problem occurring while pickling a task in
        # the task_handler thread
        bad_argument = ErrorAtPickle()
        self._check_crash(PicklingError, id, bad_argument)

    def test_exit_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        bad_argument = ExitAtUnpickle()
        self._check_crash(BrokenProcessPool, id, bad_argument)

    def test_error_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        bad_argument = ErrorAtUnpickle()
        self._check_crash(BrokenProcessPool, id, bad_argument)

    def test_crash_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        bad_argument = CrashAtUnpickle()
        self._check_crash(BrokenProcessPool, id, bad_argument)

    def test_crash_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(BrokenProcessPool, _crash)

    def test_exit_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(SystemExit, _exit)

    def test_error_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(RuntimeError, _raise_error, RuntimeError)

    def test_crash_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)

    def test_exit_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(SystemExit, _return_instance, ExitAtPickle)

    def test_error_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)

    def test_error_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(
            BrokenProcessPool, _return_instance, ErrorAtUnpickle,
            ignore_stderr=True)

    def test_exit_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)

    def test_shutdown_deadlock(self):
        # Test that a pool calling shutdown does not deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            crashing = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            self.assertRaises(BrokenProcessPool, crashing.result)

    def test_shutdown_deadlock_pickle(self):
        # Test that the pool calling shutdown with wait=False does not cause
        # a deadlock if a task fails at pickle after the shutdown call.
        # Reported in bpo-39104.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock

            # Start the executor and get the executor_manager_thread to collect
            # the threads and avoid dangling thread that should be cleaned up
            # asynchronously.
            executor.submit(id, 42).result()
            manager_thread = executor._executor_manager_thread

            # Submit a task that fails at pickle and shutdown the executor
            # without waiting
            unpicklable = executor.submit(id, ErrorAtPickle())
            executor.shutdown(wait=False)
            self.assertRaises(PicklingError, unpicklable.result)

        # Make sure the executor is eventually shut down and does not leave
        # dangling threads
        manager_thread.join()
1368
1369
# Register ExecutorDeadlockTest once per multiprocessing start method
# (fork, forkserver, spawn).
# NOTE(review): create_executor_tests and the mixins are defined outside this
# excerpt; presumably one concrete TestCase is generated per mixin -- confirm.
create_executor_tests(ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
1374
1375
class FutureTests(BaseTestCase):
    """Tests for Future objects driven directly, without any executor.

    Several tests read the module-level futures (PENDING_FUTURE,
    RUNNING_FUTURE, ...), which are shared between tests and must therefore
    never be mutated here; tests that change a future's state build their own
    with create_future() or Future().
    """

    def test_done_callback_with_result(self):
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        f.set_result(5)
        self.assertEqual(5, callback_result)

    def test_done_callback_with_exception(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_with_cancel(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled)

    def test_done_callback_raises(self):
        # A callback that raises must not prevent later callbacks from
        # running; the exception is reported on stderr instead.
        with support.captured_stderr() as stderr:
            raising_was_called = False
            fn_was_called = False

            def raising_fn(callback_future):
                nonlocal raising_was_called
                raising_was_called = True
                raise Exception('doh!')

            def fn(callback_future):
                nonlocal fn_was_called
                fn_was_called = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            self.assertTrue(raising_was_called)
            self.assertTrue(fn_was_called)
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        # Adding a callback to a finished future calls it immediately.
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result)

    def test_done_callback_already_failed(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_already_cancelled(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled)

    def test_done_callback_raises_already_succeeded(self):
        with support.captured_stderr() as stderr:
            def raising_fn(callback_future):
                raise Exception('doh!')

            f = Future()

            # Set the result first to simulate a future that runs instantly,
            # effectively allowing the callback to be run immediately.
            f.set_result(5)
            f.add_done_callback(raising_fn)

            self.assertIn('exception calling callback for', stderr.getvalue())
            self.assertIn('doh!', stderr.getvalue())

    def test_repr(self):
        # Both cancelled states are rendered as "cancelled".
        self.assertRegex(repr(PENDING_FUTURE),
                         r'<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegex(repr(RUNNING_FUTURE),
                         r'<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegex(repr(CANCELLED_FUTURE),
                         r'<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                         r'<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(
                repr(EXCEPTION_FUTURE),
                r'<Future at 0x[0-9a-f]+ state=finished raised OSError>')
        self.assertRegex(
                repr(SUCCESSFUL_FUTURE),
                r'<Future at 0x[0-9a-f]+ state=finished returned int>')

    def test_cancel(self):
        # cancel() mutates state, so use private futures rather than the
        # shared module-level ones.
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=OSError())
        f6 = create_future(state=FINISHED, result=5)

        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO: This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        try:
            # The timeout is only an upper bound; use the shared constant like
            # the sibling tests rather than a hard-coded value.
            self.assertEqual(f1.result(timeout=support.SHORT_TIMEOUT), 42)
        finally:
            # Join even on failure so no thread is left dangling.
            t.join()

    def test_result_with_cancel(self):
        # TODO: This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        try:
            self.assertRaises(futures.CancelledError,
                              f1.result, timeout=support.SHORT_TIMEOUT)
        finally:
            # Join even on failure so no thread is left dangling.
            t.join()

    def test_exception_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        self.assertIsInstance(EXCEPTION_FUTURE.exception(timeout=0), OSError)
        self.assertIsNone(SUCCESSFUL_FUTURE.exception(timeout=0))

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            # Finish the future by hand, under its condition, and wake up
            # the waiter.
            with f1._condition:
                f1._state = FINISHED
                f1._exception = OSError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        try:
            self.assertIsInstance(
                f1.exception(timeout=support.SHORT_TIMEOUT), OSError)
        finally:
            # Join even on failure so no thread is left dangling.
            t.join()

    def test_multiple_set_result(self):
        f = create_future(state=PENDING)
        f.set_result(1)

        # A second set_result() is rejected and leaves the first result
        # untouched.
        with self.assertRaisesRegex(
                futures.InvalidStateError,
                r'FINISHED: <Future at 0x[0-9a-f]+ '
                r'state=finished returned int>'
        ):
            f.set_result(2)

        self.assertTrue(f.done())
        self.assertEqual(f.result(), 1)

    def test_multiple_set_exception(self):
        f = create_future(state=PENDING)
        e = ValueError()
        f.set_exception(e)

        # A second set_exception() is rejected and leaves the first exception
        # untouched.
        with self.assertRaisesRegex(
                futures.InvalidStateError,
                r'FINISHED: <Future at 0x[0-9a-f]+ '
                r'state=finished raised ValueError>'
        ):
            f.set_exception(Exception())

        self.assertEqual(f.exception(), e)
1645
1646
def setUpModule():
    """Register the module-level cleanups for multiprocessing and threads."""
    # Cleanups run in reverse registration order, so the multiprocessing
    # cleanup registered here runs after the thread cleanup below.
    unittest.addModuleCleanup(multiprocessing.util._cleanup_tests)
    # Record the thread state now and hand it to the cleanup that runs once
    # the module's tests are done.
    threads_before = threading_helper.threading_setup()
    unittest.addModuleCleanup(
        threading_helper.threading_cleanup, *threads_before)
1651
1652
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
1655