1import unittest
2from unittest import mock
3from test import support
4from test.support import check_sanitizer
5from test.support import import_helper
6from test.support import os_helper
7from test.support import warnings_helper
8import subprocess
9import sys
10import signal
11import io
12import itertools
13import os
14import errno
15import tempfile
16import time
17import traceback
18import types
19import selectors
20import sysconfig
21import select
22import shutil
23import threading
24import gc
25import textwrap
26import json
27import pathlib
28from test.support.os_helper import FakePath
29
30try:
31    import _testcapi
32except ImportError:
33    _testcapi = None
34
35try:
36    import pwd
37except ImportError:
38    pwd = None
39try:
40    import grp
41except ImportError:
42    grp = None
43
try:
    import fcntl
except ImportError:
    # Same fallback as the pwd/grp imports above: tests that need fcntl
    # check for None.  Only a failed import is tolerated, so unrelated
    # errors (e.g. KeyboardInterrupt) are no longer swallowed.
    fcntl = None
48
# Skip the whole module at import time when support.PGO is set.
if support.PGO:
    raise unittest.SkipTest("test is not helpful for PGO")

# Skip the whole module when test.support reports no subprocess support.
if not support.has_subprocess_support:
    raise unittest.SkipTest("test module requires subprocess")
54
# True when running on Windows.
mswindows = sys.platform == "win32"

#
# External programs this module relies on: Python
#

# Python source fragment that puts sys.stdout into binary mode on Windows;
# empty on every other platform.
SETBINARY = (
    'import msvcrt; msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY);'
    if mswindows else ''
)

# A command that is expected not to exist.
NONEXISTING_CMD = ('nonexisting_i_hope',)
# Errors that indicate the command was not found; callers ignore them.
NONEXISTING_ERRORS = (
    FileNotFoundError,
    NotADirectoryError,
    PermissionError,
)

# A command that exits with status zero (setUpModule may swap in a faster
# one).
ZERO_RETURN_CMD = (sys.executable, '-c', 'pass')
72
73
def setUpModule():
    """Point ZERO_RETURN_CMD at the system ``true`` program when usable.

    ``true`` is looked up on PATH; it replaces the default command only if
    it is executable and actually exits with status 0 when run.
    """
    global ZERO_RETURN_CMD
    true_path = shutil.which('true')
    if true_path is None:
        return
    if not os.access(true_path, os.X_OK):
        return
    if subprocess.run([true_path]).returncode == 0:
        ZERO_RETURN_CMD = (true_path,)  # Faster than Python startup.
82
83
class BaseTestCase(unittest.TestCase):
    """Base class that reaps child processes around every test."""

    def setUp(self):
        # Keep the number of live children as low as possible so the
        # test doesn't crash on some buildbots (Alphas in particular).
        support.reap_children()

    def _wait_for_active_children(self):
        # Wait for every Popen still tracked in subprocess._active, then
        # let subprocess clean the list up and check nothing is left.
        for leftover in subprocess._active:
            leftover.wait()
        subprocess._cleanup()
        self.assertFalse(
            subprocess._active, "subprocess._active not empty"
        )

    def tearDown(self):
        # subprocess._active is not used on Windows and is set to None.
        if not mswindows:
            self._wait_for_active_children()
        self.doCleanups()
        support.reap_children()
101
102
class PopenTestException(Exception):
    """Exception raised on purpose by Popen subclasses used in tests."""
105
106
class PopenExecuteChildRaises(subprocess.Popen):
    """Popen whose _execute_child always fails.

    Used to test that subprocess.PIPE filehandles are cleaned up when
    _execute_child raises.
    """
    def _execute_child(self, *args, **kwargs):
        message = "Forced Exception for Test"
        raise PopenTestException(message)
113
114
115class ProcessTestCase(BaseTestCase):
116
117    def test_io_buffered_by_default(self):
118        p = subprocess.Popen(ZERO_RETURN_CMD,
119                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
120                             stderr=subprocess.PIPE)
121        try:
122            self.assertIsInstance(p.stdin, io.BufferedIOBase)
123            self.assertIsInstance(p.stdout, io.BufferedIOBase)
124            self.assertIsInstance(p.stderr, io.BufferedIOBase)
125        finally:
126            p.stdin.close()
127            p.stdout.close()
128            p.stderr.close()
129            p.wait()
130
131    def test_io_unbuffered_works(self):
132        p = subprocess.Popen(ZERO_RETURN_CMD,
133                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
134                             stderr=subprocess.PIPE, bufsize=0)
135        try:
136            self.assertIsInstance(p.stdin, io.RawIOBase)
137            self.assertIsInstance(p.stdout, io.RawIOBase)
138            self.assertIsInstance(p.stderr, io.RawIOBase)
139        finally:
140            p.stdin.close()
141            p.stdout.close()
142            p.stderr.close()
143            p.wait()
144
145    def test_call_seq(self):
146        # call() function with sequence argument
147        rc = subprocess.call([sys.executable, "-c",
148                              "import sys; sys.exit(47)"])
149        self.assertEqual(rc, 47)
150
151    def test_call_timeout(self):
152        # call() function with timeout argument; we want to test that the child
153        # process gets killed when the timeout expires.  If the child isn't
154        # killed, this call will deadlock since subprocess.call waits for the
155        # child.
156        self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
157                          [sys.executable, "-c", "while True: pass"],
158                          timeout=0.1)
159
160    def test_check_call_zero(self):
161        # check_call() function with zero return code
162        rc = subprocess.check_call(ZERO_RETURN_CMD)
163        self.assertEqual(rc, 0)
164
165    def test_check_call_nonzero(self):
166        # check_call() function with non-zero return code
167        with self.assertRaises(subprocess.CalledProcessError) as c:
168            subprocess.check_call([sys.executable, "-c",
169                                   "import sys; sys.exit(47)"])
170        self.assertEqual(c.exception.returncode, 47)
171
172    def test_check_output(self):
173        # check_output() function with zero return code
174        output = subprocess.check_output(
175                [sys.executable, "-c", "print('BDFL')"])
176        self.assertIn(b'BDFL', output)
177
178        with self.assertRaisesRegex(ValueError,
179                "stdout argument not allowed, it will be overridden"):
180            subprocess.check_output([], stdout=None)
181
182        with self.assertRaisesRegex(ValueError,
183                "check argument not allowed, it will be overridden"):
184            subprocess.check_output([], check=False)
185
186    def test_check_output_nonzero(self):
187        # check_call() function with non-zero return code
188        with self.assertRaises(subprocess.CalledProcessError) as c:
189            subprocess.check_output(
190                    [sys.executable, "-c", "import sys; sys.exit(5)"])
191        self.assertEqual(c.exception.returncode, 5)
192
193    def test_check_output_stderr(self):
194        # check_output() function stderr redirected to stdout
195        output = subprocess.check_output(
196                [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
197                stderr=subprocess.STDOUT)
198        self.assertIn(b'BDFL', output)
199
200    def test_check_output_stdin_arg(self):
201        # check_output() can be called with stdin set to a file
202        tf = tempfile.TemporaryFile()
203        self.addCleanup(tf.close)
204        tf.write(b'pear')
205        tf.seek(0)
206        output = subprocess.check_output(
207                [sys.executable, "-c",
208                 "import sys; sys.stdout.write(sys.stdin.read().upper())"],
209                stdin=tf)
210        self.assertIn(b'PEAR', output)
211
212    def test_check_output_input_arg(self):
213        # check_output() can be called with input set to a string
214        output = subprocess.check_output(
215                [sys.executable, "-c",
216                 "import sys; sys.stdout.write(sys.stdin.read().upper())"],
217                input=b'pear')
218        self.assertIn(b'PEAR', output)
219
220    def test_check_output_input_none(self):
221        """input=None has a legacy meaning of input='' on check_output."""
222        output = subprocess.check_output(
223                [sys.executable, "-c",
224                 "import sys; print('XX' if sys.stdin.read() else '')"],
225                input=None)
226        self.assertNotIn(b'XX', output)
227
228    def test_check_output_input_none_text(self):
229        output = subprocess.check_output(
230                [sys.executable, "-c",
231                 "import sys; print('XX' if sys.stdin.read() else '')"],
232                input=None, text=True)
233        self.assertNotIn('XX', output)
234
235    def test_check_output_input_none_universal_newlines(self):
236        output = subprocess.check_output(
237                [sys.executable, "-c",
238                 "import sys; print('XX' if sys.stdin.read() else '')"],
239                input=None, universal_newlines=True)
240        self.assertNotIn('XX', output)
241
242    def test_check_output_input_none_encoding_errors(self):
243        output = subprocess.check_output(
244                [sys.executable, "-c", "print('foo')"],
245                input=None, encoding='utf-8', errors='ignore')
246        self.assertIn('foo', output)
247
248    def test_check_output_stdout_arg(self):
249        # check_output() refuses to accept 'stdout' argument
250        with self.assertRaises(ValueError) as c:
251            output = subprocess.check_output(
252                    [sys.executable, "-c", "print('will not be run')"],
253                    stdout=sys.stdout)
254            self.fail("Expected ValueError when stdout arg supplied.")
255        self.assertIn('stdout', c.exception.args[0])
256
257    def test_check_output_stdin_with_input_arg(self):
258        # check_output() refuses to accept 'stdin' with 'input'
259        tf = tempfile.TemporaryFile()
260        self.addCleanup(tf.close)
261        tf.write(b'pear')
262        tf.seek(0)
263        with self.assertRaises(ValueError) as c:
264            output = subprocess.check_output(
265                    [sys.executable, "-c", "print('will not be run')"],
266                    stdin=tf, input=b'hare')
267            self.fail("Expected ValueError when stdin and input args supplied.")
268        self.assertIn('stdin', c.exception.args[0])
269        self.assertIn('input', c.exception.args[0])
270
271    def test_check_output_timeout(self):
272        # check_output() function with timeout arg
273        with self.assertRaises(subprocess.TimeoutExpired) as c:
274            output = subprocess.check_output(
275                    [sys.executable, "-c",
276                     "import sys, time\n"
277                     "sys.stdout.write('BDFL')\n"
278                     "sys.stdout.flush()\n"
279                     "time.sleep(3600)"],
280                    # Some heavily loaded buildbots (sparc Debian 3.x) require
281                    # this much time to start and print.
282                    timeout=3)
283            self.fail("Expected TimeoutExpired.")
284        self.assertEqual(c.exception.output, b'BDFL')
285
286    def test_call_kwargs(self):
287        # call() function with keyword args
288        newenv = os.environ.copy()
289        newenv["FRUIT"] = "banana"
290        rc = subprocess.call([sys.executable, "-c",
291                              'import sys, os;'
292                              'sys.exit(os.getenv("FRUIT")=="banana")'],
293                             env=newenv)
294        self.assertEqual(rc, 1)
295
296    def test_invalid_args(self):
297        # Popen() called with invalid arguments should raise TypeError
298        # but Popen.__del__ should not complain (issue #12085)
299        with support.captured_stderr() as s:
300            self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
301            argcount = subprocess.Popen.__init__.__code__.co_argcount
302            too_many_args = [0] * (argcount + 1)
303            self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
304        self.assertEqual(s.getvalue(), '')
305
306    def test_stdin_none(self):
307        # .stdin is None when not redirected
308        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
309                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
310        self.addCleanup(p.stdout.close)
311        self.addCleanup(p.stderr.close)
312        p.wait()
313        self.assertEqual(p.stdin, None)
314
315    def test_stdout_none(self):
316        # .stdout is None when not redirected, and the child's stdout will
317        # be inherited from the parent.  In order to test this we run a
318        # subprocess in a subprocess:
319        # this_test
320        #   \-- subprocess created by this test (parent)
321        #          \-- subprocess created by the parent subprocess (child)
322        # The parent doesn't specify stdout, so the child will use the
323        # parent's stdout.  This test checks that the message printed by the
324        # child goes to the parent stdout.  The parent also checks that the
325        # child's stdout is None.  See #11963.
326        code = ('import sys; from subprocess import Popen, PIPE;'
327                'p = Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],'
328                '          stdin=PIPE, stderr=PIPE);'
329                'p.wait(); assert p.stdout is None;')
330        p = subprocess.Popen([sys.executable, "-c", code],
331                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
332        self.addCleanup(p.stdout.close)
333        self.addCleanup(p.stderr.close)
334        out, err = p.communicate()
335        self.assertEqual(p.returncode, 0, err)
336        self.assertEqual(out.rstrip(), b'test_stdout_none')
337
338    def test_stderr_none(self):
339        # .stderr is None when not redirected
340        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
341                         stdin=subprocess.PIPE, stdout=subprocess.PIPE)
342        self.addCleanup(p.stdout.close)
343        self.addCleanup(p.stdin.close)
344        p.wait()
345        self.assertEqual(p.stderr, None)
346
347    def _assert_python(self, pre_args, **kwargs):
348        # We include sys.exit() to prevent the test runner from hanging
349        # whenever python is found.
350        args = pre_args + ["import sys; sys.exit(47)"]
351        p = subprocess.Popen(args, **kwargs)
352        p.wait()
353        self.assertEqual(47, p.returncode)
354
355    def test_executable(self):
356        # Check that the executable argument works.
357        #
358        # On Unix (non-Mac and non-Windows), Python looks at args[0] to
359        # determine where its standard library is, so we need the directory
360        # of args[0] to be valid for the Popen() call to Python to succeed.
361        # See also issue #16170 and issue #7774.
362        doesnotexist = os.path.join(os.path.dirname(sys.executable),
363                                    "doesnotexist")
364        self._assert_python([doesnotexist, "-c"], executable=sys.executable)
365
366    def test_bytes_executable(self):
367        doesnotexist = os.path.join(os.path.dirname(sys.executable),
368                                    "doesnotexist")
369        self._assert_python([doesnotexist, "-c"],
370                            executable=os.fsencode(sys.executable))
371
372    def test_pathlike_executable(self):
373        doesnotexist = os.path.join(os.path.dirname(sys.executable),
374                                    "doesnotexist")
375        self._assert_python([doesnotexist, "-c"],
376                            executable=FakePath(sys.executable))
377
378    def test_executable_takes_precedence(self):
379        # Check that the executable argument takes precedence over args[0].
380        #
381        # Verify first that the call succeeds without the executable arg.
382        pre_args = [sys.executable, "-c"]
383        self._assert_python(pre_args)
384        self.assertRaises(NONEXISTING_ERRORS,
385                          self._assert_python, pre_args,
386                          executable=NONEXISTING_CMD[0])
387
388    @unittest.skipIf(mswindows, "executable argument replaces shell")
389    def test_executable_replaces_shell(self):
390        # Check that the executable argument replaces the default shell
391        # when shell=True.
392        self._assert_python([], executable=sys.executable, shell=True)
393
394    @unittest.skipIf(mswindows, "executable argument replaces shell")
395    def test_bytes_executable_replaces_shell(self):
396        self._assert_python([], executable=os.fsencode(sys.executable),
397                            shell=True)
398
399    @unittest.skipIf(mswindows, "executable argument replaces shell")
400    def test_pathlike_executable_replaces_shell(self):
401        self._assert_python([], executable=FakePath(sys.executable),
402                            shell=True)
403
404    # For use in the test_cwd* tests below.
405    def _normalize_cwd(self, cwd):
406        # Normalize an expected cwd (for Tru64 support).
407        # We can't use os.path.realpath since it doesn't expand Tru64 {memb}
408        # strings.  See bug #1063571.
409        with os_helper.change_cwd(cwd):
410            return os.getcwd()
411
412    # For use in the test_cwd* tests below.
413    def _split_python_path(self):
414        # Return normalized (python_dir, python_base).
415        python_path = os.path.realpath(sys.executable)
416        return os.path.split(python_path)
417
418    # For use in the test_cwd* tests below.
419    def _assert_cwd(self, expected_cwd, python_arg, **kwargs):
420        # Invoke Python via Popen, and assert that (1) the call succeeds,
421        # and that (2) the current working directory of the child process
422        # matches *expected_cwd*.
423        p = subprocess.Popen([python_arg, "-c",
424                              "import os, sys; "
425                              "buf = sys.stdout.buffer; "
426                              "buf.write(os.getcwd().encode()); "
427                              "buf.flush(); "
428                              "sys.exit(47)"],
429                              stdout=subprocess.PIPE,
430                              **kwargs)
431        self.addCleanup(p.stdout.close)
432        p.wait()
433        self.assertEqual(47, p.returncode)
434        normcase = os.path.normcase
435        self.assertEqual(normcase(expected_cwd),
436                         normcase(p.stdout.read().decode()))
437
438    def test_cwd(self):
439        # Check that cwd changes the cwd for the child process.
440        temp_dir = tempfile.gettempdir()
441        temp_dir = self._normalize_cwd(temp_dir)
442        self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir)
443
444    def test_cwd_with_bytes(self):
445        temp_dir = tempfile.gettempdir()
446        temp_dir = self._normalize_cwd(temp_dir)
447        self._assert_cwd(temp_dir, sys.executable, cwd=os.fsencode(temp_dir))
448
449    def test_cwd_with_pathlike(self):
450        temp_dir = tempfile.gettempdir()
451        temp_dir = self._normalize_cwd(temp_dir)
452        self._assert_cwd(temp_dir, sys.executable, cwd=FakePath(temp_dir))
453
454    @unittest.skipIf(mswindows, "pending resolution of issue #15533")
455    def test_cwd_with_relative_arg(self):
456        # Check that Popen looks for args[0] relative to cwd if args[0]
457        # is relative.
458        python_dir, python_base = self._split_python_path()
459        rel_python = os.path.join(os.curdir, python_base)
460        with os_helper.temp_cwd() as wrong_dir:
461            # Before calling with the correct cwd, confirm that the call fails
462            # without cwd and with the wrong cwd.
463            self.assertRaises(FileNotFoundError, subprocess.Popen,
464                              [rel_python])
465            self.assertRaises(FileNotFoundError, subprocess.Popen,
466                              [rel_python], cwd=wrong_dir)
467            python_dir = self._normalize_cwd(python_dir)
468            self._assert_cwd(python_dir, rel_python, cwd=python_dir)
469
470    @unittest.skipIf(mswindows, "pending resolution of issue #15533")
471    def test_cwd_with_relative_executable(self):
472        # Check that Popen looks for executable relative to cwd if executable
473        # is relative (and that executable takes precedence over args[0]).
474        python_dir, python_base = self._split_python_path()
475        rel_python = os.path.join(os.curdir, python_base)
476        doesntexist = "somethingyoudonthave"
477        with os_helper.temp_cwd() as wrong_dir:
478            # Before calling with the correct cwd, confirm that the call fails
479            # without cwd and with the wrong cwd.
480            self.assertRaises(FileNotFoundError, subprocess.Popen,
481                              [doesntexist], executable=rel_python)
482            self.assertRaises(FileNotFoundError, subprocess.Popen,
483                              [doesntexist], executable=rel_python,
484                              cwd=wrong_dir)
485            python_dir = self._normalize_cwd(python_dir)
486            self._assert_cwd(python_dir, doesntexist, executable=rel_python,
487                             cwd=python_dir)
488
489    def test_cwd_with_absolute_arg(self):
490        # Check that Popen can find the executable when the cwd is wrong
491        # if args[0] is an absolute path.
492        python_dir, python_base = self._split_python_path()
493        abs_python = os.path.join(python_dir, python_base)
494        rel_python = os.path.join(os.curdir, python_base)
495        with os_helper.temp_dir() as wrong_dir:
496            # Before calling with an absolute path, confirm that using a
497            # relative path fails.
498            self.assertRaises(FileNotFoundError, subprocess.Popen,
499                              [rel_python], cwd=wrong_dir)
500            wrong_dir = self._normalize_cwd(wrong_dir)
501            self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir)
502
503    @unittest.skipIf(sys.base_prefix != sys.prefix,
504                     'Test is not venv-compatible')
505    def test_executable_with_cwd(self):
506        python_dir, python_base = self._split_python_path()
507        python_dir = self._normalize_cwd(python_dir)
508        self._assert_cwd(python_dir, "somethingyoudonthave",
509                         executable=sys.executable, cwd=python_dir)
510
511    @unittest.skipIf(sys.base_prefix != sys.prefix,
512                     'Test is not venv-compatible')
513    @unittest.skipIf(sysconfig.is_python_build(),
514                     "need an installed Python. See #7774")
515    def test_executable_without_cwd(self):
516        # For a normal installation, it should work without 'cwd'
517        # argument.  For test runs in the build directory, see #7774.
518        self._assert_cwd(os.getcwd(), "somethingyoudonthave",
519                         executable=sys.executable)
520
521    def test_stdin_pipe(self):
522        # stdin redirection
523        p = subprocess.Popen([sys.executable, "-c",
524                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
525                        stdin=subprocess.PIPE)
526        p.stdin.write(b"pear")
527        p.stdin.close()
528        p.wait()
529        self.assertEqual(p.returncode, 1)
530
531    def test_stdin_filedes(self):
532        # stdin is set to open file descriptor
533        tf = tempfile.TemporaryFile()
534        self.addCleanup(tf.close)
535        d = tf.fileno()
536        os.write(d, b"pear")
537        os.lseek(d, 0, 0)
538        p = subprocess.Popen([sys.executable, "-c",
539                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
540                         stdin=d)
541        p.wait()
542        self.assertEqual(p.returncode, 1)
543
544    def test_stdin_fileobj(self):
545        # stdin is set to open file object
546        tf = tempfile.TemporaryFile()
547        self.addCleanup(tf.close)
548        tf.write(b"pear")
549        tf.seek(0)
550        p = subprocess.Popen([sys.executable, "-c",
551                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
552                         stdin=tf)
553        p.wait()
554        self.assertEqual(p.returncode, 1)
555
556    def test_stdout_pipe(self):
557        # stdout redirection
558        p = subprocess.Popen([sys.executable, "-c",
559                          'import sys; sys.stdout.write("orange")'],
560                         stdout=subprocess.PIPE)
561        with p:
562            self.assertEqual(p.stdout.read(), b"orange")
563
564    def test_stdout_filedes(self):
565        # stdout is set to open file descriptor
566        tf = tempfile.TemporaryFile()
567        self.addCleanup(tf.close)
568        d = tf.fileno()
569        p = subprocess.Popen([sys.executable, "-c",
570                          'import sys; sys.stdout.write("orange")'],
571                         stdout=d)
572        p.wait()
573        os.lseek(d, 0, 0)
574        self.assertEqual(os.read(d, 1024), b"orange")
575
576    def test_stdout_fileobj(self):
577        # stdout is set to open file object
578        tf = tempfile.TemporaryFile()
579        self.addCleanup(tf.close)
580        p = subprocess.Popen([sys.executable, "-c",
581                          'import sys; sys.stdout.write("orange")'],
582                         stdout=tf)
583        p.wait()
584        tf.seek(0)
585        self.assertEqual(tf.read(), b"orange")
586
587    def test_stderr_pipe(self):
588        # stderr redirection
589        p = subprocess.Popen([sys.executable, "-c",
590                          'import sys; sys.stderr.write("strawberry")'],
591                         stderr=subprocess.PIPE)
592        with p:
593            self.assertEqual(p.stderr.read(), b"strawberry")
594
595    def test_stderr_filedes(self):
596        # stderr is set to open file descriptor
597        tf = tempfile.TemporaryFile()
598        self.addCleanup(tf.close)
599        d = tf.fileno()
600        p = subprocess.Popen([sys.executable, "-c",
601                          'import sys; sys.stderr.write("strawberry")'],
602                         stderr=d)
603        p.wait()
604        os.lseek(d, 0, 0)
605        self.assertEqual(os.read(d, 1024), b"strawberry")
606
607    def test_stderr_fileobj(self):
608        # stderr is set to open file object
609        tf = tempfile.TemporaryFile()
610        self.addCleanup(tf.close)
611        p = subprocess.Popen([sys.executable, "-c",
612                          'import sys; sys.stderr.write("strawberry")'],
613                         stderr=tf)
614        p.wait()
615        tf.seek(0)
616        self.assertEqual(tf.read(), b"strawberry")
617
618    def test_stderr_redirect_with_no_stdout_redirect(self):
619        # test stderr=STDOUT while stdout=None (not set)
620
621        # - grandchild prints to stderr
622        # - child redirects grandchild's stderr to its stdout
623        # - the parent should get grandchild's stderr in child's stdout
624        p = subprocess.Popen([sys.executable, "-c",
625                              'import sys, subprocess;'
626                              'rc = subprocess.call([sys.executable, "-c",'
627                              '    "import sys;"'
628                              '    "sys.stderr.write(\'42\')"],'
629                              '    stderr=subprocess.STDOUT);'
630                              'sys.exit(rc)'],
631                             stdout=subprocess.PIPE,
632                             stderr=subprocess.PIPE)
633        stdout, stderr = p.communicate()
634        #NOTE: stdout should get stderr from grandchild
635        self.assertEqual(stdout, b'42')
636        self.assertEqual(stderr, b'') # should be empty
637        self.assertEqual(p.returncode, 0)
638
639    def test_stdout_stderr_pipe(self):
640        # capture stdout and stderr to the same pipe
641        p = subprocess.Popen([sys.executable, "-c",
642                              'import sys;'
643                              'sys.stdout.write("apple");'
644                              'sys.stdout.flush();'
645                              'sys.stderr.write("orange")'],
646                             stdout=subprocess.PIPE,
647                             stderr=subprocess.STDOUT)
648        with p:
649            self.assertEqual(p.stdout.read(), b"appleorange")
650
651    def test_stdout_stderr_file(self):
652        # capture stdout and stderr to the same open file
653        tf = tempfile.TemporaryFile()
654        self.addCleanup(tf.close)
655        p = subprocess.Popen([sys.executable, "-c",
656                              'import sys;'
657                              'sys.stdout.write("apple");'
658                              'sys.stdout.flush();'
659                              'sys.stderr.write("orange")'],
660                             stdout=tf,
661                             stderr=tf)
662        p.wait()
663        tf.seek(0)
664        self.assertEqual(tf.read(), b"appleorange")
665
666    def test_stdout_filedes_of_stdout(self):
667        # stdout is set to 1 (#1531862).
668        # To avoid printing the text on stdout, we do something similar to
669        # test_stdout_none (see above).  The parent subprocess calls the child
670        # subprocess passing stdout=1, and this test uses stdout=PIPE in
671        # order to capture and check the output of the parent. See #11963.
672        code = ('import sys, subprocess; '
673                'rc = subprocess.call([sys.executable, "-c", '
674                '    "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
675                     'b\'test with stdout=1\'))"], stdout=1); '
676                'assert rc == 18')
677        p = subprocess.Popen([sys.executable, "-c", code],
678                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
679        self.addCleanup(p.stdout.close)
680        self.addCleanup(p.stderr.close)
681        out, err = p.communicate()
682        self.assertEqual(p.returncode, 0, err)
683        self.assertEqual(out.rstrip(), b'test with stdout=1')
684
685    def test_stdout_devnull(self):
686        p = subprocess.Popen([sys.executable, "-c",
687                              'for i in range(10240):'
688                              'print("x" * 1024)'],
689                              stdout=subprocess.DEVNULL)
690        p.wait()
691        self.assertEqual(p.stdout, None)
692
693    def test_stderr_devnull(self):
694        p = subprocess.Popen([sys.executable, "-c",
695                              'import sys\n'
696                              'for i in range(10240):'
697                              'sys.stderr.write("x" * 1024)'],
698                              stderr=subprocess.DEVNULL)
699        p.wait()
700        self.assertEqual(p.stderr, None)
701
702    def test_stdin_devnull(self):
703        p = subprocess.Popen([sys.executable, "-c",
704                              'import sys;'
705                              'sys.stdin.read(1)'],
706                              stdin=subprocess.DEVNULL)
707        p.wait()
708        self.assertEqual(p.stdin, None)
709
710    @unittest.skipUnless(fcntl and hasattr(fcntl, 'F_GETPIPE_SZ'),
711                         'fcntl.F_GETPIPE_SZ required for test.')
712    def test_pipesizes(self):
713        test_pipe_r, test_pipe_w = os.pipe()
714        try:
715            # Get the default pipesize with F_GETPIPE_SZ
716            pipesize_default = fcntl.fcntl(test_pipe_w, fcntl.F_GETPIPE_SZ)
717        finally:
718            os.close(test_pipe_r)
719            os.close(test_pipe_w)
720        pipesize = pipesize_default // 2
721        if pipesize < 512:  # the POSIX minimum
722            raise unittest.SkipTest(
723                'default pipesize too small to perform test.')
724        p = subprocess.Popen(
725            [sys.executable, "-c",
726             'import sys; sys.stdin.read(); sys.stdout.write("out"); '
727             'sys.stderr.write("error!")'],
728            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
729            stderr=subprocess.PIPE, pipesize=pipesize)
730        try:
731            for fifo in [p.stdin, p.stdout, p.stderr]:
732                self.assertEqual(
733                    fcntl.fcntl(fifo.fileno(), fcntl.F_GETPIPE_SZ),
734                    pipesize)
735            # Windows pipe size can be acquired via GetNamedPipeInfoFunction
736            # https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-getnamedpipeinfo
737            # However, this function is not yet in _winapi.
738            p.stdin.write(b"pear")
739            p.stdin.close()
740            p.stdout.close()
741            p.stderr.close()
742        finally:
743            p.kill()
744            p.wait()
745
746    @unittest.skipUnless(fcntl and hasattr(fcntl, 'F_GETPIPE_SZ'),
747                         'fcntl.F_GETPIPE_SZ required for test.')
748    def test_pipesize_default(self):
749        p = subprocess.Popen(
750            [sys.executable, "-c",
751             'import sys; sys.stdin.read(); sys.stdout.write("out"); '
752             'sys.stderr.write("error!")'],
753            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
754            stderr=subprocess.PIPE, pipesize=-1)
755        try:
756            fp_r, fp_w = os.pipe()
757            try:
758                default_pipesize = fcntl.fcntl(fp_w, fcntl.F_GETPIPE_SZ)
759                for fifo in [p.stdin, p.stdout, p.stderr]:
760                    self.assertEqual(
761                        fcntl.fcntl(fifo.fileno(), fcntl.F_GETPIPE_SZ),
762                        default_pipesize)
763            finally:
764                os.close(fp_r)
765                os.close(fp_w)
766            # On other platforms we cannot test the pipe size (yet). But above
767            # code using pipesize=-1 should not crash.
768            p.stdin.close()
769            p.stdout.close()
770            p.stderr.close()
771        finally:
772            p.kill()
773            p.wait()
774
775    def test_env(self):
776        newenv = os.environ.copy()
777        newenv["FRUIT"] = "orange"
778        with subprocess.Popen([sys.executable, "-c",
779                               'import sys,os;'
780                               'sys.stdout.write(os.getenv("FRUIT"))'],
781                              stdout=subprocess.PIPE,
782                              env=newenv) as p:
783            stdout, stderr = p.communicate()
784            self.assertEqual(stdout, b"orange")
785
786    # Windows requires at least the SYSTEMROOT environment variable to start
787    # Python
788    @unittest.skipIf(sys.platform == 'win32',
789                     'cannot test an empty env on Windows')
790    @unittest.skipIf(sysconfig.get_config_var('Py_ENABLE_SHARED') == 1,
791                     'The Python shared library cannot be loaded '
792                     'with an empty environment.')
793    @unittest.skipIf(check_sanitizer(address=True),
794                     'AddressSanitizer adds to the environment.')
795    def test_empty_env(self):
796        """Verify that env={} is as empty as possible."""
797
798        def is_env_var_to_ignore(n):
799            """Determine if an environment variable is under our control."""
800            # This excludes some __CF_* and VERSIONER_* keys MacOS insists
801            # on adding even when the environment in exec is empty.
802            # Gentoo sandboxes also force LD_PRELOAD and SANDBOX_* to exist.
803            return ('VERSIONER' in n or '__CF' in n or  # MacOS
804                    n == 'LD_PRELOAD' or n.startswith('SANDBOX') or # Gentoo
805                    n == 'LC_CTYPE') # Locale coercion triggered
806
807        with subprocess.Popen([sys.executable, "-c",
808                               'import os; print(list(os.environ.keys()))'],
809                              stdout=subprocess.PIPE, env={}) as p:
810            stdout, stderr = p.communicate()
811            child_env_names = eval(stdout.strip())
812            self.assertIsInstance(child_env_names, list)
813            child_env_names = [k for k in child_env_names
814                               if not is_env_var_to_ignore(k)]
815            self.assertEqual(child_env_names, [])
816
817    def test_invalid_cmd(self):
818        # null character in the command name
819        cmd = sys.executable + '\0'
820        with self.assertRaises(ValueError):
821            subprocess.Popen([cmd, "-c", "pass"])
822
823        # null character in the command argument
824        with self.assertRaises(ValueError):
825            subprocess.Popen([sys.executable, "-c", "pass#\0"])
826
827    def test_invalid_env(self):
828        # null character in the environment variable name
829        newenv = os.environ.copy()
830        newenv["FRUIT\0VEGETABLE"] = "cabbage"
831        with self.assertRaises(ValueError):
832            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
833
834        # null character in the environment variable value
835        newenv = os.environ.copy()
836        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
837        with self.assertRaises(ValueError):
838            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
839
840        # equal character in the environment variable name
841        newenv = os.environ.copy()
842        newenv["FRUIT=ORANGE"] = "lemon"
843        with self.assertRaises(ValueError):
844            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
845
846        # equal character in the environment variable value
847        newenv = os.environ.copy()
848        newenv["FRUIT"] = "orange=lemon"
849        with subprocess.Popen([sys.executable, "-c",
850                               'import sys, os;'
851                               'sys.stdout.write(os.getenv("FRUIT"))'],
852                              stdout=subprocess.PIPE,
853                              env=newenv) as p:
854            stdout, stderr = p.communicate()
855            self.assertEqual(stdout, b"orange=lemon")
856
857    def test_communicate_stdin(self):
858        p = subprocess.Popen([sys.executable, "-c",
859                              'import sys;'
860                              'sys.exit(sys.stdin.read() == "pear")'],
861                             stdin=subprocess.PIPE)
862        p.communicate(b"pear")
863        self.assertEqual(p.returncode, 1)
864
865    def test_communicate_stdout(self):
866        p = subprocess.Popen([sys.executable, "-c",
867                              'import sys; sys.stdout.write("pineapple")'],
868                             stdout=subprocess.PIPE)
869        (stdout, stderr) = p.communicate()
870        self.assertEqual(stdout, b"pineapple")
871        self.assertEqual(stderr, None)
872
873    def test_communicate_stderr(self):
874        p = subprocess.Popen([sys.executable, "-c",
875                              'import sys; sys.stderr.write("pineapple")'],
876                             stderr=subprocess.PIPE)
877        (stdout, stderr) = p.communicate()
878        self.assertEqual(stdout, None)
879        self.assertEqual(stderr, b"pineapple")
880
881    def test_communicate(self):
882        p = subprocess.Popen([sys.executable, "-c",
883                              'import sys,os;'
884                              'sys.stderr.write("pineapple");'
885                              'sys.stdout.write(sys.stdin.read())'],
886                             stdin=subprocess.PIPE,
887                             stdout=subprocess.PIPE,
888                             stderr=subprocess.PIPE)
889        self.addCleanup(p.stdout.close)
890        self.addCleanup(p.stderr.close)
891        self.addCleanup(p.stdin.close)
892        (stdout, stderr) = p.communicate(b"banana")
893        self.assertEqual(stdout, b"banana")
894        self.assertEqual(stderr, b"pineapple")
895
896    def test_communicate_timeout(self):
897        p = subprocess.Popen([sys.executable, "-c",
898                              'import sys,os,time;'
899                              'sys.stderr.write("pineapple\\n");'
900                              'time.sleep(1);'
901                              'sys.stderr.write("pear\\n");'
902                              'sys.stdout.write(sys.stdin.read())'],
903                             universal_newlines=True,
904                             stdin=subprocess.PIPE,
905                             stdout=subprocess.PIPE,
906                             stderr=subprocess.PIPE)
907        self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana",
908                          timeout=0.3)
909        # Make sure we can keep waiting for it, and that we get the whole output
910        # after it completes.
911        (stdout, stderr) = p.communicate()
912        self.assertEqual(stdout, "banana")
913        self.assertEqual(stderr.encode(), b"pineapple\npear\n")
914
915    def test_communicate_timeout_large_output(self):
916        # Test an expiring timeout while the child is outputting lots of data.
917        p = subprocess.Popen([sys.executable, "-c",
918                              'import sys,os,time;'
919                              'sys.stdout.write("a" * (64 * 1024));'
920                              'time.sleep(0.2);'
921                              'sys.stdout.write("a" * (64 * 1024));'
922                              'time.sleep(0.2);'
923                              'sys.stdout.write("a" * (64 * 1024));'
924                              'time.sleep(0.2);'
925                              'sys.stdout.write("a" * (64 * 1024));'],
926                             stdout=subprocess.PIPE)
927        self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4)
928        (stdout, _) = p.communicate()
929        self.assertEqual(len(stdout), 4 * 64 * 1024)
930
931    # Test for the fd leak reported in http://bugs.python.org/issue2791.
932    def test_communicate_pipe_fd_leak(self):
933        for stdin_pipe in (False, True):
934            for stdout_pipe in (False, True):
935                for stderr_pipe in (False, True):
936                    options = {}
937                    if stdin_pipe:
938                        options['stdin'] = subprocess.PIPE
939                    if stdout_pipe:
940                        options['stdout'] = subprocess.PIPE
941                    if stderr_pipe:
942                        options['stderr'] = subprocess.PIPE
943                    if not options:
944                        continue
945                    p = subprocess.Popen(ZERO_RETURN_CMD, **options)
946                    p.communicate()
947                    if p.stdin is not None:
948                        self.assertTrue(p.stdin.closed)
949                    if p.stdout is not None:
950                        self.assertTrue(p.stdout.closed)
951                    if p.stderr is not None:
952                        self.assertTrue(p.stderr.closed)
953
954    def test_communicate_returns(self):
955        # communicate() should return None if no redirection is active
956        p = subprocess.Popen([sys.executable, "-c",
957                              "import sys; sys.exit(47)"])
958        (stdout, stderr) = p.communicate()
959        self.assertEqual(stdout, None)
960        self.assertEqual(stderr, None)
961
962    def test_communicate_pipe_buf(self):
963        # communicate() with writes larger than pipe_buf
964        # This test will probably deadlock rather than fail, if
965        # communicate() does not work properly.
966        x, y = os.pipe()
967        os.close(x)
968        os.close(y)
969        p = subprocess.Popen([sys.executable, "-c",
970                              'import sys,os;'
971                              'sys.stdout.write(sys.stdin.read(47));'
972                              'sys.stderr.write("x" * %d);'
973                              'sys.stdout.write(sys.stdin.read())' %
974                              support.PIPE_MAX_SIZE],
975                             stdin=subprocess.PIPE,
976                             stdout=subprocess.PIPE,
977                             stderr=subprocess.PIPE)
978        self.addCleanup(p.stdout.close)
979        self.addCleanup(p.stderr.close)
980        self.addCleanup(p.stdin.close)
981        string_to_write = b"a" * support.PIPE_MAX_SIZE
982        (stdout, stderr) = p.communicate(string_to_write)
983        self.assertEqual(stdout, string_to_write)
984
985    def test_writes_before_communicate(self):
986        # stdin.write before communicate()
987        p = subprocess.Popen([sys.executable, "-c",
988                              'import sys,os;'
989                              'sys.stdout.write(sys.stdin.read())'],
990                             stdin=subprocess.PIPE,
991                             stdout=subprocess.PIPE,
992                             stderr=subprocess.PIPE)
993        self.addCleanup(p.stdout.close)
994        self.addCleanup(p.stderr.close)
995        self.addCleanup(p.stdin.close)
996        p.stdin.write(b"banana")
997        (stdout, stderr) = p.communicate(b"split")
998        self.assertEqual(stdout, b"bananasplit")
999        self.assertEqual(stderr, b"")
1000
1001    def test_universal_newlines_and_text(self):
1002        args = [
1003            sys.executable, "-c",
1004            'import sys,os;' + SETBINARY +
1005            'buf = sys.stdout.buffer;'
1006            'buf.write(sys.stdin.readline().encode());'
1007            'buf.flush();'
1008            'buf.write(b"line2\\n");'
1009            'buf.flush();'
1010            'buf.write(sys.stdin.read().encode());'
1011            'buf.flush();'
1012            'buf.write(b"line4\\n");'
1013            'buf.flush();'
1014            'buf.write(b"line5\\r\\n");'
1015            'buf.flush();'
1016            'buf.write(b"line6\\r");'
1017            'buf.flush();'
1018            'buf.write(b"\\nline7");'
1019            'buf.flush();'
1020            'buf.write(b"\\nline8");']
1021
1022        for extra_kwarg in ('universal_newlines', 'text'):
1023            p = subprocess.Popen(args, **{'stdin': subprocess.PIPE,
1024                                          'stdout': subprocess.PIPE,
1025                                          extra_kwarg: True})
1026            with p:
1027                p.stdin.write("line1\n")
1028                p.stdin.flush()
1029                self.assertEqual(p.stdout.readline(), "line1\n")
1030                p.stdin.write("line3\n")
1031                p.stdin.close()
1032                self.addCleanup(p.stdout.close)
1033                self.assertEqual(p.stdout.readline(),
1034                                 "line2\n")
1035                self.assertEqual(p.stdout.read(6),
1036                                 "line3\n")
1037                self.assertEqual(p.stdout.read(),
1038                                 "line4\nline5\nline6\nline7\nline8")
1039
1040    def test_universal_newlines_communicate(self):
1041        # universal newlines through communicate()
1042        p = subprocess.Popen([sys.executable, "-c",
1043                              'import sys,os;' + SETBINARY +
1044                              'buf = sys.stdout.buffer;'
1045                              'buf.write(b"line2\\n");'
1046                              'buf.flush();'
1047                              'buf.write(b"line4\\n");'
1048                              'buf.flush();'
1049                              'buf.write(b"line5\\r\\n");'
1050                              'buf.flush();'
1051                              'buf.write(b"line6\\r");'
1052                              'buf.flush();'
1053                              'buf.write(b"\\nline7");'
1054                              'buf.flush();'
1055                              'buf.write(b"\\nline8");'],
1056                             stderr=subprocess.PIPE,
1057                             stdout=subprocess.PIPE,
1058                             universal_newlines=1)
1059        self.addCleanup(p.stdout.close)
1060        self.addCleanup(p.stderr.close)
1061        (stdout, stderr) = p.communicate()
1062        self.assertEqual(stdout,
1063                         "line2\nline4\nline5\nline6\nline7\nline8")
1064
1065    def test_universal_newlines_communicate_stdin(self):
1066        # universal newlines through communicate(), with only stdin
1067        p = subprocess.Popen([sys.executable, "-c",
1068                              'import sys,os;' + SETBINARY + textwrap.dedent('''
1069                               s = sys.stdin.readline()
1070                               assert s == "line1\\n", repr(s)
1071                               s = sys.stdin.read()
1072                               assert s == "line3\\n", repr(s)
1073                              ''')],
1074                             stdin=subprocess.PIPE,
1075                             universal_newlines=1)
1076        (stdout, stderr) = p.communicate("line1\nline3\n")
1077        self.assertEqual(p.returncode, 0)
1078
1079    def test_universal_newlines_communicate_input_none(self):
1080        # Test communicate(input=None) with universal newlines.
1081        #
1082        # We set stdout to PIPE because, as of this writing, a different
1083        # code path is tested when the number of pipes is zero or one.
1084        p = subprocess.Popen(ZERO_RETURN_CMD,
1085                             stdin=subprocess.PIPE,
1086                             stdout=subprocess.PIPE,
1087                             universal_newlines=True)
1088        p.communicate()
1089        self.assertEqual(p.returncode, 0)
1090
1091    def test_universal_newlines_communicate_stdin_stdout_stderr(self):
1092        # universal newlines through communicate(), with stdin, stdout, stderr
1093        p = subprocess.Popen([sys.executable, "-c",
1094                              'import sys,os;' + SETBINARY + textwrap.dedent('''
1095                               s = sys.stdin.buffer.readline()
1096                               sys.stdout.buffer.write(s)
1097                               sys.stdout.buffer.write(b"line2\\r")
1098                               sys.stderr.buffer.write(b"eline2\\n")
1099                               s = sys.stdin.buffer.read()
1100                               sys.stdout.buffer.write(s)
1101                               sys.stdout.buffer.write(b"line4\\n")
1102                               sys.stdout.buffer.write(b"line5\\r\\n")
1103                               sys.stderr.buffer.write(b"eline6\\r")
1104                               sys.stderr.buffer.write(b"eline7\\r\\nz")
1105                              ''')],
1106                             stdin=subprocess.PIPE,
1107                             stderr=subprocess.PIPE,
1108                             stdout=subprocess.PIPE,
1109                             universal_newlines=True)
1110        self.addCleanup(p.stdout.close)
1111        self.addCleanup(p.stderr.close)
1112        (stdout, stderr) = p.communicate("line1\nline3\n")
1113        self.assertEqual(p.returncode, 0)
1114        self.assertEqual("line1\nline2\nline3\nline4\nline5\n", stdout)
1115        # Python debug build push something like "[42442 refs]\n"
1116        # to stderr at exit of subprocess.
1117        self.assertTrue(stderr.startswith("eline2\neline6\neline7\n"))
1118
1119    def test_universal_newlines_communicate_encodings(self):
1120        # Check that universal newlines mode works for various encodings,
1121        # in particular for encodings in the UTF-16 and UTF-32 families.
1122        # See issue #15595.
1123        #
1124        # UTF-16 and UTF-32-BE are sufficient to check both with BOM and
1125        # without, and UTF-16 and UTF-32.
1126        for encoding in ['utf-16', 'utf-32-be']:
1127            code = ("import sys; "
1128                    r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" %
1129                    encoding)
1130            args = [sys.executable, '-c', code]
1131            # We set stdin to be non-None because, as of this writing,
1132            # a different code path is used when the number of pipes is
1133            # zero or one.
1134            popen = subprocess.Popen(args,
1135                                     stdin=subprocess.PIPE,
1136                                     stdout=subprocess.PIPE,
1137                                     encoding=encoding)
1138            stdout, stderr = popen.communicate(input='')
1139            self.assertEqual(stdout, '1\n2\n3\n4')
1140
1141    def test_communicate_errors(self):
1142        for errors, expected in [
1143            ('ignore', ''),
1144            ('replace', '\ufffd\ufffd'),
1145            ('surrogateescape', '\udc80\udc80'),
1146            ('backslashreplace', '\\x80\\x80'),
1147        ]:
1148            code = ("import sys; "
1149                    r"sys.stdout.buffer.write(b'[\x80\x80]')")
1150            args = [sys.executable, '-c', code]
1151            # We set stdin to be non-None because, as of this writing,
1152            # a different code path is used when the number of pipes is
1153            # zero or one.
1154            popen = subprocess.Popen(args,
1155                                     stdin=subprocess.PIPE,
1156                                     stdout=subprocess.PIPE,
1157                                     encoding='utf-8',
1158                                     errors=errors)
1159            stdout, stderr = popen.communicate(input='')
1160            self.assertEqual(stdout, '[{}]'.format(expected))
1161
1162    def test_no_leaking(self):
1163        # Make sure we leak no resources
1164        if not mswindows:
1165            max_handles = 1026 # too much for most UNIX systems
1166        else:
1167            max_handles = 2050 # too much for (at least some) Windows setups
1168        handles = []
1169        tmpdir = tempfile.mkdtemp()
1170        try:
1171            for i in range(max_handles):
1172                try:
1173                    tmpfile = os.path.join(tmpdir, os_helper.TESTFN)
1174                    handles.append(os.open(tmpfile, os.O_WRONLY|os.O_CREAT))
1175                except OSError as e:
1176                    if e.errno != errno.EMFILE:
1177                        raise
1178                    break
1179            else:
1180                self.skipTest("failed to reach the file descriptor limit "
1181                    "(tried %d)" % max_handles)
1182            # Close a couple of them (should be enough for a subprocess)
1183            for i in range(10):
1184                os.close(handles.pop())
1185            # Loop creating some subprocesses. If one of them leaks some fds,
1186            # the next loop iteration will fail by reaching the max fd limit.
1187            for i in range(15):
1188                p = subprocess.Popen([sys.executable, "-c",
1189                                      "import sys;"
1190                                      "sys.stdout.write(sys.stdin.read())"],
1191                                     stdin=subprocess.PIPE,
1192                                     stdout=subprocess.PIPE,
1193                                     stderr=subprocess.PIPE)
1194                data = p.communicate(b"lime")[0]
1195                self.assertEqual(data, b"lime")
1196        finally:
1197            for h in handles:
1198                os.close(h)
1199            shutil.rmtree(tmpdir)
1200
1201    def test_list2cmdline(self):
1202        self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
1203                         '"a b c" d e')
1204        self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
1205                         'ab\\"c \\ d')
1206        self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
1207                         'ab\\"c " \\\\" d')
1208        self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
1209                         'a\\\\\\b "de fg" h')
1210        self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
1211                         'a\\\\\\"b c d')
1212        self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
1213                         '"a\\\\b c" d e')
1214        self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
1215                         '"a\\\\b\\ c" d e')
1216        self.assertEqual(subprocess.list2cmdline(['ab', '']),
1217                         'ab ""')
1218
1219    def test_poll(self):
1220        p = subprocess.Popen([sys.executable, "-c",
1221                              "import os; os.read(0, 1)"],
1222                             stdin=subprocess.PIPE)
1223        self.addCleanup(p.stdin.close)
1224        self.assertIsNone(p.poll())
1225        os.write(p.stdin.fileno(), b'A')
1226        p.wait()
1227        # Subsequent invocations should just return the returncode
1228        self.assertEqual(p.poll(), 0)
1229
1230    def test_wait(self):
1231        p = subprocess.Popen(ZERO_RETURN_CMD)
1232        self.assertEqual(p.wait(), 0)
1233        # Subsequent invocations should just return the returncode
1234        self.assertEqual(p.wait(), 0)
1235
1236    def test_wait_timeout(self):
1237        p = subprocess.Popen([sys.executable,
1238                              "-c", "import time; time.sleep(0.3)"])
1239        with self.assertRaises(subprocess.TimeoutExpired) as c:
1240            p.wait(timeout=0.0001)
1241        self.assertIn("0.0001", str(c.exception))  # For coverage of __str__.
1242        self.assertEqual(p.wait(timeout=support.SHORT_TIMEOUT), 0)
1243
1244    def test_invalid_bufsize(self):
1245        # an invalid type of the bufsize argument should raise
1246        # TypeError.
1247        with self.assertRaises(TypeError):
1248            subprocess.Popen(ZERO_RETURN_CMD, "orange")
1249
1250    def test_bufsize_is_none(self):
1251        # bufsize=None should be the same as bufsize=0.
1252        p = subprocess.Popen(ZERO_RETURN_CMD, None)
1253        self.assertEqual(p.wait(), 0)
1254        # Again with keyword arg
1255        p = subprocess.Popen(ZERO_RETURN_CMD, bufsize=None)
1256        self.assertEqual(p.wait(), 0)
1257
1258    def _test_bufsize_equal_one(self, line, expected, universal_newlines):
1259        # subprocess may deadlock with bufsize=1, see issue #21332
1260        with subprocess.Popen([sys.executable, "-c", "import sys;"
1261                               "sys.stdout.write(sys.stdin.readline());"
1262                               "sys.stdout.flush()"],
1263                              stdin=subprocess.PIPE,
1264                              stdout=subprocess.PIPE,
1265                              stderr=subprocess.DEVNULL,
1266                              bufsize=1,
1267                              universal_newlines=universal_newlines) as p:
1268            p.stdin.write(line) # expect that it flushes the line in text mode
1269            os.close(p.stdin.fileno()) # close it without flushing the buffer
1270            read_line = p.stdout.readline()
1271            with support.SuppressCrashReport():
1272                try:
1273                    p.stdin.close()
1274                except OSError:
1275                    pass
1276            p.stdin = None
1277        self.assertEqual(p.returncode, 0)
1278        self.assertEqual(read_line, expected)
1279
1280    def test_bufsize_equal_one_text_mode(self):
1281        # line is flushed in text mode with bufsize=1.
1282        # we should get the full line in return
1283        line = "line\n"
1284        self._test_bufsize_equal_one(line, line, universal_newlines=True)
1285
1286    def test_bufsize_equal_one_binary_mode(self):
1287        # line is not flushed in binary mode with bufsize=1.
1288        # we should get empty response
1289        line = b'line' + os.linesep.encode() # assume ascii-based locale
1290        with self.assertWarnsRegex(RuntimeWarning, 'line buffering'):
1291            self._test_bufsize_equal_one(line, b'', universal_newlines=False)
1292
1293    def test_leaking_fds_on_error(self):
1294        # see bug #5179: Popen leaks file descriptors to PIPEs if
1295        # the child fails to execute; this will eventually exhaust
1296        # the maximum number of open fds. 1024 seems a very common
1297        # value for that limit, but Windows has 2048, so we loop
1298        # 1024 times (each call leaked two fds).
1299        for i in range(1024):
1300            with self.assertRaises(NONEXISTING_ERRORS):
1301                subprocess.Popen(NONEXISTING_CMD,
1302                                 stdout=subprocess.PIPE,
1303                                 stderr=subprocess.PIPE)
1304
1305    def test_nonexisting_with_pipes(self):
1306        # bpo-30121: Popen with pipes must close properly pipes on error.
1307        # Previously, os.close() was called with a Windows handle which is not
1308        # a valid file descriptor.
1309        #
1310        # Run the test in a subprocess to control how the CRT reports errors
1311        # and to get stderr content.
1312        try:
1313            import msvcrt
1314            msvcrt.CrtSetReportMode
1315        except (AttributeError, ImportError):
1316            self.skipTest("need msvcrt.CrtSetReportMode")
1317
1318        code = textwrap.dedent(f"""
1319            import msvcrt
1320            import subprocess
1321
1322            cmd = {NONEXISTING_CMD!r}
1323
1324            for report_type in [msvcrt.CRT_WARN,
1325                                msvcrt.CRT_ERROR,
1326                                msvcrt.CRT_ASSERT]:
1327                msvcrt.CrtSetReportMode(report_type, msvcrt.CRTDBG_MODE_FILE)
1328                msvcrt.CrtSetReportFile(report_type, msvcrt.CRTDBG_FILE_STDERR)
1329
1330            try:
1331                subprocess.Popen(cmd,
1332                                 stdout=subprocess.PIPE,
1333                                 stderr=subprocess.PIPE)
1334            except OSError:
1335                pass
1336        """)
1337        cmd = [sys.executable, "-c", code]
1338        proc = subprocess.Popen(cmd,
1339                                stderr=subprocess.PIPE,
1340                                universal_newlines=True)
1341        with proc:
1342            stderr = proc.communicate()[1]
1343        self.assertEqual(stderr, "")
1344        self.assertEqual(proc.returncode, 0)
1345
1346    def test_double_close_on_error(self):
1347        # Issue #18851
1348        fds = []
1349        def open_fds():
1350            for i in range(20):
1351                fds.extend(os.pipe())
1352                time.sleep(0.001)
1353        t = threading.Thread(target=open_fds)
1354        t.start()
1355        try:
1356            with self.assertRaises(EnvironmentError):
1357                subprocess.Popen(NONEXISTING_CMD,
1358                                 stdin=subprocess.PIPE,
1359                                 stdout=subprocess.PIPE,
1360                                 stderr=subprocess.PIPE)
1361        finally:
1362            t.join()
1363            exc = None
1364            for fd in fds:
1365                # If a double close occurred, some of those fds will
1366                # already have been closed by mistake, and os.close()
1367                # here will raise.
1368                try:
1369                    os.close(fd)
1370                except OSError as e:
1371                    exc = e
1372            if exc is not None:
1373                raise exc
1374
    def test_threadsafe_wait(self):
        """Issue21291: Popen.wait() needs to be threadsafe for returncode."""
        # The child sleeps much longer than the test should take; it is
        # killed from the timer thread started below.
        proc = subprocess.Popen([sys.executable, '-c',
                                 'import time; time.sleep(12)'])
        self.assertEqual(proc.returncode, None)
        # (label, value) pairs appended by the timer thread, in order.
        results = []

        def kill_proc_timer_thread():
            results.append(('thread-start-poll-result', proc.poll()))
            # terminate it from the thread and wait for the result.
            proc.kill()
            proc.wait()
            results.append(('thread-after-kill-and-wait', proc.returncode))
            # this wait should be a no-op given the above.
            proc.wait()
            results.append(('thread-after-second-wait', proc.returncode))

        # This is a timing sensitive test, the failure mode is
        # triggered when both the main thread and this thread are in
        # the wait() call at once.  The delay here is to allow the
        # main thread to most likely be blocked in its wait() call.
        t = threading.Timer(0.2, kill_proc_timer_thread)
        t.start()

        if mswindows:
            expected_errorcode = 1
        else:
            # Should be -9 because of the proc.kill() from the thread.
            expected_errorcode = -9

        # Wait for the process to finish; the thread should kill it
        # long before it finishes on its own.  Supplying a timeout
        # triggers a different code path for better coverage.
        proc.wait(timeout=support.SHORT_TIMEOUT)
        self.assertEqual(proc.returncode, expected_errorcode,
                         msg="unexpected result in wait from main thread")

        # This should be a no-op with no change in returncode.
        proc.wait()
        self.assertEqual(proc.returncode, expected_errorcode,
                         msg="unexpected result in second main wait.")

        # Make sure the timer thread has recorded all of its results
        # before they are inspected.
        t.join()
        # Ensure that all of the thread results are as expected.
        # When a race condition occurs in wait(), the returncode could
        # be set by the wrong thread that doesn't actually have it
        # leading to an incorrect value.
        self.assertEqual([('thread-start-poll-result', None),
                          ('thread-after-kill-and-wait', expected_errorcode),
                          ('thread-after-second-wait', expected_errorcode)],
                         results)
1426
1427    def test_issue8780(self):
1428        # Ensure that stdout is inherited from the parent
1429        # if stdout=PIPE is not used
1430        code = ';'.join((
1431            'import subprocess, sys',
1432            'retcode = subprocess.call('
1433                "[sys.executable, '-c', 'print(\"Hello World!\")'])",
1434            'assert retcode == 0'))
1435        output = subprocess.check_output([sys.executable, '-c', code])
1436        self.assertTrue(output.startswith(b'Hello World!'), ascii(output))
1437
1438    def test_handles_closed_on_exception(self):
1439        # If CreateProcess exits with an error, ensure the
1440        # duplicate output handles are released
1441        ifhandle, ifname = tempfile.mkstemp()
1442        ofhandle, ofname = tempfile.mkstemp()
1443        efhandle, efname = tempfile.mkstemp()
1444        try:
1445            subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
1446              stderr=efhandle)
1447        except OSError:
1448            os.close(ifhandle)
1449            os.remove(ifname)
1450            os.close(ofhandle)
1451            os.remove(ofname)
1452            os.close(efhandle)
1453            os.remove(efname)
1454        self.assertFalse(os.path.exists(ifname))
1455        self.assertFalse(os.path.exists(ofname))
1456        self.assertFalse(os.path.exists(efname))
1457
1458    def test_communicate_epipe(self):
1459        # Issue 10963: communicate() should hide EPIPE
1460        p = subprocess.Popen(ZERO_RETURN_CMD,
1461                             stdin=subprocess.PIPE,
1462                             stdout=subprocess.PIPE,
1463                             stderr=subprocess.PIPE)
1464        self.addCleanup(p.stdout.close)
1465        self.addCleanup(p.stderr.close)
1466        self.addCleanup(p.stdin.close)
1467        p.communicate(b"x" * 2**20)
1468
1469    def test_repr(self):
1470        path_cmd = pathlib.Path("my-tool.py")
1471        pathlib_cls = path_cmd.__class__.__name__
1472
1473        cases = [
1474            ("ls", True, 123, "<Popen: returncode: 123 args: 'ls'>"),
1475            ('a' * 100, True, 0,
1476             "<Popen: returncode: 0 args: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa...>"),
1477            (["ls"], False, None, "<Popen: returncode: None args: ['ls']>"),
1478            (["ls", '--my-opts', 'a' * 100], False, None,
1479             "<Popen: returncode: None args: ['ls', '--my-opts', 'aaaaaaaaaaaaaaaaaaaaaaaa...>"),
1480            (path_cmd, False, 7, f"<Popen: returncode: 7 args: {pathlib_cls}('my-tool.py')>")
1481        ]
1482        with unittest.mock.patch.object(subprocess.Popen, '_execute_child'):
1483            for cmd, shell, code, sx in cases:
1484                p = subprocess.Popen(cmd, shell=shell)
1485                p.returncode = code
1486                self.assertEqual(repr(p), sx)
1487
1488    def test_communicate_epipe_only_stdin(self):
1489        # Issue 10963: communicate() should hide EPIPE
1490        p = subprocess.Popen(ZERO_RETURN_CMD,
1491                             stdin=subprocess.PIPE)
1492        self.addCleanup(p.stdin.close)
1493        p.wait()
1494        p.communicate(b"x" * 2**20)
1495
1496    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'),
1497                         "Requires signal.SIGUSR1")
1498    @unittest.skipUnless(hasattr(os, 'kill'),
1499                         "Requires os.kill")
1500    @unittest.skipUnless(hasattr(os, 'getppid'),
1501                         "Requires os.getppid")
1502    def test_communicate_eintr(self):
1503        # Issue #12493: communicate() should handle EINTR
1504        def handler(signum, frame):
1505            pass
1506        old_handler = signal.signal(signal.SIGUSR1, handler)
1507        self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
1508
1509        args = [sys.executable, "-c",
1510                'import os, signal;'
1511                'os.kill(os.getppid(), signal.SIGUSR1)']
1512        for stream in ('stdout', 'stderr'):
1513            kw = {stream: subprocess.PIPE}
1514            with subprocess.Popen(args, **kw) as process:
1515                # communicate() will be interrupted by SIGUSR1
1516                process.communicate()
1517
1518
1519    # This test is Linux-ish specific for simplicity to at least have
1520    # some coverage.  It is not a platform specific bug.
1521    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
1522                         "Linux specific")
1523    def test_failed_child_execute_fd_leak(self):
1524        """Test for the fork() failure fd leak reported in issue16327."""
1525        fd_directory = '/proc/%d/fd' % os.getpid()
1526        fds_before_popen = os.listdir(fd_directory)
1527        with self.assertRaises(PopenTestException):
1528            PopenExecuteChildRaises(
1529                    ZERO_RETURN_CMD, stdin=subprocess.PIPE,
1530                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1531
1532        # NOTE: This test doesn't verify that the real _execute_child
1533        # does not close the file descriptors itself on the way out
1534        # during an exception.  Code inspection has confirmed that.
1535
1536        fds_after_exception = os.listdir(fd_directory)
1537        self.assertEqual(fds_before_popen, fds_after_exception)
1538
1539    @unittest.skipIf(mswindows, "behavior currently not supported on Windows")
1540    def test_file_not_found_includes_filename(self):
1541        with self.assertRaises(FileNotFoundError) as c:
1542            subprocess.call(['/opt/nonexistent_binary', 'with', 'some', 'args'])
1543        self.assertEqual(c.exception.filename, '/opt/nonexistent_binary')
1544
1545    @unittest.skipIf(mswindows, "behavior currently not supported on Windows")
1546    def test_file_not_found_with_bad_cwd(self):
1547        with self.assertRaises(FileNotFoundError) as c:
1548            subprocess.Popen(['exit', '0'], cwd='/some/nonexistent/directory')
1549        self.assertEqual(c.exception.filename, '/some/nonexistent/directory')
1550
1551    def test_class_getitems(self):
1552        self.assertIsInstance(subprocess.Popen[bytes], types.GenericAlias)
1553        self.assertIsInstance(subprocess.CompletedProcess[str], types.GenericAlias)
1554
1555    @unittest.skipIf(not sysconfig.get_config_var("HAVE_VFORK"),
1556                     "vfork() not enabled by configure.")
1557    @mock.patch("subprocess._fork_exec")
1558    def test__use_vfork(self, mock_fork_exec):
1559        self.assertTrue(subprocess._USE_VFORK)  # The default value regardless.
1560        mock_fork_exec.side_effect = RuntimeError("just testing args")
1561        with self.assertRaises(RuntimeError):
1562            subprocess.run([sys.executable, "-c", "pass"])
1563        mock_fork_exec.assert_called_once()
1564        self.assertTrue(mock_fork_exec.call_args.args[-1])
1565        with mock.patch.object(subprocess, '_USE_VFORK', False):
1566            with self.assertRaises(RuntimeError):
1567                subprocess.run([sys.executable, "-c", "pass"])
1568            self.assertFalse(mock_fork_exec.call_args_list[-1].args[-1])
1569
1570
class RunFuncTestCase(BaseTestCase):
    """Tests for subprocess.run() and the CompletedProcess it returns."""

    def run_python(self, code, **kwargs):
        """Run Python code in a subprocess using subprocess.run"""
        argv = [sys.executable, "-c", code]
        return subprocess.run(argv, **kwargs)

    def test_returncode(self):
        # run() reports a non-zero exit status through returncode, and
        # check_returncode() turns it into CalledProcessError
        cp = self.run_python("import sys; sys.exit(47)")
        self.assertEqual(cp.returncode, 47)
        with self.assertRaises(subprocess.CalledProcessError):
            cp.check_returncode()

    def test_check(self):
        # check=True raises CalledProcessError carrying the exit status
        with self.assertRaises(subprocess.CalledProcessError) as c:
            self.run_python("import sys; sys.exit(47)", check=True)
        self.assertEqual(c.exception.returncode, 47)

    def test_check_zero(self):
        # check_returncode shouldn't raise when returncode is zero
        cp = subprocess.run(ZERO_RETURN_CMD, check=True)
        self.assertEqual(cp.returncode, 0)

    def test_timeout(self):
        # run() function with timeout argument; we want to test that the child
        # process gets killed when the timeout expires.  If the child isn't
        # killed, this call will deadlock since subprocess.run waits for the
        # child.
        with self.assertRaises(subprocess.TimeoutExpired):
            self.run_python("while True: pass", timeout=0.0001)

    def test_capture_stdout(self):
        # capture stdout with zero return code
        cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE)
        self.assertIn(b'BDFL', cp.stdout)

    def test_capture_stderr(self):
        # capture stderr through a pipe
        cp = self.run_python("import sys; sys.stderr.write('BDFL')",
                             stderr=subprocess.PIPE)
        self.assertIn(b'BDFL', cp.stderr)

    def test_check_output_stdin_arg(self):
        # run() can be called with stdin set to a file
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        tf.write(b'pear')
        tf.seek(0)
        cp = self.run_python(
                 "import sys; sys.stdout.write(sys.stdin.read().upper())",
                stdin=tf, stdout=subprocess.PIPE)
        self.assertIn(b'PEAR', cp.stdout)

    def test_check_output_input_arg(self):
        # run() can be called with input set to a bytes object
        cp = self.run_python(
                "import sys; sys.stdout.write(sys.stdin.read().upper())",
                input=b'pear', stdout=subprocess.PIPE)
        self.assertIn(b'PEAR', cp.stdout)

    def test_check_output_stdin_with_input_arg(self):
        # run() refuses to accept 'stdin' with 'input'
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        tf.write(b'pear')
        tf.seek(0)
        with self.assertRaises(ValueError,
              msg="Expected ValueError when stdin and input args supplied.") as c:
            self.run_python("print('will not be run')",
                            stdin=tf, input=b'hare')
        self.assertIn('stdin', c.exception.args[0])
        self.assertIn('input', c.exception.args[0])

    def test_check_output_timeout(self):
        # Output written before the timeout must be attached to the
        # TimeoutExpired exception.
        with self.assertRaises(subprocess.TimeoutExpired) as c:
            self.run_python((
                     "import sys, time\n"
                     "sys.stdout.write('BDFL')\n"
                     "sys.stdout.flush()\n"
                     "time.sleep(3600)"),
                    # Some heavily loaded buildbots (sparc Debian 3.x) require
                    # this much time to start and print.
                    timeout=3, stdout=subprocess.PIPE)
        self.assertEqual(c.exception.output, b'BDFL')
        # output is aliased to stdout
        self.assertEqual(c.exception.stdout, b'BDFL')

    def test_run_kwargs(self):
        # Extra keyword arguments (here env) are passed through by run()
        newenv = os.environ.copy()
        newenv["FRUIT"] = "banana"
        cp = self.run_python(('import sys, os;'
                      'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'),
                             env=newenv)
        self.assertEqual(cp.returncode, 33)

    def test_run_with_pathlike_path(self):
        # bpo-31961: test run(pathlike_object)
        # the name of a command that can be run without
        # any arguments that exit fast
        prog = 'tree.com' if mswindows else 'ls'
        path = shutil.which(prog)
        if path is None:
            self.skipTest(f'{prog} required for this test')
        path = FakePath(path)
        res = subprocess.run(path, stdout=subprocess.DEVNULL)
        self.assertEqual(res.returncode, 0)
        # A path-like object is rejected when shell=True
        with self.assertRaises(TypeError):
            subprocess.run(path, stdout=subprocess.DEVNULL, shell=True)

    def test_run_with_bytes_path_and_arguments(self):
        # bpo-31961: test run([bytes_object, b'additional arguments'])
        path = os.fsencode(sys.executable)
        args = [path, '-c', b'import sys; sys.exit(57)']
        res = subprocess.run(args)
        self.assertEqual(res.returncode, 57)

    def test_run_with_pathlike_path_and_arguments(self):
        # bpo-31961: test run([pathlike_object, 'additional arguments'])
        path = FakePath(sys.executable)
        args = [path, '-c', 'import sys; sys.exit(57)']
        res = subprocess.run(args)
        self.assertEqual(res.returncode, 57)

    def test_capture_output(self):
        # capture_output=True collects both stdout and stderr
        cp = self.run_python(("import sys;"
                              "sys.stdout.write('BDFL'); "
                              "sys.stderr.write('FLUFL')"),
                             capture_output=True)
        self.assertIn(b'BDFL', cp.stdout)
        self.assertIn(b'FLUFL', cp.stderr)

    def test_stdout_with_capture_output_arg(self):
        # run() refuses to accept 'stdout' with 'capture_output'
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        with self.assertRaises(ValueError,
            msg=("Expected ValueError when stdout and capture_output "
                 "args supplied.")) as c:
            self.run_python("print('will not be run')",
                            capture_output=True, stdout=tf)
        self.assertIn('stdout', c.exception.args[0])
        self.assertIn('capture_output', c.exception.args[0])

    def test_stderr_with_capture_output_arg(self):
        # run() refuses to accept 'stderr' with 'capture_output'
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        with self.assertRaises(ValueError,
            msg=("Expected ValueError when stderr and capture_output "
                 "args supplied.")) as c:
            self.run_python("print('will not be run')",
                            capture_output=True, stderr=tf)
        self.assertIn('stderr', c.exception.args[0])
        self.assertIn('capture_output', c.exception.args[0])

    # This test _might_ wind up a bit fragile on loaded build+test machines
    # as it depends on the timing with wide enough margins for normal situations
    # but does assert that it happened "soon enough" to believe the right thing
    # happened.
    @unittest.skipIf(mswindows, "requires posix like 'sleep' shell command")
    def test_run_with_shell_timeout_and_capture_output(self):
        """Output capturing after a timeout mustn't hang forever on open filehandles."""
        before_secs = time.monotonic()
        try:
            subprocess.run('sleep 3', shell=True, timeout=0.1,
                           capture_output=True)  # New session unspecified.
        except subprocess.TimeoutExpired:
            after_secs = time.monotonic()
            stacks = traceback.format_exc()  # assertRaises doesn't give this.
        else:
            self.fail("TimeoutExpired not raised.")
        self.assertLess(after_secs - before_secs, 1.5,
                        msg="TimeoutExpired was delayed! Bad traceback:\n```\n"
                        f"{stacks}```")

    def test_encoding_warning(self):
        # With -X warn_default_encoding, text=True without an explicit
        # encoding must emit exactly one EncodingWarning per call,
        # attributed to the calling line.
        code = textwrap.dedent("""\
            from subprocess import *
            run("echo hello", shell=True, text=True)
            check_output("echo hello", shell=True, text=True)
            """)
        cp = subprocess.run([sys.executable, "-Xwarn_default_encoding", "-c", code],
                            capture_output=True)
        lines = cp.stderr.splitlines()
        self.assertEqual(len(lines), 2, lines)
        self.assertTrue(lines[0].startswith(b"<string>:2: EncodingWarning: "))
        self.assertTrue(lines[1].startswith(b"<string>:3: EncodingWarning: "))
1757
1758
def _get_test_grp_name():
    """Return the first well-known group name that exists on this system.

    Raises unittest.SkipTest when the grp module is unavailable or none of
    the candidate names is known to it.
    """
    candidates = ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody')
    if grp:
        for candidate in candidates:
            try:
                grp.getgrnam(candidate)
            except KeyError:
                # Not defined on this system; try the next candidate.
                continue
            return candidate
    raise unittest.SkipTest('No identified group name to use for this test on this platform.')
1769
1770
1771@unittest.skipIf(mswindows, "POSIX specific tests")
1772class POSIXProcessTestCase(BaseTestCase):
1773
    def setUp(self):
        """Run the base setUp and record a directory path used as a bad path."""
        super().setUp()
        # Used by the tests of this class as a cwd/executable/args[0] that
        # is expected not to exist.
        self._nonexistent_dir = "/_this/pa.th/does/not/exist"
1777
1778    def _get_chdir_exception(self):
1779        try:
1780            os.chdir(self._nonexistent_dir)
1781        except OSError as e:
1782            # This avoids hard coding the errno value or the OS perror()
1783            # string and instead capture the exception that we want to see
1784            # below for comparison.
1785            desired_exception = e
1786        else:
1787            self.fail("chdir to nonexistent directory %s succeeded." %
1788                      self._nonexistent_dir)
1789        return desired_exception
1790
1791    def test_exception_cwd(self):
1792        """Test error in the child raised in the parent for a bad cwd."""
1793        desired_exception = self._get_chdir_exception()
1794        try:
1795            p = subprocess.Popen([sys.executable, "-c", ""],
1796                                 cwd=self._nonexistent_dir)
1797        except OSError as e:
1798            # Test that the child process chdir failure actually makes
1799            # it up to the parent process as the correct exception.
1800            self.assertEqual(desired_exception.errno, e.errno)
1801            self.assertEqual(desired_exception.strerror, e.strerror)
1802            self.assertEqual(desired_exception.filename, e.filename)
1803        else:
1804            self.fail("Expected OSError: %s" % desired_exception)
1805
1806    def test_exception_bad_executable(self):
1807        """Test error in the child raised in the parent for a bad executable."""
1808        desired_exception = self._get_chdir_exception()
1809        try:
1810            p = subprocess.Popen([sys.executable, "-c", ""],
1811                                 executable=self._nonexistent_dir)
1812        except OSError as e:
1813            # Test that the child process exec failure actually makes
1814            # it up to the parent process as the correct exception.
1815            self.assertEqual(desired_exception.errno, e.errno)
1816            self.assertEqual(desired_exception.strerror, e.strerror)
1817            self.assertEqual(desired_exception.filename, e.filename)
1818        else:
1819            self.fail("Expected OSError: %s" % desired_exception)
1820
1821    def test_exception_bad_args_0(self):
1822        """Test error in the child raised in the parent for a bad args[0]."""
1823        desired_exception = self._get_chdir_exception()
1824        try:
1825            p = subprocess.Popen([self._nonexistent_dir, "-c", ""])
1826        except OSError as e:
1827            # Test that the child process exec failure actually makes
1828            # it up to the parent process as the correct exception.
1829            self.assertEqual(desired_exception.errno, e.errno)
1830            self.assertEqual(desired_exception.strerror, e.strerror)
1831            self.assertEqual(desired_exception.filename, e.filename)
1832        else:
1833            self.fail("Expected OSError: %s" % desired_exception)
1834
    # The next two tests use a Popen subclass with a no-op __del__ method,
    # because Popen.__del__ does cleanup based on the pid returned by
    # fork_exec along with issuing a resource warning if it still exists.
    # Since we don't actually spawn a process in these tests we can forgo
    # the destructor.  An alternative would be to set _child_created to
    # False before the destructor is called, but there is no easy way
    # to do that.
    class PopenNoDestructor(subprocess.Popen):
        """Popen subclass whose destructor does nothing."""
        def __del__(self):
            pass
1845
1846    @mock.patch("subprocess._fork_exec")
1847    def test_exception_errpipe_normal(self, fork_exec):
1848        """Test error passing done through errpipe_write in the good case"""
1849        def proper_error(*args):
1850            errpipe_write = args[13]
1851            # Write the hex for the error code EISDIR: 'is a directory'
1852            err_code = '{:x}'.format(errno.EISDIR).encode()
1853            os.write(errpipe_write, b"OSError:" + err_code + b":")
1854            return 0
1855
1856        fork_exec.side_effect = proper_error
1857
1858        with mock.patch("subprocess.os.waitpid",
1859                        side_effect=ChildProcessError):
1860            with self.assertRaises(IsADirectoryError):
1861                self.PopenNoDestructor(["non_existent_command"])
1862
1863    @mock.patch("subprocess._fork_exec")
1864    def test_exception_errpipe_bad_data(self, fork_exec):
1865        """Test error passing done through errpipe_write where its not
1866        in the expected format"""
1867        error_data = b"\xFF\x00\xDE\xAD"
1868        def bad_error(*args):
1869            errpipe_write = args[13]
1870            # Anything can be in the pipe, no assumptions should
1871            # be made about its encoding, so we'll write some
1872            # arbitrary hex bytes to test it out
1873            os.write(errpipe_write, error_data)
1874            return 0
1875
1876        fork_exec.side_effect = bad_error
1877
1878        with mock.patch("subprocess.os.waitpid",
1879                        side_effect=ChildProcessError):
1880            with self.assertRaises(subprocess.SubprocessError) as e:
1881                self.PopenNoDestructor(["non_existent_command"])
1882
1883        self.assertIn(repr(error_data), str(e.exception))
1884
1885    @unittest.skipIf(not os.path.exists('/proc/self/status'),
1886                     "need /proc/self/status")
1887    def test_restore_signals(self):
1888        # Blindly assume that cat exists on systems with /proc/self/status...
1889        default_proc_status = subprocess.check_output(
1890                ['cat', '/proc/self/status'],
1891                restore_signals=False)
1892        for line in default_proc_status.splitlines():
1893            if line.startswith(b'SigIgn'):
1894                default_sig_ign_mask = line
1895                break
1896        else:
1897            self.skipTest("SigIgn not found in /proc/self/status.")
1898        restored_proc_status = subprocess.check_output(
1899                ['cat', '/proc/self/status'],
1900                restore_signals=True)
1901        for line in restored_proc_status.splitlines():
1902            if line.startswith(b'SigIgn'):
1903                restored_sig_ign_mask = line
1904                break
1905        self.assertNotEqual(default_sig_ign_mask, restored_sig_ign_mask,
1906                            msg="restore_signals=True should've unblocked "
1907                            "SIGPIPE and friends.")
1908
1909    def test_start_new_session(self):
1910        # For code coverage of calling setsid().  We don't care if we get an
1911        # EPERM error from it depending on the test execution environment, that
1912        # still indicates that it was called.
1913        try:
1914            output = subprocess.check_output(
1915                    [sys.executable, "-c", "import os; print(os.getsid(0))"],
1916                    start_new_session=True)
1917        except PermissionError as e:
1918            if e.errno != errno.EPERM:
1919                raise  # EACCES?
1920        else:
1921            parent_sid = os.getsid(0)
1922            child_sid = int(output)
1923            self.assertNotEqual(parent_sid, child_sid)
1924
1925    @unittest.skipUnless(hasattr(os, 'setpgid') and hasattr(os, 'getpgid'),
1926                         'no setpgid or getpgid on platform')
1927    def test_process_group_0(self):
1928        # For code coverage of calling setpgid().  We don't care if we get an
1929        # EPERM error from it depending on the test execution environment, that
1930        # still indicates that it was called.
1931        try:
1932            output = subprocess.check_output(
1933                    [sys.executable, "-c", "import os; print(os.getpgid(0))"],
1934                    process_group=0)
1935        except PermissionError as e:
1936            if e.errno != errno.EPERM:
1937                raise  # EACCES?
1938        else:
1939            parent_pgid = os.getpgid(0)
1940            child_pgid = int(output)
1941            self.assertNotEqual(parent_pgid, child_pgid)
1942
1943    @unittest.skipUnless(hasattr(os, 'setreuid'), 'no setreuid on platform')
1944    def test_user(self):
1945        # For code coverage of the user parameter.  We don't care if we get an
1946        # EPERM error from it depending on the test execution environment, that
1947        # still indicates that it was called.
1948
1949        uid = os.geteuid()
1950        test_users = [65534 if uid != 65534 else 65533, uid]
1951        name_uid = "nobody" if sys.platform != 'darwin' else "unknown"
1952
1953        if pwd is not None:
1954            try:
1955                pwd.getpwnam(name_uid)
1956                test_users.append(name_uid)
1957            except KeyError:
1958                # unknown user name
1959                name_uid = None
1960
1961        for user in test_users:
1962            # posix_spawn() may be used with close_fds=False
1963            for close_fds in (False, True):
1964                with self.subTest(user=user, close_fds=close_fds):
1965                    try:
1966                        output = subprocess.check_output(
1967                                [sys.executable, "-c",
1968                                 "import os; print(os.getuid())"],
1969                                user=user,
1970                                close_fds=close_fds)
1971                    except PermissionError:  # (EACCES, EPERM)
1972                        pass
1973                    except OSError as e:
1974                        if e.errno not in (errno.EACCES, errno.EPERM):
1975                            raise
1976                    else:
1977                        if isinstance(user, str):
1978                            user_uid = pwd.getpwnam(user).pw_uid
1979                        else:
1980                            user_uid = user
1981                        child_user = int(output)
1982                        self.assertEqual(child_user, user_uid)
1983
1984        with self.assertRaises(ValueError):
1985            subprocess.check_call(ZERO_RETURN_CMD, user=-1)
1986
1987        with self.assertRaises(OverflowError):
1988            subprocess.check_call(ZERO_RETURN_CMD,
1989                                  cwd=os.curdir, env=os.environ, user=2**64)
1990
1991        if pwd is None and name_uid is not None:
1992            with self.assertRaises(ValueError):
1993                subprocess.check_call(ZERO_RETURN_CMD, user=name_uid)
1994
1995    @unittest.skipIf(hasattr(os, 'setreuid'), 'setreuid() available on platform')
1996    def test_user_error(self):
1997        with self.assertRaises(ValueError):
1998            subprocess.check_call(ZERO_RETURN_CMD, user=65535)
1999
2000    @unittest.skipUnless(hasattr(os, 'setregid'), 'no setregid() on platform')
2001    def test_group(self):
2002        gid = os.getegid()
2003        group_list = [65534 if gid != 65534 else 65533]
2004        name_group = _get_test_grp_name()
2005
2006        if grp is not None:
2007            group_list.append(name_group)
2008
2009        for group in group_list + [gid]:
2010            # posix_spawn() may be used with close_fds=False
2011            for close_fds in (False, True):
2012                with self.subTest(group=group, close_fds=close_fds):
2013                    try:
2014                        output = subprocess.check_output(
2015                                [sys.executable, "-c",
2016                                 "import os; print(os.getgid())"],
2017                                group=group,
2018                                close_fds=close_fds)
2019                    except PermissionError:  # (EACCES, EPERM)
2020                        pass
2021                    else:
2022                        if isinstance(group, str):
2023                            group_gid = grp.getgrnam(group).gr_gid
2024                        else:
2025                            group_gid = group
2026
2027                        child_group = int(output)
2028                        self.assertEqual(child_group, group_gid)
2029
2030        # make sure we bomb on negative values
2031        with self.assertRaises(ValueError):
2032            subprocess.check_call(ZERO_RETURN_CMD, group=-1)
2033
2034        with self.assertRaises(OverflowError):
2035            subprocess.check_call(ZERO_RETURN_CMD,
2036                                  cwd=os.curdir, env=os.environ, group=2**64)
2037
2038        if grp is None:
2039            with self.assertRaises(ValueError):
2040                subprocess.check_call(ZERO_RETURN_CMD, group=name_group)
2041
2042    @unittest.skipIf(hasattr(os, 'setregid'), 'setregid() available on platform')
2043    def test_group_error(self):
2044        with self.assertRaises(ValueError):
2045            subprocess.check_call(ZERO_RETURN_CMD, group=65535)
2046
2047    @unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform')
2048    def test_extra_groups(self):
2049        gid = os.getegid()
2050        group_list = [65534 if gid != 65534 else 65533]
2051        name_group = _get_test_grp_name()
2052        perm_error = False
2053
2054        if grp is not None:
2055            group_list.append(name_group)
2056
2057        try:
2058            output = subprocess.check_output(
2059                    [sys.executable, "-c",
2060                     "import os, sys, json; json.dump(os.getgroups(), sys.stdout)"],
2061                    extra_groups=group_list)
2062        except OSError as ex:
2063            if ex.errno != errno.EPERM:
2064                raise
2065            perm_error = True
2066
2067        else:
2068            parent_groups = os.getgroups()
2069            child_groups = json.loads(output)
2070
2071            if grp is not None:
2072                desired_gids = [grp.getgrnam(g).gr_gid if isinstance(g, str) else g
2073                                for g in group_list]
2074            else:
2075                desired_gids = group_list
2076
2077            if perm_error:
2078                self.assertEqual(set(child_groups), set(parent_groups))
2079            else:
2080                self.assertEqual(set(desired_gids), set(child_groups))
2081
2082        # make sure we bomb on negative values
2083        with self.assertRaises(ValueError):
2084            subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[-1])
2085
2086        with self.assertRaises(ValueError):
2087            subprocess.check_call(ZERO_RETURN_CMD,
2088                                  cwd=os.curdir, env=os.environ,
2089                                  extra_groups=[2**64])
2090
2091        if grp is None:
2092            with self.assertRaises(ValueError):
2093                subprocess.check_call(ZERO_RETURN_CMD,
2094                                      extra_groups=[name_group])
2095
2096    @unittest.skipIf(hasattr(os, 'setgroups'), 'setgroups() available on platform')
2097    def test_extra_groups_error(self):
2098        with self.assertRaises(ValueError):
2099            subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[])
2100
2101    @unittest.skipIf(mswindows or not hasattr(os, 'umask'),
2102                     'POSIX umask() is not available.')
2103    def test_umask(self):
2104        tmpdir = None
2105        try:
2106            tmpdir = tempfile.mkdtemp()
2107            name = os.path.join(tmpdir, "beans")
2108            # We set an unusual umask in the child so as a unique mode
2109            # for us to test the child's touched file for.
2110            subprocess.check_call(
2111                    [sys.executable, "-c", f"open({name!r}, 'w').close()"],
2112                    umask=0o053)
2113            # Ignore execute permissions entirely in our test,
2114            # filesystems could be mounted to ignore or force that.
2115            st_mode = os.stat(name).st_mode & 0o666
2116            expected_mode = 0o624
2117            self.assertEqual(expected_mode, st_mode,
2118                             msg=f'{oct(expected_mode)} != {oct(st_mode)}')
2119        finally:
2120            if tmpdir is not None:
2121                shutil.rmtree(tmpdir)
2122
2123    def test_run_abort(self):
2124        # returncode handles signal termination
2125        with support.SuppressCrashReport():
2126            p = subprocess.Popen([sys.executable, "-c",
2127                                  'import os; os.abort()'])
2128            p.wait()
2129        self.assertEqual(-p.returncode, signal.SIGABRT)
2130
2131    def test_CalledProcessError_str_signal(self):
2132        err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd")
2133        error_string = str(err)
2134        # We're relying on the repr() of the signal.Signals intenum to provide
2135        # the word signal, the signal name and the numeric value.
2136        self.assertIn("signal", error_string.lower())
2137        # We're not being specific about the signal name as some signals have
2138        # multiple names and which name is revealed can vary.
2139        self.assertIn("SIG", error_string)
2140        self.assertIn(str(signal.SIGABRT), error_string)
2141
2142    def test_CalledProcessError_str_unknown_signal(self):
2143        err = subprocess.CalledProcessError(-9876543, "fake cmd")
2144        error_string = str(err)
2145        self.assertIn("unknown signal 9876543.", error_string)
2146
2147    def test_CalledProcessError_str_non_zero(self):
2148        err = subprocess.CalledProcessError(2, "fake cmd")
2149        error_string = str(err)
2150        self.assertIn("non-zero exit status 2.", error_string)
2151
2152    def test_preexec(self):
2153        # DISCLAIMER: Setting environment variables is *not* a good use
2154        # of a preexec_fn.  This is merely a test.
2155        p = subprocess.Popen([sys.executable, "-c",
2156                              'import sys,os;'
2157                              'sys.stdout.write(os.getenv("FRUIT"))'],
2158                             stdout=subprocess.PIPE,
2159                             preexec_fn=lambda: os.putenv("FRUIT", "apple"))
2160        with p:
2161            self.assertEqual(p.stdout.read(), b"apple")
2162
2163    def test_preexec_exception(self):
2164        def raise_it():
2165            raise ValueError("What if two swallows carried a coconut?")
2166        try:
2167            p = subprocess.Popen([sys.executable, "-c", ""],
2168                                 preexec_fn=raise_it)
2169        except subprocess.SubprocessError as e:
2170            self.assertTrue(
2171                    subprocess._fork_exec,
2172                    "Expected a ValueError from the preexec_fn")
2173        except ValueError as e:
2174            self.assertIn("coconut", e.args[0])
2175        else:
2176            self.fail("Exception raised by preexec_fn did not make it "
2177                      "to the parent process.")
2178
2179    class _TestExecuteChildPopen(subprocess.Popen):
2180        """Used to test behavior at the end of _execute_child."""
2181        def __init__(self, testcase, *args, **kwargs):
2182            self._testcase = testcase
2183            subprocess.Popen.__init__(self, *args, **kwargs)
2184
2185        def _execute_child(self, *args, **kwargs):
2186            try:
2187                subprocess.Popen._execute_child(self, *args, **kwargs)
2188            finally:
2189                # Open a bunch of file descriptors and verify that
2190                # none of them are the same as the ones the Popen
2191                # instance is using for stdin/stdout/stderr.
2192                devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
2193                               for _ in range(8)]
2194                try:
2195                    for fd in devzero_fds:
2196                        self._testcase.assertNotIn(
2197                                fd, (self.stdin.fileno(), self.stdout.fileno(),
2198                                     self.stderr.fileno()),
2199                                msg="At least one fd was closed early.")
2200                finally:
2201                    for fd in devzero_fds:
2202                        os.close(fd)
2203
2204    @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
2205    def test_preexec_errpipe_does_not_double_close_pipes(self):
2206        """Issue16140: Don't double close pipes on preexec error."""
2207
2208        def raise_it():
2209            raise subprocess.SubprocessError(
2210                    "force the _execute_child() errpipe_data path.")
2211
2212        with self.assertRaises(subprocess.SubprocessError):
2213            self._TestExecuteChildPopen(
2214                        self, ZERO_RETURN_CMD,
2215                        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2216                        stderr=subprocess.PIPE, preexec_fn=raise_it)
2217
2218    def test_preexec_gc_module_failure(self):
2219        # This tests the code that disables garbage collection if the child
2220        # process will execute any Python.
2221        enabled = gc.isenabled()
2222        try:
2223            gc.disable()
2224            self.assertFalse(gc.isenabled())
2225            subprocess.call([sys.executable, '-c', ''],
2226                            preexec_fn=lambda: None)
2227            self.assertFalse(gc.isenabled(),
2228                             "Popen enabled gc when it shouldn't.")
2229
2230            gc.enable()
2231            self.assertTrue(gc.isenabled())
2232            subprocess.call([sys.executable, '-c', ''],
2233                            preexec_fn=lambda: None)
2234            self.assertTrue(gc.isenabled(), "Popen left gc disabled.")
2235        finally:
2236            if not enabled:
2237                gc.disable()
2238
2239    @unittest.skipIf(
2240        sys.platform == 'darwin', 'setrlimit() seems to fail on OS X')
2241    def test_preexec_fork_failure(self):
2242        # The internal code did not preserve the previous exception when
2243        # re-enabling garbage collection
2244        try:
2245            from resource import getrlimit, setrlimit, RLIMIT_NPROC
2246        except ImportError as err:
2247            self.skipTest(err)  # RLIMIT_NPROC is specific to Linux and BSD
2248        limits = getrlimit(RLIMIT_NPROC)
2249        [_, hard] = limits
2250        setrlimit(RLIMIT_NPROC, (0, hard))
2251        self.addCleanup(setrlimit, RLIMIT_NPROC, limits)
2252        try:
2253            subprocess.call([sys.executable, '-c', ''],
2254                            preexec_fn=lambda: None)
2255        except BlockingIOError:
2256            # Forking should raise EAGAIN, translated to BlockingIOError
2257            pass
2258        else:
2259            self.skipTest('RLIMIT_NPROC had no effect; probably superuser')
2260
2261    def test_args_string(self):
2262        # args is a string
2263        fd, fname = tempfile.mkstemp()
2264        # reopen in text mode
2265        with open(fd, "w", errors="surrogateescape") as fobj:
2266            fobj.write("#!%s\n" % support.unix_shell)
2267            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
2268                       sys.executable)
2269        os.chmod(fname, 0o700)
2270        p = subprocess.Popen(fname)
2271        p.wait()
2272        os.remove(fname)
2273        self.assertEqual(p.returncode, 47)
2274
2275    def test_invalid_args(self):
2276        # invalid arguments should raise ValueError
2277        self.assertRaises(ValueError, subprocess.call,
2278                          [sys.executable, "-c",
2279                           "import sys; sys.exit(47)"],
2280                          startupinfo=47)
2281        self.assertRaises(ValueError, subprocess.call,
2282                          [sys.executable, "-c",
2283                           "import sys; sys.exit(47)"],
2284                          creationflags=47)
2285
2286    def test_shell_sequence(self):
2287        # Run command through the shell (sequence)
2288        newenv = os.environ.copy()
2289        newenv["FRUIT"] = "apple"
2290        p = subprocess.Popen(["echo $FRUIT"], shell=1,
2291                             stdout=subprocess.PIPE,
2292                             env=newenv)
2293        with p:
2294            self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
2295
2296    def test_shell_string(self):
2297        # Run command through the shell (string)
2298        newenv = os.environ.copy()
2299        newenv["FRUIT"] = "apple"
2300        p = subprocess.Popen("echo $FRUIT", shell=1,
2301                             stdout=subprocess.PIPE,
2302                             env=newenv)
2303        with p:
2304            self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
2305
2306    def test_call_string(self):
2307        # call() function with string argument on UNIX
2308        fd, fname = tempfile.mkstemp()
2309        # reopen in text mode
2310        with open(fd, "w", errors="surrogateescape") as fobj:
2311            fobj.write("#!%s\n" % support.unix_shell)
2312            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
2313                       sys.executable)
2314        os.chmod(fname, 0o700)
2315        rc = subprocess.call(fname)
2316        os.remove(fname)
2317        self.assertEqual(rc, 47)
2318
2319    def test_specific_shell(self):
2320        # Issue #9265: Incorrect name passed as arg[0].
2321        shells = []
2322        for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
2323            for name in ['bash', 'ksh']:
2324                sh = os.path.join(prefix, name)
2325                if os.path.isfile(sh):
2326                    shells.append(sh)
2327        if not shells: # Will probably work for any shell but csh.
2328            self.skipTest("bash or ksh required for this test")
2329        sh = '/bin/sh'
2330        if os.path.isfile(sh) and not os.path.islink(sh):
2331            # Test will fail if /bin/sh is a symlink to csh.
2332            shells.append(sh)
2333        for sh in shells:
2334            p = subprocess.Popen("echo $0", executable=sh, shell=True,
2335                                 stdout=subprocess.PIPE)
2336            with p:
2337                self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii'))
2338
    def _kill_process(self, method, *args):
        # Helper: start a child that prints one line and then sleeps, call
        # the Popen method named *method* with *args* on it, and return the
        # Popen object so the caller can inspect how the child ended.
        #
        # Do not inherit file handles from the parent.
        # It should fix failures on some platforms.
        # Also set the SIGINT handler to the default to make sure it's not
        # being ignored (some tests rely on that.)
        old_handler = signal.signal(signal.SIGINT, signal.default_int_handler)
        try:
            p = subprocess.Popen([sys.executable, "-c", """if 1:
                                 import sys, time
                                 sys.stdout.write('x\\n')
                                 sys.stdout.flush()
                                 time.sleep(30)
                                 """],
                                 close_fds=True,
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        finally:
            # Reinstate whatever SIGINT handler was installed before.
            signal.signal(signal.SIGINT, old_handler)
        # Wait for the interpreter to be completely initialized before
        # sending any signal.
        p.stdout.read(1)
        # Invoke the requested Popen method (e.g. 'kill') on the live child.
        getattr(p, method)(*args)
        return p
2363
    @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
                     "Due to known OS bug (issue #16762)")
    def _kill_dead_process(self, method, *args):
        # Helper: start a child that prints one line and exits, give it
        # time to finish, then call the Popen method named *method* with
        # *args* on it.  The call is expected to complete without raising.
        #
        # Do not inherit file handles from the parent.
        # It should fix failures on some platforms.
        p = subprocess.Popen([sys.executable, "-c", """if 1:
                             import sys, time
                             sys.stdout.write('x\\n')
                             sys.stdout.flush()
                             """],
                             close_fds=True,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # Wait for the interpreter to be completely initialized before
        # sending any signal.
        p.stdout.read(1)
        # The process should end after this
        time.sleep(1)
        # This shouldn't raise even though the child is now dead
        getattr(p, method)(*args)
        # Drain the pipes and wait for the child.
        p.communicate()
2386
2387    def test_send_signal(self):
2388        p = self._kill_process('send_signal', signal.SIGINT)
2389        _, stderr = p.communicate()
2390        self.assertIn(b'KeyboardInterrupt', stderr)
2391        self.assertNotEqual(p.wait(), 0)
2392
2393    def test_kill(self):
2394        p = self._kill_process('kill')
2395        _, stderr = p.communicate()
2396        self.assertEqual(stderr, b'')
2397        self.assertEqual(p.wait(), -signal.SIGKILL)
2398
2399    def test_terminate(self):
2400        p = self._kill_process('terminate')
2401        _, stderr = p.communicate()
2402        self.assertEqual(stderr, b'')
2403        self.assertEqual(p.wait(), -signal.SIGTERM)
2404
2405    def test_send_signal_dead(self):
2406        # Sending a signal to a dead process
2407        self._kill_dead_process('send_signal', signal.SIGINT)
2408
2409    def test_kill_dead(self):
2410        # Killing a dead process
2411        self._kill_dead_process('kill')
2412
2413    def test_terminate_dead(self):
2414        # Terminating a dead process
2415        self._kill_dead_process('terminate')
2416
2417    def _save_fds(self, save_fds):
2418        fds = []
2419        for fd in save_fds:
2420            inheritable = os.get_inheritable(fd)
2421            saved = os.dup(fd)
2422            fds.append((fd, saved, inheritable))
2423        return fds
2424
2425    def _restore_fds(self, fds):
2426        for fd, saved, inheritable in fds:
2427            os.dup2(saved, fd, inheritable=inheritable)
2428            os.close(saved)
2429
    def check_close_std_fds(self, fds):
        # Issue #9905: test that subprocess pipes still work properly with
        # some standard fds closed
        #
        # *fds* lists the descriptors to close in the parent while the
        # child runs; they are restored before returning.
        stdin = 0
        saved_fds = self._save_fds(fds)
        # If fd 0 is about to be closed, hand the child its saved duplicate
        # as stdin instead.
        for fd, saved, inheritable in saved_fds:
            if fd == 0:
                stdin = saved
                break
        try:
            for fd in fds:
                os.close(fd)
            # The child writes a known word to each output stream; both must
            # arrive intact through the pipes.
            out, err = subprocess.Popen([sys.executable, "-c",
                              'import sys;'
                              'sys.stdout.write("apple");'
                              'sys.stdout.flush();'
                              'sys.stderr.write("orange")'],
                       stdin=stdin,
                       stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE).communicate()
            self.assertEqual(out, b'apple')
            self.assertEqual(err, b'orange')
        finally:
            # Always reinstate the closed descriptors.
            self._restore_fds(saved_fds)
2454
2455    def test_close_fd_0(self):
2456        self.check_close_std_fds([0])
2457
2458    def test_close_fd_1(self):
2459        self.check_close_std_fds([1])
2460
2461    def test_close_fd_2(self):
2462        self.check_close_std_fds([2])
2463
2464    def test_close_fds_0_1(self):
2465        self.check_close_std_fds([0, 1])
2466
2467    def test_close_fds_0_2(self):
2468        self.check_close_std_fds([0, 2])
2469
2470    def test_close_fds_1_2(self):
2471        self.check_close_std_fds([1, 2])
2472
2473    def test_close_fds_0_1_2(self):
2474        # Issue #10806: test that subprocess pipes still work properly with
2475        # all standard fds closed.
2476        self.check_close_std_fds([0, 1, 2])
2477
    def test_small_errpipe_write_fd(self):
        """Issue #15798: Popen should work when stdio fds are available."""
        # Keep duplicates of fds 0 and 1 so they can be reinstated later.
        new_stdin = os.dup(0)
        new_stdout = os.dup(1)
        try:
            # Free descriptor numbers 0 and 1 before spawning the child.
            os.close(0)
            os.close(1)

            # Side test: if errpipe_write fails to have its CLOEXEC
            # flag set this should cause the parent to think the exec
            # failed.  Extremely unlikely: everyone supports CLOEXEC.
            subprocess.Popen([
                    sys.executable, "-c",
                    "print('AssertionError:0:CLOEXEC failure.')"]).wait()
        finally:
            # Restore original stdin and stdout
            os.dup2(new_stdin, 0)
            os.dup2(new_stdout, 1)
            os.close(new_stdin)
            os.close(new_stdout)
2498
    def test_remapping_std_fds(self):
        # Install three temp files as the parent's fds 0, 1 and 2, pass
        # them to the child in a different order, and check that the
        # child's stdin/stdout/stderr ended up on the files requested.
        #
        # open up some temporary files
        temps = [tempfile.mkstemp() for i in range(3)]
        try:
            temp_fds = [fd for fd, fname in temps]

            # unlink the files -- we won't need to reopen them
            for fd, fname in temps:
                os.unlink(fname)

            # write some data to what will become stdin, and rewind
            os.write(temp_fds[1], b"STDIN")
            os.lseek(temp_fds[1], 0, 0)

            # move the standard file descriptors out of the way
            saved_fds = self._save_fds(range(3))
            try:
                # duplicate the file objects over the standard fd's
                for fd, temp_fd in enumerate(temp_fds):
                    os.dup2(temp_fd, fd)

                # now use those files in the "wrong" order, so that subprocess
                # has to rearrange them in the child
                p = subprocess.Popen([sys.executable, "-c",
                    'import sys; got = sys.stdin.read();'
                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
                    stdin=temp_fds[1],
                    stdout=temp_fds[2],
                    stderr=temp_fds[0])
                p.wait()
            finally:
                # put the real standard descriptors back
                self._restore_fds(saved_fds)

            # rewind every file before reading what the child wrote
            for fd in temp_fds:
                os.lseek(fd, 0, 0)

            out = os.read(temp_fds[2], 1024)
            err = os.read(temp_fds[0], 1024).strip()
            self.assertEqual(out, b"got STDIN")
            self.assertEqual(err, b"err")

        finally:
            for fd in temp_fds:
                os.close(fd)
2543
    def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
        # Helper: with temp files installed on the parent's fds 0, 1 and 2,
        # run a child whose stdin/stdout/stderr are the parent descriptors
        # numbered *stdin_no*, *stdout_no* and *stderr_no*, and check that
        # the data flowed through the requested files.
        #
        # open up some temporary files
        temps = [tempfile.mkstemp() for i in range(3)]
        temp_fds = [fd for fd, fname in temps]
        try:
            # unlink the files -- we won't need to reopen them
            for fd, fname in temps:
                os.unlink(fname)

            # save a copy of the standard file descriptors
            saved_fds = self._save_fds(range(3))
            try:
                # duplicate the temp files over the standard fd's 0, 1, 2
                for fd, temp_fd in enumerate(temp_fds):
                    os.dup2(temp_fd, fd)

                # write some data to what will become stdin, and rewind
                os.write(stdin_no, b"STDIN")
                os.lseek(stdin_no, 0, 0)

                # now use those files in the given order, so that subprocess
                # has to rearrange them in the child
                p = subprocess.Popen([sys.executable, "-c",
                    'import sys; got = sys.stdin.read();'
                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
                    stdin=stdin_no,
                    stdout=stdout_no,
                    stderr=stderr_no)
                p.wait()

                # rewind every file before reading what the child wrote
                for fd in temp_fds:
                    os.lseek(fd, 0, 0)

                # read through the std fd numbers while they still refer
                # to the temp files
                out = os.read(stdout_no, 1024)
                err = os.read(stderr_no, 1024).strip()
            finally:
                # put the real standard descriptors back
                self._restore_fds(saved_fds)

            self.assertEqual(out, b"got STDIN")
            self.assertEqual(err, b"err")

        finally:
            for fd in temp_fds:
                os.close(fd)
2588
2589    # When duping fds, if there arises a situation where one of the fds is
2590    # either 0, 1 or 2, it is possible that it is overwritten (#12607).
2591    # This tests all combinations of this.
2592    def test_swap_fds(self):
2593        self.check_swap_fds(0, 1, 2)
2594        self.check_swap_fds(0, 2, 1)
2595        self.check_swap_fds(1, 0, 2)
2596        self.check_swap_fds(1, 2, 0)
2597        self.check_swap_fds(2, 0, 1)
2598        self.check_swap_fds(2, 1, 0)
2599
    def _check_swap_std_fds_with_one_closed(self, from_fds, to_fds):
        # Helper: *from_fds* are two of the parent's fds 0-2, each backed by
        # a fresh temp file; the remaining one is closed.  Each from_fd is
        # passed to the child as the standard stream numbered by the
        # matching entry of *to_fds*.  The child writes its own fd number to
        # every fd except the unused one, and the parent then checks that
        # each temp file received exactly the expected number.
        saved_fds = self._save_fds(range(3))
        try:
            for from_fd in from_fds:
                # The duplicate on from_fd keeps the file open after the
                # with-block closes the original file object.
                with tempfile.TemporaryFile() as f:
                    os.dup2(f.fileno(), from_fd)

            # close the one standard fd that is not in from_fds
            fd_to_close = (set(range(3)) - set(from_fds)).pop()
            os.close(fd_to_close)

            # build the stdin=/stdout=/stderr= keyword arguments
            arg_names = ['stdin', 'stdout', 'stderr']
            kwargs = {}
            for from_fd, to_fd in zip(from_fds, to_fds):
                kwargs[arg_names[to_fd]] = from_fd

            code = textwrap.dedent(r'''
                import os, sys
                skipped_fd = int(sys.argv[1])
                for fd in range(3):
                    if fd != skipped_fd:
                        os.write(fd, str(fd).encode('ascii'))
            ''')

            # the child fd that no from_fd is mapped to
            skipped_fd = (set(range(3)) - set(to_fds)).pop()

            rc = subprocess.call([sys.executable, '-c', code, str(skipped_fd)],
                                 **kwargs)
            self.assertEqual(rc, 0)

            for from_fd, to_fd in zip(from_fds, to_fds):
                # rewind and read which fd number(s) the child wrote here
                os.lseek(from_fd, 0, os.SEEK_SET)
                read_bytes = os.read(from_fd, 1024)
                read_fds = list(map(int, read_bytes.decode('ascii')))
                msg = textwrap.dedent(f"""
                    When testing {from_fds} to {to_fds} redirection,
                    parent descriptor {from_fd} got redirected
                    to descriptor(s) {read_fds} instead of descriptor {to_fd}.
                """)
                self.assertEqual([to_fd], read_fds, msg)
        finally:
            # put the real standard descriptors back
            self._restore_fds(saved_fds)
2641
2642    # Check that subprocess can remap std fds correctly even
2643    # if one of them is closed (#32844).
2644    def test_swap_std_fds_with_one_closed(self):
2645        for from_fds in itertools.combinations(range(3), 2):
2646            for to_fds in itertools.permutations(range(3), 2):
2647                self._check_swap_std_fds_with_one_closed(from_fds, to_fds)
2648
2649    def test_surrogates_error_message(self):
2650        def prepare():
2651            raise ValueError("surrogate:\uDCff")
2652
2653        try:
2654            subprocess.call(
2655                ZERO_RETURN_CMD,
2656                preexec_fn=prepare)
2657        except ValueError as err:
2658            # Pure Python implementations keeps the message
2659            self.assertIsNone(subprocess._fork_exec)
2660            self.assertEqual(str(err), "surrogate:\uDCff")
2661        except subprocess.SubprocessError as err:
2662            # _posixsubprocess uses a default message
2663            self.assertIsNotNone(subprocess._fork_exec)
2664            self.assertEqual(str(err), "Exception occurred in preexec_fn.")
2665        else:
2666            self.fail("Expected ValueError or subprocess.SubprocessError")
2667
2668    def test_undecodable_env(self):
2669        for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
2670            encoded_value = value.encode("ascii", "surrogateescape")
2671
2672            # test str with surrogates
2673            script = "import os; print(ascii(os.getenv(%s)))" % repr(key)
2674            env = os.environ.copy()
2675            env[key] = value
2676            # Use C locale to get ASCII for the locale encoding to force
2677            # surrogate-escaping of \xFF in the child process
2678            env['LC_ALL'] = 'C'
2679            decoded_value = value
2680            stdout = subprocess.check_output(
2681                [sys.executable, "-c", script],
2682                env=env)
2683            stdout = stdout.rstrip(b'\n\r')
2684            self.assertEqual(stdout.decode('ascii'), ascii(decoded_value))
2685
2686            # test bytes
2687            key = key.encode("ascii", "surrogateescape")
2688            script = "import os; print(ascii(os.getenvb(%s)))" % repr(key)
2689            env = os.environ.copy()
2690            env[key] = encoded_value
2691            stdout = subprocess.check_output(
2692                [sys.executable, "-c", script],
2693                env=env)
2694            stdout = stdout.rstrip(b'\n\r')
2695            self.assertEqual(stdout.decode('ascii'), ascii(encoded_value))
2696
2697    def test_bytes_program(self):
2698        abs_program = os.fsencode(ZERO_RETURN_CMD[0])
2699        args = list(ZERO_RETURN_CMD[1:])
2700        path, program = os.path.split(ZERO_RETURN_CMD[0])
2701        program = os.fsencode(program)
2702
2703        # absolute bytes path
2704        exitcode = subprocess.call([abs_program]+args)
2705        self.assertEqual(exitcode, 0)
2706
2707        # absolute bytes path as a string
2708        cmd = b"'%s' %s" % (abs_program, " ".join(args).encode("utf-8"))
2709        exitcode = subprocess.call(cmd, shell=True)
2710        self.assertEqual(exitcode, 0)
2711
2712        # bytes program, unicode PATH
2713        env = os.environ.copy()
2714        env["PATH"] = path
2715        exitcode = subprocess.call([program]+args, env=env)
2716        self.assertEqual(exitcode, 0)
2717
2718        # bytes program, bytes PATH
2719        envb = os.environb.copy()
2720        envb[b"PATH"] = os.fsencode(path)
2721        exitcode = subprocess.call([program]+args, env=envb)
2722        self.assertEqual(exitcode, 0)
2723
2724    def test_pipe_cloexec(self):
2725        sleeper = support.findfile("input_reader.py", subdir="subprocessdata")
2726        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2727
2728        p1 = subprocess.Popen([sys.executable, sleeper],
2729                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2730                              stderr=subprocess.PIPE, close_fds=False)
2731
2732        self.addCleanup(p1.communicate, b'')
2733
2734        p2 = subprocess.Popen([sys.executable, fd_status],
2735                              stdout=subprocess.PIPE, close_fds=False)
2736
2737        output, error = p2.communicate()
2738        result_fds = set(map(int, output.split(b',')))
2739        unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(),
2740                            p1.stderr.fileno()])
2741
2742        self.assertFalse(result_fds & unwanted_fds,
2743                         "Expected no fds from %r to be open in child, "
2744                         "found %r" %
2745                              (unwanted_fds, result_fds & unwanted_fds))
2746
    def test_pipe_cloexec_real_tools(self):
        # Chain two helper scripts through a pipe (p1's stdout feeds p2's
        # stdin) and check that data written to p1 comes out of p2 within
        # the select() timeout.
        # NOTE(review): qcat.py/qgrep.py are presumably cat- and grep-like
        # helpers; their source is not visible here.
        qcat = support.findfile("qcat.py", subdir="subprocessdata")
        qgrep = support.findfile("qgrep.py", subdir="subprocessdata")

        subdata = b'zxcvbn'
        data = subdata * 4 + b'\n'

        p1 = subprocess.Popen([sys.executable, qcat],
                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                              close_fds=False)

        p2 = subprocess.Popen([sys.executable, qgrep, subdata],
                              stdin=p1.stdout, stdout=subprocess.PIPE,
                              close_fds=False)

        # Cleanups run in reverse registration order, so both children are
        # terminated first and waited for afterwards.
        self.addCleanup(p1.wait)
        self.addCleanup(p2.wait)
        def kill_p1():
            # The child may already be gone; that is fine.
            try:
                p1.terminate()
            except ProcessLookupError:
                pass
        def kill_p2():
            # The child may already be gone; that is fine.
            try:
                p2.terminate()
            except ProcessLookupError:
                pass
        self.addCleanup(kill_p1)
        self.addCleanup(kill_p2)

        p1.stdin.write(data)
        p1.stdin.close()

        # Wait up to 10 seconds for p2 to produce output.
        readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10)

        self.assertTrue(readfiles, "The child hung")
        self.assertEqual(p2.stdout.read(), data)

        p1.stdout.close()
        p2.stdout.close()
2787
2788    def test_close_fds(self):
2789        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2790
2791        fds = os.pipe()
2792        self.addCleanup(os.close, fds[0])
2793        self.addCleanup(os.close, fds[1])
2794
2795        open_fds = set(fds)
2796        # add a bunch more fds
2797        for _ in range(9):
2798            fd = os.open(os.devnull, os.O_RDONLY)
2799            self.addCleanup(os.close, fd)
2800            open_fds.add(fd)
2801
2802        for fd in open_fds:
2803            os.set_inheritable(fd, True)
2804
2805        p = subprocess.Popen([sys.executable, fd_status],
2806                             stdout=subprocess.PIPE, close_fds=False)
2807        output, ignored = p.communicate()
2808        remaining_fds = set(map(int, output.split(b',')))
2809
2810        self.assertEqual(remaining_fds & open_fds, open_fds,
2811                         "Some fds were closed")
2812
2813        p = subprocess.Popen([sys.executable, fd_status],
2814                             stdout=subprocess.PIPE, close_fds=True)
2815        output, ignored = p.communicate()
2816        remaining_fds = set(map(int, output.split(b',')))
2817
2818        self.assertFalse(remaining_fds & open_fds,
2819                         "Some fds were left open")
2820        self.assertIn(1, remaining_fds, "Subprocess failed")
2821
2822        # Keep some of the fd's we opened open in the subprocess.
2823        # This tests _posixsubprocess.c's proper handling of fds_to_keep.
2824        fds_to_keep = set(open_fds.pop() for _ in range(8))
2825        p = subprocess.Popen([sys.executable, fd_status],
2826                             stdout=subprocess.PIPE, close_fds=True,
2827                             pass_fds=fds_to_keep)
2828        output, ignored = p.communicate()
2829        remaining_fds = set(map(int, output.split(b',')))
2830
2831        self.assertFalse((remaining_fds - fds_to_keep) & open_fds,
2832                         "Some fds not in pass_fds were left open")
2833        self.assertIn(1, remaining_fds, "Subprocess failed")
2834
2835
    @unittest.skipIf(sys.platform.startswith("freebsd") and
                     os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev,
                     "Requires fdescfs mounted on /dev/fd on FreeBSD")
    def test_close_fds_when_max_fd_is_lowered(self):
        """Confirm that issue21618 is fixed (may fail under valgrind)."""
        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

        # This launches the meat of the test in a child process to
        # avoid messing with the larger unittest processes maximum
        # number of file descriptors.
        #  This process launches:
        #  +--> Process that lowers its RLIMIT_NOFILE after setting up
        #    a bunch of high open fds above the new lower rlimit.
        #    Those are reported via stdout before launching a new
        #    process with close_fds=False to run the actual test:
        #    +--> The TEST: This one launches a fd_status.py
        #      subprocess with close_fds=True so we can find out if
        #      any of the fds above the lowered rlimit are still open.
        #
        # Two substitutions are applied to the script text below: the outer
        # "% fd_status" fills the %r inside the innermost script here in the
        # unittest process, while "{max_fd}" is filled by str.format() when
        # the first child runs.
        p = subprocess.Popen([sys.executable, '-c', textwrap.dedent(
        '''
        import os, resource, subprocess, sys, textwrap
        open_fds = set()
        # Add a bunch more fds to pass down.
        for _ in range(40):
            fd = os.open(os.devnull, os.O_RDONLY)
            open_fds.add(fd)

        # Leave a two pairs of low ones available for use by the
        # internal child error pipe and the stdout pipe.
        # We also leave 10 more open as some Python buildbots run into
        # "too many open files" errors during the test if we do not.
        for fd in sorted(open_fds)[:14]:
            os.close(fd)
            open_fds.remove(fd)

        for fd in open_fds:
            #self.addCleanup(os.close, fd)
            os.set_inheritable(fd, True)

        max_fd_open = max(open_fds)

        # Communicate the open_fds to the parent unittest.TestCase process.
        print(','.join(map(str, sorted(open_fds))))
        sys.stdout.flush()

        rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE)
        try:
            # 29 is lower than the highest fds we are leaving open.
            resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max))
            # Launch a new Python interpreter with our low fd rlim_cur that
            # inherits open fds above that limit.  It then uses subprocess
            # with close_fds=True to get a report of open fds in the child.
            # An explicit list of fds to check is passed to fd_status.py as
            # letting fd_status rely on its default logic would miss the
            # fds above rlim_cur as it normally only checks up to that limit.
            subprocess.Popen(
                [sys.executable, '-c',
                 textwrap.dedent("""
                     import subprocess, sys
                     subprocess.Popen([sys.executable, %r] +
                                      [str(x) for x in range({max_fd})],
                                      close_fds=True).wait()
                     """.format(max_fd=max_fd_open+1))],
                close_fds=False).wait()
        finally:
            resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max))
        ''' % fd_status)], stdout=subprocess.PIPE)

        # stderr is not redirected, so communicate() returns None for it.
        output, unused_stderr = p.communicate()
        output_lines = output.splitlines()
        # Line 1 is the fd list printed by the first child.  Line 2 is
        # presumably fd_status.py's report written to the inherited stdout
        # by the innermost process -- NOTE(review): confirm against
        # fd_status.py.
        self.assertEqual(len(output_lines), 2,
                         msg="expected exactly two lines of output:\n%r" % output)
        opened_fds = set(map(int, output_lines[0].strip().split(b',')))
        remaining_fds = set(map(int, output_lines[1].strip().split(b',')))

        # None of the fds opened by the first child may show up in the
        # second line's report.
        self.assertFalse(remaining_fds & opened_fds,
                         msg="Some fds were left open.")
2913
2914
2915    # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
2916    # descriptor of a pipe closed in the parent process is valid in the
2917    # child process according to fstat(), but the mode of the file
2918    # descriptor is invalid, and read or write raise an error.
2919    @support.requires_mac_ver(10, 5)
2920    def test_pass_fds(self):
2921        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2922
2923        open_fds = set()
2924
2925        for x in range(5):
2926            fds = os.pipe()
2927            self.addCleanup(os.close, fds[0])
2928            self.addCleanup(os.close, fds[1])
2929            os.set_inheritable(fds[0], True)
2930            os.set_inheritable(fds[1], True)
2931            open_fds.update(fds)
2932
2933        for fd in open_fds:
2934            p = subprocess.Popen([sys.executable, fd_status],
2935                                 stdout=subprocess.PIPE, close_fds=True,
2936                                 pass_fds=(fd, ))
2937            output, ignored = p.communicate()
2938
2939            remaining_fds = set(map(int, output.split(b',')))
2940            to_be_closed = open_fds - {fd}
2941
2942            self.assertIn(fd, remaining_fds, "fd to be passed not passed")
2943            self.assertFalse(remaining_fds & to_be_closed,
2944                             "fd to be closed passed")
2945
2946            # pass_fds overrides close_fds with a warning.
2947            with self.assertWarns(RuntimeWarning) as context:
2948                self.assertFalse(subprocess.call(
2949                        ZERO_RETURN_CMD,
2950                        close_fds=False, pass_fds=(fd, )))
2951            self.assertIn('overriding close_fds', str(context.warning))
2952
2953    def test_pass_fds_inheritable(self):
2954        script = support.findfile("fd_status.py", subdir="subprocessdata")
2955
2956        inheritable, non_inheritable = os.pipe()
2957        self.addCleanup(os.close, inheritable)
2958        self.addCleanup(os.close, non_inheritable)
2959        os.set_inheritable(inheritable, True)
2960        os.set_inheritable(non_inheritable, False)
2961        pass_fds = (inheritable, non_inheritable)
2962        args = [sys.executable, script]
2963        args += list(map(str, pass_fds))
2964
2965        p = subprocess.Popen(args,
2966                             stdout=subprocess.PIPE, close_fds=True,
2967                             pass_fds=pass_fds)
2968        output, ignored = p.communicate()
2969        fds = set(map(int, output.split(b',')))
2970
2971        # the inheritable file descriptor must be inherited, so its inheritable
2972        # flag must be set in the child process after fork() and before exec()
2973        self.assertEqual(fds, set(pass_fds), "output=%a" % output)
2974
2975        # inheritable flag must not be changed in the parent process
2976        self.assertEqual(os.get_inheritable(inheritable), True)
2977        self.assertEqual(os.get_inheritable(non_inheritable), False)
2978
2979
2980    # bpo-32270: Ensure that descriptors specified in pass_fds
2981    # are inherited even if they are used in redirections.
2982    # Contributed by @izbyshev.
2983    def test_pass_fds_redirected(self):
2984        """Regression test for https://bugs.python.org/issue32270."""
2985        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2986        pass_fds = []
2987        for _ in range(2):
2988            fd = os.open(os.devnull, os.O_RDWR)
2989            self.addCleanup(os.close, fd)
2990            pass_fds.append(fd)
2991
2992        stdout_r, stdout_w = os.pipe()
2993        self.addCleanup(os.close, stdout_r)
2994        self.addCleanup(os.close, stdout_w)
2995        pass_fds.insert(1, stdout_w)
2996
2997        with subprocess.Popen([sys.executable, fd_status],
2998                              stdin=pass_fds[0],
2999                              stdout=pass_fds[1],
3000                              stderr=pass_fds[2],
3001                              close_fds=True,
3002                              pass_fds=pass_fds):
3003            output = os.read(stdout_r, 1024)
3004        fds = {int(num) for num in output.split(b',')}
3005
3006        self.assertEqual(fds, {0, 1, 2} | frozenset(pass_fds), f"output={output!a}")
3007
3008
3009    def test_stdout_stdin_are_single_inout_fd(self):
3010        with io.open(os.devnull, "r+") as inout:
3011            p = subprocess.Popen(ZERO_RETURN_CMD,
3012                                 stdout=inout, stdin=inout)
3013            p.wait()
3014
3015    def test_stdout_stderr_are_single_inout_fd(self):
3016        with io.open(os.devnull, "r+") as inout:
3017            p = subprocess.Popen(ZERO_RETURN_CMD,
3018                                 stdout=inout, stderr=inout)
3019            p.wait()
3020
3021    def test_stderr_stdin_are_single_inout_fd(self):
3022        with io.open(os.devnull, "r+") as inout:
3023            p = subprocess.Popen(ZERO_RETURN_CMD,
3024                                 stderr=inout, stdin=inout)
3025            p.wait()
3026
3027    def test_wait_when_sigchild_ignored(self):
3028        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
3029        sigchild_ignore = support.findfile("sigchild_ignore.py",
3030                                           subdir="subprocessdata")
3031        p = subprocess.Popen([sys.executable, sigchild_ignore],
3032                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
3033        stdout, stderr = p.communicate()
3034        self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
3035                         " non-zero with this error:\n%s" %
3036                         stderr.decode('utf-8'))
3037
3038    def test_select_unbuffered(self):
3039        # Issue #11459: bufsize=0 should really set the pipes as
3040        # unbuffered (and therefore let select() work properly).
3041        select = import_helper.import_module("select")
3042        p = subprocess.Popen([sys.executable, "-c",
3043                              'import sys;'
3044                              'sys.stdout.write("apple")'],
3045                             stdout=subprocess.PIPE,
3046                             bufsize=0)
3047        f = p.stdout
3048        self.addCleanup(f.close)
3049        try:
3050            self.assertEqual(f.read(4), b"appl")
3051            self.assertIn(f, select.select([f], [], [], 0.0)[0])
3052        finally:
3053            p.wait()
3054
3055    def test_zombie_fast_process_del(self):
3056        # Issue #12650: on Unix, if Popen.__del__() was called before the
3057        # process exited, it wouldn't be added to subprocess._active, and would
3058        # remain a zombie.
3059        # spawn a Popen, and delete its reference before it exits
3060        p = subprocess.Popen([sys.executable, "-c",
3061                              'import sys, time;'
3062                              'time.sleep(0.2)'],
3063                             stdout=subprocess.PIPE,
3064                             stderr=subprocess.PIPE)
3065        self.addCleanup(p.stdout.close)
3066        self.addCleanup(p.stderr.close)
3067        ident = id(p)
3068        pid = p.pid
3069        with warnings_helper.check_warnings(('', ResourceWarning)):
3070            p = None
3071
3072        if mswindows:
3073            # subprocess._active is not used on Windows and is set to None.
3074            self.assertIsNone(subprocess._active)
3075        else:
3076            # check that p is in the active processes list
3077            self.assertIn(ident, [id(o) for o in subprocess._active])
3078
    def test_leak_fast_process_del_killed(self):
        # Issue #12650: on Unix, if Popen.__del__() was called before the
        # process exited, and the process got killed by a signal, it would never
        # be removed from subprocess._active, which triggered a FD and memory
        # leak.
        # spawn a Popen, delete its reference and kill it
        p = subprocess.Popen([sys.executable, "-c",
                              'import time;'
                              'time.sleep(3)'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        # Remember identity and pid now: the only name bound to the Popen
        # object is rebound to None just below.
        ident = id(p)
        pid = p.pid
        with warnings_helper.check_warnings(('', ResourceWarning)):
            p = None
            support.gc_collect()  # For PyPy or other GCs.

        # The child is still sleeping at this point; kill it by pid since the
        # Popen object is no longer reachable from this test.
        os.kill(pid, signal.SIGKILL)
        if mswindows:
            # subprocess._active is not used on Windows and is set to None.
            self.assertIsNone(subprocess._active)
        else:
            # check that p is in the active processes list
            self.assertIn(ident, [id(o) for o in subprocess._active])

        # let some time for the process to exit, and create a new Popen: this
        # should trigger the wait() of p
        time.sleep(0.2)
        # The new Popen itself is expected to fail with OSError; only the
        # attempt to create it matters here.
        with self.assertRaises(OSError):
            with subprocess.Popen(NONEXISTING_CMD,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE) as proc:
                pass
        # p should have been wait()ed on, and removed from the _active list
        self.assertRaises(OSError, os.waitpid, pid, 0)
        if mswindows:
            # subprocess._active is not used on Windows and is set to None.
            self.assertIsNone(subprocess._active)
        else:
            self.assertNotIn(ident, [id(o) for o in subprocess._active])
3121
3122    def test_close_fds_after_preexec(self):
3123        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
3124
3125        # this FD is used as dup2() target by preexec_fn, and should be closed
3126        # in the child process
3127        fd = os.dup(1)
3128        self.addCleanup(os.close, fd)
3129
3130        p = subprocess.Popen([sys.executable, fd_status],
3131                             stdout=subprocess.PIPE, close_fds=True,
3132                             preexec_fn=lambda: os.dup2(1, fd))
3133        output, ignored = p.communicate()
3134
3135        remaining_fds = set(map(int, output.split(b',')))
3136
3137        self.assertNotIn(fd, remaining_fds)
3138
3139    @support.cpython_only
3140    def test_fork_exec(self):
3141        # Issue #22290: fork_exec() must not crash on memory allocation failure
3142        # or other errors
3143        import _posixsubprocess
3144        gc_enabled = gc.isenabled()
3145        try:
3146            # Use a preexec function and enable the garbage collector
3147            # to force fork_exec() to re-enable the garbage collector
3148            # on error.
3149            func = lambda: None
3150            gc.enable()
3151
3152            for args, exe_list, cwd, env_list in (
3153                (123,      [b"exe"], None, [b"env"]),
3154                ([b"arg"], 123,      None, [b"env"]),
3155                ([b"arg"], [b"exe"], 123,  [b"env"]),
3156                ([b"arg"], [b"exe"], None, 123),
3157            ):
3158                with self.assertRaises(TypeError) as err:
3159                    _posixsubprocess.fork_exec(
3160                        args, exe_list,
3161                        True, (), cwd, env_list,
3162                        -1, -1, -1, -1,
3163                        1, 2, 3, 4,
3164                        True, True, 0,
3165                        False, [], 0, -1,
3166                        func, False)
3167                # Attempt to prevent
3168                # "TypeError: fork_exec() takes exactly N arguments (M given)"
3169                # from passing the test.  More refactoring to have us start
3170                # with a valid *args list, confirm a good call with that works
3171                # before mutating it in various ways to ensure that bad calls
3172                # with individual arg type errors raise a typeerror would be
3173                # ideal.  Saving that for a future PR...
3174                self.assertNotIn('takes exactly', str(err.exception))
3175        finally:
3176            if not gc_enabled:
3177                gc.disable()
3178
3179    @support.cpython_only
3180    def test_fork_exec_sorted_fd_sanity_check(self):
3181        # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check.
3182        import _posixsubprocess
3183        class BadInt:
3184            first = True
3185            def __init__(self, value):
3186                self.value = value
3187            def __int__(self):
3188                if self.first:
3189                    self.first = False
3190                    return self.value
3191                raise ValueError
3192
3193        gc_enabled = gc.isenabled()
3194        try:
3195            gc.enable()
3196
3197            for fds_to_keep in (
3198                (-1, 2, 3, 4, 5),  # Negative number.
3199                ('str', 4),  # Not an int.
3200                (18, 23, 42, 2**63),  # Out of range.
3201                (5, 4),  # Not sorted.
3202                (6, 7, 7, 8),  # Duplicate.
3203                (BadInt(1), BadInt(2)),
3204            ):
3205                with self.assertRaises(
3206                        ValueError,
3207                        msg='fds_to_keep={}'.format(fds_to_keep)) as c:
3208                    _posixsubprocess.fork_exec(
3209                        [b"false"], [b"false"],
3210                        True, fds_to_keep, None, [b"env"],
3211                        -1, -1, -1, -1,
3212                        1, 2, 3, 4,
3213                        True, True, 0,
3214                        None, None, None, -1,
3215                        None, True)
3216                self.assertIn('fds_to_keep', str(c.exception))
3217        finally:
3218            if not gc_enabled:
3219                gc.disable()
3220
3221    def test_communicate_BrokenPipeError_stdin_close(self):
3222        # By not setting stdout or stderr or a timeout we force the fast path
3223        # that just calls _stdin_write() internally due to our mock.
3224        proc = subprocess.Popen(ZERO_RETURN_CMD)
3225        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
3226            mock_proc_stdin.close.side_effect = BrokenPipeError
3227            proc.communicate()  # Should swallow BrokenPipeError from close.
3228            mock_proc_stdin.close.assert_called_with()
3229
3230    def test_communicate_BrokenPipeError_stdin_write(self):
3231        # By not setting stdout or stderr or a timeout we force the fast path
3232        # that just calls _stdin_write() internally due to our mock.
3233        proc = subprocess.Popen(ZERO_RETURN_CMD)
3234        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
3235            mock_proc_stdin.write.side_effect = BrokenPipeError
3236            proc.communicate(b'stuff')  # Should swallow the BrokenPipeError.
3237            mock_proc_stdin.write.assert_called_once_with(b'stuff')
3238            mock_proc_stdin.close.assert_called_once_with()
3239
3240    def test_communicate_BrokenPipeError_stdin_flush(self):
3241        # Setting stdin and stdout forces the ._communicate() code path.
3242        # python -h exits faster than python -c pass (but spams stdout).
3243        proc = subprocess.Popen([sys.executable, '-h'],
3244                                stdin=subprocess.PIPE,
3245                                stdout=subprocess.PIPE)
3246        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \
3247                open(os.devnull, 'wb') as dev_null:
3248            mock_proc_stdin.flush.side_effect = BrokenPipeError
3249            # because _communicate registers a selector using proc.stdin...
3250            mock_proc_stdin.fileno.return_value = dev_null.fileno()
3251            # _communicate() should swallow BrokenPipeError from flush.
3252            proc.communicate(b'stuff')
3253            mock_proc_stdin.flush.assert_called_once_with()
3254
3255    def test_communicate_BrokenPipeError_stdin_close_with_timeout(self):
3256        # Setting stdin and stdout forces the ._communicate() code path.
3257        # python -h exits faster than python -c pass (but spams stdout).
3258        proc = subprocess.Popen([sys.executable, '-h'],
3259                                stdin=subprocess.PIPE,
3260                                stdout=subprocess.PIPE)
3261        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
3262            mock_proc_stdin.close.side_effect = BrokenPipeError
3263            # _communicate() should swallow BrokenPipeError from close.
3264            proc.communicate(timeout=999)
3265            mock_proc_stdin.close.assert_called_once_with()
3266
3267    @unittest.skipUnless(_testcapi is not None
3268                         and hasattr(_testcapi, 'W_STOPCODE'),
3269                         'need _testcapi.W_STOPCODE')
3270    def test_stopped(self):
3271        """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
3272        args = ZERO_RETURN_CMD
3273        proc = subprocess.Popen(args)
3274
3275        # Wait until the real process completes to avoid zombie process
3276        support.wait_process(proc.pid, exitcode=0)
3277
3278        status = _testcapi.W_STOPCODE(3)
3279        with mock.patch('subprocess.os.waitpid', return_value=(proc.pid, status)):
3280            returncode = proc.wait()
3281
3282        self.assertEqual(returncode, -3)
3283
3284    def test_send_signal_race(self):
3285        # bpo-38630: send_signal() must poll the process exit status to reduce
3286        # the risk of sending the signal to the wrong process.
3287        proc = subprocess.Popen(ZERO_RETURN_CMD)
3288
3289        # wait until the process completes without using the Popen APIs.
3290        support.wait_process(proc.pid, exitcode=0)
3291
3292        # returncode is still None but the process completed.
3293        self.assertIsNone(proc.returncode)
3294
3295        with mock.patch("os.kill") as mock_kill:
3296            proc.send_signal(signal.SIGTERM)
3297
3298        # send_signal() didn't call os.kill() since the process already
3299        # completed.
3300        mock_kill.assert_not_called()
3301
3302        # Don't check the returncode value: the test reads the exit status,
3303        # so Popen failed to read it and uses a default returncode instead.
3304        self.assertIsNotNone(proc.returncode)
3305
3306    def test_send_signal_race2(self):
3307        # bpo-40550: the process might exist between the returncode check and
3308        # the kill operation
3309        p = subprocess.Popen([sys.executable, '-c', 'exit(1)'])
3310
3311        # wait for process to exit
3312        while not p.returncode:
3313            p.poll()
3314
3315        with mock.patch.object(p, 'poll', new=lambda: None):
3316            p.returncode = None
3317            p.send_signal(signal.SIGTERM)
3318        p.kill()
3319
3320    def test_communicate_repeated_call_after_stdout_close(self):
3321        proc = subprocess.Popen([sys.executable, '-c',
3322                                 'import os, time; os.close(1), time.sleep(2)'],
3323                                stdout=subprocess.PIPE)
3324        while True:
3325            try:
3326                proc.communicate(timeout=0.1)
3327                return
3328            except subprocess.TimeoutExpired:
3329                pass
3330
3331
3332@unittest.skipUnless(mswindows, "Windows specific tests")
3333class Win32ProcessTestCase(BaseTestCase):
3334
3335    def test_startupinfo(self):
3336        # startupinfo argument
3337        # We uses hardcoded constants, because we do not want to
3338        # depend on win32all.
3339        STARTF_USESHOWWINDOW = 1
3340        SW_MAXIMIZE = 3
3341        startupinfo = subprocess.STARTUPINFO()
3342        startupinfo.dwFlags = STARTF_USESHOWWINDOW
3343        startupinfo.wShowWindow = SW_MAXIMIZE
3344        # Since Python is a console process, it won't be affected
3345        # by wShowWindow, but the argument should be silently
3346        # ignored
3347        subprocess.call(ZERO_RETURN_CMD,
3348                        startupinfo=startupinfo)
3349
3350    def test_startupinfo_keywords(self):
3351        # startupinfo argument
3352        # We use hardcoded constants, because we do not want to
3353        # depend on win32all.
3354        STARTF_USERSHOWWINDOW = 1
3355        SW_MAXIMIZE = 3
3356        startupinfo = subprocess.STARTUPINFO(
3357            dwFlags=STARTF_USERSHOWWINDOW,
3358            wShowWindow=SW_MAXIMIZE
3359        )
3360        # Since Python is a console process, it won't be affected
3361        # by wShowWindow, but the argument should be silently
3362        # ignored
3363        subprocess.call(ZERO_RETURN_CMD,
3364                        startupinfo=startupinfo)
3365
3366    def test_startupinfo_copy(self):
3367        # bpo-34044: Popen must not modify input STARTUPINFO structure
3368        startupinfo = subprocess.STARTUPINFO()
3369        startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW
3370        startupinfo.wShowWindow = subprocess.SW_HIDE
3371
3372        # Call Popen() twice with the same startupinfo object to make sure
3373        # that it's not modified
3374        for _ in range(2):
3375            cmd = ZERO_RETURN_CMD
3376            with open(os.devnull, 'w') as null:
3377                proc = subprocess.Popen(cmd,
3378                                        stdout=null,
3379                                        stderr=subprocess.STDOUT,
3380                                        startupinfo=startupinfo)
3381                with proc:
3382                    proc.communicate()
3383                self.assertEqual(proc.returncode, 0)
3384
3385            self.assertEqual(startupinfo.dwFlags,
3386                             subprocess.STARTF_USESHOWWINDOW)
3387            self.assertIsNone(startupinfo.hStdInput)
3388            self.assertIsNone(startupinfo.hStdOutput)
3389            self.assertIsNone(startupinfo.hStdError)
3390            self.assertEqual(startupinfo.wShowWindow, subprocess.SW_HIDE)
3391            self.assertEqual(startupinfo.lpAttributeList, {"handle_list": []})
3392
3393    def test_creationflags(self):
3394        # creationflags argument
3395        CREATE_NEW_CONSOLE = 16
3396        sys.stderr.write("    a DOS box should flash briefly ...\n")
3397        subprocess.call(sys.executable +
3398                        ' -c "import time; time.sleep(0.25)"',
3399                        creationflags=CREATE_NEW_CONSOLE)
3400
3401    def test_invalid_args(self):
3402        # invalid arguments should raise ValueError
3403        self.assertRaises(ValueError, subprocess.call,
3404                          [sys.executable, "-c",
3405                           "import sys; sys.exit(47)"],
3406                          preexec_fn=lambda: 1)
3407
3408    @support.cpython_only
3409    def test_issue31471(self):
3410        # There shouldn't be an assertion failure in Popen() in case the env
3411        # argument has a bad keys() method.
3412        class BadEnv(dict):
3413            keys = None
3414        with self.assertRaises(TypeError):
3415            subprocess.Popen(ZERO_RETURN_CMD, env=BadEnv())
3416
3417    def test_close_fds(self):
3418        # close file descriptors
3419        rc = subprocess.call([sys.executable, "-c",
3420                              "import sys; sys.exit(47)"],
3421                              close_fds=True)
3422        self.assertEqual(rc, 47)
3423
3424    def test_close_fds_with_stdio(self):
3425        import msvcrt
3426
3427        fds = os.pipe()
3428        self.addCleanup(os.close, fds[0])
3429        self.addCleanup(os.close, fds[1])
3430
3431        handles = []
3432        for fd in fds:
3433            os.set_inheritable(fd, True)
3434            handles.append(msvcrt.get_osfhandle(fd))
3435
3436        p = subprocess.Popen([sys.executable, "-c",
3437                              "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3438                             stdout=subprocess.PIPE, close_fds=False)
3439        stdout, stderr = p.communicate()
3440        self.assertEqual(p.returncode, 0)
3441        int(stdout.strip())  # Check that stdout is an integer
3442
3443        p = subprocess.Popen([sys.executable, "-c",
3444                              "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3445                             stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
3446        stdout, stderr = p.communicate()
3447        self.assertEqual(p.returncode, 1)
3448        self.assertIn(b"OSError", stderr)
3449
3450        # The same as the previous call, but with an empty handle_list
3451        handle_list = []
3452        startupinfo = subprocess.STARTUPINFO()
3453        startupinfo.lpAttributeList = {"handle_list": handle_list}
3454        p = subprocess.Popen([sys.executable, "-c",
3455                              "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3456                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
3457                             startupinfo=startupinfo, close_fds=True)
3458        stdout, stderr = p.communicate()
3459        self.assertEqual(p.returncode, 1)
3460        self.assertIn(b"OSError", stderr)
3461
3462        # Check for a warning due to using handle_list and close_fds=False
3463        with warnings_helper.check_warnings((".*overriding close_fds",
3464                                             RuntimeWarning)):
3465            startupinfo = subprocess.STARTUPINFO()
3466            startupinfo.lpAttributeList = {"handle_list": handles[:]}
3467            p = subprocess.Popen([sys.executable, "-c",
3468                                  "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3469                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
3470                                 startupinfo=startupinfo, close_fds=False)
3471            stdout, stderr = p.communicate()
3472            self.assertEqual(p.returncode, 0)
3473
3474    def test_empty_attribute_list(self):
3475        startupinfo = subprocess.STARTUPINFO()
3476        startupinfo.lpAttributeList = {}
3477        subprocess.call(ZERO_RETURN_CMD,
3478                        startupinfo=startupinfo)
3479
3480    def test_empty_handle_list(self):
3481        startupinfo = subprocess.STARTUPINFO()
3482        startupinfo.lpAttributeList = {"handle_list": []}
3483        subprocess.call(ZERO_RETURN_CMD,
3484                        startupinfo=startupinfo)
3485
3486    def test_shell_sequence(self):
3487        # Run command through the shell (sequence)
3488        newenv = os.environ.copy()
3489        newenv["FRUIT"] = "physalis"
3490        p = subprocess.Popen(["set"], shell=1,
3491                             stdout=subprocess.PIPE,
3492                             env=newenv)
3493        with p:
3494            self.assertIn(b"physalis", p.stdout.read())
3495
3496    def test_shell_string(self):
3497        # Run command through the shell (string)
3498        newenv = os.environ.copy()
3499        newenv["FRUIT"] = "physalis"
3500        p = subprocess.Popen("set", shell=1,
3501                             stdout=subprocess.PIPE,
3502                             env=newenv)
3503        with p:
3504            self.assertIn(b"physalis", p.stdout.read())
3505
3506    def test_shell_encodings(self):
3507        # Run command through the shell (string)
3508        for enc in ['ansi', 'oem']:
3509            newenv = os.environ.copy()
3510            newenv["FRUIT"] = "physalis"
3511            p = subprocess.Popen("set", shell=1,
3512                                 stdout=subprocess.PIPE,
3513                                 env=newenv,
3514                                 encoding=enc)
3515            with p:
3516                self.assertIn("physalis", p.stdout.read(), enc)
3517
3518    def test_call_string(self):
3519        # call() function with string argument on Windows
3520        rc = subprocess.call(sys.executable +
3521                             ' -c "import sys; sys.exit(47)"')
3522        self.assertEqual(rc, 47)
3523
3524    def _kill_process(self, method, *args):
3525        # Some win32 buildbot raises EOFError if stdin is inherited
3526        p = subprocess.Popen([sys.executable, "-c", """if 1:
3527                             import sys, time
3528                             sys.stdout.write('x\\n')
3529                             sys.stdout.flush()
3530                             time.sleep(30)
3531                             """],
3532                             stdin=subprocess.PIPE,
3533                             stdout=subprocess.PIPE,
3534                             stderr=subprocess.PIPE)
3535        with p:
3536            # Wait for the interpreter to be completely initialized before
3537            # sending any signal.
3538            p.stdout.read(1)
3539            getattr(p, method)(*args)
3540            _, stderr = p.communicate()
3541            self.assertEqual(stderr, b'')
3542            returncode = p.wait()
3543        self.assertNotEqual(returncode, 0)
3544
3545    def _kill_dead_process(self, method, *args):
3546        p = subprocess.Popen([sys.executable, "-c", """if 1:
3547                             import sys, time
3548                             sys.stdout.write('x\\n')
3549                             sys.stdout.flush()
3550                             sys.exit(42)
3551                             """],
3552                             stdin=subprocess.PIPE,
3553                             stdout=subprocess.PIPE,
3554                             stderr=subprocess.PIPE)
3555        with p:
3556            # Wait for the interpreter to be completely initialized before
3557            # sending any signal.
3558            p.stdout.read(1)
3559            # The process should end after this
3560            time.sleep(1)
3561            # This shouldn't raise even though the child is now dead
3562            getattr(p, method)(*args)
3563            _, stderr = p.communicate()
3564            self.assertEqual(stderr, b'')
3565            rc = p.wait()
3566        self.assertEqual(rc, 42)
3567
3568    def test_send_signal(self):
3569        self._kill_process('send_signal', signal.SIGTERM)
3570
3571    def test_kill(self):
3572        self._kill_process('kill')
3573
3574    def test_terminate(self):
3575        self._kill_process('terminate')
3576
3577    def test_send_signal_dead(self):
3578        self._kill_dead_process('send_signal', signal.SIGTERM)
3579
3580    def test_kill_dead(self):
3581        self._kill_dead_process('kill')
3582
3583    def test_terminate_dead(self):
3584        self._kill_dead_process('terminate')
3585
class MiscTests(unittest.TestCase):
    # Miscellaneous subprocess tests: KeyboardInterrupt handling in
    # call()/run()/Popen.__exit__, getoutput()/getstatusoutput(), and the
    # contents of subprocess.__all__.

    class RecordingPopen(subprocess.Popen):
        """A Popen that saves a reference to each instance for testing."""
        # Shared by every instance on purpose: the tests read it through
        # the class to find children that were created behind their back.
        instances_created = []

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.instances_created.append(self)

    @mock.patch.object(subprocess.Popen, "_communicate")
    def _test_keyboardinterrupt_no_kill(self, popener, mock__communicate,
                                        **kwargs):
        """Fake a SIGINT happening during Popen._communicate() and ._wait().

        This avoids the need to actually try and get test environments to send
        and receive signals reliably across platforms.  The net effect of a ^C
        happening during a blocking subprocess execution which we want to clean
        up from is a KeyboardInterrupt coming out of communicate() or wait().

        *popener* is called with the child's argument list,
        ``stdout=subprocess.DEVNULL`` and any extra *kwargs*; it must raise
        KeyboardInterrupt.  *mock__communicate* is supplied by the
        ``mock.patch.object`` decorator.
        """

        mock__communicate.side_effect = KeyboardInterrupt
        try:
            with mock.patch.object(subprocess.Popen, "_wait") as mock__wait:
                # We patch out _wait() as no signal was involved so the
                # child process isn't actually going to exit rapidly.
                mock__wait.side_effect = KeyboardInterrupt
                with mock.patch.object(subprocess, "Popen",
                                       self.RecordingPopen):
                    with self.assertRaises(KeyboardInterrupt):
                        popener([sys.executable, "-c",
                                 "import time\ntime.sleep(9)\nimport sys\n"
                                 "sys.stderr.write('\\n!runaway child!\\n')"],
                                stdout=subprocess.DEVNULL, **kwargs)
                for call in mock__wait.call_args_list[1:]:
                    self.assertNotEqual(
                            call, mock.call(timeout=None),
                            "no open-ended wait() after the first allowed: "
                            f"{mock__wait.call_args_list}")
                sigint_calls = [
                    call for call in mock__wait.call_args_list
                    if call == mock.call(timeout=0.25)  # from Popen.__init__
                ]
                self.assertLessEqual(mock__wait.call_count, 2,
                                     msg=mock__wait.call_args_list)
                self.assertEqual(len(sigint_calls), 1,
                                 msg=mock__wait.call_args_list)
        finally:
            # cleanup the forgotten (due to our mocks) child process
            created = self.RecordingPopen.instances_created
            # If popener never managed to create a child there is nothing
            # to reap; popping unconditionally would raise IndexError here
            # and replace the failure that is already propagating.
            if created:
                process = created.pop()
                process.kill()
                process.wait()
            self.assertEqual([], created)

    def test_call_keyboardinterrupt_no_kill(self):
        # ^C while subprocess.call() is waiting on the child.
        self._test_keyboardinterrupt_no_kill(subprocess.call, timeout=6.282)

    def test_run_keyboardinterrupt_no_kill(self):
        # ^C while subprocess.run() is waiting on the child.
        self._test_keyboardinterrupt_no_kill(subprocess.run, timeout=6.282)

    def test_context_manager_keyboardinterrupt_no_kill(self):
        def popen_via_context_manager(*args, **kwargs):
            with subprocess.Popen(*args, **kwargs) as unused_process:
                raise KeyboardInterrupt  # Test how __exit__ handles ^C.
        self._test_keyboardinterrupt_no_kill(popen_via_context_manager)

    def test_getoutput(self):
        self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
        self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
                         (0, 'xyzzy'))

        # We create an empty temporary directory under our exclusive
        # control; from that, we can invent a pathname that we _know_
        # won't exist.  Reading it is guaranteed to fail.
        with tempfile.TemporaryDirectory() as tmpdir:
            name = os.path.join(tmpdir, "foo")
            status, _ = subprocess.getstatusoutput(
                ("type " if mswindows else "cat ") + name)
            self.assertNotEqual(status, 0)

    def test__all__(self):
        """Ensure that __all__ is populated properly."""
        intentionally_excluded = {"list2cmdline", "Handle", "pwd", "grp", "fcntl"}
        exported = set(subprocess.__all__)
        # Every public, non-module name defined by subprocess is a
        # candidate export.
        possible_exports = {
            name for name, value in vars(subprocess).items()
            if not name.startswith('_')
            and not isinstance(value, types.ModuleType)
        }
        self.assertEqual(exported, possible_exports - intentionally_excluded)
3684
3685
@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
                     "Test needs selectors.PollSelector")
class ProcessTestCaseNoPoll(ProcessTestCase):
    # Runs the inherited ProcessTestCase tests with
    # subprocess._PopenSelector replaced by selectors.SelectSelector.

    def setUp(self):
        # Remember the selector currently in use so tearDown() can put it
        # back, then switch to SelectSelector for the duration of the test.
        self.orig_selector = subprocess._PopenSelector
        subprocess._PopenSelector = selectors.SelectSelector
        super().setUp()

    def tearDown(self):
        subprocess._PopenSelector = self.orig_selector
        super().tearDown()
3697
3698
@unittest.skipUnless(mswindows, "Windows-specific tests")
class CommandsWithSpaces(BaseTestCase):
    # Runs a script whose file name contains a space, passing it an
    # argument that contains a space, with and without the shell and
    # with string and sequence command lines.

    def setUp(self):
        super().setUp()
        # The "te st" prefix puts a space into the script's file name.
        script_fd, script_path = tempfile.mkstemp(".py", "te st")
        self.fname = script_path.lower()
        # The script prints len(sys.argv) and the lower-cased argv list.
        script_source = (
            b"import sys;"
            b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
        )
        os.write(script_fd, script_source)
        os.close(script_fd)

    def tearDown(self):
        os.remove(self.fname)
        super().tearDown()

    def with_spaces(self, *args, **kwargs):
        # Start the child with stdout captured and check that it saw
        # exactly two arguments: the script path and 'ab cd'.
        kwargs['stdout'] = subprocess.PIPE
        with subprocess.Popen(*args, **kwargs) as proc:
            child_output = proc.stdout.read().decode("mbcs")
            self.assertEqual(child_output,
                             "2 [%r, 'ab cd']" % self.fname)

    def test_shell_string_with_spaces(self):
        # call() function with string argument with spaces on Windows
        command_line = '"%s" "%s" "%s"' % (sys.executable, self.fname,
                                           "ab cd")
        self.with_spaces(command_line, shell=1)

    def test_shell_sequence_with_spaces(self):
        # call() function with sequence argument with spaces on Windows
        command = [sys.executable, self.fname, "ab cd"]
        self.with_spaces(command, shell=1)

    def test_noshell_string_with_spaces(self):
        # call() function with string argument with spaces on Windows
        command_line = '"%s" "%s" "%s"' % (sys.executable, self.fname,
                                           "ab cd")
        self.with_spaces(command_line)

    def test_noshell_sequence_with_spaces(self):
        # call() function with sequence argument with spaces on Windows
        command = [sys.executable, self.fname, "ab cd"]
        self.with_spaces(command)
3741
3742
class ContextManagerTests(BaseTestCase):
    # Tests for using a Popen object as a context manager.

    def test_pipe(self):
        child_code = ("import sys;"
                      "sys.stdout.write('stdout');"
                      "sys.stderr.write('stderr');")
        with subprocess.Popen([sys.executable, "-c", child_code],
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE) as proc:
            self.assertEqual(proc.stdout.read(), b"stdout")
            self.assertEqual(proc.stderr.read(), b"stderr")

        # Both pipes must be closed once the with block has been left.
        for stream in (proc.stdout, proc.stderr):
            self.assertTrue(stream.closed)

    def test_returncode(self):
        exit_100_cmd = [sys.executable, "-c", "import sys; sys.exit(100)"]
        with subprocess.Popen(exit_100_cmd) as proc:
            pass
        # __exit__ calls wait(), so the returncode should be set
        self.assertEqual(proc.returncode, 100)

    def test_communicate_stdin(self):
        # The child exits with status 1 (True) when it read 'context'
        # from its stdin.
        child_code = ("import sys;"
                      "sys.exit(sys.stdin.read() == 'context')")
        with subprocess.Popen([sys.executable, "-c", child_code],
                              stdin=subprocess.PIPE) as proc:
            proc.communicate(b"context")
            self.assertEqual(proc.returncode, 1)

    def test_invalid_args(self):
        # Starting a command that does not exist must raise from Popen()
        # itself, before the with block body is entered.
        with self.assertRaises(NONEXISTING_ERRORS):
            with subprocess.Popen(NONEXISTING_CMD,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE) as unused_proc:
                pass

    def test_broken_pipe_cleanup(self):
        """Broken pipe error should not prevent wait() (Issue 21619)"""
        double_pipe_size = support.PIPE_MAX_SIZE * 2
        proc = subprocess.Popen(ZERO_RETURN_CMD,
                                stdin=subprocess.PIPE,
                                bufsize=double_pipe_size)
        proc = proc.__enter__()
        # Prepare to send enough data to overflow any OS pipe buffering and
        # guarantee a broken pipe error. Data is held in BufferedWriter
        # buffer until closed.
        payload = b'x' * support.PIPE_MAX_SIZE
        proc.stdin.write(payload)
        self.assertIsNone(proc.returncode)
        # EPIPE expected under POSIX; EINVAL under Windows
        with self.assertRaises(OSError):
            proc.__exit__(None, None, None)
        self.assertEqual(proc.returncode, 0)
        self.assertTrue(proc.stdin.closed)
3795
3796
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
3799