1# Common utility functions used by various script execution tests 2# e.g. test_cmd_line, test_cmd_line_script and test_runpy 3 4import collections 5import importlib 6import sys 7import os 8import os.path 9import subprocess 10import py_compile 11import zipfile 12 13from importlib.util import source_from_cache 14from test import support 15from test.support.import_helper import make_legacy_pyc 16 17 18# Cached result of the expensive test performed in the function below. 19__cached_interp_requires_environment = None 20 21 22def interpreter_requires_environment(): 23 """ 24 Returns True if our sys.executable interpreter requires environment 25 variables in order to be able to run at all. 26 27 This is designed to be used with @unittest.skipIf() to annotate tests 28 that need to use an assert_python*() function to launch an isolated 29 mode (-I) or no environment mode (-E) sub-interpreter process. 30 31 A normal build & test does not run into this situation but it can happen 32 when trying to run the standard library test suite from an interpreter that 33 doesn't have an obvious home with Python's current home finding logic. 34 35 Setting PYTHONHOME is one way to get most of the testsuite to run in that 36 situation. PYTHONPATH or PYTHONUSERSITE are other common environment 37 variables that might impact whether or not the interpreter can start. 38 """ 39 global __cached_interp_requires_environment 40 if __cached_interp_requires_environment is None: 41 # If PYTHONHOME is set, assume that we need it 42 if 'PYTHONHOME' in os.environ: 43 __cached_interp_requires_environment = True 44 return True 45 # cannot run subprocess, assume we don't need it 46 if not support.has_subprocess_support: 47 __cached_interp_requires_environment = False 48 return False 49 50 # Try running an interpreter with -E to see if it works or not. 51 try: 52 subprocess.check_call([sys.executable, '-E', 53 '-c', 'import sys; sys.exit(0)']) 54 except subprocess.CalledProcessError: 55 __cached_interp_requires_environment = True 56 else: 57 __cached_interp_requires_environment = False 58 59 return __cached_interp_requires_environment 60 61 62class _PythonRunResult(collections.namedtuple("_PythonRunResult", 63 ("rc", "out", "err"))): 64 """Helper for reporting Python subprocess run results""" 65 def fail(self, cmd_line): 66 """Provide helpful details about failed subcommand runs""" 67 # Limit to 80 lines to ASCII characters 68 maxlen = 80 * 100 69 out, err = self.out, self.err 70 if len(out) > maxlen: 71 out = b'(... truncated stdout ...)' + out[-maxlen:] 72 if len(err) > maxlen: 73 err = b'(... truncated stderr ...)' + err[-maxlen:] 74 out = out.decode('ascii', 'replace').rstrip() 75 err = err.decode('ascii', 'replace').rstrip() 76 raise AssertionError("Process return code is %d\n" 77 "command line: %r\n" 78 "\n" 79 "stdout:\n" 80 "---\n" 81 "%s\n" 82 "---\n" 83 "\n" 84 "stderr:\n" 85 "---\n" 86 "%s\n" 87 "---" 88 % (self.rc, cmd_line, 89 out, 90 err)) 91 92 93# Executing the interpreter in a subprocess 94@support.requires_subprocess() 95def run_python_until_end(*args, **env_vars): 96 env_required = interpreter_requires_environment() 97 cwd = env_vars.pop('__cwd', None) 98 if '__isolated' in env_vars: 99 isolated = env_vars.pop('__isolated') 100 else: 101 isolated = not env_vars and not env_required 102 cmd_line = [sys.executable, '-X', 'faulthandler'] 103 if isolated: 104 # isolated mode: ignore Python environment variables, ignore user 105 # site-packages, and don't add the current directory to sys.path 106 cmd_line.append('-I') 107 elif not env_vars and not env_required: 108 # ignore Python environment variables 109 cmd_line.append('-E') 110 111 # But a special flag that can be set to override -- in this case, the 112 # caller is responsible to pass the full environment. 113 if env_vars.pop('__cleanenv', None): 114 env = {} 115 if sys.platform == 'win32': 116 # Windows requires at least the SYSTEMROOT environment variable to 117 # start Python. 118 env['SYSTEMROOT'] = os.environ['SYSTEMROOT'] 119 120 # Other interesting environment variables, not copied currently: 121 # COMSPEC, HOME, PATH, TEMP, TMPDIR, TMP. 122 else: 123 # Need to preserve the original environment, for in-place testing of 124 # shared library builds. 125 env = os.environ.copy() 126 127 # set TERM='' unless the TERM environment variable is passed explicitly 128 # see issues #11390 and #18300 129 if 'TERM' not in env_vars: 130 env['TERM'] = '' 131 132 env.update(env_vars) 133 cmd_line.extend(args) 134 proc = subprocess.Popen(cmd_line, stdin=subprocess.PIPE, 135 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 136 env=env, cwd=cwd) 137 with proc: 138 try: 139 out, err = proc.communicate() 140 finally: 141 proc.kill() 142 subprocess._cleanup() 143 rc = proc.returncode 144 return _PythonRunResult(rc, out, err), cmd_line 145 146 147@support.requires_subprocess() 148def _assert_python(expected_success, /, *args, **env_vars): 149 res, cmd_line = run_python_until_end(*args, **env_vars) 150 if (res.rc and expected_success) or (not res.rc and not expected_success): 151 res.fail(cmd_line) 152 return res 153 154 155def assert_python_ok(*args, **env_vars): 156 """ 157 Assert that running the interpreter with `args` and optional environment 158 variables `env_vars` succeeds (rc == 0) and return a (return code, stdout, 159 stderr) tuple. 160 161 If the __cleanenv keyword is set, env_vars is used as a fresh environment. 162 163 Python is started in isolated mode (command line option -I), 164 except if the __isolated keyword is set to False. 165 """ 166 return _assert_python(True, *args, **env_vars) 167 168 169def assert_python_failure(*args, **env_vars): 170 """ 171 Assert that running the interpreter with `args` and optional environment 172 variables `env_vars` fails (rc != 0) and return a (return code, stdout, 173 stderr) tuple. 174 175 See assert_python_ok() for more options. 176 """ 177 return _assert_python(False, *args, **env_vars) 178 179 180@support.requires_subprocess() 181def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw): 182 """Run a Python subprocess with the given arguments. 183 184 kw is extra keyword args to pass to subprocess.Popen. Returns a Popen 185 object. 186 """ 187 cmd_line = [sys.executable] 188 if not interpreter_requires_environment(): 189 cmd_line.append('-E') 190 cmd_line.extend(args) 191 # Under Fedora (?), GNU readline can output junk on stderr when initialized, 192 # depending on the TERM setting. Setting TERM=vt100 is supposed to disable 193 # that. References: 194 # - http://reinout.vanrees.org/weblog/2009/08/14/readline-invisible-character-hack.html 195 # - http://stackoverflow.com/questions/15760712/python-readline-module-prints-escape-character-during-import 196 # - http://lists.gnu.org/archive/html/bug-readline/2007-08/msg00004.html 197 env = kw.setdefault('env', dict(os.environ)) 198 env['TERM'] = 'vt100' 199 return subprocess.Popen(cmd_line, stdin=subprocess.PIPE, 200 stdout=stdout, stderr=stderr, 201 **kw) 202 203 204def kill_python(p): 205 """Run the given Popen process until completion and return stdout.""" 206 p.stdin.close() 207 data = p.stdout.read() 208 p.stdout.close() 209 # try to cleanup the child so we don't appear to leak when running 210 # with regrtest -R. 211 p.wait() 212 subprocess._cleanup() 213 return data 214 215 216def make_script(script_dir, script_basename, source, omit_suffix=False): 217 script_filename = script_basename 218 if not omit_suffix: 219 script_filename += os.extsep + 'py' 220 script_name = os.path.join(script_dir, script_filename) 221 # The script should be encoded to UTF-8, the default string encoding 222 with open(script_name, 'w', encoding='utf-8') as script_file: 223 script_file.write(source) 224 importlib.invalidate_caches() 225 return script_name 226 227 228def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None): 229 zip_filename = zip_basename+os.extsep+'zip' 230 zip_name = os.path.join(zip_dir, zip_filename) 231 with zipfile.ZipFile(zip_name, 'w') as zip_file: 232 if name_in_zip is None: 233 parts = script_name.split(os.sep) 234 if len(parts) >= 2 and parts[-2] == '__pycache__': 235 legacy_pyc = make_legacy_pyc(source_from_cache(script_name)) 236 name_in_zip = os.path.basename(legacy_pyc) 237 script_name = legacy_pyc 238 else: 239 name_in_zip = os.path.basename(script_name) 240 zip_file.write(script_name, name_in_zip) 241 #if test.support.verbose: 242 # with zipfile.ZipFile(zip_name, 'r') as zip_file: 243 # print 'Contents of %r:' % zip_name 244 # zip_file.printdir() 245 return zip_name, os.path.join(zip_name, name_in_zip) 246 247 248def make_pkg(pkg_dir, init_source=''): 249 os.mkdir(pkg_dir) 250 make_script(pkg_dir, '__init__', init_source) 251 252 253def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename, 254 source, depth=1, compiled=False): 255 unlink = [] 256 init_name = make_script(zip_dir, '__init__', '') 257 unlink.append(init_name) 258 init_basename = os.path.basename(init_name) 259 script_name = make_script(zip_dir, script_basename, source) 260 unlink.append(script_name) 261 if compiled: 262 init_name = py_compile.compile(init_name, doraise=True) 263 script_name = py_compile.compile(script_name, doraise=True) 264 unlink.extend((init_name, script_name)) 265 pkg_names = [os.sep.join([pkg_name]*i) for i in range(1, depth+1)] 266 script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name)) 267 zip_filename = zip_basename+os.extsep+'zip' 268 zip_name = os.path.join(zip_dir, zip_filename) 269 with zipfile.ZipFile(zip_name, 'w') as zip_file: 270 for name in pkg_names: 271 init_name_in_zip = os.path.join(name, init_basename) 272 zip_file.write(init_name, init_name_in_zip) 273 zip_file.write(script_name, script_name_in_zip) 274 for name in unlink: 275 os.unlink(name) 276 #if test.support.verbose: 277 # with zipfile.ZipFile(zip_name, 'r') as zip_file: 278 # print 'Contents of %r:' % zip_name 279 # zip_file.printdir() 280 return zip_name, os.path.join(zip_name, script_name_in_zip) 281 282 283@support.requires_subprocess() 284def run_test_script(script): 285 # use -u to try to get the full output if the test hangs or crash 286 if support.verbose: 287 def title(text): 288 return f"===== {text} ======" 289 290 name = f"script {os.path.basename(script)}" 291 print() 292 print(title(name), flush=True) 293 # In verbose mode, the child process inherit stdout and stdout, 294 # to see output in realtime and reduce the risk of losing output. 295 args = [sys.executable, "-E", "-X", "faulthandler", "-u", script, "-v"] 296 proc = subprocess.run(args) 297 print(title(f"{name} completed: exit code {proc.returncode}"), 298 flush=True) 299 if proc.returncode: 300 raise AssertionError(f"{name} failed") 301 else: 302 assert_python_ok("-u", script, "-v") 303