#!/usr/bin/env python3
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Script to generate test configurations for the OSS benchmarks framework.
#
# This script filters test scenarios and generates uniquely named configurations
# for each test. Configurations are dumped in multipart YAML format.
#
# See documentation below:
# https://github.com/grpc/grpc/blob/master/tools/run_tests/performance/README.md#grpc-oss-benchmarks

import argparse
import collections
import copy
import datetime
import itertools
import json
import os
import string
import sys
from typing import (Any, Callable, Dict, Iterable, List, Mapping, Optional,
                    TextIO, Type)

import yaml

sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import scenario_config
import scenario_config_exporter

CONFIGURATION_FILE_HEADER_COMMENT = """
# Load test configurations generated from a template by loadtest_config.py.
# See documentation below:
# https://github.com/grpc/grpc/blob/master/tools/run_tests/performance/README.md#grpc-oss-benchmarks
"""


def safe_name(language: str) -> str:
    """Returns a name that is safe to use in labels and file names.

    Args:
        language: Key into scenario_config.LANGUAGES.

    Returns:
        The `safename` attribute of the matching language entry.

    Raises:
        KeyError: If the language is not present in scenario_config.LANGUAGES.
    """
    return scenario_config.LANGUAGES[language].safename


def default_prefix() -> str:
    """Constructs and returns a default prefix for LoadTest names.

    Returns:
        The value of the USER environment variable, or 'loadtest' if unset.
    """
    return os.environ.get('USER', 'loadtest')


def now_string() -> str:
    """Returns the current local date and time as 'YYYYmmddHHMMSS'."""
    return datetime.datetime.now().strftime('%Y%m%d%H%M%S')


def validate_loadtest_name(name: str) -> None:
    """Validates that a LoadTest name is in the expected format.

    A valid name is non-empty, at most 253 characters long, consists only of
    non-uppercase alphanumeric characters and '-', starts with a letter and
    does not end with '-'.

    Args:
        name: Candidate LoadTest name.

    Raises:
        ValueError: If any of the checks above fails.
    """
    if len(name) > 253:
        raise ValueError(
            'LoadTest name must be at most 253 characters long: %s' % name)
    if not all(c.isalnum() and not c.isupper() for c in name if c != '-'):
        raise ValueError('Invalid characters in LoadTest name: %s' % name)
    if not name or not name[0].isalpha() or name[-1] == '-':
        raise ValueError('Invalid format for LoadTest name: %s' % name)


def loadtest_base_name(scenario_name: str,
                       uniquifier_elements: Iterable[str]) -> str:
    """Constructs and returns the base name for a LoadTest resource.

    Args:
        scenario_name: Scenario name; it is split on '_' into name elements.
        uniquifier_elements: Extra elements appended after the scenario
            elements.

    Returns:
        All elements, lowercased and joined with '-'.
    """
    name_elements = scenario_name.split('_')
    name_elements.extend(uniquifier_elements)
    return '-'.join(element.lower() for element in name_elements)


def loadtest_name(prefix: str, scenario_name: str,
                  uniquifier_elements: Iterable[str]) -> str:
    """Constructs and returns a valid name for a LoadTest resource.

    Args:
        prefix: Optional prefix; skipped when empty.
        scenario_name: Scenario name used to build the base name.
        uniquifier_elements: Elements appended to make the name unique.

    Returns:
        The prefix (if any) and the base name, joined with '-'.

    Raises:
        ValueError: If the resulting name fails validate_loadtest_name().
    """
    base_name = loadtest_base_name(scenario_name, uniquifier_elements)
    name_elements = []
    if prefix:
        name_elements.append(prefix)
    name_elements.append(base_name)
    name = '-'.join(name_elements)
    validate_loadtest_name(name)
    return name


def component_name(elements: Iterable[str]) -> str:
    """Constructs a component name from possibly empty elements.

    Empty elements are dropped; the remaining ones are joined with '-'.
    """
    return '-'.join(e for e in elements if e)


def validate_annotations(annotations: Dict[str, str]) -> None:
    """Validates that annotations do not contain reserved names.

    The names 'scenario' and 'uniquifier' are automatically added by the
    config generator.

    Raises:
        ValueError: If a reserved name is used as an annotation key.
    """
    names = {'scenario', 'uniquifier'}.intersection(annotations)
    if names:
        raise ValueError('Annotations contain reserved names: %s' % names)


def gen_run_indices(runs_per_test: int) -> Iterable[str]:
    """Generates run indices for multiple runs, as formatted strings.

    For fewer than two runs a single empty string is generated. Otherwise the
    indices 0..runs_per_test-1 are generated, zero-padded to the width of the
    largest index.
    """
    if runs_per_test < 2:
        yield ''
        return
    index_length = len('{:d}'.format(runs_per_test - 1))
    index_fmt = '{{:0{:d}d}}'.format(index_length)
    for i in range(runs_per_test):
        yield index_fmt.format(i)


def scenario_name(base_name: str, client_channels: Optional[int],
                  server_threads: Optional[int],
                  offered_load: Optional[int]) -> str:
    """Constructs scenario name from base name and modifiers.

    Each modifier that is set (truthy) contributes one suffix element:
    '<n>channels', '<n>threads' and '<n>load', in that order. Elements are
    joined with '_'.
    """
    elements = [base_name]
    if client_channels:
        elements.append('{:d}channels'.format(client_channels))
    if server_threads:
        elements.append('{:d}threads'.format(server_threads))
    if offered_load:
        elements.append('{:d}load'.format(offered_load))
    return '_'.join(elements)


def scenario_transform_function(
    client_channels: Optional[int], server_threads: Optional[int],
    offered_loads: Optional[Iterable[int]]
) -> Optional[Callable[[Iterable[Mapping[str, Any]]], Iterable[Mapping[str,
                                                                       Any]]]]:
    """Returns a transform to be applied to a list of scenarios.

    Args:
        client_channels: Number of client channels to set, if any.
        server_threads: Number of async server threads to set, if any.
        offered_loads: Offered load values; each scenario is replicated once
            per value. None is treated as no values.

    Returns:
        The identity function if no modifier is set, otherwise a generator
        function that yields the transformed scenarios.
    """
    # Materialize once: the argument may be None or a one-shot iterable, and
    # the values are reused for every scenario.
    loads = list(offered_loads) if offered_loads is not None else []
    if not any((client_channels, server_threads, loads)):
        return lambda s: s

    def _transform(
            scenarios: Iterable[Mapping[str,
                                        Any]]) -> Iterable[Mapping[str, Any]]:
        """Inserts client channels, server threads and offered load.

        Channel and thread settings are written into the input scenario
        itself; per-load variants are deep copies of it.
        """
        for base_scenario in scenarios:
            base_name = base_scenario['name']
            if client_channels:
                base_scenario['client_config'][
                    'client_channels'] = client_channels

            if server_threads:
                base_scenario['server_config'][
                    'async_server_threads'] = server_threads

            if not loads:
                base_scenario['name'] = scenario_name(base_name,
                                                      client_channels,
                                                      server_threads, 0)
                yield base_scenario
                # Move on to the next scenario; returning here would drop
                # every scenario after the first one.
                continue

            for offered_load in loads:
                scenario = copy.deepcopy(base_scenario)
                scenario['client_config']['load_params'] = {
                    'poisson': {
                        'offered_load': offered_load
                    }
                }
                scenario['name'] = scenario_name(base_name, client_channels,
                                                 server_threads, offered_load)
                yield scenario

    return _transform


def gen_loadtest_configs(
    base_config: Mapping[str, Any],
    base_config_clients: Iterable[Mapping[str, Any]],
    base_config_servers: Iterable[Mapping[str, Any]],
    scenario_name_regex: str,
    language_config: scenario_config_exporter.LanguageConfig,
    loadtest_name_prefix: str,
    uniquifier_elements: Iterable[str],
    annotations: Mapping[str, str],
    instances_per_client: int = 1,
    runs_per_test: int = 1,
    scenario_transform: Callable[[Iterable[Mapping[str, Any]]],
                                 List[Dict[str, Any]]] = lambda s: s
) -> Iterable[Dict[str, Any]]:
    """Generates LoadTest configurations for a given language config.

    The LoadTest configurations are generated as YAML objects.

    Args:
        base_config: Template configuration; deep-copied for every test.
        base_config_clients: Client entries from the template.
        base_config_servers: Server entries from the template.
        scenario_name_regex: Regex passed to the scenario filter.
        language_config: Language, category and optional client and server
            language overrides.
        loadtest_name_prefix: Name prefix; default_prefix() is used if empty.
        uniquifier_elements: Elements appended to each test name.
        annotations: Extra metadata annotations for each configuration.
        instances_per_client: Number of copies of each client to generate.
        runs_per_test: Number of configurations to generate per scenario.
        scenario_transform: Transform applied to the generated scenarios.

    Yields:
        One configuration per scenario and run index.

    Raises:
        ValueError: If annotations use reserved names, if a generated name is
            invalid, or if multiple client instances are requested and the
            client names are not unique.
        IndexError: If the template has no client or no server for the
            required language.
    """
    validate_annotations(annotations)
    prefix = loadtest_name_prefix or default_prefix()
    # Copy to a list so that any iterable can be concatenated with the run
    # index below and reused for every generated configuration.
    uniquifier_elements = list(uniquifier_elements)
    cl = safe_name(language_config.client_language or language_config.language)
    sl = safe_name(language_config.server_language or language_config.language)
    scenario_filter = scenario_config_exporter.scenario_filter(
        scenario_name_regex=scenario_name_regex,
        category=language_config.category,
        client_language=language_config.client_language,
        server_language=language_config.server_language)

    scenarios = scenario_transform(
        scenario_config_exporter.gen_scenarios(language_config.language,
                                               scenario_filter))

    for scenario in scenarios:
        for run_index in gen_run_indices(runs_per_test):
            uniq = (uniquifier_elements +
                    [run_index] if run_index else uniquifier_elements)
            name = loadtest_name(prefix, scenario['name'], uniq)
            scenario_str = json.dumps({'scenarios': scenario},
                                      indent='  ') + '\n'

            config = copy.deepcopy(base_config)

            metadata = config['metadata']
            metadata['name'] = name
            if 'labels' not in metadata:
                metadata['labels'] = dict()
            metadata['labels']['language'] = safe_name(language_config.language)
            metadata['labels']['prefix'] = prefix
            if 'annotations' not in metadata:
                metadata['annotations'] = dict()
            metadata['annotations'].update(annotations)
            metadata['annotations'].update({
                'scenario': scenario['name'],
                'uniquifier': '-'.join(uniq),
            })

            spec = config['spec']

            # Select clients with the required language.
            clients = [
                client for client in base_config_clients
                if client['language'] == cl
            ]
            if not clients:
                raise IndexError('Client language not found in template: %s' %
                                 cl)

            # Validate config for additional client instances.
            if instances_per_client > 1:
                c = collections.Counter(
                    (client.get('name', '') for client in clients))
                if max(c.values()) > 1:
                    raise ValueError(
                        ('Multiple instances of multiple clients requires '
                         'unique names, name counts for language %s: %s') %
                        (cl, c.most_common()))

            # Name client instances with an index starting from zero.
            client_instances = []
            for i in range(instances_per_client):
                client_instances.extend(copy.deepcopy(clients))
                for client in client_instances[-len(clients):]:
                    client['name'] = component_name((client.get('name',
                                                                ''), str(i)))

            # Set clients to named instances.
            spec['clients'] = client_instances

            # Select servers with the required language.
            servers = copy.deepcopy([
                server for server in base_config_servers
                if server['language'] == sl
            ])
            if not servers:
                raise IndexError('Server language not found in template: %s' %
                                 sl)

            # Name servers with an index for consistency with clients.
            for i, server in enumerate(servers):
                server['name'] = component_name((server.get('name',
                                                            ''), str(i)))

            # Set servers to named instances.
            spec['servers'] = servers

            # Add driver, if needed.
            if 'driver' not in spec:
                spec['driver'] = dict()

            # Ensure driver has language and run fields.
            driver = spec['driver']
            if 'language' not in driver:
                driver['language'] = safe_name('c++')
            if 'run' not in driver:
                driver['run'] = dict()

            # Name the driver with an index for consistency with workers.
            # There is only one driver, so the index is zero.
            if 'name' not in driver or not driver['name']:
                driver['name'] = '0'

            spec['scenariosJSON'] = scenario_str

            yield config


def parse_key_value_args(args: Optional[Iterable[str]]) -> Dict[str, str]:
    """Parses arguments in the form key=value into a dictionary.

    Each argument is split on the first '='; later arguments override earlier
    ones with the same key.

    Args:
        args: Arguments to parse, or None.

    Returns:
        The parsed dictionary; empty if args is None.

    Raises:
        ValueError: If an argument does not contain '='.
    """
    d = dict()
    if args is None:
        return d
    for arg in args:
        key, equals, value = arg.partition('=')
        if equals != '=':
            # Report the whole argument: value is always empty in this case.
            raise ValueError('Expected key=value: ' + arg)
        d[key] = value
    return d


def clear_empty_fields(config: Dict[str, Any]) -> None:
    """Clears fields set to empty values by string substitution.

    The config is modified in place. Empty 'pool' fields are removed from
    clients, servers and the driver, an empty driver run image is removed,
    and 'results' is removed unless it has a non-empty 'bigQueryTable'.
    """
    spec = config['spec']
    for field in ('clients', 'servers'):
        for worker in spec.get(field, ()):
            if 'pool' in worker and not worker['pool']:
                del worker['pool']
    if 'driver' in spec:
        driver = spec['driver']
        if 'pool' in driver and not driver['pool']:
            del driver['pool']
        if ('run' in driver and 'image' in driver['run'] and
                not driver['run']['image']):
            del driver['run']['image']
    if 'results' in spec and not ('bigQueryTable' in spec['results'] and
                                  spec['results']['bigQueryTable']):
        del spec['results']


def config_dumper(header_comment: str) -> Type[yaml.SafeDumper]:
    """Returns a custom dumper to dump configurations in the expected format.

    The returned dumper class writes header_comment when the stream starts
    and represents strings containing newlines in literal block style ('|').
    """

    class ConfigDumper(yaml.SafeDumper):

        def expect_stream_start(self):
            super().expect_stream_start()
            if isinstance(self.event, yaml.StreamStartEvent):
                self.write_indent()
                self.write_indicator(header_comment, need_whitespace=False)

    def str_presenter(dumper, data):
        if '\n' in data:
            return dumper.represent_scalar('tag:yaml.org,2002:str',
                                           data,
                                           style='|')
        return dumper.represent_scalar('tag:yaml.org,2002:str', data)

    ConfigDumper.add_representer(str, str_presenter)

    return ConfigDumper


def _dump_configs(configs: Iterable[Mapping[str, Any]],
                  stream: TextIO) -> None:
    """Dumps configurations to a stream as multipart YAML with a header."""
    yaml.dump_all(configs,
                  stream=stream,
                  Dumper=config_dumper(
                      CONFIGURATION_FILE_HEADER_COMMENT.strip()),
                  default_flow_style=False)


def main() -> None:
    """Parses command line arguments and writes the generated configs."""
    language_choices = sorted(scenario_config.LANGUAGES.keys())
    argp = argparse.ArgumentParser(
        description='Generates load test configs from a template.',
        fromfile_prefix_chars='@')
    argp.add_argument('-l',
                      '--language',
                      action='append',
                      choices=language_choices,
                      required=True,
                      help='Language(s) to benchmark.',
                      dest='languages')
    argp.add_argument('-t',
                      '--template',
                      type=str,
                      required=True,
                      help='LoadTest configuration yaml file template.')
    argp.add_argument('-s',
                      '--substitution',
                      action='append',
                      default=[],
                      help='Template substitution(s), in the form key=value.',
                      dest='substitutions')
    argp.add_argument('-p',
                      '--prefix',
                      default='',
                      type=str,
                      help='Test name prefix.')
    argp.add_argument('-u',
                      '--uniquifier_element',
                      action='append',
                      default=[],
                      help='String element(s) to make the test name unique.',
                      dest='uniquifier_elements')
    argp.add_argument(
        '-d',
        action='store_true',
        help='Use creation date and time as an additional uniquifier element.')
    argp.add_argument('-a',
                      '--annotation',
                      action='append',
                      default=[],
                      help='metadata.annotation(s), in the form key=value.',
                      dest='annotations')
    argp.add_argument('-r',
                      '--regex',
                      default='.*',
                      type=str,
                      help='Regex to select scenarios to run.')
    argp.add_argument(
        '--category',
        choices=['all', 'inproc', 'scalable', 'smoketest', 'sweep', 'psm'],
        default='all',
        help='Select a category of tests to run.')
    argp.add_argument(
        '--allow_client_language',
        action='append',
        choices=language_choices,
        default=[],
        help='Allow cross-language scenarios with this client language.',
        dest='allow_client_languages')
    argp.add_argument(
        '--allow_server_language',
        action='append',
        choices=language_choices,
        default=[],
        help='Allow cross-language scenarios with this server language.',
        dest='allow_server_languages')
    argp.add_argument('--instances_per_client',
                      default=1,
                      type=int,
                      help="Number of instances to generate for each client.")
    argp.add_argument('--runs_per_test',
                      default=1,
                      type=int,
                      help='Number of copies to generate for each test.')
    argp.add_argument('-o',
                      '--output',
                      type=str,
                      help='Output file name. Output to stdout if not set.')
    argp.add_argument('--client_channels',
                      type=int,
                      help='Number of client channels.')
    argp.add_argument('--server_threads',
                      type=int,
                      help='Number of async server threads.')
    argp.add_argument(
        '--offered_loads',
        nargs="*",
        type=int,
        default=[],
        help='A list of QPS values at which each load test scenario will be run.'
    )
    args = argp.parse_args()

    if args.instances_per_client < 1:
        argp.error('instances_per_client must be greater than zero.')

    if args.runs_per_test < 1:
        argp.error('runs_per_test must be greater than zero.')

    # Config generation ignores environment variables that are passed by the
    # controller at runtime.
    substitutions = {
        'DRIVER_PORT': '${DRIVER_PORT}',
        'KILL_AFTER': '${KILL_AFTER}',
        'POD_TIMEOUT': '${POD_TIMEOUT}',
    }

    # The user can override the ignored variables above by passing them in as
    # substitution keys.
    substitutions.update(parse_key_value_args(args.substitutions))

    uniquifier_elements = args.uniquifier_elements
    if args.d:
        uniquifier_elements.append(now_string())

    annotations = parse_key_value_args(args.annotations)

    transform = scenario_transform_function(args.client_channels,
                                            args.server_threads,
                                            args.offered_loads)

    with open(args.template) as f:
        base_config = yaml.safe_load(
            string.Template(f.read()).substitute(substitutions))

    clear_empty_fields(base_config)

    # Clients and servers are taken out of the template; each generated
    # configuration gets its own language-specific selection.
    spec = base_config['spec']
    base_config_clients = spec.pop('clients')
    base_config_servers = spec.pop('servers')

    client_languages = [''] + args.allow_client_languages
    server_languages = [''] + args.allow_server_languages
    config_generators = []
    for l, cl, sl in itertools.product(args.languages, client_languages,
                                       server_languages):
        language_config = scenario_config_exporter.LanguageConfig(
            category=args.category,
            language=l,
            client_language=cl,
            server_language=sl)
        config_generators.append(
            gen_loadtest_configs(base_config,
                                 base_config_clients,
                                 base_config_servers,
                                 args.regex,
                                 language_config,
                                 loadtest_name_prefix=args.prefix,
                                 uniquifier_elements=uniquifier_elements,
                                 annotations=annotations,
                                 instances_per_client=args.instances_per_client,
                                 runs_per_test=args.runs_per_test,
                                 scenario_transform=transform))
    configs = itertools.chain.from_iterable(config_generators)

    # Only close a file that was opened here; sys.stdout is left open.
    if args.output:
        with open(args.output, 'w') as f:
            _dump_configs(configs, f)
    else:
        _dump_configs(configs, sys.stdout)


if __name__ == '__main__':
    main()