1import os 2import shutil 3import signal 4import sys 5import unittest 6import warnings 7from unittest import mock 8 9import asyncio 10from asyncio import base_subprocess 11from asyncio import subprocess 12from test.test_asyncio import utils as test_utils 13from test import support 14from test.support import os_helper 15 16if sys.platform != 'win32': 17 from asyncio import unix_events 18 19if support.check_sanitizer(address=True): 20 raise unittest.SkipTest("Exposes ASAN flakiness in GitHub CI") 21 22# Program blocking 23PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)'] 24 25# Program copying input to output 26PROGRAM_CAT = [ 27 sys.executable, '-c', 28 ';'.join(('import sys', 29 'data = sys.stdin.buffer.read()', 30 'sys.stdout.buffer.write(data)'))] 31 32 33def tearDownModule(): 34 asyncio.set_event_loop_policy(None) 35 36 37class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport): 38 def _start(self, *args, **kwargs): 39 self._proc = mock.Mock() 40 self._proc.stdin = None 41 self._proc.stdout = None 42 self._proc.stderr = None 43 self._proc.pid = -1 44 45 46class SubprocessTransportTests(test_utils.TestCase): 47 def setUp(self): 48 super().setUp() 49 self.loop = self.new_test_loop() 50 self.set_event_loop(self.loop) 51 52 def create_transport(self, waiter=None): 53 protocol = mock.Mock() 54 transport = TestSubprocessTransport( 55 self.loop, protocol, ['test'], False, 56 None, None, None, 0, waiter=waiter) 57 return (transport, protocol) 58 59 def test_proc_exited(self): 60 waiter = self.loop.create_future() 61 transport, protocol = self.create_transport(waiter) 62 transport._process_exited(6) 63 self.loop.run_until_complete(waiter) 64 65 self.assertEqual(transport.get_returncode(), 6) 66 67 self.assertTrue(protocol.connection_made.called) 68 self.assertTrue(protocol.process_exited.called) 69 self.assertTrue(protocol.connection_lost.called) 70 self.assertEqual(protocol.connection_lost.call_args[0], (None,)) 71 72 self.assertFalse(transport.is_closing()) 73 self.assertIsNone(transport._loop) 74 self.assertIsNone(transport._proc) 75 self.assertIsNone(transport._protocol) 76 77 # methods must raise ProcessLookupError if the process exited 78 self.assertRaises(ProcessLookupError, 79 transport.send_signal, signal.SIGTERM) 80 self.assertRaises(ProcessLookupError, transport.terminate) 81 self.assertRaises(ProcessLookupError, transport.kill) 82 83 transport.close() 84 85 def test_subprocess_repr(self): 86 waiter = self.loop.create_future() 87 transport, protocol = self.create_transport(waiter) 88 transport._process_exited(6) 89 self.loop.run_until_complete(waiter) 90 91 self.assertEqual( 92 repr(transport), 93 "<TestSubprocessTransport pid=-1 returncode=6>" 94 ) 95 transport._returncode = None 96 self.assertEqual( 97 repr(transport), 98 "<TestSubprocessTransport pid=-1 running>" 99 ) 100 transport._pid = None 101 transport._returncode = None 102 self.assertEqual( 103 repr(transport), 104 "<TestSubprocessTransport not started>" 105 ) 106 transport.close() 107 108 109class SubprocessMixin: 110 111 def test_stdin_stdout(self): 112 args = PROGRAM_CAT 113 114 async def run(data): 115 proc = await asyncio.create_subprocess_exec( 116 *args, 117 stdin=subprocess.PIPE, 118 stdout=subprocess.PIPE, 119 ) 120 121 # feed data 122 proc.stdin.write(data) 123 await proc.stdin.drain() 124 proc.stdin.close() 125 126 # get output and exitcode 127 data = await proc.stdout.read() 128 exitcode = await proc.wait() 129 return (exitcode, data) 130 131 task = run(b'some data') 132 task = asyncio.wait_for(task, 60.0) 133 exitcode, stdout = self.loop.run_until_complete(task) 134 self.assertEqual(exitcode, 0) 135 self.assertEqual(stdout, b'some data') 136 137 def test_communicate(self): 138 args = PROGRAM_CAT 139 140 async def run(data): 141 proc = await asyncio.create_subprocess_exec( 142 *args, 143 stdin=subprocess.PIPE, 144 stdout=subprocess.PIPE, 145 ) 146 stdout, stderr = await proc.communicate(data) 147 return proc.returncode, stdout 148 149 task = run(b'some data') 150 task = asyncio.wait_for(task, support.LONG_TIMEOUT) 151 exitcode, stdout = self.loop.run_until_complete(task) 152 self.assertEqual(exitcode, 0) 153 self.assertEqual(stdout, b'some data') 154 155 def test_shell(self): 156 proc = self.loop.run_until_complete( 157 asyncio.create_subprocess_shell('exit 7') 158 ) 159 exitcode = self.loop.run_until_complete(proc.wait()) 160 self.assertEqual(exitcode, 7) 161 162 def test_start_new_session(self): 163 # start the new process in a new session 164 proc = self.loop.run_until_complete( 165 asyncio.create_subprocess_shell( 166 'exit 8', 167 start_new_session=True, 168 ) 169 ) 170 exitcode = self.loop.run_until_complete(proc.wait()) 171 self.assertEqual(exitcode, 8) 172 173 def test_kill(self): 174 args = PROGRAM_BLOCKED 175 proc = self.loop.run_until_complete( 176 asyncio.create_subprocess_exec(*args) 177 ) 178 proc.kill() 179 returncode = self.loop.run_until_complete(proc.wait()) 180 if sys.platform == 'win32': 181 self.assertIsInstance(returncode, int) 182 # expect 1 but sometimes get 0 183 else: 184 self.assertEqual(-signal.SIGKILL, returncode) 185 186 def test_kill_issue43884(self): 187 if sys.platform == 'win32': 188 blocking_shell_command = f'{sys.executable} -c "import time; time.sleep(2)"' 189 else: 190 blocking_shell_command = 'sleep 1; sleep 1' 191 creationflags = 0 192 if sys.platform == 'win32': 193 from subprocess import CREATE_NEW_PROCESS_GROUP 194 # On windows create a new process group so that killing process 195 # kills the process and all its children. 196 creationflags = CREATE_NEW_PROCESS_GROUP 197 proc = self.loop.run_until_complete( 198 asyncio.create_subprocess_shell(blocking_shell_command, stdout=asyncio.subprocess.PIPE, 199 creationflags=creationflags) 200 ) 201 self.loop.run_until_complete(asyncio.sleep(1)) 202 if sys.platform == 'win32': 203 proc.send_signal(signal.CTRL_BREAK_EVENT) 204 # On windows it is an alias of terminate which sets the return code 205 proc.kill() 206 returncode = self.loop.run_until_complete(proc.wait()) 207 if sys.platform == 'win32': 208 self.assertIsInstance(returncode, int) 209 # expect 1 but sometimes get 0 210 else: 211 self.assertEqual(-signal.SIGKILL, returncode) 212 213 def test_terminate(self): 214 args = PROGRAM_BLOCKED 215 proc = self.loop.run_until_complete( 216 asyncio.create_subprocess_exec(*args) 217 ) 218 proc.terminate() 219 returncode = self.loop.run_until_complete(proc.wait()) 220 if sys.platform == 'win32': 221 self.assertIsInstance(returncode, int) 222 # expect 1 but sometimes get 0 223 else: 224 self.assertEqual(-signal.SIGTERM, returncode) 225 226 @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP") 227 def test_send_signal(self): 228 # bpo-31034: Make sure that we get the default signal handler (killing 229 # the process). The parent process may have decided to ignore SIGHUP, 230 # and signal handlers are inherited. 231 old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL) 232 try: 233 code = 'import time; print("sleeping", flush=True); time.sleep(3600)' 234 args = [sys.executable, '-c', code] 235 proc = self.loop.run_until_complete( 236 asyncio.create_subprocess_exec( 237 *args, 238 stdout=subprocess.PIPE, 239 ) 240 ) 241 242 async def send_signal(proc): 243 # basic synchronization to wait until the program is sleeping 244 line = await proc.stdout.readline() 245 self.assertEqual(line, b'sleeping\n') 246 247 proc.send_signal(signal.SIGHUP) 248 returncode = await proc.wait() 249 return returncode 250 251 returncode = self.loop.run_until_complete(send_signal(proc)) 252 self.assertEqual(-signal.SIGHUP, returncode) 253 finally: 254 signal.signal(signal.SIGHUP, old_handler) 255 256 def prepare_broken_pipe_test(self): 257 # buffer large enough to feed the whole pipe buffer 258 large_data = b'x' * support.PIPE_MAX_SIZE 259 260 # the program ends before the stdin can be fed 261 proc = self.loop.run_until_complete( 262 asyncio.create_subprocess_exec( 263 sys.executable, '-c', 'pass', 264 stdin=subprocess.PIPE, 265 ) 266 ) 267 268 return (proc, large_data) 269 270 def test_stdin_broken_pipe(self): 271 proc, large_data = self.prepare_broken_pipe_test() 272 273 async def write_stdin(proc, data): 274 await asyncio.sleep(0.5) 275 proc.stdin.write(data) 276 await proc.stdin.drain() 277 278 coro = write_stdin(proc, large_data) 279 # drain() must raise BrokenPipeError or ConnectionResetError 280 with test_utils.disable_logger(): 281 self.assertRaises((BrokenPipeError, ConnectionResetError), 282 self.loop.run_until_complete, coro) 283 self.loop.run_until_complete(proc.wait()) 284 285 def test_communicate_ignore_broken_pipe(self): 286 proc, large_data = self.prepare_broken_pipe_test() 287 288 # communicate() must ignore BrokenPipeError when feeding stdin 289 self.loop.set_exception_handler(lambda loop, msg: None) 290 self.loop.run_until_complete(proc.communicate(large_data)) 291 self.loop.run_until_complete(proc.wait()) 292 293 def test_pause_reading(self): 294 limit = 10 295 size = (limit * 2 + 1) 296 297 async def test_pause_reading(): 298 code = '\n'.join(( 299 'import sys', 300 'sys.stdout.write("x" * %s)' % size, 301 'sys.stdout.flush()', 302 )) 303 304 connect_read_pipe = self.loop.connect_read_pipe 305 306 async def connect_read_pipe_mock(*args, **kw): 307 transport, protocol = await connect_read_pipe(*args, **kw) 308 transport.pause_reading = mock.Mock() 309 transport.resume_reading = mock.Mock() 310 return (transport, protocol) 311 312 self.loop.connect_read_pipe = connect_read_pipe_mock 313 314 proc = await asyncio.create_subprocess_exec( 315 sys.executable, '-c', code, 316 stdin=asyncio.subprocess.PIPE, 317 stdout=asyncio.subprocess.PIPE, 318 limit=limit, 319 ) 320 stdout_transport = proc._transport.get_pipe_transport(1) 321 322 stdout, stderr = await proc.communicate() 323 324 # The child process produced more than limit bytes of output, 325 # the stream reader transport should pause the protocol to not 326 # allocate too much memory. 327 return (stdout, stdout_transport) 328 329 # Issue #22685: Ensure that the stream reader pauses the protocol 330 # when the child process produces too much data 331 stdout, transport = self.loop.run_until_complete(test_pause_reading()) 332 333 self.assertEqual(stdout, b'x' * size) 334 self.assertTrue(transport.pause_reading.called) 335 self.assertTrue(transport.resume_reading.called) 336 337 def test_stdin_not_inheritable(self): 338 # asyncio issue #209: stdin must not be inheritable, otherwise 339 # the Process.communicate() hangs 340 async def len_message(message): 341 code = 'import sys; data = sys.stdin.read(); print(len(data))' 342 proc = await asyncio.create_subprocess_exec( 343 sys.executable, '-c', code, 344 stdin=asyncio.subprocess.PIPE, 345 stdout=asyncio.subprocess.PIPE, 346 stderr=asyncio.subprocess.PIPE, 347 close_fds=False, 348 ) 349 stdout, stderr = await proc.communicate(message) 350 exitcode = await proc.wait() 351 return (stdout, exitcode) 352 353 output, exitcode = self.loop.run_until_complete(len_message(b'abc')) 354 self.assertEqual(output.rstrip(), b'3') 355 self.assertEqual(exitcode, 0) 356 357 def test_empty_input(self): 358 359 async def empty_input(): 360 code = 'import sys; data = sys.stdin.read(); print(len(data))' 361 proc = await asyncio.create_subprocess_exec( 362 sys.executable, '-c', code, 363 stdin=asyncio.subprocess.PIPE, 364 stdout=asyncio.subprocess.PIPE, 365 stderr=asyncio.subprocess.PIPE, 366 close_fds=False, 367 ) 368 stdout, stderr = await proc.communicate(b'') 369 exitcode = await proc.wait() 370 return (stdout, exitcode) 371 372 output, exitcode = self.loop.run_until_complete(empty_input()) 373 self.assertEqual(output.rstrip(), b'0') 374 self.assertEqual(exitcode, 0) 375 376 def test_devnull_input(self): 377 378 async def empty_input(): 379 code = 'import sys; data = sys.stdin.read(); print(len(data))' 380 proc = await asyncio.create_subprocess_exec( 381 sys.executable, '-c', code, 382 stdin=asyncio.subprocess.DEVNULL, 383 stdout=asyncio.subprocess.PIPE, 384 stderr=asyncio.subprocess.PIPE, 385 close_fds=False, 386 ) 387 stdout, stderr = await proc.communicate() 388 exitcode = await proc.wait() 389 return (stdout, exitcode) 390 391 output, exitcode = self.loop.run_until_complete(empty_input()) 392 self.assertEqual(output.rstrip(), b'0') 393 self.assertEqual(exitcode, 0) 394 395 def test_devnull_output(self): 396 397 async def empty_output(): 398 code = 'import sys; data = sys.stdin.read(); print(len(data))' 399 proc = await asyncio.create_subprocess_exec( 400 sys.executable, '-c', code, 401 stdin=asyncio.subprocess.PIPE, 402 stdout=asyncio.subprocess.DEVNULL, 403 stderr=asyncio.subprocess.PIPE, 404 close_fds=False, 405 ) 406 stdout, stderr = await proc.communicate(b"abc") 407 exitcode = await proc.wait() 408 return (stdout, exitcode) 409 410 output, exitcode = self.loop.run_until_complete(empty_output()) 411 self.assertEqual(output, None) 412 self.assertEqual(exitcode, 0) 413 414 def test_devnull_error(self): 415 416 async def empty_error(): 417 code = 'import sys; data = sys.stdin.read(); print(len(data))' 418 proc = await asyncio.create_subprocess_exec( 419 sys.executable, '-c', code, 420 stdin=asyncio.subprocess.PIPE, 421 stdout=asyncio.subprocess.PIPE, 422 stderr=asyncio.subprocess.DEVNULL, 423 close_fds=False, 424 ) 425 stdout, stderr = await proc.communicate(b"abc") 426 exitcode = await proc.wait() 427 return (stderr, exitcode) 428 429 output, exitcode = self.loop.run_until_complete(empty_error()) 430 self.assertEqual(output, None) 431 self.assertEqual(exitcode, 0) 432 433 @unittest.skipIf(sys.platform != 'linux', "Don't have /dev/stdin") 434 def test_devstdin_input(self): 435 436 async def devstdin_input(message): 437 code = 'file = open("/dev/stdin"); data = file.read(); print(len(data))' 438 proc = await asyncio.create_subprocess_exec( 439 sys.executable, '-c', code, 440 stdin=asyncio.subprocess.PIPE, 441 stdout=asyncio.subprocess.PIPE, 442 stderr=asyncio.subprocess.PIPE, 443 close_fds=False, 444 ) 445 stdout, stderr = await proc.communicate(message) 446 exitcode = await proc.wait() 447 return (stdout, exitcode) 448 449 output, exitcode = self.loop.run_until_complete(devstdin_input(b'abc')) 450 self.assertEqual(output.rstrip(), b'3') 451 self.assertEqual(exitcode, 0) 452 453 def test_cancel_process_wait(self): 454 # Issue #23140: cancel Process.wait() 455 456 async def cancel_wait(): 457 proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED) 458 459 # Create an internal future waiting on the process exit 460 task = self.loop.create_task(proc.wait()) 461 self.loop.call_soon(task.cancel) 462 try: 463 await task 464 except asyncio.CancelledError: 465 pass 466 467 # Cancel the future 468 task.cancel() 469 470 # Kill the process and wait until it is done 471 proc.kill() 472 await proc.wait() 473 474 self.loop.run_until_complete(cancel_wait()) 475 476 def test_cancel_make_subprocess_transport_exec(self): 477 478 async def cancel_make_transport(): 479 coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED) 480 task = self.loop.create_task(coro) 481 482 self.loop.call_soon(task.cancel) 483 try: 484 await task 485 except asyncio.CancelledError: 486 pass 487 488 # ignore the log: 489 # "Exception during subprocess creation, kill the subprocess" 490 with test_utils.disable_logger(): 491 self.loop.run_until_complete(cancel_make_transport()) 492 493 def test_cancel_post_init(self): 494 495 async def cancel_make_transport(): 496 coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol, 497 *PROGRAM_BLOCKED) 498 task = self.loop.create_task(coro) 499 500 self.loop.call_soon(task.cancel) 501 try: 502 await task 503 except asyncio.CancelledError: 504 pass 505 506 # ignore the log: 507 # "Exception during subprocess creation, kill the subprocess" 508 with test_utils.disable_logger(): 509 self.loop.run_until_complete(cancel_make_transport()) 510 test_utils.run_briefly(self.loop) 511 512 def test_close_kill_running(self): 513 514 async def kill_running(): 515 create = self.loop.subprocess_exec(asyncio.SubprocessProtocol, 516 *PROGRAM_BLOCKED) 517 transport, protocol = await create 518 519 kill_called = False 520 def kill(): 521 nonlocal kill_called 522 kill_called = True 523 orig_kill() 524 525 proc = transport.get_extra_info('subprocess') 526 orig_kill = proc.kill 527 proc.kill = kill 528 returncode = transport.get_returncode() 529 transport.close() 530 await asyncio.wait_for(transport._wait(), 5) 531 return (returncode, kill_called) 532 533 # Ignore "Close running child process: kill ..." log 534 with test_utils.disable_logger(): 535 try: 536 returncode, killed = self.loop.run_until_complete( 537 kill_running() 538 ) 539 except asyncio.TimeoutError: 540 self.skipTest( 541 "Timeout failure on waiting for subprocess stopping" 542 ) 543 self.assertIsNone(returncode) 544 545 # transport.close() must kill the process if it is still running 546 self.assertTrue(killed) 547 test_utils.run_briefly(self.loop) 548 549 def test_close_dont_kill_finished(self): 550 551 async def kill_running(): 552 create = self.loop.subprocess_exec(asyncio.SubprocessProtocol, 553 *PROGRAM_BLOCKED) 554 transport, protocol = await create 555 proc = transport.get_extra_info('subprocess') 556 557 # kill the process (but asyncio is not notified immediately) 558 proc.kill() 559 proc.wait() 560 561 proc.kill = mock.Mock() 562 proc_returncode = proc.poll() 563 transport_returncode = transport.get_returncode() 564 transport.close() 565 return (proc_returncode, transport_returncode, proc.kill.called) 566 567 # Ignore "Unknown child process pid ..." log of SafeChildWatcher, 568 # emitted because the test already consumes the exit status: 569 # proc.wait() 570 with test_utils.disable_logger(): 571 result = self.loop.run_until_complete(kill_running()) 572 test_utils.run_briefly(self.loop) 573 574 proc_returncode, transport_return_code, killed = result 575 576 self.assertIsNotNone(proc_returncode) 577 self.assertIsNone(transport_return_code) 578 579 # transport.close() must not kill the process if it finished, even if 580 # the transport was not notified yet 581 self.assertFalse(killed) 582 583 # Unlike SafeChildWatcher, FastChildWatcher does not pop the 584 # callbacks if waitpid() is called elsewhere. Let's clear them 585 # manually to avoid a warning when the watcher is detached. 586 if (sys.platform != 'win32' and 587 isinstance(self, SubprocessFastWatcherTests)): 588 asyncio.get_child_watcher()._callbacks.clear() 589 590 async def _test_popen_error(self, stdin): 591 if sys.platform == 'win32': 592 target = 'asyncio.windows_utils.Popen' 593 else: 594 target = 'subprocess.Popen' 595 with mock.patch(target) as popen: 596 exc = ZeroDivisionError 597 popen.side_effect = exc 598 599 with warnings.catch_warnings(record=True) as warns: 600 with self.assertRaises(exc): 601 await asyncio.create_subprocess_exec( 602 sys.executable, 603 '-c', 604 'pass', 605 stdin=stdin 606 ) 607 self.assertEqual(warns, []) 608 609 def test_popen_error(self): 610 # Issue #24763: check that the subprocess transport is closed 611 # when BaseSubprocessTransport fails 612 self.loop.run_until_complete(self._test_popen_error(stdin=None)) 613 614 def test_popen_error_with_stdin_pipe(self): 615 # Issue #35721: check that newly created socket pair is closed when 616 # Popen fails 617 self.loop.run_until_complete( 618 self._test_popen_error(stdin=subprocess.PIPE)) 619 620 def test_read_stdout_after_process_exit(self): 621 622 async def execute(): 623 code = '\n'.join(['import sys', 624 'for _ in range(64):', 625 ' sys.stdout.write("x" * 4096)', 626 'sys.stdout.flush()', 627 'sys.exit(1)']) 628 629 process = await asyncio.create_subprocess_exec( 630 sys.executable, '-c', code, 631 stdout=asyncio.subprocess.PIPE, 632 ) 633 634 while True: 635 data = await process.stdout.read(65536) 636 if data: 637 await asyncio.sleep(0.3) 638 else: 639 break 640 641 self.loop.run_until_complete(execute()) 642 643 def test_create_subprocess_exec_text_mode_fails(self): 644 async def execute(): 645 with self.assertRaises(ValueError): 646 await subprocess.create_subprocess_exec(sys.executable, 647 text=True) 648 649 with self.assertRaises(ValueError): 650 await subprocess.create_subprocess_exec(sys.executable, 651 encoding="utf-8") 652 653 with self.assertRaises(ValueError): 654 await subprocess.create_subprocess_exec(sys.executable, 655 errors="strict") 656 657 self.loop.run_until_complete(execute()) 658 659 def test_create_subprocess_shell_text_mode_fails(self): 660 661 async def execute(): 662 with self.assertRaises(ValueError): 663 await subprocess.create_subprocess_shell(sys.executable, 664 text=True) 665 666 with self.assertRaises(ValueError): 667 await subprocess.create_subprocess_shell(sys.executable, 668 encoding="utf-8") 669 670 with self.assertRaises(ValueError): 671 await subprocess.create_subprocess_shell(sys.executable, 672 errors="strict") 673 674 self.loop.run_until_complete(execute()) 675 676 def test_create_subprocess_exec_with_path(self): 677 async def execute(): 678 p = await subprocess.create_subprocess_exec( 679 os_helper.FakePath(sys.executable), '-c', 'pass') 680 await p.wait() 681 p = await subprocess.create_subprocess_exec( 682 sys.executable, '-c', 'pass', os_helper.FakePath('.')) 683 await p.wait() 684 685 self.assertIsNone(self.loop.run_until_complete(execute())) 686 687 def test_subprocess_communicate_stdout(self): 688 # See https://github.com/python/cpython/issues/100133 689 async def get_command_stdout(cmd, *args): 690 proc = await asyncio.create_subprocess_exec( 691 cmd, *args, stdout=asyncio.subprocess.PIPE, 692 ) 693 stdout, _ = await proc.communicate() 694 return stdout.decode().strip() 695 696 async def main(): 697 outputs = [f'foo{i}' for i in range(10)] 698 res = await asyncio.gather(*[get_command_stdout(sys.executable, '-c', 699 f'print({out!r})') for out in outputs]) 700 self.assertEqual(res, outputs) 701 702 self.loop.run_until_complete(main()) 703 704 705if sys.platform != 'win32': 706 # Unix 707 class SubprocessWatcherMixin(SubprocessMixin): 708 709 Watcher = None 710 711 def setUp(self): 712 super().setUp() 713 policy = asyncio.get_event_loop_policy() 714 self.loop = policy.new_event_loop() 715 self.set_event_loop(self.loop) 716 717 watcher = self.Watcher() 718 watcher.attach_loop(self.loop) 719 policy.set_child_watcher(watcher) 720 721 def tearDown(self): 722 super().tearDown() 723 policy = asyncio.get_event_loop_policy() 724 watcher = policy.get_child_watcher() 725 policy.set_child_watcher(None) 726 watcher.attach_loop(None) 727 watcher.close() 728 729 class SubprocessThreadedWatcherTests(SubprocessWatcherMixin, 730 test_utils.TestCase): 731 732 Watcher = unix_events.ThreadedChildWatcher 733 734 @unittest.skip("bpo-38323: MultiLoopChildWatcher has a race condition \ 735 and these tests can hang the test suite") 736 class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin, 737 test_utils.TestCase): 738 739 Watcher = unix_events.MultiLoopChildWatcher 740 741 class SubprocessSafeWatcherTests(SubprocessWatcherMixin, 742 test_utils.TestCase): 743 744 Watcher = unix_events.SafeChildWatcher 745 746 class SubprocessFastWatcherTests(SubprocessWatcherMixin, 747 test_utils.TestCase): 748 749 Watcher = unix_events.FastChildWatcher 750 751 def has_pidfd_support(): 752 if not hasattr(os, 'pidfd_open'): 753 return False 754 try: 755 os.close(os.pidfd_open(os.getpid())) 756 except OSError: 757 return False 758 return True 759 760 @unittest.skipUnless( 761 has_pidfd_support(), 762 "operating system does not support pidfds", 763 ) 764 class SubprocessPidfdWatcherTests(SubprocessWatcherMixin, 765 test_utils.TestCase): 766 Watcher = unix_events.PidfdChildWatcher 767 768 769 class GenericWatcherTests(test_utils.TestCase): 770 771 def test_create_subprocess_fails_with_inactive_watcher(self): 772 watcher = mock.create_autospec( 773 asyncio.AbstractChildWatcher, 774 **{"__enter__.return_value.is_active.return_value": False} 775 ) 776 777 async def execute(): 778 asyncio.set_child_watcher(watcher) 779 780 with self.assertRaises(RuntimeError): 781 await subprocess.create_subprocess_exec( 782 os_helper.FakePath(sys.executable), '-c', 'pass') 783 784 watcher.add_child_handler.assert_not_called() 785 786 with asyncio.Runner(loop_factory=asyncio.new_event_loop) as runner: 787 self.assertIsNone(runner.run(execute())) 788 self.assertListEqual(watcher.mock_calls, [ 789 mock.call.__enter__(), 790 mock.call.__enter__().is_active(), 791 mock.call.__exit__(RuntimeError, mock.ANY, mock.ANY), 792 ]) 793 794else: 795 # Windows 796 class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase): 797 798 def setUp(self): 799 super().setUp() 800 self.loop = asyncio.ProactorEventLoop() 801 self.set_event_loop(self.loop) 802 803 804if __name__ == '__main__': 805 unittest.main() 806