xref: /aosp_15_r20/external/grpc-grpc/tools/run_tests/performance/loadtest_config.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1#!/usr/bin/env python3
2# Copyright 2021 The gRPC Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16# Script to generate test configurations for the OSS benchmarks framework.
17#
18# This script filters test scenarios and generates uniquely named configurations
19# for each test. Configurations are dumped in multipart YAML format.
20#
21# See documentation below:
22# https://github.com/grpc/grpc/blob/master/tools/run_tests/performance/README.md#grpc-oss-benchmarks
23
24import argparse
25import collections
26import copy
27import datetime
28import itertools
29import json
30import os
31import string
32import sys
33from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Type
34
35import yaml
36
37sys.path.append(os.path.dirname(os.path.abspath(__file__)))
38import scenario_config
39import scenario_config_exporter
40
# Comment block written at the start of the generated YAML stream. main()
# strips the leading and trailing newlines before handing it to the dumper.
CONFIGURATION_FILE_HEADER_COMMENT = """
# Load test configurations generated from a template by loadtest_config.py.
# See documentation below:
# https://github.com/grpc/grpc/blob/master/tools/run_tests/performance/README.md#grpc-oss-benchmarks
"""
46
47
def safe_name(language: str) -> str:
    """Looks up the label- and file-name-safe spelling of a language name."""
    language_spec = scenario_config.LANGUAGES[language]
    return language_spec.safename
51
52
def default_prefix() -> str:
    """Returns the LoadTest name prefix used when none is supplied.

    The prefix is the value of the USER environment variable, or "loadtest"
    when that variable is not set.
    """
    try:
        return os.environ["USER"]
    except KeyError:
        return "loadtest"
56
57
def now_string() -> str:
    """Formats the current local date and time as YYYYMMDDHHMMSS."""
    current = datetime.datetime.now()
    return "{:%Y%m%d%H%M%S}".format(current)
61
62
def validate_loadtest_name(name: str) -> None:
    """Validates that a LoadTest name is in the expected format.

    A valid name is non-empty, at most 253 characters long, consists only of
    ASCII lowercase letters, ASCII digits and "-", starts with a letter and
    does not end with "-".

    Args:
        name: The candidate LoadTest name.

    Raises:
        ValueError: If the name violates any of the rules above.
    """
    if len(name) > 253:
        raise ValueError(
            "LoadTest name must be at most 253 characters long: %s" % name
        )
    # str.isalnum() also accepts non-ASCII letters and digits (for example
    # "é" or "²"), so check against an explicit ASCII alphabet instead.
    allowed = set(string.ascii_lowercase + string.digits + "-")
    if not allowed.issuperset(name):
        raise ValueError("Invalid characters in LoadTest name: %s" % name)
    if not name or not name[0].isalpha() or name[-1] == "-":
        raise ValueError("Invalid format for LoadTest name: %s" % name)
73
74
def loadtest_base_name(
    scenario_name: str, uniquifier_elements: Iterable[str]
) -> str:
    """Builds the base name of a LoadTest resource.

    The scenario name is split at underscores, the uniquifier elements are
    appended, and all parts are lowercased and joined with dashes.
    """
    parts = itertools.chain(scenario_name.split("_"), uniquifier_elements)
    return "-".join(part.lower() for part in parts)
82
83
def loadtest_name(
    prefix: str, scenario_name: str, uniquifier_elements: Iterable[str]
) -> str:
    """Builds a LoadTest resource name and checks that it is valid.

    The name is the base name derived from the scenario name and uniquifier
    elements, preceded by the prefix and a dash when the prefix is not empty.
    A ValueError from validation propagates to the caller.
    """
    name = loadtest_base_name(scenario_name, uniquifier_elements)
    if prefix:
        name = "-".join((prefix, name))
    validate_loadtest_name(name)
    return name
96
97
def component_name(elements: Iterable[str]) -> str:
    """Joins the non-empty elements with dashes to form a component name."""
    non_empty = [element for element in elements if element]
    return "-".join(non_empty)
101
102
def validate_annotations(annotations: Dict[str, str]) -> None:
    """Rejects annotations that use a reserved name.

    The names "scenario" and "uniquifier" are reserved because the config
    generator adds them automatically. A ValueError is raised if either of
    them appears among the annotation keys.
    """
    reserved = {"scenario", "uniquifier"}
    names = reserved.intersection(annotations)
    if not names:
        return
    raise ValueError("Annotations contain reserved names: %s" % names)
111
112
def gen_run_indices(runs_per_test: int) -> Iterable[str]:
    """Yields one index string per run of a test.

    For fewer than two runs a single empty string is yielded. Otherwise the
    indices 0 .. runs_per_test - 1 are yielded, zero-padded to the width of
    the largest index.
    """
    if runs_per_test >= 2:
        width = len(format(runs_per_test - 1, "d"))
        index_spec = "0{:d}d".format(width)
        for run in range(runs_per_test):
            yield format(run, index_spec)
    else:
        yield ""
122
123
def scenario_name(
    base_name: str,
    client_channels: Optional[int],
    server_threads: Optional[int],
    offered_load: Optional[int],
):
    """Builds a scenario name from the base name and the active modifiers.

    Each modifier that is set (neither None nor zero) contributes one suffix,
    in the order channels, threads, load. The parts are joined with
    underscores.
    """
    modifiers = (
        (client_channels, "{:d}channels"),
        (server_threads, "{:d}threads"),
        (offered_load, "{:d}load"),
    )
    suffixes = [fmt.format(value) for value, fmt in modifiers if value]
    return "_".join([base_name] + suffixes)
140
141
def scenario_transform_function(
    client_channels: Optional[int],
    server_threads: Optional[int],
    offered_loads: Optional[Iterable[int]],
) -> Optional[
    Callable[[Iterable[Mapping[str, Any]]], Iterable[Mapping[str, Any]]]
]:
    """Returns a transform to be applied to a list of scenarios.

    Args:
        client_channels: If set, written to client_config.client_channels of
            every scenario.
        server_threads: If set, written to
            server_config.async_server_threads of every scenario.
        offered_loads: If non-empty, every scenario is expanded into one copy
            per offered load, with client_config.load_params set to a poisson
            load at that value. None is treated like an empty list.

    Returns:
        The identity function when no modifier is set; otherwise a generator
        function that yields modified copies of the scenarios, renamed with
        scenario_name() to reflect the modifiers applied.
    """
    # Materialize the loads once: None means "no loads", and a one-shot
    # iterable must be reusable for every scenario.
    loads = list(offered_loads) if offered_loads is not None else []
    if not any((client_channels, server_threads, loads)):
        return lambda s: s

    def _transform(
        scenarios: Iterable[Mapping[str, Any]]
    ) -> Iterable[Mapping[str, Any]]:
        """Yields scenarios with channels, server threads and load applied."""

        for base_scenario in scenarios:
            # Work on a copy so that the caller's scenarios are not modified.
            scenario = copy.deepcopy(base_scenario)
            base_name = scenario["name"]
            if client_channels:
                scenario["client_config"]["client_channels"] = client_channels

            if server_threads:
                scenario["server_config"][
                    "async_server_threads"
                ] = server_threads

            if not loads:
                scenario["name"] = scenario_name(
                    base_name, client_channels, server_threads, 0
                )
                yield scenario
                # Move on to the next scenario; returning here would drop
                # every scenario after the first one.
                continue

            for offered_load in loads:
                loaded_scenario = copy.deepcopy(scenario)
                loaded_scenario["client_config"]["load_params"] = {
                    "poisson": {"offered_load": offered_load}
                }
                loaded_scenario["name"] = scenario_name(
                    base_name, client_channels, server_threads, offered_load
                )
                yield loaded_scenario

    return _transform
188
189
def gen_loadtest_configs(
    base_config: Mapping[str, Any],
    base_config_clients: Iterable[Mapping[str, Any]],
    base_config_servers: Iterable[Mapping[str, Any]],
    scenario_name_regex: str,
    language_config: scenario_config_exporter.LanguageConfig,
    loadtest_name_prefix: str,
    uniquifier_elements: Iterable[str],
    annotations: Mapping[str, str],
    instances_per_client: int = 1,
    runs_per_test: int = 1,
    scenario_transform: Callable[
        [Iterable[Mapping[str, Any]]], List[Dict[str, Any]]
    ] = lambda s: s,
) -> Iterable[Dict[str, Any]]:
    """Generates LoadTest configurations for a given language config.

    The LoadTest configurations are generated as YAML objects. One
    configuration is yielded per selected scenario and run index; each one is
    a deep copy of base_config with name, labels, annotations, clients,
    servers, driver and scenariosJSON filled in.

    Args:
        base_config: Template configuration, with "metadata" and "spec".
        base_config_clients: Client entries from the template; those whose
            "language" matches the client language are used.
        base_config_servers: Server entries from the template; those whose
            "language" matches the server language are used.
        scenario_name_regex: Regex passed to the scenario filter.
        language_config: Language, category and optional client and server
            languages of the scenarios to generate.
        loadtest_name_prefix: Name prefix; default_prefix() is used if empty.
        uniquifier_elements: Elements appended to each LoadTest name.
        annotations: Extra metadata annotations; must not use reserved names.
        instances_per_client: Number of copies made of each selected client.
        runs_per_test: Number of configurations generated per scenario.
        scenario_transform: Transform applied to the generated scenarios.

    Raises:
        ValueError: If annotations use a reserved name, a generated name is
            invalid, or client names are not unique when more than one
            instance per client is requested.
        IndexError: If the template has no client or no server for the
            required language.

    Since this is a generator, errors are raised when it is iterated, and
    the template checks only run if at least one scenario is selected.
    """
    validate_annotations(annotations)

    # The inputs are declared as iterables but are reused for every generated
    # config, so materialize them once. This also allows uniquifier_elements
    # to be any iterable, not only a list.
    base_config_clients = list(base_config_clients)
    base_config_servers = list(base_config_servers)
    uniquifier_elements = list(uniquifier_elements)

    prefix = loadtest_name_prefix or default_prefix()
    cl = safe_name(language_config.client_language or language_config.language)
    sl = safe_name(language_config.server_language or language_config.language)
    scenario_filter = scenario_config_exporter.scenario_filter(
        scenario_name_regex=scenario_name_regex,
        category=language_config.category,
        client_language=language_config.client_language,
        server_language=language_config.server_language,
    )

    scenarios = scenario_transform(
        scenario_config_exporter.gen_scenarios(
            language_config.language, scenario_filter
        )
    )

    for scenario in scenarios:
        for run_index in gen_run_indices(runs_per_test):
            # The run index is empty when there is a single run per test.
            uniq = (
                uniquifier_elements + [run_index]
                if run_index
                else uniquifier_elements
            )
            name = loadtest_name(prefix, scenario["name"], uniq)
            scenario_str = (
                json.dumps({"scenarios": scenario}, indent="  ") + "\n"
            )

            config = copy.deepcopy(base_config)

            metadata = config["metadata"]
            metadata["name"] = name
            labels = metadata.setdefault("labels", {})
            labels["language"] = safe_name(language_config.language)
            labels["prefix"] = prefix
            metadata_annotations = metadata.setdefault("annotations", {})
            metadata_annotations.update(annotations)
            metadata_annotations.update(
                {
                    "scenario": scenario["name"],
                    "uniquifier": "-".join(uniq),
                }
            )

            spec = config["spec"]

            # Select clients with the required language.
            clients = [
                client
                for client in base_config_clients
                if client["language"] == cl
            ]
            if not clients:
                raise IndexError(
                    "Client language not found in template: %s" % cl
                )

            # Validate config for additional client instances.
            if instances_per_client > 1:
                c = collections.Counter(
                    (client.get("name", "") for client in clients)
                )
                if max(c.values()) > 1:
                    raise ValueError(
                        "Multiple instances of multiple clients requires "
                        "unique names, name counts for language %s: %s"
                        % (cl, c.most_common())
                    )

            # Name client instances with an index starting from zero.
            client_instances = []
            for i in range(instances_per_client):
                client_instances.extend(copy.deepcopy(clients))
                # Only the copies appended in this round get index i.
                for client in client_instances[-len(clients) :]:
                    client["name"] = component_name(
                        (client.get("name", ""), str(i))
                    )

            # Set clients to named instances.
            spec["clients"] = client_instances

            # Select servers with the required language.
            servers = copy.deepcopy(
                [
                    server
                    for server in base_config_servers
                    if server["language"] == sl
                ]
            )
            if not servers:
                raise IndexError(
                    "Server language not found in template: %s" % sl
                )

            # Name servers with an index for consistency with clients.
            for i, server in enumerate(servers):
                server["name"] = component_name(
                    (server.get("name", ""), str(i))
                )

            # Set servers to named instances.
            spec["servers"] = servers

            # Add driver, if needed, and ensure it has language and run
            # fields.
            driver = spec.setdefault("driver", {})
            if "language" not in driver:
                driver["language"] = safe_name("c++")
            driver.setdefault("run", {})

            # Name the driver with an index for consistency with workers.
            # There is only one driver, so the index is zero.
            if not driver.get("name"):
                driver["name"] = "0"

            spec["scenariosJSON"] = scenario_str

            yield config
334
335
def parse_key_value_args(args: Optional[Iterable[str]]) -> Dict[str, str]:
    """Parses arguments in the form key=value into a dictionary.

    Each argument is split at its first "=", so values may themselves contain
    "=". If a key occurs more than once, the last value wins.

    Args:
        args: The arguments to parse, or None for no arguments.

    Returns:
        A dictionary mapping each key to its value.

    Raises:
        ValueError: If an argument does not contain "=".
    """
    parsed = {}
    if args is None:
        return parsed
    for arg in args:
        key, equals, value = arg.partition("=")
        if not equals:
            # Report the offending argument; value is always empty here.
            raise ValueError("Expected key=value: " + arg)
        parsed[key] = value
    return parsed
347
348
def clear_empty_fields(config: Dict[str, Any]) -> None:
    """Removes fields that string substitution left with an empty value.

    Within config["spec"], an empty "pool" is removed from every client,
    every server and the driver, and an empty "image" is removed from the
    driver's "run" section. The whole "results" section is removed unless it
    has a non-empty "bigQueryTable". The config is modified in place.
    """

    def drop_if_empty(mapping, key):
        if key in mapping and not mapping[key]:
            del mapping[key]

    spec = config["spec"]

    for section in ("clients", "servers"):
        if section not in spec:
            continue
        for worker in spec[section]:
            drop_if_empty(worker, "pool")

    if "driver" in spec:
        driver = spec["driver"]
        drop_if_empty(driver, "pool")
        if "run" in driver:
            drop_if_empty(driver["run"], "image")

    if "results" in spec:
        results = spec["results"]
        if "bigQueryTable" not in results or not results["bigQueryTable"]:
            del spec["results"]
374
375
def config_dumper(header_comment: str) -> Type[yaml.SafeDumper]:
    """Builds the dumper class used to write the generated configurations.

    The returned yaml.SafeDumper subclass writes header_comment right after
    the stream start is emitted, and represents every string that contains a
    newline as a scalar with style "|".
    """
    string_tag = "tag:yaml.org,2002:str"

    def represent_str(dumper, text):
        # Strings without a newline keep the dumper's default scalar style.
        scalar_style = "|" if "\n" in text else None
        return dumper.represent_scalar(string_tag, text, style=scalar_style)

    class ConfigDumper(yaml.SafeDumper):
        def expect_stream_start(self):
            super().expect_stream_start()
            if not isinstance(self.event, yaml.StreamStartEvent):
                return
            self.write_indent()
            self.write_indicator(header_comment, need_whitespace=False)

    ConfigDumper.add_representer(str, represent_str)

    return ConfigDumper
396
397
def main() -> None:
    """Parses command-line arguments and writes the generated configurations.

    The template file is read, substitutions are applied, and one set of
    LoadTest configurations is generated for every combination of language,
    allowed client language and allowed server language. The configurations
    are dumped as a multi-document YAML stream to the output file, or to
    stdout if no output file is given.
    """
    language_choices = sorted(scenario_config.LANGUAGES.keys())
    argp = argparse.ArgumentParser(
        description="Generates load test configs from a template.",
        fromfile_prefix_chars="@",
    )
    argp.add_argument(
        "-l",
        "--language",
        action="append",
        choices=language_choices,
        required=True,
        help="Language(s) to benchmark.",
        dest="languages",
    )
    argp.add_argument(
        "-t",
        "--template",
        type=str,
        required=True,
        help="LoadTest configuration yaml file template.",
    )
    argp.add_argument(
        "-s",
        "--substitution",
        action="append",
        default=[],
        help="Template substitution(s), in the form key=value.",
        dest="substitutions",
    )
    argp.add_argument(
        "-p", "--prefix", default="", type=str, help="Test name prefix."
    )
    argp.add_argument(
        "-u",
        "--uniquifier_element",
        action="append",
        default=[],
        help="String element(s) to make the test name unique.",
        dest="uniquifier_elements",
    )
    argp.add_argument(
        "-d",
        action="store_true",
        help="Use creation date and time as an additional uniquifier element.",
    )
    argp.add_argument(
        "-a",
        "--annotation",
        action="append",
        default=[],
        help="metadata.annotation(s), in the form key=value.",
        dest="annotations",
    )
    argp.add_argument(
        "-r",
        "--regex",
        default=".*",
        type=str,
        help="Regex to select scenarios to run.",
    )
    argp.add_argument(
        "--category",
        choices=[
            "all",
            "inproc",
            "scalable",
            "smoketest",
            "sweep",
            "psm",
            "dashboard",
        ],
        default="all",
        help="Select a category of tests to run.",
    )
    argp.add_argument(
        "--allow_client_language",
        action="append",
        choices=language_choices,
        default=[],
        help="Allow cross-language scenarios with this client language.",
        dest="allow_client_languages",
    )
    argp.add_argument(
        "--allow_server_language",
        action="append",
        choices=language_choices,
        default=[],
        help="Allow cross-language scenarios with this server language.",
        dest="allow_server_languages",
    )
    argp.add_argument(
        "--instances_per_client",
        default=1,
        type=int,
        help="Number of instances to generate for each client.",
    )
    argp.add_argument(
        "--runs_per_test",
        default=1,
        type=int,
        help="Number of copies to generate for each test.",
    )
    argp.add_argument(
        "-o",
        "--output",
        type=str,
        help="Output file name. Output to stdout if not set.",
    )
    argp.add_argument(
        "--client_channels", type=int, help="Number of client channels."
    )
    argp.add_argument(
        "--server_threads", type=int, help="Number of async server threads."
    )
    argp.add_argument(
        "--offered_loads",
        nargs="*",
        type=int,
        default=[],
        help=(
            "A list of QPS values at which each load test scenario will be run."
        ),
    )
    args = argp.parse_args()

    if args.instances_per_client < 1:
        argp.error("instances_per_client must be greater than zero.")

    if args.runs_per_test < 1:
        argp.error("runs_per_test must be greater than zero.")

    # Config generation ignores environment variables that are passed by the
    # controller at runtime.
    substitutions = {
        "DRIVER_PORT": "${DRIVER_PORT}",
        "KILL_AFTER": "${KILL_AFTER}",
        "POD_TIMEOUT": "${POD_TIMEOUT}",
    }

    # The user can override the ignored variables above by passing them in as
    # substitution keys.
    substitutions.update(parse_key_value_args(args.substitutions))

    uniquifier_elements = args.uniquifier_elements
    if args.d:
        uniquifier_elements.append(now_string())

    annotations = parse_key_value_args(args.annotations)

    transform = scenario_transform_function(
        args.client_channels, args.server_threads, args.offered_loads
    )

    with open(args.template) as f:
        base_config = yaml.safe_load(
            string.Template(f.read()).substitute(substitutions)
        )

    clear_empty_fields(base_config)

    # Clients and servers are selected per language during generation, so
    # take them out of the shared base config.
    spec = base_config["spec"]
    base_config_clients = spec["clients"]
    del spec["clients"]
    base_config_servers = spec["servers"]
    del spec["servers"]

    # The empty string stands for "same as the benchmarked language".
    client_languages = [""] + args.allow_client_languages
    server_languages = [""] + args.allow_server_languages
    config_generators = []
    for l, cl, sl in itertools.product(
        args.languages, client_languages, server_languages
    ):
        language_config = scenario_config_exporter.LanguageConfig(
            category=args.category,
            language=l,
            client_language=cl,
            server_language=sl,
        )
        config_generators.append(
            gen_loadtest_configs(
                base_config,
                base_config_clients,
                base_config_servers,
                args.regex,
                language_config,
                loadtest_name_prefix=args.prefix,
                uniquifier_elements=uniquifier_elements,
                annotations=annotations,
                instances_per_client=args.instances_per_client,
                runs_per_test=args.runs_per_test,
                scenario_transform=transform,
            )
        )
    configs = itertools.chain.from_iterable(config_generators)
    dumper = config_dumper(CONFIGURATION_FILE_HEADER_COMMENT.strip())

    def dump_configs(stream) -> None:
        yaml.dump_all(
            configs,
            stream=stream,
            Dumper=dumper,
            default_flow_style=False,
        )

    if args.output:
        with open(args.output, "w") as f:
            dump_configs(f)
    else:
        # Write to stdout without a context manager, so that sys.stdout is
        # not closed when the dump is finished.
        dump_configs(sys.stdout)
601
602
# Generate configurations only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == "__main__":
    main()
605