import faulthandler
import json
import os
import queue
import signal
import subprocess
import sys
import tempfile
import threading
import time
import traceback
from typing import NamedTuple, NoReturn, Literal, Any, TextIO

from test import support
from test.support import os_helper

from test.libregrtest.cmdline import Namespace
from test.libregrtest.main import Regrtest
from test.libregrtest.runtest import (
    runtest, is_failed, TestResult, Interrupted, Timeout, ChildError, PROGRESS_MIN_TIME)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration, print_warning

if sys.platform == 'win32':
    import locale


# Display the running tests if nothing has happened in the last N seconds
PROGRESS_UPDATE = 30.0   # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME

# Kill the main process after 5 minutes. It is supposed to write an update
# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python's slowest
# buildbot workers.
MAIN_PROCESS_TIMEOUT = 5 * 60.0
assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE

# Time to wait until a worker process completes: should be immediate
JOIN_TIMEOUT = 30.0   # seconds

# If available, run each worker in its own process group so that the worker
# and all of its child processes can be killed at once.
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))


def must_stop(result: TestResult, ns: Namespace) -> bool:
    if isinstance(result, Interrupted):
        return True
    if ns.failfast and is_failed(result, ns):
        return True
    return False


def parse_worker_args(worker_args) -> tuple[Namespace, str]:
    ns_dict, test_name = json.loads(worker_args)
    ns = Namespace(**ns_dict)
    return (ns, test_name)


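# Illustrative sketch of the parent/worker handshake (values are hypothetical,
# not regrtest defaults): the parent serializes (vars(ns), test_name) as JSON
# and passes it on the worker's command line; the worker decodes it with
# parse_worker_args().
#
#   worker_args = json.dumps(({'verbose': 0, 'failfast': False}, 'test_os'))
#   ns, test_name = parse_worker_args(worker_args)
#   # ns.verbose == 0, test_name == 'test_os'
#
# run_test_in_subprocess() below then spawns roughly:
#   <python> -u -m test.regrtest --worker-args '[{...}, "test_os"]'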
def run_test_in_subprocess(testname: str, ns: Namespace, stdout_fh: TextIO) -> subprocess.Popen:
    ns_dict = vars(ns)
    worker_args = (ns_dict, testname)
    worker_args = json.dumps(worker_args)
    if ns.python is not None:
        executable = ns.python
    else:
        executable = [sys.executable]
    cmd = [*executable, *support.args_from_interpreter_flags(),
           '-u',    # Unbuffered stdout and stderr
           '-m', 'test.regrtest',
           '--worker-args', worker_args]

    # Running the child from the same working directory as regrtest's original
    # invocation ensures that TEMPDIR for the child is the same when
    # sysconfig.is_python_build() is true. See issue 15300.
    kw = dict(
        stdout=stdout_fh,
        # bpo-45410: Write stderr into stdout to keep messages order
        stderr=stdout_fh,
        text=True,
        close_fds=(os.name != 'nt'),
        cwd=os_helper.SAVEDCWD,
    )
    if USE_PROCESS_GROUP:
        kw['start_new_session'] = True
    return subprocess.Popen(cmd, **kw)


def run_tests_worker(ns: Namespace, test_name: str) -> NoReturn:
    setup_tests(ns)

    result = runtest(ns, test_name)

    print()   # Force a newline (just in case)

    # Serialize TestResult as dict in JSON
    print(json.dumps(result, cls=EncodeTestResult), flush=True)
    sys.exit(0)
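# Illustrative shape of the worker's final stdout line (the exact fields
# depend on the TestResult subclass), something like:
#   {"name": "test_os", "duration_sec": 1.2, ..., "__test_result__": "Passed"}
# The parent (TestWorkerProcess._runtest) keeps only the last line of the
# captured stdout and decodes it with
# json.loads(..., object_hook=decode_test_result).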


# We do not use a generator so multiple threads can call next().
class MultiprocessIterator:

    """A thread-safe iterator over tests for multiprocess mode."""

    def __init__(self, tests_iter):
        self.lock = threading.Lock()
        self.tests_iter = tests_iter

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            if self.tests_iter is None:
                raise StopIteration
            return next(self.tests_iter)

    def stop(self):
        with self.lock:
            self.tests_iter = None
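
    # Illustrative usage (hypothetical test names): one iterator is shared by
    # all worker threads.
    #   pending = MultiprocessIterator(iter(['test_os', 'test_sys']))
    #   next(pending)    # -> 'test_os' (safe to call from several threads)
    #   pending.stop()   # any later next() raises StopIteration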


class MultiprocessResult(NamedTuple):
    result: TestResult
    # bpo-45410: stderr is written into stdout to keep messages order
    stdout: str
    # None on success, an error message otherwise
    error_msg: str | None


ExcStr = str
QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
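# Queue protocol sketch: each worker thread puts one QueueOutput item per test
# (the values below are illustrative).
#   (False, MultiprocessResult(result, stdout, None))    # a test completed
#   (True, "Traceback (most recent call last): ...")     # worker thread crashed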


class ExitThread(Exception):
    pass


class TestWorkerProcess(threading.Thread):
    def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
        super().__init__()
        self.worker_id = worker_id
        self.pending = runner.pending
        self.output = runner.output
        self.ns = runner.ns
        self.timeout = runner.worker_timeout
        self.regrtest = runner.regrtest
        self.current_test_name = None
        self.start_time = None
        self._popen = None
        self._killed = False
        self._stopped = False

    def __repr__(self) -> str:
        info = [f'TestWorkerProcess #{self.worker_id}']
        if self.is_alive():
            info.append('running')
        else:
            info.append('stopped')
        test = self.current_test_name
        if test:
            info.append(f'test={test}')
        popen = self._popen
        if popen is not None:
            dt = time.monotonic() - self.start_time
            info.extend((f'pid={popen.pid}',
                         f'time={format_duration(dt)}'))
        return '<%s>' % ' '.join(info)

    def _kill(self) -> None:
        popen = self._popen
        if popen is None:
            return

        if self._killed:
            return
        self._killed = True

        if USE_PROCESS_GROUP:
            what = f"{self} process group"
        else:
            what = f"{self}"

        print(f"Kill {what}", file=sys.stderr, flush=True)
        try:
            if USE_PROCESS_GROUP:
                os.killpg(popen.pid, signal.SIGKILL)
            else:
                popen.kill()
        except ProcessLookupError:
            # Race with popen.kill(): the process already completed and the
            # TestWorkerProcess thread read its exit status, but
            # Popen.send_signal() checked returncode just before Popen.wait()
            # set it, so the signal was sent to a process that no longer exists.
            pass
        except OSError as exc:
            print_warning(f"Failed to kill {what}: {exc!r}")

    def stop(self) -> None:
        # Method called from a different thread to stop this thread
        self._stopped = True
        self._kill()

    def mp_result_error(
        self,
        test_result: TestResult,
        stdout: str = '',
        err_msg: str | None = None
    ) -> MultiprocessResult:
        test_result.duration_sec = time.monotonic() - self.start_time
        return MultiprocessResult(test_result, stdout, err_msg)

    def _run_process(self, test_name: str, stdout_fh: TextIO) -> int | None:
        self.start_time = time.monotonic()

        self.current_test_name = test_name
        try:
            popen = run_test_in_subprocess(test_name, self.ns, stdout_fh)

            self._killed = False
            self._popen = popen
        except:
            self.current_test_name = None
            raise

        try:
            if self._stopped:
                # If kill() has been called before self._popen was set,
                # the process may still be running. Call kill() again to
                # ensure that it is killed.
                self._kill()
                raise ExitThread

            try:
                # gh-94026: stdout+stderr are written to a temporary file
                retcode = popen.wait(timeout=self.timeout)
                assert retcode is not None
                return retcode
            except subprocess.TimeoutExpired:
                if self._stopped:
                    # kill() has been called: the worker is being stopped on
                    # purpose, so exit the thread instead of reporting a timeout
                    raise ExitThread

                # On timeout, kill the process
                self._kill()

                # None means TIMEOUT for the caller
                retcode = None
                # bpo-38207: Don't attempt to call communicate() again: it can
                # hang until all child processes using stdout pipes complete.
            except OSError:
                if self._stopped:
                    # kill() has been called: the error is expected while the
                    # process is being killed, so exit the thread
                    raise ExitThread
                raise
        except:
            self._kill()
            raise
        finally:
            self._wait_completed()
            self._popen = None
            self.current_test_name = None

    def _runtest(self, test_name: str) -> MultiprocessResult:
        if sys.platform == 'win32':
            # gh-95027: When stdout is not a TTY, Python uses the ANSI code
            # page for the sys.stdout encoding. If the main process runs in a
            # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding.
            encoding = locale.getencoding()
        else:
            encoding = sys.stdout.encoding
        # gh-94026: Write stdout+stderr to a tempfile as a workaround for
        # non-blocking pipes on Emscripten with NodeJS.
        with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_fh:
            # gh-93353: Check for leaked temporary files in the parent process,
            # since the deletion of temporary files can happen late during
            # Python finalization: too late for libregrtest.
            retcode = self._run_process(test_name, stdout_fh)
            stdout_fh.seek(0)
            stdout = stdout_fh.read().strip()

        if retcode is None:
            return self.mp_result_error(Timeout(test_name), stdout)

        err_msg = None
        if retcode != 0:
            err_msg = "Exit code %s" % retcode
        else:
            # The last line of the worker stdout is the JSON-encoded TestResult
            stdout, _, result = stdout.rpartition("\n")
            stdout = stdout.rstrip()
            if not result:
                err_msg = "Failed to parse worker stdout"
            else:
                try:
                    # deserialize run_tests_worker() output
                    result = json.loads(result, object_hook=decode_test_result)
                except Exception as exc:
                    err_msg = "Failed to parse worker JSON: %s" % exc

        if err_msg is not None:
            return self.mp_result_error(ChildError(test_name), stdout, err_msg)

        return MultiprocessResult(result, stdout, err_msg)

    def run(self) -> None:
        while not self._stopped:
            try:
                try:
                    test_name = next(self.pending)
                except StopIteration:
                    break

                mp_result = self._runtest(test_name)
                self.output.put((False, mp_result))

                if must_stop(mp_result.result, self.ns):
                    break
            except ExitThread:
                break
            except BaseException:
                self.output.put((True, traceback.format_exc()))
                break

    def _wait_completed(self) -> None:
        popen = self._popen

        try:
            popen.wait(JOIN_TIMEOUT)
        except (subprocess.TimeoutExpired, OSError) as exc:
            print_warning(f"Failed to wait for {self} completion "
                          f"(timeout={format_duration(JOIN_TIMEOUT)}): "
                          f"{exc!r}")

    def wait_stopped(self, start_time: float) -> None:
        # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
        # which killed the process. Sometimes, killing the process from the
        # main thread does not interrupt popen.communicate() in the
        # TestWorkerProcess thread. This loop with a timeout is a workaround
        # for that.
        #
        # Moreover, if this method fails to join the thread, it is likely
        # that Python will hang at exit while calling threading._shutdown()
        # which tries again to join the blocked thread. Regrtest.main()
        # uses EXIT_TIMEOUT to work around this second bug.
        while True:
            # Write a message every second
            self.join(1.0)
            if not self.is_alive():
                break
            dt = time.monotonic() - start_time
            self.regrtest.log(f"Waiting for {self} thread "
                              f"for {format_duration(dt)}")
            if dt > JOIN_TIMEOUT:
                print_warning(f"Failed to join {self} in {format_duration(dt)}")
                break


def get_running(workers: list[TestWorkerProcess]) -> list[str]:
    running = []
    for worker in workers:
        current_test_name = worker.current_test_name
        if not current_test_name:
            continue
        dt = time.monotonic() - worker.start_time
        if dt >= PROGRESS_MIN_TIME:
            text = '%s (%s)' % (current_test_name, format_duration(dt))
            running.append(text)
    return running
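# Example of the strings built above (test names and durations are illustrative):
#   ['test_multiprocessing_spawn (1 min 30 sec)', 'test_io (35.0 sec)']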


class MultiprocessTestRunner:
    def __init__(self, regrtest: Regrtest) -> None:
        self.regrtest = regrtest
        self.log = self.regrtest.log
        self.ns = regrtest.ns
        self.output: queue.Queue[QueueOutput] = queue.Queue()
        self.pending = MultiprocessIterator(self.regrtest.tests)
        if self.ns.timeout is not None:
            # Rely on faulthandler to kill a worker process: this timeout only
            # triggers when faulthandler fails to do so. Give faulthandler at
            # most 5 extra minutes to kill the worker. For example, with
            # --timeout=1200, the worker timeout is min(1800, 1500) = 1500 seconds.
            self.worker_timeout = min(self.ns.timeout * 1.5,
                                      self.ns.timeout + 5 * 60)
        else:
            self.worker_timeout = None
        self.workers = None

    def start_workers(self) -> None:
        self.workers = [TestWorkerProcess(index, self)
                        for index in range(1, self.ns.use_mp + 1)]
        msg = f"Run tests in parallel using {len(self.workers)} child processes"
        if self.ns.timeout:
            msg += (" (timeout: %s, worker timeout: %s)"
                    % (format_duration(self.ns.timeout),
                       format_duration(self.worker_timeout)))
        self.log(msg)
        for worker in self.workers:
            worker.start()

    def stop_workers(self) -> None:
        start_time = time.monotonic()
        for worker in self.workers:
            worker.stop()
        for worker in self.workers:
            worker.wait_stopped(start_time)

    def _get_result(self) -> QueueOutput | None:
        use_faulthandler = (self.ns.timeout is not None)
        timeout = PROGRESS_UPDATE

        # bpo-46205: check the status of workers every iteration to avoid
        # waiting forever on an empty queue.
        while any(worker.is_alive() for worker in self.workers):
            if use_faulthandler:
                faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
                                                  exit=True)

            # wait for a thread
            try:
                return self.output.get(timeout=timeout)
            except queue.Empty:
                pass

            # display progress
            running = get_running(self.workers)
            if running and not self.ns.pgo:
                self.log('running: %s' % ', '.join(running))

        # all worker threads are done: consume pending results
        try:
            return self.output.get(timeout=0)
        except queue.Empty:
            return None

    def display_result(self, mp_result: MultiprocessResult) -> None:
        result = mp_result.result

        text = str(result)
        if mp_result.error_msg is not None:
            # CHILD_ERROR
            text += ' (%s)' % mp_result.error_msg
        elif (result.duration_sec >= PROGRESS_MIN_TIME and not self.ns.pgo):
            text += ' (%s)' % format_duration(result.duration_sec)
        running = get_running(self.workers)
        if running and not self.ns.pgo:
            text += ' -- running: %s' % ', '.join(running)
        self.regrtest.display_progress(self.test_index, text)

    def _process_result(self, item: QueueOutput) -> bool:
        """Returns True if the test runner must stop."""
        if item[0]:
            # Thread got an exception
            format_exc = item[1]
            print_warning(f"regrtest worker thread failed: {format_exc}")
            return True

        self.test_index += 1
        mp_result = item[1]
        self.regrtest.accumulate_result(mp_result.result)
        self.display_result(mp_result)

        if mp_result.stdout:
            print(mp_result.stdout, flush=True)

        if must_stop(mp_result.result, self.ns):
            return True

        return False

    def run_tests(self) -> None:
        self.start_workers()

        self.test_index = 0
        try:
            while True:
                item = self._get_result()
                if item is None:
                    break

                stop = self._process_result(item)
                if stop:
                    break
        except KeyboardInterrupt:
            print()
            self.regrtest.interrupted = True
        finally:
            if self.ns.timeout is not None:
                faulthandler.cancel_dump_traceback_later()

            # Always ensure that all worker processes are no longer
            # running when we exit this function
            self.pending.stop()
            self.stop_workers()


def run_tests_multiprocess(regrtest: Regrtest) -> None:
    MultiprocessTestRunner(regrtest).run_tests()


class EncodeTestResult(json.JSONEncoder):
    """Encode a TestResult (sub)class object into a JSON dict."""

    def default(self, o: Any) -> dict[str, Any]:
        if isinstance(o, TestResult):
            result = vars(o)
            result["__test_result__"] = o.__class__.__name__
            return result

        return super().default(o)


def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
    """Decode a TestResult (sub)class object from a JSON dict."""

    if "__test_result__" not in d:
        return d

    cls_name = d.pop("__test_result__")
    for cls in get_all_test_result_classes():
        if cls.__name__ == cls_name:
            return cls(**d)


def get_all_test_result_classes() -> set[type[TestResult]]:
    # Collect TestResult and all of its (transitive) subclasses
    prev_count = 0
    classes = {TestResult}
    while len(classes) > prev_count:
        prev_count = len(classes)
        to_add = []
        for cls in classes:
            to_add.extend(cls.__subclasses__())
        classes.update(to_add)
    return classes
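# Round-trip sketch for the encoder/decoder above (hypothetical Passed result;
# the exact fields depend on the TestResult subclass):
#   result = Passed("test_os")
#   data = json.dumps(result, cls=EncodeTestResult)
#   # data ~ '{"name": "test_os", ..., "__test_result__": "Passed"}'
#   json.loads(data, object_hook=decode_test_result)   # -> equivalent Passed result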