import faulthandler
import json
import os
import queue
import signal
import subprocess
import sys
import tempfile
import threading
import time
import traceback
from typing import NamedTuple, NoReturn, Literal, Any, TextIO

from test import support
from test.support import os_helper

from test.libregrtest.cmdline import Namespace
from test.libregrtest.main import Regrtest
from test.libregrtest.runtest import (
    runtest, is_failed, TestResult, Interrupted, Timeout, ChildError, PROGRESS_MIN_TIME)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration, print_warning

if sys.platform == 'win32':
    import locale


# Display the running tests if nothing happened in the last N seconds
PROGRESS_UPDATE = 30.0   # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME

# Kill the main process after 5 minutes. It is supposed to write an update
# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for the slowest Python
# buildbot workers.
MAIN_PROCESS_TIMEOUT = 5 * 60.0
assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE

# Time to wait until a worker completes: should be immediate
JOIN_TIMEOUT = 30.0   # seconds

USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))


def must_stop(result: TestResult, ns: Namespace) -> bool:
    if isinstance(result, Interrupted):
        return True
    if ns.failfast and is_failed(result, ns):
        return True
    return False


def parse_worker_args(worker_args) -> tuple[Namespace, str]:
    ns_dict, test_name = json.loads(worker_args)
    ns = Namespace(**ns_dict)
    return (ns, test_name)


def run_test_in_subprocess(testname: str, ns: Namespace, stdout_fh: TextIO) -> subprocess.Popen:
    ns_dict = vars(ns)
    worker_args = (ns_dict, testname)
    worker_args = json.dumps(worker_args)
    if ns.python is not None:
        executable = ns.python
    else:
        executable = [sys.executable]
    cmd = [*executable, *support.args_from_interpreter_flags(),
           '-u',    # Unbuffered stdout and stderr
           '-m', 'test.regrtest',
           '--worker-args', worker_args]

    # Running the child from the same working directory as regrtest's original
    # invocation ensures that TEMPDIR for the child is the same when
    # sysconfig.is_python_build() is true. See issue 15300.
    kw = dict(
        stdout=stdout_fh,
        # bpo-45410: Write stderr into stdout to keep messages in order
        stderr=stdout_fh,
        text=True,
        close_fds=(os.name != 'nt'),
        cwd=os_helper.SAVEDCWD,
    )
    if USE_PROCESS_GROUP:
        kw['start_new_session'] = True
    return subprocess.Popen(cmd, **kw)


def run_tests_worker(ns: Namespace, test_name: str) -> NoReturn:
    setup_tests(ns)

    result = runtest(ns, test_name)

    print()   # Force a newline (just in case)

    # Serialize TestResult as dict in JSON
    print(json.dumps(result, cls=EncodeTestResult), flush=True)
    sys.exit(0)


# We do not use a generator so multiple threads can call next().
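# (Note: a bare generator is not safe for concurrent next() calls -- it raises
# "ValueError: generator already executing" -- so the class below wraps the
# iterator behind a lock, letting every TestWorkerProcess thread pull the next
# test name safely.)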
class MultiprocessIterator:

    """A thread-safe iterator over tests for multiprocess mode."""

    def __init__(self, tests_iter):
        self.lock = threading.Lock()
        self.tests_iter = tests_iter

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            if self.tests_iter is None:
                raise StopIteration
            return next(self.tests_iter)

    def stop(self):
        with self.lock:
            self.tests_iter = None


class MultiprocessResult(NamedTuple):
    result: TestResult
    # bpo-45410: stderr is written into stdout to keep messages in order
    stdout: str
    error_msg: str


ExcStr = str
QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]


class ExitThread(Exception):
    pass


class TestWorkerProcess(threading.Thread):
    def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
        super().__init__()
        self.worker_id = worker_id
        self.pending = runner.pending
        self.output = runner.output
        self.ns = runner.ns
        self.timeout = runner.worker_timeout
        self.regrtest = runner.regrtest
        self.current_test_name = None
        self.start_time = None
        self._popen = None
        self._killed = False
        self._stopped = False

    def __repr__(self) -> str:
        info = [f'TestWorkerProcess #{self.worker_id}']
        if self.is_alive():
            info.append("running")
        else:
            info.append('stopped')
        test = self.current_test_name
        if test:
            info.append(f'test={test}')
        popen = self._popen
        if popen is not None:
            dt = time.monotonic() - self.start_time
            info.extend((f'pid={self._popen.pid}',
                         f'time={format_duration(dt)}'))
        return '<%s>' % ' '.join(info)

    def _kill(self) -> None:
        popen = self._popen
        if popen is None:
            return

        if self._killed:
            return
        self._killed = True

        if USE_PROCESS_GROUP:
            what = f"{self} process group"
        else:
            what = f"{self}"

        print(f"Kill {what}", file=sys.stderr, flush=True)
        try:
            if USE_PROCESS_GROUP:
                os.killpg(popen.pid, signal.SIGKILL)
            else:
                popen.kill()
        except ProcessLookupError:
            # popen.kill(): the process completed, the TestWorkerProcess thread
            # read its exit status, but Popen.send_signal() read the returncode
            # just before Popen.wait() set returncode.
            pass
        except OSError as exc:
            print_warning(f"Failed to kill {what}: {exc!r}")

    def stop(self) -> None:
        # Method called from a different thread to stop this thread
        self._stopped = True
        self._kill()

    def mp_result_error(
        self,
        test_result: TestResult,
        stdout: str = '',
        err_msg=None
    ) -> MultiprocessResult:
        test_result.duration_sec = time.monotonic() - self.start_time
        return MultiprocessResult(test_result, stdout, err_msg)

    def _run_process(self, test_name: str, stdout_fh: TextIO) -> int | None:
        self.start_time = time.monotonic()

        self.current_test_name = test_name
        try:
            popen = run_test_in_subprocess(test_name, self.ns, stdout_fh)

            self._killed = False
            self._popen = popen
        except:
            self.current_test_name = None
            raise

        try:
            if self._stopped:
                # If kill() was called before self._popen was set, the process
                # is still running. Call kill() again to ensure that it is
                # killed.
                self._kill()
                raise ExitThread

            try:
                # gh-94026: stdout+stderr are written to a tempfile
                retcode = popen.wait(timeout=self.timeout)
                assert retcode is not None
                return retcode
            except subprocess.TimeoutExpired:
                if self._stopped:
                    # kill() has been called: communicate() fails on reading
                    # closed stdout
                    raise ExitThread

                # On timeout, kill the process
                self._kill()

                # None means TIMEOUT for the caller
                retcode = None
                # bpo-38207: Don't attempt to call communicate() again: it can
                # hang until all child processes using stdout pipes complete.
            except OSError:
                if self._stopped:
                    # kill() has been called: communicate() fails
                    # on reading closed stdout
                    raise ExitThread
                raise
        except:
            self._kill()
            raise
        finally:
            self._wait_completed()
            self._popen = None
            self.current_test_name = None

    def _runtest(self, test_name: str) -> MultiprocessResult:
        if sys.platform == 'win32':
            # gh-95027: When stdout is not a TTY, Python uses the ANSI code
            # page for the sys.stdout encoding. If the main process runs in a
            # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding.
            encoding = locale.getencoding()
        else:
            encoding = sys.stdout.encoding
        # gh-94026: Write stdout+stderr to a tempfile as workaround for
        # non-blocking pipes on Emscripten with NodeJS.
        with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_fh:
            # gh-93353: Check for leaked temporary files in the parent process,
            # since the deletion of temporary files can happen late during
            # Python finalization: too late for libregrtest.
            retcode = self._run_process(test_name, stdout_fh)
            stdout_fh.seek(0)
            stdout = stdout_fh.read().strip()

        if retcode is None:
            return self.mp_result_error(Timeout(test_name), stdout)

        err_msg = None
        if retcode != 0:
            err_msg = "Exit code %s" % retcode
        else:
            stdout, _, result = stdout.rpartition("\n")
            stdout = stdout.rstrip()
            if not result:
                err_msg = "Failed to parse worker stdout"
            else:
                try:
                    # deserialize run_tests_worker() output
                    result = json.loads(result, object_hook=decode_test_result)
                except Exception as exc:
                    err_msg = "Failed to parse worker JSON: %s" % exc

        if err_msg is not None:
            return self.mp_result_error(ChildError(test_name), stdout, err_msg)

        return MultiprocessResult(result, stdout, err_msg)

    def run(self) -> None:
        while not self._stopped:
            try:
                try:
                    test_name = next(self.pending)
                except StopIteration:
                    break

                mp_result = self._runtest(test_name)
                self.output.put((False, mp_result))

                if must_stop(mp_result.result, self.ns):
                    break
            except ExitThread:
                break
            except BaseException:
                self.output.put((True, traceback.format_exc()))
                break

    def _wait_completed(self) -> None:
        popen = self._popen

        try:
            popen.wait(JOIN_TIMEOUT)
        except (subprocess.TimeoutExpired, OSError) as exc:
            print_warning(f"Failed to wait for {self} completion "
                          f"(timeout={format_duration(JOIN_TIMEOUT)}): "
                          f"{exc!r}")

    def wait_stopped(self, start_time: float) -> None:
        # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
        # which killed the process. Sometimes, killing the process from the
        # main thread does not interrupt popen.communicate() in the
        # TestWorkerProcess thread. This loop with a timeout is a workaround
        # for that.
        #
        # Moreover, if this method fails to join the thread, it is likely
        # that Python will hang at exit while calling threading._shutdown()
        # which tries again to join the blocked thread. Regrtest.main()
        # uses EXIT_TIMEOUT to work around this second bug.
        while True:
            # Write a message every second
            self.join(1.0)
            if not self.is_alive():
                break
            dt = time.monotonic() - start_time
            self.regrtest.log(f"Waiting for {self} thread "
                              f"for {format_duration(dt)}")
            if dt > JOIN_TIMEOUT:
                print_warning(f"Failed to join {self} in {format_duration(dt)}")
                break


def get_running(workers: list[TestWorkerProcess]) -> list[str]:
    running = []
    for worker in workers:
        current_test_name = worker.current_test_name
        if not current_test_name:
            continue
        dt = time.monotonic() - worker.start_time
        if dt >= PROGRESS_MIN_TIME:
            text = '%s (%s)' % (current_test_name, format_duration(dt))
            running.append(text)
    return running


class MultiprocessTestRunner:
    def __init__(self, regrtest: Regrtest) -> None:
        self.regrtest = regrtest
        self.log = self.regrtest.log
        self.ns = regrtest.ns
        self.output: queue.Queue[QueueOutput] = queue.Queue()
        self.pending = MultiprocessIterator(self.regrtest.tests)
        if self.ns.timeout is not None:
            # Rely on faulthandler to kill a worker process. This timeout is
            # used when faulthandler fails to kill a worker process. Give a
            # maximum of 5 minutes to faulthandler to kill the worker.
            self.worker_timeout = min(self.ns.timeout * 1.5,
                                      self.ns.timeout + 5 * 60)
        else:
            self.worker_timeout = None
        self.workers = None

    def start_workers(self) -> None:
        self.workers = [TestWorkerProcess(index, self)
                        for index in range(1, self.ns.use_mp + 1)]
        msg = f"Run tests in parallel using {len(self.workers)} child processes"
        if self.ns.timeout:
            msg += (" (timeout: %s, worker timeout: %s)"
                    % (format_duration(self.ns.timeout),
                       format_duration(self.worker_timeout)))
        self.log(msg)
        for worker in self.workers:
            worker.start()

    def stop_workers(self) -> None:
        start_time = time.monotonic()
        for worker in self.workers:
            worker.stop()
        for worker in self.workers:
            worker.wait_stopped(start_time)

    def _get_result(self) -> QueueOutput | None:
        use_faulthandler = (self.ns.timeout is not None)
        timeout = PROGRESS_UPDATE

        # bpo-46205: check the status of workers every iteration to avoid
        # waiting forever on an empty queue.
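        # (The PROGRESS_UPDATE timeout on output.get() below also serves as
        # the heartbeat for the "running: ..." progress log.)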
        while any(worker.is_alive() for worker in self.workers):
            if use_faulthandler:
                faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
                                                  exit=True)

            # wait for a thread
            try:
                return self.output.get(timeout=timeout)
            except queue.Empty:
                pass

            # display progress
            running = get_running(self.workers)
            if running and not self.ns.pgo:
                self.log('running: %s' % ', '.join(running))

        # all worker threads are done: consume pending results
        try:
            return self.output.get(timeout=0)
        except queue.Empty:
            return None

    def display_result(self, mp_result: MultiprocessResult) -> None:
        result = mp_result.result

        text = str(result)
        if mp_result.error_msg is not None:
            # CHILD_ERROR
            text += ' (%s)' % mp_result.error_msg
        elif (result.duration_sec >= PROGRESS_MIN_TIME and not self.ns.pgo):
            text += ' (%s)' % format_duration(result.duration_sec)
        running = get_running(self.workers)
        if running and not self.ns.pgo:
            text += ' -- running: %s' % ', '.join(running)
        self.regrtest.display_progress(self.test_index, text)

    def _process_result(self, item: QueueOutput) -> bool:
        """Returns True if the test runner must stop."""
        if item[0]:
            # Thread got an exception
            format_exc = item[1]
            print_warning(f"regrtest worker thread failed: {format_exc}")
            return True

        self.test_index += 1
        mp_result = item[1]
        self.regrtest.accumulate_result(mp_result.result)
        self.display_result(mp_result)

        if mp_result.stdout:
            print(mp_result.stdout, flush=True)

        if must_stop(mp_result.result, self.ns):
            return True

        return False

    def run_tests(self) -> None:
        self.start_workers()

        self.test_index = 0
        try:
            while True:
                item = self._get_result()
                if item is None:
                    break

                stop = self._process_result(item)
                if stop:
                    break
        except KeyboardInterrupt:
            print()
            self.regrtest.interrupted = True
        finally:
            if self.ns.timeout is not None:
                faulthandler.cancel_dump_traceback_later()

            # Always ensure that all worker processes are no longer
            # running when we exit this function
            self.pending.stop()
            self.stop_workers()


def run_tests_multiprocess(regrtest: Regrtest) -> None:
    MultiprocessTestRunner(regrtest).run_tests()


class EncodeTestResult(json.JSONEncoder):
    """Encode a TestResult (sub)class object into a JSON dict."""

    def default(self, o: Any) -> dict[str, Any]:
        if isinstance(o, TestResult):
            result = vars(o)
            result["__test_result__"] = o.__class__.__name__
            return result

        return super().default(o)


def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
    """Decode a TestResult (sub)class object from a JSON dict."""

    if "__test_result__" not in d:
        return d

    cls_name = d.pop("__test_result__")
    for cls in get_all_test_result_classes():
        if cls.__name__ == cls_name:
            return cls(**d)


def get_all_test_result_classes() -> set[type[TestResult]]:
    prev_count = 0
    classes = {TestResult}
    while len(classes) > prev_count:
        prev_count = len(classes)
        to_add = []
        for cls in classes:
            to_add.extend(cls.__subclasses__())
        classes.update(to_add)
    return classes
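

# Illustrative sketch of the worker <-> parent JSON round-trip (comment only,
# not executed by regrtest; "test_example" is just a placeholder test name):
#
#   data = json.dumps(Timeout("test_example"), cls=EncodeTestResult)
#   result = json.loads(data, object_hook=decode_test_result)
#   assert isinstance(result, Timeout)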