1#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
6import unittest.mock
7import queue as pyqueue
8import textwrap
9import time
10import io
11import itertools
12import sys
13import os
14import gc
15import errno
16import signal
17import array
18import socket
19import random
20import logging
21import subprocess
22import struct
23import operator
24import pathlib
25import pickle
26import weakref
27import warnings
28import test.support
29import test.support.script_helper
30from test import support
31from test.support import hashlib_helper
32from test.support import import_helper
33from test.support import os_helper
34from test.support import socket_helper
35from test.support import threading_helper
36from test.support import warnings_helper
37
38
39# Skip tests if _multiprocessing wasn't built.
40_multiprocessing = import_helper.import_module('_multiprocessing')
41# Skip tests if sem_open implementation is broken.
42support.skip_if_broken_multiprocessing_synchronize()
43import threading
44
45import multiprocessing.connection
46import multiprocessing.dummy
47import multiprocessing.heap
48import multiprocessing.managers
49import multiprocessing.pool
50import multiprocessing.queues
51
52from multiprocessing import util
53
54try:
55    from multiprocessing import reduction
56    HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
57except ImportError:
58    HAS_REDUCTION = False
59
60try:
61    from multiprocessing.sharedctypes import Value, copy
62    HAS_SHAREDCTYPES = True
63except ImportError:
64    HAS_SHAREDCTYPES = False
65
66try:
67    from multiprocessing import shared_memory
68    HAS_SHMEM = True
69except ImportError:
70    HAS_SHMEM = False
71
72try:
73    import msvcrt
74except ImportError:
75    msvcrt = None
76
77
78if support.check_sanitizer(address=True):
79    # bpo-45200: Skip multiprocessing tests if Python is built with ASAN to
80    # work around a libasan race condition: dead lock in pthread_create().
81    raise unittest.SkipTest("libasan has a pthread_create() dead lock")
82
83
def latin(s):
    """Return *s* encoded with the Latin-1 codec."""
    encoded = s.encode('latin')
    return encoded
86
87
def close_queue(queue):
    """Close *queue* and wait for its feeder thread to finish, but only
    for real multiprocessing queues; other queue types are left alone."""
    if not isinstance(queue, multiprocessing.queues.Queue):
        return
    queue.close()
    queue.join_thread()
92
93
def join_process(process):
    # Since multiprocessing.Process has the same API as threading.Thread
    # (join() and is_alive()), the thread-oriented support helper can be
    # reused for processes as well.
    threading_helper.join_thread(process)
98
99
if os.name == "posix":
    from multiprocessing import resource_tracker

    def _resource_unlink(name, rtype):
        # Unlink the named resource by invoking the resource tracker's
        # cleanup function registered for *rtype*.
        resource_tracker._CLEANUP_FUNCS[rtype](name)
105
106
107#
108# Constants
109#
110
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Generic short delay (in seconds) used throughout to give children a
# chance to make progress.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# BaseManager.shutdown_timeout
SHUTDOWN_TIMEOUT = support.SHORT_TIMEOUT

# False when the platform advertises a broken sem_getvalue().
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")
132from multiprocessing.connection import wait
133
def wait_for_handle(handle, timeout):
    """Wait for *handle* to become ready; a negative timeout means wait
    forever (mapped to None for multiprocessing.connection.wait)."""
    effective = None if (timeout is not None and timeout < 0.0) else timeout
    return wait([handle], effective)
138
# Upper bound on open file descriptors; fall back to a conservative
# default when the OS cannot tell us.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # sysconf() is missing (non-POSIX), the name is unknown (ValueError),
    # or the call failed (OSError).  A bare "except:" here would also
    # have swallowed KeyboardInterrupt/SystemExit.
    MAXFD = 256

# To speed up tests when using the forkserver, we can preload these:
PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
146
147#
148# Some tests require ctypes
149#
150
151try:
152    from ctypes import Structure, c_int, c_double, c_longlong
153except ImportError:
154    Structure = object
155    c_int = c_double = c_longlong = None
156
157
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test."""
    # POSIX guarantees at least this many semaphores.
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    # -1 means "no limit"; anything below the POSIX minimum is a problem.
    if nsems != -1 and nsems < nsems_min:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % nsems_min)
171
172
173#
174# Creates a wrapper for a function which records the time it takes to finish
175#
176
class TimingWrapper(object):
    """Callable proxy that records how long each invocation takes.

    The duration of the most recent call is stored in ``elapsed``.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        start = time.monotonic()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Record the elapsed time even when the wrapped call raises.
            self.elapsed = time.monotonic() - start
        return result
189
190#
191# Base class for test cases
192#
193
class BaseTestCase(object):
    """Mixin with helpers shared by all multiprocessing test cases."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing comparisons are only meaningful when CHECK_TIMINGS is on.
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Skip the comparison entirely when the call is unimplemented.
        try:
            res = func(*args)
        except NotImplementedError:
            return
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
216
217#
218# Return the value of a semaphore
219#
220
def get_value(self):
    """Return the current value of a semaphore-like object, trying the
    attribute spellings used by the various implementations in turn."""
    try:
        return self.get_value()
    except AttributeError:
        pass
    try:
        return self._Semaphore__value
    except AttributeError:
        pass
    try:
        return self._value
    except AttributeError:
        raise NotImplementedError
232
233#
234# Testcases
235#
236
class DummyCallable:
    """Picklable callable used to verify that Process drops its reference
    to the target after the child has run (see test_lose_target_ref)."""

    def __call__(self, q, c):
        # The second argument is expected to be this very callable.
        assert isinstance(c, DummyCallable)
        q.put(5)
241
242
243class _TestProcess(BaseTestCase):
244
    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # current_process() must describe this very process: alive,
        # non-daemonic, with a non-empty bytes authkey, our pid as ident
        # and no exit code yet.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)
260
    def test_set_executable(self):
        # set_executable() must accept str, bytes and os.PathLike values;
        # a child started with each must exit cleanly.
        if self.TYPE == 'threads':
            self.skipTest(f'test not appropriate for {self.TYPE}')
        paths = [
            sys.executable,               # str
            sys.executable.encode(),      # bytes
            pathlib.Path(sys.executable)  # os.PathLike
        ]
        for path in paths:
            self.set_executable(path)
            p = self.Process()
            p.start()
            p.join()
            self.assertEqual(p.exitcode, 0)
275
    def test_args_argument(self):
        # bpo-45735: Using list or tuple as *args* in constructor could
        # achieve the same effect.
        args_cases = (1, "str", [1], (1,))
        args_types = (list, tuple)

        test_cases = itertools.product(args_cases, args_types)

        for args, args_type in test_cases:
            with self.subTest(args=args, args_type=args_type):
                q = self.Queue(1)
                # pass a tuple or list as args
                p = self.Process(target=self._test_args, args=args_type((q, args)))
                p.daemon = True
                p.start()
                # The child echoes the argument back unchanged.
                child_args = q.get()
                self.assertEqual(child_args, args)
                p.join()
                close_queue(q)

    @classmethod
    def _test_args(cls, q, arg):
        # Child side: echo the received argument back through the queue.
        q.put(arg)
299
    def test_daemon_argument(self):
        # The daemon flag should default to the parent's flag and be
        # overridable through the constructor's *daemon* argument.
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)
311
    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side for test_process(): report the received args, kwargs
        # and our name back to the parent; for real processes also report
        # the authkey and pid.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)
321
    def test_parent_process_attributes(self):
        # parent_process() is None in the main process, while a child
        # must see our pid and name as its parent's.
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        self.assertIsNone(self.parent_process())

        rconn, wconn = self.Pipe(duplex=False)
        p = self.Process(target=self._test_send_parent_process, args=(wconn,))
        p.start()
        p.join()
        parent_pid, parent_name = rconn.recv()
        self.assertEqual(parent_pid, self.current_process().pid)
        self.assertEqual(parent_pid, os.getpid())
        self.assertEqual(parent_name, self.current_process().name)

    @classmethod
    def _test_send_parent_process(cls, wconn):
        # Child side: report the parent's pid and name over the pipe.
        from multiprocessing.process import parent_process
        wconn.send([parent_process().pid, parent_process().name])
341
    def test_parent_process(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # Launch a child process. Make it launch a grandchild process. Kill the
        # child process and make sure that the grandchild notices the death of
        # its parent (a.k.a the child process).
        rconn, wconn = self.Pipe(duplex=False)
        p = self.Process(
            target=self._test_create_grandchild_process, args=(wconn, ))
        p.start()

        if not rconn.poll(timeout=support.LONG_TIMEOUT):
            raise AssertionError("Could not communicate with child process")
        parent_process_status = rconn.recv()
        self.assertEqual(parent_process_status, "alive")

        p.terminate()
        p.join()

        if not rconn.poll(timeout=support.LONG_TIMEOUT):
            raise AssertionError("Could not communicate with child process")
        parent_process_status = rconn.recv()
        self.assertEqual(parent_process_status, "not alive")

    @classmethod
    def _test_create_grandchild_process(cls, wconn):
        # Child side: spawn the grandchild, then sleep until the parent
        # terminates us.
        p = cls.Process(target=cls._test_report_parent_status, args=(wconn, ))
        p.start()
        time.sleep(300)

    @classmethod
    def _test_report_parent_status(cls, wconn):
        # Grandchild side: report whether our parent (the child) is alive,
        # wait for it to die, then report again.
        from multiprocessing.process import parent_process
        wconn.send("alive" if parent_process().is_alive() else "not alive")
        parent_process().join(timeout=support.SHORT_TIMEOUT)
        wconn.send("alive" if parent_process().is_alive() else "not alive")
379
    def test_process(self):
        # End-to-end check of the Process API: attribute values before
        # start(), while the child runs, and after join().
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # _test() echoes everything except the queue itself back to us.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())
        close_queue(q)
419
    @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id")
    def test_process_mainthread_native_id(self):
        # The child's main thread must have a different native id than
        # the parent's main thread.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current_mainthread_native_id = threading.main_thread().native_id

        q = self.Queue(1)
        p = self.Process(target=self._test_process_mainthread_native_id, args=(q,))
        p.start()

        child_mainthread_native_id = q.get()
        p.join()
        close_queue(q)

        self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id)

    @classmethod
    def _test_process_mainthread_native_id(cls, q):
        # Child side: report the native id of our main thread.
        mainthread_native_id = threading.main_thread().native_id
        q.put(mainthread_native_id)
441
    @classmethod
    def _sleep_some(cls):
        # Sleep "forever"; the parent is expected to terminate/kill us.
        time.sleep(100)

    @classmethod
    def _test_sleep(cls, delay):
        # Sleep briefly, then exit normally.
        time.sleep(delay)
449
    def _kill_process(self, meth):
        # Common driver for test_terminate()/test_kill(): start a child
        # that sleeps, check that join() with a zero/negative timeout
        # returns immediately, apply *meth* (terminate or kill), then
        # join and return the child's exit code.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._sleep_some)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        # XXX maybe terminating too soon causes the problems on Gentoo...
        time.sleep(1)

        meth(p)

        if hasattr(signal, 'alarm'):
            # On the Gentoo buildbot waitpid() often seems to block forever.
            # We use alarm() to interrupt it if it blocks for too long.
            def handler(*args):
                raise RuntimeError('join took too long: %s' % p)
            old_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.alarm(10)
                self.assertEqual(join(), None)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        else:
            self.assertEqual(join(), None)

        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        return p.exitcode

    def test_terminate(self):
        # On POSIX the child killed by terminate() exits with -SIGTERM.
        exitcode = self._kill_process(multiprocessing.Process.terminate)
        if os.name != 'nt':
            self.assertEqual(exitcode, -signal.SIGTERM)

    def test_kill(self):
        # On POSIX the child killed by kill() exits with -SIGKILL.
        exitcode = self._kill_process(multiprocessing.Process.kill)
        if os.name != 'nt':
            self.assertEqual(exitcode, -signal.SIGKILL)
510
    def test_cpu_count(self):
        # cpu_count() may be unimplemented on some platforms; whatever we
        # end up with must be a positive int.
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)
518
    def test_active_children(self):
        # A process appears in active_children() only between start()
        # and join().
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())
531
    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our id, then (down to depth 2) spawn two children which
        # each do the same with id + [i], joining each before the next.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        # Because every spawn is joined immediately, the ids must arrive
        # in depth-first order.
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
562
    @classmethod
    def _test_sentinel(cls, event):
        # Child side: block until the parent sets the event (bounded wait).
        event.wait(10.0)

    def test_sentinel(self):
        # sentinel is only available after start(); it must not be ready
        # while the child runs and must become ready once it exits.
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=1))
582
    @classmethod
    def _test_close(cls, rc=0, q=None):
        # Child side: optionally wait for an item on *q*, then exit with
        # return code *rc*.
        if q is not None:
            q.get()
        sys.exit(rc)

    def test_close(self):
        # close() must be refused while the child is alive, must make
        # further operations raise ValueError afterwards, and must be
        # idempotent.
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        q = self.Queue()
        p = self.Process(target=self._test_close, kwargs={'q': q})
        p.daemon = True
        p.start()
        self.assertEqual(p.is_alive(), True)
        # Child is still alive, cannot close
        with self.assertRaises(ValueError):
            p.close()

        q.put(None)
        p.join()
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.exitcode, 0)
        p.close()
        with self.assertRaises(ValueError):
            p.is_alive()
        with self.assertRaises(ValueError):
            p.join()
        with self.assertRaises(ValueError):
            p.terminate()
        p.close()

        # After closing, the Process object must be garbage-collectable.
        wr = weakref.ref(p)
        del p
        gc.collect()
        self.assertIs(wr(), None)

        close_queue(q)
620
    def test_many_processes(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        N = 5 if sm == 'spawn' else 100

        # Try to overwhelm the forkserver loop with events
        procs = [self.Process(target=self._test_sleep, args=(0.01,))
                 for i in range(N)]
        for p in procs:
            p.start()
        for p in procs:
            join_process(p)
        for p in procs:
            self.assertEqual(p.exitcode, 0)

        # Same again, but terminate the children instead of letting them
        # finish on their own.
        procs = [self.Process(target=self._sleep_some)
                 for i in range(N)]
        for p in procs:
            p.start()
        time.sleep(0.001)  # let the children start...
        for p in procs:
            p.terminate()
        for p in procs:
            join_process(p)
        if os.name != 'nt':
            exitcodes = [-signal.SIGTERM]
            if sys.platform == 'darwin':
                # bpo-31510: On macOS, killing a freshly started process with
                # SIGTERM sometimes kills the process with SIGKILL.
                exitcodes.append(-signal.SIGKILL)
            for p in procs:
                self.assertIn(p.exitcode, exitcodes)
655
    def test_lose_target_ref(self):
        # Process must not keep a reference to the target callable once
        # the child has finished; the weakref must die after join().
        c = DummyCallable()
        wr = weakref.ref(c)
        q = self.Queue()
        p = self.Process(target=c, args=(q, c))
        del c
        p.start()
        p.join()
        gc.collect()  # For PyPy or other GCs.
        self.assertIs(wr(), None)
        self.assertEqual(q.get(), 5)
        close_queue(q)
668
    @classmethod
    def _test_child_fd_inflation(self, evt, q):
        # Child side: report our fd count, then stay alive until told to
        # exit so all children coexist.
        q.put(os_helper.fd_count())
        evt.wait()

    def test_child_fd_inflation(self):
        # Number of fds in child processes should not grow with the
        # number of running children.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        if sm == 'fork':
            # The fork method by design inherits all fds from the parent,
            # trying to go against it is a lost battle
            self.skipTest('test not appropriate for {}'.format(sm))

        N = 5
        evt = self.Event()
        q = self.Queue()

        procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q))
                 for i in range(N)]
        for p in procs:
            p.start()

        try:
            # Every child must report the same fd count.
            fd_counts = [q.get() for i in range(N)]
            self.assertEqual(len(set(fd_counts)), 1, fd_counts)

        finally:
            evt.set()
            for p in procs:
                p.join()
            close_queue(q)
704
    @classmethod
    def _test_wait_for_threads(self, evt):
        # Child side: a non-daemon thread sets the event after 0.5s and a
        # daemon thread would clear it after 20s.  Only the non-daemon
        # thread should be waited for at process exit, so the event stays
        # set.
        def func1():
            time.sleep(0.5)
            evt.set()

        def func2():
            time.sleep(20)
            evt.clear()

        threading.Thread(target=func1).start()
        threading.Thread(target=func2, daemon=True).start()

    def test_wait_for_threads(self):
        # A child process should wait for non-daemonic threads to end
        # before exiting
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        evt = self.Event()
        proc = self.Process(target=self._test_wait_for_threads, args=(evt,))
        proc.start()
        proc.join()
        self.assertTrue(evt.is_set())
729
730    @classmethod
731    def _test_error_on_stdio_flush(self, evt, break_std_streams={}):
732        for stream_name, action in break_std_streams.items():
733            if action == 'close':
734                stream = io.StringIO()
735                stream.close()
736            else:
737                assert action == 'remove'
738                stream = None
739            setattr(sys, stream_name, None)
740        evt.set()
741
    def test_error_on_stdio_flush_1(self):
        # Check that Process works with broken standard streams
        # (broken by the parent before the child is started).
        streams = [io.StringIO(), None]
        streams[0].close()
        for stream_name in ('stdout', 'stderr'):
            for stream in streams:
                old_stream = getattr(sys, stream_name)
                setattr(sys, stream_name, stream)
                try:
                    evt = self.Event()
                    proc = self.Process(target=self._test_error_on_stdio_flush,
                                        args=(evt,))
                    proc.start()
                    proc.join()
                    self.assertTrue(evt.is_set())
                    self.assertEqual(proc.exitcode, 0)
                finally:
                    # Restore the original stream for subsequent tests.
                    setattr(sys, stream_name, old_stream)
760
    def test_error_on_stdio_flush_2(self):
        # Same as test_error_on_stdio_flush_1(), but standard streams are
        # broken by the child process
        for stream_name in ('stdout', 'stderr'):
            for action in ('close', 'remove'):
                old_stream = getattr(sys, stream_name)
                try:
                    evt = self.Event()
                    proc = self.Process(target=self._test_error_on_stdio_flush,
                                        args=(evt, {stream_name: action}))
                    proc.start()
                    proc.join()
                    self.assertTrue(evt.is_set())
                    self.assertEqual(proc.exitcode, 0)
                finally:
                    # Defensive restore of the parent's stream.
                    setattr(sys, stream_name, old_stream)
777
    @classmethod
    def _sleep_and_set_event(self, evt, delay=0.0):
        # Child side: wait *delay* seconds, then set *evt*.
        time.sleep(delay)
        evt.set()
782
    def check_forkserver_death(self, signum):
        # bpo-31308: if the forkserver process has died, we should still
        # be able to create and run new Process instances (the forkserver
        # is implicitly restarted).
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        sm = multiprocessing.get_start_method()
        if sm != 'forkserver':
            # The fork method by design inherits all fds from the parent,
            # trying to go against it is a lost battle
            self.skipTest('test not appropriate for {}'.format(sm))

        from multiprocessing.forkserver import _forkserver
        _forkserver.ensure_running()

        # First process sleeps 500 ms
        delay = 0.5

        evt = self.Event()
        proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay))
        proc.start()

        # Kill the forkserver while the first child is still running.
        pid = _forkserver._forkserver_pid
        os.kill(pid, signum)
        # give time to the fork server to die and time to proc to complete
        time.sleep(delay * 2.0)

        # A second Process must still work (forkserver restarted).
        evt2 = self.Event()
        proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,))
        proc2.start()
        proc2.join()
        self.assertTrue(evt2.is_set())
        self.assertEqual(proc2.exitcode, 0)

        proc.join()
        self.assertTrue(evt.is_set())
        # Either exit code is acceptable for the first child; 255 can
        # occur depending on how the forkserver's death affected it.
        self.assertIn(proc.exitcode, (0, 255))

    def test_forkserver_sigint(self):
        # Catchable signal
        self.check_forkserver_death(signal.SIGINT)

    def test_forkserver_sigkill(self):
        # Uncatchable signal
        if os.name != 'nt':
            self.check_forkserver_death(signal.SIGKILL)
829
830
831#
832#
833#
834
835class _UpperCaser(multiprocessing.Process):
836
837    def __init__(self):
838        multiprocessing.Process.__init__(self)
839        self.child_conn, self.parent_conn = multiprocessing.Pipe()
840
841    def run(self):
842        self.parent_conn.close()
843        for s in iter(self.child_conn.recv, None):
844            self.child_conn.send(s.upper())
845        self.child_conn.close()
846
847    def submit(self, s):
848        assert type(s) is str
849        self.parent_conn.send(s)
850        return self.parent_conn.recv()
851
852    def stop(self):
853        self.parent_conn.send(None)
854        self.parent_conn.close()
855        self.child_conn.close()
856
class _TestSubclassingProcess(BaseTestCase):
    """Tests for user-defined subclasses of multiprocessing.Process."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        # The _UpperCaser subclass must work end to end over its pipe.
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, encoding="utf-8") as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            self.assertIn("1/0 # MARKER", err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        # Child side: point sys.stderr at *testfn*, then crash so that the
        # traceback lands in the file when stderr is flushed at shutdown.
        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False)
        1/0 # MARKER


    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Child side: point sys.stderr at *testfn* and exit with *reason*.
        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False)
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, testfn)

        # Non-integer reasons are written to stderr and yield exitcode 1.
        for reason in (
            [1, 2, 3],
            'ignore this',
        ):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            join_process(p)
            self.assertEqual(p.exitcode, 1)

            with open(testfn, encoding="utf-8") as f:
                content = f.read()
            self.assertEqual(content.rstrip(), str(reason))

            os.unlink(testfn)

        # Integer, bool, None and missing reasons map directly to a code.
        cases = [
            ((True,), 1),
            ((False,), 0),
            ((8,), 8),
            ((None,), 0),
            ((), 0),
            ]

        for args, expected in cases:
            with self.subTest(args=args):
                p = self.Process(target=sys.exit, args=args)
                p.daemon = True
                p.start()
                join_process(p)
                self.assertEqual(p.exitcode, expected)
939
940#
941#
942#
943
def queue_empty(q):
    """Return True if *q* holds no items, falling back to qsize() where
    the queue type does not implement empty()."""
    if hasattr(q, 'empty'):
        return q.empty()
    return q.qsize() == 0
949
def queue_full(q, maxsize):
    """Return True if *q* holds *maxsize* items, falling back to qsize()
    where the queue type does not implement full()."""
    if hasattr(q, 'full'):
        return q.full()
    return q.qsize() == maxsize
955
956
957class _TestQueue(BaseTestCase):
958
959
    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side for test_put(): once allowed to start, drain the six
        # items the parent enqueued, then let the parent continue.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()
966
967    def test_put(self):
968        MAXSIZE = 6
969        queue = self.Queue(maxsize=MAXSIZE)
970        child_can_start = self.Event()
971        parent_can_continue = self.Event()
972
973        proc = self.Process(
974            target=self._test_put,
975            args=(queue, child_can_start, parent_can_continue)
976            )
977        proc.daemon = True
978        proc.start()
979
980        self.assertEqual(queue_empty(queue), True)
981        self.assertEqual(queue_full(queue, MAXSIZE), False)
982
983        queue.put(1)
984        queue.put(2, True)
985        queue.put(3, True, None)
986        queue.put(4, False)
987        queue.put(5, False, None)
988        queue.put_nowait(6)
989
990        # the values may be in buffer but not yet in pipe so sleep a bit
991        time.sleep(DELTA)
992
993        self.assertEqual(queue_empty(queue), False)
994        self.assertEqual(queue_full(queue, MAXSIZE), True)
995
996        put = TimingWrapper(queue.put)
997        put_nowait = TimingWrapper(queue.put_nowait)
998
999        self.assertRaises(pyqueue.Full, put, 7, False)
1000        self.assertTimingAlmostEqual(put.elapsed, 0)
1001
1002        self.assertRaises(pyqueue.Full, put, 7, False, None)
1003        self.assertTimingAlmostEqual(put.elapsed, 0)
1004
1005        self.assertRaises(pyqueue.Full, put_nowait, 7)
1006        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
1007
1008        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
1009        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
1010
1011        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
1012        self.assertTimingAlmostEqual(put.elapsed, 0)
1013
1014        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
1015        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
1016
1017        child_can_start.set()
1018        parent_can_continue.wait()
1019
1020        self.assertEqual(queue_empty(queue), True)
1021        self.assertEqual(queue_full(queue, MAXSIZE), False)
1022
1023        proc.join()
1024        close_queue(queue)
1025
1026    @classmethod
1027    def _test_get(cls, queue, child_can_start, parent_can_continue):
1028        child_can_start.wait()
1029        #queue.put(1)
1030        queue.put(2)
1031        queue.put(3)
1032        queue.put(4)
1033        queue.put(5)
1034        parent_can_continue.set()
1035
1036    def test_get(self):
1037        queue = self.Queue()
1038        child_can_start = self.Event()
1039        parent_can_continue = self.Event()
1040
1041        proc = self.Process(
1042            target=self._test_get,
1043            args=(queue, child_can_start, parent_can_continue)
1044            )
1045        proc.daemon = True
1046        proc.start()
1047
1048        self.assertEqual(queue_empty(queue), True)
1049
1050        child_can_start.set()
1051        parent_can_continue.wait()
1052
1053        time.sleep(DELTA)
1054        self.assertEqual(queue_empty(queue), False)
1055
1056        # Hangs unexpectedly, remove for now
1057        #self.assertEqual(queue.get(), 1)
1058        self.assertEqual(queue.get(True, None), 2)
1059        self.assertEqual(queue.get(True), 3)
1060        self.assertEqual(queue.get(timeout=1), 4)
1061        self.assertEqual(queue.get_nowait(), 5)
1062
1063        self.assertEqual(queue_empty(queue), True)
1064
1065        get = TimingWrapper(queue.get)
1066        get_nowait = TimingWrapper(queue.get_nowait)
1067
1068        self.assertRaises(pyqueue.Empty, get, False)
1069        self.assertTimingAlmostEqual(get.elapsed, 0)
1070
1071        self.assertRaises(pyqueue.Empty, get, False, None)
1072        self.assertTimingAlmostEqual(get.elapsed, 0)
1073
1074        self.assertRaises(pyqueue.Empty, get_nowait)
1075        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
1076
1077        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
1078        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1079
1080        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
1081        self.assertTimingAlmostEqual(get.elapsed, 0)
1082
1083        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
1084        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
1085
1086        proc.join()
1087        close_queue(queue)
1088
1089    @classmethod
1090    def _test_fork(cls, queue):
1091        for i in range(10, 20):
1092            queue.put(i)
1093        # note that at this point the items may only be buffered, so the
1094        # process cannot shutdown until the feeder thread has finished
1095        # pushing items onto the pipe.
1096
1097    def test_fork(self):
1098        # Old versions of Queue would fail to create a new feeder
1099        # thread for a forked process if the original process had its
1100        # own feeder thread.  This test checks that this no longer
1101        # happens.
1102
1103        queue = self.Queue()
1104
1105        # put items on queue so that main process starts a feeder thread
1106        for i in range(10):
1107            queue.put(i)
1108
1109        # wait to make sure thread starts before we fork a new process
1110        time.sleep(DELTA)
1111
1112        # fork process
1113        p = self.Process(target=self._test_fork, args=(queue,))
1114        p.daemon = True
1115        p.start()
1116
1117        # check that all expected items are in the queue
1118        for i in range(20):
1119            self.assertEqual(queue.get(), i)
1120        self.assertRaises(pyqueue.Empty, queue.get, False)
1121
1122        p.join()
1123        close_queue(queue)
1124
1125    def test_qsize(self):
1126        q = self.Queue()
1127        try:
1128            self.assertEqual(q.qsize(), 0)
1129        except NotImplementedError:
1130            self.skipTest('qsize method not implemented')
1131        q.put(1)
1132        self.assertEqual(q.qsize(), 1)
1133        q.put(5)
1134        self.assertEqual(q.qsize(), 2)
1135        q.get()
1136        self.assertEqual(q.qsize(), 1)
1137        q.get()
1138        self.assertEqual(q.qsize(), 0)
1139        close_queue(q)
1140
1141    @classmethod
1142    def _test_task_done(cls, q):
1143        for obj in iter(q.get, None):
1144            time.sleep(DELTA)
1145            q.task_done()
1146
1147    def test_task_done(self):
1148        queue = self.JoinableQueue()
1149
1150        workers = [self.Process(target=self._test_task_done, args=(queue,))
1151                   for i in range(4)]
1152
1153        for p in workers:
1154            p.daemon = True
1155            p.start()
1156
1157        for i in range(10):
1158            queue.put(i)
1159
1160        queue.join()
1161
1162        for p in workers:
1163            queue.put(None)
1164
1165        for p in workers:
1166            p.join()
1167        close_queue(queue)
1168
1169    def test_no_import_lock_contention(self):
1170        with os_helper.temp_cwd():
1171            module_name = 'imported_by_an_imported_module'
1172            with open(module_name + '.py', 'w', encoding="utf-8") as f:
1173                f.write("""if 1:
1174                    import multiprocessing
1175
1176                    q = multiprocessing.Queue()
1177                    q.put('knock knock')
1178                    q.get(timeout=3)
1179                    q.close()
1180                    del q
1181                """)
1182
1183            with import_helper.DirsOnSysPath(os.getcwd()):
1184                try:
1185                    __import__(module_name)
1186                except pyqueue.Empty:
1187                    self.fail("Probable regression on import lock contention;"
1188                              " see Issue #22853")
1189
1190    def test_timeout(self):
1191        q = multiprocessing.Queue()
1192        start = time.monotonic()
1193        self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
1194        delta = time.monotonic() - start
1195        # bpo-30317: Tolerate a delta of 100 ms because of the bad clock
1196        # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once
1197        # failed because the delta was only 135.8 ms.
1198        self.assertGreaterEqual(delta, 0.100)
1199        close_queue(q)
1200
1201    def test_queue_feeder_donot_stop_onexc(self):
1202        # bpo-30414: verify feeder handles exceptions correctly
1203        if self.TYPE != 'processes':
1204            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1205
1206        class NotSerializable(object):
1207            def __reduce__(self):
1208                raise AttributeError
1209        with test.support.captured_stderr():
1210            q = self.Queue()
1211            q.put(NotSerializable())
1212            q.put(True)
1213            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
1214            close_queue(q)
1215
1216        with test.support.captured_stderr():
1217            # bpo-33078: verify that the queue size is correctly handled
1218            # on errors.
1219            q = self.Queue(maxsize=1)
1220            q.put(NotSerializable())
1221            q.put(True)
1222            try:
1223                self.assertEqual(q.qsize(), 1)
1224            except NotImplementedError:
1225                # qsize is not available on all platform as it
1226                # relies on sem_getvalue
1227                pass
1228            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
1229            # Check that the size of the queue is correct
1230            self.assertTrue(q.empty())
1231            close_queue(q)
1232
1233    def test_queue_feeder_on_queue_feeder_error(self):
1234        # bpo-30006: verify feeder handles exceptions using the
1235        # _on_queue_feeder_error hook.
1236        if self.TYPE != 'processes':
1237            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1238
1239        class NotSerializable(object):
1240            """Mock unserializable object"""
1241            def __init__(self):
1242                self.reduce_was_called = False
1243                self.on_queue_feeder_error_was_called = False
1244
1245            def __reduce__(self):
1246                self.reduce_was_called = True
1247                raise AttributeError
1248
1249        class SafeQueue(multiprocessing.queues.Queue):
1250            """Queue with overloaded _on_queue_feeder_error hook"""
1251            @staticmethod
1252            def _on_queue_feeder_error(e, obj):
1253                if (isinstance(e, AttributeError) and
1254                        isinstance(obj, NotSerializable)):
1255                    obj.on_queue_feeder_error_was_called = True
1256
1257        not_serializable_obj = NotSerializable()
1258        # The captured_stderr reduces the noise in the test report
1259        with test.support.captured_stderr():
1260            q = SafeQueue(ctx=multiprocessing.get_context())
1261            q.put(not_serializable_obj)
1262
1263            # Verify that q is still functioning correctly
1264            q.put(True)
1265            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
1266
1267        # Assert that the serialization and the hook have been called correctly
1268        self.assertTrue(not_serializable_obj.reduce_was_called)
1269        self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)
1270
1271    def test_closed_queue_put_get_exceptions(self):
1272        for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
1273            q.close()
1274            with self.assertRaisesRegex(ValueError, 'is closed'):
1275                q.put('foo')
1276            with self.assertRaisesRegex(ValueError, 'is closed'):
1277                q.get()
1278#
1279#
1280#
1281
class _TestLock(BaseTestCase):
    """Basic acquire/release semantics for Lock and RLock."""

    def test_lock(self):
        mutex = self.Lock()
        # First acquire succeeds; a second, non-blocking attempt fails.
        self.assertTrue(mutex.acquire())
        self.assertFalse(mutex.acquire(False))
        # release() returns None; releasing an unheld lock raises.
        self.assertIsNone(mutex.release())
        self.assertRaises((ValueError, threading.ThreadError), mutex.release)

    def test_rlock(self):
        rlock = self.RLock()
        # A reentrant lock can be acquired repeatedly by its owner, and must
        # be released the same number of times.
        for _ in range(3):
            self.assertTrue(rlock.acquire())
        for _ in range(3):
            self.assertIsNone(rlock.release())
        # One release too many raises.
        self.assertRaises((AssertionError, RuntimeError), rlock.release)

    def test_lock_context(self):
        # The lock must also work as a context manager.
        with self.Lock():
            pass
1304
1305
class _TestSemaphore(BaseTestCase):
    """Counter semantics and timeouts for Semaphore/BoundedSemaphore."""

    def _test_semaphore(self, sem):
        # Shared checks: `sem` starts at 2; acquire decrements, release
        # increments, and a non-blocking acquire at 0 fails immediately.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # Unlike BoundedSemaphore, a plain Semaphore may be released past
        # its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking acquires return False immediately, whatever the
        # timeout argument; blocking acquires honour the timeout.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
1358
1359
class _TestCondition(BaseTestCase):
    """Condition variable tests: notify/notify_all/notify(n), wait timeouts,
    wait_for, and wait() return values, across all flavours."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Worker: announce imminent wait via `sleeping`, wait on the
        # condition (possibly timing out), then announce wake-up via `woken`.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def assertReachesEventually(self, func, value):
        # Poll until func() reaches `value` (or raises NotImplementedError),
        # then assert once more so a failure produces a useful message.
        for i in range(10):
            try:
                if func() == value:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(value, func)

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # One process and one thread both wait on the same condition.
        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()
        self.addCleanup(p.join)

        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()
        self.addCleanup(p.join)

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        self.assertReachesEventually(lambda: get_value(woken), 6)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_notify_n(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake some of them up
        cond.acquire()
        cond.notify(n=2)
        cond.release()

        # check 2 have woken
        self.assertReachesEventually(lambda: get_value(woken), 2)

        # wake the rest of them
        cond.acquire()
        cond.notify(n=4)
        cond.release()

        self.assertReachesEventually(lambda: get_value(woken), 6)

        # doesn't do anything more
        cond.acquire()
        cond.notify(n=3)
        cond.release()

        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait(timeout) with nobody notifying returns False after roughly
        # the requested delay.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

    @classmethod
    def _test_waitfor_f(cls, cond, state):
        # Child: signal readiness by setting state to 0, then wait_for the
        # parent to count state up to 4; exit non-zero on failure.
        with cond:
            state.value = 0
            cond.notify()
            result = cond.wait_for(lambda : state.value==4)
            if not result or state.value != 4:
                sys.exit(1)

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', -1)

        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
        p.daemon = True
        p.start()

        with cond:
            result = cond.wait_for(lambda : state.value==0)
            self.assertTrue(result)
            self.assertEqual(state.value, 0)

        # Increment to 4 so the child's wait_for predicate becomes true.
        for i in range(4):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertEqual(p.exitcode, 0)

    @classmethod
    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
        # Child: wait_for a predicate that never becomes true and verify the
        # call times out after roughly the expected delay.
        sem.release()
        with cond:
            expected = 0.1
            dt = time.monotonic()
            result = cond.wait_for(lambda : state.value==4, timeout=expected)
            dt = time.monotonic() - dt
            # borrow logic in assertTimeout() from test/lock_tests.py
            if not result and expected * 0.6 < dt < expected * 10.0:
                success.value = True

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor_timeout(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', 0)
        success = self.Value('i', False)
        sem = self.Semaphore(0)

        p = self.Process(target=self._test_waitfor_timeout_f,
                         args=(cond, state, success, sem))
        p.daemon = True
        p.start()
        self.assertTrue(sem.acquire(timeout=support.LONG_TIMEOUT))

        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertTrue(success.value)

    @classmethod
    def _test_wait_result(cls, c, pid):
        # Child: notify the waiting parent, then (on POSIX) interrupt the
        # parent's second wait with SIGINT.
        with c:
            c.notify()
        time.sleep(1)
        if pid is not None:
            os.kill(pid, signal.SIGINT)

    def test_wait_result(self):
        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
            pid = os.getpid()
        else:
            pid = None

        c = self.Condition()
        with c:
            # Un-notified waits return False.
            self.assertFalse(c.wait(0))
            self.assertFalse(c.wait(0.1))

            p = self.Process(target=self._test_wait_result, args=(c, pid))
            p.start()

            # A notified wait returns True; an interrupted one raises.
            self.assertTrue(c.wait(60))
            if pid is not None:
                self.assertRaises(KeyboardInterrupt, c.wait, 60)

            p.join()
1654
1655
class _TestEvent(BaseTestCase):
    """Event set/clear/wait semantics and repr() formatting."""

    @classmethod
    def _test_event(cls, event):
        # Child helper: set the event after a delay so the parent's
        # blocking wait() has a chance to actually block first.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        self.assertEqual(event.is_set(), False)

        # Removed, threading.Event.wait() will return the value of the __flag
        # instead of None. API Shear with the semaphore backed mp.Event
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # Once set, waits return True immediately regardless of timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # A child process setting the event must unblock our wait().
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()

    def test_repr(self) -> None:
        event = self.Event()
        if self.TYPE == 'processes':
            self.assertRegex(repr(event), r"<Event at .* unset>")
            event.set()
            self.assertRegex(repr(event), r"<Event at .* set>")
            event.clear()
            self.assertRegex(repr(event), r"<Event at .* unset>")
        elif self.TYPE == 'manager':
            self.assertRegex(repr(event), r"<EventProxy object, typeid 'Event' at .*")
            event.set()
            self.assertRegex(repr(event), r"<EventProxy object, typeid 'Event' at .*")
1710
1711
1712# Tests for Barrier - adapted from tests in test/lock_tests.py
1713#
1714
1715# Many of the tests for threading.Barrier use a list as an atomic
1716# counter: a value is appended to increment the counter, and the
1717# length of the list gives the value.  We use the class DummyList
1718# for the same purpose.
1719
1720class _DummyList(object):
1721
1722    def __init__(self):
1723        wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1724        lock = multiprocessing.Lock()
1725        self.__setstate__((wrapper, lock))
1726        self._lengthbuf[0] = 0
1727
1728    def __setstate__(self, state):
1729        (self._wrapper, self._lock) = state
1730        self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1731
1732    def __getstate__(self):
1733        return (self._wrapper, self._lock)
1734
1735    def append(self, _):
1736        with self._lock:
1737            self._lengthbuf[0] += 1
1738
1739    def __len__(self):
1740        with self._lock:
1741            return self._lengthbuf[0]
1742
1743def _wait():
1744    # A crude wait/yield function not relying on synchronization primitives.
1745    time.sleep(0.01)
1746
1747
class Bunch(object):
    """
    A bunch of threads.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Construct a bunch of `n` threads running the same function `f`.
        If `wait_before_exit` is True, the threads won't terminate until
        do_finish() is called.
        """
        self.f = f
        self.args = args
        self.n = n
        # `started`/`finished` come from namespace.DummyList() so child
        # processes can update them too (see _DummyList above).
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            self._can_exit.set()

        threads = []
        for i in range(n):
            p = namespace.Process(target=self.task)
            p.daemon = True
            p.start()
            threads.append(p)

        def finalize(threads):
            for p in threads:
                p.join()

        # Join every worker when this Bunch is close()d or collected.
        self._finalizer = weakref.finalize(self, finalize, threads)

    def task(self):
        # Worker body: run f(*args) once, recording this worker's pid in
        # started/finished, and optionally block at exit until do_finish().
        pid = os.getpid()
        self.started.append(pid)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(pid)
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        # Busy-wait until all n workers have entered task().
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        # Busy-wait until all n workers have finished running f().
        while len(self.finished) < self.n:
            _wait()

    def do_finish(self):
        # Release workers created with wait_before_exit=True.
        self._can_exit.set()

    def close(self):
        # Join the workers now rather than at garbage collection.
        self._finalizer()
1803
1804
class AppendTrue(object):
    """Picklable callable that appends True to a stored container.

    Used as a Barrier `action` callback; each invocation records one hit.
    """

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        self.obj.append(True)
1810
1811
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5  # number of parties meeting at the barrier in each test
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        # Fresh N-party barrier per test, with a generous default timeout.
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        # Abort so any straggling waiters are released before dropping the
        # barrier.
        self.barrier.abort()
        self.barrier = None
1825
1826    def DummyList(self):
1827        if self.TYPE == 'threads':
1828            return []
1829        elif self.TYPE == 'manager':
1830            return self.manager.list()
1831        else:
1832            return _DummyList()
1833
1834    def run_threads(self, f, args):
1835        b = Bunch(self, f, args, self.N-1)
1836        try:
1837            f(*args)
1838            b.wait_for_finished()
1839        finally:
1840            b.close()
1841
    @classmethod
    def multipass(cls, barrier, results, n):
        # Each of the m parties appends to results[0], waits, appends to
        # results[1], waits again -- n times over.  The length asserts prove
        # the parties really cross the barrier in lockstep.
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            pass
        assert not barrier.broken
1858
1859    def test_barrier(self, passes=1):
1860        """
1861        Test that a barrier is passed in lockstep
1862        """
1863        results = [self.DummyList(), self.DummyList()]
1864        self.run_threads(self.multipass, (self.barrier, results, passes))
1865
1866    def test_barrier_10(self):
1867        """
1868        Test that a barrier works for 10 consecutive runs
1869        """
1870        return self.test_barrier(10)
1871
1872    @classmethod
1873    def _test_wait_return_f(cls, barrier, queue):
1874        res = barrier.wait()
1875        queue.put(res)
1876
1877    def test_wait_return(self):
1878        """
1879        test the return value from barrier.wait
1880        """
1881        queue = self.Queue()
1882        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1883        results = [queue.get() for i in range(self.N)]
1884        self.assertEqual(results.count(0), 1)
1885        close_queue(queue)
1886
1887    @classmethod
1888    def _test_action_f(cls, barrier, results):
1889        barrier.wait()
1890        if len(results) != 1:
1891            raise RuntimeError
1892
1893    def test_action(self):
1894        """
1895        Test the 'action' callback
1896        """
1897        results = self.DummyList()
1898        barrier = self.Barrier(self.N, action=AppendTrue(results))
1899        self.run_threads(self._test_action_f, (barrier, results))
1900        self.assertEqual(len(results), 1)
1901
1902    @classmethod
1903    def _test_abort_f(cls, barrier, results1, results2):
1904        try:
1905            i = barrier.wait()
1906            if i == cls.N//2:
1907                raise RuntimeError
1908            barrier.wait()
1909            results1.append(True)
1910        except threading.BrokenBarrierError:
1911            results2.append(True)
1912        except RuntimeError:
1913            barrier.abort()
1914
1915    def test_abort(self):
1916        """
1917        Test that an abort will put the barrier in a broken state
1918        """
1919        results1 = self.DummyList()
1920        results2 = self.DummyList()
1921        self.run_threads(self._test_abort_f,
1922                         (self.barrier, results1, results2))
1923        self.assertEqual(len(results1), 0)
1924        self.assertEqual(len(results2), self.N-1)
1925        self.assertTrue(self.barrier.broken)
1926
1927    @classmethod
1928    def _test_reset_f(cls, barrier, results1, results2, results3):
1929        i = barrier.wait()
1930        if i == cls.N//2:
1931            # Wait until the other threads are all in the barrier.
1932            while barrier.n_waiting < cls.N-1:
1933                time.sleep(0.001)
1934            barrier.reset()
1935        else:
1936            try:
1937                barrier.wait()
1938                results1.append(True)
1939            except threading.BrokenBarrierError:
1940                results2.append(True)
1941        # Now, pass the barrier again
1942        barrier.wait()
1943        results3.append(True)
1944
1945    def test_reset(self):
1946        """
1947        Test that a 'reset' on a barrier frees the waiting threads
1948        """
1949        results1 = self.DummyList()
1950        results2 = self.DummyList()
1951        results3 = self.DummyList()
1952        self.run_threads(self._test_reset_f,
1953                         (self.barrier, results1, results2, results3))
1954        self.assertEqual(len(results1), 0)
1955        self.assertEqual(len(results2), self.N-1)
1956        self.assertEqual(len(results3), self.N)
1957
1958    @classmethod
1959    def _test_abort_and_reset_f(cls, barrier, barrier2,
1960                                results1, results2, results3):
1961        try:
1962            i = barrier.wait()
1963            if i == cls.N//2:
1964                raise RuntimeError
1965            barrier.wait()
1966            results1.append(True)
1967        except threading.BrokenBarrierError:
1968            results2.append(True)
1969        except RuntimeError:
1970            barrier.abort()
1971        # Synchronize and reset the barrier.  Must synchronize first so
1972        # that everyone has left it when we reset, and after so that no
1973        # one enters it before the reset.
1974        if barrier2.wait() == cls.N//2:
1975            barrier.reset()
1976        barrier2.wait()
1977        barrier.wait()
1978        results3.append(True)
1979
1980    def test_abort_and_reset(self):
1981        """
1982        Test that a barrier can be reset after being broken.
1983        """
1984        results1 = self.DummyList()
1985        results2 = self.DummyList()
1986        results3 = self.DummyList()
1987        barrier2 = self.Barrier(self.N)
1988
1989        self.run_threads(self._test_abort_and_reset_f,
1990                         (self.barrier, barrier2, results1, results2, results3))
1991        self.assertEqual(len(results1), 0)
1992        self.assertEqual(len(results2), self.N-1)
1993        self.assertEqual(len(results3), self.N)
1994
1995    @classmethod
1996    def _test_timeout_f(cls, barrier, results):
1997        i = barrier.wait()
1998        if i == cls.N//2:
1999            # One thread is late!
2000            time.sleep(1.0)
2001        try:
2002            barrier.wait(0.5)
2003        except threading.BrokenBarrierError:
2004            results.append(True)
2005
2006    def test_timeout(self):
2007        """
2008        Test wait(timeout)
2009        """
2010        results = self.DummyList()
2011        self.run_threads(self._test_timeout_f, (self.barrier, results))
2012        self.assertEqual(len(results), self.barrier.parties)
2013
2014    @classmethod
2015    def _test_default_timeout_f(cls, barrier, results):
2016        i = barrier.wait(cls.defaultTimeout)
2017        if i == cls.N//2:
2018            # One thread is later than the default timeout
2019            time.sleep(1.0)
2020        try:
2021            barrier.wait()
2022        except threading.BrokenBarrierError:
2023            results.append(True)
2024
2025    def test_default_timeout(self):
2026        """
2027        Test the barrier's default timeout
2028        """
2029        barrier = self.Barrier(self.N, timeout=0.5)
2030        results = self.DummyList()
2031        self.run_threads(self._test_default_timeout_f, (barrier, results))
2032        self.assertEqual(len(results), barrier.parties)
2033
2034    def test_single_thread(self):
2035        b = self.Barrier(1)
2036        b.wait()
2037        b.wait()
2038
2039    @classmethod
2040    def _test_thousand_f(cls, barrier, passes, conn, lock):
2041        for i in range(passes):
2042            barrier.wait()
2043            with lock:
2044                conn.send(i)
2045
2046    def test_thousand(self):
2047        if self.TYPE == 'manager':
2048            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2049        passes = 1000
2050        lock = self.Lock()
2051        conn, child_conn = self.Pipe(False)
2052        for j in range(self.N):
2053            p = self.Process(target=self._test_thousand_f,
2054                           args=(self.barrier, passes, child_conn, lock))
2055            p.start()
2056            self.addCleanup(p.join)
2057
2058        for i in range(passes):
2059            for j in range(self.N):
2060                self.assertEqual(conn.recv(), i)
2061
2062#
2063#
2064#
2065
class _TestValue(BaseTestCase):
    """Tests for multiprocessing.Value / RawValue shared scalars."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value the child process writes back)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('q', 2 ** 33, 2 ** 34),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Runs in the child: overwrite each shared value with the third
        # element of the corresponding codes_values entry.
        for sv, cv in zip(values, cls.codes_values):
            sv.value = cv[2]


    def test_value(self, raw=False):
        # Check that writes made in a child process are visible in the
        # parent through the shared memory.
        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        # Initial values are intact before the child runs.
        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # The child's writes must now be visible here.
        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # lock omitted, lock=None and an explicit lock all expose
        # get_lock()/get_obj(); lock=False and RawValue expose neither.
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        arr4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))

        # A non-lock, non-bool value for lock= is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        arr5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
2134
2135
class _TestArray(BaseTestCase):
    """Tests for multiprocessing.Array / RawArray shared arrays."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # In-place prefix sum; run both locally and in a child process
        # so the results can be compared.
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally on the plain list...
        self.f(seq)

        # ...and the shared-memory result in a child process.
        p = self.Process(target=self.f, args=(arr,))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Mirrors _TestValue.test_getobj_getlock for arrays.
        arr1 = self.Array('i', list(range(10)))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        arr2 = self.Array('i', list(range(10)), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
2214
2215#
2216#
2217#
2218
class _TestContainers(BaseTestCase):
    """Tests for manager-proxied containers (list, dict, Namespace)."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], list(range(10)))

        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            [element[:] for element in e],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # Mutations through the original proxy are visible through a
        # proxy list that contains it.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])

    def test_list_iter(self):
        a = self.list(list(range(10)))
        it = iter(a)
        self.assertEqual(list(it), list(range(10)))
        self.assertEqual(list(it), [])  # exhausted
        # list modified during iteration
        it = iter(a)
        a[0] = 100
        self.assertEqual(next(it), 100)

    def test_list_proxy_in_list(self):
        a = self.list([self.list(range(3)) for _i in range(3)])
        self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)

        a[0][-1] = 55
        self.assertEqual(a[0][:], [0, 1, 55])
        for i in range(1, 3):
            self.assertEqual(a[i][:], [0, 1, 2])

        self.assertEqual(a[1].pop(), 2)
        self.assertEqual(len(a[1]), 2)
        for i in range(0, 3, 2):
            self.assertEqual(len(a[i]), 3)

        del a

        # A self-referencing proxy list must not break deletion.
        b = self.list()
        b.append(b)
        del b

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_dict_iter(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        it = iter(d)
        self.assertEqual(list(it), indices)
        self.assertEqual(list(it), [])  # exhausted
        # dictionary changed size during iteration
        it = iter(d)
        d.clear()
        self.assertRaises(RuntimeError, next, it)

    def test_dict_proxy_nested(self):
        pets = self.dict(ferrets=2, hamsters=4)
        supplies = self.dict(water=10, feed=3)
        d = self.dict(pets=pets, supplies=supplies)

        self.assertEqual(supplies['water'], 10)
        self.assertEqual(d['supplies']['water'], 10)

        d['supplies']['blankets'] = 5
        self.assertEqual(supplies['blankets'], 5)
        self.assertEqual(d['supplies']['blankets'], 5)

        d['supplies']['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # Dropping the local proxies must not invalidate the nested
        # proxies still reachable through d.
        del pets
        del supplies
        self.assertEqual(d['pets']['ferrets'], 2)
        d['supplies']['blankets'] = 11
        self.assertEqual(d['supplies']['blankets'], 11)

        pets = d['pets']
        supplies = d['supplies']
        supplies['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # Clearing the outer dict leaves the re-fetched proxies alive.
        d.clear()
        self.assertEqual(len(d), 0)
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(pets['hamsters'], 4)

        l = self.list([pets, supplies])
        l[0]['marmots'] = 1
        self.assertEqual(pets['marmots'], 1)
        self.assertEqual(l[0]['marmots'], 1)

        del pets
        del supplies
        self.assertEqual(l[0]['marmots'], 1)

        outer = self.list([[88, 99], l])
        self.assertIsInstance(outer[0], list)  # Not a ListProxy
        self.assertEqual(outer[-1][-1]['feed'], 3)

    def test_nested_queue(self):
        a = self.list() # Test queue inside list
        a.append(self.Queue())
        a[0].put(123)
        self.assertEqual(a[0].get(), 123)
        b = self.dict() # Test queue inside dict
        b[0] = self.Queue()
        b[0].put(456)
        self.assertEqual(b[0].get(), 456)

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # Underscore-prefixed attributes are hidden from repr().
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))
2373
2374#
2375#
2376#
2377
def sqr(x, wait=0.0):
    """Return *x* squared, optionally sleeping *wait* seconds first."""
    time.sleep(wait)
    return x ** 2
2381
def mul(x, y):
    """Return the product of *x* and *y* (starmap test helper)."""
    product = x * y
    return product
2384
def raise_large_valuerror(wait):
    """Sleep *wait* seconds, then raise ValueError with a ~1 MiB message."""
    time.sleep(wait)
    message = "x" * 1024**2
    raise ValueError(message)
2388
def identity(x):
    """Return *x* unchanged (used to round-trip objects through a pool)."""
    return x
2391
class CountedObject(object):
    """Object whose class attribute tracks the number of live instances."""

    # Count of currently-alive instances; incremented in __new__,
    # decremented in __del__.
    n_instances = 0

    def __new__(cls):
        cls.n_instances = cls.n_instances + 1
        return super().__new__(cls)

    def __del__(self):
        type(self).n_instances = type(self).n_instances - 1
2401
class SayWhenError(ValueError):
    """Error injected by test iterables at a predetermined point."""


def exception_throwing_generator(total, when):
    """Yield 0..total-1, raising SayWhenError once index *when* is reached.

    A *when* of -1 raises before anything is yielded (i.e. on the first
    next() call, since this is a generator function).
    """
    if when == -1:
        raise SayWhenError("Somebody said when")
    for current in range(total):
        if current == when:
            raise SayWhenError("Somebody said when")
        yield current
2411
2412
2413class _TestPool(BaseTestCase):
2414
2415    @classmethod
2416    def setUpClass(cls):
2417        super().setUpClass()
2418        cls.pool = cls.Pool(4)
2419
2420    @classmethod
2421    def tearDownClass(cls):
2422        cls.pool.terminate()
2423        cls.pool.join()
2424        cls.pool = None
2425        super().tearDownClass()
2426
2427    def test_apply(self):
2428        papply = self.pool.apply
2429        self.assertEqual(papply(sqr, (5,)), sqr(5))
2430        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
2431
2432    def test_map(self):
2433        pmap = self.pool.map
2434        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
2435        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
2436                         list(map(sqr, list(range(100)))))
2437
2438    def test_starmap(self):
2439        psmap = self.pool.starmap
2440        tuples = list(zip(range(10), range(9,-1, -1)))
2441        self.assertEqual(psmap(mul, tuples),
2442                         list(itertools.starmap(mul, tuples)))
2443        tuples = list(zip(range(100), range(99,-1, -1)))
2444        self.assertEqual(psmap(mul, tuples, chunksize=20),
2445                         list(itertools.starmap(mul, tuples)))
2446
2447    def test_starmap_async(self):
2448        tuples = list(zip(range(100), range(99,-1, -1)))
2449        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
2450                         list(itertools.starmap(mul, tuples)))
2451
2452    def test_map_async(self):
2453        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
2454                         list(map(sqr, list(range(10)))))
2455
2456    def test_map_async_callbacks(self):
2457        call_args = self.manager.list() if self.TYPE == 'manager' else []
2458        self.pool.map_async(int, ['1'],
2459                            callback=call_args.append,
2460                            error_callback=call_args.append).wait()
2461        self.assertEqual(1, len(call_args))
2462        self.assertEqual([1], call_args[0])
2463        self.pool.map_async(int, ['a'],
2464                            callback=call_args.append,
2465                            error_callback=call_args.append).wait()
2466        self.assertEqual(2, len(call_args))
2467        self.assertIsInstance(call_args[1], ValueError)
2468
2469    def test_map_unplicklable(self):
2470        # Issue #19425 -- failure to pickle should not cause a hang
2471        if self.TYPE == 'threads':
2472            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2473        class A(object):
2474            def __reduce__(self):
2475                raise RuntimeError('cannot pickle')
2476        with self.assertRaises(RuntimeError):
2477            self.pool.map(sqr, [A()]*10)
2478
2479    def test_map_chunksize(self):
2480        try:
2481            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
2482        except multiprocessing.TimeoutError:
2483            self.fail("pool.map_async with chunksize stalled on null list")
2484
2485    def test_map_handle_iterable_exception(self):
2486        if self.TYPE == 'manager':
2487            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2488
2489        # SayWhenError seen at the very first of the iterable
2490        with self.assertRaises(SayWhenError):
2491            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2492        # again, make sure it's reentrant
2493        with self.assertRaises(SayWhenError):
2494            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2495
2496        with self.assertRaises(SayWhenError):
2497            self.pool.map(sqr, exception_throwing_generator(10, 3), 1)
2498
2499        class SpecialIterable:
2500            def __iter__(self):
2501                return self
2502            def __next__(self):
2503                raise SayWhenError
2504            def __len__(self):
2505                return 1
2506        with self.assertRaises(SayWhenError):
2507            self.pool.map(sqr, SpecialIterable(), 1)
2508        with self.assertRaises(SayWhenError):
2509            self.pool.map(sqr, SpecialIterable(), 1)
2510
2511    def test_async(self):
2512        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
2513        get = TimingWrapper(res.get)
2514        self.assertEqual(get(), 49)
2515        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
2516
2517    def test_async_timeout(self):
2518        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
2519        get = TimingWrapper(res.get)
2520        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
2521        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
2522
2523    def test_imap(self):
2524        it = self.pool.imap(sqr, list(range(10)))
2525        self.assertEqual(list(it), list(map(sqr, list(range(10)))))
2526
2527        it = self.pool.imap(sqr, list(range(10)))
2528        for i in range(10):
2529            self.assertEqual(next(it), i*i)
2530        self.assertRaises(StopIteration, it.__next__)
2531
2532        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
2533        for i in range(1000):
2534            self.assertEqual(next(it), i*i)
2535        self.assertRaises(StopIteration, it.__next__)
2536
2537    def test_imap_handle_iterable_exception(self):
2538        if self.TYPE == 'manager':
2539            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2540
2541        # SayWhenError seen at the very first of the iterable
2542        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2543        self.assertRaises(SayWhenError, it.__next__)
2544        # again, make sure it's reentrant
2545        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2546        self.assertRaises(SayWhenError, it.__next__)
2547
2548        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
2549        for i in range(3):
2550            self.assertEqual(next(it), i*i)
2551        self.assertRaises(SayWhenError, it.__next__)
2552
2553        # SayWhenError seen at start of problematic chunk's results
2554        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
2555        for i in range(6):
2556            self.assertEqual(next(it), i*i)
2557        self.assertRaises(SayWhenError, it.__next__)
2558        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
2559        for i in range(4):
2560            self.assertEqual(next(it), i*i)
2561        self.assertRaises(SayWhenError, it.__next__)
2562
2563    def test_imap_unordered(self):
2564        it = self.pool.imap_unordered(sqr, list(range(10)))
2565        self.assertEqual(sorted(it), list(map(sqr, list(range(10)))))
2566
2567        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100)
2568        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
2569
2570    def test_imap_unordered_handle_iterable_exception(self):
2571        if self.TYPE == 'manager':
2572            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2573
2574        # SayWhenError seen at the very first of the iterable
2575        it = self.pool.imap_unordered(sqr,
2576                                      exception_throwing_generator(1, -1),
2577                                      1)
2578        self.assertRaises(SayWhenError, it.__next__)
2579        # again, make sure it's reentrant
2580        it = self.pool.imap_unordered(sqr,
2581                                      exception_throwing_generator(1, -1),
2582                                      1)
2583        self.assertRaises(SayWhenError, it.__next__)
2584
2585        it = self.pool.imap_unordered(sqr,
2586                                      exception_throwing_generator(10, 3),
2587                                      1)
2588        expected_values = list(map(sqr, list(range(10))))
2589        with self.assertRaises(SayWhenError):
2590            # imap_unordered makes it difficult to anticipate the SayWhenError
2591            for i in range(10):
2592                value = next(it)
2593                self.assertIn(value, expected_values)
2594                expected_values.remove(value)
2595
2596        it = self.pool.imap_unordered(sqr,
2597                                      exception_throwing_generator(20, 7),
2598                                      2)
2599        expected_values = list(map(sqr, list(range(20))))
2600        with self.assertRaises(SayWhenError):
2601            for i in range(20):
2602                value = next(it)
2603                self.assertIn(value, expected_values)
2604                expected_values.remove(value)
2605
2606    def test_make_pool(self):
2607        expected_error = (RemoteError if self.TYPE == 'manager'
2608                          else ValueError)
2609
2610        self.assertRaises(expected_error, self.Pool, -1)
2611        self.assertRaises(expected_error, self.Pool, 0)
2612
2613        if self.TYPE != 'manager':
2614            p = self.Pool(3)
2615            try:
2616                self.assertEqual(3, len(p._pool))
2617            finally:
2618                p.close()
2619                p.join()
2620
2621    def test_terminate(self):
2622        result = self.pool.map_async(
2623            time.sleep, [0.1 for i in range(10000)], chunksize=1
2624            )
2625        self.pool.terminate()
2626        join = TimingWrapper(self.pool.join)
2627        join()
2628        # Sanity check the pool didn't wait for all tasks to finish
2629        self.assertLess(join.elapsed, 2.0)
2630
2631    def test_empty_iterable(self):
2632        # See Issue 12157
2633        p = self.Pool(1)
2634
2635        self.assertEqual(p.map(sqr, []), [])
2636        self.assertEqual(list(p.imap(sqr, [])), [])
2637        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
2638        self.assertEqual(p.map_async(sqr, []).get(), [])
2639
2640        p.close()
2641        p.join()
2642
2643    def test_context(self):
2644        if self.TYPE == 'processes':
2645            L = list(range(10))
2646            expected = [sqr(i) for i in L]
2647            with self.Pool(2) as p:
2648                r = p.map_async(sqr, L)
2649                self.assertEqual(r.get(), expected)
2650            p.join()
2651            self.assertRaises(ValueError, p.map_async, sqr, L)
2652
2653    @classmethod
2654    def _test_traceback(cls):
2655        raise RuntimeError(123) # some comment
2656
2657    def test_traceback(self):
2658        # We want ensure that the traceback from the child process is
2659        # contained in the traceback raised in the main process.
2660        if self.TYPE == 'processes':
2661            with self.Pool(1) as p:
2662                try:
2663                    p.apply(self._test_traceback)
2664                except Exception as e:
2665                    exc = e
2666                else:
2667                    self.fail('expected RuntimeError')
2668            p.join()
2669            self.assertIs(type(exc), RuntimeError)
2670            self.assertEqual(exc.args, (123,))
2671            cause = exc.__cause__
2672            self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
2673            self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
2674
2675            with test.support.captured_stderr() as f1:
2676                try:
2677                    raise exc
2678                except RuntimeError:
2679                    sys.excepthook(*sys.exc_info())
2680            self.assertIn('raise RuntimeError(123) # some comment',
2681                          f1.getvalue())
2682            # _helper_reraises_exception should not make the error
2683            # a remote exception
2684            with self.Pool(1) as p:
2685                try:
2686                    p.map(sqr, exception_throwing_generator(1, -1), 1)
2687                except Exception as e:
2688                    exc = e
2689                else:
2690                    self.fail('expected SayWhenError')
2691                self.assertIs(type(exc), SayWhenError)
2692                self.assertIs(exc.__cause__, None)
2693            p.join()
2694
2695    @classmethod
2696    def _test_wrapped_exception(cls):
2697        raise RuntimeError('foo')
2698
2699    def test_wrapped_exception(self):
2700        # Issue #20980: Should not wrap exception when using thread pool
2701        with self.Pool(1) as p:
2702            with self.assertRaises(RuntimeError):
2703                p.apply(self._test_wrapped_exception)
2704        p.join()
2705
2706    def test_map_no_failfast(self):
2707        # Issue #23992: the fail-fast behaviour when an exception is raised
2708        # during map() would make Pool.join() deadlock, because a worker
2709        # process would fill the result queue (after the result handler thread
2710        # terminated, hence not draining it anymore).
2711
2712        t_start = time.monotonic()
2713
2714        with self.assertRaises(ValueError):
2715            with self.Pool(2) as p:
2716                try:
2717                    p.map(raise_large_valuerror, [0, 1])
2718                finally:
2719                    time.sleep(0.5)
2720                    p.close()
2721                    p.join()
2722
2723        # check that we indeed waited for all jobs
2724        self.assertGreater(time.monotonic() - t_start, 0.9)
2725
    def test_release_task_refs(self):
        # Issue #29861: task arguments and results should not be kept
        # alive after we are done with them.
        objs = [CountedObject() for i in range(10)]
        refs = [weakref.ref(o) for o in objs]
        self.pool.map(identity, objs)

        del objs
        gc.collect()  # For PyPy or other GCs.
        time.sleep(DELTA)  # let threaded cleanup code run
        # Every weakref must be dead once the pool dropped its references.
        self.assertEqual(set(wr() for wr in refs), {None})
        # With a process pool, copies of the objects are returned, check
        # they were released too.
        self.assertEqual(CountedObject.n_instances, 0)
2740
    def test_enter(self):
        # bpo-35477: a terminated pool must refuse to be entered again.
        if self.TYPE == 'manager':
            self.skipTest("test not applicable to manager")

        pool = self.Pool(1)
        with pool:
            pass
            # call pool.terminate()
        # pool is no longer running

        with self.assertRaises(ValueError):
            # bpo-35477: pool.__enter__() fails if the pool is not running
            with pool:
                pass
        pool.join()
2756
    def test_resource_warning(self):
        # Deleting a pool that still appears to be running must emit
        # a ResourceWarning.
        if self.TYPE == 'manager':
            self.skipTest("test not applicable to manager")

        pool = self.Pool(1)
        pool.terminate()
        pool.join()

        # force state to RUN to emit ResourceWarning in __del__()
        pool._state = multiprocessing.pool.RUN

        with warnings_helper.check_warnings(
                ('unclosed running multiprocessing pool', ResourceWarning)):
            pool = None
            support.gc_collect()
2772
def raising():
    """Task helper: always raise a KeyError carrying the string "key"."""
    exc = KeyError("key")
    raise exc
2775
def unpickleable_result():
    """Task helper: return a function-local lambda, which pickle rejects."""
    func = lambda: 42
    return func
2778
class _TestPoolWorkerErrors(BaseTestCase):
    """Errors raised in pool workers must reach the parent without killing
    the worker processes."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        # The error_callback must be invoked with the exception raised
        # by the task.
        p = multiprocessing.Pool(2)

        scratchpad = [None]
        def errback(exc):
            scratchpad[0] = exc

        res = p.apply_async(raising, error_callback=errback)
        self.assertRaises(KeyError, res.get)
        self.assertTrue(scratchpad[0])
        self.assertIsInstance(scratchpad[0], KeyError)

        p.close()
        p.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for iteration in range(20):

            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(unpickleable_result, error_callback=errback)
            # The worker cannot pickle its result, so the parent receives
            # a MaybeEncodingError wrapping the original value.
            self.assertRaises(MaybeEncodingError, res.get)
            wrapped = scratchpad[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(scratchpad[0], MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        p.close()
        p.join()
2818
class _TestPoolWorkerLifetime(BaseTestCase):
    """Worker replacement driven by maxtasksperchild."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(p._pool))
        origworkerpids = [w.pid for w in p._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = []
        for i in range(100):
            results.append(p.apply_async(sqr, (i, )))
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        p._repopulate_pool()
        # Wait until all workers are alive
        # (countdown * DELTA = 5 seconds max startup process time)
        countdown = 50
        while countdown and not all(w.is_alive() for w in p._pool):
            countdown -= 1
            time.sleep(DELTA)
        finalworkerpids = [w.pid for w in p._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        p.close()
        p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))

    def test_pool_maxtasksperchild_invalid(self):
        # maxtasksperchild must be rejected unless it is a positive integer.
        for value in [0, -1, 0.5, "12"]:
            with self.assertRaises(ValueError):
                multiprocessing.Pool(3, maxtasksperchild=value)

    def test_worker_finalization_via_atexit_handler_of_multiprocessing(self):
        # tests cases against bpo-38744 and bpo-39360
        cmd = '''if 1:
            from multiprocessing import Pool
            problem = None
            class A:
                def __init__(self):
                    self.pool = Pool(processes=1)
            def test():
                global problem
                problem = A()
                problem.pool.map(float, tuple(range(10)))
            if __name__ == "__main__":
                test()
        '''
        rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
        self.assertEqual(rc, 0)
2886
2887#
2888# Test of creating a customized manager class
2889#
2890
2891from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
2892
class FooBar(object):
    """Target class for the customized-manager tests.

    ``f`` and ``g`` are public methods (``g`` always fails); ``_h`` is
    private by naming convention, so default proxies do not expose it.
    """

    def f(self):
        return 'f()'

    def g(self):
        raise ValueError

    def _h(self):
        return '_h()'
2900
def baz():
    """Generator yielding the squares 0, 1, 4, ..., 81."""
    yield from (n * n for n in range(10))
2904
class IteratorProxy(BaseProxy):
    """Proxy that makes a remote generator iterable on the client side."""

    # only __next__ has to be forwarded to the referent
    _exposed_ = ('__next__',)

    def __iter__(self):
        # the proxy is its own iterator
        return self

    def __next__(self):
        return self._callmethod('__next__')
2911
class MyManager(BaseManager):
    """BaseManager subclass holding the custom test-type registrations."""
2914
# 'Foo' uses the default proxy (public methods only), 'Bar' explicitly
# exposes the private method '_h', and 'baz' pairs the generator with the
# custom IteratorProxy defined above.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2918
2919
class _TestMyManager(BaseTestCase):
    """Exercise the customized MyManager and its registered proxy types."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
        manager.start()
        self.common(manager)
        manager.shutdown()

        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
        # to the manager process if it takes longer than 1 second to stop,
        # which happens on slow buildbots.
        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))

    def test_mymanager_context(self):
        manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
        with manager:
            self.common(manager)
        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
        # to the manager process if it takes longer than 1 second to stop,
        # which happens on slow buildbots.
        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))

    def test_mymanager_context_prestarted(self):
        manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
        manager.start()
        with manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def common(self, manager):
        """Shared assertions: check proxy method exposure and behaviour."""
        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        # default proxy exposes only public methods; 'Bar' exposed ('f', '_h')
        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])
2973
2974
2975#
2976# Test of connecting to a remote server and using xmlrpclib for serialization
2977#
2978
2979_queue = pyqueue.Queue()
2980def get_queue():
2981    return _queue
2982
class QueueManager(BaseManager):
    '''manager class used by server process'''
# server side serves the real module-level queue via get_queue()
QueueManager.register('get_queue', callable=get_queue)
2986
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# client side registers only the typeid, with no local callable
QueueManager2.register('get_queue')
2990
2991
# Serializer name understood by BaseManager; xmlrpclib restricts the set of
# transferable types compared with pickle (see _TestRemoteManager below).
SERIALIZER = 'xmlrpclib'
2993
class _TestRemoteManager(BaseTestCase):
    """Connect a second manager, in a child process, over xmlrpclib."""

    ALLOWED_TYPES = ('manager',)
    # sample payloads that must survive an xmlrpclib round-trip
    values = ['hello world', None, True, 2.25,
              'hall\xe5 v\xe4rlden',
              '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
              b'hall\xe5 v\xe4rlden',
             ]
    result = values[:]

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in a child process: connect and push the sample values.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER,
            shutdown_timeout=SHUTDOWN_TIMEOUT)
        manager.connect()
        queue = manager.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        queue.put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER,
            shutdown_timeout=SHUTDOWN_TIMEOUT)
        manager.start()
        self.addCleanup(manager.shutdown)

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER,
            shutdown_timeout=SHUTDOWN_TIMEOUT)
        manager2.connect()
        queue = manager2.get_queue()

        self.assertEqual(queue.get(), self.result)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
3041
3042
@hashlib_helper.requires_hashdigest('md5')
class _TestManagerRestart(BaseTestCase):
    """A manager must be restartable quickly on the same address."""

    @classmethod
    def _putter(cls, address, authkey):
        # Child process: connect to the server and enqueue one item.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER,
            shutdown_timeout=SHUTDOWN_TIMEOUT)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(socket_helper.HOST, 0), authkey=authkey,
            serializer=SERIALIZER, shutdown_timeout=SHUTDOWN_TIMEOUT)
        try:
            srvr = manager.get_server()
            addr = srvr.address
            # Close the connection.Listener socket which gets opened as a part
            # of manager.get_server(). It's not needed for the test.
            srvr.listener.close()
            manager.start()

            p = self.Process(target=self._putter, args=(manager.address, authkey))
            p.start()
            p.join()
            queue = manager.get_queue()
            self.assertEqual(queue.get(), 'hello world')
            del queue
        finally:
            if hasattr(manager, "shutdown"):
                manager.shutdown()

        # Immediately rebind a fresh manager to the same address.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER,
            shutdown_timeout=SHUTDOWN_TIMEOUT)
        try:
            manager.start()
            self.addCleanup(manager.shutdown)
        except OSError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            # NOTE(review): this retry path never calls start() again --
            # presumably deliberate best-effort cleanup; confirm before
            # changing.
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER,
                shutdown_timeout=SHUTDOWN_TIMEOUT)
            if hasattr(manager, "shutdown"):
                self.addCleanup(manager.shutdown)
3095
3096#
3097#
3098#
3099
# Empty bytes message used to tell the echoing child process to exit.
SENTINEL = latin('')
3101
class _TestConnection(BaseTestCase):
    """Tests for Connection objects created by Pipe().

    Fix: ``_send_data_without_fd`` is decorated ``@classmethod`` but named
    its first parameter ``self``; renamed to ``cls`` for consistency with
    every other classmethod in this class (not visible to callers).
    """

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child: echo every message back until the empty SENTINEL arrives.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # a too-small buffer must raise BufferTooShort with the payload
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        # Pipe(duplex=False) returns a read-only and a write-only end.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(OSError, reader.send, 2)
            self.assertRaises(OSError, writer.recv)
            self.assertRaises(OSError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # offset/size combinations falling outside the message are rejected
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        """Return True if *fd* refers to an open file descriptor."""
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        """Child: receive an fd over *conn* and write *data* to it."""
        if create_dummy_fds:
            # Occupy all fds < 256 so the received fd is forced to be large.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with open(os_helper.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(os_helper.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with open(os_helper.TESTFN, "wb") as f:
            fd = f.fileno()
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(os_helper.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Write a byte with no accompanying fd in ancillary data.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

    def test_context(self):
        a, b = self.Pipe()

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        # leaving the with-block must close both ends
        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(OSError, a.recv)
            self.assertRaises(OSError, b.recv)
3366
class _TestListener(BaseTestCase):
    """Basic Listener behaviour: bind conflicts and context management."""

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        # Binding a second listener to an already-used address must fail.
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            self.addCleanup(l.close)
            self.assertRaises(OSError, self.connection.Listener,
                              l.address, family)

    def test_context(self):
        with self.connection.Listener() as l:
            with self.connection.Client(l.address) as c:
                with l.accept() as d:
                    c.send(1729)
                    self.assertEqual(d.recv(), 1729)

        # leaving the with-block closed the listener
        if self.TYPE == 'processes':
            self.assertRaises(OSError, l.accept)

    @unittest.skipUnless(util.abstract_sockets_supported,
                         "test needs abstract socket support")
    def test_abstract_socket(self):
        # abstract-namespace addresses start with a NUL byte
        with self.connection.Listener("\0something") as listener:
            with self.connection.Client(listener.address) as client:
                with listener.accept() as d:
                    client.send(1729)
                    self.assertEqual(d.recv(), 1729)

        if self.TYPE == 'processes':
            self.assertRaises(OSError, listener.accept)
3399
3400
class _TestListenerClient(BaseTestCase):
    """Listener/Client round-trips, including two regression tests."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child: connect, send one greeting, disconnect.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()

    def test_issue16955(self):
        # Data sent before the receiver polls must not be lost.
        for fam in self.connection.families:
            l = self.connection.Listener(family=fam)
            c = self.connection.Client(l.address)
            a = l.accept()
            a.send_bytes(b"hello")
            self.assertTrue(c.poll(1))
            a.close()
            c.close()
            l.close()
3448
class _TestPoll(BaseTestCase):
    """Connection.poll() semantics and message-boundary guarantees."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        # An empty message still makes poll() report data as available.
        a, b = self.Pipe()
        self.assertEqual(a.poll(), False)
        b.send_bytes(b'')
        self.assertEqual(a.poll(), True)
        self.assertEqual(a.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        # Child: send each message with a small delay between them.
        for s in strings:
            time.sleep(0.1)
            conn.send_bytes(s)
        conn.close()

    def test_strings(self):
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        a, b = self.Pipe()
        p = self.Process(target=self._child_strings, args=(b, strings))
        p.start()

        for s in strings:
            # poll in a bounded loop rather than blocking on recv_bytes()
            for i in range(200):
                if a.poll(0.01):
                    break
            x = a.recv_bytes()
            self.assertEqual(s, x)

        p.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        r, w = self.Pipe(False)
        p = self.Process(target=self._child_boundaries, args=(r,))
        p.start()
        time.sleep(2)
        L = [b"first", b"second"]
        for obj in L:
            w.send_bytes(obj)
        w.close()
        p.join()
        # either message may remain, but it must be a complete one
        self.assertIn(r.recv_bytes(), L)

    @classmethod
    def _child_dont_merge(cls, b):
        b.send_bytes(b'a')
        b.send_bytes(b'b')
        b.send_bytes(b'cd')

    def test_dont_merge(self):
        # Consecutive sends must remain distinct messages (no coalescing).
        a, b = self.Pipe()
        self.assertEqual(a.poll(0.0), False)
        self.assertEqual(a.poll(0.1), False)

        p = self.Process(target=self._child_dont_merge, args=(b,))
        p.start()

        self.assertEqual(a.recv_bytes(), b'a')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.recv_bytes(), b'b')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(0.0), True)
        self.assertEqual(a.recv_bytes(), b'cd')

        p.join()
3526
3527#
3528# Test of sending connection and socket objects between processes
3529#
3530
3531@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3532@hashlib_helper.requires_hashdigest('md5')
3533class _TestPicklingConnections(BaseTestCase):
3534
3535    ALLOWED_TYPES = ('processes',)
3536
    @classmethod
    def tearDownClass(cls):
        # Stop the background resource sharer so it cannot outlive the tests.
        from multiprocessing import resource_sharer
        resource_sharer.stop(timeout=support.LONG_TIMEOUT)
3541
    @classmethod
    def _listener(cls, conn, families):
        """Child: accept one connection per family and send it back pickled."""
        for fam in families:
            l = cls.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)
            new_conn.close()
            l.close()

        # repeat once more with a plain socket instead of a Connection
        l = socket.create_server((socket_helper.HOST, 0))
        conn.send(l.getsockname())
        new_conn, addr = l.accept()
        conn.send(new_conn)
        new_conn.close()
        l.close()

        # block until the parent signals it is done (it sends None at the end)
        conn.recv()
3560
    @classmethod
    def _remote(cls, conn):
        """Child: connect to each received address, send the upper-cased msg."""
        for (address, msg) in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        # final round targets a raw socket address
        address, msg = conn.recv()
        client = socket.socket()
        client.connect(address)
        client.sendall(msg.upper())
        client.close()

        conn.close()
3575
    def test_pickling(self):
        """Connections and sockets survive being pickled across processes."""
        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        # one round-trip per connection family
        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        rconn.send(None)

        # same dance with a plain socket instead of a Connection
        msg = latin('This connection uses a normal socket')
        address = lconn.recv()
        rconn.send((address, msg))
        new_conn = lconn.recv()
        buf = []
        while True:
            s = new_conn.recv(100)
            if not s:
                break
            buf.append(s)
        buf = b''.join(buf)
        self.assertEqual(buf, msg.upper())
        new_conn.close()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()
3621
3622    @classmethod
3623    def child_access(cls, conn):
3624        w = conn.recv()
3625        w.send('all is well')
3626        w.close()
3627
3628        r = conn.recv()
3629        msg = r.recv()
3630        conn.send(msg*2)
3631
3632        conn.close()
3633
3634    def test_access(self):
3635        # On Windows, if we do not specify a destination pid when
3636        # using DupHandle then we need to be careful to use the
3637        # correct access flags for DuplicateHandle(), or else
3638        # DupHandle.detach() will raise PermissionError.  For example,
3639        # for a read only pipe handle we should use
3640        # access=FILE_GENERIC_READ.  (Unfortunately
3641        # DUPLICATE_SAME_ACCESS does not work.)
3642        conn, child_conn = self.Pipe()
3643        p = self.Process(target=self.child_access, args=(child_conn,))
3644        p.daemon = True
3645        p.start()
3646        child_conn.close()
3647
3648        r, w = self.Pipe(duplex=False)
3649        conn.send(w)
3650        w.close()
3651        self.assertEqual(r.recv(), 'all is well')
3652        r.close()
3653
3654        r, w = self.Pipe(duplex=False)
3655        conn.send(r)
3656        r.close()
3657        w.send('foobar')
3658        w.close()
3659        self.assertEqual(conn.recv(), 'foobar'*2)
3660
3661        p.join()
3662
3663#
3664#
3665#
3666
class _TestHeap(BaseTestCase):
    """White-box tests for the private arena allocator in
    multiprocessing.heap."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        super().setUp()
        # Make pristine heap for these tests
        self.old_heap = multiprocessing.heap.BufferWrapper._heap
        multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap()

    def tearDown(self):
        # Restore the process-wide heap replaced in setUp().
        multiprocessing.heap.BufferWrapper._heap = self.old_heap
        super().tearDown()

    def test_heap(self):
        """Allocate and free many random-sized blocks, then verify the
        heap's internal bookkeeping is fully consistent."""
        iterations = 5000
        maxblocks = 50
        blocks = []

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap
        # Threshold 0 makes the heap discard any fully-free arena right
        # away, so the final checks below can expect _arenas to be empty.
        heap._DISCARD_FREE_SPACE_LARGER_THAN = 0

        # create and destroy lots of blocks of different sizes
        for i in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                i = random.randrange(maxblocks)
                del blocks[i]
            del b

        # verify the state of the heap
        with heap._lock:
            all = []
            free = 0
            occupied = 0
            for L in list(heap._len_to_seq.values()):
                # count all free blocks in arenas
                for arena, start, stop in L:
                    all.append((heap._arenas.index(arena), start, stop,
                                stop-start, 'free'))
                    free += (stop-start)
            for arena, arena_blocks in heap._allocated_blocks.items():
                # count all allocated blocks in arenas
                for start, stop in arena_blocks:
                    all.append((heap._arenas.index(arena), start, stop,
                                stop-start, 'occupied'))
                    occupied += (stop-start)

            # Every byte of every arena must be accounted for, either as
            # free space or as an allocated block.
            self.assertEqual(free + occupied,
                             sum(arena.size for arena in heap._arenas))

            all.sort()

            # Blocks must tile each arena exactly: consecutive blocks within
            # an arena are adjacent, and blocks at an arena boundary are
            # flush with the arena's end/start.
            for i in range(len(all)-1):
                (arena, start, stop) = all[i][:3]
                (narena, nstart, nstop) = all[i+1][:3]
                if arena != narena:
                    # Two different arenas
                    self.assertEqual(stop, heap._arenas[arena].size)  # last block
                    self.assertEqual(nstart, 0)         # first block
                else:
                    # Same arena: two adjacent blocks
                    self.assertEqual(stop, nstart)

        # test free'ing all blocks
        random.shuffle(blocks)
        while blocks:
            blocks.pop()

        # With everything freed, the bookkeeping structures must be empty
        # and every malloc must have been matched by a free.
        self.assertEqual(heap._n_frees, heap._n_mallocs)
        self.assertEqual(len(heap._pending_free_blocks), 0)
        self.assertEqual(len(heap._arenas), 0)
        self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks)
        self.assertEqual(len(heap._len_to_seq), 0)

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
3766
3767#
3768#
3769#
3770
3771class _Foo(Structure):
3772    _fields_ = [
3773        ('x', c_int),
3774        ('y', c_double),
3775        ('z', c_longlong,)
3776        ]
3777
class _TestSharedCTypes(BaseTestCase):
    """Tests that sharedctypes Value/Array objects are shared between
    processes and that copy() produces independent duplicates."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Skip rather than error when sharedctypes could not be imported.
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, z, foo, arr, string):
        # Runs in a child process: double every shared value in place so
        # the parent can verify the mutations are visible across processes.
        x.value *= 2
        y.value *= 2
        z.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        """Mutations made by a child process are seen by the parent."""
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        z = Value(c_longlong, 2 ** 33, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, z, foo, arr, string))
        p.daemon = True
        p.start()
        p.join()

        # Every shared object should now hold double its initial value.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(z.value, 2 ** 34)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same as test_sharedctypes but with lock-wrapped (synchronized)
        # wrappers around the shared objects.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """copy() returns an unshared duplicate unaffected by later writes
        to the original."""
        foo = _Foo(2, 5.0, 2 ** 33)
        bar = copy(foo)
        foo.x = 0
        foo.y = 0
        foo.z = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
        self.assertEqual(bar.z, 2 ** 33)
3832
3833
@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
@hashlib_helper.requires_hashdigest('md5')
class _TestSharedMemory(BaseTestCase):
    """Tests for multiprocessing.shared_memory: the SharedMemory and
    ShareableList classes and the SharedMemoryManager."""

    ALLOWED_TYPES = ('processes',)

    @staticmethod
    def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data):
        # Child-process helper: attach to an existing segment -- either by
        # name or through an (unpickled) SharedMemory instance -- and write
        # binary_data at the start of its buffer.
        if isinstance(shmem_name_or_obj, str):
            local_sms = shared_memory.SharedMemory(shmem_name_or_obj)
        else:
            local_sms = shmem_name_or_obj
        local_sms.buf[:len(binary_data)] = binary_data
        local_sms.close()

    def _new_shm_name(self, prefix):
        # Add a PID to the name of a POSIX shared memory object to allow
        # running multiprocessing tests (test_multiprocessing_fork,
        # test_multiprocessing_spawn, etc) in parallel.
        return prefix + str(os.getpid())

    def test_shared_memory_basics(self):
        """Exercise creation, attachment, size handling and unlink rules."""
        name_tsmb = self._new_shm_name('test01_tsmb')
        sms = shared_memory.SharedMemory(name_tsmb, create=True, size=512)
        self.addCleanup(sms.unlink)

        # Verify attributes are readable.
        self.assertEqual(sms.name, name_tsmb)
        self.assertGreaterEqual(sms.size, 512)
        self.assertGreaterEqual(len(sms.buf), sms.size)

        # Verify __repr__
        self.assertIn(sms.name, str(sms))
        self.assertIn(str(sms.size), str(sms))

        # Modify contents of shared memory segment through memoryview.
        sms.buf[0] = 42
        self.assertEqual(sms.buf[0], 42)

        # Attach to existing shared memory segment.
        also_sms = shared_memory.SharedMemory(name_tsmb)
        self.assertEqual(also_sms.buf[0], 42)
        also_sms.close()

        # Attach to existing shared memory segment but specify a new size.
        same_sms = shared_memory.SharedMemory(name_tsmb, size=20*sms.size)
        self.assertLess(same_sms.size, 20*sms.size)  # Size was ignored.
        same_sms.close()

        # Creating Shared Memory Segment with -ve size
        with self.assertRaises(ValueError):
            shared_memory.SharedMemory(create=True, size=-2)

        # Attaching Shared Memory Segment without a name
        with self.assertRaises(ValueError):
            shared_memory.SharedMemory(create=False)

        # Note: the retry behaviour when _make_filename returns a name that
        # is already in use is covered by test_shared_memory_recreate.

        if shared_memory._USE_POSIX:
            # Posix Shared Memory can only be unlinked once.  Here we
            # test an implementation detail that is not observed across
            # all supported platforms (since WindowsNamedSharedMemory
            # manages unlinking on its own and unlink() does nothing).
            # True release of shared memory segment does not necessarily
            # happen until process exits, depending on the OS platform.
            name_dblunlink = self._new_shm_name('test01_dblunlink')
            sms_uno = shared_memory.SharedMemory(
                name_dblunlink,
                create=True,
                size=5000
            )
            with self.assertRaises(FileNotFoundError):
                try:
                    self.assertGreaterEqual(sms_uno.size, 5000)

                    sms_duo = shared_memory.SharedMemory(name_dblunlink)
                    sms_duo.unlink()  # First shm_unlink() call.
                    sms_duo.close()
                    sms_uno.close()

                finally:
                    sms_uno.unlink()  # A second shm_unlink() call is bad.

        with self.assertRaises(FileExistsError):
            # Attempting to create a new shared memory segment with a
            # name that is already in use triggers an exception.
            there_can_only_be_one_sms = shared_memory.SharedMemory(
                name_tsmb,
                create=True,
                size=512
            )

        if shared_memory._USE_POSIX:
            # Requesting creation of a shared memory segment with the option
            # to attach to an existing segment, if that name is currently in
            # use, should not trigger an exception.
            # Note:  Using a smaller size could possibly cause truncation of
            # the existing segment but is OS platform dependent.  In the
            # case of MacOS/darwin, requesting a smaller size is disallowed.
            class OptionalAttachSharedMemory(shared_memory.SharedMemory):
                _flags = os.O_CREAT | os.O_RDWR
            ok_if_exists_sms = OptionalAttachSharedMemory(name_tsmb)
            self.assertEqual(ok_if_exists_sms.size, sms.size)
            ok_if_exists_sms.close()

        # Attempting to attach to an existing shared memory segment when
        # no segment exists with the supplied name triggers an exception.
        with self.assertRaises(FileNotFoundError):
            nonexisting_sms = shared_memory.SharedMemory('test01_notthere')
            nonexisting_sms.unlink()  # Error should occur on prior line.

        sms.close()

    def test_shared_memory_recreate(self):
        # Test if shared memory segment is created properly,
        # when _make_filename returns an existing shared memory segment name
        with unittest.mock.patch(
            'multiprocessing.shared_memory._make_filename') as mock_make_filename:

            NAME_PREFIX = shared_memory._SHM_NAME_PREFIX
            names = [self._new_shm_name('test03_fn'), self._new_shm_name('test04_fn')]
            # Prepend NAME_PREFIX which can be '/psm_' or 'wnsm_', necessary
            # because some POSIX compliant systems require name to start with /
            names = [NAME_PREFIX + name for name in names]

            mock_make_filename.side_effect = names
            shm1 = shared_memory.SharedMemory(create=True, size=1)
            self.addCleanup(shm1.unlink)
            self.assertEqual(shm1._name, names[0])

            # names[0] is now taken, so creation must retry and succeed
            # with the next generated filename.
            mock_make_filename.side_effect = names
            shm2 = shared_memory.SharedMemory(create=True, size=1)
            self.addCleanup(shm2.unlink)
            self.assertEqual(shm2._name, names[1])

    def test_invalid_shared_memory_creation(self):
        """Creating a segment with a non-positive or missing size raises."""
        # Test creating a shared memory segment with negative size
        with self.assertRaises(ValueError):
            sms_invalid = shared_memory.SharedMemory(create=True, size=-1)

        # Test creating a shared memory segment with size 0
        with self.assertRaises(ValueError):
            sms_invalid = shared_memory.SharedMemory(create=True, size=0)

        # Test creating a shared memory segment without size argument
        with self.assertRaises(ValueError):
            sms_invalid = shared_memory.SharedMemory(create=True)

    def test_shared_memory_pickle_unpickle(self):
        """An unpickled SharedMemory attaches to the very same segment."""
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.subTest(proto=proto):
                sms = shared_memory.SharedMemory(create=True, size=512)
                self.addCleanup(sms.unlink)
                sms.buf[0:6] = b'pickle'

                # Test pickling
                pickled_sms = pickle.dumps(sms, protocol=proto)

                # Test unpickling
                sms2 = pickle.loads(pickled_sms)
                self.assertIsInstance(sms2, shared_memory.SharedMemory)
                self.assertEqual(sms.name, sms2.name)
                self.assertEqual(bytes(sms.buf[0:6]), b'pickle')
                self.assertEqual(bytes(sms2.buf[0:6]), b'pickle')

                # Test that unpickled version is still the same SharedMemory
                sms.buf[0:6] = b'newval'
                self.assertEqual(bytes(sms.buf[0:6]), b'newval')
                self.assertEqual(bytes(sms2.buf[0:6]), b'newval')

                sms2.buf[0:6] = b'oldval'
                self.assertEqual(bytes(sms.buf[0:6]), b'oldval')
                self.assertEqual(bytes(sms2.buf[0:6]), b'oldval')

    def test_shared_memory_pickle_unpickle_dead_object(self):
        """Unpickling fails once the underlying segment has been unlinked."""
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.subTest(proto=proto):
                sms = shared_memory.SharedMemory(create=True, size=512)
                sms.buf[0:6] = b'pickle'
                pickled_sms = pickle.dumps(sms, protocol=proto)

                # Now, we are going to kill the original object.
                # So, unpickled one won't be able to attach to it.
                sms.close()
                sms.unlink()

                with self.assertRaises(FileNotFoundError):
                    pickle.loads(pickled_sms)

    def test_shared_memory_across_processes(self):
        """Child processes can attach by name and via a pickled instance."""
        # bpo-40135: don't define shared memory block's name in case of
        # the failure when we run multiprocessing tests in parallel.
        sms = shared_memory.SharedMemory(create=True, size=512)
        self.addCleanup(sms.unlink)

        # Verify remote attachment to existing block by name is working.
        p = self.Process(
            target=self._attach_existing_shmem_then_write,
            args=(sms.name, b'howdy')
        )
        p.daemon = True
        p.start()
        p.join()
        self.assertEqual(bytes(sms.buf[:5]), b'howdy')

        # Verify pickling of SharedMemory instance also works.
        p = self.Process(
            target=self._attach_existing_shmem_then_write,
            args=(sms, b'HELLO')
        )
        p.daemon = True
        p.start()
        p.join()
        self.assertEqual(bytes(sms.buf[:5]), b'HELLO')

        sms.close()

    @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms")
    def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
        # bpo-36368: protect SharedMemoryManager server process from
        # KeyboardInterrupt signals.
        smm = multiprocessing.managers.SharedMemoryManager()
        smm.start()

        # make sure the manager works properly at the beginning
        sl = smm.ShareableList(range(10))

        # the manager's server should ignore KeyboardInterrupt signals, and
        # maintain its connection with the current process, and success when
        # asked to deliver memory segments.
        os.kill(smm._process.pid, signal.SIGINT)

        sl2 = smm.ShareableList(range(10))

        # test that the custom signal handler registered in the Manager does
        # not affect signal handling in the parent process.
        with self.assertRaises(KeyboardInterrupt):
            os.kill(os.getpid(), signal.SIGINT)

        smm.shutdown()

    @unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
    def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
        # bpo-36867: test that a SharedMemoryManager uses the
        # same resource_tracker process as its parent.
        cmd = '''if 1:
            from multiprocessing.managers import SharedMemoryManager


            smm = SharedMemoryManager()
            smm.start()
            sl = smm.ShareableList(range(10))
            smm.shutdown()
        '''
        rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)

        # Before bpo-36867 was fixed, a SharedMemoryManager not using the same
        # resource_tracker process as its parent would make the parent's
        # tracker complain about sl being leaked even though smm.shutdown()
        # properly released sl.
        self.assertFalse(err)

    def test_shared_memory_SharedMemoryManager_basics(self):
        """SharedMemoryManager serves attachable blocks and releases them
        on shutdown (and on context-manager exit)."""
        smm1 = multiprocessing.managers.SharedMemoryManager()
        with self.assertRaises(ValueError):
            smm1.SharedMemory(size=9)  # Fails if SharedMemoryServer not started
        smm1.start()
        lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ]
        lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ]
        doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name)
        self.assertEqual(len(doppleganger_list0), 5)
        doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name)
        self.assertGreaterEqual(len(doppleganger_shm0.buf), 32)
        held_name = lom[0].name
        smm1.shutdown()
        if sys.platform != "win32":
            # Calls to unlink() have no effect on Windows platform; shared
            # memory will only be released once final process exits.
            with self.assertRaises(FileNotFoundError):
                # No longer there to be attached to again.
                absent_shm = shared_memory.SharedMemory(name=held_name)

        with multiprocessing.managers.SharedMemoryManager() as smm2:
            sl = smm2.ShareableList("howdy")
            shm = smm2.SharedMemory(size=128)
            held_name = sl.shm.name
        if sys.platform != "win32":
            with self.assertRaises(FileNotFoundError):
                # No longer there to be attached to again.
                absent_sl = shared_memory.ShareableList(name=held_name)

    def test_shared_memory_ShareableList_basics(self):
        """Exercise ShareableList's full surface: indexing, format,
        mutation, duplication and second handles."""
        sl = shared_memory.ShareableList(
            ['howdy', b'HoWdY', -273.154, 100, None, True, 42]
        )
        self.addCleanup(sl.shm.unlink)

        # Verify __repr__
        self.assertIn(sl.shm.name, str(sl))
        self.assertIn(str(list(sl)), str(sl))

        # Index Out of Range (get)
        with self.assertRaises(IndexError):
            sl[7]

        # Index Out of Range (set)
        with self.assertRaises(IndexError):
            sl[7] = 2

        # Assign value without format change (str -> str)
        current_format = sl._get_packing_format(0)
        sl[0] = 'howdy'
        self.assertEqual(current_format, sl._get_packing_format(0))

        # Verify attributes are readable.
        self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q')

        # Exercise len().
        self.assertEqual(len(sl), 7)

        # Exercise index().
        with warnings.catch_warnings():
            # Suppress BytesWarning when comparing against b'HoWdY'.
            warnings.simplefilter('ignore')
            with self.assertRaises(ValueError):
                sl.index('100')
            self.assertEqual(sl.index(100), 3)

        # Exercise retrieving individual values.
        self.assertEqual(sl[0], 'howdy')
        self.assertEqual(sl[-2], True)

        # Exercise iterability.
        self.assertEqual(
            tuple(sl),
            ('howdy', b'HoWdY', -273.154, 100, None, True, 42)
        )

        # Exercise modifying individual values.
        sl[3] = 42
        self.assertEqual(sl[3], 42)
        sl[4] = 'some'  # Change type at a given position.
        self.assertEqual(sl[4], 'some')
        self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q')
        with self.assertRaisesRegex(ValueError,
                                    "exceeds available storage"):
            sl[4] = 'far too many'
        self.assertEqual(sl[4], 'some')
        sl[0] = 'encodés'  # Exactly 8 bytes of UTF-8 data
        self.assertEqual(sl[0], 'encodés')
        self.assertEqual(sl[1], b'HoWdY')  # no spillage
        with self.assertRaisesRegex(ValueError,
                                    "exceeds available storage"):
            sl[0] = 'encodées'  # Exactly 9 bytes of UTF-8 data
        self.assertEqual(sl[1], b'HoWdY')
        with self.assertRaisesRegex(ValueError,
                                    "exceeds available storage"):
            sl[1] = b'123456789'
        self.assertEqual(sl[1], b'HoWdY')

        # Exercise count().
        with warnings.catch_warnings():
            # Suppress BytesWarning when comparing against b'HoWdY'.
            warnings.simplefilter('ignore')
            self.assertEqual(sl.count(42), 2)
            self.assertEqual(sl.count(b'HoWdY'), 1)
            self.assertEqual(sl.count(b'adios'), 0)

        # Exercise creating a duplicate.
        name_duplicate = self._new_shm_name('test03_duplicate')
        sl_copy = shared_memory.ShareableList(sl, name=name_duplicate)
        try:
            self.assertNotEqual(sl.shm.name, sl_copy.shm.name)
            self.assertEqual(name_duplicate, sl_copy.shm.name)
            self.assertEqual(list(sl), list(sl_copy))
            self.assertEqual(sl.format, sl_copy.format)
            sl_copy[-1] = 77
            self.assertEqual(sl_copy[-1], 77)
            self.assertNotEqual(sl[-1], 77)
            sl_copy.shm.close()
        finally:
            sl_copy.shm.unlink()

        # Obtain a second handle on the same ShareableList.
        sl_tethered = shared_memory.ShareableList(name=sl.shm.name)
        self.assertEqual(sl.shm.name, sl_tethered.shm.name)
        sl_tethered[-1] = 880
        self.assertEqual(sl[-1], 880)
        sl_tethered.shm.close()

        sl.shm.close()

        # Exercise creating an empty ShareableList.
        empty_sl = shared_memory.ShareableList()
        try:
            self.assertEqual(len(empty_sl), 0)
            self.assertEqual(empty_sl.format, '')
            self.assertEqual(empty_sl.count('any'), 0)
            with self.assertRaises(ValueError):
                empty_sl.index(None)
            empty_sl.shm.close()
        finally:
            empty_sl.shm.unlink()

    def test_shared_memory_ShareableList_pickling(self):
        """Pickling a ShareableList stores only a reference to its segment,
        so an unpickled copy shares the same storage."""
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.subTest(proto=proto):
                sl = shared_memory.ShareableList(range(10))
                self.addCleanup(sl.shm.unlink)

                serialized_sl = pickle.dumps(sl, protocol=proto)
                deserialized_sl = pickle.loads(serialized_sl)
                self.assertIsInstance(
                    deserialized_sl, shared_memory.ShareableList)
                self.assertEqual(deserialized_sl[-1], 9)
                self.assertIsNot(sl, deserialized_sl)

                # Writes through either handle are visible via the other.
                deserialized_sl[4] = "changed"
                self.assertEqual(sl[4], "changed")
                sl[3] = "newvalue"
                self.assertEqual(deserialized_sl[3], "newvalue")

                # Pickle size must not grow with list contents, proving the
                # contents themselves are not serialized.
                larger_sl = shared_memory.ShareableList(range(400))
                self.addCleanup(larger_sl.shm.unlink)
                serialized_larger_sl = pickle.dumps(larger_sl, protocol=proto)
                self.assertEqual(len(serialized_sl), len(serialized_larger_sl))
                larger_sl.shm.close()

                deserialized_sl.shm.close()
                sl.shm.close()

    def test_shared_memory_ShareableList_pickling_dead_object(self):
        """Unpickling fails once the backing segment has been unlinked."""
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.subTest(proto=proto):
                sl = shared_memory.ShareableList(range(10))
                serialized_sl = pickle.dumps(sl, protocol=proto)

                # Now, we are going to kill the original object.
                # So, unpickled one won't be able to attach to it.
                sl.shm.close()
                sl.shm.unlink()

                with self.assertRaises(FileNotFoundError):
                    pickle.loads(serialized_sl)

    def test_shared_memory_cleaned_after_process_termination(self):
        """A segment owned by an abruptly killed process must eventually be
        released by the resource tracker rather than leaked."""
        cmd = '''if 1:
            import os, time, sys
            from multiprocessing import shared_memory

            # Create a shared_memory segment, and send the segment name
            sm = shared_memory.SharedMemory(create=True, size=10)
            sys.stdout.write(sm.name + '\\n')
            sys.stdout.flush()
            time.sleep(100)
        '''
        with subprocess.Popen([sys.executable, '-E', '-c', cmd],
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE) as p:
            name = p.stdout.readline().strip().decode()

            # killing abruptly processes holding reference to a shared memory
            # segment should not leak the given memory segment.
            p.terminate()
            p.wait()

            # Poll with exponential backoff until the tracker has removed
            # the segment or the timeout expires.
            deadline = time.monotonic() + support.LONG_TIMEOUT
            t = 0.1
            while time.monotonic() < deadline:
                time.sleep(t)
                t = min(t*2, 5)
                try:
                    smm = shared_memory.SharedMemory(name, create=False)
                except FileNotFoundError:
                    break
            else:
                raise AssertionError("A SharedMemory segment was leaked after"
                                     " a process was abruptly terminated.")

            if os.name == 'posix':
                # Imported locally so this test does not depend on the
                # module-level import set.
                from multiprocessing import resource_tracker
                # Without this line it was raising warnings like:
                #   UserWarning: resource_tracker:
                #   There appear to be 1 leaked shared_memory
                #   objects to clean up at shutdown
                # See: https://bugs.python.org/issue45209
                resource_tracker.unregister(f"/{name}", "shared_memory")

                # A warning was emitted by the subprocess' own
                # resource_tracker (on Windows, shared memory segments
                # are released automatically by the OS).
                err = p.stderr.read().decode()
                self.assertIn(
                    "resource_tracker: There appear to be 1 leaked "
                    "shared_memory objects to clean up at shutdown", err)
4349
4350#
4351# Test to verify that `Finalize` works.
4352#
4353
class _TestFinalize(BaseTestCase):
    """Verify that util.Finalize runs callbacks at the right times and in
    the documented exitpriority order."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Isolate each test from finalizers registered elsewhere in the
        # test suite: save and clear the global registry.
        self.registry_backup = util._finalizer_registry.copy()
        util._finalizer_registry.clear()

    def tearDown(self):
        gc.collect()  # For PyPy or other GCs.
        # Every finalizer created during the test must have fired by now.
        self.assertFalse(util._finalizer_registry)
        util._finalizer_registry.update(self.registry_backup)

    @classmethod
    def _test_finalize(cls, conn):
        # Child-process body: register finalizers that send marker strings
        # over `conn`, then run multiprocessing's exit hooks and exit hard.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a
        gc.collect()  # For PyPy or other GCs.

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called
        gc.collect()  # For PyPy or other GCs.

        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Expected order: 'a' and 'b' fired eagerly in the child; then, at
        # exit, higher exitpriority first (d10), equal priorities in reverse
        # creation order (d03, d02, d01), then 'e'.  'c' has no exitpriority
        # and is therefore not run by _exit_function.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])

    def test_thread_safety(self):
        # bpo-24484: _run_finalizers() should be thread-safe
        def cb():
            pass

        class Foo(object):
            def __init__(self):
                self.ref = self  # create reference cycle
                # insert finalizer at random key
                util.Finalize(self, cb, exitpriority=random.randint(1, 100))

        finish = False
        exc = None

        def run_finalizers():
            nonlocal exc
            while not finish:
                time.sleep(random.random() * 1e-1)
                try:
                    # A GC run will eventually happen during this,
                    # collecting stale Foo's and mutating the registry
                    util._run_finalizers()
                except Exception as e:
                    exc = e

        def make_finalizers():
            nonlocal exc
            d = {}
            while not finish:
                try:
                    # Old Foo's get gradually replaced and later
                    # collected by the GC (because of the cyclic ref)
                    d[random.getrandbits(5)] = {Foo() for i in range(10)}
                except Exception as e:
                    exc = e
                    d.clear()

        old_interval = sys.getswitchinterval()
        old_threshold = gc.get_threshold()
        try:
            # Frequent thread switches plus an aggressive GC maximize the
            # chance of hitting the original race.
            sys.setswitchinterval(1e-6)
            gc.set_threshold(5, 5, 5)
            threads = [threading.Thread(target=run_finalizers),
                       threading.Thread(target=make_finalizers)]
            with threading_helper.start_threads(threads):
                time.sleep(4.0)  # Wait a bit to trigger race condition
                finish = True
            if exc is not None:
                raise exc
        finally:
            sys.setswitchinterval(old_interval)
            gc.set_threshold(*old_threshold)
            gc.collect()  # Collect remaining Foo's
4471
4472
4473#
4474# Test that from ... import * works for each module
4475#
4476
class _TestImportStar(unittest.TestCase):
    """Check that ``from <module> import *`` is well-defined for every
    multiprocessing submodule."""

    def get_module_names(self):
        """Return the dotted names of all multiprocessing submodules plus
        the package itself."""
        import glob
        package_dir = os.path.dirname(multiprocessing.__file__)
        py_files = glob.glob(os.path.join(glob.escape(package_dir), '*.py'))
        names = ['multiprocessing.' + os.path.splitext(os.path.basename(f))[0]
                 for f in py_files]
        names.remove('multiprocessing.__init__')
        names.append('multiprocessing')
        return names

    def test_import(self):
        names = self.get_module_names()
        if sys.platform == 'win32':
            # POSIX-only start-method implementations
            for posix_only in ('multiprocessing.popen_fork',
                               'multiprocessing.popen_forkserver',
                               'multiprocessing.popen_spawn_posix'):
                names.remove(posix_only)
        else:
            names.remove('multiprocessing.popen_spawn_win32')
            if not HAS_REDUCTION:
                names.remove('multiprocessing.popen_forkserver')

        if c_int is None:
            # This module requires _ctypes
            names.remove('multiprocessing.sharedctypes')

        for name in names:
            __import__(name)
            mod = sys.modules[name]
            # Every module must declare its public API ...
            self.assertTrue(hasattr(mod, '__all__'), name)
            # ... and every declared name must actually exist.
            for attr in mod.__all__:
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )
4515
4516#
4517# Quick test that logging works -- does not test logging output
4518#
4519
class _TestLogging(BaseTestCase):
    """Quick checks that multiprocessing's logger works.

    Does not test actual logging output, only level propagation.
    """

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Idiomatic assertion (was: assertTrue(logger is not None)).
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)  # restore level for other tests

    @classmethod
    def _test_level(cls, conn):
        # Child: report the effective level of the multiprocessing logger.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # A level set directly on the multiprocessing logger must be seen
        # by the child process.
        logger.setLevel(LEVEL1)
        p = self.Process(target=self._test_level, args=(writer,))
        p.start()
        self.assertEqual(LEVEL1, reader.recv())
        p.join()
        p.close()

        # With the multiprocessing logger at NOTSET, the root logger's
        # level becomes the effective level in the child.
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        p = self.Process(target=self._test_level, args=(writer,))
        p.start()
        self.assertEqual(LEVEL2, reader.recv())
        p.join()
        p.close()

        # Restore the original levels.
        root_logger.setLevel(root_level)
        logger.setLevel(level=LOG_LEVEL)
4564
4565
4566# class _TestLoggingProcessName(BaseTestCase):
4567#
4568#     def handle(self, record):
4569#         assert record.processName == multiprocessing.current_process().name
4570#         self.__handled = True
4571#
4572#     def test_logging(self):
4573#         handler = logging.Handler()
4574#         handler.handle = self.handle
4575#         self.__handled = False
4576#         # Bypass getLogger() and side-effects
4577#         logger = logging.getLoggerClass()(
4578#                 'multiprocessing.test.TestLoggingProcessName')
4579#         logger.addHandler(handler)
4580#         logger.propagate = False
4581#
4582#         logger.warn('foo')
4583#         assert self.__handled
4584
4585#
4586# Check that Process.join() retries if os.waitpid() fails with EINTR
4587#
4588
class _TestPollEintr(BaseTestCase):
    """Check that Process.join() retries if os.waitpid() fails with EINTR."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        # Child helper: give the parent time to block in join(), then
        # interrupt it with SIGUSR1.
        time.sleep(0.1)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        got_signal = [False]
        def record(*args):
            # Signal handler: just note that the signal arrived.
            got_signal[0] = True
        pid = os.getpid()
        oldhandler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(pid,))
            killer.start()
            try:
                p = self.Process(target=time.sleep, args=(2,))
                p.start()
                # join() must survive the EINTR caused by _killer's signal.
                p.join()
            finally:
                killer.join()
            self.assertTrue(got_signal[0])
            self.assertEqual(p.exitcode, 0)
        finally:
            signal.signal(signal.SIGUSR1, oldhandler)
4618
4619#
4620# Test to verify handle verification, see issue 3321
4621#
4622
class TestInvalidHandle(unittest.TestCase):
    """Connection must validate its handle (issue #3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        conn = multiprocessing.connection.Connection(44977608)
        try:
            # poll() on a bogus handle may raise, but must not crash.
            conn.poll()
        except (ValueError, OSError):
            pass
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None
        # A plainly invalid handle must be rejected at construction time.
        with self.assertRaises((ValueError, OSError)):
            multiprocessing.connection.Connection(-1)
4639
4640
4641
@hashlib_helper.requires_hashdigest('md5')
class OtherTest(unittest.TestCase):
    """Connection authentication failure tests."""
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        class BogusConnection(object):
            # Always answers with a non-digest payload.
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                BogusConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class BogusConnection(object):
            # Sends a valid challenge header first, then garbage.
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                BogusConnection(), b'abc')
4671
4672#
4673# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
4674#
4675
def initializer(ns):
    """Worker initializer: bump the shared counter on *ns* by one so the
    tests can verify that the initializer actually ran."""
    ns.test = ns.test + 1
4678
@hashlib_helper.requires_hashdigest('md5')
class TestInitializers(unittest.TestCase):
    """Issue #5585: initializer support in Manager.start() and Pool()."""

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0  # incremented once by initializer() in the worker

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        m.shutdown()
        m.join()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
4704
4705#
4706# Issue 5155, 5313, 5331: Test process in processes
4707# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
4708#
4709
4710def _this_sub_process(q):
4711    try:
4712        item = q.get(block=False)
4713    except pyqueue.Empty:
4714        pass
4715
def _test_process():
    """Start and join a daemonic child process that drains a queue.

    Used by TestStdinBadfiledescriptor to verify that a process can spawn
    a subprocess of its own.
    """
    q = multiprocessing.Queue()
    child = multiprocessing.Process(target=_this_sub_process, args=(q,))
    child.daemon = True
    child.start()
    child.join()
4722
4723def _afunc(x):
4724    return x*x
4725
def pool_in_process():
    """Run a Pool.map from inside a (child) process.

    Used by TestStdinBadfiledescriptor; the result itself is discarded.
    """
    pool = multiprocessing.Pool(processes=4)
    pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    pool.close()
    pool.join()
4731
4732class _file_like(object):
4733    def __init__(self, delegate):
4734        self._delegate = delegate
4735        self._pid = None
4736
4737    @property
4738    def cache(self):
4739        pid = os.getpid()
4740        # There are no race conditions since fork keeps only the running thread
4741        if pid != self._pid:
4742            self._pid = pid
4743            self._cache = []
4744        return self._cache
4745
4746    def write(self, data):
4747        self.cache.append(data)
4748
4749    def flush(self):
4750        self._delegate.write(''.join(self.cache))
4751        self._cache = []
4752
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Issues #5155, #5313, #5331: processes/pools created inside a process,
    and os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior."""

    def test_queue_in_process(self):
        # A child process must be able to start a grandchild of its own.
        proc = multiprocessing.Process(target=_test_process)
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A child process must be able to run a Pool of its own.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is deliberately never started; it
        # presumably only checks that constructing a Process whose target
        # captures `flike` does not disturb the buffer -- confirm before
        # removing.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use the unittest assertion (was a bare `assert`, which is
        # stripped under python -O and gives a poor failure message).
        self.assertEqual(sio.getvalue(), 'foo')
4772
4773
class TestWait(unittest.TestCase):
    """Tests for multiprocessing.connection.wait()."""

    @classmethod
    def _child_test_wait(cls, w, slow):
        # Child: send ten (index, pid) messages, optionally with random
        # delays, then close the write end to signal EOF.
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            w.send((i, os.getpid()))
        w.close()

    def test_wait(self, slow=False):
        from multiprocessing.connection import wait
        readers = []
        procs = []
        messages = []

        for i in range(4):
            r, w = multiprocessing.Pipe(duplex=False)
            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
            p.daemon = True
            p.start()
            # Close the parent's copy of the write end so that EOF is seen
            # once the child closes its own copy.
            w.close()
            readers.append(r)
            procs.append(p)
            self.addCleanup(p.join)

        # Drain every reader; each raises EOFError once its writer closes.
        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                    r.close()
                else:
                    messages.append(msg)

        messages.sort()
        expected = sorted((i, p.pid) for i in range(10) for p in procs)
        self.assertEqual(messages, expected)

    @classmethod
    def _child_test_wait_socket(cls, address, slow):
        # Child: connect to the parent's listener and send ten lines.
        s = socket.socket()
        s.connect(address)
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            s.sendall(('%s\n' % i).encode('ascii'))
        s.close()

    def test_wait_socket(self, slow=False):
        from multiprocessing.connection import wait
        l = socket.create_server((socket_helper.HOST, 0))
        addr = l.getsockname()
        readers = []
        procs = []
        dic = {}

        for i in range(4):
            p = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
            p.daemon = True
            p.start()
            procs.append(p)
            self.addCleanup(p.join)

        for i in range(4):
            r, _ = l.accept()
            readers.append(r)
            dic[r] = []
        l.close()

        # wait() also accepts plain socket objects; an empty recv() result
        # indicates that the peer has closed its end.
        while readers:
            for r in wait(readers):
                msg = r.recv(32)
                if not msg:
                    readers.remove(r)
                    r.close()
                else:
                    dic[r].append(msg)

        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
        for v in dic.values():
            self.assertEqual(b''.join(v), expected)

    def test_wait_slow(self):
        self.test_wait(True)

    def test_wait_socket_slow(self):
        self.test_wait_socket(True)

    def test_wait_timeout(self):
        from multiprocessing.connection import wait

        expected = 5
        a, b = multiprocessing.Pipe()

        # Nothing ready: wait() returns [] after roughly `expected` seconds.
        start = time.monotonic()
        res = wait([a, b], expected)
        delta = time.monotonic() - start

        self.assertEqual(res, [])
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        # One end ready: wait() must return promptly despite a long timeout.
        start = time.monotonic()
        res = wait([a, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)

    @classmethod
    def signal_and_sleep(cls, sem, period):
        # Child: signal readiness, then stay alive for `period` seconds.
        sem.release()
        time.sleep(period)

    def test_wait_integer(self):
        from multiprocessing.connection import wait

        expected = 3
        # Compare result lists ignoring order.
        sorted_ = lambda l: sorted(l, key=lambda x: id(x))
        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        # A process sentinel is a plain integer and is accepted by wait().
        self.assertIsInstance(p.sentinel, int)
        self.assertTrue(sem.acquire(timeout=20))

        # Only the sentinel becomes ready -- once the child exits.
        start = time.monotonic()
        res = wait([a, p.sentinel, b], expected + 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [p.sentinel])
        self.assertLess(delta, expected + 2)
        self.assertGreater(delta, expected - 2)

        a.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
        self.assertLess(delta, 0.4)

        b.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
        self.assertLess(delta, 0.4)

        p.terminate()
        p.join()

    def test_neg_timeout(self):
        # A negative timeout must behave like zero, not block.
        from multiprocessing.connection import wait
        a, b = multiprocessing.Pipe()
        t = time.monotonic()
        res = wait([a], timeout=-1)
        t = time.monotonic() - t
        self.assertEqual(res, [])
        self.assertLess(t, 1)
        a.close()
        b.close()
4945        b.close()
4946
4947#
4948# Issue 14151: Test invalid family on invalid environment
4949#
4950
class TestInvalidFamily(unittest.TestCase):
    """Issue #14151: Listener must reject an address of the wrong family."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # A Windows named-pipe style address is invalid on POSIX.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # A POSIX filesystem path is an invalid pipe address on Windows.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, '/var/test.pipe')
4962
4963#
4964# Issue 12098: check sys.flags of child matches that for parent
4965#
4966
class TestFlags(unittest.TestCase):
    """Issue #12098: sys.flags of a child must match those of its parent."""

    @classmethod
    def run_in_grandchild(cls, conn):
        # Report this process's interpreter flags back to its parent.
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        import json
        r, w = multiprocessing.Pipe(duplex=False)
        p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
        p.start()
        grandchild_flags = r.recv()
        p.join()
        r.close()
        w.close()
        flags = (tuple(sys.flags), grandchild_flags)
        # Print both flag tuples as JSON so test_flags can parse stdout.
        print(json.dumps(flags))

    def test_flags(self):
        import json
        # start child process using unusual flags
        prog = ('from test._test_multiprocessing import TestFlags; ' +
                'TestFlags.run_in_child()')
        data = subprocess.check_output(
            [sys.executable, '-E', '-S', '-O', '-c', prog])
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
4994
4995#
4996# Test interaction with socket timeouts - see Issue #6056
4997#
4998
class TestTimeouts(unittest.TestCase):
    """Issue #6056: multiprocessing must work with a global socket timeout."""

    @classmethod
    def _test_timeout(cls, child, address):
        # Child: reply over the inherited pipe, then connect back to the
        # parent's listener and send a second value.
        time.sleep(1)
        child.send(123)
        child.close()
        conn = multiprocessing.connection.Client(address)
        conn.send(456)
        conn.close()

    def test_timeout(self):
        old_timeout = socket.getdefaulttimeout()
        try:
            # Connections must keep working even when newly created sockets
            # get a short default timeout.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            l = multiprocessing.connection.Listener(family='AF_INET')
            p = multiprocessing.Process(target=self._test_timeout,
                                        args=(child, l.address))
            p.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            conn = l.accept()
            self.assertEqual(conn.recv(), 456)
            conn.close()
            l.close()
            join_process(p)
        finally:
            socket.setdefaulttimeout(old_timeout)
5028
5029#
5030# Test what happens with no "if __name__ == '__main__'"
5031#
5032
class TestNoForkBomb(unittest.TestCase):
    """A script without an ``if __name__ == '__main__'`` guard must not
    fork-bomb under spawn/forkserver, and must work under fork."""

    def test_noforkbomb(self):
        start_method = multiprocessing.get_start_method()
        name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if start_method == 'fork':
            # fork does not re-import __main__, so the script succeeds.
            rc, out, err = test.support.script_helper.assert_python_ok(
                name, start_method)
            self.assertEqual(out.rstrip(), b'123')
            self.assertEqual(err, b'')
        else:
            # spawn/forkserver re-import __main__ and must fail cleanly.
            rc, out, err = test.support.script_helper.assert_python_failure(
                name, start_method)
            self.assertEqual(out, b'')
            self.assertIn(b'RuntimeError', err)
5045
5046#
5047# Issue #17555: ForkAwareThreadLock
5048#
5049
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        # Recurse n generations of child processes, then report the size of
        # the after-fork registry from the deepest descendant.
        if n > 1:
            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
            p.start()
            conn.close()
            join_process(p)
        else:
            conn.send(len(util._afterfork_registry))
        conn.close()

    def test_lock(self):
        r, w = multiprocessing.Pipe(False)
        l = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        p = multiprocessing.Process(target=self.child, args=(5, w))
        p.start()
        w.close()
        new_size = r.recv()
        join_process(p)
        # The registry must not have grown after 5 generations (it grew
        # ~2**n entries before the fix).
        self.assertLessEqual(new_size, old_size)
5076
5077#
5078# Check that non-forked child processes do not inherit unneeded fds/handles
5079#
5080
class TestCloseFds(unittest.TestCase):
    """Non-forked child processes must not inherit unneeded fds/handles."""

    def get_high_socket_fd(self):
        # Return a detached socket fd chosen so a spawned child would not
        # accidentally have a same-numbered fd of its own.
        if WIN32:
            # The child process will not have any socket handles, so
            # calling socket.fromfd() should produce WSAENOTSOCK even
            # if there is a handle of the same number.
            return socket.socket().detach()
        else:
            # We want to produce a socket with an fd high enough that a
            # freshly created child process will not have any fds as high.
            fd = socket.socket().detach()
            to_close = []
            while fd < 50:
                to_close.append(fd)
                fd = os.dup(fd)
            for x in to_close:
                os.close(x)
            return fd

    def close(self, fd):
        # Close a raw fd; on Windows it is a socket handle, so wrap it in a
        # socket object first.
        if WIN32:
            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close()
        else:
            os.close(fd)

    @classmethod
    def _test_closefds(cls, conn, fd):
        # Child: try to adopt the fd as a socket; report the resulting
        # exception (or None if it was usable) back to the parent.
        try:
            s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        except Exception as e:
            conn.send(e)
        else:
            s.close()
            conn.send(None)

    def test_closefd(self):
        if not HAS_REDUCTION:
            raise unittest.SkipTest('requires fd pickling')

        reader, writer = multiprocessing.Pipe()
        fd = self.get_high_socket_fd()
        try:
            p = multiprocessing.Process(target=self._test_closefds,
                                        args=(writer, fd))
            p.start()
            writer.close()
            e = reader.recv()
            join_process(p)
        finally:
            self.close(fd)
            writer.close()
            reader.close()

        if multiprocessing.get_start_method() == 'fork':
            # fork children inherit all fds, so the socket must be usable.
            self.assertIs(e, None)
        else:
            # spawn/forkserver children must NOT have inherited the fd.
            WSAENOTSOCK = 10038
            self.assertIsInstance(e, OSError)
            self.assertTrue(e.errno == errno.EBADF or
                            e.winerror == WSAENOTSOCK, e)
5142
5143#
5144# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
5145#
5146
class TestIgnoreEINTR(unittest.TestCase):
    """Issue #17097: EINTR should be ignored by recv(), send(), accept() etc."""

    # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
    CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)

    @classmethod
    def _test_ignore(cls, conn):
        # Child: install a no-op SIGUSR1 handler, then perform blocking
        # pipe operations that the parent will interrupt with signals.
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        conn.send('ready')
        x = conn.recv()
        conn.send(x)
        # Large enough to block in send_bytes() (see CONN_MAX_SIZE above).
        conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            time.sleep(0.1)
            # Interrupt the child while it blocks in recv(); it must retry.
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            time.sleep(0.1)
            # Interrupt the child while it blocks in a large send_bytes().
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        # Child: install a no-op SIGUSR1 handler, then block in accept()
        # until the parent connects.
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        with multiprocessing.connection.Listener() as l:
            conn.send(l.address)
            a = l.accept()
            a.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore_listener,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            address = conn.recv()
            time.sleep(0.1)
            # Interrupt the child while it blocks in accept(); it must retry.
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            p.join()
        finally:
            conn.close()
5213
class TestStartMethod(unittest.TestCase):
    """Tests for get/set_start_method() and get_context()."""

    @classmethod
    def _check_context(cls, conn):
        # Child: report the start method it was launched with.
        conn.send(multiprocessing.get_start_method())

    def check_context(self, ctx):
        # Start a child via `ctx` and verify it reports ctx's start method.
        r, w = ctx.Pipe(duplex=False)
        p = ctx.Process(target=self._check_context, args=(w,))
        p.start()
        w.close()
        child_method = r.recv()
        r.close()
        p.join()
        self.assertEqual(child_method, ctx.get_start_method())

    def test_context(self):
        for method in ('fork', 'spawn', 'forkserver'):
            try:
                ctx = multiprocessing.get_context(method)
            except ValueError:
                continue  # method not supported on this platform
            self.assertEqual(ctx.get_start_method(), method)
            self.assertIs(ctx.get_context(), ctx)
            # A concrete context's start method cannot be changed.
            self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
            self.assertRaises(ValueError, ctx.set_start_method, None)
            self.check_context(ctx)

    def test_set_get(self):
        multiprocessing.set_forkserver_preload(PRELOAD)
        count = 0
        old_method = multiprocessing.get_start_method()
        try:
            for method in ('fork', 'spawn', 'forkserver'):
                try:
                    multiprocessing.set_start_method(method, force=True)
                except ValueError:
                    continue  # method not supported on this platform
                self.assertEqual(multiprocessing.get_start_method(), method)
                ctx = multiprocessing.get_context()
                self.assertEqual(ctx.get_start_method(), method)
                # Context and Process class names reflect the method.
                self.assertTrue(type(ctx).__name__.lower().startswith(method))
                self.assertTrue(
                    ctx.Process.__name__.lower().startswith(method))
                self.check_context(multiprocessing)
                count += 1
        finally:
            # Restore the global start method for the rest of the suite.
            multiprocessing.set_start_method(old_method, force=True)
        self.assertGreaterEqual(count, 1)

    def test_get_all(self):
        methods = multiprocessing.get_all_start_methods()
        if sys.platform == 'win32':
            self.assertEqual(methods, ['spawn'])
        else:
            # Ordering varies by platform; forkserver may be unavailable.
            self.assertTrue(methods == ['fork', 'spawn'] or
                            methods == ['spawn', 'fork'] or
                            methods == ['fork', 'spawn', 'forkserver'] or
                            methods == ['spawn', 'fork', 'forkserver'])

    def test_preload_resources(self):
        if multiprocessing.get_start_method() != 'forkserver':
            self.skipTest("test only relevant for 'forkserver' method")
        name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
        rc, out, err = test.support.script_helper.assert_python_ok(name)
        out = out.decode()
        err = err.decode()
        if out.rstrip() != 'ok' or err != '':
            print(out)
            print(err)
            self.fail("failed spawning forkserver or grandchild")
5284
5285
@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
class TestResourceTracker(unittest.TestCase):
    """Tests for multiprocessing.resource_tracker: named resources left
    behind by an abruptly-terminated process must be cleaned up, and the
    tracker process itself must be restarted if it dies.
    """

    def test_resource_tracker(self):
        #
        # Check that killing process does not leak named semaphores
        #
        # Template script run in a subprocess.  It is formatted below with
        # {w} (a pipe fd inherited via pass_fds) and {rtype} (the resource
        # type under test); the doubled braces in the ValueError message
        # survive .format() as literal braces.
        cmd = '''if 1:
            import time, os, tempfile
            import multiprocessing as mp
            from multiprocessing import resource_tracker
            from multiprocessing.shared_memory import SharedMemory

            mp.set_start_method("spawn")
            rand = tempfile._RandomNameSequence()


            def create_and_register_resource(rtype):
                if rtype == "semaphore":
                    lock = mp.Lock()
                    return lock, lock._semlock.name
                elif rtype == "shared_memory":
                    sm = SharedMemory(create=True, size=10)
                    return sm, sm._name
                else:
                    raise ValueError(
                        "Resource type {{}} not understood".format(rtype))


            resource1, rname1 = create_and_register_resource("{rtype}")
            resource2, rname2 = create_and_register_resource("{rtype}")

            os.write({w}, rname1.encode("ascii") + b"\\n")
            os.write({w}, rname2.encode("ascii") + b"\\n")

            time.sleep(10)
        '''
        for rtype in resource_tracker._CLEANUP_FUNCS:
            with self.subTest(rtype=rtype):
                if rtype == "noop":
                    # Artefact resource type used by the resource_tracker
                    continue
                r, w = os.pipe()
                p = subprocess.Popen([sys.executable,
                                     '-E', '-c', cmd.format(w=w, rtype=rtype)],
                                     pass_fds=[w],
                                     stderr=subprocess.PIPE)
                os.close(w)
                # Read back the two resource names the child created.
                with open(r, 'rb', closefd=True) as f:
                    name1 = f.readline().rstrip().decode('ascii')
                    name2 = f.readline().rstrip().decode('ascii')
                # Unlink the first resource ourselves, then kill the child
                # while it still owns the second one.
                _resource_unlink(name1, rtype)
                p.terminate()
                p.wait()

                # The tracker should eventually unlink the second resource;
                # once it has, our own unlink attempt must fail.
                deadline = time.monotonic() + support.LONG_TIMEOUT
                while time.monotonic() < deadline:
                    time.sleep(.5)
                    try:
                        _resource_unlink(name2, rtype)
                    except OSError as e:
                        # docs say it should be ENOENT, but OSX seems to give
                        # EINVAL
                        self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
                        break
                else:
                    raise AssertionError(
                        f"A {rtype} resource was leaked after a process was "
                        f"abruptly terminated.")
                # The tracker must also have reported both leaks on stderr,
                # plus an error for name1 (which we unlinked first).
                err = p.stderr.read().decode('utf-8')
                p.stderr.close()
                expected = ('resource_tracker: There appear to be 2 leaked {} '
                            'objects'.format(
                            rtype))
                self.assertRegex(err, expected)
                self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1)

    def check_resource_tracker_death(self, signum, should_die):
        # bpo-31310: if the semaphore tracker process has died, it should
        # be restarted implicitly.
        from multiprocessing.resource_tracker import _resource_tracker
        pid = _resource_tracker._pid
        if pid is not None:
            # Kill any pre-existing tracker so this test controls its
            # full lifecycle.
            os.kill(pid, signal.SIGKILL)
            support.wait_process(pid, exitcode=-signal.SIGKILL)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            _resource_tracker.ensure_running()
        pid = _resource_tracker._pid

        os.kill(pid, signum)
        time.sleep(1.0)  # give it time to die

        ctx = multiprocessing.get_context("spawn")
        with warnings.catch_warnings(record=True) as all_warn:
            warnings.simplefilter("always")
            sem = ctx.Semaphore()
            sem.acquire()
            sem.release()
            wr = weakref.ref(sem)
            # ensure `sem` gets collected, which triggers communication with
            # the semaphore tracker
            del sem
            gc.collect()
            self.assertIsNone(wr())
            if should_die:
                # A dead tracker must be detected and reported exactly once
                # via a UserWarning.
                self.assertEqual(len(all_warn), 1)
                the_warn = all_warn[0]
                self.assertTrue(issubclass(the_warn.category, UserWarning))
                self.assertTrue("resource_tracker: process died"
                                in str(the_warn.message))
            else:
                self.assertEqual(len(all_warn), 0)

    def test_resource_tracker_sigint(self):
        # Catchable signal (ignored by semaphore tracker)
        self.check_resource_tracker_death(signal.SIGINT, False)

    def test_resource_tracker_sigterm(self):
        # Catchable signal (ignored by semaphore tracker)
        self.check_resource_tracker_death(signal.SIGTERM, False)

    def test_resource_tracker_sigkill(self):
        # Uncatchable signal.
        self.check_resource_tracker_death(signal.SIGKILL, True)

    @staticmethod
    def _is_resource_tracker_reused(conn, pid):
        # Child side: check that this process reuses the parent's tracker
        # rather than spawning a new one, and report the result via *conn*.
        from multiprocessing.resource_tracker import _resource_tracker
        _resource_tracker.ensure_running()
        # The pid should be None in the child process, expect for the fork
        # context. It should not be a new value.
        reused = _resource_tracker._pid in (None, pid)
        reused &= _resource_tracker._check_alive()
        conn.send(reused)

    def test_resource_tracker_reused(self):
        from multiprocessing.resource_tracker import _resource_tracker
        _resource_tracker.ensure_running()
        pid = _resource_tracker._pid

        r, w = multiprocessing.Pipe(duplex=False)
        p = multiprocessing.Process(target=self._is_resource_tracker_reused,
                                    args=(w, pid))
        p.start()
        is_resource_tracker_reused = r.recv()

        # Clean up
        p.join()
        w.close()
        r.close()

        self.assertTrue(is_resource_tracker_reused)

    def test_too_long_name_resource(self):
        # gh-96819: Resource names that will make the length of a write to a pipe
        # greater than PIPE_BUF are not allowed
        rtype = "shared_memory"
        too_long_name_resource = "a" * (512 - len(rtype))
        with self.assertRaises(ValueError):
            resource_tracker.register(too_long_name_resource, rtype)
5448
5449
class TestSimpleQueue(unittest.TestCase):
    """Behavioural tests for multiprocessing.SimpleQueue."""

    @classmethod
    def _test_empty(cls, queue, child_can_start, parent_can_continue):
        # Child process: once allowed to start, record queue.empty()
        # before and after the first put.
        child_can_start.wait()
        # issue 30301, could fail under spawn and forkserver
        try:
            for _ in range(2):
                queue.put(queue.empty())
        finally:
            parent_can_continue.set()

    def test_empty(self):
        q = multiprocessing.SimpleQueue()
        child_can_start = multiprocessing.Event()
        parent_can_continue = multiprocessing.Event()

        worker = multiprocessing.Process(
            target=self._test_empty,
            args=(q, child_can_start, parent_can_continue),
            daemon=True,
        )
        worker.start()

        # Nothing has been put yet, so the queue reports empty.
        self.assertTrue(q.empty())

        child_can_start.set()
        parent_can_continue.wait()

        # The child put two booleans: True (queue was empty before its
        # first put) then False (it no longer was for the second).
        self.assertFalse(q.empty())
        self.assertEqual(q.get(), True)
        self.assertEqual(q.get(), False)
        self.assertTrue(q.empty())

        worker.join()

    def test_close(self):
        q = multiprocessing.SimpleQueue()
        q.close()
        # closing a queue twice should not fail
        q.close()

    # Test specific to CPython since it tests private attributes
    @test.support.cpython_only
    def test_closed(self):
        q = multiprocessing.SimpleQueue()
        q.close()
        self.assertTrue(q._reader.closed)
        self.assertTrue(q._writer.closed)
5499
5500
class TestPoolNotLeakOnFailure(unittest.TestCase):

    def test_release_unused_processes(self):
        # Issue #19675: During pool creation, if we can't create a process,
        # don't leak already created ones.
        remaining_good_starts = 3
        created = []

        class FlakyProcess:
            """Fake Process whose start() succeeds three times, then fails."""

            def __init__(self, **kwargs):
                self.name = 'Fake Process'
                self.exitcode = None
                self.state = None
                created.append(self)

            def start(self):
                nonlocal remaining_good_starts
                if remaining_good_starts <= 0:
                    raise OSError("Manually induced OSError")
                remaining_good_starts -= 1
                self.state = 'started'

            def terminate(self):
                self.state = 'stopping'

            def join(self):
                if self.state == 'stopping':
                    self.state = 'stopped'

            def is_alive(self):
                return self.state in ('started', 'stopping')

        fake_ctx = unittest.mock.MagicMock(Process=FlakyProcess)
        with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
            pool = multiprocessing.pool.Pool(5, context=fake_ctx)
            pool.close()
            pool.join()
        # Every fake process the pool managed to create must have been
        # terminated and joined during the failed construction.
        self.assertFalse(
            any(proc.is_alive() for proc in created))
5540
5541
@hashlib_helper.requires_hashdigest('md5')
class TestSyncManagerTypes(unittest.TestCase):
    """Test all the types which can be shared between a parent and a
    child process by using a manager which acts as an intermediary
    between them.

    In the following unit-tests the base type is created in the parent
    process, the @classmethod represents the worker process and the
    shared object is readable and editable between the two.

    # The child.
    @classmethod
    def _test_list(cls, obj):
        assert obj[0] == 5
        assert obj.append(6)

    # The parent.
    def test_list(self):
        o = self.manager.list()
        o.append(5)
        self.run_worker(self._test_list, o)
        assert o[1] == 6
    """
    manager_class = multiprocessing.managers.SyncManager

    def setUp(self):
        # Fresh manager per test; self.proc holds the current worker
        # process (if any) so tearDown can reap it.
        self.manager = self.manager_class()
        self.manager.start()
        self.proc = None

    def tearDown(self):
        if self.proc is not None and self.proc.is_alive():
            self.proc.terminate()
            self.proc.join()
        self.manager.shutdown()
        self.manager = None
        self.proc = None

    @classmethod
    def setUpClass(cls):
        support.reap_children()

    tearDownClass = setUpClass

    def wait_proc_exit(self):
        # Only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395).
        join_process(self.proc)
        start_time = time.monotonic()
        t = 0.01
        while len(multiprocessing.active_children()) > 1:
            time.sleep(t)
            t *= 2
            dt = time.monotonic() - start_time
            if dt >= 5.0:
                test.support.environment_altered = True
                support.print_warning(f"multiprocessing.Manager still has "
                                      f"{multiprocessing.active_children()} "
                                      f"active children after {dt} seconds")
                break

    def run_worker(self, worker, obj):
        # Run *worker* (a @classmethod above) in a child process with the
        # shared proxy *obj* and require a clean (exit code 0) finish.
        self.proc = multiprocessing.Process(target=worker, args=(obj, ))
        self.proc.daemon = True
        self.proc.start()
        self.wait_proc_exit()
        self.assertEqual(self.proc.exitcode, 0)

    @classmethod
    def _test_event(cls, obj):
        assert obj.is_set()
        obj.wait()
        obj.clear()
        obj.wait(0.001)

    def test_event(self):
        o = self.manager.Event()
        o.set()
        self.run_worker(self._test_event, o)
        assert not o.is_set()
        o.wait(0.001)

    @classmethod
    def _test_lock(cls, obj):
        obj.acquire()

    def test_lock(self, lname="Lock"):
        o = getattr(self.manager, lname)()
        self.run_worker(self._test_lock, o)
        o.release()
        self.assertRaises(RuntimeError, o.release)  # already released

    @classmethod
    def _test_rlock(cls, obj):
        obj.acquire()
        obj.release()

    def test_rlock(self, lname="RLock"):
        # Bug fix: this previously defaulted to lname="Lock", so the
        # manager's RLock proxy was never actually exercised.
        o = getattr(self.manager, lname)()
        self.run_worker(self._test_rlock, o)

    @classmethod
    def _test_semaphore(cls, obj):
        obj.acquire()

    def test_semaphore(self, sname="Semaphore"):
        o = getattr(self.manager, sname)()
        self.run_worker(self._test_semaphore, o)
        o.release()

    def test_bounded_semaphore(self):
        self.test_semaphore(sname="BoundedSemaphore")

    @classmethod
    def _test_condition(cls, obj):
        obj.acquire()
        obj.release()

    def test_condition(self):
        o = self.manager.Condition()
        self.run_worker(self._test_condition, o)

    @classmethod
    def _test_barrier(cls, obj):
        assert obj.parties == 5
        obj.reset()

    def test_barrier(self):
        o = self.manager.Barrier(5)
        self.run_worker(self._test_barrier, o)

    @classmethod
    def _test_pool(cls, obj):
        # TODO: fix https://bugs.python.org/issue35919
        with obj:
            pass

    def test_pool(self):
        o = self.manager.Pool(processes=4)
        self.run_worker(self._test_pool, o)

    @classmethod
    def _test_queue(cls, obj):
        assert obj.qsize() == 2
        assert obj.full()
        assert not obj.empty()
        assert obj.get() == 5
        assert not obj.empty()
        assert obj.get() == 6
        assert obj.empty()

    def test_queue(self, qname="Queue"):
        # A maxsize-2 queue pre-filled by the parent, drained by the child.
        o = getattr(self.manager, qname)(2)
        o.put(5)
        o.put(6)
        self.run_worker(self._test_queue, o)
        assert o.empty()
        assert not o.full()

    def test_joinable_queue(self):
        self.test_queue("JoinableQueue")

    @classmethod
    def _test_list(cls, obj):
        assert obj[0] == 5
        assert obj.count(5) == 1
        assert obj.index(5) == 0
        obj.sort()
        obj.reverse()
        for x in obj:
            pass
        assert len(obj) == 1
        assert obj.pop(0) == 5

    def test_list(self):
        o = self.manager.list()
        o.append(5)
        self.run_worker(self._test_list, o)
        assert not o
        self.assertEqual(len(o), 0)

    @classmethod
    def _test_dict(cls, obj):
        assert len(obj) == 1
        assert obj['foo'] == 5
        assert obj.get('foo') == 5
        assert list(obj.items()) == [('foo', 5)]
        assert list(obj.keys()) == ['foo']
        assert list(obj.values()) == [5]
        assert obj.copy() == {'foo': 5}
        assert obj.popitem() == ('foo', 5)

    def test_dict(self):
        o = self.manager.dict()
        o['foo'] = 5
        self.run_worker(self._test_dict, o)
        assert not o
        self.assertEqual(len(o), 0)

    @classmethod
    def _test_value(cls, obj):
        assert obj.value == 1
        assert obj.get() == 1
        obj.set(2)

    def test_value(self):
        o = self.manager.Value('i', 1)
        self.run_worker(self._test_value, o)
        # The child's set(2) must be visible in the parent.
        self.assertEqual(o.value, 2)
        self.assertEqual(o.get(), 2)

    @classmethod
    def _test_array(cls, obj):
        assert obj[0] == 0
        assert obj[1] == 1
        assert len(obj) == 2
        assert list(obj) == [0, 1]

    def test_array(self):
        o = self.manager.Array('i', [0, 1])
        self.run_worker(self._test_array, o)

    @classmethod
    def _test_namespace(cls, obj):
        assert obj.x == 0
        assert obj.y == 1

    def test_namespace(self):
        o = self.manager.Namespace()
        o.x = 0
        o.y = 1
        self.run_worker(self._test_namespace, o)
5775
5776
class TestNamedResource(unittest.TestCase):
    def test_global_named_resource_spawn(self):
        #
        # gh-90549: Check that global named resources in main module
        # will not leak by a subprocess, in spawn context.
        #
        script_path = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, script_path)
        source = textwrap.dedent('''\
            import multiprocessing as mp

            ctx = mp.get_context('spawn')

            global_resource = ctx.Semaphore()

            def submain(): pass

            if __name__ == '__main__':
                p = ctx.Process(target=submain)
                p.start()
                p.join()
        ''')
        with open(script_path, 'w', encoding='utf-8') as stream:
            stream.write(source)
        _rc, _out, stderr = test.support.script_helper.assert_python_ok(
            script_path)
        # on error, err = 'UserWarning: resource_tracker: There appear to
        # be 1 leaked semaphore objects to clean up at shutdown'
        self.assertEqual(stderr, b'')
5804
5805
5806class MiscTestCase(unittest.TestCase):
5807    def test__all__(self):
5808        # Just make sure names in not_exported are excluded
5809        support.check__all__(self, multiprocessing, extra=multiprocessing.__all__,
5810                             not_exported=['SUBDEBUG', 'SUBWARNING'])
5811
5812
5813#
5814# Mixins
5815#
5816
class BaseMixin(object):
    """Shared class-level setup/teardown: snapshot the sets of dangling
    processes and threads, and warn about anything the tests leaked."""

    @classmethod
    def setUpClass(cls):
        # Remember what was already dangling before this class ran.
        cls.dangling = (multiprocessing.process._dangling.copy(),
                        threading._dangling.copy())

    @classmethod
    def tearDownClass(cls):
        # bpo-26762: Some multiprocessing objects like Pool create reference
        # cycles. Trigger a garbage collection to break these cycles.
        test.support.gc_collect()

        leaked_procs = set(multiprocessing.process._dangling) - set(cls.dangling[0])
        if leaked_procs:
            test.support.environment_altered = True
            support.print_warning(f'Dangling processes: {leaked_procs}')
        leaked_procs = None

        leaked_threads = set(threading._dangling) - set(cls.dangling[1])
        if leaked_threads:
            test.support.environment_altered = True
            support.print_warning(f'Dangling threads: {leaked_threads}')
        leaked_threads = None
5840
5841
class ProcessesMixin(BaseMixin):
    # Run the abstract test cases against real OS processes: every name
    # maps straight onto the plain multiprocessing API.  staticmethod()
    # stops the callables from being re-bound as methods when accessed
    # through the class.
    TYPE = 'processes'
    Process = multiprocessing.Process
    connection = multiprocessing.connection
    current_process = staticmethod(multiprocessing.current_process)
    parent_process = staticmethod(multiprocessing.parent_process)
    active_children = staticmethod(multiprocessing.active_children)
    set_executable = staticmethod(multiprocessing.set_executable)
    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.Pipe)
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)
    Value = staticmethod(multiprocessing.Value)
    Array = staticmethod(multiprocessing.Array)
    RawValue = staticmethod(multiprocessing.RawValue)
    RawArray = staticmethod(multiprocessing.RawArray)
5865
5866
class ManagerMixin(BaseMixin):
    # Run the abstract test cases with all primitives served by a
    # SyncManager process.  The properties use attrgetter('manager.X') so
    # each access resolves lazily through the class-level `manager`
    # created in setUpClass.
    TYPE = 'manager'
    Process = multiprocessing.Process
    Queue = property(operator.attrgetter('manager.Queue'))
    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
    Lock = property(operator.attrgetter('manager.Lock'))
    RLock = property(operator.attrgetter('manager.RLock'))
    Semaphore = property(operator.attrgetter('manager.Semaphore'))
    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
    Condition = property(operator.attrgetter('manager.Condition'))
    Event = property(operator.attrgetter('manager.Event'))
    Barrier = property(operator.attrgetter('manager.Barrier'))
    Value = property(operator.attrgetter('manager.Value'))
    Array = property(operator.attrgetter('manager.Array'))
    list = property(operator.attrgetter('manager.list'))
    dict = property(operator.attrgetter('manager.dict'))
    Namespace = property(operator.attrgetter('manager.Namespace'))

    @classmethod
    def Pool(cls, *args, **kwds):
        # Pools are created through the manager as well.
        return cls.manager.Pool(*args, **kwds)

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        # only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395)
        start_time = time.monotonic()
        t = 0.01                           # exponential back-off delay
        while len(multiprocessing.active_children()) > 1:
            time.sleep(t)
            t *= 2
            dt = time.monotonic() - start_time
            if dt >= 5.0:
                test.support.environment_altered = True
                support.print_warning(f"multiprocessing.Manager still has "
                                      f"{multiprocessing.active_children()} "
                                      f"active children after {dt} seconds")
                break

        gc.collect()                       # do garbage collection
        if cls.manager._number_of_objects() != 0:
            # This is not really an error since some tests do not
            # ensure that all processes which hold a reference to a
            # managed object have been joined.
            test.support.environment_altered = True
            support.print_warning('Shared objects which still exist '
                                  'at manager shutdown:')
            support.print_warning(cls.manager._debug_info())
        cls.manager.shutdown()
        cls.manager.join()
        cls.manager = None

        super().tearDownClass()
5926
5927
class ThreadsMixin(BaseMixin):
    # Run the abstract test cases against multiprocessing.dummy, the
    # thread-based re-implementation of the multiprocessing API.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    connection = multiprocessing.dummy.connection
    current_process = staticmethod(multiprocessing.dummy.current_process)
    active_children = staticmethod(multiprocessing.dummy.active_children)
    Pool = staticmethod(multiprocessing.dummy.Pool)
    Pipe = staticmethod(multiprocessing.dummy.Pipe)
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)
    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)
5947
5948#
5949# Functions used to create test cases from the base ones in this module
5950#
5951
def install_tests_in_module_dict(remote_globs, start_method):
    """Generate concrete test classes in *remote_globs* for *start_method*.

    Every BaseTestCase subclass defined in this module is combined with
    the mixin matching each entry in its ALLOWED_TYPES ('processes',
    'threads' or 'manager'); plain unittest.TestCase subclasses are
    re-exported unchanged.  setUpModule/tearDownModule hooks are also
    installed to select *start_method* and to warn about dangling
    processes/threads afterwards.
    """
    __module__ = remote_globs['__name__']
    local_globs = globals()
    ALL_TYPES = {'processes', 'threads', 'manager'}

    for name, base in local_globs.items():
        if not isinstance(base, type):
            continue
        if issubclass(base, BaseTestCase):
            if base is BaseTestCase:
                continue
            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
            for type_ in base.ALLOWED_TYPES:
                # e.g. _TestFoo + ProcessesMixin -> WithProcessesTestFoo
                newname = 'With' + type_.capitalize() + name[1:]
                Mixin = local_globs[type_.capitalize() + 'Mixin']
                class Temp(base, Mixin, unittest.TestCase):
                    pass
                if type_ == 'manager':
                    Temp = hashlib_helper.requires_hashdigest('md5')(Temp)
                Temp.__name__ = Temp.__qualname__ = newname
                Temp.__module__ = __module__
                remote_globs[newname] = Temp
        elif issubclass(base, unittest.TestCase):
            class Temp(base, object):
                pass
            Temp.__name__ = Temp.__qualname__ = name
            Temp.__module__ = __module__
            remote_globs[name] = Temp

    # Mutable one-element containers so the closures below can write state
    # shared between setUpModule and tearDownModule.
    dangling = [None, None]
    old_start_method = [None]

    def setUpModule():
        multiprocessing.set_forkserver_preload(PRELOAD)
        multiprocessing.process._cleanup()
        dangling[0] = multiprocessing.process._dangling.copy()
        dangling[1] = threading._dangling.copy()
        old_start_method[0] = multiprocessing.get_start_method(allow_none=True)
        try:
            multiprocessing.set_start_method(start_method, force=True)
        except ValueError:
            raise unittest.SkipTest(start_method +
                                    ' start method not supported')

        if sys.platform.startswith("linux"):
            try:
                lock = multiprocessing.RLock()
            except OSError:
                raise unittest.SkipTest("OSError raises on RLock creation, "
                                        "see issue 3111!")
        check_enough_semaphores()
        util.get_temp_dir()     # creates temp directory
        multiprocessing.get_logger().setLevel(LOG_LEVEL)

    def tearDownModule():
        need_sleep = False

        # bpo-26762: Some multiprocessing objects like Pool create reference
        # cycles. Trigger a garbage collection to break these cycles.
        test.support.gc_collect()

        # Restore whichever start method was selected before setUpModule ran.
        multiprocessing.set_start_method(old_start_method[0], force=True)
        # pause a bit so we don't get warning about dangling threads/processes
        processes = set(multiprocessing.process._dangling) - set(dangling[0])
        if processes:
            need_sleep = True
            test.support.environment_altered = True
            support.print_warning(f'Dangling processes: {processes}')
        processes = None

        threads = set(threading._dangling) - set(dangling[1])
        if threads:
            need_sleep = True
            test.support.environment_altered = True
            support.print_warning(f'Dangling threads: {threads}')
        threads = None

        # Sleep 500 ms to give time to child processes to complete.
        if need_sleep:
            time.sleep(0.5)

        multiprocessing.util._cleanup_tests()

    remote_globs['setUpModule'] = setUpModule
    remote_globs['tearDownModule'] = tearDownModule
6037
6038
6039@unittest.skipIf(not hasattr(_multiprocessing, 'SemLock'), 'SemLock not available')
6040@unittest.skipIf(sys.platform != "linux", "Linux only")
6041class SemLockTests(unittest.TestCase):
6042
6043    def test_semlock_subclass(self):
6044        class SemLock(_multiprocessing.SemLock):
6045            pass
6046        name = f'test_semlock_subclass-{os.getpid()}'
6047        s = SemLock(1, 0, 10, name, False)
6048        _multiprocessing.sem_unlink(name)
6049