1#!/usr/bin/env python3 2# Copyright 2016 gRPC authors. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Run performance tests locally or remotely.""" 16 17from __future__ import print_function 18 19import argparse 20import collections 21import itertools 22import json 23import os 24import re 25import shlex 26import sys 27import time 28 29import six 30 31import performance.scenario_config as scenario_config 32import python_utils.jobset as jobset 33import python_utils.report_utils as report_utils 34 35_ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), "../..")) 36os.chdir(_ROOT) 37 38_REMOTE_HOST_USERNAME = "jenkins" 39 40_SCENARIO_TIMEOUT = 3 * 60 41_WORKER_TIMEOUT = 3 * 60 42_NETPERF_TIMEOUT = 60 43_QUIT_WORKER_TIMEOUT = 2 * 60 44 45 46class QpsWorkerJob: 47 """Encapsulates a qps worker server job.""" 48 49 def __init__(self, spec, language, host_and_port, perf_file_base_name=None): 50 self._spec = spec 51 self.language = language 52 self.host_and_port = host_and_port 53 self._job = None 54 self.perf_file_base_name = perf_file_base_name 55 56 def start(self): 57 self._job = jobset.Job( 58 self._spec, newline_on_success=True, travis=True, add_env={} 59 ) 60 61 def is_running(self): 62 """Polls a job and returns True if given job is still running.""" 63 return self._job and self._job.state() == jobset._RUNNING 64 65 def kill(self): 66 if self._job: 67 self._job.kill() 68 self._job = None 69 70 71def create_qpsworker_job( 72 language, shortname=None, port=10000, remote_host=None, perf_cmd=None 73): 74 cmdline = language.worker_cmdline() + ["--driver_port=%s" % port] 75 76 if remote_host: 77 host_and_port = "%s:%s" % (remote_host, port) 78 else: 79 host_and_port = "localhost:%s" % port 80 81 perf_file_base_name = None 82 if perf_cmd: 83 perf_file_base_name = "%s-%s" % (host_and_port, shortname) 84 # specify -o output file so perf.data gets collected when worker stopped 85 cmdline = ( 86 perf_cmd + ["-o", "%s-perf.data" % perf_file_base_name] + cmdline 87 ) 88 89 worker_timeout = _WORKER_TIMEOUT 90 if remote_host: 91 user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, remote_host) 92 ssh_cmd = ["ssh"] 93 cmdline = ["timeout", "%s" % (worker_timeout + 30)] + cmdline 94 ssh_cmd.extend( 95 [ 96 str(user_at_host), 97 "cd ~/performance_workspace/grpc/ && %s" % " ".join(cmdline), 98 ] 99 ) 100 cmdline = ssh_cmd 101 102 jobspec = jobset.JobSpec( 103 cmdline=cmdline, 104 shortname=shortname, 105 timeout_seconds=worker_timeout, # workers get restarted after each scenario 106 verbose_success=True, 107 ) 108 return QpsWorkerJob(jobspec, language, host_and_port, perf_file_base_name) 109 110 111def create_scenario_jobspec( 112 scenario_json, 113 workers, 114 remote_host=None, 115 bq_result_table=None, 116 server_cpu_load=0, 117): 118 """Runs one scenario using QPS driver.""" 119 # setting QPS_WORKERS env variable here makes sure it works with SSH too. 120 cmd = 'QPS_WORKERS="%s" ' % ",".join(workers) 121 if bq_result_table: 122 cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table 123 cmd += "tools/run_tests/performance/run_qps_driver.sh " 124 cmd += "--scenarios_json=%s " % shlex.quote( 125 json.dumps({"scenarios": [scenario_json]}) 126 ) 127 cmd += "--scenario_result_file=scenario_result.json " 128 if server_cpu_load != 0: 129 cmd += ( 130 "--search_param=offered_load --initial_search_value=1000" 131 " --targeted_cpu_load=%d --stride=500 --error_tolerance=0.01" 132 % server_cpu_load 133 ) 134 if remote_host: 135 user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, remote_host) 136 cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % ( 137 user_at_host, 138 shlex.quote(cmd), 139 ) 140 141 return jobset.JobSpec( 142 cmdline=[cmd], 143 shortname="%s" % scenario_json["name"], 144 timeout_seconds=_SCENARIO_TIMEOUT, 145 shell=True, 146 verbose_success=True, 147 ) 148 149 150def create_quit_jobspec(workers, remote_host=None): 151 """Runs quit using QPS driver.""" 152 # setting QPS_WORKERS env variable here makes sure it works with SSH too. 153 cmd = 'QPS_WORKERS="%s" cmake/build/qps_json_driver --quit' % ",".join( 154 w.host_and_port for w in workers 155 ) 156 if remote_host: 157 user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, remote_host) 158 cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % ( 159 user_at_host, 160 shlex.quote(cmd), 161 ) 162 163 return jobset.JobSpec( 164 cmdline=[cmd], 165 shortname="shutdown_workers", 166 timeout_seconds=_QUIT_WORKER_TIMEOUT, 167 shell=True, 168 verbose_success=True, 169 ) 170 171 172def create_netperf_jobspec( 173 server_host="localhost", client_host=None, bq_result_table=None 174): 175 """Runs netperf benchmark.""" 176 cmd = 'NETPERF_SERVER_HOST="%s" ' % server_host 177 if bq_result_table: 178 cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table 179 if client_host: 180 # If netperf is running remotely, the env variables populated by Jenkins 181 # won't be available on the client, but we need them for uploading results 182 # to BigQuery. 183 jenkins_job_name = os.getenv("KOKORO_JOB_NAME") 184 if jenkins_job_name: 185 cmd += 'KOKORO_JOB_NAME="%s" ' % jenkins_job_name 186 jenkins_build_number = os.getenv("KOKORO_BUILD_NUMBER") 187 if jenkins_build_number: 188 cmd += 'KOKORO_BUILD_NUMBER="%s" ' % jenkins_build_number 189 190 cmd += "tools/run_tests/performance/run_netperf.sh" 191 if client_host: 192 user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, client_host) 193 cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % ( 194 user_at_host, 195 shlex.quote(cmd), 196 ) 197 198 return jobset.JobSpec( 199 cmdline=[cmd], 200 shortname="netperf", 201 timeout_seconds=_NETPERF_TIMEOUT, 202 shell=True, 203 verbose_success=True, 204 ) 205 206 207def archive_repo(languages): 208 """Archives local version of repo including submodules.""" 209 cmdline = ["tar", "-cf", "../grpc.tar", "../grpc/"] 210 if "java" in languages: 211 cmdline.append("../grpc-java") 212 if "go" in languages: 213 cmdline.append("../grpc-go") 214 if "node" in languages: 215 cmdline.append("../grpc-node") 216 217 archive_job = jobset.JobSpec( 218 cmdline=cmdline, shortname="archive_repo", timeout_seconds=3 * 60 219 ) 220 221 jobset.message("START", "Archiving local repository.", do_newline=True) 222 num_failures, _ = jobset.run( 223 [archive_job], newline_on_success=True, maxjobs=1 224 ) 225 if num_failures == 0: 226 jobset.message( 227 "SUCCESS", 228 "Archive with local repository created successfully.", 229 do_newline=True, 230 ) 231 else: 232 jobset.message( 233 "FAILED", "Failed to archive local repository.", do_newline=True 234 ) 235 sys.exit(1) 236 237 238def prepare_remote_hosts(hosts, prepare_local=False): 239 """Prepares remote hosts (and maybe prepare localhost as well).""" 240 prepare_timeout = 10 * 60 241 prepare_jobs = [] 242 for host in hosts: 243 user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, host) 244 prepare_jobs.append( 245 jobset.JobSpec( 246 cmdline=["tools/run_tests/performance/remote_host_prepare.sh"], 247 shortname="remote_host_prepare.%s" % host, 248 environ={"USER_AT_HOST": user_at_host}, 249 timeout_seconds=prepare_timeout, 250 ) 251 ) 252 if prepare_local: 253 # Prepare localhost as well 254 prepare_jobs.append( 255 jobset.JobSpec( 256 cmdline=["tools/run_tests/performance/kill_workers.sh"], 257 shortname="local_prepare", 258 timeout_seconds=prepare_timeout, 259 ) 260 ) 261 jobset.message("START", "Preparing hosts.", do_newline=True) 262 num_failures, _ = jobset.run( 263 prepare_jobs, newline_on_success=True, maxjobs=10 264 ) 265 if num_failures == 0: 266 jobset.message( 267 "SUCCESS", "Prepare step completed successfully.", do_newline=True 268 ) 269 else: 270 jobset.message( 271 "FAILED", "Failed to prepare remote hosts.", do_newline=True 272 ) 273 sys.exit(1) 274 275 276def build_on_remote_hosts( 277 hosts, languages=list(scenario_config.LANGUAGES.keys()), build_local=False 278): 279 """Builds performance worker on remote hosts (and maybe also locally).""" 280 build_timeout = 45 * 60 281 # Kokoro VMs (which are local only) do not have caching, so they need more time to build 282 local_build_timeout = 60 * 60 283 build_jobs = [] 284 for host in hosts: 285 user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, host) 286 build_jobs.append( 287 jobset.JobSpec( 288 cmdline=["tools/run_tests/performance/remote_host_build.sh"] 289 + languages, 290 shortname="remote_host_build.%s" % host, 291 environ={"USER_AT_HOST": user_at_host, "CONFIG": "opt"}, 292 timeout_seconds=build_timeout, 293 ) 294 ) 295 if build_local: 296 # start port server locally 297 build_jobs.append( 298 jobset.JobSpec( 299 cmdline=["python", "tools/run_tests/start_port_server.py"], 300 shortname="local_start_port_server", 301 timeout_seconds=2 * 60, 302 ) 303 ) 304 # Build locally as well 305 build_jobs.append( 306 jobset.JobSpec( 307 cmdline=["tools/run_tests/performance/build_performance.sh"] 308 + languages, 309 shortname="local_build", 310 environ={"CONFIG": "opt"}, 311 timeout_seconds=local_build_timeout, 312 ) 313 ) 314 jobset.message("START", "Building.", do_newline=True) 315 num_failures, _ = jobset.run( 316 build_jobs, newline_on_success=True, maxjobs=10 317 ) 318 if num_failures == 0: 319 jobset.message("SUCCESS", "Built successfully.", do_newline=True) 320 else: 321 jobset.message("FAILED", "Build failed.", do_newline=True) 322 sys.exit(1) 323 324 325def create_qpsworkers(languages, worker_hosts, perf_cmd=None): 326 """Creates QPS workers (but does not start them).""" 327 if not worker_hosts: 328 # run two workers locally (for each language) 329 workers = [(None, 10000), (None, 10010)] 330 elif len(worker_hosts) == 1: 331 # run two workers on the remote host (for each language) 332 workers = [(worker_hosts[0], 10000), (worker_hosts[0], 10010)] 333 else: 334 # run one worker per each remote host (for each language) 335 workers = [(worker_host, 10000) for worker_host in worker_hosts] 336 337 return [ 338 create_qpsworker_job( 339 language, 340 shortname="qps_worker_%s_%s" % (language, worker_idx), 341 port=worker[1] + language.worker_port_offset(), 342 remote_host=worker[0], 343 perf_cmd=perf_cmd, 344 ) 345 for language in languages 346 for worker_idx, worker in enumerate(workers) 347 ] 348 349 350def perf_report_processor_job( 351 worker_host, perf_base_name, output_filename, flame_graph_reports 352): 353 print("Creating perf report collection job for %s" % worker_host) 354 cmd = "" 355 if worker_host != "localhost": 356 user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, worker_host) 357 cmd = ( 358 "USER_AT_HOST=%s OUTPUT_FILENAME=%s OUTPUT_DIR=%s PERF_BASE_NAME=%s" 359 " tools/run_tests/performance/process_remote_perf_flamegraphs.sh" 360 % ( 361 user_at_host, 362 output_filename, 363 flame_graph_reports, 364 perf_base_name, 365 ) 366 ) 367 else: 368 cmd = ( 369 "OUTPUT_FILENAME=%s OUTPUT_DIR=%s PERF_BASE_NAME=%s" 370 " tools/run_tests/performance/process_local_perf_flamegraphs.sh" 371 % (output_filename, flame_graph_reports, perf_base_name) 372 ) 373 374 return jobset.JobSpec( 375 cmdline=cmd, 376 timeout_seconds=3 * 60, 377 shell=True, 378 verbose_success=True, 379 shortname="process perf report", 380 ) 381 382 383Scenario = collections.namedtuple("Scenario", "jobspec workers name") 384 385 386def create_scenarios( 387 languages, 388 workers_by_lang, 389 remote_host=None, 390 regex=".*", 391 category="all", 392 bq_result_table=None, 393 netperf=False, 394 netperf_hosts=[], 395 server_cpu_load=0, 396): 397 """Create jobspecs for scenarios to run.""" 398 all_workers = [ 399 worker 400 for workers in list(workers_by_lang.values()) 401 for worker in workers 402 ] 403 scenarios = [] 404 _NO_WORKERS = [] 405 406 if netperf: 407 if not netperf_hosts: 408 netperf_server = "localhost" 409 netperf_client = None 410 elif len(netperf_hosts) == 1: 411 netperf_server = netperf_hosts[0] 412 netperf_client = netperf_hosts[0] 413 else: 414 netperf_server = netperf_hosts[0] 415 netperf_client = netperf_hosts[1] 416 scenarios.append( 417 Scenario( 418 create_netperf_jobspec( 419 server_host=netperf_server, 420 client_host=netperf_client, 421 bq_result_table=bq_result_table, 422 ), 423 _NO_WORKERS, 424 "netperf", 425 ) 426 ) 427 428 for language in languages: 429 for scenario_json in language.scenarios(): 430 if re.search(regex, scenario_json["name"]): 431 categories = scenario_json.get( 432 "CATEGORIES", ["scalable", "smoketest"] 433 ) 434 if category in categories or category == "all": 435 workers = workers_by_lang[str(language)][:] 436 # 'SERVER_LANGUAGE' is an indicator for this script to pick 437 # a server in different language. 438 custom_server_lang = scenario_json.get( 439 "SERVER_LANGUAGE", None 440 ) 441 custom_client_lang = scenario_json.get( 442 "CLIENT_LANGUAGE", None 443 ) 444 scenario_json = scenario_config.remove_nonproto_fields( 445 scenario_json 446 ) 447 if custom_server_lang and custom_client_lang: 448 raise Exception( 449 "Cannot set both custom CLIENT_LANGUAGE and" 450 " SERVER_LANGUAGEin the same scenario" 451 ) 452 if custom_server_lang: 453 if not workers_by_lang.get(custom_server_lang, []): 454 print( 455 "Warning: Skipping scenario %s as" 456 % scenario_json["name"] 457 ) 458 print( 459 "SERVER_LANGUAGE is set to %s yet the language" 460 " has not been selected with -l" 461 % custom_server_lang 462 ) 463 continue 464 for idx in range(0, scenario_json["num_servers"]): 465 # replace first X workers by workers of a different language 466 workers[idx] = workers_by_lang[custom_server_lang][ 467 idx 468 ] 469 if custom_client_lang: 470 if not workers_by_lang.get(custom_client_lang, []): 471 print( 472 "Warning: Skipping scenario %s as" 473 % scenario_json["name"] 474 ) 475 print( 476 "CLIENT_LANGUAGE is set to %s yet the language" 477 " has not been selected with -l" 478 % custom_client_lang 479 ) 480 continue 481 for idx in range( 482 scenario_json["num_servers"], len(workers) 483 ): 484 # replace all client workers by workers of a different language, 485 # leave num_server workers as they are server workers. 486 workers[idx] = workers_by_lang[custom_client_lang][ 487 idx 488 ] 489 scenario = Scenario( 490 create_scenario_jobspec( 491 scenario_json, 492 [w.host_and_port for w in workers], 493 remote_host=remote_host, 494 bq_result_table=bq_result_table, 495 server_cpu_load=server_cpu_load, 496 ), 497 workers, 498 scenario_json["name"], 499 ) 500 scenarios.append(scenario) 501 502 return scenarios 503 504 505def finish_qps_workers(jobs, qpsworker_jobs): 506 """Waits for given jobs to finish and eventually kills them.""" 507 retries = 0 508 num_killed = 0 509 while any(job.is_running() for job in jobs): 510 for job in qpsworker_jobs: 511 if job.is_running(): 512 print('QPS worker "%s" is still running.' % job.host_and_port) 513 if retries > 10: 514 print("Killing all QPS workers.") 515 for job in jobs: 516 job.kill() 517 num_killed += 1 518 retries += 1 519 time.sleep(3) 520 print("All QPS workers finished.") 521 return num_killed 522 523 524profile_output_files = [] 525 526 527# Collect perf text reports and flamegraphs if perf_cmd was used 528# Note the base names of perf text reports are used when creating and processing 529# perf data. The scenario name uniqifies the output name in the final 530# perf reports directory. 531# Alos, the perf profiles need to be fetched and processed after each scenario 532# in order to avoid clobbering the output files. 533def run_collect_perf_profile_jobs( 534 hosts_and_base_names, scenario_name, flame_graph_reports 535): 536 perf_report_jobs = [] 537 global profile_output_files 538 for host_and_port in hosts_and_base_names: 539 perf_base_name = hosts_and_base_names[host_and_port] 540 output_filename = "%s-%s" % (scenario_name, perf_base_name) 541 # from the base filename, create .svg output filename 542 host = host_and_port.split(":")[0] 543 profile_output_files.append("%s.svg" % output_filename) 544 perf_report_jobs.append( 545 perf_report_processor_job( 546 host, perf_base_name, output_filename, flame_graph_reports 547 ) 548 ) 549 550 jobset.message( 551 "START", "Collecting perf reports from qps workers", do_newline=True 552 ) 553 failures, _ = jobset.run( 554 perf_report_jobs, newline_on_success=True, maxjobs=1 555 ) 556 jobset.message( 557 "SUCCESS", "Collecting perf reports from qps workers", do_newline=True 558 ) 559 return failures 560 561 562def main(): 563 argp = argparse.ArgumentParser(description="Run performance tests.") 564 argp.add_argument( 565 "-l", 566 "--language", 567 choices=["all"] + sorted(scenario_config.LANGUAGES.keys()), 568 nargs="+", 569 required=True, 570 help="Languages to benchmark.", 571 ) 572 argp.add_argument( 573 "--remote_driver_host", 574 default=None, 575 help=( 576 "Run QPS driver on given host. By default, QPS driver is run" 577 " locally." 578 ), 579 ) 580 argp.add_argument( 581 "--remote_worker_host", 582 nargs="+", 583 default=[], 584 help="Worker hosts where to start QPS workers.", 585 ) 586 argp.add_argument( 587 "--dry_run", 588 default=False, 589 action="store_const", 590 const=True, 591 help="Just list scenarios to be run, but don't run them.", 592 ) 593 argp.add_argument( 594 "-r", 595 "--regex", 596 default=".*", 597 type=str, 598 help="Regex to select scenarios to run.", 599 ) 600 argp.add_argument( 601 "--bq_result_table", 602 default=None, 603 type=str, 604 help='Bigquery "dataset.table" to upload results to.', 605 ) 606 argp.add_argument( 607 "--category", 608 choices=["smoketest", "all", "scalable", "sweep"], 609 default="all", 610 help="Select a category of tests to run.", 611 ) 612 argp.add_argument( 613 "--netperf", 614 default=False, 615 action="store_const", 616 const=True, 617 help="Run netperf benchmark as one of the scenarios.", 618 ) 619 argp.add_argument( 620 "--server_cpu_load", 621 default=0, 622 type=int, 623 help=( 624 "Select a targeted server cpu load to run. 0 means ignore this flag" 625 ), 626 ) 627 argp.add_argument( 628 "-x", 629 "--xml_report", 630 default="report.xml", 631 type=str, 632 help="Name of XML report file to generate.", 633 ) 634 argp.add_argument( 635 "--perf_args", 636 help=( 637 'Example usage: "--perf_args=record -F 99 -g". ' 638 "Wrap QPS workers in a perf command " 639 "with the arguments to perf specified here. " 640 '".svg" flame graph profiles will be ' 641 "created for each Qps Worker on each scenario. " 642 'Files will output to "<repo_root>/<args.flame_graph_reports>" ' 643 "directory. Output files from running the worker " 644 "under perf are saved in the repo root where its ran. " 645 'Note that the perf "-g" flag is necessary for ' 646 "flame graphs generation to work (assuming the binary " 647 "being profiled uses frame pointers, check out " 648 '"--call-graph dwarf" option using libunwind otherwise.) ' 649 'Also note that the entire "--perf_args=<arg(s)>" must ' 650 "be wrapped in quotes as in the example usage. " 651 'If the "--perg_args" is unspecified, "perf" will ' 652 "not be used at all. " 653 "See http://www.brendangregg.com/perf.html " 654 "for more general perf examples." 655 ), 656 ) 657 argp.add_argument( 658 "--skip_generate_flamegraphs", 659 default=False, 660 action="store_const", 661 const=True, 662 help=( 663 "Turn flame graph generation off. " 664 'May be useful if "perf_args" arguments do not make sense for ' 665 'generating flamegraphs (e.g., "--perf_args=stat ...")' 666 ), 667 ) 668 argp.add_argument( 669 "-f", 670 "--flame_graph_reports", 671 default="perf_reports", 672 type=str, 673 help=( 674 "Name of directory to output flame graph profiles to, if any are" 675 " created." 676 ), 677 ) 678 argp.add_argument( 679 "-u", 680 "--remote_host_username", 681 default="", 682 type=str, 683 help='Use a username that isn\'t "Jenkins" to SSH into remote workers.', 684 ) 685 686 args = argp.parse_args() 687 688 global _REMOTE_HOST_USERNAME 689 if args.remote_host_username: 690 _REMOTE_HOST_USERNAME = args.remote_host_username 691 692 languages = set( 693 scenario_config.LANGUAGES[l] 694 for l in itertools.chain.from_iterable( 695 six.iterkeys(scenario_config.LANGUAGES) if x == "all" else [x] 696 for x in args.language 697 ) 698 ) 699 700 # Put together set of remote hosts where to run and build 701 remote_hosts = set() 702 if args.remote_worker_host: 703 for host in args.remote_worker_host: 704 remote_hosts.add(host) 705 if args.remote_driver_host: 706 remote_hosts.add(args.remote_driver_host) 707 708 if not args.dry_run: 709 if remote_hosts: 710 archive_repo(languages=[str(l) for l in languages]) 711 prepare_remote_hosts(remote_hosts, prepare_local=True) 712 else: 713 prepare_remote_hosts([], prepare_local=True) 714 715 build_local = False 716 if not args.remote_driver_host: 717 build_local = True 718 if not args.dry_run: 719 build_on_remote_hosts( 720 remote_hosts, 721 languages=[str(l) for l in languages], 722 build_local=build_local, 723 ) 724 725 perf_cmd = None 726 if args.perf_args: 727 print("Running workers under perf profiler") 728 # Expect /usr/bin/perf to be installed here, as is usual 729 perf_cmd = ["/usr/bin/perf"] 730 perf_cmd.extend(re.split("\s+", args.perf_args)) 731 732 qpsworker_jobs = create_qpsworkers( 733 languages, args.remote_worker_host, perf_cmd=perf_cmd 734 ) 735 736 # get list of worker addresses for each language. 737 workers_by_lang = dict([(str(language), []) for language in languages]) 738 for job in qpsworker_jobs: 739 workers_by_lang[str(job.language)].append(job) 740 741 scenarios = create_scenarios( 742 languages, 743 workers_by_lang=workers_by_lang, 744 remote_host=args.remote_driver_host, 745 regex=args.regex, 746 category=args.category, 747 bq_result_table=args.bq_result_table, 748 netperf=args.netperf, 749 netperf_hosts=args.remote_worker_host, 750 server_cpu_load=args.server_cpu_load, 751 ) 752 753 if not scenarios: 754 raise Exception("No scenarios to run") 755 756 total_scenario_failures = 0 757 qps_workers_killed = 0 758 merged_resultset = {} 759 perf_report_failures = 0 760 761 for scenario in scenarios: 762 if args.dry_run: 763 print(scenario.name) 764 else: 765 scenario_failures = 0 766 try: 767 for worker in scenario.workers: 768 worker.start() 769 jobs = [scenario.jobspec] 770 if scenario.workers: 771 # TODO(jtattermusch): ideally the "quit" job won't show up 772 # in the report 773 jobs.append( 774 create_quit_jobspec( 775 scenario.workers, 776 remote_host=args.remote_driver_host, 777 ) 778 ) 779 scenario_failures, resultset = jobset.run( 780 jobs, newline_on_success=True, maxjobs=1 781 ) 782 total_scenario_failures += scenario_failures 783 merged_resultset = dict( 784 itertools.chain( 785 six.iteritems(merged_resultset), 786 six.iteritems(resultset), 787 ) 788 ) 789 finally: 790 # Consider qps workers that need to be killed as failures 791 qps_workers_killed += finish_qps_workers( 792 scenario.workers, qpsworker_jobs 793 ) 794 795 if ( 796 perf_cmd 797 and scenario_failures == 0 798 and not args.skip_generate_flamegraphs 799 ): 800 workers_and_base_names = {} 801 for worker in scenario.workers: 802 if not worker.perf_file_base_name: 803 raise Exception( 804 "using perf buf perf report filename is unspecified" 805 ) 806 workers_and_base_names[ 807 worker.host_and_port 808 ] = worker.perf_file_base_name 809 perf_report_failures += run_collect_perf_profile_jobs( 810 workers_and_base_names, 811 scenario.name, 812 args.flame_graph_reports, 813 ) 814 815 # Still write the index.html even if some scenarios failed. 816 # 'profile_output_files' will only have names for scenarios that passed 817 if perf_cmd and not args.skip_generate_flamegraphs: 818 # write the index fil to the output dir, with all profiles from all scenarios/workers 819 report_utils.render_perf_profiling_results( 820 "%s/index.html" % args.flame_graph_reports, profile_output_files 821 ) 822 823 report_utils.render_junit_xml_report( 824 merged_resultset, 825 args.xml_report, 826 suite_name="benchmarks", 827 multi_target=True, 828 ) 829 830 if total_scenario_failures > 0 or qps_workers_killed > 0: 831 print( 832 "%s scenarios failed and %s qps worker jobs killed" 833 % (total_scenario_failures, qps_workers_killed) 834 ) 835 sys.exit(1) 836 837 if perf_report_failures > 0: 838 print("%s perf profile collection jobs failed" % perf_report_failures) 839 sys.exit(1) 840 841 842if __name__ == "__main__": 843 main() 844