1#!/usr/bin/env python3 2# Copyright 2021 The gRPC Authors 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16# Script to generate test configurations for the OSS benchmarks framework. 17# 18# This script filters test scenarios and generates uniquely named configurations 19# for each test. Configurations are dumped in multipart YAML format. 20# 21# See documentation below: 22# https://github.com/grpc/grpc/blob/master/tools/run_tests/performance/README.md#grpc-oss-benchmarks 23 24import argparse 25import collections 26import copy 27import datetime 28import itertools 29import json 30import os 31import string 32import sys 33from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Type 34 35import yaml 36 37sys.path.append(os.path.dirname(os.path.abspath(__file__))) 38import scenario_config 39import scenario_config_exporter 40 41CONFIGURATION_FILE_HEADER_COMMENT = """ 42# Load test configurations generated from a template by loadtest_config.py. 43# See documentation below: 44# https://github.com/grpc/grpc/blob/master/tools/run_tests/performance/README.md#grpc-oss-benchmarks 45""" 46 47 48def safe_name(language: str) -> str: 49 """Returns a name that is safe to use in labels and file names.""" 50 return scenario_config.LANGUAGES[language].safename 51 52 53def default_prefix() -> str: 54 """Constructs and returns a default prefix for LoadTest names.""" 55 return os.environ.get("USER", "loadtest") 56 57 58def now_string() -> str: 59 """Returns the current date and time in string format.""" 60 return datetime.datetime.now().strftime("%Y%m%d%H%M%S") 61 62 63def validate_loadtest_name(name: str) -> None: 64 """Validates that a LoadTest name is in the expected format.""" 65 if len(name) > 253: 66 raise ValueError( 67 "LoadTest name must be less than 253 characters long: %s" % name 68 ) 69 if not all(c.isalnum() and not c.isupper() for c in name if c != "-"): 70 raise ValueError("Invalid characters in LoadTest name: %s" % name) 71 if not name or not name[0].isalpha() or name[-1] == "-": 72 raise ValueError("Invalid format for LoadTest name: %s" % name) 73 74 75def loadtest_base_name( 76 scenario_name: str, uniquifier_elements: Iterable[str] 77) -> str: 78 """Constructs and returns the base name for a LoadTest resource.""" 79 name_elements = scenario_name.split("_") 80 name_elements.extend(uniquifier_elements) 81 return "-".join(element.lower() for element in name_elements) 82 83 84def loadtest_name( 85 prefix: str, scenario_name: str, uniquifier_elements: Iterable[str] 86) -> str: 87 """Constructs and returns a valid name for a LoadTest resource.""" 88 base_name = loadtest_base_name(scenario_name, uniquifier_elements) 89 name_elements = [] 90 if prefix: 91 name_elements.append(prefix) 92 name_elements.append(base_name) 93 name = "-".join(name_elements) 94 validate_loadtest_name(name) 95 return name 96 97 98def component_name(elements: Iterable[str]) -> str: 99 """Constructs a component name from possibly empty elements.""" 100 return "-".join((e for e in elements if e)) 101 102 103def validate_annotations(annotations: Dict[str, str]) -> None: 104 """Validates that annotations do not contain reserved names. 105 106 These names are automatically added by the config generator. 107 """ 108 names = set(("scenario", "uniquifier")).intersection(annotations) 109 if names: 110 raise ValueError("Annotations contain reserved names: %s" % names) 111 112 113def gen_run_indices(runs_per_test: int) -> Iterable[str]: 114 """Generates run indices for multiple runs, as formatted strings.""" 115 if runs_per_test < 2: 116 yield "" 117 return 118 index_length = len("{:d}".format(runs_per_test - 1)) 119 index_fmt = "{{:0{:d}d}}".format(index_length) 120 for i in range(runs_per_test): 121 yield index_fmt.format(i) 122 123 124def scenario_name( 125 base_name: str, 126 client_channels: Optional[int], 127 server_threads: Optional[int], 128 offered_load: Optional[int], 129): 130 """Constructs scenario name from base name and modifiers.""" 131 132 elements = [base_name] 133 if client_channels: 134 elements.append("{:d}channels".format(client_channels)) 135 if server_threads: 136 elements.append("{:d}threads".format(server_threads)) 137 if offered_load: 138 elements.append("{:d}load".format(offered_load)) 139 return "_".join(elements) 140 141 142def scenario_transform_function( 143 client_channels: Optional[int], 144 server_threads: Optional[int], 145 offered_loads: Optional[Iterable[int]], 146) -> Optional[ 147 Callable[[Iterable[Mapping[str, Any]]], Iterable[Mapping[str, Any]]] 148]: 149 """Returns a transform to be applied to a list of scenarios.""" 150 if not any((client_channels, server_threads, len(offered_loads))): 151 return lambda s: s 152 153 def _transform( 154 scenarios: Iterable[Mapping[str, Any]] 155 ) -> Iterable[Mapping[str, Any]]: 156 """Transforms scenarios by inserting num of client channels, number of async_server_threads and offered_load.""" 157 158 for base_scenario in scenarios: 159 base_name = base_scenario["name"] 160 if client_channels: 161 base_scenario["client_config"][ 162 "client_channels" 163 ] = client_channels 164 165 if server_threads: 166 base_scenario["server_config"][ 167 "async_server_threads" 168 ] = server_threads 169 170 if not offered_loads: 171 base_scenario["name"] = scenario_name( 172 base_name, client_channels, server_threads, 0 173 ) 174 yield base_scenario 175 return 176 177 for offered_load in offered_loads: 178 scenario = copy.deepcopy(base_scenario) 179 scenario["client_config"]["load_params"] = { 180 "poisson": {"offered_load": offered_load} 181 } 182 scenario["name"] = scenario_name( 183 base_name, client_channels, server_threads, offered_load 184 ) 185 yield scenario 186 187 return _transform 188 189 190def gen_loadtest_configs( 191 base_config: Mapping[str, Any], 192 base_config_clients: Iterable[Mapping[str, Any]], 193 base_config_servers: Iterable[Mapping[str, Any]], 194 scenario_name_regex: str, 195 language_config: scenario_config_exporter.LanguageConfig, 196 loadtest_name_prefix: str, 197 uniquifier_elements: Iterable[str], 198 annotations: Mapping[str, str], 199 instances_per_client: int = 1, 200 runs_per_test: int = 1, 201 scenario_transform: Callable[ 202 [Iterable[Mapping[str, Any]]], List[Dict[str, Any]] 203 ] = lambda s: s, 204) -> Iterable[Dict[str, Any]]: 205 """Generates LoadTest configurations for a given language config. 206 207 The LoadTest configurations are generated as YAML objects. 208 """ 209 validate_annotations(annotations) 210 prefix = loadtest_name_prefix or default_prefix() 211 cl = safe_name(language_config.client_language or language_config.language) 212 sl = safe_name(language_config.server_language or language_config.language) 213 scenario_filter = scenario_config_exporter.scenario_filter( 214 scenario_name_regex=scenario_name_regex, 215 category=language_config.category, 216 client_language=language_config.client_language, 217 server_language=language_config.server_language, 218 ) 219 220 scenarios = scenario_transform( 221 scenario_config_exporter.gen_scenarios( 222 language_config.language, scenario_filter 223 ) 224 ) 225 226 for scenario in scenarios: 227 for run_index in gen_run_indices(runs_per_test): 228 uniq = ( 229 uniquifier_elements + [run_index] 230 if run_index 231 else uniquifier_elements 232 ) 233 name = loadtest_name(prefix, scenario["name"], uniq) 234 scenario_str = ( 235 json.dumps({"scenarios": scenario}, indent=" ") + "\n" 236 ) 237 238 config = copy.deepcopy(base_config) 239 240 metadata = config["metadata"] 241 metadata["name"] = name 242 if "labels" not in metadata: 243 metadata["labels"] = dict() 244 metadata["labels"]["language"] = safe_name(language_config.language) 245 metadata["labels"]["prefix"] = prefix 246 if "annotations" not in metadata: 247 metadata["annotations"] = dict() 248 metadata["annotations"].update(annotations) 249 metadata["annotations"].update( 250 { 251 "scenario": scenario["name"], 252 "uniquifier": "-".join(uniq), 253 } 254 ) 255 256 spec = config["spec"] 257 258 # Select clients with the required language. 259 clients = [ 260 client 261 for client in base_config_clients 262 if client["language"] == cl 263 ] 264 if not clients: 265 raise IndexError( 266 "Client language not found in template: %s" % cl 267 ) 268 269 # Validate config for additional client instances. 270 if instances_per_client > 1: 271 c = collections.Counter( 272 (client.get("name", "") for client in clients) 273 ) 274 if max(c.values()) > 1: 275 raise ValueError( 276 "Multiple instances of multiple clients requires " 277 "unique names, name counts for language %s: %s" 278 % (cl, c.most_common()) 279 ) 280 281 # Name client instances with an index starting from zero. 282 client_instances = [] 283 for i in range(instances_per_client): 284 client_instances.extend(copy.deepcopy(clients)) 285 for client in client_instances[-len(clients) :]: 286 client["name"] = component_name( 287 (client.get("name", ""), str(i)) 288 ) 289 290 # Set clients to named instances. 291 spec["clients"] = client_instances 292 293 # Select servers with the required language. 294 servers = copy.deepcopy( 295 [ 296 server 297 for server in base_config_servers 298 if server["language"] == sl 299 ] 300 ) 301 if not servers: 302 raise IndexError( 303 "Server language not found in template: %s" % sl 304 ) 305 306 # Name servers with an index for consistency with clients. 307 for i, server in enumerate(servers): 308 server["name"] = component_name( 309 (server.get("name", ""), str(i)) 310 ) 311 312 # Set servers to named instances. 313 spec["servers"] = servers 314 315 # Add driver, if needed. 316 if "driver" not in spec: 317 spec["driver"] = dict() 318 319 # Ensure driver has language and run fields. 320 driver = spec["driver"] 321 if "language" not in driver: 322 driver["language"] = safe_name("c++") 323 if "run" not in driver: 324 driver["run"] = dict() 325 326 # Name the driver with an index for consistency with workers. 327 # There is only one driver, so the index is zero. 328 if "name" not in driver or not driver["name"]: 329 driver["name"] = "0" 330 331 spec["scenariosJSON"] = scenario_str 332 333 yield config 334 335 336def parse_key_value_args(args: Optional[Iterable[str]]) -> Dict[str, str]: 337 """Parses arguments in the form key=value into a dictionary.""" 338 d = dict() 339 if args is None: 340 return d 341 for arg in args: 342 key, equals, value = arg.partition("=") 343 if equals != "=": 344 raise ValueError("Expected key=value: " + value) 345 d[key] = value 346 return d 347 348 349def clear_empty_fields(config: Dict[str, Any]) -> None: 350 """Clears fields set to empty values by string substitution.""" 351 spec = config["spec"] 352 if "clients" in spec: 353 for client in spec["clients"]: 354 if "pool" in client and not client["pool"]: 355 del client["pool"] 356 if "servers" in spec: 357 for server in spec["servers"]: 358 if "pool" in server and not server["pool"]: 359 del server["pool"] 360 if "driver" in spec: 361 driver = spec["driver"] 362 if "pool" in driver and not driver["pool"]: 363 del driver["pool"] 364 if ( 365 "run" in driver 366 and "image" in driver["run"] 367 and not driver["run"]["image"] 368 ): 369 del driver["run"]["image"] 370 if "results" in spec and not ( 371 "bigQueryTable" in spec["results"] and spec["results"]["bigQueryTable"] 372 ): 373 del spec["results"] 374 375 376def config_dumper(header_comment: str) -> Type[yaml.SafeDumper]: 377 """Returns a custom dumper to dump configurations in the expected format.""" 378 379 class ConfigDumper(yaml.SafeDumper): 380 def expect_stream_start(self): 381 super().expect_stream_start() 382 if isinstance(self.event, yaml.StreamStartEvent): 383 self.write_indent() 384 self.write_indicator(header_comment, need_whitespace=False) 385 386 def str_presenter(dumper, data): 387 if "\n" in data: 388 return dumper.represent_scalar( 389 "tag:yaml.org,2002:str", data, style="|" 390 ) 391 return dumper.represent_scalar("tag:yaml.org,2002:str", data) 392 393 ConfigDumper.add_representer(str, str_presenter) 394 395 return ConfigDumper 396 397 398def main() -> None: 399 language_choices = sorted(scenario_config.LANGUAGES.keys()) 400 argp = argparse.ArgumentParser( 401 description="Generates load test configs from a template.", 402 fromfile_prefix_chars="@", 403 ) 404 argp.add_argument( 405 "-l", 406 "--language", 407 action="append", 408 choices=language_choices, 409 required=True, 410 help="Language(s) to benchmark.", 411 dest="languages", 412 ) 413 argp.add_argument( 414 "-t", 415 "--template", 416 type=str, 417 required=True, 418 help="LoadTest configuration yaml file template.", 419 ) 420 argp.add_argument( 421 "-s", 422 "--substitution", 423 action="append", 424 default=[], 425 help="Template substitution(s), in the form key=value.", 426 dest="substitutions", 427 ) 428 argp.add_argument( 429 "-p", "--prefix", default="", type=str, help="Test name prefix." 430 ) 431 argp.add_argument( 432 "-u", 433 "--uniquifier_element", 434 action="append", 435 default=[], 436 help="String element(s) to make the test name unique.", 437 dest="uniquifier_elements", 438 ) 439 argp.add_argument( 440 "-d", 441 action="store_true", 442 help="Use creation date and time as an additional uniquifier element.", 443 ) 444 argp.add_argument( 445 "-a", 446 "--annotation", 447 action="append", 448 default=[], 449 help="metadata.annotation(s), in the form key=value.", 450 dest="annotations", 451 ) 452 argp.add_argument( 453 "-r", 454 "--regex", 455 default=".*", 456 type=str, 457 help="Regex to select scenarios to run.", 458 ) 459 argp.add_argument( 460 "--category", 461 choices=[ 462 "all", 463 "inproc", 464 "scalable", 465 "smoketest", 466 "sweep", 467 "psm", 468 "dashboard", 469 ], 470 default="all", 471 help="Select a category of tests to run.", 472 ) 473 argp.add_argument( 474 "--allow_client_language", 475 action="append", 476 choices=language_choices, 477 default=[], 478 help="Allow cross-language scenarios with this client language.", 479 dest="allow_client_languages", 480 ) 481 argp.add_argument( 482 "--allow_server_language", 483 action="append", 484 choices=language_choices, 485 default=[], 486 help="Allow cross-language scenarios with this server language.", 487 dest="allow_server_languages", 488 ) 489 argp.add_argument( 490 "--instances_per_client", 491 default=1, 492 type=int, 493 help="Number of instances to generate for each client.", 494 ) 495 argp.add_argument( 496 "--runs_per_test", 497 default=1, 498 type=int, 499 help="Number of copies to generate for each test.", 500 ) 501 argp.add_argument( 502 "-o", 503 "--output", 504 type=str, 505 help="Output file name. Output to stdout if not set.", 506 ) 507 argp.add_argument( 508 "--client_channels", type=int, help="Number of client channels." 509 ) 510 argp.add_argument( 511 "--server_threads", type=int, help="Number of async server threads." 512 ) 513 argp.add_argument( 514 "--offered_loads", 515 nargs="*", 516 type=int, 517 default=[], 518 help=( 519 "A list of QPS values at which each load test scenario will be run." 520 ), 521 ) 522 args = argp.parse_args() 523 524 if args.instances_per_client < 1: 525 argp.error("instances_per_client must be greater than zero.") 526 527 if args.runs_per_test < 1: 528 argp.error("runs_per_test must be greater than zero.") 529 530 # Config generation ignores environment variables that are passed by the 531 # controller at runtime. 532 substitutions = { 533 "DRIVER_PORT": "${DRIVER_PORT}", 534 "KILL_AFTER": "${KILL_AFTER}", 535 "POD_TIMEOUT": "${POD_TIMEOUT}", 536 } 537 538 # The user can override the ignored variables above by passing them in as 539 # substitution keys. 540 substitutions.update(parse_key_value_args(args.substitutions)) 541 542 uniquifier_elements = args.uniquifier_elements 543 if args.d: 544 uniquifier_elements.append(now_string()) 545 546 annotations = parse_key_value_args(args.annotations) 547 548 transform = scenario_transform_function( 549 args.client_channels, args.server_threads, args.offered_loads 550 ) 551 552 with open(args.template) as f: 553 base_config = yaml.safe_load( 554 string.Template(f.read()).substitute(substitutions) 555 ) 556 557 clear_empty_fields(base_config) 558 559 spec = base_config["spec"] 560 base_config_clients = spec["clients"] 561 del spec["clients"] 562 base_config_servers = spec["servers"] 563 del spec["servers"] 564 565 client_languages = [""] + args.allow_client_languages 566 server_languages = [""] + args.allow_server_languages 567 config_generators = [] 568 for l, cl, sl in itertools.product( 569 args.languages, client_languages, server_languages 570 ): 571 language_config = scenario_config_exporter.LanguageConfig( 572 category=args.category, 573 language=l, 574 client_language=cl, 575 server_language=sl, 576 ) 577 config_generators.append( 578 gen_loadtest_configs( 579 base_config, 580 base_config_clients, 581 base_config_servers, 582 args.regex, 583 language_config, 584 loadtest_name_prefix=args.prefix, 585 uniquifier_elements=uniquifier_elements, 586 annotations=annotations, 587 instances_per_client=args.instances_per_client, 588 runs_per_test=args.runs_per_test, 589 scenario_transform=transform, 590 ) 591 ) 592 configs = (config for config in itertools.chain(*config_generators)) 593 594 with open(args.output, "w") if args.output else sys.stdout as f: 595 yaml.dump_all( 596 configs, 597 stream=f, 598 Dumper=config_dumper(CONFIGURATION_FILE_HEADER_COMMENT.strip()), 599 default_flow_style=False, 600 ) 601 602 603if __name__ == "__main__": 604 main() 605