xref: /aosp_15_r20/external/grpc-grpc/tools/run_tests/run_grpclb_interop_tests.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1#!/usr/bin/env python3
2# Copyright 2015 gRPC authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Run interop (cross-language) tests in parallel."""
16
17from __future__ import print_function
18
19import argparse
20import atexit
21import itertools
22import json
23import multiprocessing
24import os
25import re
26import subprocess
27import sys
28import tempfile
29import time
30import traceback
31import uuid
32
33import six
34
35import python_utils.dockerjob as dockerjob
36import python_utils.jobset as jobset
37import python_utils.report_utils as report_utils
38
# Docker doesn't clean up after itself, so we do it on exit: `stty echo` is
# run when the interpreter exits to switch terminal input echo back on.
atexit.register(lambda: subprocess.call(["stty", "echo"]))

# ROOT is two directories above the directory of the invoked script
# (sys.argv[0]). The process changes into it right away, so every relative
# path used later in this file is resolved against ROOT.
ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), "../.."))
os.chdir(ROOT)

# Ports the fallback, balancer and backend test servers are told to listen on.
# Clients are pointed at the fallback port via --server_port.
_FALLBACK_SERVER_PORT = 443
_BALANCER_SERVER_PORT = 12000
_BACKEND_SERVER_PORT = 8080

# timeout_seconds given to each client test job.
_TEST_TIMEOUT = 30

# Key under which the fake servers' image tag is stored in docker_images; it
# is also the name passed to the image build job for that image.
_FAKE_SERVERS_SAFENAME = "fake_servers"

# Use a name that's verified by the test certs
_SERVICE_NAME = "server.test.google.fr"
55
56
class CXXLanguage:
    """Launch settings for the C++ interop client."""

    def __init__(self):
        # safename is the filesystem/docker-friendly spelling of the language.
        self.safename = "cxx"
        self.client_cwd = "/var/local/git/grpc"

    def __str__(self):
        return "c++"

    def client_cmd(self, args):
        """Return the client argv: the interop_client binary followed by args."""
        binary = ["bins/opt/interop_client"]
        return binary + args

    def global_env(self):
        """Return the environment variables to set for the client process."""
        env = {}
        # Set c-ares as the resolver, to enable grpclb.
        env["GRPC_DNS_RESOLVER"] = "ares"
        # Turn on verbose logging.
        env["GRPC_VERBOSITY"] = "DEBUG"
        env["GRPC_TRACE"] = "client_channel,glb"
        # Point the ROOTS_PATH env variable at the test CA so that
        # GoogleDefaultCredentials is able to use the test CA.
        env["GRPC_DEFAULT_SSL_ROOTS_FILE_PATH"] = (
            "/var/local/git/grpc/src/core/tsi/test_creds/ca.pem"
        )
        return env
84
85
class JavaLanguage:
    """Launch settings for the Java interop client."""

    def __init__(self):
        self.client_cwd = "/var/local/git/grpc-java"
        self.safename = str(self)

    def __str__(self):
        return "java"

    def client_cmd(self, args):
        """Return a `bash -c` argv that installs the test CA, then runs the client.

        The test CA is converted to DER and imported into the keystore of the
        Java runtime in the docker container, so that Java GoogleDefaultCreds
        can use it. The three steps are chained with `&&`.
        """
        convert_ca = " ".join(
            [
                "openssl x509 -outform der",
                "-in /external_mount/src/core/tsi/test_creds/ca.pem",
                "-out /tmp/test_ca.der",
            ]
        )
        import_ca = " ".join(
            [
                "keytool -import",
                "-keystore /usr/lib/jvm/java-8-oracle/jre/lib/security/cacerts",
                "-file /tmp/test_ca.der",
                "-deststorepass changeit",
                "-noprompt",
            ]
        )
        run_client = "./run-test-client.sh " + " ".join(args)
        script = " && ".join([convert_ca, import_ca, run_client])
        return ["bash", "-c", script]

    def global_env(self):
        """Return the environment variables to set for the client process."""
        java_opts = [
            # Enable grpclb
            "-Dio.grpc.internal.DnsNameResolverProvider.enable_grpclb=true",
            # Enable verbose logging
            "-Djava.util.logging.config.file=/var/local/grpc_java_logging/logconf.txt",
        ]
        return {"JAVA_OPTS": " ".join(java_opts)}
134
135
class GoLanguage:
    """Launch settings for the Go interop client."""

    def __init__(self):
        self.client_cwd = "/go/src/google.golang.org/grpc/interop/client"
        self.safename = str(self)

    def __str__(self):
        return "go"

    def client_cmd(self, args):
        """Return a `bash -c` argv that installs the test CA, then runs the client.

        The test CA file is copied into the path that the Go runtime in the
        docker container will use, so that Go's GoogleDefaultCredentials can
        use it. See https://golang.org/src/crypto/x509/root_linux.go.
        """
        install_ca = (
            "cp /external_mount/src/core/tsi/test_creds/ca.pem "
            "/etc/ssl/certs/ca-certificates.crt"
        )
        run_client = "/go/bin/client " + " ".join(args)
        return ["bash", "-c", install_ca + " && " + run_client]

    def global_env(self):
        """Return the environment variables to set for the client process."""
        env = {}
        env["GRPC_GO_LOG_VERBOSITY_LEVEL"] = "3"
        env["GRPC_GO_LOG_SEVERITY_LEVEL"] = "INFO"
        return env
164
165
# Supported client languages, keyed by each language's str() form
# ("c++", "go", "java").
_LANGUAGES = {
    str(language): language
    for language in (CXXLanguage(), GoLanguage(), JavaLanguage())
}
171
172
def docker_run_cmdline(cmdline, image, docker_args, cwd, environ=None):
    """Wrap cmdline in a 'docker run' invocation and return the full argv.

    The result is laid out as: `docker run -i --rm=true`, one `-e KEY=VALUE`
    pair per entry of environ (if any), `-w cwd`, docker_args, the image name
    and finally cmdline itself.
    """
    env_flags = []
    for name, value in (environ or {}).items():
        env_flags.extend(["-e", "%s=%s" % (name, value)])
    prefix = ["docker", "run", "-i", "--rm=true"]
    return prefix + env_flags + ["-w", cwd] + docker_args + [image] + cmdline
181
182
183def _job_kill_handler(job):
184    assert job._spec.container_name
185    dockerjob.docker_kill(job._spec.container_name)
186
187
def transport_security_to_args(transport_security):
    """Translate a transport security name into command line flags.

    Returns a fresh list of flags for "tls", "alts", "insecure" or
    "google_default_credentials". For any other value a message is printed
    and the process exits with status 1.
    """
    known_modes = (
        ("tls", ["--use_tls=true"]),
        ("alts", ["--use_tls=false", "--use_alts=true"]),
        ("insecure", ["--use_tls=false"]),
        (
            "google_default_credentials",
            ["--custom_credentials_type=google_default_credentials"],
        ),
    )
    for mode, flags in known_modes:
        if transport_security == mode:
            return list(flags)
    print("Invalid transport security option.")
    sys.exit(1)
202
203
def lb_client_interop_jobspec(
    language, dns_server_ip, docker_image, transport_security="tls"
):
    """Build the JobSpec that runs one language's client in a docker container.

    The container is started with host networking, with its DNS pointed at
    dns_server_ip, and with ROOT mounted read-only at /external_mount. The
    generated container name is stored on the returned spec as
    `container_name`, which _job_kill_handler reads.
    """
    client_flags = [
        "--server_host=%s" % _SERVICE_NAME,
        "--server_port=%d" % _FALLBACK_SERVER_PORT,
    ]
    client_flags += transport_security_to_args(transport_security)
    # Don't set the server host override in any client;
    # Go and Java default to no override.
    # We're using a DNS server so there's no need.
    # NOTE(review): CXXLanguage.safename is "cxx" in this file, so this
    # comparison does not match it - confirm whether the branch is still wanted.
    if language.safename == "c++":
        client_flags.append('--server_host_override=""')
    # Don't set --use_test_ca; we're configuring
    # clients to use test CA's via alternate means.
    client_flags.append("--use_test_ca=false")

    client_args = language.client_cmd(client_flags)
    container_name = dockerjob.random_name(
        "lb_interop_client_%s" % language.safename
    )
    mount_spec = "{grpc_grpc_root_dir}:/external_mount:ro".format(
        grpc_grpc_root_dir=ROOT
    )
    docker_args = [
        "--dns=%s" % dns_server_ip,
        "--net=host",
        "--name=%s" % container_name,
        "-v",
        mount_spec,
    ]
    docker_cmdline = docker_run_cmdline(
        client_args,
        image=docker_image,
        docker_args=docker_args,
        cwd=language.client_cwd,
        environ=language.global_env(),
    )
    jobset.message(
        "IDLE",
        "docker_cmdline:\b|%s|" % " ".join(docker_cmdline),
        do_newline=True,
    )
    client_spec = jobset.JobSpec(
        cmdline=docker_cmdline,
        shortname="lb_interop_client:%s" % language,
        timeout_seconds=_TEST_TIMEOUT,
        kill_handler=_job_kill_handler,
    )
    client_spec.container_name = container_name
    return client_spec
252
253
def fallback_server_jobspec(transport_security, shortname):
    """Return the jobspec for a fallback server listening on the fallback port."""
    server_argv = ["bin/server", "--port=%d" % _FALLBACK_SERVER_PORT]
    server_argv += transport_security_to_args(transport_security)
    return grpc_server_in_docker_jobspec(
        server_cmdline=server_argv, shortname=shortname
    )
263
264
def backend_server_jobspec(transport_security, shortname):
    """Return the jobspec for a backend server listening on the backend port."""
    server_argv = ["bin/server", "--port=%d" % _BACKEND_SERVER_PORT]
    server_argv += transport_security_to_args(transport_security)
    return grpc_server_in_docker_jobspec(
        server_cmdline=server_argv, shortname=shortname
    )
274
275
def grpclb_jobspec(transport_security, short_stream, backend_addrs, shortname):
    """Return the jobspec for a fake balancer that hands out backend_addrs."""
    balancer_argv = ["bin/fake_grpclb"]
    balancer_argv.append("--backend_addrs=%s" % ",".join(backend_addrs))
    balancer_argv.append("--port=%d" % _BALANCER_SERVER_PORT)
    balancer_argv.append("--short_stream=%s" % short_stream)
    balancer_argv.append("--service_name=%s" % _SERVICE_NAME)
    balancer_argv += transport_security_to_args(transport_security)
    return grpc_server_in_docker_jobspec(
        server_cmdline=balancer_argv, shortname=shortname
    )
288
289
def grpc_server_in_docker_jobspec(server_cmdline, shortname):
    """Build the JobSpec that runs a fake server command in the servers image.

    Args:
        server_cmdline: argv of the server to run inside the container.
        shortname: job shortname; also the prefix passed to
            dockerjob.random_name to generate the container name.

    Returns:
        A jobset.JobSpec with a 30 minute timeout whose `container_name`
        attribute holds the generated container name.
    """
    container_name = dockerjob.random_name(shortname)
    environ = {
        "GRPC_GO_LOG_VERBOSITY_LEVEL": "3",
        # Same value GoLanguage.global_env uses. The severity name is passed to
        # the container verbatim, so it must not carry stray whitespace.
        "GRPC_GO_LOG_SEVERITY_LEVEL": "INFO",
    }
    docker_cmdline = docker_run_cmdline(
        server_cmdline,
        cwd="/go",
        image=docker_images.get(_FAKE_SERVERS_SAFENAME),
        environ=environ,
        docker_args=["--name=%s" % container_name],
    )
    jobset.message(
        "IDLE",
        "docker_cmdline:\b|%s|" % " ".join(docker_cmdline),
        do_newline=True,
    )
    server_job = jobset.JobSpec(
        cmdline=docker_cmdline, shortname=shortname, timeout_seconds=30 * 60
    )
    server_job.container_name = container_name
    return server_job
313
314
def dns_server_in_docker_jobspec(
    grpclb_ips,
    fallback_ips,
    shortname,
    cause_no_error_no_data_for_balancer_a_record,
):
    """Build the JobSpec that runs the test DNS server in the servers image.

    The balancer and fallback IPs are passed to the DNS server script as
    comma-separated lists; the no-error/no-data flag is appended only when
    requested. The generated container name is stored on the returned spec
    as `container_name`.
    """
    container_name = dockerjob.random_name(shortname)
    dns_argv = [
        "python",
        "test/cpp/naming/utils/run_dns_server_for_lb_interop_tests.py",
    ]
    dns_argv.append("--grpclb_ips=%s" % ",".join(grpclb_ips))
    dns_argv.append("--fallback_ips=%s" % ",".join(fallback_ips))
    if cause_no_error_no_data_for_balancer_a_record:
        dns_argv.append("--cause_no_error_no_data_for_balancer_a_record")
    docker_cmdline = docker_run_cmdline(
        dns_argv,
        image=docker_images.get(_FAKE_SERVERS_SAFENAME),
        docker_args=["--name=%s" % container_name],
        cwd="/var/local/git/grpc",
    )
    jobset.message(
        "IDLE",
        "docker_cmdline:\b|%s|" % " ".join(docker_cmdline),
        do_newline=True,
    )
    dns_job = jobset.JobSpec(
        cmdline=docker_cmdline, shortname=shortname, timeout_seconds=30 * 60
    )
    dns_job.container_name = container_name
    return dns_job
348
349
def build_interop_image_jobspec(lang_safename, basename_prefix="grpc_interop"):
    """Return the jobspec that builds the interop docker image for a language.

    The image is tagged "<prefix>_<safename>:<random uuid>"; the tag is both
    exported to the build script as INTEROP_IMAGE and stored on the returned
    spec as `tag`.
    """
    base_name = "%s_%s" % (basename_prefix, lang_safename)
    image_tag = "%s:%s" % (base_name, uuid.uuid4())
    build_spec = jobset.JobSpec(
        cmdline=["tools/run_tests/dockerize/build_interop_image.sh"],
        environ={"INTEROP_IMAGE": image_tag, "BASE_NAME": base_name},
        shortname="build_docker_%s" % lang_safename,
        timeout_seconds=30 * 60,
    )
    build_spec.tag = image_tag
    return build_spec
365
366
def _parse_bool_flag(value):
    """Convert the optional string value of a boolean flag to a bool.

    bool() is not usable as an argparse type converter because every
    non-empty string, including "false", is truthy. Here the usual
    "false-like" spellings map to False and anything else maps to True.
    """
    return value.strip().lower() not in (
        "",
        "0",
        "false",
        "f",
        "no",
        "n",
        "off",
    )


# Command line interface. Parsing happens at import time; the resulting `args`
# namespace is read by the rest of this script.
argp = argparse.ArgumentParser(description="Run interop tests.")
argp.add_argument(
    "-l",
    "--language",
    choices=["all"] + sorted(_LANGUAGES),
    nargs="+",
    default=["all"],
    help="Clients to run.",
)
argp.add_argument("-j", "--jobs", default=multiprocessing.cpu_count(), type=int)
argp.add_argument(
    "-s",
    "--scenarios_file",
    default=None,
    type=str,
    # The scenarios file is opened unconditionally once the docker images are
    # built, so insist on it up front instead of failing late.
    required=True,
    help="File containing test scenarios as JSON configs.",
)
argp.add_argument(
    "-n",
    "--scenario_name",
    default=None,
    type=str,
    help=(
        "Useful for manual runs: specify the name of "
        "the scenario to run from scenarios_file. Run all scenarios if unset."
    ),
)
# One pre-built image option per client language; all share the same help.
for _image_tag_flag in ("--cxx_image_tag", "--go_image_tag", "--java_image_tag"):
    argp.add_argument(
        _image_tag_flag,
        default=None,
        type=str,
        help=(
            "Setting this skips the clients docker image build "
            "step and runs the client from the named image. Only "
            "supports running a one client language."
        ),
    )
argp.add_argument(
    "--servers_image_tag",
    default=None,
    type=str,
    help=(
        "Setting this skips the fake servers docker image "
        "build step and runs the servers from the named image."
    ),
)
# The boolean options accept an optional value: the bare flag means True
# (via const), and an explicit value goes through _parse_bool_flag.
argp.add_argument(
    "--no_skips",
    default=False,
    type=_parse_bool_flag,
    nargs="?",
    const=True,
    help=(
        "Useful for manual runs. Setting this overrides test "
        '"skips" configured in test scenarios.'
    ),
)
argp.add_argument(
    "--verbose",
    default=False,
    type=_parse_bool_flag,
    nargs="?",
    const=True,
    help="Increase logging.",
)
args = argp.parse_args()
453
# Maps a safename (a client language's, or _FAKE_SERVERS_SAFENAME) to the tag
# of the docker image to run for it.
docker_images = {}

# Image build jobs that still have to run before any scenario can start.
build_jobs = []
# "all" selects every language wherever it appears in the -l list, not only
# when it is the first value.
if "all" in args.language:
    languages = list(_LANGUAGES.keys())
else:
    languages = args.language
# Pre-built client image tags given on the command line (None when absent).
prebuilt_client_images = {
    "c++": args.cxx_image_tag,
    "go": args.go_image_tag,
    "java": args.java_image_tag,
}
for lang_name in languages:
    client_language = _LANGUAGES[lang_name]
    # First check if a pre-built image was supplied, and avoid
    # rebuilding the particular docker image if so.
    prebuilt_tag = prebuilt_client_images.get(lang_name)
    if prebuilt_tag:
        docker_images[str(client_language.safename)] = prebuilt_tag
    else:
        # Build the test client in docker and save the fully
        # built image.
        job = build_interop_image_jobspec(client_language.safename)
        build_jobs.append(job)
        docker_images[str(client_language.safename)] = job.tag

# First check if a pre-built image was supplied.
if args.servers_image_tag:
    docker_images[_FAKE_SERVERS_SAFENAME] = args.servers_image_tag
else:
    # Build the test servers in docker and save the fully
    # built image.
    job = build_interop_image_jobspec(
        _FAKE_SERVERS_SAFENAME, basename_prefix="lb_interop"
    )
    build_jobs.append(job)
    docker_images[_FAKE_SERVERS_SAFENAME] = job.tag
489
# Build every image that was not supplied pre-built; give up on any failure.
if build_jobs:
    jobset.message("START", "Building interop docker images.", do_newline=True)
    print("Jobs to run: \n%s\n" % "\n".join(map(str, build_jobs)))
    num_failures, _ = jobset.run(
        build_jobs, newline_on_success=True, maxjobs=args.jobs
    )
    if num_failures != 0:
        jobset.message(
            "FAILED", "Failed to build interop docker images.", do_newline=True
        )
        sys.exit(1)
    jobset.message(
        "SUCCESS", "All docker images built successfully.", do_newline=True
    )
505
506
def wait_until_dns_server_is_up(dns_server_ip):
    """Probe the DNS server until it's running and safe for tests.

    Makes up to 30 attempts, 0.1 seconds apart. Each attempt runs the
    tcp_connect.py helper against port 53 of dns_server_ip and, when that
    exits with status 0, runs the dns_resolver.py helper for the health-check
    record. The server counts as up once the resolver exits with status 0 and
    its output contains 123.123.123.123.

    Args:
        dns_server_ip: address of the DNS server to probe.

    Raises:
        Exception: if none of the attempts succeeds.
    """
    utils_dir = os.path.join(os.getcwd(), "test/cpp/naming/utils")
    tcp_connect_cmd = [
        os.path.join(utils_dir, "tcp_connect.py"),
        "--server_host",
        dns_server_ip,
        "--server_port",
        str(53),
        "--timeout",
        str(1),
    ]
    dns_resolver_cmd = [
        os.path.join(utils_dir, "dns_resolver.py"),
        "--qname",
        (
            "health-check-local-dns-server-is-alive."
            "resolver-tests.grpctestingexp"
        ),
        "--server_host",
        dns_server_ip,
        "--server_port",
        str(53),
    ]
    for _attempt in range(30):
        print("Health check: attempt to connect to DNS server over TCP.")
        tcp_connect_subprocess = subprocess.Popen(tcp_connect_cmd)
        tcp_connect_subprocess.communicate()
        if tcp_connect_subprocess.returncode == 0:
            print(
                "Health check: attempt to make an A-record query to DNS server."
            )
            dns_resolver_subprocess = subprocess.Popen(
                dns_resolver_cmd, stdout=subprocess.PIPE
            )
            dns_resolver_stdout, _ = dns_resolver_subprocess.communicate()
            if dns_resolver_subprocess.returncode == 0:
                # The pipe yields bytes on Python 3, so the expected address
                # has to be a bytes literal too (a str on Python 2).
                if b"123.123.123.123" in dns_resolver_stdout:
                    print(
                        "DNS server is up! "
                        "Successfully reached it over UDP and TCP."
                    )
                    return
        time.sleep(0.1)
    raise Exception(
        "Failed to reach DNS server over TCP and/or UDP. "
        "Exitting without running tests."
    )
559
560
def shortname(shortname_prefix, shortname, index):
    """Return "<shortname_prefix>_<shortname>_<index>" as a job shortname."""
    parts = (shortname_prefix, shortname, index)
    return "%s_%s_%d" % parts
563
564
def run_one_scenario(scenario_config):
    """Start one scenario's servers, run the selected clients, stop the servers.

    Backends, fallbacks and balancers are started (in that order) as docker
    jobs, followed by a DNS server that is given the balancer and fallback
    IPs. Once the DNS server passes its health check, one client job per
    selected language is run against it. The servers are always finished in
    the `finally` clause, even if an earlier step raises.

    Args:
        scenario_config: dict for a single scenario. Keys read here are
            "name", "backend_configs", "fallback_configs",
            "balancer_configs", "transport_sec",
            "cause_no_error_no_data_for_balancer_a_record" and, optionally,
            "skip_langs".

    Returns:
        The number of failed client jobs reported by jobset.run.
    """
    scenario_name = scenario_config["name"]
    jobset.message("START", "Run scenario: %s" % scenario_name)
    server_jobs = {}
    # Server logs stay suppressed unless a client fails (or --verbose is set).
    suppress_server_logs = True
    try:
        # Start backends
        backend_addrs = []
        for index, backend_config in enumerate(
            scenario_config["backend_configs"]
        ):
            backend_name = shortname(scenario_name, "backend_server", index)
            backend_job = dockerjob.DockerJob(
                backend_server_jobspec(
                    backend_config["transport_sec"], backend_name
                )
            )
            server_jobs[backend_name] = backend_job
            backend_addrs.append(
                "%s:%d" % (backend_job.ip_address(), _BACKEND_SERVER_PORT)
            )

        # Start fallbacks
        fallback_ips = []
        for index, fallback_config in enumerate(
            scenario_config["fallback_configs"]
        ):
            fallback_name = shortname(scenario_name, "fallback_server", index)
            fallback_job = dockerjob.DockerJob(
                fallback_server_jobspec(
                    fallback_config["transport_sec"], fallback_name
                )
            )
            server_jobs[fallback_name] = fallback_job
            fallback_ips.append(fallback_job.ip_address())

        # Start balancers
        grpclb_ips = []
        for index, balancer_config in enumerate(
            scenario_config["balancer_configs"]
        ):
            grpclb_name = shortname(scenario_name, "grpclb_server", index)
            grpclb_job = dockerjob.DockerJob(
                grpclb_jobspec(
                    balancer_config["transport_sec"],
                    balancer_config["short_stream"],
                    backend_addrs,
                    grpclb_name,
                )
            )
            server_jobs[grpclb_name] = grpclb_job
            grpclb_ips.append(grpclb_job.ip_address())

        # Start DNS server
        dns_server_name = shortname(scenario_name, "dns_server", 0)
        dns_server_job = dockerjob.DockerJob(
            dns_server_in_docker_jobspec(
                grpclb_ips,
                fallback_ips,
                dns_server_name,
                scenario_config["cause_no_error_no_data_for_balancer_a_record"],
            )
        )
        server_jobs[dns_server_name] = dns_server_job
        # Get the IP address of the docker container running the DNS server.
        # The DNS server is running on port 53 of that IP address. Note we will
        # point the DNS resolvers of grpc clients under test to our controlled
        # DNS server by effectively modifying the /etc/resolve.conf "nameserver"
        # lists of their docker containers.
        dns_server_ip = dns_server_job.ip_address()
        wait_until_dns_server_is_up(dns_server_ip)

        # Run clients
        client_jobs = []
        for lang_name in languages:
            # Skip languages that are known to not currently
            # work for this test.
            skipped = lang_name in scenario_config.get("skip_langs", [])
            if skipped and not args.no_skips:
                jobset.message(
                    "IDLE",
                    "Skipping scenario: %s for language: %s\n"
                    % (scenario_name, lang_name),
                )
                continue
            lang = _LANGUAGES[lang_name]
            client_jobs.append(
                lb_client_interop_jobspec(
                    lang,
                    dns_server_ip,
                    docker_image=docker_images.get(lang.safename),
                    transport_security=scenario_config["transport_sec"],
                )
            )
        jobset.message(
            "IDLE",
            "Jobs to run: \n%s\n" % "\n".join(map(str, client_jobs)),
        )
        num_failures, resultset = jobset.run(
            client_jobs, newline_on_success=True, maxjobs=args.jobs
        )
        report_utils.render_junit_xml_report(resultset, "sponge_log.xml")
        if not num_failures:
            jobset.message(
                "SUCCESS",
                "Scenario: %s. All tests passed" % scenario_name,
                do_newline=True,
            )
        else:
            suppress_server_logs = False
            jobset.message(
                "FAILED",
                "Scenario: %s. Some tests failed" % scenario_name,
                do_newline=True,
            )
        return num_failures
    finally:
        # Check if servers are still running.
        for server_name, server_job in server_jobs.items():
            if not server_job.is_running():
                print('Server "%s" has exited prematurely.' % server_name)
        dockerjob.finish_jobs(
            list(server_jobs.values()),
            suppress_failure=suppress_server_logs and not args.verbose,
        )
682
683
# Load the scenarios, run each selected one, and exit non-zero if any client
# job failed.
with open(args.scenarios_file, "r") as scenarios_input:
    all_scenarios = json.load(scenarios_input)
num_failures = 0
for scenario in all_scenarios:
    # With --scenario_name set, only the scenario of that name is run.
    if args.scenario_name and args.scenario_name != scenario["name"]:
        jobset.message("IDLE", "Skipping scenario: %s" % scenario["name"])
        continue
    num_failures += run_one_scenario(scenario)
sys.exit(0 if num_failures == 0 else 1)
698    sys.exit(1)
699