1import os
2import shutil
3import signal
4import sys
5import unittest
6import warnings
7from unittest import mock
8
9import asyncio
10from asyncio import base_subprocess
11from asyncio import subprocess
12from test.test_asyncio import utils as test_utils
13from test import support
14from test.support import os_helper
15
16if sys.platform != 'win32':
17    from asyncio import unix_events
18
19if support.check_sanitizer(address=True):
20    raise unittest.SkipTest("Exposes ASAN flakiness in GitHub CI")
21
22# Program blocking
23PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
24
25# Program copying input to output
26PROGRAM_CAT = [
27    sys.executable, '-c',
28    ';'.join(('import sys',
29              'data = sys.stdin.buffer.read()',
30              'sys.stdout.buffer.write(data)'))]
31
32
33def tearDownModule():
34    asyncio.set_event_loop_policy(None)
35
36
37class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
38    def _start(self, *args, **kwargs):
39        self._proc = mock.Mock()
40        self._proc.stdin = None
41        self._proc.stdout = None
42        self._proc.stderr = None
43        self._proc.pid = -1
44
45
46class SubprocessTransportTests(test_utils.TestCase):
47    def setUp(self):
48        super().setUp()
49        self.loop = self.new_test_loop()
50        self.set_event_loop(self.loop)
51
52    def create_transport(self, waiter=None):
53        protocol = mock.Mock()
54        transport = TestSubprocessTransport(
55                        self.loop, protocol, ['test'], False,
56                        None, None, None, 0, waiter=waiter)
57        return (transport, protocol)
58
59    def test_proc_exited(self):
60        waiter = self.loop.create_future()
61        transport, protocol = self.create_transport(waiter)
62        transport._process_exited(6)
63        self.loop.run_until_complete(waiter)
64
65        self.assertEqual(transport.get_returncode(), 6)
66
67        self.assertTrue(protocol.connection_made.called)
68        self.assertTrue(protocol.process_exited.called)
69        self.assertTrue(protocol.connection_lost.called)
70        self.assertEqual(protocol.connection_lost.call_args[0], (None,))
71
72        self.assertFalse(transport.is_closing())
73        self.assertIsNone(transport._loop)
74        self.assertIsNone(transport._proc)
75        self.assertIsNone(transport._protocol)
76
77        # methods must raise ProcessLookupError if the process exited
78        self.assertRaises(ProcessLookupError,
79                          transport.send_signal, signal.SIGTERM)
80        self.assertRaises(ProcessLookupError, transport.terminate)
81        self.assertRaises(ProcessLookupError, transport.kill)
82
83        transport.close()
84
85    def test_subprocess_repr(self):
86        waiter = self.loop.create_future()
87        transport, protocol = self.create_transport(waiter)
88        transport._process_exited(6)
89        self.loop.run_until_complete(waiter)
90
91        self.assertEqual(
92            repr(transport),
93            "<TestSubprocessTransport pid=-1 returncode=6>"
94        )
95        transport._returncode = None
96        self.assertEqual(
97            repr(transport),
98            "<TestSubprocessTransport pid=-1 running>"
99        )
100        transport._pid = None
101        transport._returncode = None
102        self.assertEqual(
103            repr(transport),
104            "<TestSubprocessTransport not started>"
105        )
106        transport.close()
107
108
109class SubprocessMixin:
110
111    def test_stdin_stdout(self):
112        args = PROGRAM_CAT
113
114        async def run(data):
115            proc = await asyncio.create_subprocess_exec(
116                *args,
117                stdin=subprocess.PIPE,
118                stdout=subprocess.PIPE,
119            )
120
121            # feed data
122            proc.stdin.write(data)
123            await proc.stdin.drain()
124            proc.stdin.close()
125
126            # get output and exitcode
127            data = await proc.stdout.read()
128            exitcode = await proc.wait()
129            return (exitcode, data)
130
131        task = run(b'some data')
132        task = asyncio.wait_for(task, 60.0)
133        exitcode, stdout = self.loop.run_until_complete(task)
134        self.assertEqual(exitcode, 0)
135        self.assertEqual(stdout, b'some data')
136
137    def test_communicate(self):
138        args = PROGRAM_CAT
139
140        async def run(data):
141            proc = await asyncio.create_subprocess_exec(
142                *args,
143                stdin=subprocess.PIPE,
144                stdout=subprocess.PIPE,
145            )
146            stdout, stderr = await proc.communicate(data)
147            return proc.returncode, stdout
148
149        task = run(b'some data')
150        task = asyncio.wait_for(task, support.LONG_TIMEOUT)
151        exitcode, stdout = self.loop.run_until_complete(task)
152        self.assertEqual(exitcode, 0)
153        self.assertEqual(stdout, b'some data')
154
155    def test_shell(self):
156        proc = self.loop.run_until_complete(
157            asyncio.create_subprocess_shell('exit 7')
158        )
159        exitcode = self.loop.run_until_complete(proc.wait())
160        self.assertEqual(exitcode, 7)
161
162    def test_start_new_session(self):
163        # start the new process in a new session
164        proc = self.loop.run_until_complete(
165            asyncio.create_subprocess_shell(
166                'exit 8',
167                start_new_session=True,
168            )
169        )
170        exitcode = self.loop.run_until_complete(proc.wait())
171        self.assertEqual(exitcode, 8)
172
173    def test_kill(self):
174        args = PROGRAM_BLOCKED
175        proc = self.loop.run_until_complete(
176            asyncio.create_subprocess_exec(*args)
177        )
178        proc.kill()
179        returncode = self.loop.run_until_complete(proc.wait())
180        if sys.platform == 'win32':
181            self.assertIsInstance(returncode, int)
182            # expect 1 but sometimes get 0
183        else:
184            self.assertEqual(-signal.SIGKILL, returncode)
185
186    def test_kill_issue43884(self):
187        if sys.platform == 'win32':
188            blocking_shell_command = f'{sys.executable} -c "import time; time.sleep(2)"'
189        else:
190            blocking_shell_command = 'sleep 1; sleep 1'
191        creationflags = 0
192        if sys.platform == 'win32':
193            from subprocess import CREATE_NEW_PROCESS_GROUP
194            # On windows create a new process group so that killing process
195            # kills the process and all its children.
196            creationflags = CREATE_NEW_PROCESS_GROUP
197        proc = self.loop.run_until_complete(
198            asyncio.create_subprocess_shell(blocking_shell_command, stdout=asyncio.subprocess.PIPE,
199            creationflags=creationflags)
200        )
201        self.loop.run_until_complete(asyncio.sleep(1))
202        if sys.platform == 'win32':
203            proc.send_signal(signal.CTRL_BREAK_EVENT)
204        # On windows it is an alias of terminate which sets the return code
205        proc.kill()
206        returncode = self.loop.run_until_complete(proc.wait())
207        if sys.platform == 'win32':
208            self.assertIsInstance(returncode, int)
209            # expect 1 but sometimes get 0
210        else:
211            self.assertEqual(-signal.SIGKILL, returncode)
212
213    def test_terminate(self):
214        args = PROGRAM_BLOCKED
215        proc = self.loop.run_until_complete(
216            asyncio.create_subprocess_exec(*args)
217        )
218        proc.terminate()
219        returncode = self.loop.run_until_complete(proc.wait())
220        if sys.platform == 'win32':
221            self.assertIsInstance(returncode, int)
222            # expect 1 but sometimes get 0
223        else:
224            self.assertEqual(-signal.SIGTERM, returncode)
225
226    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
227    def test_send_signal(self):
228        # bpo-31034: Make sure that we get the default signal handler (killing
229        # the process). The parent process may have decided to ignore SIGHUP,
230        # and signal handlers are inherited.
231        old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
232        try:
233            code = 'import time; print("sleeping", flush=True); time.sleep(3600)'
234            args = [sys.executable, '-c', code]
235            proc = self.loop.run_until_complete(
236                asyncio.create_subprocess_exec(
237                    *args,
238                    stdout=subprocess.PIPE,
239                )
240            )
241
242            async def send_signal(proc):
243                # basic synchronization to wait until the program is sleeping
244                line = await proc.stdout.readline()
245                self.assertEqual(line, b'sleeping\n')
246
247                proc.send_signal(signal.SIGHUP)
248                returncode = await proc.wait()
249                return returncode
250
251            returncode = self.loop.run_until_complete(send_signal(proc))
252            self.assertEqual(-signal.SIGHUP, returncode)
253        finally:
254            signal.signal(signal.SIGHUP, old_handler)
255
256    def prepare_broken_pipe_test(self):
257        # buffer large enough to feed the whole pipe buffer
258        large_data = b'x' * support.PIPE_MAX_SIZE
259
260        # the program ends before the stdin can be fed
261        proc = self.loop.run_until_complete(
262            asyncio.create_subprocess_exec(
263                sys.executable, '-c', 'pass',
264                stdin=subprocess.PIPE,
265            )
266        )
267
268        return (proc, large_data)
269
270    def test_stdin_broken_pipe(self):
271        proc, large_data = self.prepare_broken_pipe_test()
272
273        async def write_stdin(proc, data):
274            await asyncio.sleep(0.5)
275            proc.stdin.write(data)
276            await proc.stdin.drain()
277
278        coro = write_stdin(proc, large_data)
279        # drain() must raise BrokenPipeError or ConnectionResetError
280        with test_utils.disable_logger():
281            self.assertRaises((BrokenPipeError, ConnectionResetError),
282                              self.loop.run_until_complete, coro)
283        self.loop.run_until_complete(proc.wait())
284
285    def test_communicate_ignore_broken_pipe(self):
286        proc, large_data = self.prepare_broken_pipe_test()
287
288        # communicate() must ignore BrokenPipeError when feeding stdin
289        self.loop.set_exception_handler(lambda loop, msg: None)
290        self.loop.run_until_complete(proc.communicate(large_data))
291        self.loop.run_until_complete(proc.wait())
292
293    def test_pause_reading(self):
294        limit = 10
295        size = (limit * 2 + 1)
296
297        async def test_pause_reading():
298            code = '\n'.join((
299                'import sys',
300                'sys.stdout.write("x" * %s)' % size,
301                'sys.stdout.flush()',
302            ))
303
304            connect_read_pipe = self.loop.connect_read_pipe
305
306            async def connect_read_pipe_mock(*args, **kw):
307                transport, protocol = await connect_read_pipe(*args, **kw)
308                transport.pause_reading = mock.Mock()
309                transport.resume_reading = mock.Mock()
310                return (transport, protocol)
311
312            self.loop.connect_read_pipe = connect_read_pipe_mock
313
314            proc = await asyncio.create_subprocess_exec(
315                sys.executable, '-c', code,
316                stdin=asyncio.subprocess.PIPE,
317                stdout=asyncio.subprocess.PIPE,
318                limit=limit,
319            )
320            stdout_transport = proc._transport.get_pipe_transport(1)
321
322            stdout, stderr = await proc.communicate()
323
324            # The child process produced more than limit bytes of output,
325            # the stream reader transport should pause the protocol to not
326            # allocate too much memory.
327            return (stdout, stdout_transport)
328
329        # Issue #22685: Ensure that the stream reader pauses the protocol
330        # when the child process produces too much data
331        stdout, transport = self.loop.run_until_complete(test_pause_reading())
332
333        self.assertEqual(stdout, b'x' * size)
334        self.assertTrue(transport.pause_reading.called)
335        self.assertTrue(transport.resume_reading.called)
336
337    def test_stdin_not_inheritable(self):
338        # asyncio issue #209: stdin must not be inheritable, otherwise
339        # the Process.communicate() hangs
340        async def len_message(message):
341            code = 'import sys; data = sys.stdin.read(); print(len(data))'
342            proc = await asyncio.create_subprocess_exec(
343                sys.executable, '-c', code,
344                stdin=asyncio.subprocess.PIPE,
345                stdout=asyncio.subprocess.PIPE,
346                stderr=asyncio.subprocess.PIPE,
347                close_fds=False,
348            )
349            stdout, stderr = await proc.communicate(message)
350            exitcode = await proc.wait()
351            return (stdout, exitcode)
352
353        output, exitcode = self.loop.run_until_complete(len_message(b'abc'))
354        self.assertEqual(output.rstrip(), b'3')
355        self.assertEqual(exitcode, 0)
356
357    def test_empty_input(self):
358
359        async def empty_input():
360            code = 'import sys; data = sys.stdin.read(); print(len(data))'
361            proc = await asyncio.create_subprocess_exec(
362                sys.executable, '-c', code,
363                stdin=asyncio.subprocess.PIPE,
364                stdout=asyncio.subprocess.PIPE,
365                stderr=asyncio.subprocess.PIPE,
366                close_fds=False,
367            )
368            stdout, stderr = await proc.communicate(b'')
369            exitcode = await proc.wait()
370            return (stdout, exitcode)
371
372        output, exitcode = self.loop.run_until_complete(empty_input())
373        self.assertEqual(output.rstrip(), b'0')
374        self.assertEqual(exitcode, 0)
375
376    def test_devnull_input(self):
377
378        async def empty_input():
379            code = 'import sys; data = sys.stdin.read(); print(len(data))'
380            proc = await asyncio.create_subprocess_exec(
381                sys.executable, '-c', code,
382                stdin=asyncio.subprocess.DEVNULL,
383                stdout=asyncio.subprocess.PIPE,
384                stderr=asyncio.subprocess.PIPE,
385                close_fds=False,
386            )
387            stdout, stderr = await proc.communicate()
388            exitcode = await proc.wait()
389            return (stdout, exitcode)
390
391        output, exitcode = self.loop.run_until_complete(empty_input())
392        self.assertEqual(output.rstrip(), b'0')
393        self.assertEqual(exitcode, 0)
394
395    def test_devnull_output(self):
396
397        async def empty_output():
398            code = 'import sys; data = sys.stdin.read(); print(len(data))'
399            proc = await asyncio.create_subprocess_exec(
400                sys.executable, '-c', code,
401                stdin=asyncio.subprocess.PIPE,
402                stdout=asyncio.subprocess.DEVNULL,
403                stderr=asyncio.subprocess.PIPE,
404                close_fds=False,
405            )
406            stdout, stderr = await proc.communicate(b"abc")
407            exitcode = await proc.wait()
408            return (stdout, exitcode)
409
410        output, exitcode = self.loop.run_until_complete(empty_output())
411        self.assertEqual(output, None)
412        self.assertEqual(exitcode, 0)
413
414    def test_devnull_error(self):
415
416        async def empty_error():
417            code = 'import sys; data = sys.stdin.read(); print(len(data))'
418            proc = await asyncio.create_subprocess_exec(
419                sys.executable, '-c', code,
420                stdin=asyncio.subprocess.PIPE,
421                stdout=asyncio.subprocess.PIPE,
422                stderr=asyncio.subprocess.DEVNULL,
423                close_fds=False,
424            )
425            stdout, stderr = await proc.communicate(b"abc")
426            exitcode = await proc.wait()
427            return (stderr, exitcode)
428
429        output, exitcode = self.loop.run_until_complete(empty_error())
430        self.assertEqual(output, None)
431        self.assertEqual(exitcode, 0)
432
433    @unittest.skipIf(sys.platform != 'linux', "Don't have /dev/stdin")
434    def test_devstdin_input(self):
435
436        async def devstdin_input(message):
437            code = 'file = open("/dev/stdin"); data = file.read(); print(len(data))'
438            proc = await asyncio.create_subprocess_exec(
439                sys.executable, '-c', code,
440                stdin=asyncio.subprocess.PIPE,
441                stdout=asyncio.subprocess.PIPE,
442                stderr=asyncio.subprocess.PIPE,
443                close_fds=False,
444            )
445            stdout, stderr = await proc.communicate(message)
446            exitcode = await proc.wait()
447            return (stdout, exitcode)
448
449        output, exitcode = self.loop.run_until_complete(devstdin_input(b'abc'))
450        self.assertEqual(output.rstrip(), b'3')
451        self.assertEqual(exitcode, 0)
452
453    def test_cancel_process_wait(self):
454        # Issue #23140: cancel Process.wait()
455
456        async def cancel_wait():
457            proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
458
459            # Create an internal future waiting on the process exit
460            task = self.loop.create_task(proc.wait())
461            self.loop.call_soon(task.cancel)
462            try:
463                await task
464            except asyncio.CancelledError:
465                pass
466
467            # Cancel the future
468            task.cancel()
469
470            # Kill the process and wait until it is done
471            proc.kill()
472            await proc.wait()
473
474        self.loop.run_until_complete(cancel_wait())
475
476    def test_cancel_make_subprocess_transport_exec(self):
477
478        async def cancel_make_transport():
479            coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
480            task = self.loop.create_task(coro)
481
482            self.loop.call_soon(task.cancel)
483            try:
484                await task
485            except asyncio.CancelledError:
486                pass
487
488        # ignore the log:
489        # "Exception during subprocess creation, kill the subprocess"
490        with test_utils.disable_logger():
491            self.loop.run_until_complete(cancel_make_transport())
492
493    def test_cancel_post_init(self):
494
495        async def cancel_make_transport():
496            coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
497                                             *PROGRAM_BLOCKED)
498            task = self.loop.create_task(coro)
499
500            self.loop.call_soon(task.cancel)
501            try:
502                await task
503            except asyncio.CancelledError:
504                pass
505
506        # ignore the log:
507        # "Exception during subprocess creation, kill the subprocess"
508        with test_utils.disable_logger():
509            self.loop.run_until_complete(cancel_make_transport())
510            test_utils.run_briefly(self.loop)
511
512    def test_close_kill_running(self):
513
514        async def kill_running():
515            create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
516                                               *PROGRAM_BLOCKED)
517            transport, protocol = await create
518
519            kill_called = False
520            def kill():
521                nonlocal kill_called
522                kill_called = True
523                orig_kill()
524
525            proc = transport.get_extra_info('subprocess')
526            orig_kill = proc.kill
527            proc.kill = kill
528            returncode = transport.get_returncode()
529            transport.close()
530            await asyncio.wait_for(transport._wait(), 5)
531            return (returncode, kill_called)
532
533        # Ignore "Close running child process: kill ..." log
534        with test_utils.disable_logger():
535            try:
536                returncode, killed = self.loop.run_until_complete(
537                    kill_running()
538                )
539            except asyncio.TimeoutError:
540                self.skipTest(
541                    "Timeout failure on waiting for subprocess stopping"
542                )
543        self.assertIsNone(returncode)
544
545        # transport.close() must kill the process if it is still running
546        self.assertTrue(killed)
547        test_utils.run_briefly(self.loop)
548
549    def test_close_dont_kill_finished(self):
550
551        async def kill_running():
552            create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
553                                               *PROGRAM_BLOCKED)
554            transport, protocol = await create
555            proc = transport.get_extra_info('subprocess')
556
557            # kill the process (but asyncio is not notified immediately)
558            proc.kill()
559            proc.wait()
560
561            proc.kill = mock.Mock()
562            proc_returncode = proc.poll()
563            transport_returncode = transport.get_returncode()
564            transport.close()
565            return (proc_returncode, transport_returncode, proc.kill.called)
566
567        # Ignore "Unknown child process pid ..." log of SafeChildWatcher,
568        # emitted because the test already consumes the exit status:
569        # proc.wait()
570        with test_utils.disable_logger():
571            result = self.loop.run_until_complete(kill_running())
572            test_utils.run_briefly(self.loop)
573
574        proc_returncode, transport_return_code, killed = result
575
576        self.assertIsNotNone(proc_returncode)
577        self.assertIsNone(transport_return_code)
578
579        # transport.close() must not kill the process if it finished, even if
580        # the transport was not notified yet
581        self.assertFalse(killed)
582
583        # Unlike SafeChildWatcher, FastChildWatcher does not pop the
584        # callbacks if waitpid() is called elsewhere. Let's clear them
585        # manually to avoid a warning when the watcher is detached.
586        if (sys.platform != 'win32' and
587                isinstance(self, SubprocessFastWatcherTests)):
588            asyncio.get_child_watcher()._callbacks.clear()
589
590    async def _test_popen_error(self, stdin):
591        if sys.platform == 'win32':
592            target = 'asyncio.windows_utils.Popen'
593        else:
594            target = 'subprocess.Popen'
595        with mock.patch(target) as popen:
596            exc = ZeroDivisionError
597            popen.side_effect = exc
598
599            with warnings.catch_warnings(record=True) as warns:
600                with self.assertRaises(exc):
601                    await asyncio.create_subprocess_exec(
602                        sys.executable,
603                        '-c',
604                        'pass',
605                        stdin=stdin
606                    )
607                self.assertEqual(warns, [])
608
609    def test_popen_error(self):
610        # Issue #24763: check that the subprocess transport is closed
611        # when BaseSubprocessTransport fails
612        self.loop.run_until_complete(self._test_popen_error(stdin=None))
613
614    def test_popen_error_with_stdin_pipe(self):
615        # Issue #35721: check that newly created socket pair is closed when
616        # Popen fails
617        self.loop.run_until_complete(
618            self._test_popen_error(stdin=subprocess.PIPE))
619
620    def test_read_stdout_after_process_exit(self):
621
622        async def execute():
623            code = '\n'.join(['import sys',
624                              'for _ in range(64):',
625                              '    sys.stdout.write("x" * 4096)',
626                              'sys.stdout.flush()',
627                              'sys.exit(1)'])
628
629            process = await asyncio.create_subprocess_exec(
630                sys.executable, '-c', code,
631                stdout=asyncio.subprocess.PIPE,
632            )
633
634            while True:
635                data = await process.stdout.read(65536)
636                if data:
637                    await asyncio.sleep(0.3)
638                else:
639                    break
640
641        self.loop.run_until_complete(execute())
642
643    def test_create_subprocess_exec_text_mode_fails(self):
644        async def execute():
645            with self.assertRaises(ValueError):
646                await subprocess.create_subprocess_exec(sys.executable,
647                                                        text=True)
648
649            with self.assertRaises(ValueError):
650                await subprocess.create_subprocess_exec(sys.executable,
651                                                        encoding="utf-8")
652
653            with self.assertRaises(ValueError):
654                await subprocess.create_subprocess_exec(sys.executable,
655                                                        errors="strict")
656
657        self.loop.run_until_complete(execute())
658
659    def test_create_subprocess_shell_text_mode_fails(self):
660
661        async def execute():
662            with self.assertRaises(ValueError):
663                await subprocess.create_subprocess_shell(sys.executable,
664                                                         text=True)
665
666            with self.assertRaises(ValueError):
667                await subprocess.create_subprocess_shell(sys.executable,
668                                                         encoding="utf-8")
669
670            with self.assertRaises(ValueError):
671                await subprocess.create_subprocess_shell(sys.executable,
672                                                         errors="strict")
673
674        self.loop.run_until_complete(execute())
675
676    def test_create_subprocess_exec_with_path(self):
677        async def execute():
678            p = await subprocess.create_subprocess_exec(
679                os_helper.FakePath(sys.executable), '-c', 'pass')
680            await p.wait()
681            p = await subprocess.create_subprocess_exec(
682                sys.executable, '-c', 'pass', os_helper.FakePath('.'))
683            await p.wait()
684
685        self.assertIsNone(self.loop.run_until_complete(execute()))
686
687    def test_subprocess_communicate_stdout(self):
688        # See https://github.com/python/cpython/issues/100133
689        async def get_command_stdout(cmd, *args):
690            proc = await asyncio.create_subprocess_exec(
691                cmd, *args, stdout=asyncio.subprocess.PIPE,
692            )
693            stdout, _ = await proc.communicate()
694            return stdout.decode().strip()
695
696        async def main():
697            outputs = [f'foo{i}' for i in range(10)]
698            res = await asyncio.gather(*[get_command_stdout(sys.executable, '-c',
699                                        f'print({out!r})') for out in outputs])
700            self.assertEqual(res, outputs)
701
702        self.loop.run_until_complete(main())
703
704
705if sys.platform != 'win32':
706    # Unix
707    class SubprocessWatcherMixin(SubprocessMixin):
708
709        Watcher = None
710
711        def setUp(self):
712            super().setUp()
713            policy = asyncio.get_event_loop_policy()
714            self.loop = policy.new_event_loop()
715            self.set_event_loop(self.loop)
716
717            watcher = self.Watcher()
718            watcher.attach_loop(self.loop)
719            policy.set_child_watcher(watcher)
720
721        def tearDown(self):
722            super().tearDown()
723            policy = asyncio.get_event_loop_policy()
724            watcher = policy.get_child_watcher()
725            policy.set_child_watcher(None)
726            watcher.attach_loop(None)
727            watcher.close()
728
729    class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
730                                         test_utils.TestCase):
731
732        Watcher = unix_events.ThreadedChildWatcher
733
734    @unittest.skip("bpo-38323: MultiLoopChildWatcher has a race condition \
735                    and these tests can hang the test suite")
736    class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
737                                          test_utils.TestCase):
738
739        Watcher = unix_events.MultiLoopChildWatcher
740
741    class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
742                                     test_utils.TestCase):
743
744        Watcher = unix_events.SafeChildWatcher
745
746    class SubprocessFastWatcherTests(SubprocessWatcherMixin,
747                                     test_utils.TestCase):
748
749        Watcher = unix_events.FastChildWatcher
750
751    def has_pidfd_support():
752        if not hasattr(os, 'pidfd_open'):
753            return False
754        try:
755            os.close(os.pidfd_open(os.getpid()))
756        except OSError:
757            return False
758        return True
759
760    @unittest.skipUnless(
761        has_pidfd_support(),
762        "operating system does not support pidfds",
763    )
764    class SubprocessPidfdWatcherTests(SubprocessWatcherMixin,
765                                      test_utils.TestCase):
766        Watcher = unix_events.PidfdChildWatcher
767
768
769    class GenericWatcherTests(test_utils.TestCase):
770
771        def test_create_subprocess_fails_with_inactive_watcher(self):
772            watcher = mock.create_autospec(
773                asyncio.AbstractChildWatcher,
774                **{"__enter__.return_value.is_active.return_value": False}
775            )
776
777            async def execute():
778                asyncio.set_child_watcher(watcher)
779
780                with self.assertRaises(RuntimeError):
781                    await subprocess.create_subprocess_exec(
782                        os_helper.FakePath(sys.executable), '-c', 'pass')
783
784                watcher.add_child_handler.assert_not_called()
785
786            with asyncio.Runner(loop_factory=asyncio.new_event_loop) as runner:
787                self.assertIsNone(runner.run(execute()))
788            self.assertListEqual(watcher.mock_calls, [
789                mock.call.__enter__(),
790                mock.call.__enter__().is_active(),
791                mock.call.__exit__(RuntimeError, mock.ANY, mock.ANY),
792            ])
793
794else:
795    # Windows
796    class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
797
798        def setUp(self):
799            super().setUp()
800            self.loop = asyncio.ProactorEventLoop()
801            self.set_event_loop(self.loop)
802
803
804if __name__ == '__main__':
805    unittest.main()
806