1# 2# Unit tests for the multiprocessing package 3# 4 5import unittest 6import unittest.mock 7import queue as pyqueue 8import textwrap 9import time 10import io 11import itertools 12import sys 13import os 14import gc 15import errno 16import signal 17import array 18import socket 19import random 20import logging 21import subprocess 22import struct 23import operator 24import pathlib 25import pickle 26import weakref 27import warnings 28import test.support 29import test.support.script_helper 30from test import support 31from test.support import hashlib_helper 32from test.support import import_helper 33from test.support import os_helper 34from test.support import socket_helper 35from test.support import threading_helper 36from test.support import warnings_helper 37 38 39# Skip tests if _multiprocessing wasn't built. 40_multiprocessing = import_helper.import_module('_multiprocessing') 41# Skip tests if sem_open implementation is broken. 42support.skip_if_broken_multiprocessing_synchronize() 43import threading 44 45import multiprocessing.connection 46import multiprocessing.dummy 47import multiprocessing.heap 48import multiprocessing.managers 49import multiprocessing.pool 50import multiprocessing.queues 51 52from multiprocessing import util 53 54try: 55 from multiprocessing import reduction 56 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE 57except ImportError: 58 HAS_REDUCTION = False 59 60try: 61 from multiprocessing.sharedctypes import Value, copy 62 HAS_SHAREDCTYPES = True 63except ImportError: 64 HAS_SHAREDCTYPES = False 65 66try: 67 from multiprocessing import shared_memory 68 HAS_SHMEM = True 69except ImportError: 70 HAS_SHMEM = False 71 72try: 73 import msvcrt 74except ImportError: 75 msvcrt = None 76 77 78if support.check_sanitizer(address=True): 79 # bpo-45200: Skip multiprocessing tests if Python is built with ASAN to 80 # work around a libasan race condition: dead lock in pthread_create(). 81 raise unittest.SkipTest("libasan has a pthread_create() dead lock") 82 83 84def latin(s): 85 return s.encode('latin') 86 87 88def close_queue(queue): 89 if isinstance(queue, multiprocessing.queues.Queue): 90 queue.close() 91 queue.join_thread() 92 93 94def join_process(process): 95 # Since multiprocessing.Process has the same API than threading.Thread 96 # (join() and is_alive(), the support function can be reused 97 threading_helper.join_thread(process) 98 99 100if os.name == "posix": 101 from multiprocessing import resource_tracker 102 103 def _resource_unlink(name, rtype): 104 resource_tracker._CLEANUP_FUNCS[rtype](name) 105 106 107# 108# Constants 109# 110 111LOG_LEVEL = util.SUBWARNING 112#LOG_LEVEL = logging.DEBUG 113 114DELTA = 0.1 115CHECK_TIMINGS = False # making true makes tests take a lot longer 116 # and can sometimes cause some non-serious 117 # failures because some calls block a bit 118 # longer than expected 119if CHECK_TIMINGS: 120 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 121else: 122 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 123 124# BaseManager.shutdown_timeout 125SHUTDOWN_TIMEOUT = support.SHORT_TIMEOUT 126 127HAVE_GETVALUE = not getattr(_multiprocessing, 128 'HAVE_BROKEN_SEM_GETVALUE', False) 129 130WIN32 = (sys.platform == "win32") 131 132from multiprocessing.connection import wait 133 134def wait_for_handle(handle, timeout): 135 if timeout is not None and timeout < 0.0: 136 timeout = None 137 return wait([handle], timeout) 138 139try: 140 MAXFD = os.sysconf("SC_OPEN_MAX") 141except: 142 MAXFD = 256 143 144# To speed up tests when using the forkserver, we can preload these: 145PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver'] 146 147# 148# Some tests require ctypes 149# 150 151try: 152 from ctypes import Structure, c_int, c_double, c_longlong 153except ImportError: 154 Structure = object 155 c_int = c_double = c_longlong = None 156 157 158def check_enough_semaphores(): 159 """Check that the system supports enough semaphores to run the test.""" 160 # minimum number of semaphores available according to POSIX 161 nsems_min = 256 162 try: 163 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 164 except (AttributeError, ValueError): 165 # sysconf not available or setting not available 166 return 167 if nsems == -1 or nsems >= nsems_min: 168 return 169 raise unittest.SkipTest("The OS doesn't support enough semaphores " 170 "to run the test (required: %d)." % nsems_min) 171 172 173# 174# Creates a wrapper for a function which records the time it takes to finish 175# 176 177class TimingWrapper(object): 178 179 def __init__(self, func): 180 self.func = func 181 self.elapsed = None 182 183 def __call__(self, *args, **kwds): 184 t = time.monotonic() 185 try: 186 return self.func(*args, **kwds) 187 finally: 188 self.elapsed = time.monotonic() - t 189 190# 191# Base class for test cases 192# 193 194class BaseTestCase(object): 195 196 ALLOWED_TYPES = ('processes', 'manager', 'threads') 197 198 def assertTimingAlmostEqual(self, a, b): 199 if CHECK_TIMINGS: 200 self.assertAlmostEqual(a, b, 1) 201 202 def assertReturnsIfImplemented(self, value, func, *args): 203 try: 204 res = func(*args) 205 except NotImplementedError: 206 pass 207 else: 208 return self.assertEqual(value, res) 209 210 # For the sanity of Windows users, rather than crashing or freezing in 211 # multiple ways. 212 def __reduce__(self, *args): 213 raise NotImplementedError("shouldn't try to pickle a test case") 214 215 __reduce_ex__ = __reduce__ 216 217# 218# Return the value of a semaphore 219# 220 221def get_value(self): 222 try: 223 return self.get_value() 224 except AttributeError: 225 try: 226 return self._Semaphore__value 227 except AttributeError: 228 try: 229 return self._value 230 except AttributeError: 231 raise NotImplementedError 232 233# 234# Testcases 235# 236 237class DummyCallable: 238 def __call__(self, q, c): 239 assert isinstance(c, DummyCallable) 240 q.put(5) 241 242 243class _TestProcess(BaseTestCase): 244 245 ALLOWED_TYPES = ('processes', 'threads') 246 247 def test_current(self): 248 if self.TYPE == 'threads': 249 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 250 251 current = self.current_process() 252 authkey = current.authkey 253 254 self.assertTrue(current.is_alive()) 255 self.assertTrue(not current.daemon) 256 self.assertIsInstance(authkey, bytes) 257 self.assertTrue(len(authkey) > 0) 258 self.assertEqual(current.ident, os.getpid()) 259 self.assertEqual(current.exitcode, None) 260 261 def test_set_executable(self): 262 if self.TYPE == 'threads': 263 self.skipTest(f'test not appropriate for {self.TYPE}') 264 paths = [ 265 sys.executable, # str 266 sys.executable.encode(), # bytes 267 pathlib.Path(sys.executable) # os.PathLike 268 ] 269 for path in paths: 270 self.set_executable(path) 271 p = self.Process() 272 p.start() 273 p.join() 274 self.assertEqual(p.exitcode, 0) 275 276 def test_args_argument(self): 277 # bpo-45735: Using list or tuple as *args* in constructor could 278 # achieve the same effect. 279 args_cases = (1, "str", [1], (1,)) 280 args_types = (list, tuple) 281 282 test_cases = itertools.product(args_cases, args_types) 283 284 for args, args_type in test_cases: 285 with self.subTest(args=args, args_type=args_type): 286 q = self.Queue(1) 287 # pass a tuple or list as args 288 p = self.Process(target=self._test_args, args=args_type((q, args))) 289 p.daemon = True 290 p.start() 291 child_args = q.get() 292 self.assertEqual(child_args, args) 293 p.join() 294 close_queue(q) 295 296 @classmethod 297 def _test_args(cls, q, arg): 298 q.put(arg) 299 300 def test_daemon_argument(self): 301 if self.TYPE == "threads": 302 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 303 304 # By default uses the current process's daemon flag. 305 proc0 = self.Process(target=self._test) 306 self.assertEqual(proc0.daemon, self.current_process().daemon) 307 proc1 = self.Process(target=self._test, daemon=True) 308 self.assertTrue(proc1.daemon) 309 proc2 = self.Process(target=self._test, daemon=False) 310 self.assertFalse(proc2.daemon) 311 312 @classmethod 313 def _test(cls, q, *args, **kwds): 314 current = cls.current_process() 315 q.put(args) 316 q.put(kwds) 317 q.put(current.name) 318 if cls.TYPE != 'threads': 319 q.put(bytes(current.authkey)) 320 q.put(current.pid) 321 322 def test_parent_process_attributes(self): 323 if self.TYPE == "threads": 324 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 325 326 self.assertIsNone(self.parent_process()) 327 328 rconn, wconn = self.Pipe(duplex=False) 329 p = self.Process(target=self._test_send_parent_process, args=(wconn,)) 330 p.start() 331 p.join() 332 parent_pid, parent_name = rconn.recv() 333 self.assertEqual(parent_pid, self.current_process().pid) 334 self.assertEqual(parent_pid, os.getpid()) 335 self.assertEqual(parent_name, self.current_process().name) 336 337 @classmethod 338 def _test_send_parent_process(cls, wconn): 339 from multiprocessing.process import parent_process 340 wconn.send([parent_process().pid, parent_process().name]) 341 342 def test_parent_process(self): 343 if self.TYPE == "threads": 344 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 345 346 # Launch a child process. Make it launch a grandchild process. Kill the 347 # child process and make sure that the grandchild notices the death of 348 # its parent (a.k.a the child process). 349 rconn, wconn = self.Pipe(duplex=False) 350 p = self.Process( 351 target=self._test_create_grandchild_process, args=(wconn, )) 352 p.start() 353 354 if not rconn.poll(timeout=support.LONG_TIMEOUT): 355 raise AssertionError("Could not communicate with child process") 356 parent_process_status = rconn.recv() 357 self.assertEqual(parent_process_status, "alive") 358 359 p.terminate() 360 p.join() 361 362 if not rconn.poll(timeout=support.LONG_TIMEOUT): 363 raise AssertionError("Could not communicate with child process") 364 parent_process_status = rconn.recv() 365 self.assertEqual(parent_process_status, "not alive") 366 367 @classmethod 368 def _test_create_grandchild_process(cls, wconn): 369 p = cls.Process(target=cls._test_report_parent_status, args=(wconn, )) 370 p.start() 371 time.sleep(300) 372 373 @classmethod 374 def _test_report_parent_status(cls, wconn): 375 from multiprocessing.process import parent_process 376 wconn.send("alive" if parent_process().is_alive() else "not alive") 377 parent_process().join(timeout=support.SHORT_TIMEOUT) 378 wconn.send("alive" if parent_process().is_alive() else "not alive") 379 380 def test_process(self): 381 q = self.Queue(1) 382 e = self.Event() 383 args = (q, 1, 2) 384 kwargs = {'hello':23, 'bye':2.54} 385 name = 'SomeProcess' 386 p = self.Process( 387 target=self._test, args=args, kwargs=kwargs, name=name 388 ) 389 p.daemon = True 390 current = self.current_process() 391 392 if self.TYPE != 'threads': 393 self.assertEqual(p.authkey, current.authkey) 394 self.assertEqual(p.is_alive(), False) 395 self.assertEqual(p.daemon, True) 396 self.assertNotIn(p, self.active_children()) 397 self.assertTrue(type(self.active_children()) is list) 398 self.assertEqual(p.exitcode, None) 399 400 p.start() 401 402 self.assertEqual(p.exitcode, None) 403 self.assertEqual(p.is_alive(), True) 404 self.assertIn(p, self.active_children()) 405 406 self.assertEqual(q.get(), args[1:]) 407 self.assertEqual(q.get(), kwargs) 408 self.assertEqual(q.get(), p.name) 409 if self.TYPE != 'threads': 410 self.assertEqual(q.get(), current.authkey) 411 self.assertEqual(q.get(), p.pid) 412 413 p.join() 414 415 self.assertEqual(p.exitcode, 0) 416 self.assertEqual(p.is_alive(), False) 417 self.assertNotIn(p, self.active_children()) 418 close_queue(q) 419 420 @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id") 421 def test_process_mainthread_native_id(self): 422 if self.TYPE == 'threads': 423 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 424 425 current_mainthread_native_id = threading.main_thread().native_id 426 427 q = self.Queue(1) 428 p = self.Process(target=self._test_process_mainthread_native_id, args=(q,)) 429 p.start() 430 431 child_mainthread_native_id = q.get() 432 p.join() 433 close_queue(q) 434 435 self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id) 436 437 @classmethod 438 def _test_process_mainthread_native_id(cls, q): 439 mainthread_native_id = threading.main_thread().native_id 440 q.put(mainthread_native_id) 441 442 @classmethod 443 def _sleep_some(cls): 444 time.sleep(100) 445 446 @classmethod 447 def _test_sleep(cls, delay): 448 time.sleep(delay) 449 450 def _kill_process(self, meth): 451 if self.TYPE == 'threads': 452 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 453 454 p = self.Process(target=self._sleep_some) 455 p.daemon = True 456 p.start() 457 458 self.assertEqual(p.is_alive(), True) 459 self.assertIn(p, self.active_children()) 460 self.assertEqual(p.exitcode, None) 461 462 join = TimingWrapper(p.join) 463 464 self.assertEqual(join(0), None) 465 self.assertTimingAlmostEqual(join.elapsed, 0.0) 466 self.assertEqual(p.is_alive(), True) 467 468 self.assertEqual(join(-1), None) 469 self.assertTimingAlmostEqual(join.elapsed, 0.0) 470 self.assertEqual(p.is_alive(), True) 471 472 # XXX maybe terminating too soon causes the problems on Gentoo... 473 time.sleep(1) 474 475 meth(p) 476 477 if hasattr(signal, 'alarm'): 478 # On the Gentoo buildbot waitpid() often seems to block forever. 479 # We use alarm() to interrupt it if it blocks for too long. 480 def handler(*args): 481 raise RuntimeError('join took too long: %s' % p) 482 old_handler = signal.signal(signal.SIGALRM, handler) 483 try: 484 signal.alarm(10) 485 self.assertEqual(join(), None) 486 finally: 487 signal.alarm(0) 488 signal.signal(signal.SIGALRM, old_handler) 489 else: 490 self.assertEqual(join(), None) 491 492 self.assertTimingAlmostEqual(join.elapsed, 0.0) 493 494 self.assertEqual(p.is_alive(), False) 495 self.assertNotIn(p, self.active_children()) 496 497 p.join() 498 499 return p.exitcode 500 501 def test_terminate(self): 502 exitcode = self._kill_process(multiprocessing.Process.terminate) 503 if os.name != 'nt': 504 self.assertEqual(exitcode, -signal.SIGTERM) 505 506 def test_kill(self): 507 exitcode = self._kill_process(multiprocessing.Process.kill) 508 if os.name != 'nt': 509 self.assertEqual(exitcode, -signal.SIGKILL) 510 511 def test_cpu_count(self): 512 try: 513 cpus = multiprocessing.cpu_count() 514 except NotImplementedError: 515 cpus = 1 516 self.assertTrue(type(cpus) is int) 517 self.assertTrue(cpus >= 1) 518 519 def test_active_children(self): 520 self.assertEqual(type(self.active_children()), list) 521 522 p = self.Process(target=time.sleep, args=(DELTA,)) 523 self.assertNotIn(p, self.active_children()) 524 525 p.daemon = True 526 p.start() 527 self.assertIn(p, self.active_children()) 528 529 p.join() 530 self.assertNotIn(p, self.active_children()) 531 532 @classmethod 533 def _test_recursion(cls, wconn, id): 534 wconn.send(id) 535 if len(id) < 2: 536 for i in range(2): 537 p = cls.Process( 538 target=cls._test_recursion, args=(wconn, id+[i]) 539 ) 540 p.start() 541 p.join() 542 543 def test_recursion(self): 544 rconn, wconn = self.Pipe(duplex=False) 545 self._test_recursion(wconn, []) 546 547 time.sleep(DELTA) 548 result = [] 549 while rconn.poll(): 550 result.append(rconn.recv()) 551 552 expected = [ 553 [], 554 [0], 555 [0, 0], 556 [0, 1], 557 [1], 558 [1, 0], 559 [1, 1] 560 ] 561 self.assertEqual(result, expected) 562 563 @classmethod 564 def _test_sentinel(cls, event): 565 event.wait(10.0) 566 567 def test_sentinel(self): 568 if self.TYPE == "threads": 569 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 570 event = self.Event() 571 p = self.Process(target=self._test_sentinel, args=(event,)) 572 with self.assertRaises(ValueError): 573 p.sentinel 574 p.start() 575 self.addCleanup(p.join) 576 sentinel = p.sentinel 577 self.assertIsInstance(sentinel, int) 578 self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) 579 event.set() 580 p.join() 581 self.assertTrue(wait_for_handle(sentinel, timeout=1)) 582 583 @classmethod 584 def _test_close(cls, rc=0, q=None): 585 if q is not None: 586 q.get() 587 sys.exit(rc) 588 589 def test_close(self): 590 if self.TYPE == "threads": 591 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 592 q = self.Queue() 593 p = self.Process(target=self._test_close, kwargs={'q': q}) 594 p.daemon = True 595 p.start() 596 self.assertEqual(p.is_alive(), True) 597 # Child is still alive, cannot close 598 with self.assertRaises(ValueError): 599 p.close() 600 601 q.put(None) 602 p.join() 603 self.assertEqual(p.is_alive(), False) 604 self.assertEqual(p.exitcode, 0) 605 p.close() 606 with self.assertRaises(ValueError): 607 p.is_alive() 608 with self.assertRaises(ValueError): 609 p.join() 610 with self.assertRaises(ValueError): 611 p.terminate() 612 p.close() 613 614 wr = weakref.ref(p) 615 del p 616 gc.collect() 617 self.assertIs(wr(), None) 618 619 close_queue(q) 620 621 def test_many_processes(self): 622 if self.TYPE == 'threads': 623 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 624 625 sm = multiprocessing.get_start_method() 626 N = 5 if sm == 'spawn' else 100 627 628 # Try to overwhelm the forkserver loop with events 629 procs = [self.Process(target=self._test_sleep, args=(0.01,)) 630 for i in range(N)] 631 for p in procs: 632 p.start() 633 for p in procs: 634 join_process(p) 635 for p in procs: 636 self.assertEqual(p.exitcode, 0) 637 638 procs = [self.Process(target=self._sleep_some) 639 for i in range(N)] 640 for p in procs: 641 p.start() 642 time.sleep(0.001) # let the children start... 643 for p in procs: 644 p.terminate() 645 for p in procs: 646 join_process(p) 647 if os.name != 'nt': 648 exitcodes = [-signal.SIGTERM] 649 if sys.platform == 'darwin': 650 # bpo-31510: On macOS, killing a freshly started process with 651 # SIGTERM sometimes kills the process with SIGKILL. 652 exitcodes.append(-signal.SIGKILL) 653 for p in procs: 654 self.assertIn(p.exitcode, exitcodes) 655 656 def test_lose_target_ref(self): 657 c = DummyCallable() 658 wr = weakref.ref(c) 659 q = self.Queue() 660 p = self.Process(target=c, args=(q, c)) 661 del c 662 p.start() 663 p.join() 664 gc.collect() # For PyPy or other GCs. 665 self.assertIs(wr(), None) 666 self.assertEqual(q.get(), 5) 667 close_queue(q) 668 669 @classmethod 670 def _test_child_fd_inflation(self, evt, q): 671 q.put(os_helper.fd_count()) 672 evt.wait() 673 674 def test_child_fd_inflation(self): 675 # Number of fds in child processes should not grow with the 676 # number of running children. 677 if self.TYPE == 'threads': 678 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 679 680 sm = multiprocessing.get_start_method() 681 if sm == 'fork': 682 # The fork method by design inherits all fds from the parent, 683 # trying to go against it is a lost battle 684 self.skipTest('test not appropriate for {}'.format(sm)) 685 686 N = 5 687 evt = self.Event() 688 q = self.Queue() 689 690 procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q)) 691 for i in range(N)] 692 for p in procs: 693 p.start() 694 695 try: 696 fd_counts = [q.get() for i in range(N)] 697 self.assertEqual(len(set(fd_counts)), 1, fd_counts) 698 699 finally: 700 evt.set() 701 for p in procs: 702 p.join() 703 close_queue(q) 704 705 @classmethod 706 def _test_wait_for_threads(self, evt): 707 def func1(): 708 time.sleep(0.5) 709 evt.set() 710 711 def func2(): 712 time.sleep(20) 713 evt.clear() 714 715 threading.Thread(target=func1).start() 716 threading.Thread(target=func2, daemon=True).start() 717 718 def test_wait_for_threads(self): 719 # A child process should wait for non-daemonic threads to end 720 # before exiting 721 if self.TYPE == 'threads': 722 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 723 724 evt = self.Event() 725 proc = self.Process(target=self._test_wait_for_threads, args=(evt,)) 726 proc.start() 727 proc.join() 728 self.assertTrue(evt.is_set()) 729 730 @classmethod 731 def _test_error_on_stdio_flush(self, evt, break_std_streams={}): 732 for stream_name, action in break_std_streams.items(): 733 if action == 'close': 734 stream = io.StringIO() 735 stream.close() 736 else: 737 assert action == 'remove' 738 stream = None 739 setattr(sys, stream_name, None) 740 evt.set() 741 742 def test_error_on_stdio_flush_1(self): 743 # Check that Process works with broken standard streams 744 streams = [io.StringIO(), None] 745 streams[0].close() 746 for stream_name in ('stdout', 'stderr'): 747 for stream in streams: 748 old_stream = getattr(sys, stream_name) 749 setattr(sys, stream_name, stream) 750 try: 751 evt = self.Event() 752 proc = self.Process(target=self._test_error_on_stdio_flush, 753 args=(evt,)) 754 proc.start() 755 proc.join() 756 self.assertTrue(evt.is_set()) 757 self.assertEqual(proc.exitcode, 0) 758 finally: 759 setattr(sys, stream_name, old_stream) 760 761 def test_error_on_stdio_flush_2(self): 762 # Same as test_error_on_stdio_flush_1(), but standard streams are 763 # broken by the child process 764 for stream_name in ('stdout', 'stderr'): 765 for action in ('close', 'remove'): 766 old_stream = getattr(sys, stream_name) 767 try: 768 evt = self.Event() 769 proc = self.Process(target=self._test_error_on_stdio_flush, 770 args=(evt, {stream_name: action})) 771 proc.start() 772 proc.join() 773 self.assertTrue(evt.is_set()) 774 self.assertEqual(proc.exitcode, 0) 775 finally: 776 setattr(sys, stream_name, old_stream) 777 778 @classmethod 779 def _sleep_and_set_event(self, evt, delay=0.0): 780 time.sleep(delay) 781 evt.set() 782 783 def check_forkserver_death(self, signum): 784 # bpo-31308: if the forkserver process has died, we should still 785 # be able to create and run new Process instances (the forkserver 786 # is implicitly restarted). 787 if self.TYPE == 'threads': 788 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 789 sm = multiprocessing.get_start_method() 790 if sm != 'forkserver': 791 # The fork method by design inherits all fds from the parent, 792 # trying to go against it is a lost battle 793 self.skipTest('test not appropriate for {}'.format(sm)) 794 795 from multiprocessing.forkserver import _forkserver 796 _forkserver.ensure_running() 797 798 # First process sleeps 500 ms 799 delay = 0.5 800 801 evt = self.Event() 802 proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay)) 803 proc.start() 804 805 pid = _forkserver._forkserver_pid 806 os.kill(pid, signum) 807 # give time to the fork server to die and time to proc to complete 808 time.sleep(delay * 2.0) 809 810 evt2 = self.Event() 811 proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,)) 812 proc2.start() 813 proc2.join() 814 self.assertTrue(evt2.is_set()) 815 self.assertEqual(proc2.exitcode, 0) 816 817 proc.join() 818 self.assertTrue(evt.is_set()) 819 self.assertIn(proc.exitcode, (0, 255)) 820 821 def test_forkserver_sigint(self): 822 # Catchable signal 823 self.check_forkserver_death(signal.SIGINT) 824 825 def test_forkserver_sigkill(self): 826 # Uncatchable signal 827 if os.name != 'nt': 828 self.check_forkserver_death(signal.SIGKILL) 829 830 831# 832# 833# 834 835class _UpperCaser(multiprocessing.Process): 836 837 def __init__(self): 838 multiprocessing.Process.__init__(self) 839 self.child_conn, self.parent_conn = multiprocessing.Pipe() 840 841 def run(self): 842 self.parent_conn.close() 843 for s in iter(self.child_conn.recv, None): 844 self.child_conn.send(s.upper()) 845 self.child_conn.close() 846 847 def submit(self, s): 848 assert type(s) is str 849 self.parent_conn.send(s) 850 return self.parent_conn.recv() 851 852 def stop(self): 853 self.parent_conn.send(None) 854 self.parent_conn.close() 855 self.child_conn.close() 856 857class _TestSubclassingProcess(BaseTestCase): 858 859 ALLOWED_TYPES = ('processes',) 860 861 def test_subclassing(self): 862 uppercaser = _UpperCaser() 863 uppercaser.daemon = True 864 uppercaser.start() 865 self.assertEqual(uppercaser.submit('hello'), 'HELLO') 866 self.assertEqual(uppercaser.submit('world'), 'WORLD') 867 uppercaser.stop() 868 uppercaser.join() 869 870 def test_stderr_flush(self): 871 # sys.stderr is flushed at process shutdown (issue #13812) 872 if self.TYPE == "threads": 873 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 874 875 testfn = os_helper.TESTFN 876 self.addCleanup(os_helper.unlink, testfn) 877 proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) 878 proc.start() 879 proc.join() 880 with open(testfn, encoding="utf-8") as f: 881 err = f.read() 882 # The whole traceback was printed 883 self.assertIn("ZeroDivisionError", err) 884 self.assertIn("test_multiprocessing.py", err) 885 self.assertIn("1/0 # MARKER", err) 886 887 @classmethod 888 def _test_stderr_flush(cls, testfn): 889 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 890 sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False) 891 1/0 # MARKER 892 893 894 @classmethod 895 def _test_sys_exit(cls, reason, testfn): 896 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 897 sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False) 898 sys.exit(reason) 899 900 def test_sys_exit(self): 901 # See Issue 13854 902 if self.TYPE == 'threads': 903 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 904 905 testfn = os_helper.TESTFN 906 self.addCleanup(os_helper.unlink, testfn) 907 908 for reason in ( 909 [1, 2, 3], 910 'ignore this', 911 ): 912 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 913 p.daemon = True 914 p.start() 915 join_process(p) 916 self.assertEqual(p.exitcode, 1) 917 918 with open(testfn, encoding="utf-8") as f: 919 content = f.read() 920 self.assertEqual(content.rstrip(), str(reason)) 921 922 os.unlink(testfn) 923 924 cases = [ 925 ((True,), 1), 926 ((False,), 0), 927 ((8,), 8), 928 ((None,), 0), 929 ((), 0), 930 ] 931 932 for args, expected in cases: 933 with self.subTest(args=args): 934 p = self.Process(target=sys.exit, args=args) 935 p.daemon = True 936 p.start() 937 join_process(p) 938 self.assertEqual(p.exitcode, expected) 939 940# 941# 942# 943 944def queue_empty(q): 945 if hasattr(q, 'empty'): 946 return q.empty() 947 else: 948 return q.qsize() == 0 949 950def queue_full(q, maxsize): 951 if hasattr(q, 'full'): 952 return q.full() 953 else: 954 return q.qsize() == maxsize 955 956 957class _TestQueue(BaseTestCase): 958 959 960 @classmethod 961 def _test_put(cls, queue, child_can_start, parent_can_continue): 962 child_can_start.wait() 963 for i in range(6): 964 queue.get() 965 parent_can_continue.set() 966 967 def test_put(self): 968 MAXSIZE = 6 969 queue = self.Queue(maxsize=MAXSIZE) 970 child_can_start = self.Event() 971 parent_can_continue = self.Event() 972 973 proc = self.Process( 974 target=self._test_put, 975 args=(queue, child_can_start, parent_can_continue) 976 ) 977 proc.daemon = True 978 proc.start() 979 980 self.assertEqual(queue_empty(queue), True) 981 self.assertEqual(queue_full(queue, MAXSIZE), False) 982 983 queue.put(1) 984 queue.put(2, True) 985 queue.put(3, True, None) 986 queue.put(4, False) 987 queue.put(5, False, None) 988 queue.put_nowait(6) 989 990 # the values may be in buffer but not yet in pipe so sleep a bit 991 time.sleep(DELTA) 992 993 self.assertEqual(queue_empty(queue), False) 994 self.assertEqual(queue_full(queue, MAXSIZE), True) 995 996 put = TimingWrapper(queue.put) 997 put_nowait = TimingWrapper(queue.put_nowait) 998 999 self.assertRaises(pyqueue.Full, put, 7, False) 1000 self.assertTimingAlmostEqual(put.elapsed, 0) 1001 1002 self.assertRaises(pyqueue.Full, put, 7, False, None) 1003 self.assertTimingAlmostEqual(put.elapsed, 0) 1004 1005 self.assertRaises(pyqueue.Full, put_nowait, 7) 1006 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 1007 1008 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1) 1009 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 1010 1011 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2) 1012 self.assertTimingAlmostEqual(put.elapsed, 0) 1013 1014 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3) 1015 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 1016 1017 child_can_start.set() 1018 parent_can_continue.wait() 1019 1020 self.assertEqual(queue_empty(queue), True) 1021 self.assertEqual(queue_full(queue, MAXSIZE), False) 1022 1023 proc.join() 1024 close_queue(queue) 1025 1026 @classmethod 1027 def _test_get(cls, queue, child_can_start, parent_can_continue): 1028 child_can_start.wait() 1029 #queue.put(1) 1030 queue.put(2) 1031 queue.put(3) 1032 queue.put(4) 1033 queue.put(5) 1034 parent_can_continue.set() 1035 1036 def test_get(self): 1037 queue = self.Queue() 1038 child_can_start = self.Event() 1039 parent_can_continue = self.Event() 1040 1041 proc = self.Process( 1042 target=self._test_get, 1043 args=(queue, child_can_start, parent_can_continue) 1044 ) 1045 proc.daemon = True 1046 proc.start() 1047 1048 self.assertEqual(queue_empty(queue), True) 1049 1050 child_can_start.set() 1051 parent_can_continue.wait() 1052 1053 time.sleep(DELTA) 1054 self.assertEqual(queue_empty(queue), False) 1055 1056 # Hangs unexpectedly, remove for now 1057 #self.assertEqual(queue.get(), 1) 1058 self.assertEqual(queue.get(True, None), 2) 1059 self.assertEqual(queue.get(True), 3) 1060 self.assertEqual(queue.get(timeout=1), 4) 1061 self.assertEqual(queue.get_nowait(), 5) 1062 1063 self.assertEqual(queue_empty(queue), True) 1064 1065 get = TimingWrapper(queue.get) 1066 get_nowait = TimingWrapper(queue.get_nowait) 1067 1068 self.assertRaises(pyqueue.Empty, get, False) 1069 self.assertTimingAlmostEqual(get.elapsed, 0) 1070 1071 self.assertRaises(pyqueue.Empty, get, False, None) 1072 self.assertTimingAlmostEqual(get.elapsed, 0) 1073 1074 self.assertRaises(pyqueue.Empty, get_nowait) 1075 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 1076 1077 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1) 1078 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 1079 1080 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2) 1081 self.assertTimingAlmostEqual(get.elapsed, 0) 1082 1083 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3) 1084 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 1085 1086 proc.join() 1087 close_queue(queue) 1088 1089 @classmethod 1090 def _test_fork(cls, queue): 1091 for i in range(10, 20): 1092 queue.put(i) 1093 # note that at this point the items may only be buffered, so the 1094 # process cannot shutdown until the feeder thread has finished 1095 # pushing items onto the pipe. 1096 1097 def test_fork(self): 1098 # Old versions of Queue would fail to create a new feeder 1099 # thread for a forked process if the original process had its 1100 # own feeder thread. This test checks that this no longer 1101 # happens. 1102 1103 queue = self.Queue() 1104 1105 # put items on queue so that main process starts a feeder thread 1106 for i in range(10): 1107 queue.put(i) 1108 1109 # wait to make sure thread starts before we fork a new process 1110 time.sleep(DELTA) 1111 1112 # fork process 1113 p = self.Process(target=self._test_fork, args=(queue,)) 1114 p.daemon = True 1115 p.start() 1116 1117 # check that all expected items are in the queue 1118 for i in range(20): 1119 self.assertEqual(queue.get(), i) 1120 self.assertRaises(pyqueue.Empty, queue.get, False) 1121 1122 p.join() 1123 close_queue(queue) 1124 1125 def test_qsize(self): 1126 q = self.Queue() 1127 try: 1128 self.assertEqual(q.qsize(), 0) 1129 except NotImplementedError: 1130 self.skipTest('qsize method not implemented') 1131 q.put(1) 1132 self.assertEqual(q.qsize(), 1) 1133 q.put(5) 1134 self.assertEqual(q.qsize(), 2) 1135 q.get() 1136 self.assertEqual(q.qsize(), 1) 1137 q.get() 1138 self.assertEqual(q.qsize(), 0) 1139 close_queue(q) 1140 1141 @classmethod 1142 def _test_task_done(cls, q): 1143 for obj in iter(q.get, None): 1144 time.sleep(DELTA) 1145 q.task_done() 1146 1147 def test_task_done(self): 1148 queue = self.JoinableQueue() 1149 1150 workers = [self.Process(target=self._test_task_done, args=(queue,)) 1151 for i in range(4)] 1152 1153 for p in workers: 1154 p.daemon = True 1155 p.start() 1156 1157 for i in range(10): 1158 queue.put(i) 1159 1160 queue.join() 1161 1162 for p in workers: 1163 queue.put(None) 1164 1165 for p in workers: 1166 p.join() 1167 close_queue(queue) 1168 1169 def test_no_import_lock_contention(self): 1170 with os_helper.temp_cwd(): 1171 module_name = 'imported_by_an_imported_module' 1172 with open(module_name + '.py', 'w', encoding="utf-8") as f: 1173 f.write("""if 1: 1174 import multiprocessing 1175 1176 q = multiprocessing.Queue() 1177 q.put('knock knock') 1178 q.get(timeout=3) 1179 q.close() 1180 del q 1181 """) 1182 1183 with import_helper.DirsOnSysPath(os.getcwd()): 1184 try: 1185 __import__(module_name) 1186 except pyqueue.Empty: 1187 self.fail("Probable regression on import lock contention;" 1188 " see Issue #22853") 1189 1190 def test_timeout(self): 1191 q = multiprocessing.Queue() 1192 start = time.monotonic() 1193 self.assertRaises(pyqueue.Empty, q.get, True, 0.200) 1194 delta = time.monotonic() - start 1195 # bpo-30317: Tolerate a delta of 100 ms because of the bad clock 1196 # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once 1197 # failed because the delta was only 135.8 ms. 1198 self.assertGreaterEqual(delta, 0.100) 1199 close_queue(q) 1200 1201 def test_queue_feeder_donot_stop_onexc(self): 1202 # bpo-30414: verify feeder handles exceptions correctly 1203 if self.TYPE != 'processes': 1204 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1205 1206 class NotSerializable(object): 1207 def __reduce__(self): 1208 raise AttributeError 1209 with test.support.captured_stderr(): 1210 q = self.Queue() 1211 q.put(NotSerializable()) 1212 q.put(True) 1213 self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT)) 1214 close_queue(q) 1215 1216 with test.support.captured_stderr(): 1217 # bpo-33078: verify that the queue size is correctly handled 1218 # on errors. 1219 q = self.Queue(maxsize=1) 1220 q.put(NotSerializable()) 1221 q.put(True) 1222 try: 1223 self.assertEqual(q.qsize(), 1) 1224 except NotImplementedError: 1225 # qsize is not available on all platform as it 1226 # relies on sem_getvalue 1227 pass 1228 self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT)) 1229 # Check that the size of the queue is correct 1230 self.assertTrue(q.empty()) 1231 close_queue(q) 1232 1233 def test_queue_feeder_on_queue_feeder_error(self): 1234 # bpo-30006: verify feeder handles exceptions using the 1235 # _on_queue_feeder_error hook. 1236 if self.TYPE != 'processes': 1237 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1238 1239 class NotSerializable(object): 1240 """Mock unserializable object""" 1241 def __init__(self): 1242 self.reduce_was_called = False 1243 self.on_queue_feeder_error_was_called = False 1244 1245 def __reduce__(self): 1246 self.reduce_was_called = True 1247 raise AttributeError 1248 1249 class SafeQueue(multiprocessing.queues.Queue): 1250 """Queue with overloaded _on_queue_feeder_error hook""" 1251 @staticmethod 1252 def _on_queue_feeder_error(e, obj): 1253 if (isinstance(e, AttributeError) and 1254 isinstance(obj, NotSerializable)): 1255 obj.on_queue_feeder_error_was_called = True 1256 1257 not_serializable_obj = NotSerializable() 1258 # The captured_stderr reduces the noise in the test report 1259 with test.support.captured_stderr(): 1260 q = SafeQueue(ctx=multiprocessing.get_context()) 1261 q.put(not_serializable_obj) 1262 1263 # Verify that q is still functioning correctly 1264 q.put(True) 1265 self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT)) 1266 1267 # Assert that the serialization and the hook have been called correctly 1268 self.assertTrue(not_serializable_obj.reduce_was_called) 1269 self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called) 1270 1271 def test_closed_queue_put_get_exceptions(self): 1272 for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): 1273 q.close() 1274 with self.assertRaisesRegex(ValueError, 'is closed'): 1275 q.put('foo') 1276 with self.assertRaisesRegex(ValueError, 'is closed'): 1277 q.get() 1278# 1279# 1280# 1281 1282class _TestLock(BaseTestCase): 1283 1284 def test_lock(self): 1285 lock = self.Lock() 1286 self.assertEqual(lock.acquire(), True) 1287 self.assertEqual(lock.acquire(False), False) 1288 self.assertEqual(lock.release(), None) 1289 self.assertRaises((ValueError, threading.ThreadError), lock.release) 1290 1291 def test_rlock(self): 1292 lock = self.RLock() 1293 self.assertEqual(lock.acquire(), True) 1294 self.assertEqual(lock.acquire(), True) 1295 self.assertEqual(lock.acquire(), True) 1296 self.assertEqual(lock.release(), None) 1297 self.assertEqual(lock.release(), None) 1298 self.assertEqual(lock.release(), None) 1299 self.assertRaises((AssertionError, RuntimeError), lock.release) 1300 1301 def test_lock_context(self): 1302 with self.Lock(): 1303 pass 1304 1305 1306class _TestSemaphore(BaseTestCase): 1307 1308 def _test_semaphore(self, sem): 1309 self.assertReturnsIfImplemented(2, get_value, sem) 1310 self.assertEqual(sem.acquire(), True) 1311 self.assertReturnsIfImplemented(1, get_value, sem) 1312 self.assertEqual(sem.acquire(), True) 1313 self.assertReturnsIfImplemented(0, get_value, sem) 1314 self.assertEqual(sem.acquire(False), False) 1315 self.assertReturnsIfImplemented(0, get_value, sem) 1316 self.assertEqual(sem.release(), None) 1317 self.assertReturnsIfImplemented(1, get_value, sem) 1318 self.assertEqual(sem.release(), None) 1319 self.assertReturnsIfImplemented(2, get_value, sem) 1320 1321 def test_semaphore(self): 1322 sem = self.Semaphore(2) 1323 self._test_semaphore(sem) 1324 self.assertEqual(sem.release(), None) 1325 self.assertReturnsIfImplemented(3, get_value, sem) 1326 self.assertEqual(sem.release(), None) 1327 self.assertReturnsIfImplemented(4, get_value, sem) 1328 1329 def test_bounded_semaphore(self): 1330 sem = self.BoundedSemaphore(2) 1331 self._test_semaphore(sem) 1332 # Currently fails on OS/X 1333 #if HAVE_GETVALUE: 1334 # self.assertRaises(ValueError, sem.release) 1335 # self.assertReturnsIfImplemented(2, get_value, sem) 1336 1337 def test_timeout(self): 1338 if self.TYPE != 'processes': 1339 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1340 1341 sem = self.Semaphore(0) 1342 acquire = TimingWrapper(sem.acquire) 1343 1344 self.assertEqual(acquire(False), False) 1345 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1346 1347 self.assertEqual(acquire(False, None), False) 1348 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1349 1350 self.assertEqual(acquire(False, TIMEOUT1), False) 1351 self.assertTimingAlmostEqual(acquire.elapsed, 0) 1352 1353 self.assertEqual(acquire(True, TIMEOUT2), False) 1354 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) 1355 1356 self.assertEqual(acquire(timeout=TIMEOUT3), False) 1357 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) 1358 1359 1360class _TestCondition(BaseTestCase): 1361 1362 @classmethod 1363 def f(cls, cond, sleeping, woken, timeout=None): 1364 cond.acquire() 1365 sleeping.release() 1366 cond.wait(timeout) 1367 woken.release() 1368 cond.release() 1369 1370 def assertReachesEventually(self, func, value): 1371 for i in range(10): 1372 try: 1373 if func() == value: 1374 break 1375 except NotImplementedError: 1376 break 1377 time.sleep(DELTA) 1378 time.sleep(DELTA) 1379 self.assertReturnsIfImplemented(value, func) 1380 1381 def check_invariant(self, cond): 1382 # this is only supposed to succeed when there are no sleepers 1383 if self.TYPE == 'processes': 1384 try: 1385 sleepers = (cond._sleeping_count.get_value() - 1386 cond._woken_count.get_value()) 1387 self.assertEqual(sleepers, 0) 1388 self.assertEqual(cond._wait_semaphore.get_value(), 0) 1389 except NotImplementedError: 1390 pass 1391 1392 def test_notify(self): 1393 cond = self.Condition() 1394 sleeping = self.Semaphore(0) 1395 woken = self.Semaphore(0) 1396 1397 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1398 p.daemon = True 1399 p.start() 1400 self.addCleanup(p.join) 1401 1402 p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1403 p.daemon = True 1404 p.start() 1405 self.addCleanup(p.join) 1406 1407 # wait for both children to start sleeping 1408 sleeping.acquire() 1409 sleeping.acquire() 1410 1411 # check no process/thread has woken up 1412 time.sleep(DELTA) 1413 self.assertReturnsIfImplemented(0, get_value, woken) 1414 1415 # wake up one process/thread 1416 cond.acquire() 1417 cond.notify() 1418 cond.release() 1419 1420 # check one process/thread has woken up 1421 time.sleep(DELTA) 1422 self.assertReturnsIfImplemented(1, get_value, woken) 1423 1424 # wake up another 1425 cond.acquire() 1426 cond.notify() 1427 cond.release() 1428 1429 # check other has woken up 1430 time.sleep(DELTA) 1431 self.assertReturnsIfImplemented(2, get_value, woken) 1432 1433 # check state is not mucked up 1434 self.check_invariant(cond) 1435 p.join() 1436 1437 def test_notify_all(self): 1438 cond = self.Condition() 1439 sleeping = self.Semaphore(0) 1440 woken = self.Semaphore(0) 1441 1442 # start some threads/processes which will timeout 1443 for i in range(3): 1444 p = self.Process(target=self.f, 1445 args=(cond, sleeping, woken, TIMEOUT1)) 1446 p.daemon = True 1447 p.start() 1448 self.addCleanup(p.join) 1449 1450 t = threading.Thread(target=self.f, 1451 args=(cond, sleeping, woken, TIMEOUT1)) 1452 t.daemon = True 1453 t.start() 1454 self.addCleanup(t.join) 1455 1456 # wait for them all to sleep 1457 for i in range(6): 1458 sleeping.acquire() 1459 1460 # check they have all timed out 1461 for i in range(6): 1462 woken.acquire() 1463 self.assertReturnsIfImplemented(0, get_value, woken) 1464 1465 # check state is not mucked up 1466 self.check_invariant(cond) 1467 1468 # start some more threads/processes 1469 for i in range(3): 1470 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1471 p.daemon = True 1472 p.start() 1473 self.addCleanup(p.join) 1474 1475 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1476 t.daemon = True 1477 t.start() 1478 self.addCleanup(t.join) 1479 1480 # wait for them to all sleep 1481 for i in range(6): 1482 sleeping.acquire() 1483 1484 # check no process/thread has woken up 1485 time.sleep(DELTA) 1486 self.assertReturnsIfImplemented(0, get_value, woken) 1487 1488 # wake them all up 1489 cond.acquire() 1490 cond.notify_all() 1491 cond.release() 1492 1493 # check they have all woken 1494 self.assertReachesEventually(lambda: get_value(woken), 6) 1495 1496 # check state is not mucked up 1497 self.check_invariant(cond) 1498 1499 def test_notify_n(self): 1500 cond = self.Condition() 1501 sleeping = self.Semaphore(0) 1502 woken = self.Semaphore(0) 1503 1504 # start some threads/processes 1505 for i in range(3): 1506 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1507 p.daemon = True 1508 p.start() 1509 self.addCleanup(p.join) 1510 1511 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1512 t.daemon = True 1513 t.start() 1514 self.addCleanup(t.join) 1515 1516 # wait for them to all sleep 1517 for i in range(6): 1518 sleeping.acquire() 1519 1520 # check no process/thread has woken up 1521 time.sleep(DELTA) 1522 self.assertReturnsIfImplemented(0, get_value, woken) 1523 1524 # wake some of them up 1525 cond.acquire() 1526 cond.notify(n=2) 1527 cond.release() 1528 1529 # check 2 have woken 1530 self.assertReachesEventually(lambda: get_value(woken), 2) 1531 1532 # wake the rest of them 1533 cond.acquire() 1534 cond.notify(n=4) 1535 cond.release() 1536 1537 self.assertReachesEventually(lambda: get_value(woken), 6) 1538 1539 # doesn't do anything more 1540 cond.acquire() 1541 cond.notify(n=3) 1542 cond.release() 1543 1544 self.assertReturnsIfImplemented(6, get_value, woken) 1545 1546 # check state is not mucked up 1547 self.check_invariant(cond) 1548 1549 def test_timeout(self): 1550 cond = self.Condition() 1551 wait = TimingWrapper(cond.wait) 1552 cond.acquire() 1553 res = wait(TIMEOUT1) 1554 cond.release() 1555 self.assertEqual(res, False) 1556 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1557 1558 @classmethod 1559 def _test_waitfor_f(cls, cond, state): 1560 with cond: 1561 state.value = 0 1562 cond.notify() 1563 result = cond.wait_for(lambda : state.value==4) 1564 if not result or state.value != 4: 1565 sys.exit(1) 1566 1567 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1568 def test_waitfor(self): 1569 # based on test in test/lock_tests.py 1570 cond = self.Condition() 1571 state = self.Value('i', -1) 1572 1573 p = self.Process(target=self._test_waitfor_f, args=(cond, state)) 1574 p.daemon = True 1575 p.start() 1576 1577 with cond: 1578 result = cond.wait_for(lambda : state.value==0) 1579 self.assertTrue(result) 1580 self.assertEqual(state.value, 0) 1581 1582 for i in range(4): 1583 time.sleep(0.01) 1584 with cond: 1585 state.value += 1 1586 cond.notify() 1587 1588 join_process(p) 1589 self.assertEqual(p.exitcode, 0) 1590 1591 @classmethod 1592 def _test_waitfor_timeout_f(cls, cond, state, success, sem): 1593 sem.release() 1594 with cond: 1595 expected = 0.1 1596 dt = time.monotonic() 1597 result = cond.wait_for(lambda : state.value==4, timeout=expected) 1598 dt = time.monotonic() - dt 1599 # borrow logic in assertTimeout() from test/lock_tests.py 1600 if not result and expected * 0.6 < dt < expected * 10.0: 1601 success.value = True 1602 1603 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1604 def test_waitfor_timeout(self): 1605 # based on test in test/lock_tests.py 1606 cond = self.Condition() 1607 state = self.Value('i', 0) 1608 success = self.Value('i', False) 1609 sem = self.Semaphore(0) 1610 1611 p = self.Process(target=self._test_waitfor_timeout_f, 1612 args=(cond, state, success, sem)) 1613 p.daemon = True 1614 p.start() 1615 self.assertTrue(sem.acquire(timeout=support.LONG_TIMEOUT)) 1616 1617 # Only increment 3 times, so state == 4 is never reached. 1618 for i in range(3): 1619 time.sleep(0.01) 1620 with cond: 1621 state.value += 1 1622 cond.notify() 1623 1624 join_process(p) 1625 self.assertTrue(success.value) 1626 1627 @classmethod 1628 def _test_wait_result(cls, c, pid): 1629 with c: 1630 c.notify() 1631 time.sleep(1) 1632 if pid is not None: 1633 os.kill(pid, signal.SIGINT) 1634 1635 def test_wait_result(self): 1636 if isinstance(self, ProcessesMixin) and sys.platform != 'win32': 1637 pid = os.getpid() 1638 else: 1639 pid = None 1640 1641 c = self.Condition() 1642 with c: 1643 self.assertFalse(c.wait(0)) 1644 self.assertFalse(c.wait(0.1)) 1645 1646 p = self.Process(target=self._test_wait_result, args=(c, pid)) 1647 p.start() 1648 1649 self.assertTrue(c.wait(60)) 1650 if pid is not None: 1651 self.assertRaises(KeyboardInterrupt, c.wait, 60) 1652 1653 p.join() 1654 1655 1656class _TestEvent(BaseTestCase): 1657 1658 @classmethod 1659 def _test_event(cls, event): 1660 time.sleep(TIMEOUT2) 1661 event.set() 1662 1663 def test_event(self): 1664 event = self.Event() 1665 wait = TimingWrapper(event.wait) 1666 1667 # Removed temporarily, due to API shear, this does not 1668 # work with threading._Event objects. is_set == isSet 1669 self.assertEqual(event.is_set(), False) 1670 1671 # Removed, threading.Event.wait() will return the value of the __flag 1672 # instead of None. API Shear with the semaphore backed mp.Event 1673 self.assertEqual(wait(0.0), False) 1674 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1675 self.assertEqual(wait(TIMEOUT1), False) 1676 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1677 1678 event.set() 1679 1680 # See note above on the API differences 1681 self.assertEqual(event.is_set(), True) 1682 self.assertEqual(wait(), True) 1683 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1684 self.assertEqual(wait(TIMEOUT1), True) 1685 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1686 # self.assertEqual(event.is_set(), True) 1687 1688 event.clear() 1689 1690 #self.assertEqual(event.is_set(), False) 1691 1692 p = self.Process(target=self._test_event, args=(event,)) 1693 p.daemon = True 1694 p.start() 1695 self.assertEqual(wait(), True) 1696 p.join() 1697 1698 def test_repr(self) -> None: 1699 event = self.Event() 1700 if self.TYPE == 'processes': 1701 self.assertRegex(repr(event), r"<Event at .* unset>") 1702 event.set() 1703 self.assertRegex(repr(event), r"<Event at .* set>") 1704 event.clear() 1705 self.assertRegex(repr(event), r"<Event at .* unset>") 1706 elif self.TYPE == 'manager': 1707 self.assertRegex(repr(event), r"<EventProxy object, typeid 'Event' at .*") 1708 event.set() 1709 self.assertRegex(repr(event), r"<EventProxy object, typeid 'Event' at .*") 1710 1711 1712# Tests for Barrier - adapted from tests in test/lock_tests.py 1713# 1714 1715# Many of the tests for threading.Barrier use a list as an atomic 1716# counter: a value is appended to increment the counter, and the 1717# length of the list gives the value. We use the class DummyList 1718# for the same purpose. 1719 1720class _DummyList(object): 1721 1722 def __init__(self): 1723 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i')) 1724 lock = multiprocessing.Lock() 1725 self.__setstate__((wrapper, lock)) 1726 self._lengthbuf[0] = 0 1727 1728 def __setstate__(self, state): 1729 (self._wrapper, self._lock) = state 1730 self._lengthbuf = self._wrapper.create_memoryview().cast('i') 1731 1732 def __getstate__(self): 1733 return (self._wrapper, self._lock) 1734 1735 def append(self, _): 1736 with self._lock: 1737 self._lengthbuf[0] += 1 1738 1739 def __len__(self): 1740 with self._lock: 1741 return self._lengthbuf[0] 1742 1743def _wait(): 1744 # A crude wait/yield function not relying on synchronization primitives. 1745 time.sleep(0.01) 1746 1747 1748class Bunch(object): 1749 """ 1750 A bunch of threads. 1751 """ 1752 def __init__(self, namespace, f, args, n, wait_before_exit=False): 1753 """ 1754 Construct a bunch of `n` threads running the same function `f`. 1755 If `wait_before_exit` is True, the threads won't terminate until 1756 do_finish() is called. 1757 """ 1758 self.f = f 1759 self.args = args 1760 self.n = n 1761 self.started = namespace.DummyList() 1762 self.finished = namespace.DummyList() 1763 self._can_exit = namespace.Event() 1764 if not wait_before_exit: 1765 self._can_exit.set() 1766 1767 threads = [] 1768 for i in range(n): 1769 p = namespace.Process(target=self.task) 1770 p.daemon = True 1771 p.start() 1772 threads.append(p) 1773 1774 def finalize(threads): 1775 for p in threads: 1776 p.join() 1777 1778 self._finalizer = weakref.finalize(self, finalize, threads) 1779 1780 def task(self): 1781 pid = os.getpid() 1782 self.started.append(pid) 1783 try: 1784 self.f(*self.args) 1785 finally: 1786 self.finished.append(pid) 1787 self._can_exit.wait(30) 1788 assert self._can_exit.is_set() 1789 1790 def wait_for_started(self): 1791 while len(self.started) < self.n: 1792 _wait() 1793 1794 def wait_for_finished(self): 1795 while len(self.finished) < self.n: 1796 _wait() 1797 1798 def do_finish(self): 1799 self._can_exit.set() 1800 1801 def close(self): 1802 self._finalizer() 1803 1804 1805class AppendTrue(object): 1806 def __init__(self, obj): 1807 self.obj = obj 1808 def __call__(self): 1809 self.obj.append(True) 1810 1811 1812class _TestBarrier(BaseTestCase): 1813 """ 1814 Tests for Barrier objects. 1815 """ 1816 N = 5 1817 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout 1818 1819 def setUp(self): 1820 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout) 1821 1822 def tearDown(self): 1823 self.barrier.abort() 1824 self.barrier = None 1825 1826 def DummyList(self): 1827 if self.TYPE == 'threads': 1828 return [] 1829 elif self.TYPE == 'manager': 1830 return self.manager.list() 1831 else: 1832 return _DummyList() 1833 1834 def run_threads(self, f, args): 1835 b = Bunch(self, f, args, self.N-1) 1836 try: 1837 f(*args) 1838 b.wait_for_finished() 1839 finally: 1840 b.close() 1841 1842 @classmethod 1843 def multipass(cls, barrier, results, n): 1844 m = barrier.parties 1845 assert m == cls.N 1846 for i in range(n): 1847 results[0].append(True) 1848 assert len(results[1]) == i * m 1849 barrier.wait() 1850 results[1].append(True) 1851 assert len(results[0]) == (i + 1) * m 1852 barrier.wait() 1853 try: 1854 assert barrier.n_waiting == 0 1855 except NotImplementedError: 1856 pass 1857 assert not barrier.broken 1858 1859 def test_barrier(self, passes=1): 1860 """ 1861 Test that a barrier is passed in lockstep 1862 """ 1863 results = [self.DummyList(), self.DummyList()] 1864 self.run_threads(self.multipass, (self.barrier, results, passes)) 1865 1866 def test_barrier_10(self): 1867 """ 1868 Test that a barrier works for 10 consecutive runs 1869 """ 1870 return self.test_barrier(10) 1871 1872 @classmethod 1873 def _test_wait_return_f(cls, barrier, queue): 1874 res = barrier.wait() 1875 queue.put(res) 1876 1877 def test_wait_return(self): 1878 """ 1879 test the return value from barrier.wait 1880 """ 1881 queue = self.Queue() 1882 self.run_threads(self._test_wait_return_f, (self.barrier, queue)) 1883 results = [queue.get() for i in range(self.N)] 1884 self.assertEqual(results.count(0), 1) 1885 close_queue(queue) 1886 1887 @classmethod 1888 def _test_action_f(cls, barrier, results): 1889 barrier.wait() 1890 if len(results) != 1: 1891 raise RuntimeError 1892 1893 def test_action(self): 1894 """ 1895 Test the 'action' callback 1896 """ 1897 results = self.DummyList() 1898 barrier = self.Barrier(self.N, action=AppendTrue(results)) 1899 self.run_threads(self._test_action_f, (barrier, results)) 1900 self.assertEqual(len(results), 1) 1901 1902 @classmethod 1903 def _test_abort_f(cls, barrier, results1, results2): 1904 try: 1905 i = barrier.wait() 1906 if i == cls.N//2: 1907 raise RuntimeError 1908 barrier.wait() 1909 results1.append(True) 1910 except threading.BrokenBarrierError: 1911 results2.append(True) 1912 except RuntimeError: 1913 barrier.abort() 1914 1915 def test_abort(self): 1916 """ 1917 Test that an abort will put the barrier in a broken state 1918 """ 1919 results1 = self.DummyList() 1920 results2 = self.DummyList() 1921 self.run_threads(self._test_abort_f, 1922 (self.barrier, results1, results2)) 1923 self.assertEqual(len(results1), 0) 1924 self.assertEqual(len(results2), self.N-1) 1925 self.assertTrue(self.barrier.broken) 1926 1927 @classmethod 1928 def _test_reset_f(cls, barrier, results1, results2, results3): 1929 i = barrier.wait() 1930 if i == cls.N//2: 1931 # Wait until the other threads are all in the barrier. 1932 while barrier.n_waiting < cls.N-1: 1933 time.sleep(0.001) 1934 barrier.reset() 1935 else: 1936 try: 1937 barrier.wait() 1938 results1.append(True) 1939 except threading.BrokenBarrierError: 1940 results2.append(True) 1941 # Now, pass the barrier again 1942 barrier.wait() 1943 results3.append(True) 1944 1945 def test_reset(self): 1946 """ 1947 Test that a 'reset' on a barrier frees the waiting threads 1948 """ 1949 results1 = self.DummyList() 1950 results2 = self.DummyList() 1951 results3 = self.DummyList() 1952 self.run_threads(self._test_reset_f, 1953 (self.barrier, results1, results2, results3)) 1954 self.assertEqual(len(results1), 0) 1955 self.assertEqual(len(results2), self.N-1) 1956 self.assertEqual(len(results3), self.N) 1957 1958 @classmethod 1959 def _test_abort_and_reset_f(cls, barrier, barrier2, 1960 results1, results2, results3): 1961 try: 1962 i = barrier.wait() 1963 if i == cls.N//2: 1964 raise RuntimeError 1965 barrier.wait() 1966 results1.append(True) 1967 except threading.BrokenBarrierError: 1968 results2.append(True) 1969 except RuntimeError: 1970 barrier.abort() 1971 # Synchronize and reset the barrier. Must synchronize first so 1972 # that everyone has left it when we reset, and after so that no 1973 # one enters it before the reset. 1974 if barrier2.wait() == cls.N//2: 1975 barrier.reset() 1976 barrier2.wait() 1977 barrier.wait() 1978 results3.append(True) 1979 1980 def test_abort_and_reset(self): 1981 """ 1982 Test that a barrier can be reset after being broken. 1983 """ 1984 results1 = self.DummyList() 1985 results2 = self.DummyList() 1986 results3 = self.DummyList() 1987 barrier2 = self.Barrier(self.N) 1988 1989 self.run_threads(self._test_abort_and_reset_f, 1990 (self.barrier, barrier2, results1, results2, results3)) 1991 self.assertEqual(len(results1), 0) 1992 self.assertEqual(len(results2), self.N-1) 1993 self.assertEqual(len(results3), self.N) 1994 1995 @classmethod 1996 def _test_timeout_f(cls, barrier, results): 1997 i = barrier.wait() 1998 if i == cls.N//2: 1999 # One thread is late! 2000 time.sleep(1.0) 2001 try: 2002 barrier.wait(0.5) 2003 except threading.BrokenBarrierError: 2004 results.append(True) 2005 2006 def test_timeout(self): 2007 """ 2008 Test wait(timeout) 2009 """ 2010 results = self.DummyList() 2011 self.run_threads(self._test_timeout_f, (self.barrier, results)) 2012 self.assertEqual(len(results), self.barrier.parties) 2013 2014 @classmethod 2015 def _test_default_timeout_f(cls, barrier, results): 2016 i = barrier.wait(cls.defaultTimeout) 2017 if i == cls.N//2: 2018 # One thread is later than the default timeout 2019 time.sleep(1.0) 2020 try: 2021 barrier.wait() 2022 except threading.BrokenBarrierError: 2023 results.append(True) 2024 2025 def test_default_timeout(self): 2026 """ 2027 Test the barrier's default timeout 2028 """ 2029 barrier = self.Barrier(self.N, timeout=0.5) 2030 results = self.DummyList() 2031 self.run_threads(self._test_default_timeout_f, (barrier, results)) 2032 self.assertEqual(len(results), barrier.parties) 2033 2034 def test_single_thread(self): 2035 b = self.Barrier(1) 2036 b.wait() 2037 b.wait() 2038 2039 @classmethod 2040 def _test_thousand_f(cls, barrier, passes, conn, lock): 2041 for i in range(passes): 2042 barrier.wait() 2043 with lock: 2044 conn.send(i) 2045 2046 def test_thousand(self): 2047 if self.TYPE == 'manager': 2048 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2049 passes = 1000 2050 lock = self.Lock() 2051 conn, child_conn = self.Pipe(False) 2052 for j in range(self.N): 2053 p = self.Process(target=self._test_thousand_f, 2054 args=(self.barrier, passes, child_conn, lock)) 2055 p.start() 2056 self.addCleanup(p.join) 2057 2058 for i in range(passes): 2059 for j in range(self.N): 2060 self.assertEqual(conn.recv(), i) 2061 2062# 2063# 2064# 2065 2066class _TestValue(BaseTestCase): 2067 2068 ALLOWED_TYPES = ('processes',) 2069 2070 codes_values = [ 2071 ('i', 4343, 24234), 2072 ('d', 3.625, -4.25), 2073 ('h', -232, 234), 2074 ('q', 2 ** 33, 2 ** 34), 2075 ('c', latin('x'), latin('y')) 2076 ] 2077 2078 def setUp(self): 2079 if not HAS_SHAREDCTYPES: 2080 self.skipTest("requires multiprocessing.sharedctypes") 2081 2082 @classmethod 2083 def _test(cls, values): 2084 for sv, cv in zip(values, cls.codes_values): 2085 sv.value = cv[2] 2086 2087 2088 def test_value(self, raw=False): 2089 if raw: 2090 values = [self.RawValue(code, value) 2091 for code, value, _ in self.codes_values] 2092 else: 2093 values = [self.Value(code, value) 2094 for code, value, _ in self.codes_values] 2095 2096 for sv, cv in zip(values, self.codes_values): 2097 self.assertEqual(sv.value, cv[1]) 2098 2099 proc = self.Process(target=self._test, args=(values,)) 2100 proc.daemon = True 2101 proc.start() 2102 proc.join() 2103 2104 for sv, cv in zip(values, self.codes_values): 2105 self.assertEqual(sv.value, cv[2]) 2106 2107 def test_rawvalue(self): 2108 self.test_value(raw=True) 2109 2110 def test_getobj_getlock(self): 2111 val1 = self.Value('i', 5) 2112 lock1 = val1.get_lock() 2113 obj1 = val1.get_obj() 2114 2115 val2 = self.Value('i', 5, lock=None) 2116 lock2 = val2.get_lock() 2117 obj2 = val2.get_obj() 2118 2119 lock = self.Lock() 2120 val3 = self.Value('i', 5, lock=lock) 2121 lock3 = val3.get_lock() 2122 obj3 = val3.get_obj() 2123 self.assertEqual(lock, lock3) 2124 2125 arr4 = self.Value('i', 5, lock=False) 2126 self.assertFalse(hasattr(arr4, 'get_lock')) 2127 self.assertFalse(hasattr(arr4, 'get_obj')) 2128 2129 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') 2130 2131 arr5 = self.RawValue('i', 5) 2132 self.assertFalse(hasattr(arr5, 'get_lock')) 2133 self.assertFalse(hasattr(arr5, 'get_obj')) 2134 2135 2136class _TestArray(BaseTestCase): 2137 2138 ALLOWED_TYPES = ('processes',) 2139 2140 @classmethod 2141 def f(cls, seq): 2142 for i in range(1, len(seq)): 2143 seq[i] += seq[i-1] 2144 2145 @unittest.skipIf(c_int is None, "requires _ctypes") 2146 def test_array(self, raw=False): 2147 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 2148 if raw: 2149 arr = self.RawArray('i', seq) 2150 else: 2151 arr = self.Array('i', seq) 2152 2153 self.assertEqual(len(arr), len(seq)) 2154 self.assertEqual(arr[3], seq[3]) 2155 self.assertEqual(list(arr[2:7]), list(seq[2:7])) 2156 2157 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) 2158 2159 self.assertEqual(list(arr[:]), seq) 2160 2161 self.f(seq) 2162 2163 p = self.Process(target=self.f, args=(arr,)) 2164 p.daemon = True 2165 p.start() 2166 p.join() 2167 2168 self.assertEqual(list(arr[:]), seq) 2169 2170 @unittest.skipIf(c_int is None, "requires _ctypes") 2171 def test_array_from_size(self): 2172 size = 10 2173 # Test for zeroing (see issue #11675). 2174 # The repetition below strengthens the test by increasing the chances 2175 # of previously allocated non-zero memory being used for the new array 2176 # on the 2nd and 3rd loops. 2177 for _ in range(3): 2178 arr = self.Array('i', size) 2179 self.assertEqual(len(arr), size) 2180 self.assertEqual(list(arr), [0] * size) 2181 arr[:] = range(10) 2182 self.assertEqual(list(arr), list(range(10))) 2183 del arr 2184 2185 @unittest.skipIf(c_int is None, "requires _ctypes") 2186 def test_rawarray(self): 2187 self.test_array(raw=True) 2188 2189 @unittest.skipIf(c_int is None, "requires _ctypes") 2190 def test_getobj_getlock_obj(self): 2191 arr1 = self.Array('i', list(range(10))) 2192 lock1 = arr1.get_lock() 2193 obj1 = arr1.get_obj() 2194 2195 arr2 = self.Array('i', list(range(10)), lock=None) 2196 lock2 = arr2.get_lock() 2197 obj2 = arr2.get_obj() 2198 2199 lock = self.Lock() 2200 arr3 = self.Array('i', list(range(10)), lock=lock) 2201 lock3 = arr3.get_lock() 2202 obj3 = arr3.get_obj() 2203 self.assertEqual(lock, lock3) 2204 2205 arr4 = self.Array('i', range(10), lock=False) 2206 self.assertFalse(hasattr(arr4, 'get_lock')) 2207 self.assertFalse(hasattr(arr4, 'get_obj')) 2208 self.assertRaises(AttributeError, 2209 self.Array, 'i', range(10), lock='notalock') 2210 2211 arr5 = self.RawArray('i', range(10)) 2212 self.assertFalse(hasattr(arr5, 'get_lock')) 2213 self.assertFalse(hasattr(arr5, 'get_obj')) 2214 2215# 2216# 2217# 2218 2219class _TestContainers(BaseTestCase): 2220 2221 ALLOWED_TYPES = ('manager',) 2222 2223 def test_list(self): 2224 a = self.list(list(range(10))) 2225 self.assertEqual(a[:], list(range(10))) 2226 2227 b = self.list() 2228 self.assertEqual(b[:], []) 2229 2230 b.extend(list(range(5))) 2231 self.assertEqual(b[:], list(range(5))) 2232 2233 self.assertEqual(b[2], 2) 2234 self.assertEqual(b[2:10], [2,3,4]) 2235 2236 b *= 2 2237 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) 2238 2239 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) 2240 2241 self.assertEqual(a[:], list(range(10))) 2242 2243 d = [a, b] 2244 e = self.list(d) 2245 self.assertEqual( 2246 [element[:] for element in e], 2247 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] 2248 ) 2249 2250 f = self.list([a]) 2251 a.append('hello') 2252 self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']) 2253 2254 def test_list_iter(self): 2255 a = self.list(list(range(10))) 2256 it = iter(a) 2257 self.assertEqual(list(it), list(range(10))) 2258 self.assertEqual(list(it), []) # exhausted 2259 # list modified during iteration 2260 it = iter(a) 2261 a[0] = 100 2262 self.assertEqual(next(it), 100) 2263 2264 def test_list_proxy_in_list(self): 2265 a = self.list([self.list(range(3)) for _i in range(3)]) 2266 self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3) 2267 2268 a[0][-1] = 55 2269 self.assertEqual(a[0][:], [0, 1, 55]) 2270 for i in range(1, 3): 2271 self.assertEqual(a[i][:], [0, 1, 2]) 2272 2273 self.assertEqual(a[1].pop(), 2) 2274 self.assertEqual(len(a[1]), 2) 2275 for i in range(0, 3, 2): 2276 self.assertEqual(len(a[i]), 3) 2277 2278 del a 2279 2280 b = self.list() 2281 b.append(b) 2282 del b 2283 2284 def test_dict(self): 2285 d = self.dict() 2286 indices = list(range(65, 70)) 2287 for i in indices: 2288 d[i] = chr(i) 2289 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) 2290 self.assertEqual(sorted(d.keys()), indices) 2291 self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) 2292 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) 2293 2294 def test_dict_iter(self): 2295 d = self.dict() 2296 indices = list(range(65, 70)) 2297 for i in indices: 2298 d[i] = chr(i) 2299 it = iter(d) 2300 self.assertEqual(list(it), indices) 2301 self.assertEqual(list(it), []) # exhausted 2302 # dictionary changed size during iteration 2303 it = iter(d) 2304 d.clear() 2305 self.assertRaises(RuntimeError, next, it) 2306 2307 def test_dict_proxy_nested(self): 2308 pets = self.dict(ferrets=2, hamsters=4) 2309 supplies = self.dict(water=10, feed=3) 2310 d = self.dict(pets=pets, supplies=supplies) 2311 2312 self.assertEqual(supplies['water'], 10) 2313 self.assertEqual(d['supplies']['water'], 10) 2314 2315 d['supplies']['blankets'] = 5 2316 self.assertEqual(supplies['blankets'], 5) 2317 self.assertEqual(d['supplies']['blankets'], 5) 2318 2319 d['supplies']['water'] = 7 2320 self.assertEqual(supplies['water'], 7) 2321 self.assertEqual(d['supplies']['water'], 7) 2322 2323 del pets 2324 del supplies 2325 self.assertEqual(d['pets']['ferrets'], 2) 2326 d['supplies']['blankets'] = 11 2327 self.assertEqual(d['supplies']['blankets'], 11) 2328 2329 pets = d['pets'] 2330 supplies = d['supplies'] 2331 supplies['water'] = 7 2332 self.assertEqual(supplies['water'], 7) 2333 self.assertEqual(d['supplies']['water'], 7) 2334 2335 d.clear() 2336 self.assertEqual(len(d), 0) 2337 self.assertEqual(supplies['water'], 7) 2338 self.assertEqual(pets['hamsters'], 4) 2339 2340 l = self.list([pets, supplies]) 2341 l[0]['marmots'] = 1 2342 self.assertEqual(pets['marmots'], 1) 2343 self.assertEqual(l[0]['marmots'], 1) 2344 2345 del pets 2346 del supplies 2347 self.assertEqual(l[0]['marmots'], 1) 2348 2349 outer = self.list([[88, 99], l]) 2350 self.assertIsInstance(outer[0], list) # Not a ListProxy 2351 self.assertEqual(outer[-1][-1]['feed'], 3) 2352 2353 def test_nested_queue(self): 2354 a = self.list() # Test queue inside list 2355 a.append(self.Queue()) 2356 a[0].put(123) 2357 self.assertEqual(a[0].get(), 123) 2358 b = self.dict() # Test queue inside dict 2359 b[0] = self.Queue() 2360 b[0].put(456) 2361 self.assertEqual(b[0].get(), 456) 2362 2363 def test_namespace(self): 2364 n = self.Namespace() 2365 n.name = 'Bob' 2366 n.job = 'Builder' 2367 n._hidden = 'hidden' 2368 self.assertEqual((n.name, n.job), ('Bob', 'Builder')) 2369 del n.job 2370 self.assertEqual(str(n), "Namespace(name='Bob')") 2371 self.assertTrue(hasattr(n, 'name')) 2372 self.assertTrue(not hasattr(n, 'job')) 2373 2374# 2375# 2376# 2377 2378def sqr(x, wait=0.0): 2379 time.sleep(wait) 2380 return x*x 2381 2382def mul(x, y): 2383 return x*y 2384 2385def raise_large_valuerror(wait): 2386 time.sleep(wait) 2387 raise ValueError("x" * 1024**2) 2388 2389def identity(x): 2390 return x 2391 2392class CountedObject(object): 2393 n_instances = 0 2394 2395 def __new__(cls): 2396 cls.n_instances += 1 2397 return object.__new__(cls) 2398 2399 def __del__(self): 2400 type(self).n_instances -= 1 2401 2402class SayWhenError(ValueError): pass 2403 2404def exception_throwing_generator(total, when): 2405 if when == -1: 2406 raise SayWhenError("Somebody said when") 2407 for i in range(total): 2408 if i == when: 2409 raise SayWhenError("Somebody said when") 2410 yield i 2411 2412 2413class _TestPool(BaseTestCase): 2414 2415 @classmethod 2416 def setUpClass(cls): 2417 super().setUpClass() 2418 cls.pool = cls.Pool(4) 2419 2420 @classmethod 2421 def tearDownClass(cls): 2422 cls.pool.terminate() 2423 cls.pool.join() 2424 cls.pool = None 2425 super().tearDownClass() 2426 2427 def test_apply(self): 2428 papply = self.pool.apply 2429 self.assertEqual(papply(sqr, (5,)), sqr(5)) 2430 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) 2431 2432 def test_map(self): 2433 pmap = self.pool.map 2434 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10))))) 2435 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), 2436 list(map(sqr, list(range(100))))) 2437 2438 def test_starmap(self): 2439 psmap = self.pool.starmap 2440 tuples = list(zip(range(10), range(9,-1, -1))) 2441 self.assertEqual(psmap(mul, tuples), 2442 list(itertools.starmap(mul, tuples))) 2443 tuples = list(zip(range(100), range(99,-1, -1))) 2444 self.assertEqual(psmap(mul, tuples, chunksize=20), 2445 list(itertools.starmap(mul, tuples))) 2446 2447 def test_starmap_async(self): 2448 tuples = list(zip(range(100), range(99,-1, -1))) 2449 self.assertEqual(self.pool.starmap_async(mul, tuples).get(), 2450 list(itertools.starmap(mul, tuples))) 2451 2452 def test_map_async(self): 2453 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(), 2454 list(map(sqr, list(range(10))))) 2455 2456 def test_map_async_callbacks(self): 2457 call_args = self.manager.list() if self.TYPE == 'manager' else [] 2458 self.pool.map_async(int, ['1'], 2459 callback=call_args.append, 2460 error_callback=call_args.append).wait() 2461 self.assertEqual(1, len(call_args)) 2462 self.assertEqual([1], call_args[0]) 2463 self.pool.map_async(int, ['a'], 2464 callback=call_args.append, 2465 error_callback=call_args.append).wait() 2466 self.assertEqual(2, len(call_args)) 2467 self.assertIsInstance(call_args[1], ValueError) 2468 2469 def test_map_unplicklable(self): 2470 # Issue #19425 -- failure to pickle should not cause a hang 2471 if self.TYPE == 'threads': 2472 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2473 class A(object): 2474 def __reduce__(self): 2475 raise RuntimeError('cannot pickle') 2476 with self.assertRaises(RuntimeError): 2477 self.pool.map(sqr, [A()]*10) 2478 2479 def test_map_chunksize(self): 2480 try: 2481 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 2482 except multiprocessing.TimeoutError: 2483 self.fail("pool.map_async with chunksize stalled on null list") 2484 2485 def test_map_handle_iterable_exception(self): 2486 if self.TYPE == 'manager': 2487 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2488 2489 # SayWhenError seen at the very first of the iterable 2490 with self.assertRaises(SayWhenError): 2491 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2492 # again, make sure it's reentrant 2493 with self.assertRaises(SayWhenError): 2494 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2495 2496 with self.assertRaises(SayWhenError): 2497 self.pool.map(sqr, exception_throwing_generator(10, 3), 1) 2498 2499 class SpecialIterable: 2500 def __iter__(self): 2501 return self 2502 def __next__(self): 2503 raise SayWhenError 2504 def __len__(self): 2505 return 1 2506 with self.assertRaises(SayWhenError): 2507 self.pool.map(sqr, SpecialIterable(), 1) 2508 with self.assertRaises(SayWhenError): 2509 self.pool.map(sqr, SpecialIterable(), 1) 2510 2511 def test_async(self): 2512 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) 2513 get = TimingWrapper(res.get) 2514 self.assertEqual(get(), 49) 2515 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 2516 2517 def test_async_timeout(self): 2518 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0)) 2519 get = TimingWrapper(res.get) 2520 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) 2521 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) 2522 2523 def test_imap(self): 2524 it = self.pool.imap(sqr, list(range(10))) 2525 self.assertEqual(list(it), list(map(sqr, list(range(10))))) 2526 2527 it = self.pool.imap(sqr, list(range(10))) 2528 for i in range(10): 2529 self.assertEqual(next(it), i*i) 2530 self.assertRaises(StopIteration, it.__next__) 2531 2532 it = self.pool.imap(sqr, list(range(1000)), chunksize=100) 2533 for i in range(1000): 2534 self.assertEqual(next(it), i*i) 2535 self.assertRaises(StopIteration, it.__next__) 2536 2537 def test_imap_handle_iterable_exception(self): 2538 if self.TYPE == 'manager': 2539 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2540 2541 # SayWhenError seen at the very first of the iterable 2542 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2543 self.assertRaises(SayWhenError, it.__next__) 2544 # again, make sure it's reentrant 2545 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2546 self.assertRaises(SayWhenError, it.__next__) 2547 2548 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1) 2549 for i in range(3): 2550 self.assertEqual(next(it), i*i) 2551 self.assertRaises(SayWhenError, it.__next__) 2552 2553 # SayWhenError seen at start of problematic chunk's results 2554 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2) 2555 for i in range(6): 2556 self.assertEqual(next(it), i*i) 2557 self.assertRaises(SayWhenError, it.__next__) 2558 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4) 2559 for i in range(4): 2560 self.assertEqual(next(it), i*i) 2561 self.assertRaises(SayWhenError, it.__next__) 2562 2563 def test_imap_unordered(self): 2564 it = self.pool.imap_unordered(sqr, list(range(10))) 2565 self.assertEqual(sorted(it), list(map(sqr, list(range(10))))) 2566 2567 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100) 2568 self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) 2569 2570 def test_imap_unordered_handle_iterable_exception(self): 2571 if self.TYPE == 'manager': 2572 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2573 2574 # SayWhenError seen at the very first of the iterable 2575 it = self.pool.imap_unordered(sqr, 2576 exception_throwing_generator(1, -1), 2577 1) 2578 self.assertRaises(SayWhenError, it.__next__) 2579 # again, make sure it's reentrant 2580 it = self.pool.imap_unordered(sqr, 2581 exception_throwing_generator(1, -1), 2582 1) 2583 self.assertRaises(SayWhenError, it.__next__) 2584 2585 it = self.pool.imap_unordered(sqr, 2586 exception_throwing_generator(10, 3), 2587 1) 2588 expected_values = list(map(sqr, list(range(10)))) 2589 with self.assertRaises(SayWhenError): 2590 # imap_unordered makes it difficult to anticipate the SayWhenError 2591 for i in range(10): 2592 value = next(it) 2593 self.assertIn(value, expected_values) 2594 expected_values.remove(value) 2595 2596 it = self.pool.imap_unordered(sqr, 2597 exception_throwing_generator(20, 7), 2598 2) 2599 expected_values = list(map(sqr, list(range(20)))) 2600 with self.assertRaises(SayWhenError): 2601 for i in range(20): 2602 value = next(it) 2603 self.assertIn(value, expected_values) 2604 expected_values.remove(value) 2605 2606 def test_make_pool(self): 2607 expected_error = (RemoteError if self.TYPE == 'manager' 2608 else ValueError) 2609 2610 self.assertRaises(expected_error, self.Pool, -1) 2611 self.assertRaises(expected_error, self.Pool, 0) 2612 2613 if self.TYPE != 'manager': 2614 p = self.Pool(3) 2615 try: 2616 self.assertEqual(3, len(p._pool)) 2617 finally: 2618 p.close() 2619 p.join() 2620 2621 def test_terminate(self): 2622 result = self.pool.map_async( 2623 time.sleep, [0.1 for i in range(10000)], chunksize=1 2624 ) 2625 self.pool.terminate() 2626 join = TimingWrapper(self.pool.join) 2627 join() 2628 # Sanity check the pool didn't wait for all tasks to finish 2629 self.assertLess(join.elapsed, 2.0) 2630 2631 def test_empty_iterable(self): 2632 # See Issue 12157 2633 p = self.Pool(1) 2634 2635 self.assertEqual(p.map(sqr, []), []) 2636 self.assertEqual(list(p.imap(sqr, [])), []) 2637 self.assertEqual(list(p.imap_unordered(sqr, [])), []) 2638 self.assertEqual(p.map_async(sqr, []).get(), []) 2639 2640 p.close() 2641 p.join() 2642 2643 def test_context(self): 2644 if self.TYPE == 'processes': 2645 L = list(range(10)) 2646 expected = [sqr(i) for i in L] 2647 with self.Pool(2) as p: 2648 r = p.map_async(sqr, L) 2649 self.assertEqual(r.get(), expected) 2650 p.join() 2651 self.assertRaises(ValueError, p.map_async, sqr, L) 2652 2653 @classmethod 2654 def _test_traceback(cls): 2655 raise RuntimeError(123) # some comment 2656 2657 def test_traceback(self): 2658 # We want ensure that the traceback from the child process is 2659 # contained in the traceback raised in the main process. 2660 if self.TYPE == 'processes': 2661 with self.Pool(1) as p: 2662 try: 2663 p.apply(self._test_traceback) 2664 except Exception as e: 2665 exc = e 2666 else: 2667 self.fail('expected RuntimeError') 2668 p.join() 2669 self.assertIs(type(exc), RuntimeError) 2670 self.assertEqual(exc.args, (123,)) 2671 cause = exc.__cause__ 2672 self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback) 2673 self.assertIn('raise RuntimeError(123) # some comment', cause.tb) 2674 2675 with test.support.captured_stderr() as f1: 2676 try: 2677 raise exc 2678 except RuntimeError: 2679 sys.excepthook(*sys.exc_info()) 2680 self.assertIn('raise RuntimeError(123) # some comment', 2681 f1.getvalue()) 2682 # _helper_reraises_exception should not make the error 2683 # a remote exception 2684 with self.Pool(1) as p: 2685 try: 2686 p.map(sqr, exception_throwing_generator(1, -1), 1) 2687 except Exception as e: 2688 exc = e 2689 else: 2690 self.fail('expected SayWhenError') 2691 self.assertIs(type(exc), SayWhenError) 2692 self.assertIs(exc.__cause__, None) 2693 p.join() 2694 2695 @classmethod 2696 def _test_wrapped_exception(cls): 2697 raise RuntimeError('foo') 2698 2699 def test_wrapped_exception(self): 2700 # Issue #20980: Should not wrap exception when using thread pool 2701 with self.Pool(1) as p: 2702 with self.assertRaises(RuntimeError): 2703 p.apply(self._test_wrapped_exception) 2704 p.join() 2705 2706 def test_map_no_failfast(self): 2707 # Issue #23992: the fail-fast behaviour when an exception is raised 2708 # during map() would make Pool.join() deadlock, because a worker 2709 # process would fill the result queue (after the result handler thread 2710 # terminated, hence not draining it anymore). 2711 2712 t_start = time.monotonic() 2713 2714 with self.assertRaises(ValueError): 2715 with self.Pool(2) as p: 2716 try: 2717 p.map(raise_large_valuerror, [0, 1]) 2718 finally: 2719 time.sleep(0.5) 2720 p.close() 2721 p.join() 2722 2723 # check that we indeed waited for all jobs 2724 self.assertGreater(time.monotonic() - t_start, 0.9) 2725 2726 def test_release_task_refs(self): 2727 # Issue #29861: task arguments and results should not be kept 2728 # alive after we are done with them. 2729 objs = [CountedObject() for i in range(10)] 2730 refs = [weakref.ref(o) for o in objs] 2731 self.pool.map(identity, objs) 2732 2733 del objs 2734 gc.collect() # For PyPy or other GCs. 2735 time.sleep(DELTA) # let threaded cleanup code run 2736 self.assertEqual(set(wr() for wr in refs), {None}) 2737 # With a process pool, copies of the objects are returned, check 2738 # they were released too. 2739 self.assertEqual(CountedObject.n_instances, 0) 2740 2741 def test_enter(self): 2742 if self.TYPE == 'manager': 2743 self.skipTest("test not applicable to manager") 2744 2745 pool = self.Pool(1) 2746 with pool: 2747 pass 2748 # call pool.terminate() 2749 # pool is no longer running 2750 2751 with self.assertRaises(ValueError): 2752 # bpo-35477: pool.__enter__() fails if the pool is not running 2753 with pool: 2754 pass 2755 pool.join() 2756 2757 def test_resource_warning(self): 2758 if self.TYPE == 'manager': 2759 self.skipTest("test not applicable to manager") 2760 2761 pool = self.Pool(1) 2762 pool.terminate() 2763 pool.join() 2764 2765 # force state to RUN to emit ResourceWarning in __del__() 2766 pool._state = multiprocessing.pool.RUN 2767 2768 with warnings_helper.check_warnings( 2769 ('unclosed running multiprocessing pool', ResourceWarning)): 2770 pool = None 2771 support.gc_collect() 2772 2773def raising(): 2774 raise KeyError("key") 2775 2776def unpickleable_result(): 2777 return lambda: 42 2778 2779class _TestPoolWorkerErrors(BaseTestCase): 2780 ALLOWED_TYPES = ('processes', ) 2781 2782 def test_async_error_callback(self): 2783 p = multiprocessing.Pool(2) 2784 2785 scratchpad = [None] 2786 def errback(exc): 2787 scratchpad[0] = exc 2788 2789 res = p.apply_async(raising, error_callback=errback) 2790 self.assertRaises(KeyError, res.get) 2791 self.assertTrue(scratchpad[0]) 2792 self.assertIsInstance(scratchpad[0], KeyError) 2793 2794 p.close() 2795 p.join() 2796 2797 def test_unpickleable_result(self): 2798 from multiprocessing.pool import MaybeEncodingError 2799 p = multiprocessing.Pool(2) 2800 2801 # Make sure we don't lose pool processes because of encoding errors. 2802 for iteration in range(20): 2803 2804 scratchpad = [None] 2805 def errback(exc): 2806 scratchpad[0] = exc 2807 2808 res = p.apply_async(unpickleable_result, error_callback=errback) 2809 self.assertRaises(MaybeEncodingError, res.get) 2810 wrapped = scratchpad[0] 2811 self.assertTrue(wrapped) 2812 self.assertIsInstance(scratchpad[0], MaybeEncodingError) 2813 self.assertIsNotNone(wrapped.exc) 2814 self.assertIsNotNone(wrapped.value) 2815 2816 p.close() 2817 p.join() 2818 2819class _TestPoolWorkerLifetime(BaseTestCase): 2820 ALLOWED_TYPES = ('processes', ) 2821 2822 def test_pool_worker_lifetime(self): 2823 p = multiprocessing.Pool(3, maxtasksperchild=10) 2824 self.assertEqual(3, len(p._pool)) 2825 origworkerpids = [w.pid for w in p._pool] 2826 # Run many tasks so each worker gets replaced (hopefully) 2827 results = [] 2828 for i in range(100): 2829 results.append(p.apply_async(sqr, (i, ))) 2830 # Fetch the results and verify we got the right answers, 2831 # also ensuring all the tasks have completed. 2832 for (j, res) in enumerate(results): 2833 self.assertEqual(res.get(), sqr(j)) 2834 # Refill the pool 2835 p._repopulate_pool() 2836 # Wait until all workers are alive 2837 # (countdown * DELTA = 5 seconds max startup process time) 2838 countdown = 50 2839 while countdown and not all(w.is_alive() for w in p._pool): 2840 countdown -= 1 2841 time.sleep(DELTA) 2842 finalworkerpids = [w.pid for w in p._pool] 2843 # All pids should be assigned. See issue #7805. 2844 self.assertNotIn(None, origworkerpids) 2845 self.assertNotIn(None, finalworkerpids) 2846 # Finally, check that the worker pids have changed 2847 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) 2848 p.close() 2849 p.join() 2850 2851 def test_pool_worker_lifetime_early_close(self): 2852 # Issue #10332: closing a pool whose workers have limited lifetimes 2853 # before all the tasks completed would make join() hang. 2854 p = multiprocessing.Pool(3, maxtasksperchild=1) 2855 results = [] 2856 for i in range(6): 2857 results.append(p.apply_async(sqr, (i, 0.3))) 2858 p.close() 2859 p.join() 2860 # check the results 2861 for (j, res) in enumerate(results): 2862 self.assertEqual(res.get(), sqr(j)) 2863 2864 def test_pool_maxtasksperchild_invalid(self): 2865 for value in [0, -1, 0.5, "12"]: 2866 with self.assertRaises(ValueError): 2867 multiprocessing.Pool(3, maxtasksperchild=value) 2868 2869 def test_worker_finalization_via_atexit_handler_of_multiprocessing(self): 2870 # tests cases against bpo-38744 and bpo-39360 2871 cmd = '''if 1: 2872 from multiprocessing import Pool 2873 problem = None 2874 class A: 2875 def __init__(self): 2876 self.pool = Pool(processes=1) 2877 def test(): 2878 global problem 2879 problem = A() 2880 problem.pool.map(float, tuple(range(10))) 2881 if __name__ == "__main__": 2882 test() 2883 ''' 2884 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) 2885 self.assertEqual(rc, 0) 2886 2887# 2888# Test of creating a customized manager class 2889# 2890 2891from multiprocessing.managers import BaseManager, BaseProxy, RemoteError 2892 2893class FooBar(object): 2894 def f(self): 2895 return 'f()' 2896 def g(self): 2897 raise ValueError 2898 def _h(self): 2899 return '_h()' 2900 2901def baz(): 2902 for i in range(10): 2903 yield i*i 2904 2905class IteratorProxy(BaseProxy): 2906 _exposed_ = ('__next__',) 2907 def __iter__(self): 2908 return self 2909 def __next__(self): 2910 return self._callmethod('__next__') 2911 2912class MyManager(BaseManager): 2913 pass 2914 2915MyManager.register('Foo', callable=FooBar) 2916MyManager.register('Bar', callable=FooBar, exposed=('f', '_h')) 2917MyManager.register('baz', callable=baz, proxytype=IteratorProxy) 2918 2919 2920class _TestMyManager(BaseTestCase): 2921 2922 ALLOWED_TYPES = ('manager',) 2923 2924 def test_mymanager(self): 2925 manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT) 2926 manager.start() 2927 self.common(manager) 2928 manager.shutdown() 2929 2930 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM 2931 # to the manager process if it takes longer than 1 second to stop, 2932 # which happens on slow buildbots. 2933 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) 2934 2935 def test_mymanager_context(self): 2936 manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT) 2937 with manager: 2938 self.common(manager) 2939 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM 2940 # to the manager process if it takes longer than 1 second to stop, 2941 # which happens on slow buildbots. 2942 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) 2943 2944 def test_mymanager_context_prestarted(self): 2945 manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT) 2946 manager.start() 2947 with manager: 2948 self.common(manager) 2949 self.assertEqual(manager._process.exitcode, 0) 2950 2951 def common(self, manager): 2952 foo = manager.Foo() 2953 bar = manager.Bar() 2954 baz = manager.baz() 2955 2956 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)] 2957 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)] 2958 2959 self.assertEqual(foo_methods, ['f', 'g']) 2960 self.assertEqual(bar_methods, ['f', '_h']) 2961 2962 self.assertEqual(foo.f(), 'f()') 2963 self.assertRaises(ValueError, foo.g) 2964 self.assertEqual(foo._callmethod('f'), 'f()') 2965 self.assertRaises(RemoteError, foo._callmethod, '_h') 2966 2967 self.assertEqual(bar.f(), 'f()') 2968 self.assertEqual(bar._h(), '_h()') 2969 self.assertEqual(bar._callmethod('f'), 'f()') 2970 self.assertEqual(bar._callmethod('_h'), '_h()') 2971 2972 self.assertEqual(list(baz), [i*i for i in range(10)]) 2973 2974 2975# 2976# Test of connecting to a remote server and using xmlrpclib for serialization 2977# 2978 2979_queue = pyqueue.Queue() 2980def get_queue(): 2981 return _queue 2982 2983class QueueManager(BaseManager): 2984 '''manager class used by server process''' 2985QueueManager.register('get_queue', callable=get_queue) 2986 2987class QueueManager2(BaseManager): 2988 '''manager class which specifies the same interface as QueueManager''' 2989QueueManager2.register('get_queue') 2990 2991 2992SERIALIZER = 'xmlrpclib' 2993 2994class _TestRemoteManager(BaseTestCase): 2995 2996 ALLOWED_TYPES = ('manager',) 2997 values = ['hello world', None, True, 2.25, 2998 'hall\xe5 v\xe4rlden', 2999 '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442', 3000 b'hall\xe5 v\xe4rlden', 3001 ] 3002 result = values[:] 3003 3004 @classmethod 3005 def _putter(cls, address, authkey): 3006 manager = QueueManager2( 3007 address=address, authkey=authkey, serializer=SERIALIZER, 3008 shutdown_timeout=SHUTDOWN_TIMEOUT) 3009 manager.connect() 3010 queue = manager.get_queue() 3011 # Note that xmlrpclib will deserialize object as a list not a tuple 3012 queue.put(tuple(cls.values)) 3013 3014 def test_remote(self): 3015 authkey = os.urandom(32) 3016 3017 manager = QueueManager( 3018 address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER, 3019 shutdown_timeout=SHUTDOWN_TIMEOUT) 3020 manager.start() 3021 self.addCleanup(manager.shutdown) 3022 3023 p = self.Process(target=self._putter, args=(manager.address, authkey)) 3024 p.daemon = True 3025 p.start() 3026 3027 manager2 = QueueManager2( 3028 address=manager.address, authkey=authkey, serializer=SERIALIZER, 3029 shutdown_timeout=SHUTDOWN_TIMEOUT) 3030 manager2.connect() 3031 queue = manager2.get_queue() 3032 3033 self.assertEqual(queue.get(), self.result) 3034 3035 # Because we are using xmlrpclib for serialization instead of 3036 # pickle this will cause a serialization error. 3037 self.assertRaises(Exception, queue.put, time.sleep) 3038 3039 # Make queue finalizer run before the server is stopped 3040 del queue 3041 3042 3043@hashlib_helper.requires_hashdigest('md5') 3044class _TestManagerRestart(BaseTestCase): 3045 3046 @classmethod 3047 def _putter(cls, address, authkey): 3048 manager = QueueManager( 3049 address=address, authkey=authkey, serializer=SERIALIZER, 3050 shutdown_timeout=SHUTDOWN_TIMEOUT) 3051 manager.connect() 3052 queue = manager.get_queue() 3053 queue.put('hello world') 3054 3055 def test_rapid_restart(self): 3056 authkey = os.urandom(32) 3057 manager = QueueManager( 3058 address=(socket_helper.HOST, 0), authkey=authkey, 3059 serializer=SERIALIZER, shutdown_timeout=SHUTDOWN_TIMEOUT) 3060 try: 3061 srvr = manager.get_server() 3062 addr = srvr.address 3063 # Close the connection.Listener socket which gets opened as a part 3064 # of manager.get_server(). It's not needed for the test. 3065 srvr.listener.close() 3066 manager.start() 3067 3068 p = self.Process(target=self._putter, args=(manager.address, authkey)) 3069 p.start() 3070 p.join() 3071 queue = manager.get_queue() 3072 self.assertEqual(queue.get(), 'hello world') 3073 del queue 3074 finally: 3075 if hasattr(manager, "shutdown"): 3076 manager.shutdown() 3077 3078 manager = QueueManager( 3079 address=addr, authkey=authkey, serializer=SERIALIZER, 3080 shutdown_timeout=SHUTDOWN_TIMEOUT) 3081 try: 3082 manager.start() 3083 self.addCleanup(manager.shutdown) 3084 except OSError as e: 3085 if e.errno != errno.EADDRINUSE: 3086 raise 3087 # Retry after some time, in case the old socket was lingering 3088 # (sporadic failure on buildbots) 3089 time.sleep(1.0) 3090 manager = QueueManager( 3091 address=addr, authkey=authkey, serializer=SERIALIZER, 3092 shutdown_timeout=SHUTDOWN_TIMEOUT) 3093 if hasattr(manager, "shutdown"): 3094 self.addCleanup(manager.shutdown) 3095 3096# 3097# 3098# 3099 3100SENTINEL = latin('') 3101 3102class _TestConnection(BaseTestCase): 3103 3104 ALLOWED_TYPES = ('processes', 'threads') 3105 3106 @classmethod 3107 def _echo(cls, conn): 3108 for msg in iter(conn.recv_bytes, SENTINEL): 3109 conn.send_bytes(msg) 3110 conn.close() 3111 3112 def test_connection(self): 3113 conn, child_conn = self.Pipe() 3114 3115 p = self.Process(target=self._echo, args=(child_conn,)) 3116 p.daemon = True 3117 p.start() 3118 3119 seq = [1, 2.25, None] 3120 msg = latin('hello world') 3121 longmsg = msg * 10 3122 arr = array.array('i', list(range(4))) 3123 3124 if self.TYPE == 'processes': 3125 self.assertEqual(type(conn.fileno()), int) 3126 3127 self.assertEqual(conn.send(seq), None) 3128 self.assertEqual(conn.recv(), seq) 3129 3130 self.assertEqual(conn.send_bytes(msg), None) 3131 self.assertEqual(conn.recv_bytes(), msg) 3132 3133 if self.TYPE == 'processes': 3134 buffer = array.array('i', [0]*10) 3135 expected = list(arr) + [0] * (10 - len(arr)) 3136 self.assertEqual(conn.send_bytes(arr), None) 3137 self.assertEqual(conn.recv_bytes_into(buffer), 3138 len(arr) * buffer.itemsize) 3139 self.assertEqual(list(buffer), expected) 3140 3141 buffer = array.array('i', [0]*10) 3142 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) 3143 self.assertEqual(conn.send_bytes(arr), None) 3144 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), 3145 len(arr) * buffer.itemsize) 3146 self.assertEqual(list(buffer), expected) 3147 3148 buffer = bytearray(latin(' ' * 40)) 3149 self.assertEqual(conn.send_bytes(longmsg), None) 3150 try: 3151 res = conn.recv_bytes_into(buffer) 3152 except multiprocessing.BufferTooShort as e: 3153 self.assertEqual(e.args, (longmsg,)) 3154 else: 3155 self.fail('expected BufferTooShort, got %s' % res) 3156 3157 poll = TimingWrapper(conn.poll) 3158 3159 self.assertEqual(poll(), False) 3160 self.assertTimingAlmostEqual(poll.elapsed, 0) 3161 3162 self.assertEqual(poll(-1), False) 3163 self.assertTimingAlmostEqual(poll.elapsed, 0) 3164 3165 self.assertEqual(poll(TIMEOUT1), False) 3166 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 3167 3168 conn.send(None) 3169 time.sleep(.1) 3170 3171 self.assertEqual(poll(TIMEOUT1), True) 3172 self.assertTimingAlmostEqual(poll.elapsed, 0) 3173 3174 self.assertEqual(conn.recv(), None) 3175 3176 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 3177 conn.send_bytes(really_big_msg) 3178 self.assertEqual(conn.recv_bytes(), really_big_msg) 3179 3180 conn.send_bytes(SENTINEL) # tell child to quit 3181 child_conn.close() 3182 3183 if self.TYPE == 'processes': 3184 self.assertEqual(conn.readable, True) 3185 self.assertEqual(conn.writable, True) 3186 self.assertRaises(EOFError, conn.recv) 3187 self.assertRaises(EOFError, conn.recv_bytes) 3188 3189 p.join() 3190 3191 def test_duplex_false(self): 3192 reader, writer = self.Pipe(duplex=False) 3193 self.assertEqual(writer.send(1), None) 3194 self.assertEqual(reader.recv(), 1) 3195 if self.TYPE == 'processes': 3196 self.assertEqual(reader.readable, True) 3197 self.assertEqual(reader.writable, False) 3198 self.assertEqual(writer.readable, False) 3199 self.assertEqual(writer.writable, True) 3200 self.assertRaises(OSError, reader.send, 2) 3201 self.assertRaises(OSError, writer.recv) 3202 self.assertRaises(OSError, writer.poll) 3203 3204 def test_spawn_close(self): 3205 # We test that a pipe connection can be closed by parent 3206 # process immediately after child is spawned. On Windows this 3207 # would have sometimes failed on old versions because 3208 # child_conn would be closed before the child got a chance to 3209 # duplicate it. 3210 conn, child_conn = self.Pipe() 3211 3212 p = self.Process(target=self._echo, args=(child_conn,)) 3213 p.daemon = True 3214 p.start() 3215 child_conn.close() # this might complete before child initializes 3216 3217 msg = latin('hello') 3218 conn.send_bytes(msg) 3219 self.assertEqual(conn.recv_bytes(), msg) 3220 3221 conn.send_bytes(SENTINEL) 3222 conn.close() 3223 p.join() 3224 3225 def test_sendbytes(self): 3226 if self.TYPE != 'processes': 3227 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 3228 3229 msg = latin('abcdefghijklmnopqrstuvwxyz') 3230 a, b = self.Pipe() 3231 3232 a.send_bytes(msg) 3233 self.assertEqual(b.recv_bytes(), msg) 3234 3235 a.send_bytes(msg, 5) 3236 self.assertEqual(b.recv_bytes(), msg[5:]) 3237 3238 a.send_bytes(msg, 7, 8) 3239 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 3240 3241 a.send_bytes(msg, 26) 3242 self.assertEqual(b.recv_bytes(), latin('')) 3243 3244 a.send_bytes(msg, 26, 0) 3245 self.assertEqual(b.recv_bytes(), latin('')) 3246 3247 self.assertRaises(ValueError, a.send_bytes, msg, 27) 3248 3249 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 3250 3251 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 3252 3253 self.assertRaises(ValueError, a.send_bytes, msg, -1) 3254 3255 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 3256 3257 @classmethod 3258 def _is_fd_assigned(cls, fd): 3259 try: 3260 os.fstat(fd) 3261 except OSError as e: 3262 if e.errno == errno.EBADF: 3263 return False 3264 raise 3265 else: 3266 return True 3267 3268 @classmethod 3269 def _writefd(cls, conn, data, create_dummy_fds=False): 3270 if create_dummy_fds: 3271 for i in range(0, 256): 3272 if not cls._is_fd_assigned(i): 3273 os.dup2(conn.fileno(), i) 3274 fd = reduction.recv_handle(conn) 3275 if msvcrt: 3276 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 3277 os.write(fd, data) 3278 os.close(fd) 3279 3280 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3281 def test_fd_transfer(self): 3282 if self.TYPE != 'processes': 3283 self.skipTest("only makes sense with processes") 3284 conn, child_conn = self.Pipe(duplex=True) 3285 3286 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 3287 p.daemon = True 3288 p.start() 3289 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 3290 with open(os_helper.TESTFN, "wb") as f: 3291 fd = f.fileno() 3292 if msvcrt: 3293 fd = msvcrt.get_osfhandle(fd) 3294 reduction.send_handle(conn, fd, p.pid) 3295 p.join() 3296 with open(os_helper.TESTFN, "rb") as f: 3297 self.assertEqual(f.read(), b"foo") 3298 3299 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3300 @unittest.skipIf(sys.platform == "win32", 3301 "test semantics don't make sense on Windows") 3302 @unittest.skipIf(MAXFD <= 256, 3303 "largest assignable fd number is too small") 3304 @unittest.skipUnless(hasattr(os, "dup2"), 3305 "test needs os.dup2()") 3306 def test_large_fd_transfer(self): 3307 # With fd > 256 (issue #11657) 3308 if self.TYPE != 'processes': 3309 self.skipTest("only makes sense with processes") 3310 conn, child_conn = self.Pipe(duplex=True) 3311 3312 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 3313 p.daemon = True 3314 p.start() 3315 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 3316 with open(os_helper.TESTFN, "wb") as f: 3317 fd = f.fileno() 3318 for newfd in range(256, MAXFD): 3319 if not self._is_fd_assigned(newfd): 3320 break 3321 else: 3322 self.fail("could not find an unassigned large file descriptor") 3323 os.dup2(fd, newfd) 3324 try: 3325 reduction.send_handle(conn, newfd, p.pid) 3326 finally: 3327 os.close(newfd) 3328 p.join() 3329 with open(os_helper.TESTFN, "rb") as f: 3330 self.assertEqual(f.read(), b"bar") 3331 3332 @classmethod 3333 def _send_data_without_fd(self, conn): 3334 os.write(conn.fileno(), b"\0") 3335 3336 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3337 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 3338 def test_missing_fd_transfer(self): 3339 # Check that exception is raised when received data is not 3340 # accompanied by a file descriptor in ancillary data. 3341 if self.TYPE != 'processes': 3342 self.skipTest("only makes sense with processes") 3343 conn, child_conn = self.Pipe(duplex=True) 3344 3345 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 3346 p.daemon = True 3347 p.start() 3348 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 3349 p.join() 3350 3351 def test_context(self): 3352 a, b = self.Pipe() 3353 3354 with a, b: 3355 a.send(1729) 3356 self.assertEqual(b.recv(), 1729) 3357 if self.TYPE == 'processes': 3358 self.assertFalse(a.closed) 3359 self.assertFalse(b.closed) 3360 3361 if self.TYPE == 'processes': 3362 self.assertTrue(a.closed) 3363 self.assertTrue(b.closed) 3364 self.assertRaises(OSError, a.recv) 3365 self.assertRaises(OSError, b.recv) 3366 3367class _TestListener(BaseTestCase): 3368 3369 ALLOWED_TYPES = ('processes',) 3370 3371 def test_multiple_bind(self): 3372 for family in self.connection.families: 3373 l = self.connection.Listener(family=family) 3374 self.addCleanup(l.close) 3375 self.assertRaises(OSError, self.connection.Listener, 3376 l.address, family) 3377 3378 def test_context(self): 3379 with self.connection.Listener() as l: 3380 with self.connection.Client(l.address) as c: 3381 with l.accept() as d: 3382 c.send(1729) 3383 self.assertEqual(d.recv(), 1729) 3384 3385 if self.TYPE == 'processes': 3386 self.assertRaises(OSError, l.accept) 3387 3388 @unittest.skipUnless(util.abstract_sockets_supported, 3389 "test needs abstract socket support") 3390 def test_abstract_socket(self): 3391 with self.connection.Listener("\0something") as listener: 3392 with self.connection.Client(listener.address) as client: 3393 with listener.accept() as d: 3394 client.send(1729) 3395 self.assertEqual(d.recv(), 1729) 3396 3397 if self.TYPE == 'processes': 3398 self.assertRaises(OSError, listener.accept) 3399 3400 3401class _TestListenerClient(BaseTestCase): 3402 3403 ALLOWED_TYPES = ('processes', 'threads') 3404 3405 @classmethod 3406 def _test(cls, address): 3407 conn = cls.connection.Client(address) 3408 conn.send('hello') 3409 conn.close() 3410 3411 def test_listener_client(self): 3412 for family in self.connection.families: 3413 l = self.connection.Listener(family=family) 3414 p = self.Process(target=self._test, args=(l.address,)) 3415 p.daemon = True 3416 p.start() 3417 conn = l.accept() 3418 self.assertEqual(conn.recv(), 'hello') 3419 p.join() 3420 l.close() 3421 3422 def test_issue14725(self): 3423 l = self.connection.Listener() 3424 p = self.Process(target=self._test, args=(l.address,)) 3425 p.daemon = True 3426 p.start() 3427 time.sleep(1) 3428 # On Windows the client process should by now have connected, 3429 # written data and closed the pipe handle by now. This causes 3430 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 3431 # 14725. 3432 conn = l.accept() 3433 self.assertEqual(conn.recv(), 'hello') 3434 conn.close() 3435 p.join() 3436 l.close() 3437 3438 def test_issue16955(self): 3439 for fam in self.connection.families: 3440 l = self.connection.Listener(family=fam) 3441 c = self.connection.Client(l.address) 3442 a = l.accept() 3443 a.send_bytes(b"hello") 3444 self.assertTrue(c.poll(1)) 3445 a.close() 3446 c.close() 3447 l.close() 3448 3449class _TestPoll(BaseTestCase): 3450 3451 ALLOWED_TYPES = ('processes', 'threads') 3452 3453 def test_empty_string(self): 3454 a, b = self.Pipe() 3455 self.assertEqual(a.poll(), False) 3456 b.send_bytes(b'') 3457 self.assertEqual(a.poll(), True) 3458 self.assertEqual(a.poll(), True) 3459 3460 @classmethod 3461 def _child_strings(cls, conn, strings): 3462 for s in strings: 3463 time.sleep(0.1) 3464 conn.send_bytes(s) 3465 conn.close() 3466 3467 def test_strings(self): 3468 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop') 3469 a, b = self.Pipe() 3470 p = self.Process(target=self._child_strings, args=(b, strings)) 3471 p.start() 3472 3473 for s in strings: 3474 for i in range(200): 3475 if a.poll(0.01): 3476 break 3477 x = a.recv_bytes() 3478 self.assertEqual(s, x) 3479 3480 p.join() 3481 3482 @classmethod 3483 def _child_boundaries(cls, r): 3484 # Polling may "pull" a message in to the child process, but we 3485 # don't want it to pull only part of a message, as that would 3486 # corrupt the pipe for any other processes which might later 3487 # read from it. 3488 r.poll(5) 3489 3490 def test_boundaries(self): 3491 r, w = self.Pipe(False) 3492 p = self.Process(target=self._child_boundaries, args=(r,)) 3493 p.start() 3494 time.sleep(2) 3495 L = [b"first", b"second"] 3496 for obj in L: 3497 w.send_bytes(obj) 3498 w.close() 3499 p.join() 3500 self.assertIn(r.recv_bytes(), L) 3501 3502 @classmethod 3503 def _child_dont_merge(cls, b): 3504 b.send_bytes(b'a') 3505 b.send_bytes(b'b') 3506 b.send_bytes(b'cd') 3507 3508 def test_dont_merge(self): 3509 a, b = self.Pipe() 3510 self.assertEqual(a.poll(0.0), False) 3511 self.assertEqual(a.poll(0.1), False) 3512 3513 p = self.Process(target=self._child_dont_merge, args=(b,)) 3514 p.start() 3515 3516 self.assertEqual(a.recv_bytes(), b'a') 3517 self.assertEqual(a.poll(1.0), True) 3518 self.assertEqual(a.poll(1.0), True) 3519 self.assertEqual(a.recv_bytes(), b'b') 3520 self.assertEqual(a.poll(1.0), True) 3521 self.assertEqual(a.poll(1.0), True) 3522 self.assertEqual(a.poll(0.0), True) 3523 self.assertEqual(a.recv_bytes(), b'cd') 3524 3525 p.join() 3526 3527# 3528# Test of sending connection and socket objects between processes 3529# 3530 3531@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3532@hashlib_helper.requires_hashdigest('md5') 3533class _TestPicklingConnections(BaseTestCase): 3534 3535 ALLOWED_TYPES = ('processes',) 3536 3537 @classmethod 3538 def tearDownClass(cls): 3539 from multiprocessing import resource_sharer 3540 resource_sharer.stop(timeout=support.LONG_TIMEOUT) 3541 3542 @classmethod 3543 def _listener(cls, conn, families): 3544 for fam in families: 3545 l = cls.connection.Listener(family=fam) 3546 conn.send(l.address) 3547 new_conn = l.accept() 3548 conn.send(new_conn) 3549 new_conn.close() 3550 l.close() 3551 3552 l = socket.create_server((socket_helper.HOST, 0)) 3553 conn.send(l.getsockname()) 3554 new_conn, addr = l.accept() 3555 conn.send(new_conn) 3556 new_conn.close() 3557 l.close() 3558 3559 conn.recv() 3560 3561 @classmethod 3562 def _remote(cls, conn): 3563 for (address, msg) in iter(conn.recv, None): 3564 client = cls.connection.Client(address) 3565 client.send(msg.upper()) 3566 client.close() 3567 3568 address, msg = conn.recv() 3569 client = socket.socket() 3570 client.connect(address) 3571 client.sendall(msg.upper()) 3572 client.close() 3573 3574 conn.close() 3575 3576 def test_pickling(self): 3577 families = self.connection.families 3578 3579 lconn, lconn0 = self.Pipe() 3580 lp = self.Process(target=self._listener, args=(lconn0, families)) 3581 lp.daemon = True 3582 lp.start() 3583 lconn0.close() 3584 3585 rconn, rconn0 = self.Pipe() 3586 rp = self.Process(target=self._remote, args=(rconn0,)) 3587 rp.daemon = True 3588 rp.start() 3589 rconn0.close() 3590 3591 for fam in families: 3592 msg = ('This connection uses family %s' % fam).encode('ascii') 3593 address = lconn.recv() 3594 rconn.send((address, msg)) 3595 new_conn = lconn.recv() 3596 self.assertEqual(new_conn.recv(), msg.upper()) 3597 3598 rconn.send(None) 3599 3600 msg = latin('This connection uses a normal socket') 3601 address = lconn.recv() 3602 rconn.send((address, msg)) 3603 new_conn = lconn.recv() 3604 buf = [] 3605 while True: 3606 s = new_conn.recv(100) 3607 if not s: 3608 break 3609 buf.append(s) 3610 buf = b''.join(buf) 3611 self.assertEqual(buf, msg.upper()) 3612 new_conn.close() 3613 3614 lconn.send(None) 3615 3616 rconn.close() 3617 lconn.close() 3618 3619 lp.join() 3620 rp.join() 3621 3622 @classmethod 3623 def child_access(cls, conn): 3624 w = conn.recv() 3625 w.send('all is well') 3626 w.close() 3627 3628 r = conn.recv() 3629 msg = r.recv() 3630 conn.send(msg*2) 3631 3632 conn.close() 3633 3634 def test_access(self): 3635 # On Windows, if we do not specify a destination pid when 3636 # using DupHandle then we need to be careful to use the 3637 # correct access flags for DuplicateHandle(), or else 3638 # DupHandle.detach() will raise PermissionError. For example, 3639 # for a read only pipe handle we should use 3640 # access=FILE_GENERIC_READ. (Unfortunately 3641 # DUPLICATE_SAME_ACCESS does not work.) 3642 conn, child_conn = self.Pipe() 3643 p = self.Process(target=self.child_access, args=(child_conn,)) 3644 p.daemon = True 3645 p.start() 3646 child_conn.close() 3647 3648 r, w = self.Pipe(duplex=False) 3649 conn.send(w) 3650 w.close() 3651 self.assertEqual(r.recv(), 'all is well') 3652 r.close() 3653 3654 r, w = self.Pipe(duplex=False) 3655 conn.send(r) 3656 r.close() 3657 w.send('foobar') 3658 w.close() 3659 self.assertEqual(conn.recv(), 'foobar'*2) 3660 3661 p.join() 3662 3663# 3664# 3665# 3666 3667class _TestHeap(BaseTestCase): 3668 3669 ALLOWED_TYPES = ('processes',) 3670 3671 def setUp(self): 3672 super().setUp() 3673 # Make pristine heap for these tests 3674 self.old_heap = multiprocessing.heap.BufferWrapper._heap 3675 multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap() 3676 3677 def tearDown(self): 3678 multiprocessing.heap.BufferWrapper._heap = self.old_heap 3679 super().tearDown() 3680 3681 def test_heap(self): 3682 iterations = 5000 3683 maxblocks = 50 3684 blocks = [] 3685 3686 # get the heap object 3687 heap = multiprocessing.heap.BufferWrapper._heap 3688 heap._DISCARD_FREE_SPACE_LARGER_THAN = 0 3689 3690 # create and destroy lots of blocks of different sizes 3691 for i in range(iterations): 3692 size = int(random.lognormvariate(0, 1) * 1000) 3693 b = multiprocessing.heap.BufferWrapper(size) 3694 blocks.append(b) 3695 if len(blocks) > maxblocks: 3696 i = random.randrange(maxblocks) 3697 del blocks[i] 3698 del b 3699 3700 # verify the state of the heap 3701 with heap._lock: 3702 all = [] 3703 free = 0 3704 occupied = 0 3705 for L in list(heap._len_to_seq.values()): 3706 # count all free blocks in arenas 3707 for arena, start, stop in L: 3708 all.append((heap._arenas.index(arena), start, stop, 3709 stop-start, 'free')) 3710 free += (stop-start) 3711 for arena, arena_blocks in heap._allocated_blocks.items(): 3712 # count all allocated blocks in arenas 3713 for start, stop in arena_blocks: 3714 all.append((heap._arenas.index(arena), start, stop, 3715 stop-start, 'occupied')) 3716 occupied += (stop-start) 3717 3718 self.assertEqual(free + occupied, 3719 sum(arena.size for arena in heap._arenas)) 3720 3721 all.sort() 3722 3723 for i in range(len(all)-1): 3724 (arena, start, stop) = all[i][:3] 3725 (narena, nstart, nstop) = all[i+1][:3] 3726 if arena != narena: 3727 # Two different arenas 3728 self.assertEqual(stop, heap._arenas[arena].size) # last block 3729 self.assertEqual(nstart, 0) # first block 3730 else: 3731 # Same arena: two adjacent blocks 3732 self.assertEqual(stop, nstart) 3733 3734 # test free'ing all blocks 3735 random.shuffle(blocks) 3736 while blocks: 3737 blocks.pop() 3738 3739 self.assertEqual(heap._n_frees, heap._n_mallocs) 3740 self.assertEqual(len(heap._pending_free_blocks), 0) 3741 self.assertEqual(len(heap._arenas), 0) 3742 self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks) 3743 self.assertEqual(len(heap._len_to_seq), 0) 3744 3745 def test_free_from_gc(self): 3746 # Check that freeing of blocks by the garbage collector doesn't deadlock 3747 # (issue #12352). 3748 # Make sure the GC is enabled, and set lower collection thresholds to 3749 # make collections more frequent (and increase the probability of 3750 # deadlock). 3751 if not gc.isenabled(): 3752 gc.enable() 3753 self.addCleanup(gc.disable) 3754 thresholds = gc.get_threshold() 3755 self.addCleanup(gc.set_threshold, *thresholds) 3756 gc.set_threshold(10) 3757 3758 # perform numerous block allocations, with cyclic references to make 3759 # sure objects are collected asynchronously by the gc 3760 for i in range(5000): 3761 a = multiprocessing.heap.BufferWrapper(1) 3762 b = multiprocessing.heap.BufferWrapper(1) 3763 # circular references 3764 a.buddy = b 3765 b.buddy = a 3766 3767# 3768# 3769# 3770 3771class _Foo(Structure): 3772 _fields_ = [ 3773 ('x', c_int), 3774 ('y', c_double), 3775 ('z', c_longlong,) 3776 ] 3777 3778class _TestSharedCTypes(BaseTestCase): 3779 3780 ALLOWED_TYPES = ('processes',) 3781 3782 def setUp(self): 3783 if not HAS_SHAREDCTYPES: 3784 self.skipTest("requires multiprocessing.sharedctypes") 3785 3786 @classmethod 3787 def _double(cls, x, y, z, foo, arr, string): 3788 x.value *= 2 3789 y.value *= 2 3790 z.value *= 2 3791 foo.x *= 2 3792 foo.y *= 2 3793 string.value *= 2 3794 for i in range(len(arr)): 3795 arr[i] *= 2 3796 3797 def test_sharedctypes(self, lock=False): 3798 x = Value('i', 7, lock=lock) 3799 y = Value(c_double, 1.0/3.0, lock=lock) 3800 z = Value(c_longlong, 2 ** 33, lock=lock) 3801 foo = Value(_Foo, 3, 2, lock=lock) 3802 arr = self.Array('d', list(range(10)), lock=lock) 3803 string = self.Array('c', 20, lock=lock) 3804 string.value = latin('hello') 3805 3806 p = self.Process(target=self._double, args=(x, y, z, foo, arr, string)) 3807 p.daemon = True 3808 p.start() 3809 p.join() 3810 3811 self.assertEqual(x.value, 14) 3812 self.assertAlmostEqual(y.value, 2.0/3.0) 3813 self.assertEqual(z.value, 2 ** 34) 3814 self.assertEqual(foo.x, 6) 3815 self.assertAlmostEqual(foo.y, 4.0) 3816 for i in range(10): 3817 self.assertAlmostEqual(arr[i], i*2) 3818 self.assertEqual(string.value, latin('hellohello')) 3819 3820 def test_synchronize(self): 3821 self.test_sharedctypes(lock=True) 3822 3823 def test_copy(self): 3824 foo = _Foo(2, 5.0, 2 ** 33) 3825 bar = copy(foo) 3826 foo.x = 0 3827 foo.y = 0 3828 foo.z = 0 3829 self.assertEqual(bar.x, 2) 3830 self.assertAlmostEqual(bar.y, 5.0) 3831 self.assertEqual(bar.z, 2 ** 33) 3832 3833 3834@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory") 3835@hashlib_helper.requires_hashdigest('md5') 3836class _TestSharedMemory(BaseTestCase): 3837 3838 ALLOWED_TYPES = ('processes',) 3839 3840 @staticmethod 3841 def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data): 3842 if isinstance(shmem_name_or_obj, str): 3843 local_sms = shared_memory.SharedMemory(shmem_name_or_obj) 3844 else: 3845 local_sms = shmem_name_or_obj 3846 local_sms.buf[:len(binary_data)] = binary_data 3847 local_sms.close() 3848 3849 def _new_shm_name(self, prefix): 3850 # Add a PID to the name of a POSIX shared memory object to allow 3851 # running multiprocessing tests (test_multiprocessing_fork, 3852 # test_multiprocessing_spawn, etc) in parallel. 3853 return prefix + str(os.getpid()) 3854 3855 def test_shared_memory_basics(self): 3856 name_tsmb = self._new_shm_name('test01_tsmb') 3857 sms = shared_memory.SharedMemory(name_tsmb, create=True, size=512) 3858 self.addCleanup(sms.unlink) 3859 3860 # Verify attributes are readable. 3861 self.assertEqual(sms.name, name_tsmb) 3862 self.assertGreaterEqual(sms.size, 512) 3863 self.assertGreaterEqual(len(sms.buf), sms.size) 3864 3865 # Verify __repr__ 3866 self.assertIn(sms.name, str(sms)) 3867 self.assertIn(str(sms.size), str(sms)) 3868 3869 # Modify contents of shared memory segment through memoryview. 3870 sms.buf[0] = 42 3871 self.assertEqual(sms.buf[0], 42) 3872 3873 # Attach to existing shared memory segment. 3874 also_sms = shared_memory.SharedMemory(name_tsmb) 3875 self.assertEqual(also_sms.buf[0], 42) 3876 also_sms.close() 3877 3878 # Attach to existing shared memory segment but specify a new size. 3879 same_sms = shared_memory.SharedMemory(name_tsmb, size=20*sms.size) 3880 self.assertLess(same_sms.size, 20*sms.size) # Size was ignored. 3881 same_sms.close() 3882 3883 # Creating Shared Memory Segment with -ve size 3884 with self.assertRaises(ValueError): 3885 shared_memory.SharedMemory(create=True, size=-2) 3886 3887 # Attaching Shared Memory Segment without a name 3888 with self.assertRaises(ValueError): 3889 shared_memory.SharedMemory(create=False) 3890 3891 # Test if shared memory segment is created properly, 3892 # when _make_filename returns an existing shared memory segment name 3893 with unittest.mock.patch( 3894 'multiprocessing.shared_memory._make_filename') as mock_make_filename: 3895 3896 NAME_PREFIX = shared_memory._SHM_NAME_PREFIX 3897 names = [self._new_shm_name('test01_fn'), self._new_shm_name('test02_fn')] 3898 # Prepend NAME_PREFIX which can be '/psm_' or 'wnsm_', necessary 3899 # because some POSIX compliant systems require name to start with / 3900 names = [NAME_PREFIX + name for name in names] 3901 3902 mock_make_filename.side_effect = names 3903 shm1 = shared_memory.SharedMemory(create=True, size=1) 3904 self.addCleanup(shm1.unlink) 3905 self.assertEqual(shm1._name, names[0]) 3906 3907 mock_make_filename.side_effect = names 3908 shm2 = shared_memory.SharedMemory(create=True, size=1) 3909 self.addCleanup(shm2.unlink) 3910 self.assertEqual(shm2._name, names[1]) 3911 3912 if shared_memory._USE_POSIX: 3913 # Posix Shared Memory can only be unlinked once. Here we 3914 # test an implementation detail that is not observed across 3915 # all supported platforms (since WindowsNamedSharedMemory 3916 # manages unlinking on its own and unlink() does nothing). 3917 # True release of shared memory segment does not necessarily 3918 # happen until process exits, depending on the OS platform. 3919 name_dblunlink = self._new_shm_name('test01_dblunlink') 3920 sms_uno = shared_memory.SharedMemory( 3921 name_dblunlink, 3922 create=True, 3923 size=5000 3924 ) 3925 with self.assertRaises(FileNotFoundError): 3926 try: 3927 self.assertGreaterEqual(sms_uno.size, 5000) 3928 3929 sms_duo = shared_memory.SharedMemory(name_dblunlink) 3930 sms_duo.unlink() # First shm_unlink() call. 3931 sms_duo.close() 3932 sms_uno.close() 3933 3934 finally: 3935 sms_uno.unlink() # A second shm_unlink() call is bad. 3936 3937 with self.assertRaises(FileExistsError): 3938 # Attempting to create a new shared memory segment with a 3939 # name that is already in use triggers an exception. 3940 there_can_only_be_one_sms = shared_memory.SharedMemory( 3941 name_tsmb, 3942 create=True, 3943 size=512 3944 ) 3945 3946 if shared_memory._USE_POSIX: 3947 # Requesting creation of a shared memory segment with the option 3948 # to attach to an existing segment, if that name is currently in 3949 # use, should not trigger an exception. 3950 # Note: Using a smaller size could possibly cause truncation of 3951 # the existing segment but is OS platform dependent. In the 3952 # case of MacOS/darwin, requesting a smaller size is disallowed. 3953 class OptionalAttachSharedMemory(shared_memory.SharedMemory): 3954 _flags = os.O_CREAT | os.O_RDWR 3955 ok_if_exists_sms = OptionalAttachSharedMemory(name_tsmb) 3956 self.assertEqual(ok_if_exists_sms.size, sms.size) 3957 ok_if_exists_sms.close() 3958 3959 # Attempting to attach to an existing shared memory segment when 3960 # no segment exists with the supplied name triggers an exception. 3961 with self.assertRaises(FileNotFoundError): 3962 nonexisting_sms = shared_memory.SharedMemory('test01_notthere') 3963 nonexisting_sms.unlink() # Error should occur on prior line. 3964 3965 sms.close() 3966 3967 def test_shared_memory_recreate(self): 3968 # Test if shared memory segment is created properly, 3969 # when _make_filename returns an existing shared memory segment name 3970 with unittest.mock.patch( 3971 'multiprocessing.shared_memory._make_filename') as mock_make_filename: 3972 3973 NAME_PREFIX = shared_memory._SHM_NAME_PREFIX 3974 names = [self._new_shm_name('test03_fn'), self._new_shm_name('test04_fn')] 3975 # Prepend NAME_PREFIX which can be '/psm_' or 'wnsm_', necessary 3976 # because some POSIX compliant systems require name to start with / 3977 names = [NAME_PREFIX + name for name in names] 3978 3979 mock_make_filename.side_effect = names 3980 shm1 = shared_memory.SharedMemory(create=True, size=1) 3981 self.addCleanup(shm1.unlink) 3982 self.assertEqual(shm1._name, names[0]) 3983 3984 mock_make_filename.side_effect = names 3985 shm2 = shared_memory.SharedMemory(create=True, size=1) 3986 self.addCleanup(shm2.unlink) 3987 self.assertEqual(shm2._name, names[1]) 3988 3989 def test_invalid_shared_memory_cration(self): 3990 # Test creating a shared memory segment with negative size 3991 with self.assertRaises(ValueError): 3992 sms_invalid = shared_memory.SharedMemory(create=True, size=-1) 3993 3994 # Test creating a shared memory segment with size 0 3995 with self.assertRaises(ValueError): 3996 sms_invalid = shared_memory.SharedMemory(create=True, size=0) 3997 3998 # Test creating a shared memory segment without size argument 3999 with self.assertRaises(ValueError): 4000 sms_invalid = shared_memory.SharedMemory(create=True) 4001 4002 def test_shared_memory_pickle_unpickle(self): 4003 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 4004 with self.subTest(proto=proto): 4005 sms = shared_memory.SharedMemory(create=True, size=512) 4006 self.addCleanup(sms.unlink) 4007 sms.buf[0:6] = b'pickle' 4008 4009 # Test pickling 4010 pickled_sms = pickle.dumps(sms, protocol=proto) 4011 4012 # Test unpickling 4013 sms2 = pickle.loads(pickled_sms) 4014 self.assertIsInstance(sms2, shared_memory.SharedMemory) 4015 self.assertEqual(sms.name, sms2.name) 4016 self.assertEqual(bytes(sms.buf[0:6]), b'pickle') 4017 self.assertEqual(bytes(sms2.buf[0:6]), b'pickle') 4018 4019 # Test that unpickled version is still the same SharedMemory 4020 sms.buf[0:6] = b'newval' 4021 self.assertEqual(bytes(sms.buf[0:6]), b'newval') 4022 self.assertEqual(bytes(sms2.buf[0:6]), b'newval') 4023 4024 sms2.buf[0:6] = b'oldval' 4025 self.assertEqual(bytes(sms.buf[0:6]), b'oldval') 4026 self.assertEqual(bytes(sms2.buf[0:6]), b'oldval') 4027 4028 def test_shared_memory_pickle_unpickle_dead_object(self): 4029 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 4030 with self.subTest(proto=proto): 4031 sms = shared_memory.SharedMemory(create=True, size=512) 4032 sms.buf[0:6] = b'pickle' 4033 pickled_sms = pickle.dumps(sms, protocol=proto) 4034 4035 # Now, we are going to kill the original object. 4036 # So, unpickled one won't be able to attach to it. 4037 sms.close() 4038 sms.unlink() 4039 4040 with self.assertRaises(FileNotFoundError): 4041 pickle.loads(pickled_sms) 4042 4043 def test_shared_memory_across_processes(self): 4044 # bpo-40135: don't define shared memory block's name in case of 4045 # the failure when we run multiprocessing tests in parallel. 4046 sms = shared_memory.SharedMemory(create=True, size=512) 4047 self.addCleanup(sms.unlink) 4048 4049 # Verify remote attachment to existing block by name is working. 4050 p = self.Process( 4051 target=self._attach_existing_shmem_then_write, 4052 args=(sms.name, b'howdy') 4053 ) 4054 p.daemon = True 4055 p.start() 4056 p.join() 4057 self.assertEqual(bytes(sms.buf[:5]), b'howdy') 4058 4059 # Verify pickling of SharedMemory instance also works. 4060 p = self.Process( 4061 target=self._attach_existing_shmem_then_write, 4062 args=(sms, b'HELLO') 4063 ) 4064 p.daemon = True 4065 p.start() 4066 p.join() 4067 self.assertEqual(bytes(sms.buf[:5]), b'HELLO') 4068 4069 sms.close() 4070 4071 @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms") 4072 def test_shared_memory_SharedMemoryServer_ignores_sigint(self): 4073 # bpo-36368: protect SharedMemoryManager server process from 4074 # KeyboardInterrupt signals. 4075 smm = multiprocessing.managers.SharedMemoryManager() 4076 smm.start() 4077 4078 # make sure the manager works properly at the beginning 4079 sl = smm.ShareableList(range(10)) 4080 4081 # the manager's server should ignore KeyboardInterrupt signals, and 4082 # maintain its connection with the current process, and success when 4083 # asked to deliver memory segments. 4084 os.kill(smm._process.pid, signal.SIGINT) 4085 4086 sl2 = smm.ShareableList(range(10)) 4087 4088 # test that the custom signal handler registered in the Manager does 4089 # not affect signal handling in the parent process. 4090 with self.assertRaises(KeyboardInterrupt): 4091 os.kill(os.getpid(), signal.SIGINT) 4092 4093 smm.shutdown() 4094 4095 @unittest.skipIf(os.name != "posix", "resource_tracker is posix only") 4096 def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self): 4097 # bpo-36867: test that a SharedMemoryManager uses the 4098 # same resource_tracker process as its parent. 4099 cmd = '''if 1: 4100 from multiprocessing.managers import SharedMemoryManager 4101 4102 4103 smm = SharedMemoryManager() 4104 smm.start() 4105 sl = smm.ShareableList(range(10)) 4106 smm.shutdown() 4107 ''' 4108 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) 4109 4110 # Before bpo-36867 was fixed, a SharedMemoryManager not using the same 4111 # resource_tracker process as its parent would make the parent's 4112 # tracker complain about sl being leaked even though smm.shutdown() 4113 # properly released sl. 4114 self.assertFalse(err) 4115 4116 def test_shared_memory_SharedMemoryManager_basics(self): 4117 smm1 = multiprocessing.managers.SharedMemoryManager() 4118 with self.assertRaises(ValueError): 4119 smm1.SharedMemory(size=9) # Fails if SharedMemoryServer not started 4120 smm1.start() 4121 lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ] 4122 lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ] 4123 doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name) 4124 self.assertEqual(len(doppleganger_list0), 5) 4125 doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name) 4126 self.assertGreaterEqual(len(doppleganger_shm0.buf), 32) 4127 held_name = lom[0].name 4128 smm1.shutdown() 4129 if sys.platform != "win32": 4130 # Calls to unlink() have no effect on Windows platform; shared 4131 # memory will only be released once final process exits. 4132 with self.assertRaises(FileNotFoundError): 4133 # No longer there to be attached to again. 4134 absent_shm = shared_memory.SharedMemory(name=held_name) 4135 4136 with multiprocessing.managers.SharedMemoryManager() as smm2: 4137 sl = smm2.ShareableList("howdy") 4138 shm = smm2.SharedMemory(size=128) 4139 held_name = sl.shm.name 4140 if sys.platform != "win32": 4141 with self.assertRaises(FileNotFoundError): 4142 # No longer there to be attached to again. 4143 absent_sl = shared_memory.ShareableList(name=held_name) 4144 4145 4146 def test_shared_memory_ShareableList_basics(self): 4147 sl = shared_memory.ShareableList( 4148 ['howdy', b'HoWdY', -273.154, 100, None, True, 42] 4149 ) 4150 self.addCleanup(sl.shm.unlink) 4151 4152 # Verify __repr__ 4153 self.assertIn(sl.shm.name, str(sl)) 4154 self.assertIn(str(list(sl)), str(sl)) 4155 4156 # Index Out of Range (get) 4157 with self.assertRaises(IndexError): 4158 sl[7] 4159 4160 # Index Out of Range (set) 4161 with self.assertRaises(IndexError): 4162 sl[7] = 2 4163 4164 # Assign value without format change (str -> str) 4165 current_format = sl._get_packing_format(0) 4166 sl[0] = 'howdy' 4167 self.assertEqual(current_format, sl._get_packing_format(0)) 4168 4169 # Verify attributes are readable. 4170 self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q') 4171 4172 # Exercise len(). 4173 self.assertEqual(len(sl), 7) 4174 4175 # Exercise index(). 4176 with warnings.catch_warnings(): 4177 # Suppress BytesWarning when comparing against b'HoWdY'. 4178 warnings.simplefilter('ignore') 4179 with self.assertRaises(ValueError): 4180 sl.index('100') 4181 self.assertEqual(sl.index(100), 3) 4182 4183 # Exercise retrieving individual values. 4184 self.assertEqual(sl[0], 'howdy') 4185 self.assertEqual(sl[-2], True) 4186 4187 # Exercise iterability. 4188 self.assertEqual( 4189 tuple(sl), 4190 ('howdy', b'HoWdY', -273.154, 100, None, True, 42) 4191 ) 4192 4193 # Exercise modifying individual values. 4194 sl[3] = 42 4195 self.assertEqual(sl[3], 42) 4196 sl[4] = 'some' # Change type at a given position. 4197 self.assertEqual(sl[4], 'some') 4198 self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q') 4199 with self.assertRaisesRegex(ValueError, 4200 "exceeds available storage"): 4201 sl[4] = 'far too many' 4202 self.assertEqual(sl[4], 'some') 4203 sl[0] = 'encodés' # Exactly 8 bytes of UTF-8 data 4204 self.assertEqual(sl[0], 'encodés') 4205 self.assertEqual(sl[1], b'HoWdY') # no spillage 4206 with self.assertRaisesRegex(ValueError, 4207 "exceeds available storage"): 4208 sl[0] = 'encodées' # Exactly 9 bytes of UTF-8 data 4209 self.assertEqual(sl[1], b'HoWdY') 4210 with self.assertRaisesRegex(ValueError, 4211 "exceeds available storage"): 4212 sl[1] = b'123456789' 4213 self.assertEqual(sl[1], b'HoWdY') 4214 4215 # Exercise count(). 4216 with warnings.catch_warnings(): 4217 # Suppress BytesWarning when comparing against b'HoWdY'. 4218 warnings.simplefilter('ignore') 4219 self.assertEqual(sl.count(42), 2) 4220 self.assertEqual(sl.count(b'HoWdY'), 1) 4221 self.assertEqual(sl.count(b'adios'), 0) 4222 4223 # Exercise creating a duplicate. 4224 name_duplicate = self._new_shm_name('test03_duplicate') 4225 sl_copy = shared_memory.ShareableList(sl, name=name_duplicate) 4226 try: 4227 self.assertNotEqual(sl.shm.name, sl_copy.shm.name) 4228 self.assertEqual(name_duplicate, sl_copy.shm.name) 4229 self.assertEqual(list(sl), list(sl_copy)) 4230 self.assertEqual(sl.format, sl_copy.format) 4231 sl_copy[-1] = 77 4232 self.assertEqual(sl_copy[-1], 77) 4233 self.assertNotEqual(sl[-1], 77) 4234 sl_copy.shm.close() 4235 finally: 4236 sl_copy.shm.unlink() 4237 4238 # Obtain a second handle on the same ShareableList. 4239 sl_tethered = shared_memory.ShareableList(name=sl.shm.name) 4240 self.assertEqual(sl.shm.name, sl_tethered.shm.name) 4241 sl_tethered[-1] = 880 4242 self.assertEqual(sl[-1], 880) 4243 sl_tethered.shm.close() 4244 4245 sl.shm.close() 4246 4247 # Exercise creating an empty ShareableList. 4248 empty_sl = shared_memory.ShareableList() 4249 try: 4250 self.assertEqual(len(empty_sl), 0) 4251 self.assertEqual(empty_sl.format, '') 4252 self.assertEqual(empty_sl.count('any'), 0) 4253 with self.assertRaises(ValueError): 4254 empty_sl.index(None) 4255 empty_sl.shm.close() 4256 finally: 4257 empty_sl.shm.unlink() 4258 4259 def test_shared_memory_ShareableList_pickling(self): 4260 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 4261 with self.subTest(proto=proto): 4262 sl = shared_memory.ShareableList(range(10)) 4263 self.addCleanup(sl.shm.unlink) 4264 4265 serialized_sl = pickle.dumps(sl, protocol=proto) 4266 deserialized_sl = pickle.loads(serialized_sl) 4267 self.assertIsInstance( 4268 deserialized_sl, shared_memory.ShareableList) 4269 self.assertEqual(deserialized_sl[-1], 9) 4270 self.assertIsNot(sl, deserialized_sl) 4271 4272 deserialized_sl[4] = "changed" 4273 self.assertEqual(sl[4], "changed") 4274 sl[3] = "newvalue" 4275 self.assertEqual(deserialized_sl[3], "newvalue") 4276 4277 larger_sl = shared_memory.ShareableList(range(400)) 4278 self.addCleanup(larger_sl.shm.unlink) 4279 serialized_larger_sl = pickle.dumps(larger_sl, protocol=proto) 4280 self.assertEqual(len(serialized_sl), len(serialized_larger_sl)) 4281 larger_sl.shm.close() 4282 4283 deserialized_sl.shm.close() 4284 sl.shm.close() 4285 4286 def test_shared_memory_ShareableList_pickling_dead_object(self): 4287 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 4288 with self.subTest(proto=proto): 4289 sl = shared_memory.ShareableList(range(10)) 4290 serialized_sl = pickle.dumps(sl, protocol=proto) 4291 4292 # Now, we are going to kill the original object. 4293 # So, unpickled one won't be able to attach to it. 4294 sl.shm.close() 4295 sl.shm.unlink() 4296 4297 with self.assertRaises(FileNotFoundError): 4298 pickle.loads(serialized_sl) 4299 4300 def test_shared_memory_cleaned_after_process_termination(self): 4301 cmd = '''if 1: 4302 import os, time, sys 4303 from multiprocessing import shared_memory 4304 4305 # Create a shared_memory segment, and send the segment name 4306 sm = shared_memory.SharedMemory(create=True, size=10) 4307 sys.stdout.write(sm.name + '\\n') 4308 sys.stdout.flush() 4309 time.sleep(100) 4310 ''' 4311 with subprocess.Popen([sys.executable, '-E', '-c', cmd], 4312 stdout=subprocess.PIPE, 4313 stderr=subprocess.PIPE) as p: 4314 name = p.stdout.readline().strip().decode() 4315 4316 # killing abruptly processes holding reference to a shared memory 4317 # segment should not leak the given memory segment. 4318 p.terminate() 4319 p.wait() 4320 4321 deadline = time.monotonic() + support.LONG_TIMEOUT 4322 t = 0.1 4323 while time.monotonic() < deadline: 4324 time.sleep(t) 4325 t = min(t*2, 5) 4326 try: 4327 smm = shared_memory.SharedMemory(name, create=False) 4328 except FileNotFoundError: 4329 break 4330 else: 4331 raise AssertionError("A SharedMemory segment was leaked after" 4332 " a process was abruptly terminated.") 4333 4334 if os.name == 'posix': 4335 # Without this line it was raising warnings like: 4336 # UserWarning: resource_tracker: 4337 # There appear to be 1 leaked shared_memory 4338 # objects to clean up at shutdown 4339 # See: https://bugs.python.org/issue45209 4340 resource_tracker.unregister(f"/{name}", "shared_memory") 4341 4342 # A warning was emitted by the subprocess' own 4343 # resource_tracker (on Windows, shared memory segments 4344 # are released automatically by the OS). 4345 err = p.stderr.read().decode() 4346 self.assertIn( 4347 "resource_tracker: There appear to be 1 leaked " 4348 "shared_memory objects to clean up at shutdown", err) 4349 4350# 4351# Test to verify that `Finalize` works. 4352# 4353 4354class _TestFinalize(BaseTestCase): 4355 4356 ALLOWED_TYPES = ('processes',) 4357 4358 def setUp(self): 4359 self.registry_backup = util._finalizer_registry.copy() 4360 util._finalizer_registry.clear() 4361 4362 def tearDown(self): 4363 gc.collect() # For PyPy or other GCs. 4364 self.assertFalse(util._finalizer_registry) 4365 util._finalizer_registry.update(self.registry_backup) 4366 4367 @classmethod 4368 def _test_finalize(cls, conn): 4369 class Foo(object): 4370 pass 4371 4372 a = Foo() 4373 util.Finalize(a, conn.send, args=('a',)) 4374 del a # triggers callback for a 4375 gc.collect() # For PyPy or other GCs. 4376 4377 b = Foo() 4378 close_b = util.Finalize(b, conn.send, args=('b',)) 4379 close_b() # triggers callback for b 4380 close_b() # does nothing because callback has already been called 4381 del b # does nothing because callback has already been called 4382 gc.collect() # For PyPy or other GCs. 4383 4384 c = Foo() 4385 util.Finalize(c, conn.send, args=('c',)) 4386 4387 d10 = Foo() 4388 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1) 4389 4390 d01 = Foo() 4391 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0) 4392 d02 = Foo() 4393 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0) 4394 d03 = Foo() 4395 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0) 4396 4397 util.Finalize(None, conn.send, args=('e',), exitpriority=-10) 4398 4399 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) 4400 4401 # call multiprocessing's cleanup function then exit process without 4402 # garbage collecting locals 4403 util._exit_function() 4404 conn.close() 4405 os._exit(0) 4406 4407 def test_finalize(self): 4408 conn, child_conn = self.Pipe() 4409 4410 p = self.Process(target=self._test_finalize, args=(child_conn,)) 4411 p.daemon = True 4412 p.start() 4413 p.join() 4414 4415 result = [obj for obj in iter(conn.recv, 'STOP')] 4416 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) 4417 4418 def test_thread_safety(self): 4419 # bpo-24484: _run_finalizers() should be thread-safe 4420 def cb(): 4421 pass 4422 4423 class Foo(object): 4424 def __init__(self): 4425 self.ref = self # create reference cycle 4426 # insert finalizer at random key 4427 util.Finalize(self, cb, exitpriority=random.randint(1, 100)) 4428 4429 finish = False 4430 exc = None 4431 4432 def run_finalizers(): 4433 nonlocal exc 4434 while not finish: 4435 time.sleep(random.random() * 1e-1) 4436 try: 4437 # A GC run will eventually happen during this, 4438 # collecting stale Foo's and mutating the registry 4439 util._run_finalizers() 4440 except Exception as e: 4441 exc = e 4442 4443 def make_finalizers(): 4444 nonlocal exc 4445 d = {} 4446 while not finish: 4447 try: 4448 # Old Foo's get gradually replaced and later 4449 # collected by the GC (because of the cyclic ref) 4450 d[random.getrandbits(5)] = {Foo() for i in range(10)} 4451 except Exception as e: 4452 exc = e 4453 d.clear() 4454 4455 old_interval = sys.getswitchinterval() 4456 old_threshold = gc.get_threshold() 4457 try: 4458 sys.setswitchinterval(1e-6) 4459 gc.set_threshold(5, 5, 5) 4460 threads = [threading.Thread(target=run_finalizers), 4461 threading.Thread(target=make_finalizers)] 4462 with threading_helper.start_threads(threads): 4463 time.sleep(4.0) # Wait a bit to trigger race condition 4464 finish = True 4465 if exc is not None: 4466 raise exc 4467 finally: 4468 sys.setswitchinterval(old_interval) 4469 gc.set_threshold(*old_threshold) 4470 gc.collect() # Collect remaining Foo's 4471 4472 4473# 4474# Test that from ... import * works for each module 4475# 4476 4477class _TestImportStar(unittest.TestCase): 4478 4479 def get_module_names(self): 4480 import glob 4481 folder = os.path.dirname(multiprocessing.__file__) 4482 pattern = os.path.join(glob.escape(folder), '*.py') 4483 files = glob.glob(pattern) 4484 modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files] 4485 modules = ['multiprocessing.' + m for m in modules] 4486 modules.remove('multiprocessing.__init__') 4487 modules.append('multiprocessing') 4488 return modules 4489 4490 def test_import(self): 4491 modules = self.get_module_names() 4492 if sys.platform == 'win32': 4493 modules.remove('multiprocessing.popen_fork') 4494 modules.remove('multiprocessing.popen_forkserver') 4495 modules.remove('multiprocessing.popen_spawn_posix') 4496 else: 4497 modules.remove('multiprocessing.popen_spawn_win32') 4498 if not HAS_REDUCTION: 4499 modules.remove('multiprocessing.popen_forkserver') 4500 4501 if c_int is None: 4502 # This module requires _ctypes 4503 modules.remove('multiprocessing.sharedctypes') 4504 4505 for name in modules: 4506 __import__(name) 4507 mod = sys.modules[name] 4508 self.assertTrue(hasattr(mod, '__all__'), name) 4509 4510 for attr in mod.__all__: 4511 self.assertTrue( 4512 hasattr(mod, attr), 4513 '%r does not have attribute %r' % (mod, attr) 4514 ) 4515 4516# 4517# Quick test that logging works -- does not test logging output 4518# 4519 4520class _TestLogging(BaseTestCase): 4521 4522 ALLOWED_TYPES = ('processes',) 4523 4524 def test_enable_logging(self): 4525 logger = multiprocessing.get_logger() 4526 logger.setLevel(util.SUBWARNING) 4527 self.assertTrue(logger is not None) 4528 logger.debug('this will not be printed') 4529 logger.info('nor will this') 4530 logger.setLevel(LOG_LEVEL) 4531 4532 @classmethod 4533 def _test_level(cls, conn): 4534 logger = multiprocessing.get_logger() 4535 conn.send(logger.getEffectiveLevel()) 4536 4537 def test_level(self): 4538 LEVEL1 = 32 4539 LEVEL2 = 37 4540 4541 logger = multiprocessing.get_logger() 4542 root_logger = logging.getLogger() 4543 root_level = root_logger.level 4544 4545 reader, writer = multiprocessing.Pipe(duplex=False) 4546 4547 logger.setLevel(LEVEL1) 4548 p = self.Process(target=self._test_level, args=(writer,)) 4549 p.start() 4550 self.assertEqual(LEVEL1, reader.recv()) 4551 p.join() 4552 p.close() 4553 4554 logger.setLevel(logging.NOTSET) 4555 root_logger.setLevel(LEVEL2) 4556 p = self.Process(target=self._test_level, args=(writer,)) 4557 p.start() 4558 self.assertEqual(LEVEL2, reader.recv()) 4559 p.join() 4560 p.close() 4561 4562 root_logger.setLevel(root_level) 4563 logger.setLevel(level=LOG_LEVEL) 4564 4565 4566# class _TestLoggingProcessName(BaseTestCase): 4567# 4568# def handle(self, record): 4569# assert record.processName == multiprocessing.current_process().name 4570# self.__handled = True 4571# 4572# def test_logging(self): 4573# handler = logging.Handler() 4574# handler.handle = self.handle 4575# self.__handled = False 4576# # Bypass getLogger() and side-effects 4577# logger = logging.getLoggerClass()( 4578# 'multiprocessing.test.TestLoggingProcessName') 4579# logger.addHandler(handler) 4580# logger.propagate = False 4581# 4582# logger.warn('foo') 4583# assert self.__handled 4584 4585# 4586# Check that Process.join() retries if os.waitpid() fails with EINTR 4587# 4588 4589class _TestPollEintr(BaseTestCase): 4590 4591 ALLOWED_TYPES = ('processes',) 4592 4593 @classmethod 4594 def _killer(cls, pid): 4595 time.sleep(0.1) 4596 os.kill(pid, signal.SIGUSR1) 4597 4598 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4599 def test_poll_eintr(self): 4600 got_signal = [False] 4601 def record(*args): 4602 got_signal[0] = True 4603 pid = os.getpid() 4604 oldhandler = signal.signal(signal.SIGUSR1, record) 4605 try: 4606 killer = self.Process(target=self._killer, args=(pid,)) 4607 killer.start() 4608 try: 4609 p = self.Process(target=time.sleep, args=(2,)) 4610 p.start() 4611 p.join() 4612 finally: 4613 killer.join() 4614 self.assertTrue(got_signal[0]) 4615 self.assertEqual(p.exitcode, 0) 4616 finally: 4617 signal.signal(signal.SIGUSR1, oldhandler) 4618 4619# 4620# Test to verify handle verification, see issue 3321 4621# 4622 4623class TestInvalidHandle(unittest.TestCase): 4624 4625 @unittest.skipIf(WIN32, "skipped on Windows") 4626 def test_invalid_handles(self): 4627 conn = multiprocessing.connection.Connection(44977608) 4628 # check that poll() doesn't crash 4629 try: 4630 conn.poll() 4631 except (ValueError, OSError): 4632 pass 4633 finally: 4634 # Hack private attribute _handle to avoid printing an error 4635 # in conn.__del__ 4636 conn._handle = None 4637 self.assertRaises((ValueError, OSError), 4638 multiprocessing.connection.Connection, -1) 4639 4640 4641 4642@hashlib_helper.requires_hashdigest('md5') 4643class OtherTest(unittest.TestCase): 4644 # TODO: add more tests for deliver/answer challenge. 4645 def test_deliver_challenge_auth_failure(self): 4646 class _FakeConnection(object): 4647 def recv_bytes(self, size): 4648 return b'something bogus' 4649 def send_bytes(self, data): 4650 pass 4651 self.assertRaises(multiprocessing.AuthenticationError, 4652 multiprocessing.connection.deliver_challenge, 4653 _FakeConnection(), b'abc') 4654 4655 def test_answer_challenge_auth_failure(self): 4656 class _FakeConnection(object): 4657 def __init__(self): 4658 self.count = 0 4659 def recv_bytes(self, size): 4660 self.count += 1 4661 if self.count == 1: 4662 return multiprocessing.connection.CHALLENGE 4663 elif self.count == 2: 4664 return b'something bogus' 4665 return b'' 4666 def send_bytes(self, data): 4667 pass 4668 self.assertRaises(multiprocessing.AuthenticationError, 4669 multiprocessing.connection.answer_challenge, 4670 _FakeConnection(), b'abc') 4671 4672# 4673# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 4674# 4675 4676def initializer(ns): 4677 ns.test += 1 4678 4679@hashlib_helper.requires_hashdigest('md5') 4680class TestInitializers(unittest.TestCase): 4681 def setUp(self): 4682 self.mgr = multiprocessing.Manager() 4683 self.ns = self.mgr.Namespace() 4684 self.ns.test = 0 4685 4686 def tearDown(self): 4687 self.mgr.shutdown() 4688 self.mgr.join() 4689 4690 def test_manager_initializer(self): 4691 m = multiprocessing.managers.SyncManager() 4692 self.assertRaises(TypeError, m.start, 1) 4693 m.start(initializer, (self.ns,)) 4694 self.assertEqual(self.ns.test, 1) 4695 m.shutdown() 4696 m.join() 4697 4698 def test_pool_initializer(self): 4699 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) 4700 p = multiprocessing.Pool(1, initializer, (self.ns,)) 4701 p.close() 4702 p.join() 4703 self.assertEqual(self.ns.test, 1) 4704 4705# 4706# Issue 5155, 5313, 5331: Test process in processes 4707# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior 4708# 4709 4710def _this_sub_process(q): 4711 try: 4712 item = q.get(block=False) 4713 except pyqueue.Empty: 4714 pass 4715 4716def _test_process(): 4717 queue = multiprocessing.Queue() 4718 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) 4719 subProc.daemon = True 4720 subProc.start() 4721 subProc.join() 4722 4723def _afunc(x): 4724 return x*x 4725 4726def pool_in_process(): 4727 pool = multiprocessing.Pool(processes=4) 4728 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) 4729 pool.close() 4730 pool.join() 4731 4732class _file_like(object): 4733 def __init__(self, delegate): 4734 self._delegate = delegate 4735 self._pid = None 4736 4737 @property 4738 def cache(self): 4739 pid = os.getpid() 4740 # There are no race conditions since fork keeps only the running thread 4741 if pid != self._pid: 4742 self._pid = pid 4743 self._cache = [] 4744 return self._cache 4745 4746 def write(self, data): 4747 self.cache.append(data) 4748 4749 def flush(self): 4750 self._delegate.write(''.join(self.cache)) 4751 self._cache = [] 4752 4753class TestStdinBadfiledescriptor(unittest.TestCase): 4754 4755 def test_queue_in_process(self): 4756 proc = multiprocessing.Process(target=_test_process) 4757 proc.start() 4758 proc.join() 4759 4760 def test_pool_in_process(self): 4761 p = multiprocessing.Process(target=pool_in_process) 4762 p.start() 4763 p.join() 4764 4765 def test_flushing(self): 4766 sio = io.StringIO() 4767 flike = _file_like(sio) 4768 flike.write('foo') 4769 proc = multiprocessing.Process(target=lambda: flike.flush()) 4770 flike.flush() 4771 assert sio.getvalue() == 'foo' 4772 4773 4774class TestWait(unittest.TestCase): 4775 4776 @classmethod 4777 def _child_test_wait(cls, w, slow): 4778 for i in range(10): 4779 if slow: 4780 time.sleep(random.random()*0.1) 4781 w.send((i, os.getpid())) 4782 w.close() 4783 4784 def test_wait(self, slow=False): 4785 from multiprocessing.connection import wait 4786 readers = [] 4787 procs = [] 4788 messages = [] 4789 4790 for i in range(4): 4791 r, w = multiprocessing.Pipe(duplex=False) 4792 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow)) 4793 p.daemon = True 4794 p.start() 4795 w.close() 4796 readers.append(r) 4797 procs.append(p) 4798 self.addCleanup(p.join) 4799 4800 while readers: 4801 for r in wait(readers): 4802 try: 4803 msg = r.recv() 4804 except EOFError: 4805 readers.remove(r) 4806 r.close() 4807 else: 4808 messages.append(msg) 4809 4810 messages.sort() 4811 expected = sorted((i, p.pid) for i in range(10) for p in procs) 4812 self.assertEqual(messages, expected) 4813 4814 @classmethod 4815 def _child_test_wait_socket(cls, address, slow): 4816 s = socket.socket() 4817 s.connect(address) 4818 for i in range(10): 4819 if slow: 4820 time.sleep(random.random()*0.1) 4821 s.sendall(('%s\n' % i).encode('ascii')) 4822 s.close() 4823 4824 def test_wait_socket(self, slow=False): 4825 from multiprocessing.connection import wait 4826 l = socket.create_server((socket_helper.HOST, 0)) 4827 addr = l.getsockname() 4828 readers = [] 4829 procs = [] 4830 dic = {} 4831 4832 for i in range(4): 4833 p = multiprocessing.Process(target=self._child_test_wait_socket, 4834 args=(addr, slow)) 4835 p.daemon = True 4836 p.start() 4837 procs.append(p) 4838 self.addCleanup(p.join) 4839 4840 for i in range(4): 4841 r, _ = l.accept() 4842 readers.append(r) 4843 dic[r] = [] 4844 l.close() 4845 4846 while readers: 4847 for r in wait(readers): 4848 msg = r.recv(32) 4849 if not msg: 4850 readers.remove(r) 4851 r.close() 4852 else: 4853 dic[r].append(msg) 4854 4855 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii') 4856 for v in dic.values(): 4857 self.assertEqual(b''.join(v), expected) 4858 4859 def test_wait_slow(self): 4860 self.test_wait(True) 4861 4862 def test_wait_socket_slow(self): 4863 self.test_wait_socket(True) 4864 4865 def test_wait_timeout(self): 4866 from multiprocessing.connection import wait 4867 4868 expected = 5 4869 a, b = multiprocessing.Pipe() 4870 4871 start = time.monotonic() 4872 res = wait([a, b], expected) 4873 delta = time.monotonic() - start 4874 4875 self.assertEqual(res, []) 4876 self.assertLess(delta, expected * 2) 4877 self.assertGreater(delta, expected * 0.5) 4878 4879 b.send(None) 4880 4881 start = time.monotonic() 4882 res = wait([a, b], 20) 4883 delta = time.monotonic() - start 4884 4885 self.assertEqual(res, [a]) 4886 self.assertLess(delta, 0.4) 4887 4888 @classmethod 4889 def signal_and_sleep(cls, sem, period): 4890 sem.release() 4891 time.sleep(period) 4892 4893 def test_wait_integer(self): 4894 from multiprocessing.connection import wait 4895 4896 expected = 3 4897 sorted_ = lambda l: sorted(l, key=lambda x: id(x)) 4898 sem = multiprocessing.Semaphore(0) 4899 a, b = multiprocessing.Pipe() 4900 p = multiprocessing.Process(target=self.signal_and_sleep, 4901 args=(sem, expected)) 4902 4903 p.start() 4904 self.assertIsInstance(p.sentinel, int) 4905 self.assertTrue(sem.acquire(timeout=20)) 4906 4907 start = time.monotonic() 4908 res = wait([a, p.sentinel, b], expected + 20) 4909 delta = time.monotonic() - start 4910 4911 self.assertEqual(res, [p.sentinel]) 4912 self.assertLess(delta, expected + 2) 4913 self.assertGreater(delta, expected - 2) 4914 4915 a.send(None) 4916 4917 start = time.monotonic() 4918 res = wait([a, p.sentinel, b], 20) 4919 delta = time.monotonic() - start 4920 4921 self.assertEqual(sorted_(res), sorted_([p.sentinel, b])) 4922 self.assertLess(delta, 0.4) 4923 4924 b.send(None) 4925 4926 start = time.monotonic() 4927 res = wait([a, p.sentinel, b], 20) 4928 delta = time.monotonic() - start 4929 4930 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b])) 4931 self.assertLess(delta, 0.4) 4932 4933 p.terminate() 4934 p.join() 4935 4936 def test_neg_timeout(self): 4937 from multiprocessing.connection import wait 4938 a, b = multiprocessing.Pipe() 4939 t = time.monotonic() 4940 res = wait([a], timeout=-1) 4941 t = time.monotonic() - t 4942 self.assertEqual(res, []) 4943 self.assertLess(t, 1) 4944 a.close() 4945 b.close() 4946 4947# 4948# Issue 14151: Test invalid family on invalid environment 4949# 4950 4951class TestInvalidFamily(unittest.TestCase): 4952 4953 @unittest.skipIf(WIN32, "skipped on Windows") 4954 def test_invalid_family(self): 4955 with self.assertRaises(ValueError): 4956 multiprocessing.connection.Listener(r'\\.\test') 4957 4958 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms") 4959 def test_invalid_family_win32(self): 4960 with self.assertRaises(ValueError): 4961 multiprocessing.connection.Listener('/var/test.pipe') 4962 4963# 4964# Issue 12098: check sys.flags of child matches that for parent 4965# 4966 4967class TestFlags(unittest.TestCase): 4968 @classmethod 4969 def run_in_grandchild(cls, conn): 4970 conn.send(tuple(sys.flags)) 4971 4972 @classmethod 4973 def run_in_child(cls): 4974 import json 4975 r, w = multiprocessing.Pipe(duplex=False) 4976 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,)) 4977 p.start() 4978 grandchild_flags = r.recv() 4979 p.join() 4980 r.close() 4981 w.close() 4982 flags = (tuple(sys.flags), grandchild_flags) 4983 print(json.dumps(flags)) 4984 4985 def test_flags(self): 4986 import json 4987 # start child process using unusual flags 4988 prog = ('from test._test_multiprocessing import TestFlags; ' + 4989 'TestFlags.run_in_child()') 4990 data = subprocess.check_output( 4991 [sys.executable, '-E', '-S', '-O', '-c', prog]) 4992 child_flags, grandchild_flags = json.loads(data.decode('ascii')) 4993 self.assertEqual(child_flags, grandchild_flags) 4994 4995# 4996# Test interaction with socket timeouts - see Issue #6056 4997# 4998 4999class TestTimeouts(unittest.TestCase): 5000 @classmethod 5001 def _test_timeout(cls, child, address): 5002 time.sleep(1) 5003 child.send(123) 5004 child.close() 5005 conn = multiprocessing.connection.Client(address) 5006 conn.send(456) 5007 conn.close() 5008 5009 def test_timeout(self): 5010 old_timeout = socket.getdefaulttimeout() 5011 try: 5012 socket.setdefaulttimeout(0.1) 5013 parent, child = multiprocessing.Pipe(duplex=True) 5014 l = multiprocessing.connection.Listener(family='AF_INET') 5015 p = multiprocessing.Process(target=self._test_timeout, 5016 args=(child, l.address)) 5017 p.start() 5018 child.close() 5019 self.assertEqual(parent.recv(), 123) 5020 parent.close() 5021 conn = l.accept() 5022 self.assertEqual(conn.recv(), 456) 5023 conn.close() 5024 l.close() 5025 join_process(p) 5026 finally: 5027 socket.setdefaulttimeout(old_timeout) 5028 5029# 5030# Test what happens with no "if __name__ == '__main__'" 5031# 5032 5033class TestNoForkBomb(unittest.TestCase): 5034 def test_noforkbomb(self): 5035 sm = multiprocessing.get_start_method() 5036 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py') 5037 if sm != 'fork': 5038 rc, out, err = test.support.script_helper.assert_python_failure(name, sm) 5039 self.assertEqual(out, b'') 5040 self.assertIn(b'RuntimeError', err) 5041 else: 5042 rc, out, err = test.support.script_helper.assert_python_ok(name, sm) 5043 self.assertEqual(out.rstrip(), b'123') 5044 self.assertEqual(err, b'') 5045 5046# 5047# Issue #17555: ForkAwareThreadLock 5048# 5049 5050class TestForkAwareThreadLock(unittest.TestCase): 5051 # We recursively start processes. Issue #17555 meant that the 5052 # after fork registry would get duplicate entries for the same 5053 # lock. The size of the registry at generation n was ~2**n. 5054 5055 @classmethod 5056 def child(cls, n, conn): 5057 if n > 1: 5058 p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) 5059 p.start() 5060 conn.close() 5061 join_process(p) 5062 else: 5063 conn.send(len(util._afterfork_registry)) 5064 conn.close() 5065 5066 def test_lock(self): 5067 r, w = multiprocessing.Pipe(False) 5068 l = util.ForkAwareThreadLock() 5069 old_size = len(util._afterfork_registry) 5070 p = multiprocessing.Process(target=self.child, args=(5, w)) 5071 p.start() 5072 w.close() 5073 new_size = r.recv() 5074 join_process(p) 5075 self.assertLessEqual(new_size, old_size) 5076 5077# 5078# Check that non-forked child processes do not inherit unneeded fds/handles 5079# 5080 5081class TestCloseFds(unittest.TestCase): 5082 5083 def get_high_socket_fd(self): 5084 if WIN32: 5085 # The child process will not have any socket handles, so 5086 # calling socket.fromfd() should produce WSAENOTSOCK even 5087 # if there is a handle of the same number. 5088 return socket.socket().detach() 5089 else: 5090 # We want to produce a socket with an fd high enough that a 5091 # freshly created child process will not have any fds as high. 5092 fd = socket.socket().detach() 5093 to_close = [] 5094 while fd < 50: 5095 to_close.append(fd) 5096 fd = os.dup(fd) 5097 for x in to_close: 5098 os.close(x) 5099 return fd 5100 5101 def close(self, fd): 5102 if WIN32: 5103 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close() 5104 else: 5105 os.close(fd) 5106 5107 @classmethod 5108 def _test_closefds(cls, conn, fd): 5109 try: 5110 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 5111 except Exception as e: 5112 conn.send(e) 5113 else: 5114 s.close() 5115 conn.send(None) 5116 5117 def test_closefd(self): 5118 if not HAS_REDUCTION: 5119 raise unittest.SkipTest('requires fd pickling') 5120 5121 reader, writer = multiprocessing.Pipe() 5122 fd = self.get_high_socket_fd() 5123 try: 5124 p = multiprocessing.Process(target=self._test_closefds, 5125 args=(writer, fd)) 5126 p.start() 5127 writer.close() 5128 e = reader.recv() 5129 join_process(p) 5130 finally: 5131 self.close(fd) 5132 writer.close() 5133 reader.close() 5134 5135 if multiprocessing.get_start_method() == 'fork': 5136 self.assertIs(e, None) 5137 else: 5138 WSAENOTSOCK = 10038 5139 self.assertIsInstance(e, OSError) 5140 self.assertTrue(e.errno == errno.EBADF or 5141 e.winerror == WSAENOTSOCK, e) 5142 5143# 5144# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc 5145# 5146 5147class TestIgnoreEINTR(unittest.TestCase): 5148 5149 # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block 5150 CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE) 5151 5152 @classmethod 5153 def _test_ignore(cls, conn): 5154 def handler(signum, frame): 5155 pass 5156 signal.signal(signal.SIGUSR1, handler) 5157 conn.send('ready') 5158 x = conn.recv() 5159 conn.send(x) 5160 conn.send_bytes(b'x' * cls.CONN_MAX_SIZE) 5161 5162 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 5163 def test_ignore(self): 5164 conn, child_conn = multiprocessing.Pipe() 5165 try: 5166 p = multiprocessing.Process(target=self._test_ignore, 5167 args=(child_conn,)) 5168 p.daemon = True 5169 p.start() 5170 child_conn.close() 5171 self.assertEqual(conn.recv(), 'ready') 5172 time.sleep(0.1) 5173 os.kill(p.pid, signal.SIGUSR1) 5174 time.sleep(0.1) 5175 conn.send(1234) 5176 self.assertEqual(conn.recv(), 1234) 5177 time.sleep(0.1) 5178 os.kill(p.pid, signal.SIGUSR1) 5179 self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE) 5180 time.sleep(0.1) 5181 p.join() 5182 finally: 5183 conn.close() 5184 5185 @classmethod 5186 def _test_ignore_listener(cls, conn): 5187 def handler(signum, frame): 5188 pass 5189 signal.signal(signal.SIGUSR1, handler) 5190 with multiprocessing.connection.Listener() as l: 5191 conn.send(l.address) 5192 a = l.accept() 5193 a.send('welcome') 5194 5195 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 5196 def test_ignore_listener(self): 5197 conn, child_conn = multiprocessing.Pipe() 5198 try: 5199 p = multiprocessing.Process(target=self._test_ignore_listener, 5200 args=(child_conn,)) 5201 p.daemon = True 5202 p.start() 5203 child_conn.close() 5204 address = conn.recv() 5205 time.sleep(0.1) 5206 os.kill(p.pid, signal.SIGUSR1) 5207 time.sleep(0.1) 5208 client = multiprocessing.connection.Client(address) 5209 self.assertEqual(client.recv(), 'welcome') 5210 p.join() 5211 finally: 5212 conn.close() 5213 5214class TestStartMethod(unittest.TestCase): 5215 @classmethod 5216 def _check_context(cls, conn): 5217 conn.send(multiprocessing.get_start_method()) 5218 5219 def check_context(self, ctx): 5220 r, w = ctx.Pipe(duplex=False) 5221 p = ctx.Process(target=self._check_context, args=(w,)) 5222 p.start() 5223 w.close() 5224 child_method = r.recv() 5225 r.close() 5226 p.join() 5227 self.assertEqual(child_method, ctx.get_start_method()) 5228 5229 def test_context(self): 5230 for method in ('fork', 'spawn', 'forkserver'): 5231 try: 5232 ctx = multiprocessing.get_context(method) 5233 except ValueError: 5234 continue 5235 self.assertEqual(ctx.get_start_method(), method) 5236 self.assertIs(ctx.get_context(), ctx) 5237 self.assertRaises(ValueError, ctx.set_start_method, 'spawn') 5238 self.assertRaises(ValueError, ctx.set_start_method, None) 5239 self.check_context(ctx) 5240 5241 def test_set_get(self): 5242 multiprocessing.set_forkserver_preload(PRELOAD) 5243 count = 0 5244 old_method = multiprocessing.get_start_method() 5245 try: 5246 for method in ('fork', 'spawn', 'forkserver'): 5247 try: 5248 multiprocessing.set_start_method(method, force=True) 5249 except ValueError: 5250 continue 5251 self.assertEqual(multiprocessing.get_start_method(), method) 5252 ctx = multiprocessing.get_context() 5253 self.assertEqual(ctx.get_start_method(), method) 5254 self.assertTrue(type(ctx).__name__.lower().startswith(method)) 5255 self.assertTrue( 5256 ctx.Process.__name__.lower().startswith(method)) 5257 self.check_context(multiprocessing) 5258 count += 1 5259 finally: 5260 multiprocessing.set_start_method(old_method, force=True) 5261 self.assertGreaterEqual(count, 1) 5262 5263 def test_get_all(self): 5264 methods = multiprocessing.get_all_start_methods() 5265 if sys.platform == 'win32': 5266 self.assertEqual(methods, ['spawn']) 5267 else: 5268 self.assertTrue(methods == ['fork', 'spawn'] or 5269 methods == ['spawn', 'fork'] or 5270 methods == ['fork', 'spawn', 'forkserver'] or 5271 methods == ['spawn', 'fork', 'forkserver']) 5272 5273 def test_preload_resources(self): 5274 if multiprocessing.get_start_method() != 'forkserver': 5275 self.skipTest("test only relevant for 'forkserver' method") 5276 name = os.path.join(os.path.dirname(__file__), 'mp_preload.py') 5277 rc, out, err = test.support.script_helper.assert_python_ok(name) 5278 out = out.decode() 5279 err = err.decode() 5280 if out.rstrip() != 'ok' or err != '': 5281 print(out) 5282 print(err) 5283 self.fail("failed spawning forkserver or grandchild") 5284 5285 5286@unittest.skipIf(sys.platform == "win32", 5287 "test semantics don't make sense on Windows") 5288class TestResourceTracker(unittest.TestCase): 5289 5290 def test_resource_tracker(self): 5291 # 5292 # Check that killing process does not leak named semaphores 5293 # 5294 cmd = '''if 1: 5295 import time, os, tempfile 5296 import multiprocessing as mp 5297 from multiprocessing import resource_tracker 5298 from multiprocessing.shared_memory import SharedMemory 5299 5300 mp.set_start_method("spawn") 5301 rand = tempfile._RandomNameSequence() 5302 5303 5304 def create_and_register_resource(rtype): 5305 if rtype == "semaphore": 5306 lock = mp.Lock() 5307 return lock, lock._semlock.name 5308 elif rtype == "shared_memory": 5309 sm = SharedMemory(create=True, size=10) 5310 return sm, sm._name 5311 else: 5312 raise ValueError( 5313 "Resource type {{}} not understood".format(rtype)) 5314 5315 5316 resource1, rname1 = create_and_register_resource("{rtype}") 5317 resource2, rname2 = create_and_register_resource("{rtype}") 5318 5319 os.write({w}, rname1.encode("ascii") + b"\\n") 5320 os.write({w}, rname2.encode("ascii") + b"\\n") 5321 5322 time.sleep(10) 5323 ''' 5324 for rtype in resource_tracker._CLEANUP_FUNCS: 5325 with self.subTest(rtype=rtype): 5326 if rtype == "noop": 5327 # Artefact resource type used by the resource_tracker 5328 continue 5329 r, w = os.pipe() 5330 p = subprocess.Popen([sys.executable, 5331 '-E', '-c', cmd.format(w=w, rtype=rtype)], 5332 pass_fds=[w], 5333 stderr=subprocess.PIPE) 5334 os.close(w) 5335 with open(r, 'rb', closefd=True) as f: 5336 name1 = f.readline().rstrip().decode('ascii') 5337 name2 = f.readline().rstrip().decode('ascii') 5338 _resource_unlink(name1, rtype) 5339 p.terminate() 5340 p.wait() 5341 5342 deadline = time.monotonic() + support.LONG_TIMEOUT 5343 while time.monotonic() < deadline: 5344 time.sleep(.5) 5345 try: 5346 _resource_unlink(name2, rtype) 5347 except OSError as e: 5348 # docs say it should be ENOENT, but OSX seems to give 5349 # EINVAL 5350 self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL)) 5351 break 5352 else: 5353 raise AssertionError( 5354 f"A {rtype} resource was leaked after a process was " 5355 f"abruptly terminated.") 5356 err = p.stderr.read().decode('utf-8') 5357 p.stderr.close() 5358 expected = ('resource_tracker: There appear to be 2 leaked {} ' 5359 'objects'.format( 5360 rtype)) 5361 self.assertRegex(err, expected) 5362 self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1) 5363 5364 def check_resource_tracker_death(self, signum, should_die): 5365 # bpo-31310: if the semaphore tracker process has died, it should 5366 # be restarted implicitly. 5367 from multiprocessing.resource_tracker import _resource_tracker 5368 pid = _resource_tracker._pid 5369 if pid is not None: 5370 os.kill(pid, signal.SIGKILL) 5371 support.wait_process(pid, exitcode=-signal.SIGKILL) 5372 with warnings.catch_warnings(): 5373 warnings.simplefilter("ignore") 5374 _resource_tracker.ensure_running() 5375 pid = _resource_tracker._pid 5376 5377 os.kill(pid, signum) 5378 time.sleep(1.0) # give it time to die 5379 5380 ctx = multiprocessing.get_context("spawn") 5381 with warnings.catch_warnings(record=True) as all_warn: 5382 warnings.simplefilter("always") 5383 sem = ctx.Semaphore() 5384 sem.acquire() 5385 sem.release() 5386 wr = weakref.ref(sem) 5387 # ensure `sem` gets collected, which triggers communication with 5388 # the semaphore tracker 5389 del sem 5390 gc.collect() 5391 self.assertIsNone(wr()) 5392 if should_die: 5393 self.assertEqual(len(all_warn), 1) 5394 the_warn = all_warn[0] 5395 self.assertTrue(issubclass(the_warn.category, UserWarning)) 5396 self.assertTrue("resource_tracker: process died" 5397 in str(the_warn.message)) 5398 else: 5399 self.assertEqual(len(all_warn), 0) 5400 5401 def test_resource_tracker_sigint(self): 5402 # Catchable signal (ignored by semaphore tracker) 5403 self.check_resource_tracker_death(signal.SIGINT, False) 5404 5405 def test_resource_tracker_sigterm(self): 5406 # Catchable signal (ignored by semaphore tracker) 5407 self.check_resource_tracker_death(signal.SIGTERM, False) 5408 5409 def test_resource_tracker_sigkill(self): 5410 # Uncatchable signal. 5411 self.check_resource_tracker_death(signal.SIGKILL, True) 5412 5413 @staticmethod 5414 def _is_resource_tracker_reused(conn, pid): 5415 from multiprocessing.resource_tracker import _resource_tracker 5416 _resource_tracker.ensure_running() 5417 # The pid should be None in the child process, expect for the fork 5418 # context. It should not be a new value. 5419 reused = _resource_tracker._pid in (None, pid) 5420 reused &= _resource_tracker._check_alive() 5421 conn.send(reused) 5422 5423 def test_resource_tracker_reused(self): 5424 from multiprocessing.resource_tracker import _resource_tracker 5425 _resource_tracker.ensure_running() 5426 pid = _resource_tracker._pid 5427 5428 r, w = multiprocessing.Pipe(duplex=False) 5429 p = multiprocessing.Process(target=self._is_resource_tracker_reused, 5430 args=(w, pid)) 5431 p.start() 5432 is_resource_tracker_reused = r.recv() 5433 5434 # Clean up 5435 p.join() 5436 w.close() 5437 r.close() 5438 5439 self.assertTrue(is_resource_tracker_reused) 5440 5441 def test_too_long_name_resource(self): 5442 # gh-96819: Resource names that will make the length of a write to a pipe 5443 # greater than PIPE_BUF are not allowed 5444 rtype = "shared_memory" 5445 too_long_name_resource = "a" * (512 - len(rtype)) 5446 with self.assertRaises(ValueError): 5447 resource_tracker.register(too_long_name_resource, rtype) 5448 5449 5450class TestSimpleQueue(unittest.TestCase): 5451 5452 @classmethod 5453 def _test_empty(cls, queue, child_can_start, parent_can_continue): 5454 child_can_start.wait() 5455 # issue 30301, could fail under spawn and forkserver 5456 try: 5457 queue.put(queue.empty()) 5458 queue.put(queue.empty()) 5459 finally: 5460 parent_can_continue.set() 5461 5462 def test_empty(self): 5463 queue = multiprocessing.SimpleQueue() 5464 child_can_start = multiprocessing.Event() 5465 parent_can_continue = multiprocessing.Event() 5466 5467 proc = multiprocessing.Process( 5468 target=self._test_empty, 5469 args=(queue, child_can_start, parent_can_continue) 5470 ) 5471 proc.daemon = True 5472 proc.start() 5473 5474 self.assertTrue(queue.empty()) 5475 5476 child_can_start.set() 5477 parent_can_continue.wait() 5478 5479 self.assertFalse(queue.empty()) 5480 self.assertEqual(queue.get(), True) 5481 self.assertEqual(queue.get(), False) 5482 self.assertTrue(queue.empty()) 5483 5484 proc.join() 5485 5486 def test_close(self): 5487 queue = multiprocessing.SimpleQueue() 5488 queue.close() 5489 # closing a queue twice should not fail 5490 queue.close() 5491 5492 # Test specific to CPython since it tests private attributes 5493 @test.support.cpython_only 5494 def test_closed(self): 5495 queue = multiprocessing.SimpleQueue() 5496 queue.close() 5497 self.assertTrue(queue._reader.closed) 5498 self.assertTrue(queue._writer.closed) 5499 5500 5501class TestPoolNotLeakOnFailure(unittest.TestCase): 5502 5503 def test_release_unused_processes(self): 5504 # Issue #19675: During pool creation, if we can't create a process, 5505 # don't leak already created ones. 5506 will_fail_in = 3 5507 forked_processes = [] 5508 5509 class FailingForkProcess: 5510 def __init__(self, **kwargs): 5511 self.name = 'Fake Process' 5512 self.exitcode = None 5513 self.state = None 5514 forked_processes.append(self) 5515 5516 def start(self): 5517 nonlocal will_fail_in 5518 if will_fail_in <= 0: 5519 raise OSError("Manually induced OSError") 5520 will_fail_in -= 1 5521 self.state = 'started' 5522 5523 def terminate(self): 5524 self.state = 'stopping' 5525 5526 def join(self): 5527 if self.state == 'stopping': 5528 self.state = 'stopped' 5529 5530 def is_alive(self): 5531 return self.state == 'started' or self.state == 'stopping' 5532 5533 with self.assertRaisesRegex(OSError, 'Manually induced OSError'): 5534 p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock( 5535 Process=FailingForkProcess)) 5536 p.close() 5537 p.join() 5538 self.assertFalse( 5539 any(process.is_alive() for process in forked_processes)) 5540 5541 5542@hashlib_helper.requires_hashdigest('md5') 5543class TestSyncManagerTypes(unittest.TestCase): 5544 """Test all the types which can be shared between a parent and a 5545 child process by using a manager which acts as an intermediary 5546 between them. 5547 5548 In the following unit-tests the base type is created in the parent 5549 process, the @classmethod represents the worker process and the 5550 shared object is readable and editable between the two. 5551 5552 # The child. 5553 @classmethod 5554 def _test_list(cls, obj): 5555 assert obj[0] == 5 5556 assert obj.append(6) 5557 5558 # The parent. 5559 def test_list(self): 5560 o = self.manager.list() 5561 o.append(5) 5562 self.run_worker(self._test_list, o) 5563 assert o[1] == 6 5564 """ 5565 manager_class = multiprocessing.managers.SyncManager 5566 5567 def setUp(self): 5568 self.manager = self.manager_class() 5569 self.manager.start() 5570 self.proc = None 5571 5572 def tearDown(self): 5573 if self.proc is not None and self.proc.is_alive(): 5574 self.proc.terminate() 5575 self.proc.join() 5576 self.manager.shutdown() 5577 self.manager = None 5578 self.proc = None 5579 5580 @classmethod 5581 def setUpClass(cls): 5582 support.reap_children() 5583 5584 tearDownClass = setUpClass 5585 5586 def wait_proc_exit(self): 5587 # Only the manager process should be returned by active_children() 5588 # but this can take a bit on slow machines, so wait a few seconds 5589 # if there are other children too (see #17395). 5590 join_process(self.proc) 5591 start_time = time.monotonic() 5592 t = 0.01 5593 while len(multiprocessing.active_children()) > 1: 5594 time.sleep(t) 5595 t *= 2 5596 dt = time.monotonic() - start_time 5597 if dt >= 5.0: 5598 test.support.environment_altered = True 5599 support.print_warning(f"multiprocessing.Manager still has " 5600 f"{multiprocessing.active_children()} " 5601 f"active children after {dt} seconds") 5602 break 5603 5604 def run_worker(self, worker, obj): 5605 self.proc = multiprocessing.Process(target=worker, args=(obj, )) 5606 self.proc.daemon = True 5607 self.proc.start() 5608 self.wait_proc_exit() 5609 self.assertEqual(self.proc.exitcode, 0) 5610 5611 @classmethod 5612 def _test_event(cls, obj): 5613 assert obj.is_set() 5614 obj.wait() 5615 obj.clear() 5616 obj.wait(0.001) 5617 5618 def test_event(self): 5619 o = self.manager.Event() 5620 o.set() 5621 self.run_worker(self._test_event, o) 5622 assert not o.is_set() 5623 o.wait(0.001) 5624 5625 @classmethod 5626 def _test_lock(cls, obj): 5627 obj.acquire() 5628 5629 def test_lock(self, lname="Lock"): 5630 o = getattr(self.manager, lname)() 5631 self.run_worker(self._test_lock, o) 5632 o.release() 5633 self.assertRaises(RuntimeError, o.release) # already released 5634 5635 @classmethod 5636 def _test_rlock(cls, obj): 5637 obj.acquire() 5638 obj.release() 5639 5640 def test_rlock(self, lname="Lock"): 5641 o = getattr(self.manager, lname)() 5642 self.run_worker(self._test_rlock, o) 5643 5644 @classmethod 5645 def _test_semaphore(cls, obj): 5646 obj.acquire() 5647 5648 def test_semaphore(self, sname="Semaphore"): 5649 o = getattr(self.manager, sname)() 5650 self.run_worker(self._test_semaphore, o) 5651 o.release() 5652 5653 def test_bounded_semaphore(self): 5654 self.test_semaphore(sname="BoundedSemaphore") 5655 5656 @classmethod 5657 def _test_condition(cls, obj): 5658 obj.acquire() 5659 obj.release() 5660 5661 def test_condition(self): 5662 o = self.manager.Condition() 5663 self.run_worker(self._test_condition, o) 5664 5665 @classmethod 5666 def _test_barrier(cls, obj): 5667 assert obj.parties == 5 5668 obj.reset() 5669 5670 def test_barrier(self): 5671 o = self.manager.Barrier(5) 5672 self.run_worker(self._test_barrier, o) 5673 5674 @classmethod 5675 def _test_pool(cls, obj): 5676 # TODO: fix https://bugs.python.org/issue35919 5677 with obj: 5678 pass 5679 5680 def test_pool(self): 5681 o = self.manager.Pool(processes=4) 5682 self.run_worker(self._test_pool, o) 5683 5684 @classmethod 5685 def _test_queue(cls, obj): 5686 assert obj.qsize() == 2 5687 assert obj.full() 5688 assert not obj.empty() 5689 assert obj.get() == 5 5690 assert not obj.empty() 5691 assert obj.get() == 6 5692 assert obj.empty() 5693 5694 def test_queue(self, qname="Queue"): 5695 o = getattr(self.manager, qname)(2) 5696 o.put(5) 5697 o.put(6) 5698 self.run_worker(self._test_queue, o) 5699 assert o.empty() 5700 assert not o.full() 5701 5702 def test_joinable_queue(self): 5703 self.test_queue("JoinableQueue") 5704 5705 @classmethod 5706 def _test_list(cls, obj): 5707 assert obj[0] == 5 5708 assert obj.count(5) == 1 5709 assert obj.index(5) == 0 5710 obj.sort() 5711 obj.reverse() 5712 for x in obj: 5713 pass 5714 assert len(obj) == 1 5715 assert obj.pop(0) == 5 5716 5717 def test_list(self): 5718 o = self.manager.list() 5719 o.append(5) 5720 self.run_worker(self._test_list, o) 5721 assert not o 5722 self.assertEqual(len(o), 0) 5723 5724 @classmethod 5725 def _test_dict(cls, obj): 5726 assert len(obj) == 1 5727 assert obj['foo'] == 5 5728 assert obj.get('foo') == 5 5729 assert list(obj.items()) == [('foo', 5)] 5730 assert list(obj.keys()) == ['foo'] 5731 assert list(obj.values()) == [5] 5732 assert obj.copy() == {'foo': 5} 5733 assert obj.popitem() == ('foo', 5) 5734 5735 def test_dict(self): 5736 o = self.manager.dict() 5737 o['foo'] = 5 5738 self.run_worker(self._test_dict, o) 5739 assert not o 5740 self.assertEqual(len(o), 0) 5741 5742 @classmethod 5743 def _test_value(cls, obj): 5744 assert obj.value == 1 5745 assert obj.get() == 1 5746 obj.set(2) 5747 5748 def test_value(self): 5749 o = self.manager.Value('i', 1) 5750 self.run_worker(self._test_value, o) 5751 self.assertEqual(o.value, 2) 5752 self.assertEqual(o.get(), 2) 5753 5754 @classmethod 5755 def _test_array(cls, obj): 5756 assert obj[0] == 0 5757 assert obj[1] == 1 5758 assert len(obj) == 2 5759 assert list(obj) == [0, 1] 5760 5761 def test_array(self): 5762 o = self.manager.Array('i', [0, 1]) 5763 self.run_worker(self._test_array, o) 5764 5765 @classmethod 5766 def _test_namespace(cls, obj): 5767 assert obj.x == 0 5768 assert obj.y == 1 5769 5770 def test_namespace(self): 5771 o = self.manager.Namespace() 5772 o.x = 0 5773 o.y = 1 5774 self.run_worker(self._test_namespace, o) 5775 5776 5777class TestNamedResource(unittest.TestCase): 5778 def test_global_named_resource_spawn(self): 5779 # 5780 # gh-90549: Check that global named resources in main module 5781 # will not leak by a subprocess, in spawn context. 5782 # 5783 testfn = os_helper.TESTFN 5784 self.addCleanup(os_helper.unlink, testfn) 5785 with open(testfn, 'w', encoding='utf-8') as f: 5786 f.write(textwrap.dedent('''\ 5787 import multiprocessing as mp 5788 5789 ctx = mp.get_context('spawn') 5790 5791 global_resource = ctx.Semaphore() 5792 5793 def submain(): pass 5794 5795 if __name__ == '__main__': 5796 p = ctx.Process(target=submain) 5797 p.start() 5798 p.join() 5799 ''')) 5800 rc, out, err = test.support.script_helper.assert_python_ok(testfn) 5801 # on error, err = 'UserWarning: resource_tracker: There appear to 5802 # be 1 leaked semaphore objects to clean up at shutdown' 5803 self.assertEqual(err, b'') 5804 5805 5806class MiscTestCase(unittest.TestCase): 5807 def test__all__(self): 5808 # Just make sure names in not_exported are excluded 5809 support.check__all__(self, multiprocessing, extra=multiprocessing.__all__, 5810 not_exported=['SUBDEBUG', 'SUBWARNING']) 5811 5812 5813# 5814# Mixins 5815# 5816 5817class BaseMixin(object): 5818 @classmethod 5819 def setUpClass(cls): 5820 cls.dangling = (multiprocessing.process._dangling.copy(), 5821 threading._dangling.copy()) 5822 5823 @classmethod 5824 def tearDownClass(cls): 5825 # bpo-26762: Some multiprocessing objects like Pool create reference 5826 # cycles. Trigger a garbage collection to break these cycles. 5827 test.support.gc_collect() 5828 5829 processes = set(multiprocessing.process._dangling) - set(cls.dangling[0]) 5830 if processes: 5831 test.support.environment_altered = True 5832 support.print_warning(f'Dangling processes: {processes}') 5833 processes = None 5834 5835 threads = set(threading._dangling) - set(cls.dangling[1]) 5836 if threads: 5837 test.support.environment_altered = True 5838 support.print_warning(f'Dangling threads: {threads}') 5839 threads = None 5840 5841 5842class ProcessesMixin(BaseMixin): 5843 TYPE = 'processes' 5844 Process = multiprocessing.Process 5845 connection = multiprocessing.connection 5846 current_process = staticmethod(multiprocessing.current_process) 5847 parent_process = staticmethod(multiprocessing.parent_process) 5848 active_children = staticmethod(multiprocessing.active_children) 5849 set_executable = staticmethod(multiprocessing.set_executable) 5850 Pool = staticmethod(multiprocessing.Pool) 5851 Pipe = staticmethod(multiprocessing.Pipe) 5852 Queue = staticmethod(multiprocessing.Queue) 5853 JoinableQueue = staticmethod(multiprocessing.JoinableQueue) 5854 Lock = staticmethod(multiprocessing.Lock) 5855 RLock = staticmethod(multiprocessing.RLock) 5856 Semaphore = staticmethod(multiprocessing.Semaphore) 5857 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) 5858 Condition = staticmethod(multiprocessing.Condition) 5859 Event = staticmethod(multiprocessing.Event) 5860 Barrier = staticmethod(multiprocessing.Barrier) 5861 Value = staticmethod(multiprocessing.Value) 5862 Array = staticmethod(multiprocessing.Array) 5863 RawValue = staticmethod(multiprocessing.RawValue) 5864 RawArray = staticmethod(multiprocessing.RawArray) 5865 5866 5867class ManagerMixin(BaseMixin): 5868 TYPE = 'manager' 5869 Process = multiprocessing.Process 5870 Queue = property(operator.attrgetter('manager.Queue')) 5871 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) 5872 Lock = property(operator.attrgetter('manager.Lock')) 5873 RLock = property(operator.attrgetter('manager.RLock')) 5874 Semaphore = property(operator.attrgetter('manager.Semaphore')) 5875 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) 5876 Condition = property(operator.attrgetter('manager.Condition')) 5877 Event = property(operator.attrgetter('manager.Event')) 5878 Barrier = property(operator.attrgetter('manager.Barrier')) 5879 Value = property(operator.attrgetter('manager.Value')) 5880 Array = property(operator.attrgetter('manager.Array')) 5881 list = property(operator.attrgetter('manager.list')) 5882 dict = property(operator.attrgetter('manager.dict')) 5883 Namespace = property(operator.attrgetter('manager.Namespace')) 5884 5885 @classmethod 5886 def Pool(cls, *args, **kwds): 5887 return cls.manager.Pool(*args, **kwds) 5888 5889 @classmethod 5890 def setUpClass(cls): 5891 super().setUpClass() 5892 cls.manager = multiprocessing.Manager() 5893 5894 @classmethod 5895 def tearDownClass(cls): 5896 # only the manager process should be returned by active_children() 5897 # but this can take a bit on slow machines, so wait a few seconds 5898 # if there are other children too (see #17395) 5899 start_time = time.monotonic() 5900 t = 0.01 5901 while len(multiprocessing.active_children()) > 1: 5902 time.sleep(t) 5903 t *= 2 5904 dt = time.monotonic() - start_time 5905 if dt >= 5.0: 5906 test.support.environment_altered = True 5907 support.print_warning(f"multiprocessing.Manager still has " 5908 f"{multiprocessing.active_children()} " 5909 f"active children after {dt} seconds") 5910 break 5911 5912 gc.collect() # do garbage collection 5913 if cls.manager._number_of_objects() != 0: 5914 # This is not really an error since some tests do not 5915 # ensure that all processes which hold a reference to a 5916 # managed object have been joined. 5917 test.support.environment_altered = True 5918 support.print_warning('Shared objects which still exist ' 5919 'at manager shutdown:') 5920 support.print_warning(cls.manager._debug_info()) 5921 cls.manager.shutdown() 5922 cls.manager.join() 5923 cls.manager = None 5924 5925 super().tearDownClass() 5926 5927 5928class ThreadsMixin(BaseMixin): 5929 TYPE = 'threads' 5930 Process = multiprocessing.dummy.Process 5931 connection = multiprocessing.dummy.connection 5932 current_process = staticmethod(multiprocessing.dummy.current_process) 5933 active_children = staticmethod(multiprocessing.dummy.active_children) 5934 Pool = staticmethod(multiprocessing.dummy.Pool) 5935 Pipe = staticmethod(multiprocessing.dummy.Pipe) 5936 Queue = staticmethod(multiprocessing.dummy.Queue) 5937 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) 5938 Lock = staticmethod(multiprocessing.dummy.Lock) 5939 RLock = staticmethod(multiprocessing.dummy.RLock) 5940 Semaphore = staticmethod(multiprocessing.dummy.Semaphore) 5941 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) 5942 Condition = staticmethod(multiprocessing.dummy.Condition) 5943 Event = staticmethod(multiprocessing.dummy.Event) 5944 Barrier = staticmethod(multiprocessing.dummy.Barrier) 5945 Value = staticmethod(multiprocessing.dummy.Value) 5946 Array = staticmethod(multiprocessing.dummy.Array) 5947 5948# 5949# Functions used to create test cases from the base ones in this module 5950# 5951 5952def install_tests_in_module_dict(remote_globs, start_method): 5953 __module__ = remote_globs['__name__'] 5954 local_globs = globals() 5955 ALL_TYPES = {'processes', 'threads', 'manager'} 5956 5957 for name, base in local_globs.items(): 5958 if not isinstance(base, type): 5959 continue 5960 if issubclass(base, BaseTestCase): 5961 if base is BaseTestCase: 5962 continue 5963 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES 5964 for type_ in base.ALLOWED_TYPES: 5965 newname = 'With' + type_.capitalize() + name[1:] 5966 Mixin = local_globs[type_.capitalize() + 'Mixin'] 5967 class Temp(base, Mixin, unittest.TestCase): 5968 pass 5969 if type_ == 'manager': 5970 Temp = hashlib_helper.requires_hashdigest('md5')(Temp) 5971 Temp.__name__ = Temp.__qualname__ = newname 5972 Temp.__module__ = __module__ 5973 remote_globs[newname] = Temp 5974 elif issubclass(base, unittest.TestCase): 5975 class Temp(base, object): 5976 pass 5977 Temp.__name__ = Temp.__qualname__ = name 5978 Temp.__module__ = __module__ 5979 remote_globs[name] = Temp 5980 5981 dangling = [None, None] 5982 old_start_method = [None] 5983 5984 def setUpModule(): 5985 multiprocessing.set_forkserver_preload(PRELOAD) 5986 multiprocessing.process._cleanup() 5987 dangling[0] = multiprocessing.process._dangling.copy() 5988 dangling[1] = threading._dangling.copy() 5989 old_start_method[0] = multiprocessing.get_start_method(allow_none=True) 5990 try: 5991 multiprocessing.set_start_method(start_method, force=True) 5992 except ValueError: 5993 raise unittest.SkipTest(start_method + 5994 ' start method not supported') 5995 5996 if sys.platform.startswith("linux"): 5997 try: 5998 lock = multiprocessing.RLock() 5999 except OSError: 6000 raise unittest.SkipTest("OSError raises on RLock creation, " 6001 "see issue 3111!") 6002 check_enough_semaphores() 6003 util.get_temp_dir() # creates temp directory 6004 multiprocessing.get_logger().setLevel(LOG_LEVEL) 6005 6006 def tearDownModule(): 6007 need_sleep = False 6008 6009 # bpo-26762: Some multiprocessing objects like Pool create reference 6010 # cycles. Trigger a garbage collection to break these cycles. 6011 test.support.gc_collect() 6012 6013 multiprocessing.set_start_method(old_start_method[0], force=True) 6014 # pause a bit so we don't get warning about dangling threads/processes 6015 processes = set(multiprocessing.process._dangling) - set(dangling[0]) 6016 if processes: 6017 need_sleep = True 6018 test.support.environment_altered = True 6019 support.print_warning(f'Dangling processes: {processes}') 6020 processes = None 6021 6022 threads = set(threading._dangling) - set(dangling[1]) 6023 if threads: 6024 need_sleep = True 6025 test.support.environment_altered = True 6026 support.print_warning(f'Dangling threads: {threads}') 6027 threads = None 6028 6029 # Sleep 500 ms to give time to child processes to complete. 6030 if need_sleep: 6031 time.sleep(0.5) 6032 6033 multiprocessing.util._cleanup_tests() 6034 6035 remote_globs['setUpModule'] = setUpModule 6036 remote_globs['tearDownModule'] = tearDownModule 6037 6038 6039@unittest.skipIf(not hasattr(_multiprocessing, 'SemLock'), 'SemLock not available') 6040@unittest.skipIf(sys.platform != "linux", "Linux only") 6041class SemLockTests(unittest.TestCase): 6042 6043 def test_semlock_subclass(self): 6044 class SemLock(_multiprocessing.SemLock): 6045 pass 6046 name = f'test_semlock_subclass-{os.getpid()}' 6047 s = SemLock(1, 0, 10, name, False) 6048 _multiprocessing.sem_unlink(name) 6049