#!/usr/bin/env python3
# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run performance tests locally or remotely."""

from __future__ import print_function

import argparse
import collections
import itertools
import json
import multiprocessing
import os
import pipes
import re
import subprocess
import sys
import tempfile
import time
import traceback
import uuid

import six

import performance.scenario_config as scenario_config
import python_utils.jobset as jobset
import python_utils.report_utils as report_utils

_ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
os.chdir(_ROOT)

_REMOTE_HOST_USERNAME = 'jenkins'

_SCENARIO_TIMEOUT = 3 * 60
_WORKER_TIMEOUT = 3 * 60
_NETPERF_TIMEOUT = 60
_QUIT_WORKER_TIMEOUT = 2 * 60


class QpsWorkerJob:
    """Encapsulates a qps worker server job."""

    def __init__(self, spec, language, host_and_port, perf_file_base_name=None):
        self._spec = spec
        self.language = language
        self.host_and_port = host_and_port
        self._job = None
        self.perf_file_base_name = perf_file_base_name

    def start(self):
        self._job = jobset.Job(self._spec,
                               newline_on_success=True,
                               travis=True,
                               add_env={})

    def is_running(self):
        """Polls a job and returns True if given job is still running."""
        return self._job and self._job.state() == jobset._RUNNING

    def kill(self):
        if self._job:
            self._job.kill()
            self._job = None


def create_qpsworker_job(language,
                         shortname=None,
                         port=10000,
                         remote_host=None,
                         perf_cmd=None):
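    """Creates a QpsWorkerJob (without starting it) for a single qps worker.

    Builds the worker command line for the given language, optionally wraps it
    in the perf command (so a perf.data file is collected for the worker) and,
    when remote_host is set, in an ssh invocation with a hard timeout.
    """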
    cmdline = (language.worker_cmdline() + ['--driver_port=%s' % port])

    if remote_host:
        host_and_port = '%s:%s' % (remote_host, port)
    else:
        host_and_port = 'localhost:%s' % port

    perf_file_base_name = None
    if perf_cmd:
        perf_file_base_name = '%s-%s' % (host_and_port, shortname)
        # specify -o output file so perf.data gets collected when the worker is stopped
        cmdline = perf_cmd + ['-o', '%s-perf.data' % perf_file_base_name
                             ] + cmdline

    worker_timeout = _WORKER_TIMEOUT
    if remote_host:
        user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
        ssh_cmd = ['ssh']
        cmdline = ['timeout', '%s' % (worker_timeout + 30)] + cmdline
        ssh_cmd.extend([
            str(user_at_host),
            'cd ~/performance_workspace/grpc/ && %s' % ' '.join(cmdline)
        ])
        cmdline = ssh_cmd

    jobspec = jobset.JobSpec(
        cmdline=cmdline,
        shortname=shortname,
        timeout_seconds=
        worker_timeout,  # workers get restarted after each scenario
        verbose_success=True)
    return QpsWorkerJob(jobspec, language, host_and_port, perf_file_base_name)


def create_scenario_jobspec(scenario_json,
                            workers,
                            remote_host=None,
                            bq_result_table=None,
                            server_cpu_load=0):
    """Creates a jobspec for running one scenario with the QPS driver."""
    # setting QPS_WORKERS env variable here makes sure it works with SSH too.
    cmd = 'QPS_WORKERS="%s" ' % ','.join(workers)
    if bq_result_table:
        cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table
    cmd += 'tools/run_tests/performance/run_qps_driver.sh '
    cmd += '--scenarios_json=%s ' % pipes.quote(
        json.dumps({'scenarios': [scenario_json]}))
    cmd += '--scenario_result_file=scenario_result.json '
    if server_cpu_load != 0:
        cmd += '--search_param=offered_load --initial_search_value=1000 --targeted_cpu_load=%d --stride=500 --error_tolerance=0.01' % server_cpu_load
    if remote_host:
        user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
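        # The local shell concatenates the double-quoted "cd ..." prefix and the
        # pipes.quote()'d command into a single remote command string for ssh.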
        cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
            user_at_host, pipes.quote(cmd))

    return jobset.JobSpec(cmdline=[cmd],
                          shortname='%s' % scenario_json['name'],
                          timeout_seconds=_SCENARIO_TIMEOUT,
                          shell=True,
                          verbose_success=True)


def create_quit_jobspec(workers, remote_host=None):
    """Creates a jobspec that asks the QPS workers to quit via the driver."""
    # setting QPS_WORKERS env variable here makes sure it works with SSH too.
    cmd = 'QPS_WORKERS="%s" cmake/build/qps_json_driver --quit' % ','.join(
        w.host_and_port for w in workers)
    if remote_host:
        user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
        cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
            user_at_host, pipes.quote(cmd))

    return jobset.JobSpec(cmdline=[cmd],
                          shortname='shutdown_workers',
                          timeout_seconds=_QUIT_WORKER_TIMEOUT,
                          shell=True,
                          verbose_success=True)


def create_netperf_jobspec(server_host='localhost',
                           client_host=None,
                           bq_result_table=None):
    """Creates a jobspec for running the netperf benchmark."""
    cmd = 'NETPERF_SERVER_HOST="%s" ' % server_host
    if bq_result_table:
        cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table
    if client_host:
        # If netperf is running remotely, the env variables populated by the
        # CI system (Kokoro) won't be available on the client, but we need
        # them for uploading results to BigQuery.
        jenkins_job_name = os.getenv('KOKORO_JOB_NAME')
        if jenkins_job_name:
            cmd += 'KOKORO_JOB_NAME="%s" ' % jenkins_job_name
        jenkins_build_number = os.getenv('KOKORO_BUILD_NUMBER')
        if jenkins_build_number:
            cmd += 'KOKORO_BUILD_NUMBER="%s" ' % jenkins_build_number

    cmd += 'tools/run_tests/performance/run_netperf.sh'
    if client_host:
        user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, client_host)
        cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
            user_at_host, pipes.quote(cmd))

    return jobset.JobSpec(cmdline=[cmd],
                          shortname='netperf',
                          timeout_seconds=_NETPERF_TIMEOUT,
                          shell=True,
                          verbose_success=True)


def archive_repo(languages):
    """Archives local version of repo including submodules."""
    cmdline = ['tar', '-cf', '../grpc.tar', '../grpc/']
    if 'java' in languages:
        cmdline.append('../grpc-java')
    if 'go' in languages:
        cmdline.append('../grpc-go')
    if 'node' in languages or 'node_purejs' in languages:
        cmdline.append('../grpc-node')

    archive_job = jobset.JobSpec(cmdline=cmdline,
                                 shortname='archive_repo',
                                 timeout_seconds=3 * 60)

    jobset.message('START', 'Archiving local repository.', do_newline=True)
    num_failures, _ = jobset.run([archive_job],
                                 newline_on_success=True,
                                 maxjobs=1)
    if num_failures == 0:
        jobset.message('SUCCESS',
                       'Archive with local repository created successfully.',
                       do_newline=True)
    else:
        jobset.message('FAILED',
                       'Failed to archive local repository.',
                       do_newline=True)
        sys.exit(1)


def prepare_remote_hosts(hosts, prepare_local=False):
    """Prepares remote hosts (and maybe prepares localhost as well)."""
    prepare_timeout = 10 * 60
    prepare_jobs = []
    for host in hosts:
        user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, host)
        prepare_jobs.append(
            jobset.JobSpec(
                cmdline=['tools/run_tests/performance/remote_host_prepare.sh'],
                shortname='remote_host_prepare.%s' % host,
                environ={'USER_AT_HOST': user_at_host},
                timeout_seconds=prepare_timeout))
    if prepare_local:
        # Prepare localhost as well
        prepare_jobs.append(
            jobset.JobSpec(
                cmdline=['tools/run_tests/performance/kill_workers.sh'],
                shortname='local_prepare',
                timeout_seconds=prepare_timeout))
    jobset.message('START', 'Preparing hosts.', do_newline=True)
    num_failures, _ = jobset.run(prepare_jobs,
                                 newline_on_success=True,
                                 maxjobs=10)
    if num_failures == 0:
        jobset.message('SUCCESS',
                       'Prepare step completed successfully.',
                       do_newline=True)
    else:
        jobset.message('FAILED',
                       'Failed to prepare remote hosts.',
                       do_newline=True)
        sys.exit(1)


def build_on_remote_hosts(hosts,
                          languages=list(scenario_config.LANGUAGES.keys()),
                          build_local=False):
    """Builds performance worker on remote hosts (and maybe also locally)."""
    build_timeout = 45 * 60
    # Kokoro VMs (which are local only) do not have caching, so they need more time to build
    local_build_timeout = 60 * 60
    build_jobs = []
    for host in hosts:
        user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, host)
        build_jobs.append(
            jobset.JobSpec(
                cmdline=['tools/run_tests/performance/remote_host_build.sh'] +
                languages,
                shortname='remote_host_build.%s' % host,
                environ={
                    'USER_AT_HOST': user_at_host,
                    'CONFIG': 'opt'
                },
                timeout_seconds=build_timeout))
    if build_local:
        # start port server locally
        build_jobs.append(
            jobset.JobSpec(
                cmdline=['python', 'tools/run_tests/start_port_server.py'],
                shortname='local_start_port_server',
                timeout_seconds=2 * 60))
        # Build locally as well
        build_jobs.append(
            jobset.JobSpec(
                cmdline=['tools/run_tests/performance/build_performance.sh'] +
                languages,
                shortname='local_build',
                environ={'CONFIG': 'opt'},
                timeout_seconds=local_build_timeout))
    jobset.message('START', 'Building.', do_newline=True)
    num_failures, _ = jobset.run(build_jobs,
                                 newline_on_success=True,
                                 maxjobs=10)
    if num_failures == 0:
        jobset.message('SUCCESS', 'Built successfully.', do_newline=True)
    else:
        jobset.message('FAILED', 'Build failed.', do_newline=True)
        sys.exit(1)


def create_qpsworkers(languages, worker_hosts, perf_cmd=None):
    """Creates QPS workers (but does not start them)."""
    if not worker_hosts:
        # run two workers locally (for each language)
        workers = [(None, 10000), (None, 10010)]
    elif len(worker_hosts) == 1:
        # run two workers on the remote host (for each language)
        workers = [(worker_hosts[0], 10000), (worker_hosts[0], 10010)]
    else:
        # run one worker on each remote host (for each language)
        workers = [(worker_host, 10000) for worker_host in worker_hosts]

    return [
        create_qpsworker_job(language,
                             shortname='qps_worker_%s_%s' %
                             (language, worker_idx),
                             port=worker[1] + language.worker_port_offset(),
                             remote_host=worker[0],
                             perf_cmd=perf_cmd)
        for language in languages
        for worker_idx, worker in enumerate(workers)
    ]


def perf_report_processor_job(worker_host, perf_base_name, output_filename,
                              flame_graph_reports):
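    """Creates a jobspec that turns a worker's perf data into a flame graph.

    Runs the local or remote flame graph processing script depending on
    whether the worker ran on localhost or on a remote host.
    """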
    print('Creating perf report collection job for %s' % worker_host)
    cmd = ''
    if worker_host != 'localhost':
        user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, worker_host)
        cmd = "USER_AT_HOST=%s OUTPUT_FILENAME=%s OUTPUT_DIR=%s PERF_BASE_NAME=%s tools/run_tests/performance/process_remote_perf_flamegraphs.sh" % (
            user_at_host, output_filename, flame_graph_reports, perf_base_name)
    else:
        cmd = "OUTPUT_FILENAME=%s OUTPUT_DIR=%s PERF_BASE_NAME=%s tools/run_tests/performance/process_local_perf_flamegraphs.sh" % (
            output_filename, flame_graph_reports, perf_base_name)

    return jobset.JobSpec(cmdline=cmd,
                          timeout_seconds=3 * 60,
                          shell=True,
                          verbose_success=True,
                          shortname='process perf report')


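# A scenario to run: the driver jobspec, the worker jobs it needs, and a
# human-readable name.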
Scenario = collections.namedtuple('Scenario', 'jobspec workers name')


def create_scenarios(languages,
                     workers_by_lang,
                     remote_host=None,
                     regex='.*',
                     category='all',
                     bq_result_table=None,
                     netperf=False,
                     netperf_hosts=[],
                     server_cpu_load=0):
    """Create jobspecs for scenarios to run."""
    all_workers = [
        worker for workers in list(workers_by_lang.values())
        for worker in workers
    ]
    scenarios = []
    _NO_WORKERS = []

    if netperf:
        if not netperf_hosts:
            netperf_server = 'localhost'
            netperf_client = None
        elif len(netperf_hosts) == 1:
            netperf_server = netperf_hosts[0]
            netperf_client = netperf_hosts[0]
        else:
            netperf_server = netperf_hosts[0]
            netperf_client = netperf_hosts[1]
        scenarios.append(
            Scenario(
                create_netperf_jobspec(server_host=netperf_server,
                                       client_host=netperf_client,
                                       bq_result_table=bq_result_table),
                _NO_WORKERS, 'netperf'))

    for language in languages:
        for scenario_json in language.scenarios():
            if re.search(regex, scenario_json['name']):
                categories = scenario_json.get('CATEGORIES',
                                               ['scalable', 'smoketest'])
                if category in categories or category == 'all':
                    workers = workers_by_lang[str(language)][:]
                    # 'SERVER_LANGUAGE' is an indicator for this script to pick
                    # a server in a different language.
                    custom_server_lang = scenario_json.get(
                        'SERVER_LANGUAGE', None)
                    custom_client_lang = scenario_json.get(
                        'CLIENT_LANGUAGE', None)
                    scenario_json = scenario_config.remove_nonproto_fields(
                        scenario_json)
                    if custom_server_lang and custom_client_lang:
                        raise Exception(
                            'Cannot set both custom CLIENT_LANGUAGE and '
                            'SERVER_LANGUAGE in the same scenario')
                    if custom_server_lang:
                        if not workers_by_lang.get(custom_server_lang, []):
                            print('Warning: Skipping scenario %s as' %
                                  scenario_json['name'])
                            print(
                                'SERVER_LANGUAGE is set to %s yet the language has '
                                'not been selected with -l' %
                                custom_server_lang)
                            continue
                        for idx in range(0, scenario_json['num_servers']):
                            # replace first X workers by workers of a different language
                            workers[idx] = workers_by_lang[custom_server_lang][
                                idx]
                    if custom_client_lang:
                        if not workers_by_lang.get(custom_client_lang, []):
                            print('Warning: Skipping scenario %s as' %
                                  scenario_json['name'])
                            print(
                                'CLIENT_LANGUAGE is set to %s yet the language has '
                                'not been selected with -l' %
                                custom_client_lang)
                            continue
                        for idx in range(scenario_json['num_servers'],
                                         len(workers)):
                            # replace all client workers by workers of a different language,
                            # leave num_server workers as they are server workers.
                            workers[idx] = workers_by_lang[custom_client_lang][
                                idx]
                    scenario = Scenario(
                        create_scenario_jobspec(
                            scenario_json, [w.host_and_port for w in workers],
                            remote_host=remote_host,
                            bq_result_table=bq_result_table,
                            server_cpu_load=server_cpu_load), workers,
                        scenario_json['name'])
                    scenarios.append(scenario)

    return scenarios


def finish_qps_workers(jobs, qpsworker_jobs):
    """Waits for given jobs to finish and eventually kills them."""
    retries = 0
    num_killed = 0
    while any(job.is_running() for job in jobs):
        for job in qpsworker_jobs:
            if job.is_running():
                print('QPS worker "%s" is still running.' % job.host_and_port)
        if retries > 10:
            print('Killing all QPS workers.')
            for job in jobs:
                job.kill()
                num_killed += 1
        retries += 1
        time.sleep(3)
    print('All QPS workers finished.')
    return num_killed


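# Names of the flame graph (.svg) files generated so far, accumulated across
# scenarios by run_collect_perf_profile_jobs below.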
profile_output_files = []


# Collect perf text reports and flame graphs if perf_cmd was used.
# Note that the base names of the perf text reports are used when creating and
# processing perf data. The scenario name makes the output name unique in the
# final perf reports directory.
# Also, the perf profiles need to be fetched and processed after each scenario
# in order to avoid clobbering the output files.
def run_collect_perf_profile_jobs(hosts_and_base_names, scenario_name,
                                  flame_graph_reports):
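    """Collects and processes perf profiles from the given worker hosts.

    Returns the number of perf report processing jobs that failed.
    """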
    perf_report_jobs = []
    global profile_output_files
    for host_and_port in hosts_and_base_names:
        perf_base_name = hosts_and_base_names[host_and_port]
        output_filename = '%s-%s' % (scenario_name, perf_base_name)
        # from the base filename, create .svg output filename
        host = host_and_port.split(':')[0]
        profile_output_files.append('%s.svg' % output_filename)
        perf_report_jobs.append(
            perf_report_processor_job(host, perf_base_name, output_filename,
                                      flame_graph_reports))

    jobset.message('START',
                   'Collecting perf reports from qps workers',
                   do_newline=True)
    failures, _ = jobset.run(perf_report_jobs,
                             newline_on_success=True,
                             maxjobs=1)
    jobset.message('SUCCESS',
                   'Collecting perf reports from qps workers',
                   do_newline=True)
    return failures


def main():
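    """Parses arguments, runs the selected scenarios and writes the reports."""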
    argp = argparse.ArgumentParser(description='Run performance tests.')
    argp.add_argument('-l',
                      '--language',
                      choices=['all'] +
                      sorted(scenario_config.LANGUAGES.keys()),
                      nargs='+',
                      required=True,
                      help='Languages to benchmark.')
    argp.add_argument(
        '--remote_driver_host',
        default=None,
        help=
        'Run QPS driver on given host. By default, QPS driver is run locally.')
    argp.add_argument('--remote_worker_host',
                      nargs='+',
                      default=[],
                      help='Hosts on which to start QPS workers.')
    argp.add_argument(
        '--dry_run',
        default=False,
        action='store_const',
        const=True,
        help='Just list scenarios to be run, but don\'t run them.')
    argp.add_argument('-r',
                      '--regex',
                      default='.*',
                      type=str,
                      help='Regex to select scenarios to run.')
    argp.add_argument('--bq_result_table',
                      default=None,
                      type=str,
                      help='BigQuery "dataset.table" to upload results to.')
    argp.add_argument('--category',
                      choices=['smoketest', 'all', 'scalable', 'sweep'],
                      default='all',
                      help='Select a category of tests to run.')
    argp.add_argument('--netperf',
                      default=False,
                      action='store_const',
                      const=True,
                      help='Run netperf benchmark as one of the scenarios.')
    argp.add_argument(
        '--server_cpu_load',
        default=0,
        type=int,
        help='Select a targeted server CPU load to run. 0 means ignore this flag'
    )
    argp.add_argument('-x',
                      '--xml_report',
                      default='report.xml',
                      type=str,
                      help='Name of XML report file to generate.')
    argp.add_argument(
        '--perf_args',
        help=('Example usage: "--perf_args=record -F 99 -g". '
              'Wrap QPS workers in a perf command '
              'with the arguments to perf specified here. '
              '".svg" flame graph profiles will be '
              'created for each QPS worker in each scenario. '
              'Files will be output to the "<repo_root>/<args.flame_graph_reports>" '
              'directory. Output files from running the worker '
              'under perf are saved in the repo root where it is run. '
              'Note that the perf "-g" flag is necessary for '
              'flame graph generation to work (assuming the binary '
              'being profiled uses frame pointers; check out the '
              '"--call-graph dwarf" option using libunwind otherwise.) '
              'Also note that the entire "--perf_args=<arg(s)>" must '
              'be wrapped in quotes as in the example usage. '
              'If "--perf_args" is unspecified, "perf" will '
              'not be used at all. '
              'See http://www.brendangregg.com/perf.html '
              'for more general perf examples.'))
    argp.add_argument(
        '--skip_generate_flamegraphs',
        default=False,
        action='store_const',
        const=True,
        help=('Turn flame graph generation off. '
              'May be useful if "perf_args" arguments do not make sense for '
              'generating flamegraphs (e.g., "--perf_args=stat ...")'))
    argp.add_argument(
        '-f',
        '--flame_graph_reports',
        default='perf_reports',
        type=str,
        help=
        'Name of directory to output flame graph profiles to, if any are created.'
    )
    argp.add_argument(
        '-u',
        '--remote_host_username',
        default='',
        type=str,
        help='Use a username that isn\'t "jenkins" to SSH into remote workers.')

    args = argp.parse_args()

    global _REMOTE_HOST_USERNAME
    if args.remote_host_username:
        _REMOTE_HOST_USERNAME = args.remote_host_username

    languages = set(
        scenario_config.LANGUAGES[l] for l in itertools.chain.from_iterable(
            six.iterkeys(scenario_config.LANGUAGES) if x == 'all' else [x]
            for x in args.language))

    # Put together the set of remote hosts to run and build on.
    remote_hosts = set()
    if args.remote_worker_host:
        for host in args.remote_worker_host:
            remote_hosts.add(host)
    if args.remote_driver_host:
        remote_hosts.add(args.remote_driver_host)

    if not args.dry_run:
        if remote_hosts:
            archive_repo(languages=[str(l) for l in languages])
            prepare_remote_hosts(remote_hosts, prepare_local=True)
        else:
            prepare_remote_hosts([], prepare_local=True)

    build_local = False
    if not args.remote_driver_host:
        build_local = True
    if not args.dry_run:
        build_on_remote_hosts(remote_hosts,
                              languages=[str(l) for l in languages],
                              build_local=build_local)

    perf_cmd = None
    if args.perf_args:
        print('Running workers under perf profiler')
        # Expect /usr/bin/perf to be installed here, as is usual
        perf_cmd = ['/usr/bin/perf']
        perf_cmd.extend(re.split(r'\s+', args.perf_args))

    qpsworker_jobs = create_qpsworkers(languages,
                                       args.remote_worker_host,
                                       perf_cmd=perf_cmd)

    # Group the created worker jobs by language.
    workers_by_lang = dict([(str(language), []) for language in languages])
    for job in qpsworker_jobs:
        workers_by_lang[str(job.language)].append(job)

    scenarios = create_scenarios(languages,
                                 workers_by_lang=workers_by_lang,
                                 remote_host=args.remote_driver_host,
                                 regex=args.regex,
                                 category=args.category,
                                 bq_result_table=args.bq_result_table,
                                 netperf=args.netperf,
                                 netperf_hosts=args.remote_worker_host,
                                 server_cpu_load=args.server_cpu_load)

    if not scenarios:
        raise Exception('No scenarios to run')

    total_scenario_failures = 0
    qps_workers_killed = 0
    merged_resultset = {}
    perf_report_failures = 0

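    # For each scenario: start the workers it needs, run the scenario job
    # followed by a job that asks the workers to quit, and make sure any
    # leftover workers are killed before moving on to the next scenario.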
    for scenario in scenarios:
        if args.dry_run:
            print(scenario.name)
        else:
            scenario_failures = 0
            try:
                for worker in scenario.workers:
                    worker.start()
                jobs = [scenario.jobspec]
                if scenario.workers:
                    # TODO(jtattermusch): ideally the "quit" job won't show up
                    # in the report
                    jobs.append(
                        create_quit_jobspec(
                            scenario.workers,
                            remote_host=args.remote_driver_host))
                scenario_failures, resultset = jobset.run(
                    jobs, newline_on_success=True, maxjobs=1)
                total_scenario_failures += scenario_failures
                merged_resultset = dict(
                    itertools.chain(six.iteritems(merged_resultset),
                                    six.iteritems(resultset)))
            finally:
                # Consider qps workers that need to be killed as failures
                qps_workers_killed += finish_qps_workers(
                    scenario.workers, qpsworker_jobs)

            if perf_cmd and scenario_failures == 0 and not args.skip_generate_flamegraphs:
                workers_and_base_names = {}
                for worker in scenario.workers:
                    if not worker.perf_file_base_name:
                        raise Exception(
                            'using perf but the perf report filename is unspecified'
                        )
                    workers_and_base_names[
                        worker.host_and_port] = worker.perf_file_base_name
                perf_report_failures += run_collect_perf_profile_jobs(
                    workers_and_base_names, scenario.name,
                    args.flame_graph_reports)

    # Still write the index.html even if some scenarios failed.
    # 'profile_output_files' will only have names for scenarios that passed.
    if perf_cmd and not args.skip_generate_flamegraphs:
        # Write the index file to the output dir, with all profiles from all
        # scenarios/workers.
        report_utils.render_perf_profiling_results(
            '%s/index.html' % args.flame_graph_reports, profile_output_files)

    report_utils.render_junit_xml_report(merged_resultset,
                                         args.xml_report,
                                         suite_name='benchmarks',
                                         multi_target=True)

    if total_scenario_failures > 0 or qps_workers_killed > 0:
        print('%s scenarios failed and %s qps worker jobs killed' %
              (total_scenario_failures, qps_workers_killed))
        sys.exit(1)

    if perf_report_failures > 0:
        print('%s perf profile collection jobs failed' % perf_report_failures)
        sys.exit(1)


if __name__ == "__main__":
    main()