import unittest
from unittest import mock
from test import support
from test.support import check_sanitizer
from test.support import import_helper
from test.support import os_helper
from test.support import warnings_helper
import subprocess
import sys
import signal
import io
import itertools
import os
import errno
import tempfile
import time
import traceback
import types
import selectors
import sysconfig
import select
import shutil
import threading
import gc
import textwrap
import json
import pathlib
from test.support.os_helper import FakePath

try:
    import _testcapi
except ImportError:
    _testcapi = None

try:
    import pwd
except ImportError:
    pwd = None
try:
    import grp
except ImportError:
    grp = None

# Only a failed import means "fcntl is unavailable"; a bare except would
# also swallow KeyboardInterrupt/SystemExit raised during import.
try:
    import fcntl
except ImportError:
    fcntl = None

if support.PGO:
    raise unittest.SkipTest("test is not helpful for PGO")

if not support.has_subprocess_support:
    raise unittest.SkipTest("test module requires subprocess")

mswindows = (sys.platform == "win32")

#
# Depends on the following external programs: Python
#

# Snippet prepended to child programs so that their stdout is switched to
# binary mode on Windows; empty elsewhere.
if mswindows:
    SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
                 'os.O_BINARY);')
else:
    SETBINARY = ''

NONEXISTING_CMD = ('nonexisting_i_hope',)
# Ignore errors that indicate the command was not found
NONEXISTING_ERRORS = (FileNotFoundError, NotADirectoryError, PermissionError)

# Command that exits with status 0; may be replaced by setUpModule().
ZERO_RETURN_CMD = (sys.executable, '-c', 'pass')


def setUpModule():
    # Prefer the system "true" binary as the zero-exit command when it is
    # present, executable and really returns 0.
    global ZERO_RETURN_CMD
    shell_true = shutil.which('true')
    if shell_true is None:
        return
    if (os.access(shell_true, os.X_OK) and
            subprocess.run([shell_true]).returncode == 0):
        ZERO_RETURN_CMD = (shell_true,)  # Faster than Python startup.


class BaseTestCase(unittest.TestCase):
    """Base fixture that reaps leftover child processes around each test."""

    def setUp(self):
        # Try to minimize the number of children we have so this test
        # doesn't crash on some buildbots (Alphas in particular).
        support.reap_children()

    def tearDown(self):
        if not mswindows:
            # subprocess._active is not used on Windows and is set to None.
            for inst in subprocess._active:
                inst.wait()
            subprocess._cleanup()
            self.assertFalse(
                subprocess._active, "subprocess._active not empty"
            )
        self.doCleanups()
        support.reap_children()


class PopenTestException(Exception):
    """Raised by PopenExecuteChildRaises to simulate a failed spawn."""


class PopenExecuteChildRaises(subprocess.Popen):
    """Popen subclass for testing cleanup of subprocess.PIPE filehandles when
    _execute_child fails.
    """
    def _execute_child(self, *args, **kwargs):
        raise PopenTestException("Forced Exception for Test")


class ProcessTestCase(BaseTestCase):

    def test_io_buffered_by_default(self):
        # With the default bufsize, all three pipes are buffered objects.
        p = subprocess.Popen(ZERO_RETURN_CMD,
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        try:
            self.assertIsInstance(p.stdin, io.BufferedIOBase)
            self.assertIsInstance(p.stdout, io.BufferedIOBase)
            self.assertIsInstance(p.stderr, io.BufferedIOBase)
        finally:
            p.stdin.close()
            p.stdout.close()
            p.stderr.close()
            p.wait()

    def test_io_unbuffered_works(self):
        # With bufsize=0, all three pipes are raw (unbuffered) objects.
        p = subprocess.Popen(ZERO_RETURN_CMD,
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, bufsize=0)
        try:
            self.assertIsInstance(p.stdin, io.RawIOBase)
            self.assertIsInstance(p.stdout, io.RawIOBase)
            self.assertIsInstance(p.stderr, io.RawIOBase)
        finally:
            p.stdin.close()
            p.stdout.close()
            p.stderr.close()
            p.wait()

    def test_call_seq(self):
        # call() function with sequence argument
        rc = subprocess.call([sys.executable, "-c",
                              "import sys; sys.exit(47)"])
        self.assertEqual(rc, 47)

    def test_call_timeout(self):
        # call() function with timeout argument; we want to test that the child
        # process gets killed when the timeout expires.  If the child isn't
        # killed, this call will deadlock since subprocess.call waits for the
        # child.
        self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
                          [sys.executable, "-c", "while True: pass"],
                          timeout=0.1)

    def test_check_call_zero(self):
        # check_call() function with zero return code
        rc = subprocess.check_call(ZERO_RETURN_CMD)
        self.assertEqual(rc, 0)

    def test_check_call_nonzero(self):
        # check_call() function with non-zero return code
        with self.assertRaises(subprocess.CalledProcessError) as c:
            subprocess.check_call([sys.executable, "-c",
                                   "import sys; sys.exit(47)"])
        self.assertEqual(c.exception.returncode, 47)

    def test_check_output(self):
        # check_output() function with zero return code
        output = subprocess.check_output(
            [sys.executable, "-c", "print('BDFL')"])
        self.assertIn(b'BDFL', output)

        with self.assertRaisesRegex(
                ValueError,
                "stdout argument not allowed, it will be overridden"):
            subprocess.check_output([], stdout=None)

        with self.assertRaisesRegex(
                ValueError,
                "check argument not allowed, it will be overridden"):
            subprocess.check_output([], check=False)

    def test_check_output_nonzero(self):
        # check_output() function with non-zero return code
        with self.assertRaises(subprocess.CalledProcessError) as c:
            subprocess.check_output(
                [sys.executable, "-c", "import sys; sys.exit(5)"])
        self.assertEqual(c.exception.returncode, 5)

    def test_check_output_stderr(self):
        # check_output() function stderr redirected to stdout
        output = subprocess.check_output(
            [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
            stderr=subprocess.STDOUT)
        self.assertIn(b'BDFL', output)

    def test_check_output_stdin_arg(self):
        # check_output() can be called with stdin set to a file
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        tf.write(b'pear')
        tf.seek(0)
        output = subprocess.check_output(
            [sys.executable, "-c",
             "import sys; sys.stdout.write(sys.stdin.read().upper())"],
            stdin=tf)
        self.assertIn(b'PEAR', output)

    def test_check_output_input_arg(self):
        # check_output() can be called with input set to a string
        output = subprocess.check_output(
            [sys.executable, "-c",
             "import sys; sys.stdout.write(sys.stdin.read().upper())"],
            input=b'pear')
        self.assertIn(b'PEAR', output)

    def test_check_output_input_none(self):
        """input=None has a legacy meaning of input='' on check_output."""
        output = subprocess.check_output(
            [sys.executable, "-c",
             "import sys; print('XX' if sys.stdin.read() else '')"],
            input=None)
        self.assertNotIn(b'XX', output)

    def test_check_output_input_none_text(self):
        # Same as test_check_output_input_none, in text mode.
        output = subprocess.check_output(
            [sys.executable, "-c",
             "import sys; print('XX' if sys.stdin.read() else '')"],
            input=None, text=True)
        self.assertNotIn('XX', output)

    def test_check_output_input_none_universal_newlines(self):
        # Same as test_check_output_input_none, with universal_newlines.
        output = subprocess.check_output(
            [sys.executable, "-c",
             "import sys; print('XX' if sys.stdin.read() else '')"],
            input=None, universal_newlines=True)
        self.assertNotIn('XX', output)

    def test_check_output_input_none_encoding_errors(self):
        # input=None combined with explicit encoding/errors yields str output.
        output = subprocess.check_output(
            [sys.executable, "-c", "print('foo')"],
            input=None, encoding='utf-8', errors='ignore')
        self.assertIn('foo', output)

    def test_check_output_stdout_arg(self):
        # check_output() refuses to accept 'stdout' argument
        with self.assertRaises(ValueError) as c:
            subprocess.check_output(
                [sys.executable, "-c", "print('will not be run')"],
                stdout=sys.stdout)
            # Only reached if check_output() did not raise.
            self.fail("Expected ValueError when stdout arg supplied.")
        self.assertIn('stdout', c.exception.args[0])

    def test_check_output_stdin_with_input_arg(self):
        # check_output() refuses to accept 'stdin' with 'input'
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        tf.write(b'pear')
        tf.seek(0)
        with self.assertRaises(ValueError) as c:
            subprocess.check_output(
                [sys.executable, "-c", "print('will not be run')"],
                stdin=tf, input=b'hare')
            # Only reached if check_output() did not raise.
            self.fail("Expected ValueError when stdin and input args supplied.")
        self.assertIn('stdin', c.exception.args[0])
        self.assertIn('input', c.exception.args[0])

    def test_check_output_timeout(self):
        # check_output() function with timeout arg
        with self.assertRaises(subprocess.TimeoutExpired) as c:
            subprocess.check_output(
                [sys.executable, "-c",
                 "import sys, time\n"
                 "sys.stdout.write('BDFL')\n"
                 "sys.stdout.flush()\n"
                 "time.sleep(3600)"],
                # Some heavily loaded buildbots (sparc Debian 3.x) require
                # this much time to start and print.
                timeout=3)
            # Only reached if check_output() did not raise.
            self.fail("Expected TimeoutExpired.")
        self.assertEqual(c.exception.output, b'BDFL')

    def test_call_kwargs(self):
        # call() function with keyword args
        newenv = os.environ.copy()
        newenv["FRUIT"] = "banana"
        rc = subprocess.call([sys.executable, "-c",
                              'import sys, os;'
                              'sys.exit(os.getenv("FRUIT")=="banana")'],
                             env=newenv)
        self.assertEqual(rc, 1)

    def test_invalid_args(self):
        # Popen() called with invalid arguments should raise TypeError
        # but Popen.__del__ should not complain (issue #12085)
        with support.captured_stderr() as s:
            self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
            argcount = subprocess.Popen.__init__.__code__.co_argcount
            too_many_args = [0] * (argcount + 1)
            self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
        self.assertEqual(s.getvalue(), '')

    def test_stdin_none(self):
        # .stdin is None when not redirected
        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        p.wait()
        self.assertIsNone(p.stdin)

    def test_stdout_none(self):
        # .stdout is None when not redirected, and the child's stdout will
        # be inherited from the parent.  In order to test this we run a
        # subprocess in a subprocess:
        # this_test
        #   \-- subprocess created by this test (parent)
        #          \-- subprocess created by the parent subprocess (child)
        # The parent doesn't specify stdout, so the child will use the
        # parent's stdout.  This test checks that the message printed by the
        # child goes to the parent stdout.  The parent also checks that the
        # child's stdout is None.  See #11963.
        code = ('import sys; from subprocess import Popen, PIPE;'
                'p = Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],'
                ' stdin=PIPE, stderr=PIPE);'
                'p.wait(); assert p.stdout is None;')
        p = subprocess.Popen([sys.executable, "-c", code],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        out, err = p.communicate()
        self.assertEqual(p.returncode, 0, err)
        self.assertEqual(out.rstrip(), b'test_stdout_none')

    def test_stderr_none(self):
        # .stderr is None when not redirected
        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stdin.close)
        p.wait()
        self.assertIsNone(p.stderr)

    def _assert_python(self, pre_args, **kwargs):
        # Run Popen(pre_args + [program], **kwargs) and check that the
        # program, which exits with status 47, actually ran.
        # We include sys.exit() to prevent the test runner from hanging
        # whenever python is found.
        args = pre_args + ["import sys; sys.exit(47)"]
        p = subprocess.Popen(args, **kwargs)
        p.wait()
        self.assertEqual(47, p.returncode)

    def test_executable(self):
        # Check that the executable argument works.
        #
        # On Unix (non-Mac and non-Windows), Python looks at args[0] to
        # determine where its standard library is, so we need the directory
        # of args[0] to be valid for the Popen() call to Python to succeed.
        # See also issue #16170 and issue #7774.
        doesnotexist = os.path.join(os.path.dirname(sys.executable),
                                    "doesnotexist")
        self._assert_python([doesnotexist, "-c"], executable=sys.executable)

    def test_bytes_executable(self):
        # Same as test_executable, with executable given as bytes.
        doesnotexist = os.path.join(os.path.dirname(sys.executable),
                                    "doesnotexist")
        self._assert_python([doesnotexist, "-c"],
                            executable=os.fsencode(sys.executable))

    def test_pathlike_executable(self):
        # Same as test_executable, with executable given as a path-like.
        doesnotexist = os.path.join(os.path.dirname(sys.executable),
                                    "doesnotexist")
        self._assert_python([doesnotexist, "-c"],
                            executable=FakePath(sys.executable))

    def test_executable_takes_precedence(self):
        # Check that the executable argument takes precedence over args[0].
        #
        # Verify first that the call succeeds without the executable arg.
        pre_args = [sys.executable, "-c"]
        self._assert_python(pre_args)
        self.assertRaises(NONEXISTING_ERRORS,
                          self._assert_python, pre_args,
                          executable=NONEXISTING_CMD[0])

    @unittest.skipIf(mswindows, "executable argument replaces shell")
    def test_executable_replaces_shell(self):
        # Check that the executable argument replaces the default shell
        # when shell=True.
        self._assert_python([], executable=sys.executable, shell=True)

    @unittest.skipIf(mswindows, "executable argument replaces shell")
    def test_bytes_executable_replaces_shell(self):
        # Same as test_executable_replaces_shell, with a bytes executable.
        self._assert_python([], executable=os.fsencode(sys.executable),
                            shell=True)

    @unittest.skipIf(mswindows, "executable argument replaces shell")
    def test_pathlike_executable_replaces_shell(self):
        # Same as test_executable_replaces_shell, with a path-like executable.
        self._assert_python([], executable=FakePath(sys.executable),
                            shell=True)

    # For use in the test_cwd* tests below.
405 def _normalize_cwd(self, cwd): 406 # Normalize an expected cwd (for Tru64 support). 407 # We can't use os.path.realpath since it doesn't expand Tru64 {memb} 408 # strings. See bug #1063571. 409 with os_helper.change_cwd(cwd): 410 return os.getcwd() 411 412 # For use in the test_cwd* tests below. 413 def _split_python_path(self): 414 # Return normalized (python_dir, python_base). 415 python_path = os.path.realpath(sys.executable) 416 return os.path.split(python_path) 417 418 # For use in the test_cwd* tests below. 419 def _assert_cwd(self, expected_cwd, python_arg, **kwargs): 420 # Invoke Python via Popen, and assert that (1) the call succeeds, 421 # and that (2) the current working directory of the child process 422 # matches *expected_cwd*. 423 p = subprocess.Popen([python_arg, "-c", 424 "import os, sys; " 425 "buf = sys.stdout.buffer; " 426 "buf.write(os.getcwd().encode()); " 427 "buf.flush(); " 428 "sys.exit(47)"], 429 stdout=subprocess.PIPE, 430 **kwargs) 431 self.addCleanup(p.stdout.close) 432 p.wait() 433 self.assertEqual(47, p.returncode) 434 normcase = os.path.normcase 435 self.assertEqual(normcase(expected_cwd), 436 normcase(p.stdout.read().decode())) 437 438 def test_cwd(self): 439 # Check that cwd changes the cwd for the child process. 
440 temp_dir = tempfile.gettempdir() 441 temp_dir = self._normalize_cwd(temp_dir) 442 self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir) 443 444 def test_cwd_with_bytes(self): 445 temp_dir = tempfile.gettempdir() 446 temp_dir = self._normalize_cwd(temp_dir) 447 self._assert_cwd(temp_dir, sys.executable, cwd=os.fsencode(temp_dir)) 448 449 def test_cwd_with_pathlike(self): 450 temp_dir = tempfile.gettempdir() 451 temp_dir = self._normalize_cwd(temp_dir) 452 self._assert_cwd(temp_dir, sys.executable, cwd=FakePath(temp_dir)) 453 454 @unittest.skipIf(mswindows, "pending resolution of issue #15533") 455 def test_cwd_with_relative_arg(self): 456 # Check that Popen looks for args[0] relative to cwd if args[0] 457 # is relative. 458 python_dir, python_base = self._split_python_path() 459 rel_python = os.path.join(os.curdir, python_base) 460 with os_helper.temp_cwd() as wrong_dir: 461 # Before calling with the correct cwd, confirm that the call fails 462 # without cwd and with the wrong cwd. 463 self.assertRaises(FileNotFoundError, subprocess.Popen, 464 [rel_python]) 465 self.assertRaises(FileNotFoundError, subprocess.Popen, 466 [rel_python], cwd=wrong_dir) 467 python_dir = self._normalize_cwd(python_dir) 468 self._assert_cwd(python_dir, rel_python, cwd=python_dir) 469 470 @unittest.skipIf(mswindows, "pending resolution of issue #15533") 471 def test_cwd_with_relative_executable(self): 472 # Check that Popen looks for executable relative to cwd if executable 473 # is relative (and that executable takes precedence over args[0]). 474 python_dir, python_base = self._split_python_path() 475 rel_python = os.path.join(os.curdir, python_base) 476 doesntexist = "somethingyoudonthave" 477 with os_helper.temp_cwd() as wrong_dir: 478 # Before calling with the correct cwd, confirm that the call fails 479 # without cwd and with the wrong cwd. 
480 self.assertRaises(FileNotFoundError, subprocess.Popen, 481 [doesntexist], executable=rel_python) 482 self.assertRaises(FileNotFoundError, subprocess.Popen, 483 [doesntexist], executable=rel_python, 484 cwd=wrong_dir) 485 python_dir = self._normalize_cwd(python_dir) 486 self._assert_cwd(python_dir, doesntexist, executable=rel_python, 487 cwd=python_dir) 488 489 def test_cwd_with_absolute_arg(self): 490 # Check that Popen can find the executable when the cwd is wrong 491 # if args[0] is an absolute path. 492 python_dir, python_base = self._split_python_path() 493 abs_python = os.path.join(python_dir, python_base) 494 rel_python = os.path.join(os.curdir, python_base) 495 with os_helper.temp_dir() as wrong_dir: 496 # Before calling with an absolute path, confirm that using a 497 # relative path fails. 498 self.assertRaises(FileNotFoundError, subprocess.Popen, 499 [rel_python], cwd=wrong_dir) 500 wrong_dir = self._normalize_cwd(wrong_dir) 501 self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir) 502 503 @unittest.skipIf(sys.base_prefix != sys.prefix, 504 'Test is not venv-compatible') 505 def test_executable_with_cwd(self): 506 python_dir, python_base = self._split_python_path() 507 python_dir = self._normalize_cwd(python_dir) 508 self._assert_cwd(python_dir, "somethingyoudonthave", 509 executable=sys.executable, cwd=python_dir) 510 511 @unittest.skipIf(sys.base_prefix != sys.prefix, 512 'Test is not venv-compatible') 513 @unittest.skipIf(sysconfig.is_python_build(), 514 "need an installed Python. See #7774") 515 def test_executable_without_cwd(self): 516 # For a normal installation, it should work without 'cwd' 517 # argument. For test runs in the build directory, see #7774. 
518 self._assert_cwd(os.getcwd(), "somethingyoudonthave", 519 executable=sys.executable) 520 521 def test_stdin_pipe(self): 522 # stdin redirection 523 p = subprocess.Popen([sys.executable, "-c", 524 'import sys; sys.exit(sys.stdin.read() == "pear")'], 525 stdin=subprocess.PIPE) 526 p.stdin.write(b"pear") 527 p.stdin.close() 528 p.wait() 529 self.assertEqual(p.returncode, 1) 530 531 def test_stdin_filedes(self): 532 # stdin is set to open file descriptor 533 tf = tempfile.TemporaryFile() 534 self.addCleanup(tf.close) 535 d = tf.fileno() 536 os.write(d, b"pear") 537 os.lseek(d, 0, 0) 538 p = subprocess.Popen([sys.executable, "-c", 539 'import sys; sys.exit(sys.stdin.read() == "pear")'], 540 stdin=d) 541 p.wait() 542 self.assertEqual(p.returncode, 1) 543 544 def test_stdin_fileobj(self): 545 # stdin is set to open file object 546 tf = tempfile.TemporaryFile() 547 self.addCleanup(tf.close) 548 tf.write(b"pear") 549 tf.seek(0) 550 p = subprocess.Popen([sys.executable, "-c", 551 'import sys; sys.exit(sys.stdin.read() == "pear")'], 552 stdin=tf) 553 p.wait() 554 self.assertEqual(p.returncode, 1) 555 556 def test_stdout_pipe(self): 557 # stdout redirection 558 p = subprocess.Popen([sys.executable, "-c", 559 'import sys; sys.stdout.write("orange")'], 560 stdout=subprocess.PIPE) 561 with p: 562 self.assertEqual(p.stdout.read(), b"orange") 563 564 def test_stdout_filedes(self): 565 # stdout is set to open file descriptor 566 tf = tempfile.TemporaryFile() 567 self.addCleanup(tf.close) 568 d = tf.fileno() 569 p = subprocess.Popen([sys.executable, "-c", 570 'import sys; sys.stdout.write("orange")'], 571 stdout=d) 572 p.wait() 573 os.lseek(d, 0, 0) 574 self.assertEqual(os.read(d, 1024), b"orange") 575 576 def test_stdout_fileobj(self): 577 # stdout is set to open file object 578 tf = tempfile.TemporaryFile() 579 self.addCleanup(tf.close) 580 p = subprocess.Popen([sys.executable, "-c", 581 'import sys; sys.stdout.write("orange")'], 582 stdout=tf) 583 p.wait() 584 tf.seek(0) 585 
self.assertEqual(tf.read(), b"orange") 586 587 def test_stderr_pipe(self): 588 # stderr redirection 589 p = subprocess.Popen([sys.executable, "-c", 590 'import sys; sys.stderr.write("strawberry")'], 591 stderr=subprocess.PIPE) 592 with p: 593 self.assertEqual(p.stderr.read(), b"strawberry") 594 595 def test_stderr_filedes(self): 596 # stderr is set to open file descriptor 597 tf = tempfile.TemporaryFile() 598 self.addCleanup(tf.close) 599 d = tf.fileno() 600 p = subprocess.Popen([sys.executable, "-c", 601 'import sys; sys.stderr.write("strawberry")'], 602 stderr=d) 603 p.wait() 604 os.lseek(d, 0, 0) 605 self.assertEqual(os.read(d, 1024), b"strawberry") 606 607 def test_stderr_fileobj(self): 608 # stderr is set to open file object 609 tf = tempfile.TemporaryFile() 610 self.addCleanup(tf.close) 611 p = subprocess.Popen([sys.executable, "-c", 612 'import sys; sys.stderr.write("strawberry")'], 613 stderr=tf) 614 p.wait() 615 tf.seek(0) 616 self.assertEqual(tf.read(), b"strawberry") 617 618 def test_stderr_redirect_with_no_stdout_redirect(self): 619 # test stderr=STDOUT while stdout=None (not set) 620 621 # - grandchild prints to stderr 622 # - child redirects grandchild's stderr to its stdout 623 # - the parent should get grandchild's stderr in child's stdout 624 p = subprocess.Popen([sys.executable, "-c", 625 'import sys, subprocess;' 626 'rc = subprocess.call([sys.executable, "-c",' 627 ' "import sys;"' 628 ' "sys.stderr.write(\'42\')"],' 629 ' stderr=subprocess.STDOUT);' 630 'sys.exit(rc)'], 631 stdout=subprocess.PIPE, 632 stderr=subprocess.PIPE) 633 stdout, stderr = p.communicate() 634 #NOTE: stdout should get stderr from grandchild 635 self.assertEqual(stdout, b'42') 636 self.assertEqual(stderr, b'') # should be empty 637 self.assertEqual(p.returncode, 0) 638 639 def test_stdout_stderr_pipe(self): 640 # capture stdout and stderr to the same pipe 641 p = subprocess.Popen([sys.executable, "-c", 642 'import sys;' 643 'sys.stdout.write("apple");' 644 
'sys.stdout.flush();' 645 'sys.stderr.write("orange")'], 646 stdout=subprocess.PIPE, 647 stderr=subprocess.STDOUT) 648 with p: 649 self.assertEqual(p.stdout.read(), b"appleorange") 650 651 def test_stdout_stderr_file(self): 652 # capture stdout and stderr to the same open file 653 tf = tempfile.TemporaryFile() 654 self.addCleanup(tf.close) 655 p = subprocess.Popen([sys.executable, "-c", 656 'import sys;' 657 'sys.stdout.write("apple");' 658 'sys.stdout.flush();' 659 'sys.stderr.write("orange")'], 660 stdout=tf, 661 stderr=tf) 662 p.wait() 663 tf.seek(0) 664 self.assertEqual(tf.read(), b"appleorange") 665 666 def test_stdout_filedes_of_stdout(self): 667 # stdout is set to 1 (#1531862). 668 # To avoid printing the text on stdout, we do something similar to 669 # test_stdout_none (see above). The parent subprocess calls the child 670 # subprocess passing stdout=1, and this test uses stdout=PIPE in 671 # order to capture and check the output of the parent. See #11963. 672 code = ('import sys, subprocess; ' 673 'rc = subprocess.call([sys.executable, "-c", ' 674 ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), ' 675 'b\'test with stdout=1\'))"], stdout=1); ' 676 'assert rc == 18') 677 p = subprocess.Popen([sys.executable, "-c", code], 678 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 679 self.addCleanup(p.stdout.close) 680 self.addCleanup(p.stderr.close) 681 out, err = p.communicate() 682 self.assertEqual(p.returncode, 0, err) 683 self.assertEqual(out.rstrip(), b'test with stdout=1') 684 685 def test_stdout_devnull(self): 686 p = subprocess.Popen([sys.executable, "-c", 687 'for i in range(10240):' 688 'print("x" * 1024)'], 689 stdout=subprocess.DEVNULL) 690 p.wait() 691 self.assertEqual(p.stdout, None) 692 693 def test_stderr_devnull(self): 694 p = subprocess.Popen([sys.executable, "-c", 695 'import sys\n' 696 'for i in range(10240):' 697 'sys.stderr.write("x" * 1024)'], 698 stderr=subprocess.DEVNULL) 699 p.wait() 700 self.assertEqual(p.stderr, None) 701 702 
def test_stdin_devnull(self): 703 p = subprocess.Popen([sys.executable, "-c", 704 'import sys;' 705 'sys.stdin.read(1)'], 706 stdin=subprocess.DEVNULL) 707 p.wait() 708 self.assertEqual(p.stdin, None) 709 710 @unittest.skipUnless(fcntl and hasattr(fcntl, 'F_GETPIPE_SZ'), 711 'fcntl.F_GETPIPE_SZ required for test.') 712 def test_pipesizes(self): 713 test_pipe_r, test_pipe_w = os.pipe() 714 try: 715 # Get the default pipesize with F_GETPIPE_SZ 716 pipesize_default = fcntl.fcntl(test_pipe_w, fcntl.F_GETPIPE_SZ) 717 finally: 718 os.close(test_pipe_r) 719 os.close(test_pipe_w) 720 pipesize = pipesize_default // 2 721 if pipesize < 512: # the POSIX minimum 722 raise unittest.SkipTest( 723 'default pipesize too small to perform test.') 724 p = subprocess.Popen( 725 [sys.executable, "-c", 726 'import sys; sys.stdin.read(); sys.stdout.write("out"); ' 727 'sys.stderr.write("error!")'], 728 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 729 stderr=subprocess.PIPE, pipesize=pipesize) 730 try: 731 for fifo in [p.stdin, p.stdout, p.stderr]: 732 self.assertEqual( 733 fcntl.fcntl(fifo.fileno(), fcntl.F_GETPIPE_SZ), 734 pipesize) 735 # Windows pipe size can be acquired via GetNamedPipeInfoFunction 736 # https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-getnamedpipeinfo 737 # However, this function is not yet in _winapi. 
738 p.stdin.write(b"pear") 739 p.stdin.close() 740 p.stdout.close() 741 p.stderr.close() 742 finally: 743 p.kill() 744 p.wait() 745 746 @unittest.skipUnless(fcntl and hasattr(fcntl, 'F_GETPIPE_SZ'), 747 'fcntl.F_GETPIPE_SZ required for test.') 748 def test_pipesize_default(self): 749 p = subprocess.Popen( 750 [sys.executable, "-c", 751 'import sys; sys.stdin.read(); sys.stdout.write("out"); ' 752 'sys.stderr.write("error!")'], 753 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 754 stderr=subprocess.PIPE, pipesize=-1) 755 try: 756 fp_r, fp_w = os.pipe() 757 try: 758 default_pipesize = fcntl.fcntl(fp_w, fcntl.F_GETPIPE_SZ) 759 for fifo in [p.stdin, p.stdout, p.stderr]: 760 self.assertEqual( 761 fcntl.fcntl(fifo.fileno(), fcntl.F_GETPIPE_SZ), 762 default_pipesize) 763 finally: 764 os.close(fp_r) 765 os.close(fp_w) 766 # On other platforms we cannot test the pipe size (yet). But above 767 # code using pipesize=-1 should not crash. 768 p.stdin.close() 769 p.stdout.close() 770 p.stderr.close() 771 finally: 772 p.kill() 773 p.wait() 774 775 def test_env(self): 776 newenv = os.environ.copy() 777 newenv["FRUIT"] = "orange" 778 with subprocess.Popen([sys.executable, "-c", 779 'import sys,os;' 780 'sys.stdout.write(os.getenv("FRUIT"))'], 781 stdout=subprocess.PIPE, 782 env=newenv) as p: 783 stdout, stderr = p.communicate() 784 self.assertEqual(stdout, b"orange") 785 786 # Windows requires at least the SYSTEMROOT environment variable to start 787 # Python 788 @unittest.skipIf(sys.platform == 'win32', 789 'cannot test an empty env on Windows') 790 @unittest.skipIf(sysconfig.get_config_var('Py_ENABLE_SHARED') == 1, 791 'The Python shared library cannot be loaded ' 792 'with an empty environment.') 793 @unittest.skipIf(check_sanitizer(address=True), 794 'AddressSanitizer adds to the environment.') 795 def test_empty_env(self): 796 """Verify that env={} is as empty as possible.""" 797 798 def is_env_var_to_ignore(n): 799 """Determine if an environment variable is under our 
control.""" 800 # This excludes some __CF_* and VERSIONER_* keys MacOS insists 801 # on adding even when the environment in exec is empty. 802 # Gentoo sandboxes also force LD_PRELOAD and SANDBOX_* to exist. 803 return ('VERSIONER' in n or '__CF' in n or # MacOS 804 n == 'LD_PRELOAD' or n.startswith('SANDBOX') or # Gentoo 805 n == 'LC_CTYPE') # Locale coercion triggered 806 807 with subprocess.Popen([sys.executable, "-c", 808 'import os; print(list(os.environ.keys()))'], 809 stdout=subprocess.PIPE, env={}) as p: 810 stdout, stderr = p.communicate() 811 child_env_names = eval(stdout.strip()) 812 self.assertIsInstance(child_env_names, list) 813 child_env_names = [k for k in child_env_names 814 if not is_env_var_to_ignore(k)] 815 self.assertEqual(child_env_names, []) 816 817 def test_invalid_cmd(self): 818 # null character in the command name 819 cmd = sys.executable + '\0' 820 with self.assertRaises(ValueError): 821 subprocess.Popen([cmd, "-c", "pass"]) 822 823 # null character in the command argument 824 with self.assertRaises(ValueError): 825 subprocess.Popen([sys.executable, "-c", "pass#\0"]) 826 827 def test_invalid_env(self): 828 # null character in the environment variable name 829 newenv = os.environ.copy() 830 newenv["FRUIT\0VEGETABLE"] = "cabbage" 831 with self.assertRaises(ValueError): 832 subprocess.Popen(ZERO_RETURN_CMD, env=newenv) 833 834 # null character in the environment variable value 835 newenv = os.environ.copy() 836 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 837 with self.assertRaises(ValueError): 838 subprocess.Popen(ZERO_RETURN_CMD, env=newenv) 839 840 # equal character in the environment variable name 841 newenv = os.environ.copy() 842 newenv["FRUIT=ORANGE"] = "lemon" 843 with self.assertRaises(ValueError): 844 subprocess.Popen(ZERO_RETURN_CMD, env=newenv) 845 846 # equal character in the environment variable value 847 newenv = os.environ.copy() 848 newenv["FRUIT"] = "orange=lemon" 849 with subprocess.Popen([sys.executable, "-c", 850 
'import sys, os;' 851 'sys.stdout.write(os.getenv("FRUIT"))'], 852 stdout=subprocess.PIPE, 853 env=newenv) as p: 854 stdout, stderr = p.communicate() 855 self.assertEqual(stdout, b"orange=lemon") 856 857 def test_communicate_stdin(self): 858 p = subprocess.Popen([sys.executable, "-c", 859 'import sys;' 860 'sys.exit(sys.stdin.read() == "pear")'], 861 stdin=subprocess.PIPE) 862 p.communicate(b"pear") 863 self.assertEqual(p.returncode, 1) 864 865 def test_communicate_stdout(self): 866 p = subprocess.Popen([sys.executable, "-c", 867 'import sys; sys.stdout.write("pineapple")'], 868 stdout=subprocess.PIPE) 869 (stdout, stderr) = p.communicate() 870 self.assertEqual(stdout, b"pineapple") 871 self.assertEqual(stderr, None) 872 873 def test_communicate_stderr(self): 874 p = subprocess.Popen([sys.executable, "-c", 875 'import sys; sys.stderr.write("pineapple")'], 876 stderr=subprocess.PIPE) 877 (stdout, stderr) = p.communicate() 878 self.assertEqual(stdout, None) 879 self.assertEqual(stderr, b"pineapple") 880 881 def test_communicate(self): 882 p = subprocess.Popen([sys.executable, "-c", 883 'import sys,os;' 884 'sys.stderr.write("pineapple");' 885 'sys.stdout.write(sys.stdin.read())'], 886 stdin=subprocess.PIPE, 887 stdout=subprocess.PIPE, 888 stderr=subprocess.PIPE) 889 self.addCleanup(p.stdout.close) 890 self.addCleanup(p.stderr.close) 891 self.addCleanup(p.stdin.close) 892 (stdout, stderr) = p.communicate(b"banana") 893 self.assertEqual(stdout, b"banana") 894 self.assertEqual(stderr, b"pineapple") 895 896 def test_communicate_timeout(self): 897 p = subprocess.Popen([sys.executable, "-c", 898 'import sys,os,time;' 899 'sys.stderr.write("pineapple\\n");' 900 'time.sleep(1);' 901 'sys.stderr.write("pear\\n");' 902 'sys.stdout.write(sys.stdin.read())'], 903 universal_newlines=True, 904 stdin=subprocess.PIPE, 905 stdout=subprocess.PIPE, 906 stderr=subprocess.PIPE) 907 self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana", 908 timeout=0.3) 909 # Make sure we 
can keep waiting for it, and that we get the whole output 910 # after it completes. 911 (stdout, stderr) = p.communicate() 912 self.assertEqual(stdout, "banana") 913 self.assertEqual(stderr.encode(), b"pineapple\npear\n") 914 915 def test_communicate_timeout_large_output(self): 916 # Test an expiring timeout while the child is outputting lots of data. 917 p = subprocess.Popen([sys.executable, "-c", 918 'import sys,os,time;' 919 'sys.stdout.write("a" * (64 * 1024));' 920 'time.sleep(0.2);' 921 'sys.stdout.write("a" * (64 * 1024));' 922 'time.sleep(0.2);' 923 'sys.stdout.write("a" * (64 * 1024));' 924 'time.sleep(0.2);' 925 'sys.stdout.write("a" * (64 * 1024));'], 926 stdout=subprocess.PIPE) 927 self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4) 928 (stdout, _) = p.communicate() 929 self.assertEqual(len(stdout), 4 * 64 * 1024) 930 931 # Test for the fd leak reported in http://bugs.python.org/issue2791. 932 def test_communicate_pipe_fd_leak(self): 933 for stdin_pipe in (False, True): 934 for stdout_pipe in (False, True): 935 for stderr_pipe in (False, True): 936 options = {} 937 if stdin_pipe: 938 options['stdin'] = subprocess.PIPE 939 if stdout_pipe: 940 options['stdout'] = subprocess.PIPE 941 if stderr_pipe: 942 options['stderr'] = subprocess.PIPE 943 if not options: 944 continue 945 p = subprocess.Popen(ZERO_RETURN_CMD, **options) 946 p.communicate() 947 if p.stdin is not None: 948 self.assertTrue(p.stdin.closed) 949 if p.stdout is not None: 950 self.assertTrue(p.stdout.closed) 951 if p.stderr is not None: 952 self.assertTrue(p.stderr.closed) 953 954 def test_communicate_returns(self): 955 # communicate() should return None if no redirection is active 956 p = subprocess.Popen([sys.executable, "-c", 957 "import sys; sys.exit(47)"]) 958 (stdout, stderr) = p.communicate() 959 self.assertEqual(stdout, None) 960 self.assertEqual(stderr, None) 961 962 def test_communicate_pipe_buf(self): 963 # communicate() with writes larger than pipe_buf 964 # 
This test will probably deadlock rather than fail, if 965 # communicate() does not work properly. 966 x, y = os.pipe() 967 os.close(x) 968 os.close(y) 969 p = subprocess.Popen([sys.executable, "-c", 970 'import sys,os;' 971 'sys.stdout.write(sys.stdin.read(47));' 972 'sys.stderr.write("x" * %d);' 973 'sys.stdout.write(sys.stdin.read())' % 974 support.PIPE_MAX_SIZE], 975 stdin=subprocess.PIPE, 976 stdout=subprocess.PIPE, 977 stderr=subprocess.PIPE) 978 self.addCleanup(p.stdout.close) 979 self.addCleanup(p.stderr.close) 980 self.addCleanup(p.stdin.close) 981 string_to_write = b"a" * support.PIPE_MAX_SIZE 982 (stdout, stderr) = p.communicate(string_to_write) 983 self.assertEqual(stdout, string_to_write) 984 985 def test_writes_before_communicate(self): 986 # stdin.write before communicate() 987 p = subprocess.Popen([sys.executable, "-c", 988 'import sys,os;' 989 'sys.stdout.write(sys.stdin.read())'], 990 stdin=subprocess.PIPE, 991 stdout=subprocess.PIPE, 992 stderr=subprocess.PIPE) 993 self.addCleanup(p.stdout.close) 994 self.addCleanup(p.stderr.close) 995 self.addCleanup(p.stdin.close) 996 p.stdin.write(b"banana") 997 (stdout, stderr) = p.communicate(b"split") 998 self.assertEqual(stdout, b"bananasplit") 999 self.assertEqual(stderr, b"") 1000 1001 def test_universal_newlines_and_text(self): 1002 args = [ 1003 sys.executable, "-c", 1004 'import sys,os;' + SETBINARY + 1005 'buf = sys.stdout.buffer;' 1006 'buf.write(sys.stdin.readline().encode());' 1007 'buf.flush();' 1008 'buf.write(b"line2\\n");' 1009 'buf.flush();' 1010 'buf.write(sys.stdin.read().encode());' 1011 'buf.flush();' 1012 'buf.write(b"line4\\n");' 1013 'buf.flush();' 1014 'buf.write(b"line5\\r\\n");' 1015 'buf.flush();' 1016 'buf.write(b"line6\\r");' 1017 'buf.flush();' 1018 'buf.write(b"\\nline7");' 1019 'buf.flush();' 1020 'buf.write(b"\\nline8");'] 1021 1022 for extra_kwarg in ('universal_newlines', 'text'): 1023 p = subprocess.Popen(args, **{'stdin': subprocess.PIPE, 1024 'stdout': subprocess.PIPE, 
1025 extra_kwarg: True}) 1026 with p: 1027 p.stdin.write("line1\n") 1028 p.stdin.flush() 1029 self.assertEqual(p.stdout.readline(), "line1\n") 1030 p.stdin.write("line3\n") 1031 p.stdin.close() 1032 self.addCleanup(p.stdout.close) 1033 self.assertEqual(p.stdout.readline(), 1034 "line2\n") 1035 self.assertEqual(p.stdout.read(6), 1036 "line3\n") 1037 self.assertEqual(p.stdout.read(), 1038 "line4\nline5\nline6\nline7\nline8") 1039 1040 def test_universal_newlines_communicate(self): 1041 # universal newlines through communicate() 1042 p = subprocess.Popen([sys.executable, "-c", 1043 'import sys,os;' + SETBINARY + 1044 'buf = sys.stdout.buffer;' 1045 'buf.write(b"line2\\n");' 1046 'buf.flush();' 1047 'buf.write(b"line4\\n");' 1048 'buf.flush();' 1049 'buf.write(b"line5\\r\\n");' 1050 'buf.flush();' 1051 'buf.write(b"line6\\r");' 1052 'buf.flush();' 1053 'buf.write(b"\\nline7");' 1054 'buf.flush();' 1055 'buf.write(b"\\nline8");'], 1056 stderr=subprocess.PIPE, 1057 stdout=subprocess.PIPE, 1058 universal_newlines=1) 1059 self.addCleanup(p.stdout.close) 1060 self.addCleanup(p.stderr.close) 1061 (stdout, stderr) = p.communicate() 1062 self.assertEqual(stdout, 1063 "line2\nline4\nline5\nline6\nline7\nline8") 1064 1065 def test_universal_newlines_communicate_stdin(self): 1066 # universal newlines through communicate(), with only stdin 1067 p = subprocess.Popen([sys.executable, "-c", 1068 'import sys,os;' + SETBINARY + textwrap.dedent(''' 1069 s = sys.stdin.readline() 1070 assert s == "line1\\n", repr(s) 1071 s = sys.stdin.read() 1072 assert s == "line3\\n", repr(s) 1073 ''')], 1074 stdin=subprocess.PIPE, 1075 universal_newlines=1) 1076 (stdout, stderr) = p.communicate("line1\nline3\n") 1077 self.assertEqual(p.returncode, 0) 1078 1079 def test_universal_newlines_communicate_input_none(self): 1080 # Test communicate(input=None) with universal newlines. 
1081 # 1082 # We set stdout to PIPE because, as of this writing, a different 1083 # code path is tested when the number of pipes is zero or one. 1084 p = subprocess.Popen(ZERO_RETURN_CMD, 1085 stdin=subprocess.PIPE, 1086 stdout=subprocess.PIPE, 1087 universal_newlines=True) 1088 p.communicate() 1089 self.assertEqual(p.returncode, 0) 1090 1091 def test_universal_newlines_communicate_stdin_stdout_stderr(self): 1092 # universal newlines through communicate(), with stdin, stdout, stderr 1093 p = subprocess.Popen([sys.executable, "-c", 1094 'import sys,os;' + SETBINARY + textwrap.dedent(''' 1095 s = sys.stdin.buffer.readline() 1096 sys.stdout.buffer.write(s) 1097 sys.stdout.buffer.write(b"line2\\r") 1098 sys.stderr.buffer.write(b"eline2\\n") 1099 s = sys.stdin.buffer.read() 1100 sys.stdout.buffer.write(s) 1101 sys.stdout.buffer.write(b"line4\\n") 1102 sys.stdout.buffer.write(b"line5\\r\\n") 1103 sys.stderr.buffer.write(b"eline6\\r") 1104 sys.stderr.buffer.write(b"eline7\\r\\nz") 1105 ''')], 1106 stdin=subprocess.PIPE, 1107 stderr=subprocess.PIPE, 1108 stdout=subprocess.PIPE, 1109 universal_newlines=True) 1110 self.addCleanup(p.stdout.close) 1111 self.addCleanup(p.stderr.close) 1112 (stdout, stderr) = p.communicate("line1\nline3\n") 1113 self.assertEqual(p.returncode, 0) 1114 self.assertEqual("line1\nline2\nline3\nline4\nline5\n", stdout) 1115 # Python debug build push something like "[42442 refs]\n" 1116 # to stderr at exit of subprocess. 1117 self.assertTrue(stderr.startswith("eline2\neline6\neline7\n")) 1118 1119 def test_universal_newlines_communicate_encodings(self): 1120 # Check that universal newlines mode works for various encodings, 1121 # in particular for encodings in the UTF-16 and UTF-32 families. 1122 # See issue #15595. 1123 # 1124 # UTF-16 and UTF-32-BE are sufficient to check both with BOM and 1125 # without, and UTF-16 and UTF-32. 
1126 for encoding in ['utf-16', 'utf-32-be']: 1127 code = ("import sys; " 1128 r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" % 1129 encoding) 1130 args = [sys.executable, '-c', code] 1131 # We set stdin to be non-None because, as of this writing, 1132 # a different code path is used when the number of pipes is 1133 # zero or one. 1134 popen = subprocess.Popen(args, 1135 stdin=subprocess.PIPE, 1136 stdout=subprocess.PIPE, 1137 encoding=encoding) 1138 stdout, stderr = popen.communicate(input='') 1139 self.assertEqual(stdout, '1\n2\n3\n4') 1140 1141 def test_communicate_errors(self): 1142 for errors, expected in [ 1143 ('ignore', ''), 1144 ('replace', '\ufffd\ufffd'), 1145 ('surrogateescape', '\udc80\udc80'), 1146 ('backslashreplace', '\\x80\\x80'), 1147 ]: 1148 code = ("import sys; " 1149 r"sys.stdout.buffer.write(b'[\x80\x80]')") 1150 args = [sys.executable, '-c', code] 1151 # We set stdin to be non-None because, as of this writing, 1152 # a different code path is used when the number of pipes is 1153 # zero or one. 
1154 popen = subprocess.Popen(args, 1155 stdin=subprocess.PIPE, 1156 stdout=subprocess.PIPE, 1157 encoding='utf-8', 1158 errors=errors) 1159 stdout, stderr = popen.communicate(input='') 1160 self.assertEqual(stdout, '[{}]'.format(expected)) 1161 1162 def test_no_leaking(self): 1163 # Make sure we leak no resources 1164 if not mswindows: 1165 max_handles = 1026 # too much for most UNIX systems 1166 else: 1167 max_handles = 2050 # too much for (at least some) Windows setups 1168 handles = [] 1169 tmpdir = tempfile.mkdtemp() 1170 try: 1171 for i in range(max_handles): 1172 try: 1173 tmpfile = os.path.join(tmpdir, os_helper.TESTFN) 1174 handles.append(os.open(tmpfile, os.O_WRONLY|os.O_CREAT)) 1175 except OSError as e: 1176 if e.errno != errno.EMFILE: 1177 raise 1178 break 1179 else: 1180 self.skipTest("failed to reach the file descriptor limit " 1181 "(tried %d)" % max_handles) 1182 # Close a couple of them (should be enough for a subprocess) 1183 for i in range(10): 1184 os.close(handles.pop()) 1185 # Loop creating some subprocesses. If one of them leaks some fds, 1186 # the next loop iteration will fail by reaching the max fd limit. 
1187 for i in range(15): 1188 p = subprocess.Popen([sys.executable, "-c", 1189 "import sys;" 1190 "sys.stdout.write(sys.stdin.read())"], 1191 stdin=subprocess.PIPE, 1192 stdout=subprocess.PIPE, 1193 stderr=subprocess.PIPE) 1194 data = p.communicate(b"lime")[0] 1195 self.assertEqual(data, b"lime") 1196 finally: 1197 for h in handles: 1198 os.close(h) 1199 shutil.rmtree(tmpdir) 1200 1201 def test_list2cmdline(self): 1202 self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']), 1203 '"a b c" d e') 1204 self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']), 1205 'ab\\"c \\ d') 1206 self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']), 1207 'ab\\"c " \\\\" d') 1208 self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']), 1209 'a\\\\\\b "de fg" h') 1210 self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']), 1211 'a\\\\\\"b c d') 1212 self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']), 1213 '"a\\\\b c" d e') 1214 self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']), 1215 '"a\\\\b\\ c" d e') 1216 self.assertEqual(subprocess.list2cmdline(['ab', '']), 1217 'ab ""') 1218 1219 def test_poll(self): 1220 p = subprocess.Popen([sys.executable, "-c", 1221 "import os; os.read(0, 1)"], 1222 stdin=subprocess.PIPE) 1223 self.addCleanup(p.stdin.close) 1224 self.assertIsNone(p.poll()) 1225 os.write(p.stdin.fileno(), b'A') 1226 p.wait() 1227 # Subsequent invocations should just return the returncode 1228 self.assertEqual(p.poll(), 0) 1229 1230 def test_wait(self): 1231 p = subprocess.Popen(ZERO_RETURN_CMD) 1232 self.assertEqual(p.wait(), 0) 1233 # Subsequent invocations should just return the returncode 1234 self.assertEqual(p.wait(), 0) 1235 1236 def test_wait_timeout(self): 1237 p = subprocess.Popen([sys.executable, 1238 "-c", "import time; time.sleep(0.3)"]) 1239 with self.assertRaises(subprocess.TimeoutExpired) as c: 1240 p.wait(timeout=0.0001) 1241 self.assertIn("0.0001", str(c.exception)) # For coverage of 
__str__. 1242 self.assertEqual(p.wait(timeout=support.SHORT_TIMEOUT), 0) 1243 1244 def test_invalid_bufsize(self): 1245 # an invalid type of the bufsize argument should raise 1246 # TypeError. 1247 with self.assertRaises(TypeError): 1248 subprocess.Popen(ZERO_RETURN_CMD, "orange") 1249 1250 def test_bufsize_is_none(self): 1251 # bufsize=None should be the same as bufsize=0. 1252 p = subprocess.Popen(ZERO_RETURN_CMD, None) 1253 self.assertEqual(p.wait(), 0) 1254 # Again with keyword arg 1255 p = subprocess.Popen(ZERO_RETURN_CMD, bufsize=None) 1256 self.assertEqual(p.wait(), 0) 1257 1258 def _test_bufsize_equal_one(self, line, expected, universal_newlines): 1259 # subprocess may deadlock with bufsize=1, see issue #21332 1260 with subprocess.Popen([sys.executable, "-c", "import sys;" 1261 "sys.stdout.write(sys.stdin.readline());" 1262 "sys.stdout.flush()"], 1263 stdin=subprocess.PIPE, 1264 stdout=subprocess.PIPE, 1265 stderr=subprocess.DEVNULL, 1266 bufsize=1, 1267 universal_newlines=universal_newlines) as p: 1268 p.stdin.write(line) # expect that it flushes the line in text mode 1269 os.close(p.stdin.fileno()) # close it without flushing the buffer 1270 read_line = p.stdout.readline() 1271 with support.SuppressCrashReport(): 1272 try: 1273 p.stdin.close() 1274 except OSError: 1275 pass 1276 p.stdin = None 1277 self.assertEqual(p.returncode, 0) 1278 self.assertEqual(read_line, expected) 1279 1280 def test_bufsize_equal_one_text_mode(self): 1281 # line is flushed in text mode with bufsize=1. 1282 # we should get the full line in return 1283 line = "line\n" 1284 self._test_bufsize_equal_one(line, line, universal_newlines=True) 1285 1286 def test_bufsize_equal_one_binary_mode(self): 1287 # line is not flushed in binary mode with bufsize=1. 
1288 # we should get empty response 1289 line = b'line' + os.linesep.encode() # assume ascii-based locale 1290 with self.assertWarnsRegex(RuntimeWarning, 'line buffering'): 1291 self._test_bufsize_equal_one(line, b'', universal_newlines=False) 1292 1293 def test_leaking_fds_on_error(self): 1294 # see bug #5179: Popen leaks file descriptors to PIPEs if 1295 # the child fails to execute; this will eventually exhaust 1296 # the maximum number of open fds. 1024 seems a very common 1297 # value for that limit, but Windows has 2048, so we loop 1298 # 1024 times (each call leaked two fds). 1299 for i in range(1024): 1300 with self.assertRaises(NONEXISTING_ERRORS): 1301 subprocess.Popen(NONEXISTING_CMD, 1302 stdout=subprocess.PIPE, 1303 stderr=subprocess.PIPE) 1304 1305 def test_nonexisting_with_pipes(self): 1306 # bpo-30121: Popen with pipes must close properly pipes on error. 1307 # Previously, os.close() was called with a Windows handle which is not 1308 # a valid file descriptor. 1309 # 1310 # Run the test in a subprocess to control how the CRT reports errors 1311 # and to get stderr content. 
1312 try: 1313 import msvcrt 1314 msvcrt.CrtSetReportMode 1315 except (AttributeError, ImportError): 1316 self.skipTest("need msvcrt.CrtSetReportMode") 1317 1318 code = textwrap.dedent(f""" 1319 import msvcrt 1320 import subprocess 1321 1322 cmd = {NONEXISTING_CMD!r} 1323 1324 for report_type in [msvcrt.CRT_WARN, 1325 msvcrt.CRT_ERROR, 1326 msvcrt.CRT_ASSERT]: 1327 msvcrt.CrtSetReportMode(report_type, msvcrt.CRTDBG_MODE_FILE) 1328 msvcrt.CrtSetReportFile(report_type, msvcrt.CRTDBG_FILE_STDERR) 1329 1330 try: 1331 subprocess.Popen(cmd, 1332 stdout=subprocess.PIPE, 1333 stderr=subprocess.PIPE) 1334 except OSError: 1335 pass 1336 """) 1337 cmd = [sys.executable, "-c", code] 1338 proc = subprocess.Popen(cmd, 1339 stderr=subprocess.PIPE, 1340 universal_newlines=True) 1341 with proc: 1342 stderr = proc.communicate()[1] 1343 self.assertEqual(stderr, "") 1344 self.assertEqual(proc.returncode, 0) 1345 1346 def test_double_close_on_error(self): 1347 # Issue #18851 1348 fds = [] 1349 def open_fds(): 1350 for i in range(20): 1351 fds.extend(os.pipe()) 1352 time.sleep(0.001) 1353 t = threading.Thread(target=open_fds) 1354 t.start() 1355 try: 1356 with self.assertRaises(EnvironmentError): 1357 subprocess.Popen(NONEXISTING_CMD, 1358 stdin=subprocess.PIPE, 1359 stdout=subprocess.PIPE, 1360 stderr=subprocess.PIPE) 1361 finally: 1362 t.join() 1363 exc = None 1364 for fd in fds: 1365 # If a double close occurred, some of those fds will 1366 # already have been closed by mistake, and os.close() 1367 # here will raise. 
1368 try: 1369 os.close(fd) 1370 except OSError as e: 1371 exc = e 1372 if exc is not None: 1373 raise exc 1374 1375 def test_threadsafe_wait(self): 1376 """Issue21291: Popen.wait() needs to be threadsafe for returncode.""" 1377 proc = subprocess.Popen([sys.executable, '-c', 1378 'import time; time.sleep(12)']) 1379 self.assertEqual(proc.returncode, None) 1380 results = [] 1381 1382 def kill_proc_timer_thread(): 1383 results.append(('thread-start-poll-result', proc.poll())) 1384 # terminate it from the thread and wait for the result. 1385 proc.kill() 1386 proc.wait() 1387 results.append(('thread-after-kill-and-wait', proc.returncode)) 1388 # this wait should be a no-op given the above. 1389 proc.wait() 1390 results.append(('thread-after-second-wait', proc.returncode)) 1391 1392 # This is a timing sensitive test, the failure mode is 1393 # triggered when both the main thread and this thread are in 1394 # the wait() call at once. The delay here is to allow the 1395 # main thread to most likely be blocked in its wait() call. 1396 t = threading.Timer(0.2, kill_proc_timer_thread) 1397 t.start() 1398 1399 if mswindows: 1400 expected_errorcode = 1 1401 else: 1402 # Should be -9 because of the proc.kill() from the thread. 1403 expected_errorcode = -9 1404 1405 # Wait for the process to finish; the thread should kill it 1406 # long before it finishes on its own. Supplying a timeout 1407 # triggers a different code path for better coverage. 1408 proc.wait(timeout=support.SHORT_TIMEOUT) 1409 self.assertEqual(proc.returncode, expected_errorcode, 1410 msg="unexpected result in wait from main thread") 1411 1412 # This should be a no-op with no change in returncode. 1413 proc.wait() 1414 self.assertEqual(proc.returncode, expected_errorcode, 1415 msg="unexpected result in second main wait.") 1416 1417 t.join() 1418 # Ensure that all of the thread results are as expected. 
1419 # When a race condition occurs in wait(), the returncode could 1420 # be set by the wrong thread that doesn't actually have it 1421 # leading to an incorrect value. 1422 self.assertEqual([('thread-start-poll-result', None), 1423 ('thread-after-kill-and-wait', expected_errorcode), 1424 ('thread-after-second-wait', expected_errorcode)], 1425 results) 1426 1427 def test_issue8780(self): 1428 # Ensure that stdout is inherited from the parent 1429 # if stdout=PIPE is not used 1430 code = ';'.join(( 1431 'import subprocess, sys', 1432 'retcode = subprocess.call(' 1433 "[sys.executable, '-c', 'print(\"Hello World!\")'])", 1434 'assert retcode == 0')) 1435 output = subprocess.check_output([sys.executable, '-c', code]) 1436 self.assertTrue(output.startswith(b'Hello World!'), ascii(output)) 1437 1438 def test_handles_closed_on_exception(self): 1439 # If CreateProcess exits with an error, ensure the 1440 # duplicate output handles are released 1441 ifhandle, ifname = tempfile.mkstemp() 1442 ofhandle, ofname = tempfile.mkstemp() 1443 efhandle, efname = tempfile.mkstemp() 1444 try: 1445 subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle, 1446 stderr=efhandle) 1447 except OSError: 1448 os.close(ifhandle) 1449 os.remove(ifname) 1450 os.close(ofhandle) 1451 os.remove(ofname) 1452 os.close(efhandle) 1453 os.remove(efname) 1454 self.assertFalse(os.path.exists(ifname)) 1455 self.assertFalse(os.path.exists(ofname)) 1456 self.assertFalse(os.path.exists(efname)) 1457 1458 def test_communicate_epipe(self): 1459 # Issue 10963: communicate() should hide EPIPE 1460 p = subprocess.Popen(ZERO_RETURN_CMD, 1461 stdin=subprocess.PIPE, 1462 stdout=subprocess.PIPE, 1463 stderr=subprocess.PIPE) 1464 self.addCleanup(p.stdout.close) 1465 self.addCleanup(p.stderr.close) 1466 self.addCleanup(p.stdin.close) 1467 p.communicate(b"x" * 2**20) 1468 1469 def test_repr(self): 1470 path_cmd = pathlib.Path("my-tool.py") 1471 pathlib_cls = path_cmd.__class__.__name__ 1472 1473 cases = [ 1474 ("ls", 
True, 123, "<Popen: returncode: 123 args: 'ls'>"), 1475 ('a' * 100, True, 0, 1476 "<Popen: returncode: 0 args: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa...>"), 1477 (["ls"], False, None, "<Popen: returncode: None args: ['ls']>"), 1478 (["ls", '--my-opts', 'a' * 100], False, None, 1479 "<Popen: returncode: None args: ['ls', '--my-opts', 'aaaaaaaaaaaaaaaaaaaaaaaa...>"), 1480 (path_cmd, False, 7, f"<Popen: returncode: 7 args: {pathlib_cls}('my-tool.py')>") 1481 ] 1482 with unittest.mock.patch.object(subprocess.Popen, '_execute_child'): 1483 for cmd, shell, code, sx in cases: 1484 p = subprocess.Popen(cmd, shell=shell) 1485 p.returncode = code 1486 self.assertEqual(repr(p), sx) 1487 1488 def test_communicate_epipe_only_stdin(self): 1489 # Issue 10963: communicate() should hide EPIPE 1490 p = subprocess.Popen(ZERO_RETURN_CMD, 1491 stdin=subprocess.PIPE) 1492 self.addCleanup(p.stdin.close) 1493 p.wait() 1494 p.communicate(b"x" * 2**20) 1495 1496 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 1497 "Requires signal.SIGUSR1") 1498 @unittest.skipUnless(hasattr(os, 'kill'), 1499 "Requires os.kill") 1500 @unittest.skipUnless(hasattr(os, 'getppid'), 1501 "Requires os.getppid") 1502 def test_communicate_eintr(self): 1503 # Issue #12493: communicate() should handle EINTR 1504 def handler(signum, frame): 1505 pass 1506 old_handler = signal.signal(signal.SIGUSR1, handler) 1507 self.addCleanup(signal.signal, signal.SIGUSR1, old_handler) 1508 1509 args = [sys.executable, "-c", 1510 'import os, signal;' 1511 'os.kill(os.getppid(), signal.SIGUSR1)'] 1512 for stream in ('stdout', 'stderr'): 1513 kw = {stream: subprocess.PIPE} 1514 with subprocess.Popen(args, **kw) as process: 1515 # communicate() will be interrupted by SIGUSR1 1516 process.communicate() 1517 1518 1519 # This test is Linux-ish specific for simplicity to at least have 1520 # some coverage. It is not a platform specific bug. 
1521 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), 1522 "Linux specific") 1523 def test_failed_child_execute_fd_leak(self): 1524 """Test for the fork() failure fd leak reported in issue16327.""" 1525 fd_directory = '/proc/%d/fd' % os.getpid() 1526 fds_before_popen = os.listdir(fd_directory) 1527 with self.assertRaises(PopenTestException): 1528 PopenExecuteChildRaises( 1529 ZERO_RETURN_CMD, stdin=subprocess.PIPE, 1530 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1531 1532 # NOTE: This test doesn't verify that the real _execute_child 1533 # does not close the file descriptors itself on the way out 1534 # during an exception. Code inspection has confirmed that. 1535 1536 fds_after_exception = os.listdir(fd_directory) 1537 self.assertEqual(fds_before_popen, fds_after_exception) 1538 1539 @unittest.skipIf(mswindows, "behavior currently not supported on Windows") 1540 def test_file_not_found_includes_filename(self): 1541 with self.assertRaises(FileNotFoundError) as c: 1542 subprocess.call(['/opt/nonexistent_binary', 'with', 'some', 'args']) 1543 self.assertEqual(c.exception.filename, '/opt/nonexistent_binary') 1544 1545 @unittest.skipIf(mswindows, "behavior currently not supported on Windows") 1546 def test_file_not_found_with_bad_cwd(self): 1547 with self.assertRaises(FileNotFoundError) as c: 1548 subprocess.Popen(['exit', '0'], cwd='/some/nonexistent/directory') 1549 self.assertEqual(c.exception.filename, '/some/nonexistent/directory') 1550 1551 def test_class_getitems(self): 1552 self.assertIsInstance(subprocess.Popen[bytes], types.GenericAlias) 1553 self.assertIsInstance(subprocess.CompletedProcess[str], types.GenericAlias) 1554 1555 @unittest.skipIf(not sysconfig.get_config_var("HAVE_VFORK"), 1556 "vfork() not enabled by configure.") 1557 @mock.patch("subprocess._fork_exec") 1558 def test__use_vfork(self, mock_fork_exec): 1559 self.assertTrue(subprocess._USE_VFORK) # The default value regardless. 
1560 mock_fork_exec.side_effect = RuntimeError("just testing args") 1561 with self.assertRaises(RuntimeError): 1562 subprocess.run([sys.executable, "-c", "pass"]) 1563 mock_fork_exec.assert_called_once() 1564 self.assertTrue(mock_fork_exec.call_args.args[-1]) 1565 with mock.patch.object(subprocess, '_USE_VFORK', False): 1566 with self.assertRaises(RuntimeError): 1567 subprocess.run([sys.executable, "-c", "pass"]) 1568 self.assertFalse(mock_fork_exec.call_args_list[-1].args[-1]) 1569 1570 1571class RunFuncTestCase(BaseTestCase): 1572 def run_python(self, code, **kwargs): 1573 """Run Python code in a subprocess using subprocess.run""" 1574 argv = [sys.executable, "-c", code] 1575 return subprocess.run(argv, **kwargs) 1576 1577 def test_returncode(self): 1578 # call() function with sequence argument 1579 cp = self.run_python("import sys; sys.exit(47)") 1580 self.assertEqual(cp.returncode, 47) 1581 with self.assertRaises(subprocess.CalledProcessError): 1582 cp.check_returncode() 1583 1584 def test_check(self): 1585 with self.assertRaises(subprocess.CalledProcessError) as c: 1586 self.run_python("import sys; sys.exit(47)", check=True) 1587 self.assertEqual(c.exception.returncode, 47) 1588 1589 def test_check_zero(self): 1590 # check_returncode shouldn't raise when returncode is zero 1591 cp = subprocess.run(ZERO_RETURN_CMD, check=True) 1592 self.assertEqual(cp.returncode, 0) 1593 1594 def test_timeout(self): 1595 # run() function with timeout argument; we want to test that the child 1596 # process gets killed when the timeout expires. If the child isn't 1597 # killed, this call will deadlock since subprocess.run waits for the 1598 # child. 
1599 with self.assertRaises(subprocess.TimeoutExpired): 1600 self.run_python("while True: pass", timeout=0.0001) 1601 1602 def test_capture_stdout(self): 1603 # capture stdout with zero return code 1604 cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE) 1605 self.assertIn(b'BDFL', cp.stdout) 1606 1607 def test_capture_stderr(self): 1608 cp = self.run_python("import sys; sys.stderr.write('BDFL')", 1609 stderr=subprocess.PIPE) 1610 self.assertIn(b'BDFL', cp.stderr) 1611 1612 def test_check_output_stdin_arg(self): 1613 # run() can be called with stdin set to a file 1614 tf = tempfile.TemporaryFile() 1615 self.addCleanup(tf.close) 1616 tf.write(b'pear') 1617 tf.seek(0) 1618 cp = self.run_python( 1619 "import sys; sys.stdout.write(sys.stdin.read().upper())", 1620 stdin=tf, stdout=subprocess.PIPE) 1621 self.assertIn(b'PEAR', cp.stdout) 1622 1623 def test_check_output_input_arg(self): 1624 # check_output() can be called with input set to a string 1625 cp = self.run_python( 1626 "import sys; sys.stdout.write(sys.stdin.read().upper())", 1627 input=b'pear', stdout=subprocess.PIPE) 1628 self.assertIn(b'PEAR', cp.stdout) 1629 1630 def test_check_output_stdin_with_input_arg(self): 1631 # run() refuses to accept 'stdin' with 'input' 1632 tf = tempfile.TemporaryFile() 1633 self.addCleanup(tf.close) 1634 tf.write(b'pear') 1635 tf.seek(0) 1636 with self.assertRaises(ValueError, 1637 msg="Expected ValueError when stdin and input args supplied.") as c: 1638 output = self.run_python("print('will not be run')", 1639 stdin=tf, input=b'hare') 1640 self.assertIn('stdin', c.exception.args[0]) 1641 self.assertIn('input', c.exception.args[0]) 1642 1643 def test_check_output_timeout(self): 1644 with self.assertRaises(subprocess.TimeoutExpired) as c: 1645 cp = self.run_python(( 1646 "import sys, time\n" 1647 "sys.stdout.write('BDFL')\n" 1648 "sys.stdout.flush()\n" 1649 "time.sleep(3600)"), 1650 # Some heavily loaded buildbots (sparc Debian 3.x) require 1651 # this much time to start 
and print. 1652 timeout=3, stdout=subprocess.PIPE) 1653 self.assertEqual(c.exception.output, b'BDFL') 1654 # output is aliased to stdout 1655 self.assertEqual(c.exception.stdout, b'BDFL') 1656 1657 def test_run_kwargs(self): 1658 newenv = os.environ.copy() 1659 newenv["FRUIT"] = "banana" 1660 cp = self.run_python(('import sys, os;' 1661 'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'), 1662 env=newenv) 1663 self.assertEqual(cp.returncode, 33) 1664 1665 def test_run_with_pathlike_path(self): 1666 # bpo-31961: test run(pathlike_object) 1667 # the name of a command that can be run without 1668 # any arguments that exit fast 1669 prog = 'tree.com' if mswindows else 'ls' 1670 path = shutil.which(prog) 1671 if path is None: 1672 self.skipTest(f'{prog} required for this test') 1673 path = FakePath(path) 1674 res = subprocess.run(path, stdout=subprocess.DEVNULL) 1675 self.assertEqual(res.returncode, 0) 1676 with self.assertRaises(TypeError): 1677 subprocess.run(path, stdout=subprocess.DEVNULL, shell=True) 1678 1679 def test_run_with_bytes_path_and_arguments(self): 1680 # bpo-31961: test run([bytes_object, b'additional arguments']) 1681 path = os.fsencode(sys.executable) 1682 args = [path, '-c', b'import sys; sys.exit(57)'] 1683 res = subprocess.run(args) 1684 self.assertEqual(res.returncode, 57) 1685 1686 def test_run_with_pathlike_path_and_arguments(self): 1687 # bpo-31961: test run([pathlike_object, 'additional arguments']) 1688 path = FakePath(sys.executable) 1689 args = [path, '-c', 'import sys; sys.exit(57)'] 1690 res = subprocess.run(args) 1691 self.assertEqual(res.returncode, 57) 1692 1693 def test_capture_output(self): 1694 cp = self.run_python(("import sys;" 1695 "sys.stdout.write('BDFL'); " 1696 "sys.stderr.write('FLUFL')"), 1697 capture_output=True) 1698 self.assertIn(b'BDFL', cp.stdout) 1699 self.assertIn(b'FLUFL', cp.stderr) 1700 1701 def test_stdout_with_capture_output_arg(self): 1702 # run() refuses to accept 'stdout' with 'capture_output' 1703 tf = 
tempfile.TemporaryFile() 1704 self.addCleanup(tf.close) 1705 with self.assertRaises(ValueError, 1706 msg=("Expected ValueError when stdout and capture_output " 1707 "args supplied.")) as c: 1708 output = self.run_python("print('will not be run')", 1709 capture_output=True, stdout=tf) 1710 self.assertIn('stdout', c.exception.args[0]) 1711 self.assertIn('capture_output', c.exception.args[0]) 1712 1713 def test_stderr_with_capture_output_arg(self): 1714 # run() refuses to accept 'stderr' with 'capture_output' 1715 tf = tempfile.TemporaryFile() 1716 self.addCleanup(tf.close) 1717 with self.assertRaises(ValueError, 1718 msg=("Expected ValueError when stderr and capture_output " 1719 "args supplied.")) as c: 1720 output = self.run_python("print('will not be run')", 1721 capture_output=True, stderr=tf) 1722 self.assertIn('stderr', c.exception.args[0]) 1723 self.assertIn('capture_output', c.exception.args[0]) 1724 1725 # This test _might_ wind up a bit fragile on loaded build+test machines 1726 # as it depends on the timing with wide enough margins for normal situations 1727 # but does assert that it happened "soon enough" to believe the right thing 1728 # happened. 1729 @unittest.skipIf(mswindows, "requires posix like 'sleep' shell command") 1730 def test_run_with_shell_timeout_and_capture_output(self): 1731 """Output capturing after a timeout mustn't hang forever on open filehandles.""" 1732 before_secs = time.monotonic() 1733 try: 1734 subprocess.run('sleep 3', shell=True, timeout=0.1, 1735 capture_output=True) # New session unspecified. 1736 except subprocess.TimeoutExpired as exc: 1737 after_secs = time.monotonic() 1738 stacks = traceback.format_exc() # assertRaises doesn't give this. 1739 else: 1740 self.fail("TimeoutExpired not raised.") 1741 self.assertLess(after_secs - before_secs, 1.5, 1742 msg="TimeoutExpired was delayed! 
Bad traceback:\n```\n" 1743 f"{stacks}```") 1744 1745 def test_encoding_warning(self): 1746 code = textwrap.dedent("""\ 1747 from subprocess import * 1748 run("echo hello", shell=True, text=True) 1749 check_output("echo hello", shell=True, text=True) 1750 """) 1751 cp = subprocess.run([sys.executable, "-Xwarn_default_encoding", "-c", code], 1752 capture_output=True) 1753 lines = cp.stderr.splitlines() 1754 self.assertEqual(len(lines), 2, lines) 1755 self.assertTrue(lines[0].startswith(b"<string>:2: EncodingWarning: ")) 1756 self.assertTrue(lines[1].startswith(b"<string>:3: EncodingWarning: ")) 1757 1758 1759def _get_test_grp_name(): 1760 for name_group in ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody'): 1761 if grp: 1762 try: 1763 grp.getgrnam(name_group) 1764 except KeyError: 1765 continue 1766 return name_group 1767 else: 1768 raise unittest.SkipTest('No identified group name to use for this test on this platform.') 1769 1770 1771@unittest.skipIf(mswindows, "POSIX specific tests") 1772class POSIXProcessTestCase(BaseTestCase): 1773 1774 def setUp(self): 1775 super().setUp() 1776 self._nonexistent_dir = "/_this/pa.th/does/not/exist" 1777 1778 def _get_chdir_exception(self): 1779 try: 1780 os.chdir(self._nonexistent_dir) 1781 except OSError as e: 1782 # This avoids hard coding the errno value or the OS perror() 1783 # string and instead capture the exception that we want to see 1784 # below for comparison. 1785 desired_exception = e 1786 else: 1787 self.fail("chdir to nonexistent directory %s succeeded." 
% 1788 self._nonexistent_dir) 1789 return desired_exception 1790 1791 def test_exception_cwd(self): 1792 """Test error in the child raised in the parent for a bad cwd.""" 1793 desired_exception = self._get_chdir_exception() 1794 try: 1795 p = subprocess.Popen([sys.executable, "-c", ""], 1796 cwd=self._nonexistent_dir) 1797 except OSError as e: 1798 # Test that the child process chdir failure actually makes 1799 # it up to the parent process as the correct exception. 1800 self.assertEqual(desired_exception.errno, e.errno) 1801 self.assertEqual(desired_exception.strerror, e.strerror) 1802 self.assertEqual(desired_exception.filename, e.filename) 1803 else: 1804 self.fail("Expected OSError: %s" % desired_exception) 1805 1806 def test_exception_bad_executable(self): 1807 """Test error in the child raised in the parent for a bad executable.""" 1808 desired_exception = self._get_chdir_exception() 1809 try: 1810 p = subprocess.Popen([sys.executable, "-c", ""], 1811 executable=self._nonexistent_dir) 1812 except OSError as e: 1813 # Test that the child process exec failure actually makes 1814 # it up to the parent process as the correct exception. 1815 self.assertEqual(desired_exception.errno, e.errno) 1816 self.assertEqual(desired_exception.strerror, e.strerror) 1817 self.assertEqual(desired_exception.filename, e.filename) 1818 else: 1819 self.fail("Expected OSError: %s" % desired_exception) 1820 1821 def test_exception_bad_args_0(self): 1822 """Test error in the child raised in the parent for a bad args[0].""" 1823 desired_exception = self._get_chdir_exception() 1824 try: 1825 p = subprocess.Popen([self._nonexistent_dir, "-c", ""]) 1826 except OSError as e: 1827 # Test that the child process exec failure actually makes 1828 # it up to the parent process as the correct exception. 
@mock.patch("subprocess._fork_exec")
def test_exception_errpipe_normal(self, fork_exec):
    """Test error passing done through errpipe_write in the good case"""
    def fake_fork_exec(*args):
        # args[13] is used as the write end of the error pipe.  The message
        # carries the hex form of EISDIR ('is a directory').
        payload = b"OSError:" + format(errno.EISDIR, "x").encode() + b":"
        os.write(args[13], payload)
        return 0

    fork_exec.side_effect = fake_fork_exec

    waitpid_patch = mock.patch("subprocess.os.waitpid",
                               side_effect=ChildProcessError)
    with waitpid_patch, self.assertRaises(IsADirectoryError):
        self.PopenNoDestructor(["non_existent_command"])
1878 with mock.patch("subprocess.os.waitpid", 1879 side_effect=ChildProcessError): 1880 with self.assertRaises(subprocess.SubprocessError) as e: 1881 self.PopenNoDestructor(["non_existent_command"]) 1882 1883 self.assertIn(repr(error_data), str(e.exception)) 1884 1885 @unittest.skipIf(not os.path.exists('/proc/self/status'), 1886 "need /proc/self/status") 1887 def test_restore_signals(self): 1888 # Blindly assume that cat exists on systems with /proc/self/status... 1889 default_proc_status = subprocess.check_output( 1890 ['cat', '/proc/self/status'], 1891 restore_signals=False) 1892 for line in default_proc_status.splitlines(): 1893 if line.startswith(b'SigIgn'): 1894 default_sig_ign_mask = line 1895 break 1896 else: 1897 self.skipTest("SigIgn not found in /proc/self/status.") 1898 restored_proc_status = subprocess.check_output( 1899 ['cat', '/proc/self/status'], 1900 restore_signals=True) 1901 for line in restored_proc_status.splitlines(): 1902 if line.startswith(b'SigIgn'): 1903 restored_sig_ign_mask = line 1904 break 1905 self.assertNotEqual(default_sig_ign_mask, restored_sig_ign_mask, 1906 msg="restore_signals=True should've unblocked " 1907 "SIGPIPE and friends.") 1908 1909 def test_start_new_session(self): 1910 # For code coverage of calling setsid(). We don't care if we get an 1911 # EPERM error from it depending on the test execution environment, that 1912 # still indicates that it was called. 1913 try: 1914 output = subprocess.check_output( 1915 [sys.executable, "-c", "import os; print(os.getsid(0))"], 1916 start_new_session=True) 1917 except PermissionError as e: 1918 if e.errno != errno.EPERM: 1919 raise # EACCES? 1920 else: 1921 parent_sid = os.getsid(0) 1922 child_sid = int(output) 1923 self.assertNotEqual(parent_sid, child_sid) 1924 1925 @unittest.skipUnless(hasattr(os, 'setpgid') and hasattr(os, 'getpgid'), 1926 'no setpgid or getpgid on platform') 1927 def test_process_group_0(self): 1928 # For code coverage of calling setpgid(). 
We don't care if we get an 1929 # EPERM error from it depending on the test execution environment, that 1930 # still indicates that it was called. 1931 try: 1932 output = subprocess.check_output( 1933 [sys.executable, "-c", "import os; print(os.getpgid(0))"], 1934 process_group=0) 1935 except PermissionError as e: 1936 if e.errno != errno.EPERM: 1937 raise # EACCES? 1938 else: 1939 parent_pgid = os.getpgid(0) 1940 child_pgid = int(output) 1941 self.assertNotEqual(parent_pgid, child_pgid) 1942 1943 @unittest.skipUnless(hasattr(os, 'setreuid'), 'no setreuid on platform') 1944 def test_user(self): 1945 # For code coverage of the user parameter. We don't care if we get an 1946 # EPERM error from it depending on the test execution environment, that 1947 # still indicates that it was called. 1948 1949 uid = os.geteuid() 1950 test_users = [65534 if uid != 65534 else 65533, uid] 1951 name_uid = "nobody" if sys.platform != 'darwin' else "unknown" 1952 1953 if pwd is not None: 1954 try: 1955 pwd.getpwnam(name_uid) 1956 test_users.append(name_uid) 1957 except KeyError: 1958 # unknown user name 1959 name_uid = None 1960 1961 for user in test_users: 1962 # posix_spawn() may be used with close_fds=False 1963 for close_fds in (False, True): 1964 with self.subTest(user=user, close_fds=close_fds): 1965 try: 1966 output = subprocess.check_output( 1967 [sys.executable, "-c", 1968 "import os; print(os.getuid())"], 1969 user=user, 1970 close_fds=close_fds) 1971 except PermissionError: # (EACCES, EPERM) 1972 pass 1973 except OSError as e: 1974 if e.errno not in (errno.EACCES, errno.EPERM): 1975 raise 1976 else: 1977 if isinstance(user, str): 1978 user_uid = pwd.getpwnam(user).pw_uid 1979 else: 1980 user_uid = user 1981 child_user = int(output) 1982 self.assertEqual(child_user, user_uid) 1983 1984 with self.assertRaises(ValueError): 1985 subprocess.check_call(ZERO_RETURN_CMD, user=-1) 1986 1987 with self.assertRaises(OverflowError): 1988 subprocess.check_call(ZERO_RETURN_CMD, 1989 
@unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform')
def test_extra_groups(self):
    """Exercise the extra_groups parameter of subprocess.

    A child is started with a numeric gid (plus a group name when the grp
    module is available) as extra_groups and reports os.getgroups(); the
    reported set must equal the requested gids.  EPERM from the launch is
    tolerated: the comparison is skipped but the argument-validation
    checks below still run.
    """
    gid = os.getegid()
    group_list = [65534 if gid != 65534 else 65533]
    name_group = _get_test_grp_name()

    if grp is not None:
        group_list.append(name_group)

    try:
        output = subprocess.check_output(
                [sys.executable, "-c",
                 "import os, sys, json; json.dump(os.getgroups(), sys.stdout)"],
                extra_groups=group_list)
    except OSError as ex:
        if ex.errno != errno.EPERM:
            raise
        # EPERM: nothing to compare, fall through to the validation checks.
    else:
        child_groups = json.loads(output)

        if grp is not None:
            desired_gids = [grp.getgrnam(g).gr_gid if isinstance(g, str) else g
                            for g in group_list]
        else:
            desired_gids = group_list

        self.assertEqual(set(desired_gids), set(child_groups))

    # make sure we bomb on negative values
    with self.assertRaises(ValueError):
        subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[-1])

    with self.assertRaises(ValueError):
        subprocess.check_call(ZERO_RETURN_CMD,
                              cwd=os.curdir, env=os.environ,
                              extra_groups=[2**64])

    if grp is None:
        with self.assertRaises(ValueError):
            subprocess.check_call(ZERO_RETURN_CMD,
                                  extra_groups=[name_group])
def test_CalledProcessError_str_signal(self):
    """str() of a CalledProcessError with a negative returncode names the signal."""
    message = str(
        subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd"))
    # The repr() of the signal.Signals intenum is relied upon to provide
    # the word signal, the signal name and the numeric value.
    self.assertIn("signal", message.lower())
    # Only the generic prefix is checked: some signals have several names
    # and which one is shown can vary.
    self.assertIn("SIG", message)
    self.assertIn(str(signal.SIGABRT), message)
2155 p = subprocess.Popen([sys.executable, "-c", 2156 'import sys,os;' 2157 'sys.stdout.write(os.getenv("FRUIT"))'], 2158 stdout=subprocess.PIPE, 2159 preexec_fn=lambda: os.putenv("FRUIT", "apple")) 2160 with p: 2161 self.assertEqual(p.stdout.read(), b"apple") 2162 2163 def test_preexec_exception(self): 2164 def raise_it(): 2165 raise ValueError("What if two swallows carried a coconut?") 2166 try: 2167 p = subprocess.Popen([sys.executable, "-c", ""], 2168 preexec_fn=raise_it) 2169 except subprocess.SubprocessError as e: 2170 self.assertTrue( 2171 subprocess._fork_exec, 2172 "Expected a ValueError from the preexec_fn") 2173 except ValueError as e: 2174 self.assertIn("coconut", e.args[0]) 2175 else: 2176 self.fail("Exception raised by preexec_fn did not make it " 2177 "to the parent process.") 2178 2179 class _TestExecuteChildPopen(subprocess.Popen): 2180 """Used to test behavior at the end of _execute_child.""" 2181 def __init__(self, testcase, *args, **kwargs): 2182 self._testcase = testcase 2183 subprocess.Popen.__init__(self, *args, **kwargs) 2184 2185 def _execute_child(self, *args, **kwargs): 2186 try: 2187 subprocess.Popen._execute_child(self, *args, **kwargs) 2188 finally: 2189 # Open a bunch of file descriptors and verify that 2190 # none of them are the same as the ones the Popen 2191 # instance is using for stdin/stdout/stderr. 
2192 devzero_fds = [os.open("/dev/zero", os.O_RDONLY) 2193 for _ in range(8)] 2194 try: 2195 for fd in devzero_fds: 2196 self._testcase.assertNotIn( 2197 fd, (self.stdin.fileno(), self.stdout.fileno(), 2198 self.stderr.fileno()), 2199 msg="At least one fd was closed early.") 2200 finally: 2201 for fd in devzero_fds: 2202 os.close(fd) 2203 2204 @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.") 2205 def test_preexec_errpipe_does_not_double_close_pipes(self): 2206 """Issue16140: Don't double close pipes on preexec error.""" 2207 2208 def raise_it(): 2209 raise subprocess.SubprocessError( 2210 "force the _execute_child() errpipe_data path.") 2211 2212 with self.assertRaises(subprocess.SubprocessError): 2213 self._TestExecuteChildPopen( 2214 self, ZERO_RETURN_CMD, 2215 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 2216 stderr=subprocess.PIPE, preexec_fn=raise_it) 2217 2218 def test_preexec_gc_module_failure(self): 2219 # This tests the code that disables garbage collection if the child 2220 # process will execute any Python. 
2221 enabled = gc.isenabled() 2222 try: 2223 gc.disable() 2224 self.assertFalse(gc.isenabled()) 2225 subprocess.call([sys.executable, '-c', ''], 2226 preexec_fn=lambda: None) 2227 self.assertFalse(gc.isenabled(), 2228 "Popen enabled gc when it shouldn't.") 2229 2230 gc.enable() 2231 self.assertTrue(gc.isenabled()) 2232 subprocess.call([sys.executable, '-c', ''], 2233 preexec_fn=lambda: None) 2234 self.assertTrue(gc.isenabled(), "Popen left gc disabled.") 2235 finally: 2236 if not enabled: 2237 gc.disable() 2238 2239 @unittest.skipIf( 2240 sys.platform == 'darwin', 'setrlimit() seems to fail on OS X') 2241 def test_preexec_fork_failure(self): 2242 # The internal code did not preserve the previous exception when 2243 # re-enabling garbage collection 2244 try: 2245 from resource import getrlimit, setrlimit, RLIMIT_NPROC 2246 except ImportError as err: 2247 self.skipTest(err) # RLIMIT_NPROC is specific to Linux and BSD 2248 limits = getrlimit(RLIMIT_NPROC) 2249 [_, hard] = limits 2250 setrlimit(RLIMIT_NPROC, (0, hard)) 2251 self.addCleanup(setrlimit, RLIMIT_NPROC, limits) 2252 try: 2253 subprocess.call([sys.executable, '-c', ''], 2254 preexec_fn=lambda: None) 2255 except BlockingIOError: 2256 # Forking should raise EAGAIN, translated to BlockingIOError 2257 pass 2258 else: 2259 self.skipTest('RLIMIT_NPROC had no effect; probably superuser') 2260 2261 def test_args_string(self): 2262 # args is a string 2263 fd, fname = tempfile.mkstemp() 2264 # reopen in text mode 2265 with open(fd, "w", errors="surrogateescape") as fobj: 2266 fobj.write("#!%s\n" % support.unix_shell) 2267 fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" % 2268 sys.executable) 2269 os.chmod(fname, 0o700) 2270 p = subprocess.Popen(fname) 2271 p.wait() 2272 os.remove(fname) 2273 self.assertEqual(p.returncode, 47) 2274 2275 def test_invalid_args(self): 2276 # invalid arguments should raise ValueError 2277 self.assertRaises(ValueError, subprocess.call, 2278 [sys.executable, "-c", 2279 "import sys; 
def test_call_string(self):
    """call() function with string argument on UNIX.

    A temporary shell script that exec's this interpreter with exit
    status 47 is written, made executable and passed to call() as a
    plain string.  The script is removed even if any step fails.
    """
    fd, fname = tempfile.mkstemp()
    try:
        # reopen in text mode
        with open(fd, "w", errors="surrogateescape") as fobj:
            fobj.write("#!%s\n" % support.unix_shell)
            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
                       sys.executable)
        os.chmod(fname, 0o700)
        rc = subprocess.call(fname)
    finally:
        # Do not leave the script behind when writing or running it raised.
        os.remove(fname)
    self.assertEqual(rc, 47)
2332 shells.append(sh) 2333 for sh in shells: 2334 p = subprocess.Popen("echo $0", executable=sh, shell=True, 2335 stdout=subprocess.PIPE) 2336 with p: 2337 self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii')) 2338 2339 def _kill_process(self, method, *args): 2340 # Do not inherit file handles from the parent. 2341 # It should fix failures on some platforms. 2342 # Also set the SIGINT handler to the default to make sure it's not 2343 # being ignored (some tests rely on that.) 2344 old_handler = signal.signal(signal.SIGINT, signal.default_int_handler) 2345 try: 2346 p = subprocess.Popen([sys.executable, "-c", """if 1: 2347 import sys, time 2348 sys.stdout.write('x\\n') 2349 sys.stdout.flush() 2350 time.sleep(30) 2351 """], 2352 close_fds=True, 2353 stdin=subprocess.PIPE, 2354 stdout=subprocess.PIPE, 2355 stderr=subprocess.PIPE) 2356 finally: 2357 signal.signal(signal.SIGINT, old_handler) 2358 # Wait for the interpreter to be completely initialized before 2359 # sending any signal. 2360 p.stdout.read(1) 2361 getattr(p, method)(*args) 2362 return p 2363 2364 @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')), 2365 "Due to known OS bug (issue #16762)") 2366 def _kill_dead_process(self, method, *args): 2367 # Do not inherit file handles from the parent. 2368 # It should fix failures on some platforms. 2369 p = subprocess.Popen([sys.executable, "-c", """if 1: 2370 import sys, time 2371 sys.stdout.write('x\\n') 2372 sys.stdout.flush() 2373 """], 2374 close_fds=True, 2375 stdin=subprocess.PIPE, 2376 stdout=subprocess.PIPE, 2377 stderr=subprocess.PIPE) 2378 # Wait for the interpreter to be completely initialized before 2379 # sending any signal. 
def _save_fds(self, save_fds):
    """Duplicate every descriptor in *save_fds* and record how to restore it.

    Returns a list of (fd, duplicate, inheritable) tuples in input order,
    where *inheritable* is what os.get_inheritable() reported for *fd*.
    """
    def snapshot(fd):
        # Read the flag first, then duplicate the descriptor.
        was_inheritable = os.get_inheritable(fd)
        return fd, os.dup(fd), was_inheritable

    return [snapshot(fd) for fd in save_fds]
'sys.stderr.write("orange")'], 2447 stdin=stdin, 2448 stdout=subprocess.PIPE, 2449 stderr=subprocess.PIPE).communicate() 2450 self.assertEqual(out, b'apple') 2451 self.assertEqual(err, b'orange') 2452 finally: 2453 self._restore_fds(saved_fds) 2454 2455 def test_close_fd_0(self): 2456 self.check_close_std_fds([0]) 2457 2458 def test_close_fd_1(self): 2459 self.check_close_std_fds([1]) 2460 2461 def test_close_fd_2(self): 2462 self.check_close_std_fds([2]) 2463 2464 def test_close_fds_0_1(self): 2465 self.check_close_std_fds([0, 1]) 2466 2467 def test_close_fds_0_2(self): 2468 self.check_close_std_fds([0, 2]) 2469 2470 def test_close_fds_1_2(self): 2471 self.check_close_std_fds([1, 2]) 2472 2473 def test_close_fds_0_1_2(self): 2474 # Issue #10806: test that subprocess pipes still work properly with 2475 # all standard fds closed. 2476 self.check_close_std_fds([0, 1, 2]) 2477 2478 def test_small_errpipe_write_fd(self): 2479 """Issue #15798: Popen should work when stdio fds are available.""" 2480 new_stdin = os.dup(0) 2481 new_stdout = os.dup(1) 2482 try: 2483 os.close(0) 2484 os.close(1) 2485 2486 # Side test: if errpipe_write fails to have its CLOEXEC 2487 # flag set this should cause the parent to think the exec 2488 # failed. Extremely unlikely: everyone supports CLOEXEC. 
def test_remapping_std_fds(self):
    """Run a child whose std streams are temp files handed over out of order.

    Three unlinked temporary files are dup2()'d over descriptors 0, 1 and
    2 of this process and then passed to Popen as stdin=temp_fds[1],
    stdout=temp_fds[2] and stderr=temp_fds[0].  The child echoes what it
    reads from stdin to stdout and writes "err" to stderr; the test
    checks that this text ended up in the files given as stdout and
    stderr.  The original descriptors 0-2 are restored afterwards.
    """
    # open up some temporary files
    temps = [tempfile.mkstemp() for i in range(3)]
    try:
        temp_fds = [fd for fd, fname in temps]

        # unlink the files -- we won't need to reopen them
        for fd, fname in temps:
            os.unlink(fname)

        # write some data to what will become stdin, and rewind
        os.write(temp_fds[1], b"STDIN")
        os.lseek(temp_fds[1], 0, 0)

        # move the standard file descriptors out of the way
        saved_fds = self._save_fds(range(3))
        try:
            # duplicate the file objects over the standard fd's
            for fd, temp_fd in enumerate(temp_fds):
                os.dup2(temp_fd, fd)

            # now use those files in the "wrong" order, so that subprocess
            # has to rearrange them in the child
            p = subprocess.Popen([sys.executable, "-c",
                'import sys; got = sys.stdin.read();'
                'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
                stdin=temp_fds[1],
                stdout=temp_fds[2],
                stderr=temp_fds[0])
            p.wait()
        finally:
            # put descriptors 0-2 back before anything is asserted
            self._restore_fds(saved_fds)

        # rewind every file so the child's output can be read back
        for fd in temp_fds:
            os.lseek(fd, 0, 0)

        out = os.read(temp_fds[2], 1024)
        err = os.read(temp_fds[0], 1024).strip()
        self.assertEqual(out, b"got STDIN")
        self.assertEqual(err, b"err")

    finally:
        for fd in temp_fds:
            os.close(fd)
descriptors 2554 saved_fds = self._save_fds(range(3)) 2555 try: 2556 # duplicate the temp files over the standard fd's 0, 1, 2 2557 for fd, temp_fd in enumerate(temp_fds): 2558 os.dup2(temp_fd, fd) 2559 2560 # write some data to what will become stdin, and rewind 2561 os.write(stdin_no, b"STDIN") 2562 os.lseek(stdin_no, 0, 0) 2563 2564 # now use those files in the given order, so that subprocess 2565 # has to rearrange them in the child 2566 p = subprocess.Popen([sys.executable, "-c", 2567 'import sys; got = sys.stdin.read();' 2568 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], 2569 stdin=stdin_no, 2570 stdout=stdout_no, 2571 stderr=stderr_no) 2572 p.wait() 2573 2574 for fd in temp_fds: 2575 os.lseek(fd, 0, 0) 2576 2577 out = os.read(stdout_no, 1024) 2578 err = os.read(stderr_no, 1024).strip() 2579 finally: 2580 self._restore_fds(saved_fds) 2581 2582 self.assertEqual(out, b"got STDIN") 2583 self.assertEqual(err, b"err") 2584 2585 finally: 2586 for fd in temp_fds: 2587 os.close(fd) 2588 2589 # When duping fds, if there arises a situation where one of the fds is 2590 # either 0, 1 or 2, it is possible that it is overwritten (#12607). 2591 # This tests all combinations of this. 
2592 def test_swap_fds(self): 2593 self.check_swap_fds(0, 1, 2) 2594 self.check_swap_fds(0, 2, 1) 2595 self.check_swap_fds(1, 0, 2) 2596 self.check_swap_fds(1, 2, 0) 2597 self.check_swap_fds(2, 0, 1) 2598 self.check_swap_fds(2, 1, 0) 2599 2600 def _check_swap_std_fds_with_one_closed(self, from_fds, to_fds): 2601 saved_fds = self._save_fds(range(3)) 2602 try: 2603 for from_fd in from_fds: 2604 with tempfile.TemporaryFile() as f: 2605 os.dup2(f.fileno(), from_fd) 2606 2607 fd_to_close = (set(range(3)) - set(from_fds)).pop() 2608 os.close(fd_to_close) 2609 2610 arg_names = ['stdin', 'stdout', 'stderr'] 2611 kwargs = {} 2612 for from_fd, to_fd in zip(from_fds, to_fds): 2613 kwargs[arg_names[to_fd]] = from_fd 2614 2615 code = textwrap.dedent(r''' 2616 import os, sys 2617 skipped_fd = int(sys.argv[1]) 2618 for fd in range(3): 2619 if fd != skipped_fd: 2620 os.write(fd, str(fd).encode('ascii')) 2621 ''') 2622 2623 skipped_fd = (set(range(3)) - set(to_fds)).pop() 2624 2625 rc = subprocess.call([sys.executable, '-c', code, str(skipped_fd)], 2626 **kwargs) 2627 self.assertEqual(rc, 0) 2628 2629 for from_fd, to_fd in zip(from_fds, to_fds): 2630 os.lseek(from_fd, 0, os.SEEK_SET) 2631 read_bytes = os.read(from_fd, 1024) 2632 read_fds = list(map(int, read_bytes.decode('ascii'))) 2633 msg = textwrap.dedent(f""" 2634 When testing {from_fds} to {to_fds} redirection, 2635 parent descriptor {from_fd} got redirected 2636 to descriptor(s) {read_fds} instead of descriptor {to_fd}. 2637 """) 2638 self.assertEqual([to_fd], read_fds, msg) 2639 finally: 2640 self._restore_fds(saved_fds) 2641 2642 # Check that subprocess can remap std fds correctly even 2643 # if one of them is closed (#32844). 
2644 def test_swap_std_fds_with_one_closed(self): 2645 for from_fds in itertools.combinations(range(3), 2): 2646 for to_fds in itertools.permutations(range(3), 2): 2647 self._check_swap_std_fds_with_one_closed(from_fds, to_fds) 2648 2649 def test_surrogates_error_message(self): 2650 def prepare(): 2651 raise ValueError("surrogate:\uDCff") 2652 2653 try: 2654 subprocess.call( 2655 ZERO_RETURN_CMD, 2656 preexec_fn=prepare) 2657 except ValueError as err: 2658 # Pure Python implementations keeps the message 2659 self.assertIsNone(subprocess._fork_exec) 2660 self.assertEqual(str(err), "surrogate:\uDCff") 2661 except subprocess.SubprocessError as err: 2662 # _posixsubprocess uses a default message 2663 self.assertIsNotNone(subprocess._fork_exec) 2664 self.assertEqual(str(err), "Exception occurred in preexec_fn.") 2665 else: 2666 self.fail("Expected ValueError or subprocess.SubprocessError") 2667 2668 def test_undecodable_env(self): 2669 for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')): 2670 encoded_value = value.encode("ascii", "surrogateescape") 2671 2672 # test str with surrogates 2673 script = "import os; print(ascii(os.getenv(%s)))" % repr(key) 2674 env = os.environ.copy() 2675 env[key] = value 2676 # Use C locale to get ASCII for the locale encoding to force 2677 # surrogate-escaping of \xFF in the child process 2678 env['LC_ALL'] = 'C' 2679 decoded_value = value 2680 stdout = subprocess.check_output( 2681 [sys.executable, "-c", script], 2682 env=env) 2683 stdout = stdout.rstrip(b'\n\r') 2684 self.assertEqual(stdout.decode('ascii'), ascii(decoded_value)) 2685 2686 # test bytes 2687 key = key.encode("ascii", "surrogateescape") 2688 script = "import os; print(ascii(os.getenvb(%s)))" % repr(key) 2689 env = os.environ.copy() 2690 env[key] = encoded_value 2691 stdout = subprocess.check_output( 2692 [sys.executable, "-c", script], 2693 env=env) 2694 stdout = stdout.rstrip(b'\n\r') 2695 self.assertEqual(stdout.decode('ascii'), ascii(encoded_value)) 2696 2697 
# NOTE(review): the tests below use self.assert*/self.addCleanup, so they are
# presumably methods of a TestCase subclass declared earlier in the file;
# confirm nesting/indentation when merging.
def test_bytes_program(self):
    # The program to run may be given as bytes: as an absolute path, inside
    # a shell command line, or as a bare name resolved through PATH.
    abs_program = os.fsencode(ZERO_RETURN_CMD[0])
    args = list(ZERO_RETURN_CMD[1:])
    path, program = os.path.split(ZERO_RETURN_CMD[0])
    program = os.fsencode(program)

    # absolute bytes path
    exitcode = subprocess.call([abs_program]+args)
    self.assertEqual(exitcode, 0)

    # absolute bytes path as a string
    cmd = b"'%s' %s" % (abs_program, " ".join(args).encode("utf-8"))
    exitcode = subprocess.call(cmd, shell=True)
    self.assertEqual(exitcode, 0)

    # bytes program, unicode PATH
    env = os.environ.copy()
    env["PATH"] = path
    exitcode = subprocess.call([program]+args, env=env)
    self.assertEqual(exitcode, 0)

    # bytes program, bytes PATH
    envb = os.environb.copy()
    envb[b"PATH"] = os.fsencode(path)
    exitcode = subprocess.call([program]+args, env=envb)
    self.assertEqual(exitcode, 0)

def test_pipe_cloexec(self):
    # The parent-side pipe fds of p1 must not be reported as open by a
    # second child, even though both are started with close_fds=False.
    sleeper = support.findfile("input_reader.py", subdir="subprocessdata")
    fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

    p1 = subprocess.Popen([sys.executable, sleeper],
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, close_fds=False)

    self.addCleanup(p1.communicate, b'')

    p2 = subprocess.Popen([sys.executable, fd_status],
                          stdout=subprocess.PIPE, close_fds=False)

    output, _ = p2.communicate()
    # fd_status.py's stdout is parsed as a comma-separated list of fds.
    result_fds = set(map(int, output.split(b',')))
    unwanted_fds = {p1.stdin.fileno(), p1.stdout.fileno(),
                    p1.stderr.fileno()}

    self.assertFalse(result_fds & unwanted_fds,
                     "Expected no fds from %r to be open in child, "
                     "found %r" %
                     (unwanted_fds, result_fds & unwanted_fds))

def test_pipe_cloexec_real_tools(self):
    # Chain qcat.py | qgrep.py with close_fds=False and check that the
    # data written to the first child comes out of the second one.
    qcat = support.findfile("qcat.py", subdir="subprocessdata")
    qgrep = support.findfile("qgrep.py", subdir="subprocessdata")

    subdata = b'zxcvbn'
    data = subdata * 4 + b'\n'

    def terminate_quietly(proc):
        # The child may already be gone; that is not an error here.
        try:
            proc.terminate()
        except ProcessLookupError:
            pass

    p1 = subprocess.Popen([sys.executable, qcat],
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          close_fds=False)
    # Register p1's cleanups before spawning p2 so that p1 is still
    # terminated and reaped if the second Popen() raises.  Cleanups run
    # in reverse order: terminate first, then wait.
    self.addCleanup(p1.wait)
    self.addCleanup(terminate_quietly, p1)

    p2 = subprocess.Popen([sys.executable, qgrep, subdata],
                          stdin=p1.stdout, stdout=subprocess.PIPE,
                          close_fds=False)
    self.addCleanup(p2.wait)
    self.addCleanup(terminate_quietly, p2)

    p1.stdin.write(data)
    p1.stdin.close()

    # Give the pipeline up to 10 seconds to produce output.
    readfiles, _, _ = select.select([p2.stdout], [], [], 10)

    self.assertTrue(readfiles, "The child hung")
    self.assertEqual(p2.stdout.read(), data)

    p1.stdout.close()
    p2.stdout.close()

def test_close_fds(self):
    # close_fds=False keeps inheritable fds open in the child,
    # close_fds=True closes them, and pass_fds exempts selected ones.
    fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

    fds = os.pipe()
    self.addCleanup(os.close, fds[0])
    self.addCleanup(os.close, fds[1])

    open_fds = set(fds)
    # add a bunch more fds
    for _ in range(9):
        fd = os.open(os.devnull, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        open_fds.add(fd)

    for fd in open_fds:
        os.set_inheritable(fd, True)

    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=False)
    output, _ = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))

    self.assertEqual(remaining_fds & open_fds, open_fds,
                     "Some fds were closed")

    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=True)
    output, _ = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))

    self.assertFalse(remaining_fds & open_fds,
                     "Some fds were left open")
    self.assertIn(1, remaining_fds, "Subprocess failed")

    # Keep some of the fd's we opened open in the subprocess.
    # This tests _posixsubprocess.c's proper handling of fds_to_keep.
    # Note: pop() also removes the kept fds from open_fds.
    fds_to_keep = {open_fds.pop() for _ in range(8)}
    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=True,
                         pass_fds=fds_to_keep)
    output, _ = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))

    self.assertFalse((remaining_fds - fds_to_keep) & open_fds,
                     "Some fds not in pass_fds were left open")
    self.assertIn(1, remaining_fds, "Subprocess failed")


@unittest.skipIf(sys.platform.startswith("freebsd") and
                 os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev,
                 "Requires fdescfs mounted on /dev/fd on FreeBSD")
def test_close_fds_when_max_fd_is_lowered(self):
    """Confirm that issue21618 is fixed (may fail under valgrind)."""
    fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

    # This launches the meat of the test in a child process to
    # avoid messing with the larger unittest processes maximum
    # number of file descriptors.
    #  This process launches:
    #  +--> Process that lowers its RLIMIT_NOFILE after setting up
    #    a bunch of high open fds above the new lower rlimit.
    #    Those are reported via stdout before launching a new
    #    process with close_fds=False to run the actual test:
    #    +--> The TEST: This one launches a fd_status.py
    #      subprocess with close_fds=True so we can find out if
    #      any of the fds above the lowered rlimit are still open.
    p = subprocess.Popen([sys.executable, '-c', textwrap.dedent(
    '''
    import os, resource, subprocess, sys, textwrap
    open_fds = set()
    # Add a bunch more fds to pass down.
    for _ in range(40):
        fd = os.open(os.devnull, os.O_RDONLY)
        open_fds.add(fd)

    # Leave a two pairs of low ones available for use by the
    # internal child error pipe and the stdout pipe.
    # We also leave 10 more open as some Python buildbots run into
    # "too many open files" errors during the test if we do not.
    for fd in sorted(open_fds)[:14]:
        os.close(fd)
        open_fds.remove(fd)

    for fd in open_fds:
        #self.addCleanup(os.close, fd)
        os.set_inheritable(fd, True)

    max_fd_open = max(open_fds)

    # Communicate the open_fds to the parent unittest.TestCase process.
    print(','.join(map(str, sorted(open_fds))))
    sys.stdout.flush()

    rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE)
    try:
        # 29 is lower than the highest fds we are leaving open.
        resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max))
        # Launch a new Python interpreter with our low fd rlim_cur that
        # inherits open fds above that limit. It then uses subprocess
        # with close_fds=True to get a report of open fds in the child.
        # An explicit list of fds to check is passed to fd_status.py as
        # letting fd_status rely on its default logic would miss the
        # fds above rlim_cur as it normally only checks up to that limit.
        subprocess.Popen(
            [sys.executable, '-c',
             textwrap.dedent("""
                 import subprocess, sys
                 subprocess.Popen([sys.executable, %r] +
                                  [str(x) for x in range({max_fd})],
                                  close_fds=True).wait()
                 """.format(max_fd=max_fd_open+1))],
            close_fds=False).wait()
    finally:
        resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max))
    ''' % fd_status)], stdout=subprocess.PIPE)

    output, unused_stderr = p.communicate()
    # First line: fds opened by the intermediate process; second line:
    # fds still open as reported by fd_status.py.
    output_lines = output.splitlines()
    self.assertEqual(len(output_lines), 2,
                     msg="expected exactly two lines of output:\n%r" % output)
    opened_fds = set(map(int, output_lines[0].strip().split(b',')))
    remaining_fds = set(map(int, output_lines[1].strip().split(b',')))

    self.assertFalse(remaining_fds & opened_fds,
                     msg="Some fds were left open.")


# Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
# descriptor of a pipe closed in the parent process is valid in the
# child process according to fstat(), but the mode of the file
# descriptor is invalid, and read or write raise an error.
2919 @support.requires_mac_ver(10, 5) 2920 def test_pass_fds(self): 2921 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2922 2923 open_fds = set() 2924 2925 for x in range(5): 2926 fds = os.pipe() 2927 self.addCleanup(os.close, fds[0]) 2928 self.addCleanup(os.close, fds[1]) 2929 os.set_inheritable(fds[0], True) 2930 os.set_inheritable(fds[1], True) 2931 open_fds.update(fds) 2932 2933 for fd in open_fds: 2934 p = subprocess.Popen([sys.executable, fd_status], 2935 stdout=subprocess.PIPE, close_fds=True, 2936 pass_fds=(fd, )) 2937 output, ignored = p.communicate() 2938 2939 remaining_fds = set(map(int, output.split(b','))) 2940 to_be_closed = open_fds - {fd} 2941 2942 self.assertIn(fd, remaining_fds, "fd to be passed not passed") 2943 self.assertFalse(remaining_fds & to_be_closed, 2944 "fd to be closed passed") 2945 2946 # pass_fds overrides close_fds with a warning. 2947 with self.assertWarns(RuntimeWarning) as context: 2948 self.assertFalse(subprocess.call( 2949 ZERO_RETURN_CMD, 2950 close_fds=False, pass_fds=(fd, ))) 2951 self.assertIn('overriding close_fds', str(context.warning)) 2952 2953 def test_pass_fds_inheritable(self): 2954 script = support.findfile("fd_status.py", subdir="subprocessdata") 2955 2956 inheritable, non_inheritable = os.pipe() 2957 self.addCleanup(os.close, inheritable) 2958 self.addCleanup(os.close, non_inheritable) 2959 os.set_inheritable(inheritable, True) 2960 os.set_inheritable(non_inheritable, False) 2961 pass_fds = (inheritable, non_inheritable) 2962 args = [sys.executable, script] 2963 args += list(map(str, pass_fds)) 2964 2965 p = subprocess.Popen(args, 2966 stdout=subprocess.PIPE, close_fds=True, 2967 pass_fds=pass_fds) 2968 output, ignored = p.communicate() 2969 fds = set(map(int, output.split(b','))) 2970 2971 # the inheritable file descriptor must be inherited, so its inheritable 2972 # flag must be set in the child process after fork() and before exec() 2973 self.assertEqual(fds, set(pass_fds), "output=%a" % 
output) 2974 2975 # inheritable flag must not be changed in the parent process 2976 self.assertEqual(os.get_inheritable(inheritable), True) 2977 self.assertEqual(os.get_inheritable(non_inheritable), False) 2978 2979 2980 # bpo-32270: Ensure that descriptors specified in pass_fds 2981 # are inherited even if they are used in redirections. 2982 # Contributed by @izbyshev. 2983 def test_pass_fds_redirected(self): 2984 """Regression test for https://bugs.python.org/issue32270.""" 2985 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2986 pass_fds = [] 2987 for _ in range(2): 2988 fd = os.open(os.devnull, os.O_RDWR) 2989 self.addCleanup(os.close, fd) 2990 pass_fds.append(fd) 2991 2992 stdout_r, stdout_w = os.pipe() 2993 self.addCleanup(os.close, stdout_r) 2994 self.addCleanup(os.close, stdout_w) 2995 pass_fds.insert(1, stdout_w) 2996 2997 with subprocess.Popen([sys.executable, fd_status], 2998 stdin=pass_fds[0], 2999 stdout=pass_fds[1], 3000 stderr=pass_fds[2], 3001 close_fds=True, 3002 pass_fds=pass_fds): 3003 output = os.read(stdout_r, 1024) 3004 fds = {int(num) for num in output.split(b',')} 3005 3006 self.assertEqual(fds, {0, 1, 2} | frozenset(pass_fds), f"output={output!a}") 3007 3008 3009 def test_stdout_stdin_are_single_inout_fd(self): 3010 with io.open(os.devnull, "r+") as inout: 3011 p = subprocess.Popen(ZERO_RETURN_CMD, 3012 stdout=inout, stdin=inout) 3013 p.wait() 3014 3015 def test_stdout_stderr_are_single_inout_fd(self): 3016 with io.open(os.devnull, "r+") as inout: 3017 p = subprocess.Popen(ZERO_RETURN_CMD, 3018 stdout=inout, stderr=inout) 3019 p.wait() 3020 3021 def test_stderr_stdin_are_single_inout_fd(self): 3022 with io.open(os.devnull, "r+") as inout: 3023 p = subprocess.Popen(ZERO_RETURN_CMD, 3024 stderr=inout, stdin=inout) 3025 p.wait() 3026 3027 def test_wait_when_sigchild_ignored(self): 3028 # NOTE: sigchild_ignore.py may not be an effective test on all OSes. 
3029 sigchild_ignore = support.findfile("sigchild_ignore.py", 3030 subdir="subprocessdata") 3031 p = subprocess.Popen([sys.executable, sigchild_ignore], 3032 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 3033 stdout, stderr = p.communicate() 3034 self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" 3035 " non-zero with this error:\n%s" % 3036 stderr.decode('utf-8')) 3037 3038 def test_select_unbuffered(self): 3039 # Issue #11459: bufsize=0 should really set the pipes as 3040 # unbuffered (and therefore let select() work properly). 3041 select = import_helper.import_module("select") 3042 p = subprocess.Popen([sys.executable, "-c", 3043 'import sys;' 3044 'sys.stdout.write("apple")'], 3045 stdout=subprocess.PIPE, 3046 bufsize=0) 3047 f = p.stdout 3048 self.addCleanup(f.close) 3049 try: 3050 self.assertEqual(f.read(4), b"appl") 3051 self.assertIn(f, select.select([f], [], [], 0.0)[0]) 3052 finally: 3053 p.wait() 3054 3055 def test_zombie_fast_process_del(self): 3056 # Issue #12650: on Unix, if Popen.__del__() was called before the 3057 # process exited, it wouldn't be added to subprocess._active, and would 3058 # remain a zombie. 3059 # spawn a Popen, and delete its reference before it exits 3060 p = subprocess.Popen([sys.executable, "-c", 3061 'import sys, time;' 3062 'time.sleep(0.2)'], 3063 stdout=subprocess.PIPE, 3064 stderr=subprocess.PIPE) 3065 self.addCleanup(p.stdout.close) 3066 self.addCleanup(p.stderr.close) 3067 ident = id(p) 3068 pid = p.pid 3069 with warnings_helper.check_warnings(('', ResourceWarning)): 3070 p = None 3071 3072 if mswindows: 3073 # subprocess._active is not used on Windows and is set to None. 
3074 self.assertIsNone(subprocess._active) 3075 else: 3076 # check that p is in the active processes list 3077 self.assertIn(ident, [id(o) for o in subprocess._active]) 3078 3079 def test_leak_fast_process_del_killed(self): 3080 # Issue #12650: on Unix, if Popen.__del__() was called before the 3081 # process exited, and the process got killed by a signal, it would never 3082 # be removed from subprocess._active, which triggered a FD and memory 3083 # leak. 3084 # spawn a Popen, delete its reference and kill it 3085 p = subprocess.Popen([sys.executable, "-c", 3086 'import time;' 3087 'time.sleep(3)'], 3088 stdout=subprocess.PIPE, 3089 stderr=subprocess.PIPE) 3090 self.addCleanup(p.stdout.close) 3091 self.addCleanup(p.stderr.close) 3092 ident = id(p) 3093 pid = p.pid 3094 with warnings_helper.check_warnings(('', ResourceWarning)): 3095 p = None 3096 support.gc_collect() # For PyPy or other GCs. 3097 3098 os.kill(pid, signal.SIGKILL) 3099 if mswindows: 3100 # subprocess._active is not used on Windows and is set to None. 3101 self.assertIsNone(subprocess._active) 3102 else: 3103 # check that p is in the active processes list 3104 self.assertIn(ident, [id(o) for o in subprocess._active]) 3105 3106 # let some time for the process to exit, and create a new Popen: this 3107 # should trigger the wait() of p 3108 time.sleep(0.2) 3109 with self.assertRaises(OSError): 3110 with subprocess.Popen(NONEXISTING_CMD, 3111 stdout=subprocess.PIPE, 3112 stderr=subprocess.PIPE) as proc: 3113 pass 3114 # p should have been wait()ed on, and removed from the _active list 3115 self.assertRaises(OSError, os.waitpid, pid, 0) 3116 if mswindows: 3117 # subprocess._active is not used on Windows and is set to None. 
3118 self.assertIsNone(subprocess._active) 3119 else: 3120 self.assertNotIn(ident, [id(o) for o in subprocess._active]) 3121 3122 def test_close_fds_after_preexec(self): 3123 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 3124 3125 # this FD is used as dup2() target by preexec_fn, and should be closed 3126 # in the child process 3127 fd = os.dup(1) 3128 self.addCleanup(os.close, fd) 3129 3130 p = subprocess.Popen([sys.executable, fd_status], 3131 stdout=subprocess.PIPE, close_fds=True, 3132 preexec_fn=lambda: os.dup2(1, fd)) 3133 output, ignored = p.communicate() 3134 3135 remaining_fds = set(map(int, output.split(b','))) 3136 3137 self.assertNotIn(fd, remaining_fds) 3138 3139 @support.cpython_only 3140 def test_fork_exec(self): 3141 # Issue #22290: fork_exec() must not crash on memory allocation failure 3142 # or other errors 3143 import _posixsubprocess 3144 gc_enabled = gc.isenabled() 3145 try: 3146 # Use a preexec function and enable the garbage collector 3147 # to force fork_exec() to re-enable the garbage collector 3148 # on error. 3149 func = lambda: None 3150 gc.enable() 3151 3152 for args, exe_list, cwd, env_list in ( 3153 (123, [b"exe"], None, [b"env"]), 3154 ([b"arg"], 123, None, [b"env"]), 3155 ([b"arg"], [b"exe"], 123, [b"env"]), 3156 ([b"arg"], [b"exe"], None, 123), 3157 ): 3158 with self.assertRaises(TypeError) as err: 3159 _posixsubprocess.fork_exec( 3160 args, exe_list, 3161 True, (), cwd, env_list, 3162 -1, -1, -1, -1, 3163 1, 2, 3, 4, 3164 True, True, 0, 3165 False, [], 0, -1, 3166 func, False) 3167 # Attempt to prevent 3168 # "TypeError: fork_exec() takes exactly N arguments (M given)" 3169 # from passing the test. More refactoring to have us start 3170 # with a valid *args list, confirm a good call with that works 3171 # before mutating it in various ways to ensure that bad calls 3172 # with individual arg type errors raise a typeerror would be 3173 # ideal. Saving that for a future PR... 
3174 self.assertNotIn('takes exactly', str(err.exception)) 3175 finally: 3176 if not gc_enabled: 3177 gc.disable() 3178 3179 @support.cpython_only 3180 def test_fork_exec_sorted_fd_sanity_check(self): 3181 # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check. 3182 import _posixsubprocess 3183 class BadInt: 3184 first = True 3185 def __init__(self, value): 3186 self.value = value 3187 def __int__(self): 3188 if self.first: 3189 self.first = False 3190 return self.value 3191 raise ValueError 3192 3193 gc_enabled = gc.isenabled() 3194 try: 3195 gc.enable() 3196 3197 for fds_to_keep in ( 3198 (-1, 2, 3, 4, 5), # Negative number. 3199 ('str', 4), # Not an int. 3200 (18, 23, 42, 2**63), # Out of range. 3201 (5, 4), # Not sorted. 3202 (6, 7, 7, 8), # Duplicate. 3203 (BadInt(1), BadInt(2)), 3204 ): 3205 with self.assertRaises( 3206 ValueError, 3207 msg='fds_to_keep={}'.format(fds_to_keep)) as c: 3208 _posixsubprocess.fork_exec( 3209 [b"false"], [b"false"], 3210 True, fds_to_keep, None, [b"env"], 3211 -1, -1, -1, -1, 3212 1, 2, 3, 4, 3213 True, True, 0, 3214 None, None, None, -1, 3215 None, True) 3216 self.assertIn('fds_to_keep', str(c.exception)) 3217 finally: 3218 if not gc_enabled: 3219 gc.disable() 3220 3221 def test_communicate_BrokenPipeError_stdin_close(self): 3222 # By not setting stdout or stderr or a timeout we force the fast path 3223 # that just calls _stdin_write() internally due to our mock. 3224 proc = subprocess.Popen(ZERO_RETURN_CMD) 3225 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 3226 mock_proc_stdin.close.side_effect = BrokenPipeError 3227 proc.communicate() # Should swallow BrokenPipeError from close. 3228 mock_proc_stdin.close.assert_called_with() 3229 3230 def test_communicate_BrokenPipeError_stdin_write(self): 3231 # By not setting stdout or stderr or a timeout we force the fast path 3232 # that just calls _stdin_write() internally due to our mock. 
3233 proc = subprocess.Popen(ZERO_RETURN_CMD) 3234 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 3235 mock_proc_stdin.write.side_effect = BrokenPipeError 3236 proc.communicate(b'stuff') # Should swallow the BrokenPipeError. 3237 mock_proc_stdin.write.assert_called_once_with(b'stuff') 3238 mock_proc_stdin.close.assert_called_once_with() 3239 3240 def test_communicate_BrokenPipeError_stdin_flush(self): 3241 # Setting stdin and stdout forces the ._communicate() code path. 3242 # python -h exits faster than python -c pass (but spams stdout). 3243 proc = subprocess.Popen([sys.executable, '-h'], 3244 stdin=subprocess.PIPE, 3245 stdout=subprocess.PIPE) 3246 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \ 3247 open(os.devnull, 'wb') as dev_null: 3248 mock_proc_stdin.flush.side_effect = BrokenPipeError 3249 # because _communicate registers a selector using proc.stdin... 3250 mock_proc_stdin.fileno.return_value = dev_null.fileno() 3251 # _communicate() should swallow BrokenPipeError from flush. 3252 proc.communicate(b'stuff') 3253 mock_proc_stdin.flush.assert_called_once_with() 3254 3255 def test_communicate_BrokenPipeError_stdin_close_with_timeout(self): 3256 # Setting stdin and stdout forces the ._communicate() code path. 3257 # python -h exits faster than python -c pass (but spams stdout). 3258 proc = subprocess.Popen([sys.executable, '-h'], 3259 stdin=subprocess.PIPE, 3260 stdout=subprocess.PIPE) 3261 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 3262 mock_proc_stdin.close.side_effect = BrokenPipeError 3263 # _communicate() should swallow BrokenPipeError from close. 
3264 proc.communicate(timeout=999) 3265 mock_proc_stdin.close.assert_called_once_with() 3266 3267 @unittest.skipUnless(_testcapi is not None 3268 and hasattr(_testcapi, 'W_STOPCODE'), 3269 'need _testcapi.W_STOPCODE') 3270 def test_stopped(self): 3271 """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335.""" 3272 args = ZERO_RETURN_CMD 3273 proc = subprocess.Popen(args) 3274 3275 # Wait until the real process completes to avoid zombie process 3276 support.wait_process(proc.pid, exitcode=0) 3277 3278 status = _testcapi.W_STOPCODE(3) 3279 with mock.patch('subprocess.os.waitpid', return_value=(proc.pid, status)): 3280 returncode = proc.wait() 3281 3282 self.assertEqual(returncode, -3) 3283 3284 def test_send_signal_race(self): 3285 # bpo-38630: send_signal() must poll the process exit status to reduce 3286 # the risk of sending the signal to the wrong process. 3287 proc = subprocess.Popen(ZERO_RETURN_CMD) 3288 3289 # wait until the process completes without using the Popen APIs. 3290 support.wait_process(proc.pid, exitcode=0) 3291 3292 # returncode is still None but the process completed. 3293 self.assertIsNone(proc.returncode) 3294 3295 with mock.patch("os.kill") as mock_kill: 3296 proc.send_signal(signal.SIGTERM) 3297 3298 # send_signal() didn't call os.kill() since the process already 3299 # completed. 3300 mock_kill.assert_not_called() 3301 3302 # Don't check the returncode value: the test reads the exit status, 3303 # so Popen failed to read it and uses a default returncode instead. 
3304 self.assertIsNotNone(proc.returncode) 3305 3306 def test_send_signal_race2(self): 3307 # bpo-40550: the process might exist between the returncode check and 3308 # the kill operation 3309 p = subprocess.Popen([sys.executable, '-c', 'exit(1)']) 3310 3311 # wait for process to exit 3312 while not p.returncode: 3313 p.poll() 3314 3315 with mock.patch.object(p, 'poll', new=lambda: None): 3316 p.returncode = None 3317 p.send_signal(signal.SIGTERM) 3318 p.kill() 3319 3320 def test_communicate_repeated_call_after_stdout_close(self): 3321 proc = subprocess.Popen([sys.executable, '-c', 3322 'import os, time; os.close(1), time.sleep(2)'], 3323 stdout=subprocess.PIPE) 3324 while True: 3325 try: 3326 proc.communicate(timeout=0.1) 3327 return 3328 except subprocess.TimeoutExpired: 3329 pass 3330 3331 3332@unittest.skipUnless(mswindows, "Windows specific tests") 3333class Win32ProcessTestCase(BaseTestCase): 3334 3335 def test_startupinfo(self): 3336 # startupinfo argument 3337 # We uses hardcoded constants, because we do not want to 3338 # depend on win32all. 3339 STARTF_USESHOWWINDOW = 1 3340 SW_MAXIMIZE = 3 3341 startupinfo = subprocess.STARTUPINFO() 3342 startupinfo.dwFlags = STARTF_USESHOWWINDOW 3343 startupinfo.wShowWindow = SW_MAXIMIZE 3344 # Since Python is a console process, it won't be affected 3345 # by wShowWindow, but the argument should be silently 3346 # ignored 3347 subprocess.call(ZERO_RETURN_CMD, 3348 startupinfo=startupinfo) 3349 3350 def test_startupinfo_keywords(self): 3351 # startupinfo argument 3352 # We use hardcoded constants, because we do not want to 3353 # depend on win32all. 
3354 STARTF_USERSHOWWINDOW = 1 3355 SW_MAXIMIZE = 3 3356 startupinfo = subprocess.STARTUPINFO( 3357 dwFlags=STARTF_USERSHOWWINDOW, 3358 wShowWindow=SW_MAXIMIZE 3359 ) 3360 # Since Python is a console process, it won't be affected 3361 # by wShowWindow, but the argument should be silently 3362 # ignored 3363 subprocess.call(ZERO_RETURN_CMD, 3364 startupinfo=startupinfo) 3365 3366 def test_startupinfo_copy(self): 3367 # bpo-34044: Popen must not modify input STARTUPINFO structure 3368 startupinfo = subprocess.STARTUPINFO() 3369 startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW 3370 startupinfo.wShowWindow = subprocess.SW_HIDE 3371 3372 # Call Popen() twice with the same startupinfo object to make sure 3373 # that it's not modified 3374 for _ in range(2): 3375 cmd = ZERO_RETURN_CMD 3376 with open(os.devnull, 'w') as null: 3377 proc = subprocess.Popen(cmd, 3378 stdout=null, 3379 stderr=subprocess.STDOUT, 3380 startupinfo=startupinfo) 3381 with proc: 3382 proc.communicate() 3383 self.assertEqual(proc.returncode, 0) 3384 3385 self.assertEqual(startupinfo.dwFlags, 3386 subprocess.STARTF_USESHOWWINDOW) 3387 self.assertIsNone(startupinfo.hStdInput) 3388 self.assertIsNone(startupinfo.hStdOutput) 3389 self.assertIsNone(startupinfo.hStdError) 3390 self.assertEqual(startupinfo.wShowWindow, subprocess.SW_HIDE) 3391 self.assertEqual(startupinfo.lpAttributeList, {"handle_list": []}) 3392 3393 def test_creationflags(self): 3394 # creationflags argument 3395 CREATE_NEW_CONSOLE = 16 3396 sys.stderr.write(" a DOS box should flash briefly ...\n") 3397 subprocess.call(sys.executable + 3398 ' -c "import time; time.sleep(0.25)"', 3399 creationflags=CREATE_NEW_CONSOLE) 3400 3401 def test_invalid_args(self): 3402 # invalid arguments should raise ValueError 3403 self.assertRaises(ValueError, subprocess.call, 3404 [sys.executable, "-c", 3405 "import sys; sys.exit(47)"], 3406 preexec_fn=lambda: 1) 3407 3408 @support.cpython_only 3409 def test_issue31471(self): 3410 # There shouldn't be an 
assertion failure in Popen() in case the env 3411 # argument has a bad keys() method. 3412 class BadEnv(dict): 3413 keys = None 3414 with self.assertRaises(TypeError): 3415 subprocess.Popen(ZERO_RETURN_CMD, env=BadEnv()) 3416 3417 def test_close_fds(self): 3418 # close file descriptors 3419 rc = subprocess.call([sys.executable, "-c", 3420 "import sys; sys.exit(47)"], 3421 close_fds=True) 3422 self.assertEqual(rc, 47) 3423 3424 def test_close_fds_with_stdio(self): 3425 import msvcrt 3426 3427 fds = os.pipe() 3428 self.addCleanup(os.close, fds[0]) 3429 self.addCleanup(os.close, fds[1]) 3430 3431 handles = [] 3432 for fd in fds: 3433 os.set_inheritable(fd, True) 3434 handles.append(msvcrt.get_osfhandle(fd)) 3435 3436 p = subprocess.Popen([sys.executable, "-c", 3437 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3438 stdout=subprocess.PIPE, close_fds=False) 3439 stdout, stderr = p.communicate() 3440 self.assertEqual(p.returncode, 0) 3441 int(stdout.strip()) # Check that stdout is an integer 3442 3443 p = subprocess.Popen([sys.executable, "-c", 3444 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3445 stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) 3446 stdout, stderr = p.communicate() 3447 self.assertEqual(p.returncode, 1) 3448 self.assertIn(b"OSError", stderr) 3449 3450 # The same as the previous call, but with an empty handle_list 3451 handle_list = [] 3452 startupinfo = subprocess.STARTUPINFO() 3453 startupinfo.lpAttributeList = {"handle_list": handle_list} 3454 p = subprocess.Popen([sys.executable, "-c", 3455 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3456 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 3457 startupinfo=startupinfo, close_fds=True) 3458 stdout, stderr = p.communicate() 3459 self.assertEqual(p.returncode, 1) 3460 self.assertIn(b"OSError", stderr) 3461 3462 # Check for a warning due to using handle_list and close_fds=False 3463 with 
warnings_helper.check_warnings((".*overriding close_fds", 3464 RuntimeWarning)): 3465 startupinfo = subprocess.STARTUPINFO() 3466 startupinfo.lpAttributeList = {"handle_list": handles[:]} 3467 p = subprocess.Popen([sys.executable, "-c", 3468 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3469 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 3470 startupinfo=startupinfo, close_fds=False) 3471 stdout, stderr = p.communicate() 3472 self.assertEqual(p.returncode, 0) 3473 3474 def test_empty_attribute_list(self): 3475 startupinfo = subprocess.STARTUPINFO() 3476 startupinfo.lpAttributeList = {} 3477 subprocess.call(ZERO_RETURN_CMD, 3478 startupinfo=startupinfo) 3479 3480 def test_empty_handle_list(self): 3481 startupinfo = subprocess.STARTUPINFO() 3482 startupinfo.lpAttributeList = {"handle_list": []} 3483 subprocess.call(ZERO_RETURN_CMD, 3484 startupinfo=startupinfo) 3485 3486 def test_shell_sequence(self): 3487 # Run command through the shell (sequence) 3488 newenv = os.environ.copy() 3489 newenv["FRUIT"] = "physalis" 3490 p = subprocess.Popen(["set"], shell=1, 3491 stdout=subprocess.PIPE, 3492 env=newenv) 3493 with p: 3494 self.assertIn(b"physalis", p.stdout.read()) 3495 3496 def test_shell_string(self): 3497 # Run command through the shell (string) 3498 newenv = os.environ.copy() 3499 newenv["FRUIT"] = "physalis" 3500 p = subprocess.Popen("set", shell=1, 3501 stdout=subprocess.PIPE, 3502 env=newenv) 3503 with p: 3504 self.assertIn(b"physalis", p.stdout.read()) 3505 3506 def test_shell_encodings(self): 3507 # Run command through the shell (string) 3508 for enc in ['ansi', 'oem']: 3509 newenv = os.environ.copy() 3510 newenv["FRUIT"] = "physalis" 3511 p = subprocess.Popen("set", shell=1, 3512 stdout=subprocess.PIPE, 3513 env=newenv, 3514 encoding=enc) 3515 with p: 3516 self.assertIn("physalis", p.stdout.read(), enc) 3517 3518 def test_call_string(self): 3519 # call() function with string argument on Windows 3520 rc = 
subprocess.call(sys.executable + 3521 ' -c "import sys; sys.exit(47)"') 3522 self.assertEqual(rc, 47) 3523 3524 def _kill_process(self, method, *args): 3525 # Some win32 buildbot raises EOFError if stdin is inherited 3526 p = subprocess.Popen([sys.executable, "-c", """if 1: 3527 import sys, time 3528 sys.stdout.write('x\\n') 3529 sys.stdout.flush() 3530 time.sleep(30) 3531 """], 3532 stdin=subprocess.PIPE, 3533 stdout=subprocess.PIPE, 3534 stderr=subprocess.PIPE) 3535 with p: 3536 # Wait for the interpreter to be completely initialized before 3537 # sending any signal. 3538 p.stdout.read(1) 3539 getattr(p, method)(*args) 3540 _, stderr = p.communicate() 3541 self.assertEqual(stderr, b'') 3542 returncode = p.wait() 3543 self.assertNotEqual(returncode, 0) 3544 3545 def _kill_dead_process(self, method, *args): 3546 p = subprocess.Popen([sys.executable, "-c", """if 1: 3547 import sys, time 3548 sys.stdout.write('x\\n') 3549 sys.stdout.flush() 3550 sys.exit(42) 3551 """], 3552 stdin=subprocess.PIPE, 3553 stdout=subprocess.PIPE, 3554 stderr=subprocess.PIPE) 3555 with p: 3556 # Wait for the interpreter to be completely initialized before 3557 # sending any signal. 
3558 p.stdout.read(1) 3559 # The process should end after this 3560 time.sleep(1) 3561 # This shouldn't raise even though the child is now dead 3562 getattr(p, method)(*args) 3563 _, stderr = p.communicate() 3564 self.assertEqual(stderr, b'') 3565 rc = p.wait() 3566 self.assertEqual(rc, 42) 3567 3568 def test_send_signal(self): 3569 self._kill_process('send_signal', signal.SIGTERM) 3570 3571 def test_kill(self): 3572 self._kill_process('kill') 3573 3574 def test_terminate(self): 3575 self._kill_process('terminate') 3576 3577 def test_send_signal_dead(self): 3578 self._kill_dead_process('send_signal', signal.SIGTERM) 3579 3580 def test_kill_dead(self): 3581 self._kill_dead_process('kill') 3582 3583 def test_terminate_dead(self): 3584 self._kill_dead_process('terminate') 3585 3586class MiscTests(unittest.TestCase): 3587 3588 class RecordingPopen(subprocess.Popen): 3589 """A Popen that saves a reference to each instance for testing.""" 3590 instances_created = [] 3591 3592 def __init__(self, *args, **kwargs): 3593 super().__init__(*args, **kwargs) 3594 self.instances_created.append(self) 3595 3596 @mock.patch.object(subprocess.Popen, "_communicate") 3597 def _test_keyboardinterrupt_no_kill(self, popener, mock__communicate, 3598 **kwargs): 3599 """Fake a SIGINT happening during Popen._communicate() and ._wait(). 3600 3601 This avoids the need to actually try and get test environments to send 3602 and receive signals reliably across platforms. The net effect of a ^C 3603 happening during a blocking subprocess execution which we want to clean 3604 up from is a KeyboardInterrupt coming out of communicate() or wait(). 3605 """ 3606 3607 mock__communicate.side_effect = KeyboardInterrupt 3608 try: 3609 with mock.patch.object(subprocess.Popen, "_wait") as mock__wait: 3610 # We patch out _wait() as no signal was involved so the 3611 # child process isn't actually going to exit rapidly. 
3612 mock__wait.side_effect = KeyboardInterrupt 3613 with mock.patch.object(subprocess, "Popen", 3614 self.RecordingPopen): 3615 with self.assertRaises(KeyboardInterrupt): 3616 popener([sys.executable, "-c", 3617 "import time\ntime.sleep(9)\nimport sys\n" 3618 "sys.stderr.write('\\n!runaway child!\\n')"], 3619 stdout=subprocess.DEVNULL, **kwargs) 3620 for call in mock__wait.call_args_list[1:]: 3621 self.assertNotEqual( 3622 call, mock.call(timeout=None), 3623 "no open-ended wait() after the first allowed: " 3624 f"{mock__wait.call_args_list}") 3625 sigint_calls = [] 3626 for call in mock__wait.call_args_list: 3627 if call == mock.call(timeout=0.25): # from Popen.__init__ 3628 sigint_calls.append(call) 3629 self.assertLessEqual(mock__wait.call_count, 2, 3630 msg=mock__wait.call_args_list) 3631 self.assertEqual(len(sigint_calls), 1, 3632 msg=mock__wait.call_args_list) 3633 finally: 3634 # cleanup the forgotten (due to our mocks) child process 3635 process = self.RecordingPopen.instances_created.pop() 3636 process.kill() 3637 process.wait() 3638 self.assertEqual([], self.RecordingPopen.instances_created) 3639 3640 def test_call_keyboardinterrupt_no_kill(self): 3641 self._test_keyboardinterrupt_no_kill(subprocess.call, timeout=6.282) 3642 3643 def test_run_keyboardinterrupt_no_kill(self): 3644 self._test_keyboardinterrupt_no_kill(subprocess.run, timeout=6.282) 3645 3646 def test_context_manager_keyboardinterrupt_no_kill(self): 3647 def popen_via_context_manager(*args, **kwargs): 3648 with subprocess.Popen(*args, **kwargs) as unused_process: 3649 raise KeyboardInterrupt # Test how __exit__ handles ^C. 
3650 self._test_keyboardinterrupt_no_kill(popen_via_context_manager) 3651 3652 def test_getoutput(self): 3653 self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy') 3654 self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), 3655 (0, 'xyzzy')) 3656 3657 # we use mkdtemp in the next line to create an empty directory 3658 # under our exclusive control; from that, we can invent a pathname 3659 # that we _know_ won't exist. This is guaranteed to fail. 3660 dir = None 3661 try: 3662 dir = tempfile.mkdtemp() 3663 name = os.path.join(dir, "foo") 3664 status, output = subprocess.getstatusoutput( 3665 ("type " if mswindows else "cat ") + name) 3666 self.assertNotEqual(status, 0) 3667 finally: 3668 if dir is not None: 3669 os.rmdir(dir) 3670 3671 def test__all__(self): 3672 """Ensure that __all__ is populated properly.""" 3673 intentionally_excluded = {"list2cmdline", "Handle", "pwd", "grp", "fcntl"} 3674 exported = set(subprocess.__all__) 3675 possible_exports = set() 3676 import types 3677 for name, value in subprocess.__dict__.items(): 3678 if name.startswith('_'): 3679 continue 3680 if isinstance(value, (types.ModuleType,)): 3681 continue 3682 possible_exports.add(name) 3683 self.assertEqual(exported, possible_exports - intentionally_excluded) 3684 3685 3686@unittest.skipUnless(hasattr(selectors, 'PollSelector'), 3687 "Test needs selectors.PollSelector") 3688class ProcessTestCaseNoPoll(ProcessTestCase): 3689 def setUp(self): 3690 self.orig_selector = subprocess._PopenSelector 3691 subprocess._PopenSelector = selectors.SelectSelector 3692 ProcessTestCase.setUp(self) 3693 3694 def tearDown(self): 3695 subprocess._PopenSelector = self.orig_selector 3696 ProcessTestCase.tearDown(self) 3697 3698 3699@unittest.skipUnless(mswindows, "Windows-specific tests") 3700class CommandsWithSpaces (BaseTestCase): 3701 3702 def setUp(self): 3703 super().setUp() 3704 f, fname = tempfile.mkstemp(".py", "te st") 3705 self.fname = fname.lower () 3706 os.write(f, b"import sys;" 3707 
b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))" 3708 ) 3709 os.close(f) 3710 3711 def tearDown(self): 3712 os.remove(self.fname) 3713 super().tearDown() 3714 3715 def with_spaces(self, *args, **kwargs): 3716 kwargs['stdout'] = subprocess.PIPE 3717 p = subprocess.Popen(*args, **kwargs) 3718 with p: 3719 self.assertEqual( 3720 p.stdout.read ().decode("mbcs"), 3721 "2 [%r, 'ab cd']" % self.fname 3722 ) 3723 3724 def test_shell_string_with_spaces(self): 3725 # call() function with string argument with spaces on Windows 3726 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 3727 "ab cd"), shell=1) 3728 3729 def test_shell_sequence_with_spaces(self): 3730 # call() function with sequence argument with spaces on Windows 3731 self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1) 3732 3733 def test_noshell_string_with_spaces(self): 3734 # call() function with string argument with spaces on Windows 3735 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 3736 "ab cd")) 3737 3738 def test_noshell_sequence_with_spaces(self): 3739 # call() function with sequence argument with spaces on Windows 3740 self.with_spaces([sys.executable, self.fname, "ab cd"]) 3741 3742 3743class ContextManagerTests(BaseTestCase): 3744 3745 def test_pipe(self): 3746 with subprocess.Popen([sys.executable, "-c", 3747 "import sys;" 3748 "sys.stdout.write('stdout');" 3749 "sys.stderr.write('stderr');"], 3750 stdout=subprocess.PIPE, 3751 stderr=subprocess.PIPE) as proc: 3752 self.assertEqual(proc.stdout.read(), b"stdout") 3753 self.assertEqual(proc.stderr.read(), b"stderr") 3754 3755 self.assertTrue(proc.stdout.closed) 3756 self.assertTrue(proc.stderr.closed) 3757 3758 def test_returncode(self): 3759 with subprocess.Popen([sys.executable, "-c", 3760 "import sys; sys.exit(100)"]) as proc: 3761 pass 3762 # __exit__ calls wait(), so the returncode should be set 3763 self.assertEqual(proc.returncode, 100) 3764 3765 def 
test_communicate_stdin(self): 3766 with subprocess.Popen([sys.executable, "-c", 3767 "import sys;" 3768 "sys.exit(sys.stdin.read() == 'context')"], 3769 stdin=subprocess.PIPE) as proc: 3770 proc.communicate(b"context") 3771 self.assertEqual(proc.returncode, 1) 3772 3773 def test_invalid_args(self): 3774 with self.assertRaises(NONEXISTING_ERRORS): 3775 with subprocess.Popen(NONEXISTING_CMD, 3776 stdout=subprocess.PIPE, 3777 stderr=subprocess.PIPE) as proc: 3778 pass 3779 3780 def test_broken_pipe_cleanup(self): 3781 """Broken pipe error should not prevent wait() (Issue 21619)""" 3782 proc = subprocess.Popen(ZERO_RETURN_CMD, 3783 stdin=subprocess.PIPE, 3784 bufsize=support.PIPE_MAX_SIZE*2) 3785 proc = proc.__enter__() 3786 # Prepare to send enough data to overflow any OS pipe buffering and 3787 # guarantee a broken pipe error. Data is held in BufferedWriter 3788 # buffer until closed. 3789 proc.stdin.write(b'x' * support.PIPE_MAX_SIZE) 3790 self.assertIsNone(proc.returncode) 3791 # EPIPE expected under POSIX; EINVAL under Windows 3792 self.assertRaises(OSError, proc.__exit__, None, None, None) 3793 self.assertEqual(proc.returncode, 0) 3794 self.assertTrue(proc.stdin.closed) 3795 3796 3797if __name__ == "__main__": 3798 unittest.main() 3799