1#!/usr/bin/env python3
2# Copyright 2021 The gRPC Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16# Script to generate test configurations for the OSS benchmarks framework.
17#
# This script filters test scenarios and generates uniquely named configurations
# for each test. Configurations are dumped as a multi-document YAML stream.
20#
21# See documentation below:
22# https://github.com/grpc/grpc/blob/master/tools/run_tests/performance/README.md#grpc-oss-benchmarks
23
24import argparse
25import collections
26import copy
27import datetime
28import itertools
29import json
30import os
31import string
32import sys
33from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Type
34
35import yaml
36
37sys.path.append(os.path.dirname(os.path.abspath(__file__)))
38import scenario_config
39import scenario_config_exporter
40
# Comment block written at the top of every generated configuration stream.
# main() strips the surrounding newlines and passes it to config_dumper(),
# whose dumper writes it when the YAML stream starts.
CONFIGURATION_FILE_HEADER_COMMENT = """
# Load test configurations generated from a template by loadtest_config.py.
# See documentation below:
# https://github.com/grpc/grpc/blob/master/tools/run_tests/performance/README.md#grpc-oss-benchmarks
"""
46
47
def safe_name(language: str) -> str:
    """Maps a language key to the form used in labels and file names.

    The mapping is looked up in scenario_config.LANGUAGES.
    """
    language_entry = scenario_config.LANGUAGES[language]
    return language_entry.safename
51
52
def default_prefix() -> str:
    """Returns the prefix used for LoadTest names when none is given.

    This is the value of the USER environment variable, or 'loadtest' if
    USER is not set.
    """
    return os.getenv('USER', 'loadtest')
56
57
def now_string() -> str:
    """Formats the current local date and time as YYYYMMDDHHMMSS."""
    return '{:%Y%m%d%H%M%S}'.format(datetime.datetime.now())
61
62
def validate_loadtest_name(name: str) -> None:
    """Validates that a LoadTest name is in the expected format.

    A valid name:
      * is at most 253 characters long (presumably the Kubernetes
        DNS-subdomain name limit);
      * consists only of lowercase ASCII letters, ASCII digits and '-';
      * starts with a letter and does not end with '-'.

    Raises:
        ValueError: if the name is too long, contains invalid characters,
            or is empty or starts/ends with an invalid character.
    """
    if len(name) > 253:
        raise ValueError(
            'LoadTest name must be at most 253 characters long: %s' % name)
    # Use an explicit ASCII set: str.isalnum() also accepts non-ASCII
    # letters and digits (e.g. 'é', '²') that are not valid in names.
    allowed = set(string.ascii_lowercase + string.digits + '-')
    if not set(name) <= allowed:
        raise ValueError('Invalid characters in LoadTest name: %s' % name)
    if not name or name[0] not in string.ascii_lowercase or name[-1] == '-':
        raise ValueError('Invalid format for LoadTest name: %s' % name)
72
73
def loadtest_base_name(scenario_name: str,
                       uniquifier_elements: Iterable[str]) -> str:
    """Builds the base LoadTest name, without any prefix.

    The scenario name is split on '_', the uniquifier elements are appended,
    and every part is lowercased and joined with '-'.
    """
    parts = scenario_name.split('_') + list(uniquifier_elements)
    return '-'.join(part.lower() for part in parts)
80
81
def loadtest_name(prefix: str, scenario_name: str,
                  uniquifier_elements: Iterable[str]) -> str:
    """Builds and validates the full name of a LoadTest resource.

    The name is the base name from loadtest_base_name(), preceded by
    `prefix` and a '-' when `prefix` is non-empty.

    Raises:
        ValueError: if the resulting name fails validate_loadtest_name().
    """
    base = loadtest_base_name(scenario_name, uniquifier_elements)
    full_name = '{}-{}'.format(prefix, base) if prefix else base
    validate_loadtest_name(full_name)
    return full_name
93
94
def component_name(elements: Iterable[str]) -> str:
    """Joins the non-empty elements with '-' to form a component name."""
    return '-'.join(filter(None, elements))
98
99
def validate_annotations(annotations: Dict[str, str]) -> None:
    """Rejects annotations whose names are reserved by the generator.

    gen_loadtest_configs() sets the 'scenario' and 'uniquifier' annotations
    itself, so callers may not supply them.

    Raises:
        ValueError: if any reserved name appears among the annotation keys.
    """
    reserved = {'scenario', 'uniquifier'}
    clashes = reserved.intersection(annotations)
    if not clashes:
        return
    raise ValueError('Annotations contain reserved names: %s' % clashes)
108
109
def gen_run_indices(runs_per_test: int) -> Iterable[str]:
    """Yields one zero-padded index string per run.

    All indices are padded to the width of the largest index
    (runs_per_test - 1). With fewer than two runs a single empty string is
    yielded, so callers add no run index to test names.
    """
    if runs_per_test >= 2:
        width = len('{:d}'.format(runs_per_test - 1))
        for run in range(runs_per_test):
            yield str(run).zfill(width)
    else:
        yield ''
119
120
def scenario_name(base_name: str, client_channels: Optional[int],
                  server_threads: Optional[int],
                  offered_load: Optional[int]) -> str:
    """Constructs scenario name from base name and modifiers.

    Each modifier that is set (truthy) appends a suffix, joined with '_',
    e.g. 'base_4channels_2threads_1000load'. Modifiers that are None or 0
    are omitted.
    """
    elements = [base_name]
    if client_channels:
        elements.append('{:d}channels'.format(client_channels))
    if server_threads:
        elements.append('{:d}threads'.format(server_threads))
    if offered_load:
        elements.append('{:d}load'.format(offered_load))
    return '_'.join(elements)


def scenario_transform_function(
    client_channels: Optional[int], server_threads: Optional[int],
    offered_loads: Optional[Iterable[int]]
) -> Callable[[Iterable[Mapping[str, Any]]], Iterable[Mapping[str, Any]]]:
    """Returns a transform to be applied to a list of scenarios.

    If no modifier is set (channels, threads and offered loads all unset or
    empty), the returned transform is the identity. Otherwise, for every
    input scenario, the transform:
      * sets client_config.client_channels if `client_channels` is set;
      * sets server_config.async_server_threads if `server_threads` is set;
      * without offered loads, renames the scenario with scenario_name() and
        yields it;
      * with offered loads, yields one deep copy per load, with poisson
        load_params set to that load and a matching name.

    The input scenarios are updated in place.
    """
    # Materialize once: a one-shot iterable would otherwise be exhausted
    # after the first scenario, and the transform may be applied to several
    # scenario streams (one per language config).
    loads = list(offered_loads) if offered_loads is not None else []

    if not (client_channels or server_threads or loads):
        return lambda s: s

    def _transform(
            scenarios: Iterable[Mapping[str,
                                        Any]]) -> Iterable[Mapping[str, Any]]:
        """Transforms scenarios by inserting num of client channels, number of async_server_threads and offered_load."""

        for base_scenario in scenarios:
            base_name = base_scenario['name']
            if client_channels:
                base_scenario['client_config'][
                    'client_channels'] = client_channels

            if server_threads:
                base_scenario['server_config'][
                    'async_server_threads'] = server_threads

            if not loads:
                base_scenario['name'] = scenario_name(base_name,
                                                      client_channels,
                                                      server_threads, 0)
                yield base_scenario
                # Move on to the next scenario; returning here would drop
                # all remaining scenarios.
                continue

            for offered_load in loads:
                scenario = copy.deepcopy(base_scenario)
                scenario['client_config']['load_params'] = {
                    'poisson': {
                        'offered_load': offered_load
                    }
                }
                scenario['name'] = scenario_name(base_name, client_channels,
                                                 server_threads, offered_load)
                yield scenario

    return _transform
178
179
def _select_client_instances(base_config_clients: Iterable[Mapping[str, Any]],
                             client_language: str,
                             instances_per_client: int) -> List[Dict[str, Any]]:
    """Returns named copies of the template clients for one client language.

    For each instance index i in range(instances_per_client), every template
    client whose 'language' equals `client_language` is deep-copied and named
    with component_name((name, str(i))).

    Raises:
        IndexError: if no template client has the required language.
        ValueError: if more than one instance is requested and the matching
            template clients do not have unique names.
    """
    clients = [
        client for client in base_config_clients
        if client['language'] == client_language
    ]
    if not clients:
        raise IndexError('Client language not found in template: %s' %
                         client_language)

    # Name uniqueness is only enforced when several instances are requested.
    if instances_per_client > 1:
        c = collections.Counter(client.get('name', '') for client in clients)
        if max(c.values()) > 1:
            raise ValueError(
                ('Multiple instances of multiple clients requires '
                 'unique names, name counts for language %s: %s') %
                (client_language, c.most_common()))

    # Name client instances with an index starting from zero.
    client_instances = []
    for i in range(instances_per_client):
        for client in copy.deepcopy(clients):
            client['name'] = component_name((client.get('name', ''), str(i)))
            client_instances.append(client)
    return client_instances


def _select_servers(base_config_servers: Iterable[Mapping[str, Any]],
                    server_language: str) -> List[Dict[str, Any]]:
    """Returns named copies of the template servers for one server language.

    Servers are named with an index starting from zero, for consistency with
    client instances.

    Raises:
        IndexError: if no template server has the required language.
    """
    servers = copy.deepcopy([
        server for server in base_config_servers
        if server['language'] == server_language
    ])
    if not servers:
        raise IndexError('Server language not found in template: %s' %
                         server_language)
    for i, server in enumerate(servers):
        server['name'] = component_name((server.get('name', ''), str(i)))
    return servers


def _ensure_driver(spec: Dict[str, Any]) -> None:
    """Adds a driver to `spec` if missing and fills in default fields.

    Missing fields default to language safe_name('c++'), an empty 'run'
    section, and name '0' (there is only one driver, so its index is zero).
    """
    if 'driver' not in spec:
        spec['driver'] = dict()
    driver = spec['driver']
    if 'language' not in driver:
        driver['language'] = safe_name('c++')
    if 'run' not in driver:
        driver['run'] = dict()
    if 'name' not in driver or not driver['name']:
        driver['name'] = '0'


def gen_loadtest_configs(
    base_config: Mapping[str, Any],
    base_config_clients: Iterable[Mapping[str, Any]],
    base_config_servers: Iterable[Mapping[str, Any]],
    scenario_name_regex: str,
    language_config: scenario_config_exporter.LanguageConfig,
    loadtest_name_prefix: str,
    uniquifier_elements: Iterable[str],
    annotations: Mapping[str, str],
    instances_per_client: int = 1,
    runs_per_test: int = 1,
    scenario_transform: Callable[[Iterable[Mapping[str, Any]]],
                                 List[Dict[str, Any]]] = lambda s: s
) -> Iterable[Dict[str, Any]]:
    """Generates LoadTest configurations for a given language config.

    The LoadTest configurations are generated as YAML objects, one per
    (scenario, run) pair. Scenarios are selected by `scenario_name_regex`
    and the language config, then passed through `scenario_transform`. Each
    scenario is repeated `runs_per_test` times.

    Each configuration is a deep copy of `base_config` with:
      * metadata name, 'language' and 'prefix' labels, and annotations,
        including the reserved 'scenario' and 'uniquifier' annotations;
      * clients and servers selected from the templates by language and
        named with an index;
      * a driver with default language, run section and name;
      * the scenario serialized as JSON in spec['scenariosJSON'].

    This is a generator, so errors are raised while iterating.

    Raises:
        ValueError: if `annotations` use a reserved name, a generated
            LoadTest name is invalid, or client names are not unique when
            several instances per client are requested.
        IndexError: if the templates have no client or server for the
            required language.
    """
    validate_annotations(annotations)
    prefix = loadtest_name_prefix or default_prefix()
    cl = safe_name(language_config.client_language or language_config.language)
    sl = safe_name(language_config.server_language or language_config.language)
    # Copy once so any iterable of strings (tuple, generator, ...) works with
    # the list concatenation below and can be reused for every scenario/run.
    base_uniq = list(uniquifier_elements)

    scenario_filter = scenario_config_exporter.scenario_filter(
        scenario_name_regex=scenario_name_regex,
        category=language_config.category,
        client_language=language_config.client_language,
        server_language=language_config.server_language)

    scenarios = scenario_transform(
        scenario_config_exporter.gen_scenarios(language_config.language,
                                               scenario_filter))

    for scenario in scenarios:
        for run_index in gen_run_indices(runs_per_test):
            uniq = base_uniq + [run_index] if run_index else base_uniq
            name = loadtest_name(prefix, scenario['name'], uniq)
            scenario_str = json.dumps({'scenarios': scenario},
                                      indent='  ') + '\n'

            config = copy.deepcopy(base_config)

            metadata = config['metadata']
            metadata['name'] = name
            if 'labels' not in metadata:
                metadata['labels'] = dict()
            metadata['labels']['language'] = safe_name(language_config.language)
            metadata['labels']['prefix'] = prefix
            if 'annotations' not in metadata:
                metadata['annotations'] = dict()
            metadata['annotations'].update(annotations)
            metadata['annotations'].update({
                'scenario': scenario['name'],
                'uniquifier': '-'.join(uniq),
            })

            spec = config['spec']
            spec['clients'] = _select_client_instances(base_config_clients, cl,
                                                       instances_per_client)
            spec['servers'] = _select_servers(base_config_servers, sl)
            _ensure_driver(spec)
            spec['scenariosJSON'] = scenario_str

            yield config
304
305
def parse_key_value_args(args: Optional[Iterable[str]]) -> Dict[str, str]:
    """Parses arguments in the form key=value into a dictionary.

    Only the first '=' separates key from value, so values may themselves
    contain '='. Later occurrences of a key override earlier ones. A None
    argument list yields an empty dictionary.

    Raises:
        ValueError: if an argument does not contain '='.
    """
    d = dict()
    if args is None:
        return d
    for arg in args:
        key, equals, value = arg.partition('=')
        if not equals:
            # Report the whole argument; `value` is always empty here.
            raise ValueError('Expected key=value: ' + arg)
        d[key] = value
    return d
317
318
def clear_empty_fields(config: Dict[str, Any]) -> None:
    """Removes optional fields that template substitution left empty.

    Empty 'pool' fields are removed from clients, servers and the driver,
    as is an empty driver run 'image'. The 'results' section is removed
    unless it has a non-empty 'bigQueryTable'. `config` is modified in place.
    """

    def _drop_empty(section: Dict[str, Any], key: str) -> None:
        # Delete `key` only when it is present with a falsy value.
        if key in section and not section[key]:
            del section[key]

    spec = config['spec']
    for role in ('clients', 'servers'):
        if role in spec:
            for worker in spec[role]:
                _drop_empty(worker, 'pool')
    if 'driver' in spec:
        driver = spec['driver']
        _drop_empty(driver, 'pool')
        if 'run' in driver:
            _drop_empty(driver['run'], 'image')
    if 'results' in spec:
        results = spec['results']
        has_table = 'bigQueryTable' in results and results['bigQueryTable']
        if not has_table:
            del spec['results']
340
341
def config_dumper(header_comment: str) -> Type[yaml.SafeDumper]:
    """Builds a YAML dumper class for writing load test configurations.

    The returned dumper writes `header_comment` when the stream starts, and
    emits multi-line strings as literal block scalars ('|').
    """

    def _represent_str(dumper, value):
        # Multi-line strings use the literal block style; other strings keep
        # the default scalar style (style=None).
        block_style = '|' if '\n' in value else None
        return dumper.represent_scalar('tag:yaml.org,2002:str',
                                       value,
                                       style=block_style)

    class ConfigDumper(yaml.SafeDumper):

        def expect_stream_start(self):
            super().expect_stream_start()
            # Once the stream-start event has been handled, write the header
            # comment verbatim.
            if isinstance(self.event, yaml.StreamStartEvent):
                self.write_indent()
                self.write_indicator(header_comment, need_whitespace=False)

    ConfigDumper.add_representer(str, _represent_str)
    return ConfigDumper
363
364
def _build_argument_parser() -> argparse.ArgumentParser:
    """Returns the command-line argument parser for this script."""
    language_choices = sorted(scenario_config.LANGUAGES.keys())
    argp = argparse.ArgumentParser(
        description='Generates load test configs from a template.',
        fromfile_prefix_chars='@')
    argp.add_argument('-l',
                      '--language',
                      action='append',
                      choices=language_choices,
                      required=True,
                      help='Language(s) to benchmark.',
                      dest='languages')
    argp.add_argument('-t',
                      '--template',
                      type=str,
                      required=True,
                      help='LoadTest configuration yaml file template.')
    argp.add_argument('-s',
                      '--substitution',
                      action='append',
                      default=[],
                      help='Template substitution(s), in the form key=value.',
                      dest='substitutions')
    argp.add_argument('-p',
                      '--prefix',
                      default='',
                      type=str,
                      help='Test name prefix.')
    argp.add_argument('-u',
                      '--uniquifier_element',
                      action='append',
                      default=[],
                      help='String element(s) to make the test name unique.',
                      dest='uniquifier_elements')
    argp.add_argument(
        '-d',
        action='store_true',
        help='Use creation date and time as an additional uniquifier element.')
    argp.add_argument('-a',
                      '--annotation',
                      action='append',
                      default=[],
                      help='metadata.annotation(s), in the form key=value.',
                      dest='annotations')
    argp.add_argument('-r',
                      '--regex',
                      default='.*',
                      type=str,
                      help='Regex to select scenarios to run.')
    argp.add_argument(
        '--category',
        choices=['all', 'inproc', 'scalable', 'smoketest', 'sweep', 'psm'],
        default='all',
        help='Select a category of tests to run.')
    argp.add_argument(
        '--allow_client_language',
        action='append',
        choices=language_choices,
        default=[],
        help='Allow cross-language scenarios with this client language.',
        dest='allow_client_languages')
    argp.add_argument(
        '--allow_server_language',
        action='append',
        choices=language_choices,
        default=[],
        help='Allow cross-language scenarios with this server language.',
        dest='allow_server_languages')
    argp.add_argument('--instances_per_client',
                      default=1,
                      type=int,
                      help="Number of instances to generate for each client.")
    argp.add_argument('--runs_per_test',
                      default=1,
                      type=int,
                      help='Number of copies to generate for each test.')
    argp.add_argument('-o',
                      '--output',
                      type=str,
                      help='Output file name. Output to stdout if not set.')
    argp.add_argument('--client_channels',
                      type=int,
                      help='Number of client channels.')
    argp.add_argument('--server_threads',
                      type=int,
                      help='Number of async server threads.')
    argp.add_argument(
        '--offered_loads',
        nargs="*",
        type=int,
        default=[],
        help='A list of QPS values at which each load test scenario will be run.'
    )
    return argp


def main() -> None:
    """Generates LoadTest configurations from a template and writes them.

    The template is read from --template. Configurations for every
    combination of language, allowed client language and allowed server
    language are written as one YAML stream to --output, or to stdout when
    --output is not set.
    """
    argp = _build_argument_parser()
    args = argp.parse_args()

    if args.instances_per_client < 1:
        argp.error('instances_per_client must be greater than zero.')

    if args.runs_per_test < 1:
        argp.error('runs_per_test must be greater than zero.')

    # Config generation ignores environment variables that are passed by the
    # controller at runtime.
    substitutions = {
        'DRIVER_PORT': '${DRIVER_PORT}',
        'KILL_AFTER': '${KILL_AFTER}',
        'POD_TIMEOUT': '${POD_TIMEOUT}',
    }

    # The user can override the ignored variables above by passing them in as
    # substitution keys.
    substitutions.update(parse_key_value_args(args.substitutions))

    uniquifier_elements = args.uniquifier_elements
    if args.d:
        uniquifier_elements.append(now_string())

    annotations = parse_key_value_args(args.annotations)

    transform = scenario_transform_function(args.client_channels,
                                            args.server_threads,
                                            args.offered_loads)

    with open(args.template) as f:
        base_config = yaml.safe_load(
            string.Template(f.read()).substitute(substitutions))

    clear_empty_fields(base_config)

    # Clients and servers are selected per language by the generator, so
    # they are removed from the shared base config.
    spec = base_config['spec']
    base_config_clients = spec['clients']
    del spec['clients']
    base_config_servers = spec['servers']
    del spec['servers']

    # '' means "same as the main language" for client and server languages.
    client_languages = [''] + args.allow_client_languages
    server_languages = [''] + args.allow_server_languages
    config_generators = []
    for l, cl, sl in itertools.product(args.languages, client_languages,
                                       server_languages):
        language_config = scenario_config_exporter.LanguageConfig(
            category=args.category,
            language=l,
            client_language=cl,
            server_language=sl)
        config_generators.append(
            gen_loadtest_configs(base_config,
                                 base_config_clients,
                                 base_config_servers,
                                 args.regex,
                                 language_config,
                                 loadtest_name_prefix=args.prefix,
                                 uniquifier_elements=uniquifier_elements,
                                 annotations=annotations,
                                 instances_per_client=args.instances_per_client,
                                 runs_per_test=args.runs_per_test,
                                 scenario_transform=transform))
    configs = itertools.chain.from_iterable(config_generators)

    dumper = config_dumper(CONFIGURATION_FILE_HEADER_COMMENT.strip())
    if args.output:
        with open(args.output, 'w') as f:
            yaml.dump_all(configs,
                          stream=f,
                          Dumper=dumper,
                          default_flow_style=False)
    else:
        # Write to stdout directly: wrapping it in a `with` block would
        # close sys.stdout once the dump finishes.
        yaml.dump_all(configs,
                      stream=sys.stdout,
                      Dumper=dumper,
                      default_flow_style=False)
530
531
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
534