xref: /aosp_15_r20/external/grpc-grpc/tools/run_tests/run_performance_tests.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
#!/usr/bin/env python3
# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run performance tests locally or remotely."""

from __future__ import print_function

import argparse
import collections
import itertools
import json
import os
import re
import shlex
import sys
import time

import six

import performance.scenario_config as scenario_config
import python_utils.jobset as jobset
import python_utils.report_utils as report_utils

_ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), "../.."))
os.chdir(_ROOT)

_REMOTE_HOST_USERNAME = "jenkins"

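# Timeouts (in seconds) for the individual job types launched by this script.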
_SCENARIO_TIMEOUT = 3 * 60
_WORKER_TIMEOUT = 3 * 60
_NETPERF_TIMEOUT = 60
_QUIT_WORKER_TIMEOUT = 2 * 60


class QpsWorkerJob:
    """Encapsulates a qps worker server job."""

    def __init__(self, spec, language, host_and_port, perf_file_base_name=None):
        self._spec = spec
        self.language = language
        self.host_and_port = host_and_port
        self._job = None
        self.perf_file_base_name = perf_file_base_name

    def start(self):
        self._job = jobset.Job(
            self._spec, newline_on_success=True, travis=True, add_env={}
        )

    def is_running(self):
        """Polls a job and returns True if the given job is still running."""
        return self._job and self._job.state() == jobset._RUNNING

    def kill(self):
        if self._job:
            self._job.kill()
            self._job = None


def create_qpsworker_job(
    language, shortname=None, port=10000, remote_host=None, perf_cmd=None
):
    cmdline = language.worker_cmdline() + ["--driver_port=%s" % port]

    if remote_host:
        host_and_port = "%s:%s" % (remote_host, port)
    else:
        host_and_port = "localhost:%s" % port

    perf_file_base_name = None
    if perf_cmd:
        perf_file_base_name = "%s-%s" % (host_and_port, shortname)
        # specify the -o output file so perf.data gets collected when the worker is stopped
        cmdline = (
            perf_cmd + ["-o", "%s-perf.data" % perf_file_base_name] + cmdline
        )

    worker_timeout = _WORKER_TIMEOUT
    if remote_host:
        user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, remote_host)
        ssh_cmd = ["ssh"]
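        # Wrap the remote command in "timeout" with 30 seconds of slack over the
        # job timeout, so the worker process is also terminated on the remote host
        # if it outlives the driver-side timeout.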
        cmdline = ["timeout", "%s" % (worker_timeout + 30)] + cmdline
        ssh_cmd.extend(
            [
                str(user_at_host),
                "cd ~/performance_workspace/grpc/ && %s" % " ".join(cmdline),
            ]
        )
        cmdline = ssh_cmd

    jobspec = jobset.JobSpec(
        cmdline=cmdline,
        shortname=shortname,
        timeout_seconds=worker_timeout,  # workers get restarted after each scenario
        verbose_success=True,
    )
    return QpsWorkerJob(jobspec, language, host_and_port, perf_file_base_name)


def create_scenario_jobspec(
    scenario_json,
    workers,
    remote_host=None,
    bq_result_table=None,
    server_cpu_load=0,
):
    """Runs one scenario using QPS driver."""
    # setting QPS_WORKERS env variable here makes sure it works with SSH too.
    cmd = 'QPS_WORKERS="%s" ' % ",".join(workers)
    if bq_result_table:
        cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table
    cmd += "tools/run_tests/performance/run_qps_driver.sh "
    cmd += "--scenarios_json=%s " % shlex.quote(
        json.dumps({"scenarios": [scenario_json]})
    )
    cmd += "--scenario_result_file=scenario_result.json "
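    # A nonzero server_cpu_load makes the driver search for the offered load that
    # drives the server to the requested CPU utilization.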
    if server_cpu_load != 0:
        cmd += (
            "--search_param=offered_load --initial_search_value=1000"
            " --targeted_cpu_load=%d --stride=500 --error_tolerance=0.01"
            % server_cpu_load
        )
    if remote_host:
        user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, remote_host)
        cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
            user_at_host,
            shlex.quote(cmd),
        )

    return jobset.JobSpec(
        cmdline=[cmd],
        shortname="%s" % scenario_json["name"],
        timeout_seconds=_SCENARIO_TIMEOUT,
        shell=True,
        verbose_success=True,
    )


def create_quit_jobspec(workers, remote_host=None):
    """Runs quit using QPS driver."""
    # setting QPS_WORKERS env variable here makes sure it works with SSH too.
    cmd = 'QPS_WORKERS="%s" cmake/build/qps_json_driver --quit' % ",".join(
        w.host_and_port for w in workers
    )
    if remote_host:
        user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, remote_host)
        cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
            user_at_host,
            shlex.quote(cmd),
        )

    return jobset.JobSpec(
        cmdline=[cmd],
        shortname="shutdown_workers",
        timeout_seconds=_QUIT_WORKER_TIMEOUT,
        shell=True,
        verbose_success=True,
    )


def create_netperf_jobspec(
    server_host="localhost", client_host=None, bq_result_table=None
):
    """Runs netperf benchmark."""
    cmd = 'NETPERF_SERVER_HOST="%s" ' % server_host
    if bq_result_table:
        cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table
    if client_host:
        # If netperf is running remotely, the env variables populated by the CI
        # system (Kokoro) won't be available on the client, but we need them for
        # uploading results to BigQuery.
        jenkins_job_name = os.getenv("KOKORO_JOB_NAME")
        if jenkins_job_name:
            cmd += 'KOKORO_JOB_NAME="%s" ' % jenkins_job_name
        jenkins_build_number = os.getenv("KOKORO_BUILD_NUMBER")
        if jenkins_build_number:
            cmd += 'KOKORO_BUILD_NUMBER="%s" ' % jenkins_build_number

    cmd += "tools/run_tests/performance/run_netperf.sh"
    if client_host:
        user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, client_host)
        cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
            user_at_host,
            shlex.quote(cmd),
        )

    return jobset.JobSpec(
        cmdline=[cmd],
        shortname="netperf",
        timeout_seconds=_NETPERF_TIMEOUT,
        shell=True,
        verbose_success=True,
    )


def archive_repo(languages):
    """Archives local version of repo including submodules."""
    cmdline = ["tar", "-cf", "../grpc.tar", "../grpc/"]
    if "java" in languages:
        cmdline.append("../grpc-java")
    if "go" in languages:
        cmdline.append("../grpc-go")
    if "node" in languages:
        cmdline.append("../grpc-node")

    archive_job = jobset.JobSpec(
        cmdline=cmdline, shortname="archive_repo", timeout_seconds=3 * 60
    )

    jobset.message("START", "Archiving local repository.", do_newline=True)
    num_failures, _ = jobset.run(
        [archive_job], newline_on_success=True, maxjobs=1
    )
    if num_failures == 0:
        jobset.message(
            "SUCCESS",
            "Archive with local repository created successfully.",
            do_newline=True,
        )
    else:
        jobset.message(
            "FAILED", "Failed to archive local repository.", do_newline=True
        )
        sys.exit(1)


def prepare_remote_hosts(hosts, prepare_local=False):
    """Prepares remote hosts (and maybe prepare localhost as well)."""
    prepare_timeout = 10 * 60
    prepare_jobs = []
    for host in hosts:
        user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, host)
        prepare_jobs.append(
            jobset.JobSpec(
                cmdline=["tools/run_tests/performance/remote_host_prepare.sh"],
                shortname="remote_host_prepare.%s" % host,
                environ={"USER_AT_HOST": user_at_host},
                timeout_seconds=prepare_timeout,
            )
        )
    if prepare_local:
        # Prepare localhost as well
        prepare_jobs.append(
            jobset.JobSpec(
                cmdline=["tools/run_tests/performance/kill_workers.sh"],
                shortname="local_prepare",
                timeout_seconds=prepare_timeout,
            )
        )
    jobset.message("START", "Preparing hosts.", do_newline=True)
    num_failures, _ = jobset.run(
        prepare_jobs, newline_on_success=True, maxjobs=10
    )
    if num_failures == 0:
        jobset.message(
            "SUCCESS", "Prepare step completed successfully.", do_newline=True
        )
    else:
        jobset.message(
            "FAILED", "Failed to prepare remote hosts.", do_newline=True
        )
        sys.exit(1)


def build_on_remote_hosts(
    hosts, languages=list(scenario_config.LANGUAGES.keys()), build_local=False
):
    """Builds performance worker on remote hosts (and maybe also locally)."""
    build_timeout = 45 * 60
    # Kokoro VMs (which are local only) do not have caching, so they need more time to build
    local_build_timeout = 60 * 60
    build_jobs = []
    for host in hosts:
        user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, host)
        build_jobs.append(
            jobset.JobSpec(
                cmdline=["tools/run_tests/performance/remote_host_build.sh"]
                + languages,
                shortname="remote_host_build.%s" % host,
                environ={"USER_AT_HOST": user_at_host, "CONFIG": "opt"},
                timeout_seconds=build_timeout,
            )
        )
    if build_local:
        # start port server locally
        build_jobs.append(
            jobset.JobSpec(
                cmdline=["python", "tools/run_tests/start_port_server.py"],
                shortname="local_start_port_server",
                timeout_seconds=2 * 60,
            )
        )
        # Build locally as well
        build_jobs.append(
            jobset.JobSpec(
                cmdline=["tools/run_tests/performance/build_performance.sh"]
                + languages,
                shortname="local_build",
                environ={"CONFIG": "opt"},
                timeout_seconds=local_build_timeout,
            )
        )
    jobset.message("START", "Building.", do_newline=True)
    num_failures, _ = jobset.run(
        build_jobs, newline_on_success=True, maxjobs=10
    )
    if num_failures == 0:
        jobset.message("SUCCESS", "Built successfully.", do_newline=True)
    else:
        jobset.message("FAILED", "Build failed.", do_newline=True)
        sys.exit(1)


def create_qpsworkers(languages, worker_hosts, perf_cmd=None):
    """Creates QPS workers (but does not start them)."""
    if not worker_hosts:
        # run two workers locally (for each language)
        workers = [(None, 10000), (None, 10010)]
    elif len(worker_hosts) == 1:
        # run two workers on the remote host (for each language)
        workers = [(worker_hosts[0], 10000), (worker_hosts[0], 10010)]
    else:
        # run one worker on each remote host (for each language)
        workers = [(worker_host, 10000) for worker_host in worker_hosts]

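    # Each worker's port is the base port chosen above plus a per-language offset,
    # so workers of different languages on the same host use distinct ports.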
    return [
        create_qpsworker_job(
            language,
            shortname="qps_worker_%s_%s" % (language, worker_idx),
            port=worker[1] + language.worker_port_offset(),
            remote_host=worker[0],
            perf_cmd=perf_cmd,
        )
        for language in languages
        for worker_idx, worker in enumerate(workers)
    ]


def perf_report_processor_job(
    worker_host, perf_base_name, output_filename, flame_graph_reports
):
    print("Creating perf report collection job for %s" % worker_host)
    cmd = ""
    if worker_host != "localhost":
        user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, worker_host)
        cmd = (
            "USER_AT_HOST=%s OUTPUT_FILENAME=%s OUTPUT_DIR=%s PERF_BASE_NAME=%s"
            " tools/run_tests/performance/process_remote_perf_flamegraphs.sh"
            % (
                user_at_host,
                output_filename,
                flame_graph_reports,
                perf_base_name,
            )
        )
    else:
        cmd = (
            "OUTPUT_FILENAME=%s OUTPUT_DIR=%s PERF_BASE_NAME=%s"
            " tools/run_tests/performance/process_local_perf_flamegraphs.sh"
            % (output_filename, flame_graph_reports, perf_base_name)
        )

    return jobset.JobSpec(
        cmdline=cmd,
        timeout_seconds=3 * 60,
        shell=True,
        verbose_success=True,
        shortname="process perf report",
    )


Scenario = collections.namedtuple("Scenario", "jobspec workers name")


def create_scenarios(
    languages,
    workers_by_lang,
    remote_host=None,
    regex=".*",
    category="all",
    bq_result_table=None,
    netperf=False,
    netperf_hosts=[],
    server_cpu_load=0,
):
    """Create jobspecs for scenarios to run."""
    all_workers = [
        worker
        for workers in list(workers_by_lang.values())
        for worker in workers
    ]
    scenarios = []
    _NO_WORKERS = []

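    # The netperf scenario is driven by a standalone script and needs no QPS workers.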
    if netperf:
        if not netperf_hosts:
            netperf_server = "localhost"
            netperf_client = None
        elif len(netperf_hosts) == 1:
            netperf_server = netperf_hosts[0]
            netperf_client = netperf_hosts[0]
        else:
            netperf_server = netperf_hosts[0]
            netperf_client = netperf_hosts[1]
        scenarios.append(
            Scenario(
                create_netperf_jobspec(
                    server_host=netperf_server,
                    client_host=netperf_client,
                    bq_result_table=bq_result_table,
                ),
                _NO_WORKERS,
                "netperf",
            )
        )

    for language in languages:
        for scenario_json in language.scenarios():
            if re.search(regex, scenario_json["name"]):
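                # Scenarios that do not declare CATEGORIES are treated as both
                # "scalable" and "smoketest".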
                categories = scenario_json.get(
                    "CATEGORIES", ["scalable", "smoketest"]
                )
                if category in categories or category == "all":
                    workers = workers_by_lang[str(language)][:]
                    # 'SERVER_LANGUAGE' is an indicator for this script to pick
                    # a server in a different language.
                    custom_server_lang = scenario_json.get(
                        "SERVER_LANGUAGE", None
                    )
                    custom_client_lang = scenario_json.get(
                        "CLIENT_LANGUAGE", None
                    )
                    scenario_json = scenario_config.remove_nonproto_fields(
                        scenario_json
                    )
                    if custom_server_lang and custom_client_lang:
                        raise Exception(
                            "Cannot set both custom CLIENT_LANGUAGE and"
                            " SERVER_LANGUAGE in the same scenario"
                        )
                    if custom_server_lang:
                        if not workers_by_lang.get(custom_server_lang, []):
                            print(
                                "Warning: Skipping scenario %s as"
                                % scenario_json["name"]
                            )
                            print(
                                "SERVER_LANGUAGE is set to %s yet the language"
                                " has not been selected with -l"
                                % custom_server_lang
                            )
                            continue
                        for idx in range(0, scenario_json["num_servers"]):
                            # replace first X workers by workers of a different language
                            workers[idx] = workers_by_lang[custom_server_lang][
                                idx
                            ]
                    if custom_client_lang:
                        if not workers_by_lang.get(custom_client_lang, []):
                            print(
                                "Warning: Skipping scenario %s as"
                                % scenario_json["name"]
                            )
                            print(
                                "CLIENT_LANGUAGE is set to %s yet the language"
                                " has not been selected with -l"
                                % custom_client_lang
                            )
                            continue
                        for idx in range(
                            scenario_json["num_servers"], len(workers)
                        ):
                            # replace all client workers with workers of a different language;
                            # the first num_servers workers stay as they are, since those act as servers.
                            workers[idx] = workers_by_lang[custom_client_lang][
                                idx
                            ]
                    scenario = Scenario(
                        create_scenario_jobspec(
                            scenario_json,
                            [w.host_and_port for w in workers],
                            remote_host=remote_host,
                            bq_result_table=bq_result_table,
                            server_cpu_load=server_cpu_load,
                        ),
                        workers,
                        scenario_json["name"],
                    )
                    scenarios.append(scenario)

    return scenarios


def finish_qps_workers(jobs, qpsworker_jobs):
    """Waits for given jobs to finish and eventually kills them."""
    retries = 0
    num_killed = 0
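    # Poll every 3 seconds; after roughly 30 seconds of waiting, forcibly kill any
    # workers that are still running and report how many had to be killed.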
    while any(job.is_running() for job in jobs):
        for job in qpsworker_jobs:
            if job.is_running():
                print('QPS worker "%s" is still running.' % job.host_and_port)
        if retries > 10:
            print("Killing all QPS workers.")
            for job in jobs:
                job.kill()
                num_killed += 1
        retries += 1
        time.sleep(3)
    print("All QPS workers finished.")
    return num_killed


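# Paths of the generated .svg flame graphs, collected across all scenarios and
# used at the end to render the index.html report.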
profile_output_files = []


# Collect perf text reports and flamegraphs if perf_cmd was used
# Note the base names of perf text reports are used when creating and processing
# perf data. The scenario name uniquifies the output name in the final
# perf reports directory.
# Also, the perf profiles need to be fetched and processed after each scenario
# in order to avoid clobbering the output files.
def run_collect_perf_profile_jobs(
    hosts_and_base_names, scenario_name, flame_graph_reports
):
    perf_report_jobs = []
    global profile_output_files
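    # hosts_and_base_names maps each worker's "host:port" to the base name of the
    # perf data file it produced.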
    for host_and_port in hosts_and_base_names:
        perf_base_name = hosts_and_base_names[host_and_port]
        output_filename = "%s-%s" % (scenario_name, perf_base_name)
        # from the base filename, create .svg output filename
        host = host_and_port.split(":")[0]
        profile_output_files.append("%s.svg" % output_filename)
        perf_report_jobs.append(
            perf_report_processor_job(
                host, perf_base_name, output_filename, flame_graph_reports
            )
        )

    jobset.message(
        "START", "Collecting perf reports from qps workers", do_newline=True
    )
    failures, _ = jobset.run(
        perf_report_jobs, newline_on_success=True, maxjobs=1
    )
    jobset.message(
        "SUCCESS", "Collecting perf reports from qps workers", do_newline=True
    )
    return failures


def main():
    argp = argparse.ArgumentParser(description="Run performance tests.")
    argp.add_argument(
        "-l",
        "--language",
        choices=["all"] + sorted(scenario_config.LANGUAGES.keys()),
        nargs="+",
        required=True,
        help="Languages to benchmark.",
    )
    argp.add_argument(
        "--remote_driver_host",
        default=None,
        help=(
            "Run QPS driver on given host. By default, QPS driver is run"
            " locally."
        ),
    )
    argp.add_argument(
        "--remote_worker_host",
        nargs="+",
        default=[],
        help="Worker hosts where to start QPS workers.",
    )
    argp.add_argument(
        "--dry_run",
        default=False,
        action="store_const",
        const=True,
        help="Just list scenarios to be run, but don't run them.",
    )
    argp.add_argument(
        "-r",
        "--regex",
        default=".*",
        type=str,
        help="Regex to select scenarios to run.",
    )
    argp.add_argument(
        "--bq_result_table",
        default=None,
        type=str,
        help='BigQuery "dataset.table" to upload results to.',
    )
    argp.add_argument(
        "--category",
        choices=["smoketest", "all", "scalable", "sweep"],
        default="all",
        help="Select a category of tests to run.",
    )
    argp.add_argument(
        "--netperf",
        default=False,
        action="store_const",
        const=True,
        help="Run netperf benchmark as one of the scenarios.",
    )
    argp.add_argument(
        "--server_cpu_load",
        default=0,
        type=int,
        help=(
            "Select a targeted server CPU load to run. 0 means this flag is"
            " ignored."
        ),
    )
    argp.add_argument(
        "-x",
        "--xml_report",
        default="report.xml",
        type=str,
        help="Name of XML report file to generate.",
    )
    argp.add_argument(
        "--perf_args",
        help=(
            'Example usage: "--perf_args=record -F 99 -g". '
            "Wrap QPS workers in a perf command "
            "with the arguments to perf specified here. "
            '".svg" flame graph profiles will be '
            "created for each QPS worker on each scenario. "
            'Files will be output to the "<repo_root>/<args.flame_graph_reports>" '
            "directory. Output files from running the worker "
            "under perf are saved in the repo root where it is run. "
            'Note that the perf "-g" flag is necessary for '
            "flame graph generation to work (assuming the binary "
            "being profiled uses frame pointers; check out the "
            '"--call-graph dwarf" option using libunwind otherwise.) '
            'Also note that the entire "--perf_args=<arg(s)>" must '
            "be wrapped in quotes as in the example usage. "
            'If "--perf_args" is unspecified, "perf" will '
            "not be used at all. "
            "See http://www.brendangregg.com/perf.html "
            "for more general perf examples."
        ),
    )
    argp.add_argument(
        "--skip_generate_flamegraphs",
        default=False,
        action="store_const",
        const=True,
        help=(
            "Turn flame graph generation off. "
            'May be useful if "perf_args" arguments do not make sense for '
            'generating flamegraphs (e.g., "--perf_args=stat ...")'
        ),
    )
    argp.add_argument(
        "-f",
        "--flame_graph_reports",
        default="perf_reports",
        type=str,
        help=(
            "Name of directory to output flame graph profiles to, if any are"
            " created."
        ),
    )
    argp.add_argument(
        "-u",
        "--remote_host_username",
        default="",
        type=str,
        help='Use a username that isn\'t "jenkins" to SSH into remote workers.',
    )

    args = argp.parse_args()

    global _REMOTE_HOST_USERNAME
    if args.remote_host_username:
        _REMOTE_HOST_USERNAME = args.remote_host_username

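    # Expand "all" into every known language and deduplicate the selection.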
    languages = set(
        scenario_config.LANGUAGES[l]
        for l in itertools.chain.from_iterable(
            six.iterkeys(scenario_config.LANGUAGES) if x == "all" else [x]
            for x in args.language
        )
    )

    # Put together the set of remote hosts on which to build and run
    remote_hosts = set()
    if args.remote_worker_host:
        for host in args.remote_worker_host:
            remote_hosts.add(host)
    if args.remote_driver_host:
        remote_hosts.add(args.remote_driver_host)

    if not args.dry_run:
        if remote_hosts:
            archive_repo(languages=[str(l) for l in languages])
            prepare_remote_hosts(remote_hosts, prepare_local=True)
        else:
            prepare_remote_hosts([], prepare_local=True)

    build_local = False
    if not args.remote_driver_host:
        build_local = True
    if not args.dry_run:
        build_on_remote_hosts(
            remote_hosts,
            languages=[str(l) for l in languages],
            build_local=build_local,
        )

    perf_cmd = None
    if args.perf_args:
        print("Running workers under perf profiler")
        # Expect /usr/bin/perf to be installed here, as is usual
        perf_cmd = ["/usr/bin/perf"]
        perf_cmd.extend(re.split(r"\s+", args.perf_args))

    qpsworker_jobs = create_qpsworkers(
        languages, args.remote_worker_host, perf_cmd=perf_cmd
    )

    # get list of worker addresses for each language.
    workers_by_lang = dict([(str(language), []) for language in languages])
    for job in qpsworker_jobs:
        workers_by_lang[str(job.language)].append(job)

    scenarios = create_scenarios(
        languages,
        workers_by_lang=workers_by_lang,
        remote_host=args.remote_driver_host,
        regex=args.regex,
        category=args.category,
        bq_result_table=args.bq_result_table,
        netperf=args.netperf,
        netperf_hosts=args.remote_worker_host,
        server_cpu_load=args.server_cpu_load,
    )

    if not scenarios:
        raise Exception("No scenarios to run")

    total_scenario_failures = 0
    qps_workers_killed = 0
    merged_resultset = {}
    perf_report_failures = 0

    for scenario in scenarios:
        if args.dry_run:
            print(scenario.name)
        else:
            scenario_failures = 0
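            # Start the workers this scenario needs; they are asked to quit by the
            # "shutdown_workers" job added below and force-killed in
            # finish_qps_workers if they do not exit in time.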
            try:
                for worker in scenario.workers:
                    worker.start()
                jobs = [scenario.jobspec]
                if scenario.workers:
                    # TODO(jtattermusch): ideally the "quit" job won't show up
                    # in the report
                    jobs.append(
                        create_quit_jobspec(
                            scenario.workers,
                            remote_host=args.remote_driver_host,
                        )
                    )
                scenario_failures, resultset = jobset.run(
                    jobs, newline_on_success=True, maxjobs=1
                )
                total_scenario_failures += scenario_failures
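                # Merge this scenario's results into the resultset used for the
                # final XML report.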
                merged_resultset = dict(
                    itertools.chain(
                        six.iteritems(merged_resultset),
                        six.iteritems(resultset),
                    )
                )
            finally:
                # Consider qps workers that need to be killed as failures
                qps_workers_killed += finish_qps_workers(
                    scenario.workers, qpsworker_jobs
                )

            if (
                perf_cmd
                and scenario_failures == 0
                and not args.skip_generate_flamegraphs
            ):
                workers_and_base_names = {}
                for worker in scenario.workers:
                    if not worker.perf_file_base_name:
                        raise Exception(
                            "using perf but perf report filename is unspecified"
                        )
                    workers_and_base_names[
                        worker.host_and_port
                    ] = worker.perf_file_base_name
                perf_report_failures += run_collect_perf_profile_jobs(
                    workers_and_base_names,
                    scenario.name,
                    args.flame_graph_reports,
                )

    # Still write the index.html even if some scenarios failed.
    # 'profile_output_files' will only have names for scenarios that passed
    if perf_cmd and not args.skip_generate_flamegraphs:
        # write the index file to the output dir, with all profiles from all scenarios/workers
        report_utils.render_perf_profiling_results(
            "%s/index.html" % args.flame_graph_reports, profile_output_files
        )

    report_utils.render_junit_xml_report(
        merged_resultset,
        args.xml_report,
        suite_name="benchmarks",
        multi_target=True,
    )

    if total_scenario_failures > 0 or qps_workers_killed > 0:
        print(
            "%s scenarios failed and %s qps worker jobs killed"
            % (total_scenario_failures, qps_workers_killed)
        )
        sys.exit(1)

    if perf_report_failures > 0:
        print("%s perf profile collection jobs failed" % perf_report_failures)
        sys.exit(1)


if __name__ == "__main__":
    main()