1import faulthandler
2import functools
3import gc
4import importlib
5import io
6import os
7import sys
8import time
9import traceback
10import unittest
11
12from test import support
13from test.support import os_helper
14from test.support import threading_helper
15from test.libregrtest.cmdline import Namespace
16from test.libregrtest.save_env import saved_test_environment
17from test.libregrtest.utils import clear_caches, format_duration, print_warning
18
19
20class TestResult:
21    def __init__(
22        self,
23        name: str,
24        duration_sec: float = 0.0,
25        xml_data: list[str] | None = None,
26    ) -> None:
27        self.name = name
28        self.duration_sec = duration_sec
29        self.xml_data = xml_data
30
31    def __str__(self) -> str:
32        return f"{self.name} finished"
33
34
35class Passed(TestResult):
36    def __str__(self) -> str:
37        return f"{self.name} passed"
38
39
40class Failed(TestResult):
41    def __init__(
42        self,
43        name: str,
44        duration_sec: float = 0.0,
45        xml_data: list[str] | None = None,
46        errors: list[tuple[str, str]] | None = None,
47        failures: list[tuple[str, str]] | None = None,
48    ) -> None:
49        super().__init__(name, duration_sec=duration_sec, xml_data=xml_data)
50        self.errors = errors
51        self.failures = failures
52
53    def __str__(self) -> str:
54        if self.errors and self.failures:
55            le = len(self.errors)
56            lf = len(self.failures)
57            error_s = "error" + ("s" if le > 1 else "")
58            failure_s = "failure" + ("s" if lf > 1 else "")
59            return f"{self.name} failed ({le} {error_s}, {lf} {failure_s})"
60
61        if self.errors:
62            le = len(self.errors)
63            error_s = "error" + ("s" if le > 1 else "")
64            return f"{self.name} failed ({le} {error_s})"
65
66        if self.failures:
67            lf = len(self.failures)
68            failure_s = "failure" + ("s" if lf > 1 else "")
69            return f"{self.name} failed ({lf} {failure_s})"
70
71        return f"{self.name} failed"
72
73
74class UncaughtException(Failed):
75    def __str__(self) -> str:
76        return f"{self.name} failed (uncaught exception)"
77
78
79class EnvChanged(Failed):
80    def __str__(self) -> str:
81        return f"{self.name} failed (env changed)"
82
83
84class RefLeak(Failed):
85    def __str__(self) -> str:
86        return f"{self.name} failed (reference leak)"
87
88
89class Skipped(TestResult):
90    def __str__(self) -> str:
91        return f"{self.name} skipped"
92
93
94class ResourceDenied(Skipped):
95    def __str__(self) -> str:
96        return f"{self.name} skipped (resource denied)"
97
98
99class Interrupted(TestResult):
100    def __str__(self) -> str:
101        return f"{self.name} interrupted"
102
103
104class ChildError(Failed):
105    def __str__(self) -> str:
106        return f"{self.name} crashed"
107
108
109class DidNotRun(TestResult):
110    def __str__(self) -> str:
111        return f"{self.name} ran no tests"
112
113
114class Timeout(Failed):
115    def __str__(self) -> str:
116        return f"{self.name} timed out ({format_duration(self.duration_sec)})"
117
118
119# Minimum duration of a test to display its duration or to mention that
120# the test is running in background
121PROGRESS_MIN_TIME = 30.0   # seconds
122
123# small set of tests to determine if we have a basically functioning interpreter
124# (i.e. if any of these fail, then anything else is likely to follow)
125STDTESTS = [
126    'test_grammar',
127    'test_opcodes',
128    'test_dict',
129    'test_builtin',
130    'test_exceptions',
131    'test_types',
132    'test_unittest',
133    'test_doctest',
134    'test_doctest2',
135    'test_support'
136]
137
138# set of tests that we don't want to be executed when using regrtest
139NOTTESTS = set()
140
141
142# Storage of uncollectable objects
143FOUND_GARBAGE = []
144
145
146def is_failed(result: TestResult, ns: Namespace) -> bool:
147    if isinstance(result, EnvChanged):
148        return ns.fail_env_changed
149    return isinstance(result, Failed)
150
151
152def findtestdir(path=None):
153    return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
154
155
156def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
157    """Return a list of all applicable test modules."""
158    testdir = findtestdir(testdir)
159    names = os.listdir(testdir)
160    tests = []
161    others = set(stdtests) | nottests
162    for name in names:
163        mod, ext = os.path.splitext(name)
164        if mod[:5] == "test_" and ext in (".py", "") and mod not in others:
165            tests.append(mod)
166    return stdtests + sorted(tests)
167
168
169def get_abs_module(ns: Namespace, test_name: str) -> str:
170    if test_name.startswith('test.') or ns.testdir:
171        return test_name
172    else:
173        # Import it from the test package
174        return 'test.' + test_name
175
176
177def _runtest(ns: Namespace, test_name: str) -> TestResult:
178    # Handle faulthandler timeout, capture stdout+stderr, XML serialization
179    # and measure time.
180
181    output_on_failure = ns.verbose3
182
183    use_timeout = (
184        ns.timeout is not None and threading_helper.can_start_thread
185    )
186    if use_timeout:
187        faulthandler.dump_traceback_later(ns.timeout, exit=True)
188
189    start_time = time.perf_counter()
190    try:
191        support.set_match_tests(ns.match_tests, ns.ignore_tests)
192        support.junit_xml_list = xml_list = [] if ns.xmlpath else None
193        if ns.failfast:
194            support.failfast = True
195
196        if output_on_failure:
197            support.verbose = True
198
199            stream = io.StringIO()
200            orig_stdout = sys.stdout
201            orig_stderr = sys.stderr
202            print_warning = support.print_warning
203            orig_print_warnings_stderr = print_warning.orig_stderr
204
205            output = None
206            try:
207                sys.stdout = stream
208                sys.stderr = stream
209                # print_warning() writes into the temporary stream to preserve
210                # messages order. If support.environment_altered becomes true,
211                # warnings will be written to sys.stderr below.
212                print_warning.orig_stderr = stream
213
214                result = _runtest_inner(ns, test_name,
215                                        display_failure=False)
216                if not isinstance(result, Passed):
217                    output = stream.getvalue()
218            finally:
219                sys.stdout = orig_stdout
220                sys.stderr = orig_stderr
221                print_warning.orig_stderr = orig_print_warnings_stderr
222
223            if output is not None:
224                sys.stderr.write(output)
225                sys.stderr.flush()
226        else:
227            # Tell tests to be moderately quiet
228            support.verbose = ns.verbose
229
230            result = _runtest_inner(ns, test_name,
231                                    display_failure=not ns.verbose)
232
233        if xml_list:
234            import xml.etree.ElementTree as ET
235            result.xml_data = [
236                ET.tostring(x).decode('us-ascii')
237                for x in xml_list
238            ]
239
240        result.duration_sec = time.perf_counter() - start_time
241        return result
242    finally:
243        if use_timeout:
244            faulthandler.cancel_dump_traceback_later()
245        support.junit_xml_list = None
246
247
248def runtest(ns: Namespace, test_name: str) -> TestResult:
249    """Run a single test.
250
251    ns -- regrtest namespace of options
252    test_name -- the name of the test
253
254    Returns a TestResult sub-class depending on the kind of result received.
255
256    If ns.xmlpath is not None, xml_data is a list containing each
257    generated testsuite element.
258    """
259    try:
260        return _runtest(ns, test_name)
261    except:
262        if not ns.pgo:
263            msg = traceback.format_exc()
264            print(f"test {test_name} crashed -- {msg}",
265                  file=sys.stderr, flush=True)
266        return Failed(test_name)
267
268
269def _test_module(the_module):
270    loader = unittest.TestLoader()
271    tests = loader.loadTestsFromModule(the_module)
272    for error in loader.errors:
273        print(error, file=sys.stderr)
274    if loader.errors:
275        raise Exception("errors while loading tests")
276    support.run_unittest(tests)
277
278
279def save_env(ns: Namespace, test_name: str):
280    return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo)
281
282
283def _runtest_inner2(ns: Namespace, test_name: str) -> bool:
284    # Load the test function, run the test function, handle huntrleaks
285    # to detect leaks.
286
287    abstest = get_abs_module(ns, test_name)
288
289    # remove the module from sys.module to reload it if it was already imported
290    try:
291        del sys.modules[abstest]
292    except KeyError:
293        pass
294
295    the_module = importlib.import_module(abstest)
296
297    if ns.huntrleaks:
298        from test.libregrtest.refleak import dash_R
299
300    # If the test has a test_main, that will run the appropriate
301    # tests.  If not, use normal unittest test loading.
302    test_runner = getattr(the_module, "test_main", None)
303    if test_runner is None:
304        test_runner = functools.partial(_test_module, the_module)
305
306    try:
307        with save_env(ns, test_name):
308            if ns.huntrleaks:
309                # Return True if the test leaked references
310                refleak = dash_R(ns, test_name, test_runner)
311            else:
312                test_runner()
313                refleak = False
314    finally:
315        # First kill any dangling references to open files etc.
316        # This can also issue some ResourceWarnings which would otherwise get
317        # triggered during the following test run, and possibly produce
318        # failures.
319        support.gc_collect()
320
321        cleanup_test_droppings(test_name, ns.verbose)
322
323    if gc.garbage:
324        support.environment_altered = True
325        print_warning(f"{test_name} created {len(gc.garbage)} "
326                      f"uncollectable object(s).")
327
328        # move the uncollectable objects somewhere,
329        # so we don't see them again
330        FOUND_GARBAGE.extend(gc.garbage)
331        gc.garbage.clear()
332
333    support.reap_children()
334
335    return refleak
336
337
338def _runtest_inner(
339    ns: Namespace, test_name: str, display_failure: bool = True
340) -> TestResult:
341    # Detect environment changes, handle exceptions.
342
343    # Reset the environment_altered flag to detect if a test altered
344    # the environment
345    support.environment_altered = False
346
347    if ns.pgo:
348        display_failure = False
349
350    try:
351        clear_caches()
352        support.gc_collect()
353
354        with save_env(ns, test_name):
355            refleak = _runtest_inner2(ns, test_name)
356    except support.ResourceDenied as msg:
357        if not ns.quiet and not ns.pgo:
358            print(f"{test_name} skipped -- {msg}", flush=True)
359        return ResourceDenied(test_name)
360    except unittest.SkipTest as msg:
361        if not ns.quiet and not ns.pgo:
362            print(f"{test_name} skipped -- {msg}", flush=True)
363        return Skipped(test_name)
364    except support.TestFailedWithDetails as exc:
365        msg = f"test {test_name} failed"
366        if display_failure:
367            msg = f"{msg} -- {exc}"
368        print(msg, file=sys.stderr, flush=True)
369        return Failed(test_name, errors=exc.errors, failures=exc.failures)
370    except support.TestFailed as exc:
371        msg = f"test {test_name} failed"
372        if display_failure:
373            msg = f"{msg} -- {exc}"
374        print(msg, file=sys.stderr, flush=True)
375        return Failed(test_name)
376    except support.TestDidNotRun:
377        return DidNotRun(test_name)
378    except KeyboardInterrupt:
379        print()
380        return Interrupted(test_name)
381    except:
382        if not ns.pgo:
383            msg = traceback.format_exc()
384            print(f"test {test_name} crashed -- {msg}",
385                  file=sys.stderr, flush=True)
386        return UncaughtException(test_name)
387
388    if refleak:
389        return RefLeak(test_name)
390    if support.environment_altered:
391        return EnvChanged(test_name)
392    return Passed(test_name)
393
394
395def cleanup_test_droppings(test_name: str, verbose: int) -> None:
396    # Try to clean up junk commonly left behind.  While tests shouldn't leave
397    # any files or directories behind, when a test fails that can be tedious
398    # for it to arrange.  The consequences can be especially nasty on Windows,
399    # since if a test leaves a file open, it cannot be deleted by name (while
400    # there's nothing we can do about that here either, we can display the
401    # name of the offending test, which is a real help).
402    for name in (os_helper.TESTFN,):
403        if not os.path.exists(name):
404            continue
405
406        if os.path.isdir(name):
407            import shutil
408            kind, nuker = "directory", shutil.rmtree
409        elif os.path.isfile(name):
410            kind, nuker = "file", os.unlink
411        else:
412            raise RuntimeError(f"os.path says {name!r} exists but is neither "
413                               f"directory nor file")
414
415        if verbose:
416            print_warning(f"{test_name} left behind {kind} {name!r}")
417            support.environment_altered = True
418
419        try:
420            import stat
421            # fix possible permissions problems that might prevent cleanup
422            os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
423            nuker(name)
424        except Exception as exc:
425            print_warning(f"{test_name} left behind {kind} {name!r} "
426                          f"and it couldn't be removed: {exc}")
427