1# Common utility functions used by various script execution tests
2#  e.g. test_cmd_line, test_cmd_line_script and test_runpy
3
4import collections
5import importlib
6import sys
7import os
8import os.path
9import subprocess
10import py_compile
11import zipfile
12
13from importlib.util import source_from_cache
14from test import support
15from test.support.import_helper import make_legacy_pyc
16
17
# Memoized answer of interpreter_requires_environment().  None means the
# (potentially expensive) probe below has not produced a result yet.
__cached_interp_requires_environment = None


def interpreter_requires_environment():
    """
    Return True if the sys.executable interpreter cannot start without
    environment variables.

    Intended for use with @unittest.skipIf() on tests that rely on an
    assert_python*() function launching a sub-interpreter in isolated
    mode (-I) or in no-environment mode (-E).

    A regular build & test run is not affected.  The situation arises when
    the standard library test suite is run from an interpreter whose home
    cannot be located by Python's home finding logic.  Setting PYTHONHOME
    is one way to get most of the test suite running in that case;
    PYTHONPATH and PYTHONUSERSITE are other common variables that may
    decide whether the interpreter can start.

    The answer is computed once and then served from a module-level cache.
    """
    global __cached_interp_requires_environment
    if __cached_interp_requires_environment is not None:
        return __cached_interp_requires_environment

    if 'PYTHONHOME' in os.environ:
        # PYTHONHOME is set: assume the interpreter depends on it.
        needs_env = True
    elif not support.has_subprocess_support:
        # No way to probe without subprocesses: assume it is not needed.
        needs_env = False
    else:
        # Probe: does a trivial program run when the environment is ignored?
        probe_cmd = [sys.executable, '-E', '-c', 'import sys; sys.exit(0)']
        try:
            subprocess.check_call(probe_cmd)
        except subprocess.CalledProcessError:
            needs_env = True
        else:
            needs_env = False

    __cached_interp_requires_environment = needs_env
    return needs_env
60
61
class _PythonRunResult(collections.namedtuple("_PythonRunResult",
                                          ("rc", "out", "err"))):
    """Outcome of a Python subprocess run: (rc, out, err).

    rc is the exit code; out and err are the captured stdout and stderr
    as bytes.
    """
    def fail(self, cmd_line):
        """Raise AssertionError describing the run of cmd_line.

        The message contains the return code, the command line and the
        tail of both output streams decoded as ASCII (undecodable bytes
        are replaced).
        """
        # Keep at most 8000 bytes from the end of each stream, i.e. about
        # 100 lines of 80 ASCII characters.
        maxlen = 80 * 100

        def tail_text(data, marker):
            # Prefix the kept tail with a marker when something was cut.
            if len(data) > maxlen:
                data = marker + data[-maxlen:]
            return data.decode('ascii', 'replace').rstrip()

        stdout_text = tail_text(self.out, b'(... truncated stdout ...)')
        stderr_text = tail_text(self.err, b'(... truncated stderr ...)')
        template = ("Process return code is %d\n"
                    "command line: %r\n"
                    "\n"
                    "stdout:\n"
                    "---\n"
                    "%s\n"
                    "---\n"
                    "\n"
                    "stderr:\n"
                    "---\n"
                    "%s\n"
                    "---")
        raise AssertionError(
            template % (self.rc, cmd_line, stdout_text, stderr_text))
91
92
93# Executing the interpreter in a subprocess
@support.requires_subprocess()
def run_python_until_end(*args, **env_vars):
    """Run sys.executable with args and wait until the process ends.

    Keyword arguments are environment variables for the child process,
    except for these control keys, which are consumed here:

    * __cwd: working directory of the child (None, the default, is passed
      to Popen unchanged).
    * __isolated: when present, its truth value decides whether -I is
      passed on the command line.
    * __cleanenv: when true, the child starts from an empty environment
      (plus SYSTEMROOT on Windows) instead of a copy of os.environ.

    Return a pair (_PythonRunResult(rc, out, err), cmd_line) where cmd_line
    is the argument list that was executed.
    """
    needs_env = interpreter_requires_environment()
    cwd = env_vars.pop('__cwd', None)
    try:
        isolated = env_vars.pop('__isolated')
    except KeyError:
        # No explicit request: isolate only when the caller passed no
        # other keyword and the interpreter can start without environment.
        isolated = not env_vars and not needs_env

    cmd_line = [sys.executable, '-X', 'faulthandler']
    if isolated:
        # isolated mode: ignore Python environment variables, ignore user
        # site-packages, and don't add the current directory to sys.path
        cmd_line.append('-I')
    elif not (env_vars or needs_env):
        # ignore Python environment variables
        cmd_line.append('-E')

    # A special flag can override the inherited environment -- in that
    # case the caller is responsible for passing the full environment.
    if env_vars.pop('__cleanenv', None):
        # Windows requires at least the SYSTEMROOT environment variable to
        # start Python.  Other interesting variables are not copied
        # currently: COMSPEC, HOME, PATH, TEMP, TMPDIR, TMP.
        if sys.platform == 'win32':
            env = {'SYSTEMROOT': os.environ['SYSTEMROOT']}
        else:
            env = {}
    else:
        # The original environment must be preserved for in-place testing
        # of shared library builds.
        env = os.environ.copy()

    # set TERM='' unless the TERM environment variable is passed explicitly
    # see issues #11390 and #18300
    if 'TERM' not in env_vars:
        env['TERM'] = ''
    env.update(env_vars)

    cmd_line += args
    with subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                          env=env, cwd=cwd) as proc:
        try:
            out, err = proc.communicate()
        finally:
            # Runs whether or not communicate() succeeded.
            proc.kill()
            subprocess._cleanup()
    return _PythonRunResult(proc.returncode, out, err), cmd_line
145
146
@support.requires_subprocess()
def _assert_python(expected_success, /, *args, **env_vars):
    """Run Python via run_python_until_end() and check the exit status.

    If the process succeeded (rc == 0) when failure was expected, or the
    other way round, the result's fail() method is called with the command
    line.  Otherwise the run result is returned.
    """
    res, cmd_line = run_python_until_end(*args, **env_vars)
    succeeded = not res.rc
    if succeeded != bool(expected_success):
        res.fail(cmd_line)
    return res
153
154
def assert_python_ok(*args, **env_vars):
    """
    Run the interpreter with `args` and the optional environment variables
    `env_vars`, assert that it succeeds (rc == 0) and return the result of
    the run: a (return code, stdout, stderr) named tuple.

    If the __cleanenv keyword is set, env_vars is used as a fresh environment.

    If the __isolated keyword is given, its truth value decides whether
    Python is started in isolated mode (command line option -I).  Otherwise
    isolated mode is used only when no other keyword is passed (__cwd
    excepted) and interpreter_requires_environment() is false.
    """
    expected_success = True
    return _assert_python(expected_success, *args, **env_vars)
167
168
def assert_python_failure(*args, **env_vars):
    """
    Run the interpreter with `args` and the optional environment variables
    `env_vars`, assert that it fails (rc != 0) and return the result of the
    run: a (return code, stdout, stderr) named tuple.

    The keyword options are the same as for assert_python_ok().
    """
    expected_success = False
    return _assert_python(expected_success, *args, **env_vars)
178
179
@support.requires_subprocess()
def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw):
    """Run a Python subprocess with the given arguments.

    The child is started with -E unless interpreter_requires_environment()
    is true.  Its stdin is always a pipe; stdout and stderr default to a
    pipe and to stdout respectively.

    kw is extra keyword args to pass to subprocess.Popen.  If it contains
    an 'env' mapping, the child receives a copy of it with TERM forced to
    'vt100'; the caller's mapping itself is left untouched.  A missing or
    None 'env' means a copy of os.environ is used.

    Returns a Popen object.
    """
    cmd_line = [sys.executable]
    if not interpreter_requires_environment():
        cmd_line.append('-E')
    cmd_line.extend(args)
    # Under Fedora (?), GNU readline can output junk on stderr when initialized,
    # depending on the TERM setting.  Setting TERM=vt100 is supposed to disable
    # that.  References:
    # - http://reinout.vanrees.org/weblog/2009/08/14/readline-invisible-character-hack.html
    # - http://stackoverflow.com/questions/15760712/python-readline-module-prints-escape-character-during-import
    # - http://lists.gnu.org/archive/html/bug-readline/2007-08/msg00004.html
    base_env = kw.get('env')
    if base_env is None:
        base_env = os.environ
    # Work on a copy: writing TERM into the caller's mapping would leak the
    # override to the caller (and into this very process when the mapping
    # is os.environ).
    child_env = dict(base_env)
    child_env['TERM'] = 'vt100'
    kw['env'] = child_env
    return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                            stdout=stdout, stderr=stderr,
                            **kw)
202
203
def kill_python(p):
    """Let the Popen process p run to completion and return its stdout.

    stdin is closed first, then stdout is read until EOF and closed, and
    finally the process is waited for.
    """
    p.stdin.close()
    stdout_pipe = p.stdout
    output = stdout_pipe.read()
    stdout_pipe.close()
    # Reap the child so that it does not look like a leak when running
    # with regrtest -R.
    p.wait()
    subprocess._cleanup()
    return output
214
215
def make_script(script_dir, script_basename, source, omit_suffix=False):
    """Write source to a script file in script_dir and return its path.

    The file is named script_basename plus a 'py' extension, unless
    omit_suffix is true, in which case script_basename is used as is.
    Import caches are invalidated afterwards.
    """
    if omit_suffix:
        script_filename = script_basename
    else:
        script_filename = script_basename + (os.extsep + 'py')
    script_name = os.path.join(script_dir, script_filename)
    # The script should be encoded to UTF-8, the default string encoding
    with open(script_name, 'w', encoding='utf-8') as script_file:
        script_file.write(source)
    importlib.invalidate_caches()
    return script_name
226
227
def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
    """Create zip_dir/zip_basename.zip containing the file script_name.

    name_in_zip is the archive member name.  When it is None, the base name
    of script_name is used; if script_name lives in a __pycache__
    directory, it is first converted with make_legacy_pyc() and the
    resulting file is archived under its own base name.

    Return (zip_name, path of the member inside the zip file).
    """
    zip_name = os.path.join(zip_dir, zip_basename + os.extsep + 'zip')
    with zipfile.ZipFile(zip_name, 'w') as archive:
        if name_in_zip is None:
            path_parts = script_name.split(os.sep)
            in_pycache = (len(path_parts) >= 2
                          and path_parts[-2] == '__pycache__')
            if in_pycache:
                script_name = make_legacy_pyc(source_from_cache(script_name))
            name_in_zip = os.path.basename(script_name)
        archive.write(script_name, name_in_zip)
    return zip_name, os.path.join(zip_name, name_in_zip)
246
247
def make_pkg(pkg_dir, init_source=''):
    """Create the package directory pkg_dir with an __init__ script.

    The directory is created with os.mkdir(); init_source is the text
    written to the __init__ file (empty by default).
    """
    os.mkdir(pkg_dir)
    make_script(pkg_dir, '__init__', source=init_source)
251
252
def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
                 source, depth=1, compiled=False):
    """Create zip_dir/zip_basename.zip holding a nested package and a script.

    The archive contains pkg_name nested depth levels deep (pkg_name,
    pkg_name/pkg_name, ...), an __init__ file at every level and the script
    built from script_basename and source at the innermost level.  With
    compiled true, both files are byte-compiled with py_compile and the
    compiled files are archived instead of the sources.

    The intermediate files written to zip_dir are removed once the archive
    is complete.

    Return (zip_name, path of the script inside the zip file).
    """
    init_name = make_script(zip_dir, '__init__', '')
    # NOTE(review): the member name of __init__ is taken before any
    # compilation, so with compiled=True the compiled file is stored under
    # the source file's name -- confirm this is intended.
    init_basename = os.path.basename(init_name)
    script_name = make_script(zip_dir, script_basename, source)
    temp_files = [init_name, script_name]
    if compiled:
        init_name = py_compile.compile(init_name, doraise=True)
        script_name = py_compile.compile(script_name, doraise=True)
        temp_files += [init_name, script_name]

    # One entry per nesting level: 'pkg', 'pkg/pkg', ...  (depth must be at
    # least 1, the innermost entry is indexed below).
    pkg_names = [os.sep.join([pkg_name] * level)
                 for level in range(1, depth + 1)]
    script_name_in_zip = os.path.join(pkg_names[-1],
                                      os.path.basename(script_name))
    zip_name = os.path.join(zip_dir, zip_basename + os.extsep + 'zip')
    with zipfile.ZipFile(zip_name, 'w') as archive:
        for pkg_path in pkg_names:
            archive.write(init_name, os.path.join(pkg_path, init_basename))
        archive.write(script_name, script_name_in_zip)

    for path in temp_files:
        os.unlink(path)
    return zip_name, os.path.join(zip_name, script_name_in_zip)
281
282
@support.requires_subprocess()
def run_test_script(script):
    """Run the test script in a subprocess and fail if it does not succeed.

    The script is run with -u to try to get the full output if the test
    hangs or crashes, and is given the -v argument.  When support.verbose
    is false, the run goes through assert_python_ok(); otherwise the child
    is run directly and AssertionError is raised on a non-zero exit code.
    """
    if not support.verbose:
        assert_python_ok("-u", script, "-v")
        return

    def title(text):
        return f"===== {text} ======"

    name = f"script {os.path.basename(script)}"
    print()
    print(title(name), flush=True)
    # In verbose mode, the child process inherits stdout and stderr,
    # to see output in realtime and reduce the risk of losing output.
    args = [sys.executable, "-E", "-X", "faulthandler", "-u", script, "-v"]
    proc = subprocess.run(args)
    print(title(f"{name} completed: exit code {proc.returncode}"),
          flush=True)
    if proc.returncode:
        raise AssertionError(f"{name} failed")
303