1import faulthandler 2import functools 3import gc 4import importlib 5import io 6import os 7import sys 8import time 9import traceback 10import unittest 11 12from test import support 13from test.support import os_helper 14from test.support import threading_helper 15from test.libregrtest.cmdline import Namespace 16from test.libregrtest.save_env import saved_test_environment 17from test.libregrtest.utils import clear_caches, format_duration, print_warning 18 19 20class TestResult: 21 def __init__( 22 self, 23 name: str, 24 duration_sec: float = 0.0, 25 xml_data: list[str] | None = None, 26 ) -> None: 27 self.name = name 28 self.duration_sec = duration_sec 29 self.xml_data = xml_data 30 31 def __str__(self) -> str: 32 return f"{self.name} finished" 33 34 35class Passed(TestResult): 36 def __str__(self) -> str: 37 return f"{self.name} passed" 38 39 40class Failed(TestResult): 41 def __init__( 42 self, 43 name: str, 44 duration_sec: float = 0.0, 45 xml_data: list[str] | None = None, 46 errors: list[tuple[str, str]] | None = None, 47 failures: list[tuple[str, str]] | None = None, 48 ) -> None: 49 super().__init__(name, duration_sec=duration_sec, xml_data=xml_data) 50 self.errors = errors 51 self.failures = failures 52 53 def __str__(self) -> str: 54 if self.errors and self.failures: 55 le = len(self.errors) 56 lf = len(self.failures) 57 error_s = "error" + ("s" if le > 1 else "") 58 failure_s = "failure" + ("s" if lf > 1 else "") 59 return f"{self.name} failed ({le} {error_s}, {lf} {failure_s})" 60 61 if self.errors: 62 le = len(self.errors) 63 error_s = "error" + ("s" if le > 1 else "") 64 return f"{self.name} failed ({le} {error_s})" 65 66 if self.failures: 67 lf = len(self.failures) 68 failure_s = "failure" + ("s" if lf > 1 else "") 69 return f"{self.name} failed ({lf} {failure_s})" 70 71 return f"{self.name} failed" 72 73 74class UncaughtException(Failed): 75 def __str__(self) -> str: 76 return f"{self.name} failed (uncaught exception)" 77 78 79class 
EnvChanged(Failed): 80 def __str__(self) -> str: 81 return f"{self.name} failed (env changed)" 82 83 84class RefLeak(Failed): 85 def __str__(self) -> str: 86 return f"{self.name} failed (reference leak)" 87 88 89class Skipped(TestResult): 90 def __str__(self) -> str: 91 return f"{self.name} skipped" 92 93 94class ResourceDenied(Skipped): 95 def __str__(self) -> str: 96 return f"{self.name} skipped (resource denied)" 97 98 99class Interrupted(TestResult): 100 def __str__(self) -> str: 101 return f"{self.name} interrupted" 102 103 104class ChildError(Failed): 105 def __str__(self) -> str: 106 return f"{self.name} crashed" 107 108 109class DidNotRun(TestResult): 110 def __str__(self) -> str: 111 return f"{self.name} ran no tests" 112 113 114class Timeout(Failed): 115 def __str__(self) -> str: 116 return f"{self.name} timed out ({format_duration(self.duration_sec)})" 117 118 119# Minimum duration of a test to display its duration or to mention that 120# the test is running in background 121PROGRESS_MIN_TIME = 30.0 # seconds 122 123# small set of tests to determine if we have a basically functioning interpreter 124# (i.e. 
# small set of tests to determine if we have a basically functioning interpreter
# (i.e. if any of these fail, then anything else is likely to follow)
STDTESTS = [
    'test_grammar',
    'test_opcodes',
    'test_dict',
    'test_builtin',
    'test_exceptions',
    'test_types',
    'test_unittest',
    'test_doctest',
    'test_doctest2',
    'test_support'
]

# set of tests that we don't want to be executed when using regrtest
NOTTESTS = set()


# Storage of uncollectable objects
FOUND_GARBAGE = []


def is_failed(result: TestResult, ns: Namespace) -> bool:
    """Return True if *result* counts as a failure under the options in *ns*.

    An environment change only counts as a failure when --fail-env-changed
    was requested.
    """
    if isinstance(result, EnvChanged):
        return ns.fail_env_changed
    return isinstance(result, Failed)


def findtestdir(path=None):
    """Return the directory containing the test files (default: Lib/test)."""
    return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir


def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
    """Return a list of all applicable test modules.

    The standard tests come first, followed by the remaining test_* modules
    found in *testdir*, sorted alphabetically.
    """
    testdir = findtestdir(testdir)
    names = os.listdir(testdir)
    tests = []
    others = set(stdtests) | nottests
    for name in names:
        mod, ext = os.path.splitext(name)
        # Accept 'test_*.py' files and 'test_*' packages (no extension),
        # skipping anything already listed in stdtests or nottests.
        if mod.startswith("test_") and ext in (".py", "") and mod not in others:
            tests.append(mod)
    return stdtests + sorted(tests)


def get_abs_module(ns: Namespace, test_name: str) -> str:
    """Return the absolute module name to import for *test_name*."""
    if test_name.startswith('test.') or ns.testdir:
        return test_name
    else:
        # Import it from the test package
        return 'test.' + test_name


def _runtest(ns: Namespace, test_name: str) -> TestResult:
    # Handle faulthandler timeout, capture stdout+stderr, XML serialization
    # and measure time.

    output_on_failure = ns.verbose3

    # Only arm the faulthandler watchdog when a timeout was requested and
    # a watchdog thread can actually be started.
    use_timeout = (
        ns.timeout is not None and threading_helper.can_start_thread
    )
    if use_timeout:
        faulthandler.dump_traceback_later(ns.timeout, exit=True)

    start_time = time.perf_counter()
    try:
        support.set_match_tests(ns.match_tests, ns.ignore_tests)
        support.junit_xml_list = xml_list = [] if ns.xmlpath else None
        if ns.failfast:
            support.failfast = True

        if output_on_failure:
            support.verbose = True

            # Capture stdout and stderr; the output is only shown if the
            # test does not pass.
            stream = io.StringIO()
            orig_stdout = sys.stdout
            orig_stderr = sys.stderr
            print_warning = support.print_warning
            orig_print_warnings_stderr = print_warning.orig_stderr

            output = None
            try:
                sys.stdout = stream
                sys.stderr = stream
                # print_warning() writes into the temporary stream to preserve
                # messages order. If support.environment_altered becomes true,
                # warnings will be written to sys.stderr below.
                print_warning.orig_stderr = stream

                result = _runtest_inner(ns, test_name,
                                        display_failure=False)
                if not isinstance(result, Passed):
                    output = stream.getvalue()
            finally:
                sys.stdout = orig_stdout
                sys.stderr = orig_stderr
                print_warning.orig_stderr = orig_print_warnings_stderr

            if output is not None:
                sys.stderr.write(output)
                sys.stderr.flush()
        else:
            # Tell tests to be moderately quiet
            support.verbose = ns.verbose

            result = _runtest_inner(ns, test_name,
                                    display_failure=not ns.verbose)

        if xml_list:
            import xml.etree.ElementTree as ET
            result.xml_data = [
                ET.tostring(x).decode('us-ascii')
                for x in xml_list
            ]

        result.duration_sec = time.perf_counter() - start_time
        return result
    finally:
        if use_timeout:
            faulthandler.cancel_dump_traceback_later()
        support.junit_xml_list = None


def runtest(ns: Namespace, test_name: str) -> TestResult:
    """Run a single test.

    ns -- regrtest namespace of options
    test_name -- the name of the test

    Returns a TestResult sub-class depending on the kind of result received.

    If ns.xmlpath is not None, xml_data is a list containing each
    generated testsuite element.
    """
    try:
        return _runtest(ns, test_name)
    except:
        # Deliberately bare: the regrtest driver must survive anything the
        # harness itself raises (including SystemExit).
        if not ns.pgo:
            msg = traceback.format_exc()
            print(f"test {test_name} crashed -- {msg}",
                  file=sys.stderr, flush=True)
        return Failed(test_name)


def _test_module(the_module):
    """Load and run the unittest tests of *the_module*.

    Raises if the loader reported errors (e.g. import failures inside
    the module's test cases).
    """
    loader = unittest.TestLoader()
    tests = loader.loadTestsFromModule(the_module)
    for error in loader.errors:
        print(error, file=sys.stderr)
    if loader.errors:
        raise Exception("errors while loading tests")
    support.run_unittest(tests)


def save_env(ns: Namespace, test_name: str):
    """Return a context manager that detects environment changes made by the test."""
    return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo)


def _runtest_inner2(ns: Namespace, test_name: str) -> bool:
    # Load the test function, run the test function, handle huntrleaks
    # to detect leaks.

    abstest = get_abs_module(ns, test_name)

    # remove the module from sys.module to reload it if it was already imported
    try:
        del sys.modules[abstest]
    except KeyError:
        pass

    the_module = importlib.import_module(abstest)

    if ns.huntrleaks:
        from test.libregrtest.refleak import dash_R

    # If the test has a test_main, that will run the appropriate
    # tests.  If not, use normal unittest test loading.
    test_runner = getattr(the_module, "test_main", None)
    if test_runner is None:
        test_runner = functools.partial(_test_module, the_module)

    try:
        with save_env(ns, test_name):
            if ns.huntrleaks:
                # Return True if the test leaked references
                refleak = dash_R(ns, test_name, test_runner)
            else:
                test_runner()
                refleak = False
    finally:
        # First kill any dangling references to open files etc.
        # This can also issue some ResourceWarnings which would otherwise get
        # triggered during the following test run, and possibly produce
        # failures.
        support.gc_collect()

        cleanup_test_droppings(test_name, ns.verbose)

    if gc.garbage:
        support.environment_altered = True
        print_warning(f"{test_name} created {len(gc.garbage)} "
                      f"uncollectable object(s).")

        # move the uncollectable objects somewhere,
        # so we don't see them again
        FOUND_GARBAGE.extend(gc.garbage)
        gc.garbage.clear()

    support.reap_children()

    return refleak


def _runtest_inner(
    ns: Namespace, test_name: str, display_failure: bool = True
) -> TestResult:
    # Detect environment changes, handle exceptions.

    # Reset the environment_altered flag to detect if a test altered
    # the environment
    support.environment_altered = False

    if ns.pgo:
        display_failure = False

    try:
        clear_caches()
        support.gc_collect()

        with save_env(ns, test_name):
            refleak = _runtest_inner2(ns, test_name)
    except support.ResourceDenied as msg:
        if not ns.quiet and not ns.pgo:
            print(f"{test_name} skipped -- {msg}", flush=True)
        return ResourceDenied(test_name)
    except unittest.SkipTest as msg:
        if not ns.quiet and not ns.pgo:
            print(f"{test_name} skipped -- {msg}", flush=True)
        return Skipped(test_name)
    except support.TestFailedWithDetails as exc:
        msg = f"test {test_name} failed"
        if display_failure:
            msg = f"{msg} -- {exc}"
        print(msg, file=sys.stderr, flush=True)
        return Failed(test_name, errors=exc.errors, failures=exc.failures)
    except support.TestFailed as exc:
        msg = f"test {test_name} failed"
        if display_failure:
            msg = f"{msg} -- {exc}"
        print(msg, file=sys.stderr, flush=True)
        return Failed(test_name)
    except support.TestDidNotRun:
        return DidNotRun(test_name)
    except KeyboardInterrupt:
        print()
        return Interrupted(test_name)
    except:
        # Deliberately bare: any other exception escaping the test is a
        # harness-level crash, reported as UncaughtException.
        if not ns.pgo:
            msg = traceback.format_exc()
            print(f"test {test_name} crashed -- {msg}",
                  file=sys.stderr, flush=True)
        return UncaughtException(test_name)

    if refleak:
        return RefLeak(test_name)
    if support.environment_altered:
        return EnvChanged(test_name)
    return Passed(test_name)


def cleanup_test_droppings(test_name: str, verbose: int) -> None:
    # Try to clean up junk commonly left behind.  While tests shouldn't leave
    # any files or directories behind, when a test fails that can be tedious
    # for it to arrange.  The consequences can be especially nasty on Windows,
    # since if a test leaves a file open, it cannot be deleted by name (while
    # there's nothing we can do about that here either, we can display the
    # name of the offending test, which is a real help).
    for name in (os_helper.TESTFN,):
        if not os.path.exists(name):
            continue

        if os.path.isdir(name):
            import shutil
            kind, nuker = "directory", shutil.rmtree
        elif os.path.isfile(name):
            kind, nuker = "file", os.unlink
        else:
            raise RuntimeError(f"os.path says {name!r} exists but is neither "
                               f"directory nor file")

        if verbose:
            print_warning(f"{test_name} left behind {kind} {name!r}")
            support.environment_altered = True

        try:
            import stat
            # fix possible permissions problems that might prevent cleanup
            os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            nuker(name)
        except Exception as exc:
            print_warning(f"{test_name} left behind {kind} {name!r} "
                          f"and it couldn't be removed: {exc}")