# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run a group of subprocesses and then finish."""

import errno
import logging
import multiprocessing
import os
import platform
import re
import signal
import subprocess
import sys
import tempfile
import time

# cpu cost measurement
measure_cpu_costs = False

_DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count()
# Maximum number of bytes of a job's stdout that will be stored in the result.
# Only the last N bytes of stdout are kept if the actual output is longer.
_MAX_RESULT_SIZE = 64 * 1024


# NOTE: If you change this, please make sure to test reviewing a GitHub PR
# with http://reviewable.io, which is known to add UTF-8 characters to the
# PR description; these leak into the environment here and cause failures.
def strip_non_ascii_chars(s):
    return "".join(c for c in s if ord(c) < 128)
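# For example, strip_non_ascii_chars(u"r\u00e9sum\u00e9") returns "rsum":
# every character with an ordinal of 128 or above is dropped.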


def sanitized_environment(env):
    sanitized = {}
    for key, value in list(env.items()):
        sanitized[strip_non_ascii_chars(key)] = strip_non_ascii_chars(value)
    return sanitized


def platform_string():
    if platform.system() == "Windows":
        return "windows"
    elif platform.system()[:7] == "MSYS_NT":
        return "windows"
    elif platform.system() == "Darwin":
        return "mac"
    elif platform.system() == "Linux":
        return "linux"
    else:
        return "posix"


# Set up a signal handler so that signal.pause registers 'something'
# when a child finishes.
# Not using futures and threading to avoid a dependency on subprocess32.
if platform_string() == "windows":
    pass
else:

    def alarm_handler(unused_signum, unused_frame):
        pass

    signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None)
    signal.signal(signal.SIGALRM, alarm_handler)

_SUCCESS = object()
_FAILURE = object()
_RUNNING = object()
_KILLED = object()

_COLORS = {
    "red": [31, 0],
    "green": [32, 0],
    "yellow": [33, 0],
    "lightgray": [37, 0],
    "gray": [30, 1],
    "purple": [35, 0],
    "cyan": [36, 0],
}

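# Each _COLORS entry is [ANSI foreground color code, intensity flag]; the pair
# is emitted below as "\x1b[<intensity>;<color>m". The two sequences that
# follow redraw the status line in place: "\x1b[0G" moves the cursor to the
# start of the line and "\x1b[2K" erases it, so successive status messages
# overwrite each other.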
_BEGINNING_OF_LINE = "\x1b[0G"
_CLEAR_LINE = "\x1b[2K"

_TAG_COLOR = {
    "FAILED": "red",
    "FLAKE": "purple",
    "TIMEOUT_FLAKE": "purple",
    "WARNING": "yellow",
    "TIMEOUT": "red",
    "PASSED": "green",
    "START": "gray",
    "WAITING": "yellow",
    "SUCCESS": "green",
    "IDLE": "gray",
    "SKIPPED": "cyan",
}

_FORMAT = "%(asctime)-15s %(message)s"
logging.basicConfig(level=logging.INFO, format=_FORMAT)


def eintr_be_gone(fn):
    """Run fn until it doesn't stop because of EINTR"""
    while True:
        try:
            return fn()
        except IOError as e:
            if e.errno != errno.EINTR:
                raise


def message(tag, msg, explanatory_text=None, do_newline=False):
    if (
        message.old_tag == tag
        and message.old_msg == msg
        and not explanatory_text
    ):
        return
    message.old_tag = tag
    message.old_msg = msg
    if explanatory_text:
        if isinstance(explanatory_text, bytes):
            explanatory_text = explanatory_text.decode("utf8", errors="replace")
    while True:
        try:
            if platform_string() == "windows" or not sys.stdout.isatty():
                if explanatory_text:
                    logging.info(explanatory_text)
                logging.info("%s: %s", tag, msg)
            else:
                sys.stdout.write(
                    "%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s"
                    % (
                        _BEGINNING_OF_LINE,
                        _CLEAR_LINE,
                        "\n%s" % explanatory_text
                        if explanatory_text is not None
                        else "",
                        _COLORS[_TAG_COLOR[tag]][1],
                        _COLORS[_TAG_COLOR[tag]][0],
                        tag,
                        msg,
                        "\n"
                        if do_newline or explanatory_text is not None
                        else "",
                    )
                )
            sys.stdout.flush()
            return
        except IOError as e:
            if e.errno != errno.EINTR:
                raise


message.old_tag = ""
message.old_msg = ""


def which(filename):
    if "/" in filename:
        return filename
    for path in os.environ["PATH"].split(os.pathsep):
        if os.path.exists(os.path.join(path, filename)):
            return os.path.join(path, filename)
    raise Exception("%s not found" % filename)


class JobSpec(object):
    """Specifies what to run for a job."""

    def __init__(
        self,
        cmdline,
        shortname=None,
        environ=None,
        cwd=None,
        shell=False,
        timeout_seconds=5 * 60,
        flake_retries=0,
        timeout_retries=0,
        kill_handler=None,
        cpu_cost=1.0,
        verbose_success=False,
        logfilename=None,
    ):
        """
        Arguments:
          cmdline: a list of arguments to pass as the command line
          environ: a dictionary of environment variables to set in the child process
          kill_handler: a handler that will be called whenever job.kill() is invoked
          cpu_cost: number of cores per second this job needs
          logfilename: use given file to store job's output, rather than using a temporary file
        """
        if environ is None:
            environ = {}
        self.cmdline = cmdline
        self.environ = environ
        self.shortname = cmdline[0] if shortname is None else shortname
        self.cwd = cwd
        self.shell = shell
        self.timeout_seconds = timeout_seconds
        self.flake_retries = flake_retries
        self.timeout_retries = timeout_retries
        self.kill_handler = kill_handler
        self.cpu_cost = cpu_cost
        self.verbose_success = verbose_success
        self.logfilename = logfilename
        if self.logfilename and (
            self.flake_retries != 0 or self.timeout_retries != 0
        ):
            # Forbidden to avoid overwriting the test log when retrying.
            raise Exception(
                "Cannot use custom logfile when retries are enabled"
            )

    def identity(self):
        return "%r %r" % (self.cmdline, self.environ)

    def __hash__(self):
        return hash(self.identity())

    def __eq__(self, other):
        return self.identity() == other.identity()

    def __lt__(self, other):
        return self.identity() < other.identity()

    def __repr__(self):
        return "JobSpec(shortname=%s, cmdline=%s)" % (
            self.shortname,
            self.cmdline,
        )

    def __str__(self):
        return "%s: %s %s" % (
            self.shortname,
            " ".join("%s=%s" % kv for kv in list(self.environ.items())),
            " ".join(self.cmdline),
        )

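# A sketch of constructing a JobSpec with the options documented above (the
# command and values here are illustrative, not taken from any real test):
#
#   spec = JobSpec(
#       ["python", "-c", "print('ok')"],
#       shortname="example.ok",
#       environ={"EXAMPLE_FLAG": "1"},
#       timeout_seconds=30,
#       cpu_cost=0.5,
#   )
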

class JobResult(object):
    def __init__(self):
        self.state = "UNKNOWN"
        self.returncode = -1
        self.elapsed_time = 0
        self.num_failures = 0
        self.retries = 0
        self.message = ""
        self.cpu_estimated = 1
        self.cpu_measured = 1


def read_from_start(f):
    f.seek(0)
    return f.read()


class Job(object):
    """Manages one job."""

    def __init__(
        self, spec, newline_on_success, travis, add_env, quiet_success=False
    ):
        self._spec = spec
        self._newline_on_success = newline_on_success
        self._travis = travis
        self._add_env = add_env.copy()
        self._retries = 0
        self._timeout_retries = 0
        self._suppress_failure_message = False
        self._quiet_success = quiet_success
        if not self._quiet_success:
            message("START", spec.shortname, do_newline=self._travis)
        self.result = JobResult()
        self.start()

    def GetSpec(self):
        return self._spec

    def start(self):
        if self._spec.logfilename:
            # make sure the log directory exists
            logfile_dir = os.path.dirname(
                os.path.abspath(self._spec.logfilename)
            )
            if not os.path.exists(logfile_dir):
                os.makedirs(logfile_dir)
            self._logfile = open(self._spec.logfilename, "w+")
        else:
            # macOS: a series of quick os.unlink invocations might cause an OS
            # error during the creation of a temporary file. By using
            # NamedTemporaryFile, we defer the removal of the file and directory.
            self._logfile = tempfile.NamedTemporaryFile()
        env = dict(os.environ)
        env.update(self._spec.environ)
        env.update(self._add_env)
        env = sanitized_environment(env)
        self._start = time.time()
        cmdline = self._spec.cmdline
        # The Unix time command is finicky when used with MSBuild, so we don't use it
        # with jobs that run MSBuild.
        global measure_cpu_costs
        if measure_cpu_costs and "vsprojects\\build" not in cmdline[0]:
            cmdline = ["time", "-p"] + cmdline
        else:
            measure_cpu_costs = False
        try_start = lambda: subprocess.Popen(
            args=cmdline,
            stderr=subprocess.STDOUT,
            stdout=self._logfile,
            cwd=self._spec.cwd,
            shell=self._spec.shell,
            env=env,
        )
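        # Start the child, retrying a few times with exponential backoff in
        # case of a transient OSError (for example, a temporary failure to
        # fork); if every attempt in the loop fails, the final attempt in the
        # for-else branch lets the exception propagate.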
        delay = 0.3
        for _ in range(4):
            try:
                self._process = try_start()
                break
            except OSError:
                message(
                    "WARNING",
                    "Failed to start %s, retrying in %f seconds"
                    % (self._spec.shortname, delay),
                )
                time.sleep(delay)
                delay *= 2
        else:
            self._process = try_start()
        self._state = _RUNNING

    def state(self):
        """Poll current state of the job. Prints messages at completion."""

        def stdout(self=self):
            stdout = read_from_start(self._logfile)
            self.result.message = stdout[-_MAX_RESULT_SIZE:]
            return stdout

        if self._state == _RUNNING and self._process.poll() is not None:
            elapsed = time.time() - self._start
            self.result.elapsed_time = elapsed
            if self._process.returncode != 0:
                if self._retries < self._spec.flake_retries:
                    message(
                        "FLAKE",
                        "%s [ret=%d, pid=%d]"
                        % (
                            self._spec.shortname,
                            self._process.returncode,
                            self._process.pid,
                        ),
                        stdout(),
                        do_newline=True,
                    )
                    self._retries += 1
                    self.result.num_failures += 1
                    self.result.retries = self._timeout_retries + self._retries
                    # NOTE: job is restarted regardless of jobset's max_time setting
                    self.start()
                else:
                    self._state = _FAILURE
                    if not self._suppress_failure_message:
                        message(
                            "FAILED",
                            "%s [ret=%d, pid=%d, time=%.1fsec]"
                            % (
                                self._spec.shortname,
                                self._process.returncode,
                                self._process.pid,
                                elapsed,
                            ),
                            stdout(),
                            do_newline=True,
                        )
                    self.result.state = "FAILED"
                    self.result.num_failures += 1
                    self.result.returncode = self._process.returncode
            else:
                self._state = _SUCCESS
                measurement = ""
                if measure_cpu_costs:
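                    # When measure_cpu_costs is on, the job was launched under
                    # POSIX `time -p`, which appends lines of the form
                    #   real 1.23
                    #   user 0.98
                    #   sys 0.05
                    # to the captured output (the numbers here are only an
                    # illustration); parse them to estimate how many cores the
                    # job actually used.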
                    m = re.search(
                        r"real\s+([0-9.]+)\nuser\s+([0-9.]+)\nsys\s+([0-9.]+)",
                        (stdout()).decode("utf8", errors="replace"),
                    )
                    real = float(m.group(1))
                    user = float(m.group(2))
                    syst = float(m.group(3))  # avoid shadowing the sys module
                    if real > 0.5:
                        cores = (user + syst) / real
                        self.result.cpu_measured = float("%.01f" % cores)
                        self.result.cpu_estimated = float(
                            "%.01f" % self._spec.cpu_cost
                        )
                        measurement = "; cpu_cost=%.01f; estimated=%.01f" % (
                            self.result.cpu_measured,
                            self.result.cpu_estimated,
                        )
                if not self._quiet_success:
                    message(
                        "PASSED",
                        "%s [time=%.1fsec, retries=%d:%d%s]"
                        % (
                            self._spec.shortname,
                            elapsed,
                            self._retries,
                            self._timeout_retries,
                            measurement,
                        ),
                        stdout() if self._spec.verbose_success else None,
                        do_newline=self._newline_on_success or self._travis,
                    )
                self.result.state = "PASSED"
        elif (
            self._state == _RUNNING
            and self._spec.timeout_seconds is not None
            and time.time() - self._start > self._spec.timeout_seconds
        ):
            elapsed = time.time() - self._start
            self.result.elapsed_time = elapsed
            if self._timeout_retries < self._spec.timeout_retries:
                message(
                    "TIMEOUT_FLAKE",
                    "%s [pid=%d]" % (self._spec.shortname, self._process.pid),
                    stdout(),
                    do_newline=True,
                )
                self._timeout_retries += 1
                self.result.num_failures += 1
                self.result.retries = self._timeout_retries + self._retries
                if self._spec.kill_handler:
                    self._spec.kill_handler(self)
                self._process.terminate()
                # NOTE: job is restarted regardless of jobset's max_time setting
                self.start()
            else:
                message(
                    "TIMEOUT",
                    "%s [pid=%d, time=%.1fsec]"
                    % (self._spec.shortname, self._process.pid, elapsed),
                    stdout(),
                    do_newline=True,
                )
                self.kill()
                self.result.state = "TIMEOUT"
                self.result.num_failures += 1
        return self._state

    def kill(self):
        if self._state == _RUNNING:
            self._state = _KILLED
            if self._spec.kill_handler:
                self._spec.kill_handler(self)
            self._process.terminate()

    def suppress_failure_message(self):
        self._suppress_failure_message = True


class Jobset(object):
    """Manages one run of jobs."""

    def __init__(
        self,
        check_cancelled,
        maxjobs,
        maxjobs_cpu_agnostic,
        newline_on_success,
        travis,
        stop_on_failure,
        add_env,
        quiet_success,
        max_time,
    ):
        self._running = set()
        self._check_cancelled = check_cancelled
        self._cancelled = False
        self._failures = 0
        self._completed = 0
        self._maxjobs = maxjobs
        self._maxjobs_cpu_agnostic = maxjobs_cpu_agnostic
        self._newline_on_success = newline_on_success
        self._travis = travis
        self._stop_on_failure = stop_on_failure
        self._add_env = add_env
        self._quiet_success = quiet_success
        self._max_time = max_time
        self.resultset = {}
        self._remaining = None
        self._start_time = time.time()

    def set_remaining(self, remaining):
        self._remaining = remaining

    def get_num_failures(self):
        return self._failures

    def cpu_cost(self):
        c = 0
        for job in self._running:
            c += job._spec.cpu_cost
        return c

    def start(self, spec):
        """Start a job. Return True on success, False on failure."""
        while True:
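            # Admission control: unless the run is out of time or cancelled,
            # wait here (reaping finished jobs) until the aggregate cpu_cost
            # of running jobs leaves room under the maxjobs budget and the
            # number of running jobs is below the cpu-agnostic cap.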
            if (
                self._max_time > 0
                and time.time() - self._start_time > self._max_time
            ):
                skipped_job_result = JobResult()
                skipped_job_result.state = "SKIPPED"
                message("SKIPPED", spec.shortname, do_newline=True)
                self.resultset[spec.shortname] = [skipped_job_result]
                return True
            if self.cancelled():
                return False
            current_cpu_cost = self.cpu_cost()
            if current_cpu_cost == 0:
                break
            if current_cpu_cost + spec.cpu_cost <= self._maxjobs:
                if len(self._running) < self._maxjobs_cpu_agnostic:
                    break
            self.reap(spec.shortname, spec.cpu_cost)
        if self.cancelled():
            return False
        job = Job(
            spec,
            self._newline_on_success,
            self._travis,
            self._add_env,
            self._quiet_success,
        )
        self._running.add(job)
        if job.GetSpec().shortname not in self.resultset:
            self.resultset[job.GetSpec().shortname] = []
        return True

    def reap(self, waiting_for=None, waiting_for_cost=None):
        """Collect the dead jobs."""
        while self._running:
            dead = set()
            for job in self._running:
                st = eintr_be_gone(lambda: job.state())
                if st == _RUNNING:
                    continue
                if st == _FAILURE or st == _KILLED:
                    self._failures += 1
                    if self._stop_on_failure:
                        self._cancelled = True
                        # Use a separate loop variable so `job` (the job that
                        # just finished) is not clobbered before dead.add(job).
                        for running_job in self._running:
                            running_job.kill()
                dead.add(job)
                break
            for job in dead:
                self._completed += 1
                if not self._quiet_success or job.result.state != "PASSED":
                    self.resultset[job.GetSpec().shortname].append(job.result)
                self._running.remove(job)
            if dead:
                return
            if not self._travis and platform_string() != "windows":
                rstr = (
                    ""
                    if self._remaining is None
                    else "%d queued, " % self._remaining
                )
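                # ETA: extrapolate from the completion rate observed so far,
                # i.e. (elapsed time / jobs completed) * jobs still queued or
                # running.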
                if self._remaining is not None and self._completed > 0:
                    now = time.time()
                    sofar = now - self._start_time
                    remaining = (
                        sofar
                        / self._completed
                        * (self._remaining + len(self._running))
                    )
                    rstr = "ETA %.1f sec; %s" % (remaining, rstr)
                if waiting_for is not None:
                    wstr = " next: %s @ %.2f cpu" % (
                        waiting_for,
                        waiting_for_cost,
                    )
                else:
                    wstr = ""
                message(
                    "WAITING",
                    "%s%d jobs running, %d complete, %d failed (load %.2f)%s"
                    % (
                        rstr,
                        len(self._running),
                        self._completed,
                        self._failures,
                        self.cpu_cost(),
                        wstr,
                    ),
                )
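            # Block until something is likely to have changed: on Windows,
            # poll every 100ms; elsewhere, sleep until the SIGCHLD handler
            # installed above fires for a finished child (or the 10-second
            # SIGALRM fires as a fallback).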
            if platform_string() == "windows":
                time.sleep(0.1)
            else:
                signal.alarm(10)
                signal.pause()

    def cancelled(self):
        """Poll for cancellation."""
        if self._cancelled:
            return True
        if not self._check_cancelled():
            return False
        for job in self._running:
            job.kill()
        self._cancelled = True
        return True

    def finish(self):
        while self._running:
            if self.cancelled():
                pass  # poll cancellation
            self.reap()
        if platform_string() != "windows":
            signal.alarm(0)
        return not self.cancelled() and self._failures == 0


def _never_cancelled():
    return False


def tag_remaining(xs):
    """Pairs each item with the number of items known to still follow it.

    Items are buffered; while the buffer holds more than 5000 items, the
    oldest is yielded with a count of None (unknown). Once the input is
    exhausted, the rest are yielded with exact counts: for ["a", "b", "c"]
    this yields ("a", 2), ("b", 1), ("c", 0).
    """
    staging = []
    for x in xs:
        staging.append(x)
        if len(staging) > 5000:
            yield (staging.pop(0), None)
    n = len(staging)
    for i, x in enumerate(staging):
        yield (x, n - i - 1)


def run(
    cmdlines,
    check_cancelled=_never_cancelled,
    maxjobs=None,
    maxjobs_cpu_agnostic=None,
    newline_on_success=False,
    travis=False,
    infinite_runs=False,
    stop_on_failure=False,
    add_env=None,
    skip_jobs=False,
    quiet_success=False,
    max_time=-1,
):
    # Use None rather than a mutable default argument for add_env.
    if add_env is None:
        add_env = {}
    if skip_jobs:
        resultset = {}
        skipped_job_result = JobResult()
        skipped_job_result.state = "SKIPPED"
        for job in cmdlines:
            message("SKIPPED", job.shortname, do_newline=True)
            resultset[job.shortname] = [skipped_job_result]
        return 0, resultset
    js = Jobset(
        check_cancelled,
        maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
        maxjobs_cpu_agnostic
        if maxjobs_cpu_agnostic is not None
        else _DEFAULT_MAX_JOBS,
        newline_on_success,
        travis,
        stop_on_failure,
        add_env,
        quiet_success,
        max_time,
    )
    for cmdline, remaining in tag_remaining(cmdlines):
        if not js.start(cmdline):
            break
        if remaining is not None:
            js.set_remaining(remaining)
    js.finish()
    return js.get_num_failures(), js.resultset