1#!/usr/bin/env python3 2# Copyright 2015 gRPC authors. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Run interop (cross-language) tests in parallel.""" 16 17from __future__ import print_function 18 19import argparse 20import atexit 21import itertools 22import json 23import multiprocessing 24import os 25import re 26import subprocess 27import sys 28import tempfile 29import time 30import traceback 31import uuid 32 33import six 34 35import python_utils.dockerjob as dockerjob 36import python_utils.jobset as jobset 37import python_utils.report_utils as report_utils 38 39# Docker doesn't clean up after itself, so we do it on exit. 40atexit.register(lambda: subprocess.call(["stty", "echo"])) 41 42ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), "../..")) 43os.chdir(ROOT) 44 45_FALLBACK_SERVER_PORT = 443 46_BALANCER_SERVER_PORT = 12000 47_BACKEND_SERVER_PORT = 8080 48 49_TEST_TIMEOUT = 30 50 51_FAKE_SERVERS_SAFENAME = "fake_servers" 52 53# Use a name that's verified by the test certs 54_SERVICE_NAME = "server.test.google.fr" 55 56 57class CXXLanguage: 58 def __init__(self): 59 self.client_cwd = "/var/local/git/grpc" 60 self.safename = "cxx" 61 62 def client_cmd(self, args): 63 return ["bins/opt/interop_client"] + args 64 65 def global_env(self): 66 # 1) Set c-ares as the resolver, to 67 # enable grpclb. 68 # 2) Turn on verbose logging. 69 # 3) Set the ROOTS_PATH env variable 70 # to the test CA in order for 71 # GoogleDefaultCredentials to be 72 # able to use the test CA. 73 return { 74 "GRPC_DNS_RESOLVER": "ares", 75 "GRPC_VERBOSITY": "DEBUG", 76 "GRPC_TRACE": "client_channel,glb", 77 "GRPC_DEFAULT_SSL_ROOTS_FILE_PATH": ( 78 "/var/local/git/grpc/src/core/tsi/test_creds/ca.pem" 79 ), 80 } 81 82 def __str__(self): 83 return "c++" 84 85 86class JavaLanguage: 87 def __init__(self): 88 self.client_cwd = "/var/local/git/grpc-java" 89 self.safename = str(self) 90 91 def client_cmd(self, args): 92 # Take necessary steps to import our test CA into 93 # the set of test CA's that the Java runtime of the 94 # docker container will pick up, so that 95 # Java GoogleDefaultCreds can use it. 96 pem_to_der_cmd = ( 97 "openssl x509 -outform der " 98 "-in /external_mount/src/core/tsi/test_creds/ca.pem " 99 "-out /tmp/test_ca.der" 100 ) 101 keystore_import_cmd = ( 102 "keytool -import " 103 "-keystore /usr/lib/jvm/java-8-oracle/jre/lib/security/cacerts " 104 "-file /tmp/test_ca.der " 105 "-deststorepass changeit " 106 "-noprompt" 107 ) 108 return [ 109 "bash", 110 "-c", 111 ( 112 "{pem_to_der_cmd} && " 113 "{keystore_import_cmd} && " 114 "./run-test-client.sh {java_client_args}" 115 ).format( 116 pem_to_der_cmd=pem_to_der_cmd, 117 keystore_import_cmd=keystore_import_cmd, 118 java_client_args=" ".join(args), 119 ), 120 ] 121 122 def global_env(self): 123 # 1) Enable grpclb 124 # 2) Enable verbose logging 125 return { 126 "JAVA_OPTS": ( 127 "-Dio.grpc.internal.DnsNameResolverProvider.enable_grpclb=true " 128 "-Djava.util.logging.config.file=/var/local/grpc_java_logging/logconf.txt" 129 ) 130 } 131 132 def __str__(self): 133 return "java" 134 135 136class GoLanguage: 137 def __init__(self): 138 self.client_cwd = "/go/src/google.golang.org/grpc/interop/client" 139 self.safename = str(self) 140 141 def client_cmd(self, args): 142 # Copy the test CA file into the path that 143 # the Go runtime in the docker container will use, so 144 # that Go's GoogleDefaultCredentials can use it. 145 # See https://golang.org/src/crypto/x509/root_linux.go. 146 return [ 147 "bash", 148 "-c", 149 ( 150 "cp /external_mount/src/core/tsi/test_creds/ca.pem " 151 "/etc/ssl/certs/ca-certificates.crt && " 152 "/go/bin/client {go_client_args}" 153 ).format(go_client_args=" ".join(args)), 154 ] 155 156 def global_env(self): 157 return { 158 "GRPC_GO_LOG_VERBOSITY_LEVEL": "3", 159 "GRPC_GO_LOG_SEVERITY_LEVEL": "INFO", 160 } 161 162 def __str__(self): 163 return "go" 164 165 166_LANGUAGES = { 167 "c++": CXXLanguage(), 168 "go": GoLanguage(), 169 "java": JavaLanguage(), 170} 171 172 173def docker_run_cmdline(cmdline, image, docker_args, cwd, environ=None): 174 """Wraps given cmdline array to create 'docker run' cmdline from it.""" 175 # turn environ into -e docker args 176 docker_cmdline = "docker run -i --rm=true".split() 177 if environ: 178 for k, v in list(environ.items()): 179 docker_cmdline += ["-e", "%s=%s" % (k, v)] 180 return docker_cmdline + ["-w", cwd] + docker_args + [image] + cmdline 181 182 183def _job_kill_handler(job): 184 assert job._spec.container_name 185 dockerjob.docker_kill(job._spec.container_name) 186 187 188def transport_security_to_args(transport_security): 189 args = [] 190 if transport_security == "tls": 191 args += ["--use_tls=true"] 192 elif transport_security == "alts": 193 args += ["--use_tls=false", "--use_alts=true"] 194 elif transport_security == "insecure": 195 args += ["--use_tls=false"] 196 elif transport_security == "google_default_credentials": 197 args += ["--custom_credentials_type=google_default_credentials"] 198 else: 199 print("Invalid transport security option.") 200 sys.exit(1) 201 return args 202 203 204def lb_client_interop_jobspec( 205 language, dns_server_ip, docker_image, transport_security="tls" 206): 207 """Runs a gRPC client under test in a docker container""" 208 interop_only_options = [ 209 "--server_host=%s" % _SERVICE_NAME, 210 "--server_port=%d" % _FALLBACK_SERVER_PORT, 211 ] + transport_security_to_args(transport_security) 212 # Don't set the server host override in any client; 213 # Go and Java default to no override. 214 # We're using a DNS server so there's no need. 215 if language.safename == "c++": 216 interop_only_options += ['--server_host_override=""'] 217 # Don't set --use_test_ca; we're configuring 218 # clients to use test CA's via alternate means. 219 interop_only_options += ["--use_test_ca=false"] 220 client_args = language.client_cmd(interop_only_options) 221 container_name = dockerjob.random_name( 222 "lb_interop_client_%s" % language.safename 223 ) 224 docker_cmdline = docker_run_cmdline( 225 client_args, 226 environ=language.global_env(), 227 image=docker_image, 228 cwd=language.client_cwd, 229 docker_args=[ 230 "--dns=%s" % dns_server_ip, 231 "--net=host", 232 "--name=%s" % container_name, 233 "-v", 234 "{grpc_grpc_root_dir}:/external_mount:ro".format( 235 grpc_grpc_root_dir=ROOT 236 ), 237 ], 238 ) 239 jobset.message( 240 "IDLE", 241 "docker_cmdline:\b|%s|" % " ".join(docker_cmdline), 242 do_newline=True, 243 ) 244 test_job = jobset.JobSpec( 245 cmdline=docker_cmdline, 246 shortname="lb_interop_client:%s" % language, 247 timeout_seconds=_TEST_TIMEOUT, 248 kill_handler=_job_kill_handler, 249 ) 250 test_job.container_name = container_name 251 return test_job 252 253 254def fallback_server_jobspec(transport_security, shortname): 255 """Create jobspec for running a fallback server""" 256 cmdline = [ 257 "bin/server", 258 "--port=%d" % _FALLBACK_SERVER_PORT, 259 ] + transport_security_to_args(transport_security) 260 return grpc_server_in_docker_jobspec( 261 server_cmdline=cmdline, shortname=shortname 262 ) 263 264 265def backend_server_jobspec(transport_security, shortname): 266 """Create jobspec for running a backend server""" 267 cmdline = [ 268 "bin/server", 269 "--port=%d" % _BACKEND_SERVER_PORT, 270 ] + transport_security_to_args(transport_security) 271 return grpc_server_in_docker_jobspec( 272 server_cmdline=cmdline, shortname=shortname 273 ) 274 275 276def grpclb_jobspec(transport_security, short_stream, backend_addrs, shortname): 277 """Create jobspec for running a balancer server""" 278 cmdline = [ 279 "bin/fake_grpclb", 280 "--backend_addrs=%s" % ",".join(backend_addrs), 281 "--port=%d" % _BALANCER_SERVER_PORT, 282 "--short_stream=%s" % short_stream, 283 "--service_name=%s" % _SERVICE_NAME, 284 ] + transport_security_to_args(transport_security) 285 return grpc_server_in_docker_jobspec( 286 server_cmdline=cmdline, shortname=shortname 287 ) 288 289 290def grpc_server_in_docker_jobspec(server_cmdline, shortname): 291 container_name = dockerjob.random_name(shortname) 292 environ = { 293 "GRPC_GO_LOG_VERBOSITY_LEVEL": "3", 294 "GRPC_GO_LOG_SEVERITY_LEVEL": "INFO ", 295 } 296 docker_cmdline = docker_run_cmdline( 297 server_cmdline, 298 cwd="/go", 299 image=docker_images.get(_FAKE_SERVERS_SAFENAME), 300 environ=environ, 301 docker_args=["--name=%s" % container_name], 302 ) 303 jobset.message( 304 "IDLE", 305 "docker_cmdline:\b|%s|" % " ".join(docker_cmdline), 306 do_newline=True, 307 ) 308 server_job = jobset.JobSpec( 309 cmdline=docker_cmdline, shortname=shortname, timeout_seconds=30 * 60 310 ) 311 server_job.container_name = container_name 312 return server_job 313 314 315def dns_server_in_docker_jobspec( 316 grpclb_ips, 317 fallback_ips, 318 shortname, 319 cause_no_error_no_data_for_balancer_a_record, 320): 321 container_name = dockerjob.random_name(shortname) 322 run_dns_server_cmdline = [ 323 "python", 324 "test/cpp/naming/utils/run_dns_server_for_lb_interop_tests.py", 325 "--grpclb_ips=%s" % ",".join(grpclb_ips), 326 "--fallback_ips=%s" % ",".join(fallback_ips), 327 ] 328 if cause_no_error_no_data_for_balancer_a_record: 329 run_dns_server_cmdline.append( 330 "--cause_no_error_no_data_for_balancer_a_record" 331 ) 332 docker_cmdline = docker_run_cmdline( 333 run_dns_server_cmdline, 334 cwd="/var/local/git/grpc", 335 image=docker_images.get(_FAKE_SERVERS_SAFENAME), 336 docker_args=["--name=%s" % container_name], 337 ) 338 jobset.message( 339 "IDLE", 340 "docker_cmdline:\b|%s|" % " ".join(docker_cmdline), 341 do_newline=True, 342 ) 343 server_job = jobset.JobSpec( 344 cmdline=docker_cmdline, shortname=shortname, timeout_seconds=30 * 60 345 ) 346 server_job.container_name = container_name 347 return server_job 348 349 350def build_interop_image_jobspec(lang_safename, basename_prefix="grpc_interop"): 351 """Creates jobspec for building interop docker image for a language""" 352 tag = "%s_%s:%s" % (basename_prefix, lang_safename, uuid.uuid4()) 353 env = { 354 "INTEROP_IMAGE": tag, 355 "BASE_NAME": "%s_%s" % (basename_prefix, lang_safename), 356 } 357 build_job = jobset.JobSpec( 358 cmdline=["tools/run_tests/dockerize/build_interop_image.sh"], 359 environ=env, 360 shortname="build_docker_%s" % lang_safename, 361 timeout_seconds=30 * 60, 362 ) 363 build_job.tag = tag 364 return build_job 365 366 367argp = argparse.ArgumentParser(description="Run interop tests.") 368argp.add_argument( 369 "-l", 370 "--language", 371 choices=["all"] + sorted(_LANGUAGES), 372 nargs="+", 373 default=["all"], 374 help="Clients to run.", 375) 376argp.add_argument("-j", "--jobs", default=multiprocessing.cpu_count(), type=int) 377argp.add_argument( 378 "-s", 379 "--scenarios_file", 380 default=None, 381 type=str, 382 help="File containing test scenarios as JSON configs.", 383) 384argp.add_argument( 385 "-n", 386 "--scenario_name", 387 default=None, 388 type=str, 389 help=( 390 "Useful for manual runs: specify the name of " 391 "the scenario to run from scenarios_file. Run all scenarios if unset." 392 ), 393) 394argp.add_argument( 395 "--cxx_image_tag", 396 default=None, 397 type=str, 398 help=( 399 "Setting this skips the clients docker image " 400 "build step and runs the client from the named " 401 "image. Only supports running a one client language." 402 ), 403) 404argp.add_argument( 405 "--go_image_tag", 406 default=None, 407 type=str, 408 help=( 409 "Setting this skips the clients docker image build " 410 "step and runs the client from the named image. Only " 411 "supports running a one client language." 412 ), 413) 414argp.add_argument( 415 "--java_image_tag", 416 default=None, 417 type=str, 418 help=( 419 "Setting this skips the clients docker image build " 420 "step and runs the client from the named image. Only " 421 "supports running a one client language." 422 ), 423) 424argp.add_argument( 425 "--servers_image_tag", 426 default=None, 427 type=str, 428 help=( 429 "Setting this skips the fake servers docker image " 430 "build step and runs the servers from the named image." 431 ), 432) 433argp.add_argument( 434 "--no_skips", 435 default=False, 436 type=bool, 437 nargs="?", 438 const=True, 439 help=( 440 "Useful for manual runs. Setting this overrides test " 441 '"skips" configured in test scenarios.' 442 ), 443) 444argp.add_argument( 445 "--verbose", 446 default=False, 447 type=bool, 448 nargs="?", 449 const=True, 450 help="Increase logging.", 451) 452args = argp.parse_args() 453 454docker_images = {} 455 456build_jobs = [] 457if len(args.language) and args.language[0] == "all": 458 languages = list(_LANGUAGES.keys()) 459else: 460 languages = args.language 461for lang_name in languages: 462 l = _LANGUAGES[lang_name] 463 # First check if a pre-built image was supplied, and avoid 464 # rebuilding the particular docker image if so. 465 if lang_name == "c++" and args.cxx_image_tag: 466 docker_images[str(l.safename)] = args.cxx_image_tag 467 elif lang_name == "go" and args.go_image_tag: 468 docker_images[str(l.safename)] = args.go_image_tag 469 elif lang_name == "java" and args.java_image_tag: 470 docker_images[str(l.safename)] = args.java_image_tag 471 else: 472 # Build the test client in docker and save the fully 473 # built image. 474 job = build_interop_image_jobspec(l.safename) 475 build_jobs.append(job) 476 docker_images[str(l.safename)] = job.tag 477 478# First check if a pre-built image was supplied. 479if args.servers_image_tag: 480 docker_images[_FAKE_SERVERS_SAFENAME] = args.servers_image_tag 481else: 482 # Build the test servers in docker and save the fully 483 # built image. 484 job = build_interop_image_jobspec( 485 _FAKE_SERVERS_SAFENAME, basename_prefix="lb_interop" 486 ) 487 build_jobs.append(job) 488 docker_images[_FAKE_SERVERS_SAFENAME] = job.tag 489 490if build_jobs: 491 jobset.message("START", "Building interop docker images.", do_newline=True) 492 print("Jobs to run: \n%s\n" % "\n".join(str(j) for j in build_jobs)) 493 num_failures, _ = jobset.run( 494 build_jobs, newline_on_success=True, maxjobs=args.jobs 495 ) 496 if num_failures == 0: 497 jobset.message( 498 "SUCCESS", "All docker images built successfully.", do_newline=True 499 ) 500 else: 501 jobset.message( 502 "FAILED", "Failed to build interop docker images.", do_newline=True 503 ) 504 sys.exit(1) 505 506 507def wait_until_dns_server_is_up(dns_server_ip): 508 """Probes the DNS server until it's running and safe for tests.""" 509 for i in range(0, 30): 510 print("Health check: attempt to connect to DNS server over TCP.") 511 tcp_connect_subprocess = subprocess.Popen( 512 [ 513 os.path.join( 514 os.getcwd(), "test/cpp/naming/utils/tcp_connect.py" 515 ), 516 "--server_host", 517 dns_server_ip, 518 "--server_port", 519 str(53), 520 "--timeout", 521 str(1), 522 ] 523 ) 524 tcp_connect_subprocess.communicate() 525 if tcp_connect_subprocess.returncode == 0: 526 print( 527 "Health check: attempt to make an A-record query to DNS server." 528 ) 529 dns_resolver_subprocess = subprocess.Popen( 530 [ 531 os.path.join( 532 os.getcwd(), "test/cpp/naming/utils/dns_resolver.py" 533 ), 534 "--qname", 535 ( 536 "health-check-local-dns-server-is-alive." 537 "resolver-tests.grpctestingexp" 538 ), 539 "--server_host", 540 dns_server_ip, 541 "--server_port", 542 str(53), 543 ], 544 stdout=subprocess.PIPE, 545 ) 546 dns_resolver_stdout, _ = dns_resolver_subprocess.communicate() 547 if dns_resolver_subprocess.returncode == 0: 548 if "123.123.123.123" in dns_resolver_stdout: 549 print( 550 "DNS server is up! " 551 "Successfully reached it over UDP and TCP." 552 ) 553 return 554 time.sleep(0.1) 555 raise Exception( 556 "Failed to reach DNS server over TCP and/or UDP. " 557 "Exitting without running tests." 558 ) 559 560 561def shortname(shortname_prefix, shortname, index): 562 return "%s_%s_%d" % (shortname_prefix, shortname, index) 563 564 565def run_one_scenario(scenario_config): 566 jobset.message("START", "Run scenario: %s" % scenario_config["name"]) 567 server_jobs = {} 568 server_addresses = {} 569 suppress_server_logs = True 570 try: 571 backend_addrs = [] 572 fallback_ips = [] 573 grpclb_ips = [] 574 shortname_prefix = scenario_config["name"] 575 # Start backends 576 for i in range(len(scenario_config["backend_configs"])): 577 backend_config = scenario_config["backend_configs"][i] 578 backend_shortname = shortname(shortname_prefix, "backend_server", i) 579 backend_spec = backend_server_jobspec( 580 backend_config["transport_sec"], backend_shortname 581 ) 582 backend_job = dockerjob.DockerJob(backend_spec) 583 server_jobs[backend_shortname] = backend_job 584 backend_addrs.append( 585 "%s:%d" % (backend_job.ip_address(), _BACKEND_SERVER_PORT) 586 ) 587 # Start fallbacks 588 for i in range(len(scenario_config["fallback_configs"])): 589 fallback_config = scenario_config["fallback_configs"][i] 590 fallback_shortname = shortname( 591 shortname_prefix, "fallback_server", i 592 ) 593 fallback_spec = fallback_server_jobspec( 594 fallback_config["transport_sec"], fallback_shortname 595 ) 596 fallback_job = dockerjob.DockerJob(fallback_spec) 597 server_jobs[fallback_shortname] = fallback_job 598 fallback_ips.append(fallback_job.ip_address()) 599 # Start balancers 600 for i in range(len(scenario_config["balancer_configs"])): 601 balancer_config = scenario_config["balancer_configs"][i] 602 grpclb_shortname = shortname(shortname_prefix, "grpclb_server", i) 603 grpclb_spec = grpclb_jobspec( 604 balancer_config["transport_sec"], 605 balancer_config["short_stream"], 606 backend_addrs, 607 grpclb_shortname, 608 ) 609 grpclb_job = dockerjob.DockerJob(grpclb_spec) 610 server_jobs[grpclb_shortname] = grpclb_job 611 grpclb_ips.append(grpclb_job.ip_address()) 612 # Start DNS server 613 dns_server_shortname = shortname(shortname_prefix, "dns_server", 0) 614 dns_server_spec = dns_server_in_docker_jobspec( 615 grpclb_ips, 616 fallback_ips, 617 dns_server_shortname, 618 scenario_config["cause_no_error_no_data_for_balancer_a_record"], 619 ) 620 dns_server_job = dockerjob.DockerJob(dns_server_spec) 621 server_jobs[dns_server_shortname] = dns_server_job 622 # Get the IP address of the docker container running the DNS server. 623 # The DNS server is running on port 53 of that IP address. Note we will 624 # point the DNS resolvers of grpc clients under test to our controlled 625 # DNS server by effectively modifying the /etc/resolve.conf "nameserver" 626 # lists of their docker containers. 627 dns_server_ip = dns_server_job.ip_address() 628 wait_until_dns_server_is_up(dns_server_ip) 629 # Run clients 630 jobs = [] 631 for lang_name in languages: 632 # Skip languages that are known to not currently 633 # work for this test. 634 if not args.no_skips and lang_name in scenario_config.get( 635 "skip_langs", [] 636 ): 637 jobset.message( 638 "IDLE", 639 "Skipping scenario: %s for language: %s\n" 640 % (scenario_config["name"], lang_name), 641 ) 642 continue 643 lang = _LANGUAGES[lang_name] 644 test_job = lb_client_interop_jobspec( 645 lang, 646 dns_server_ip, 647 docker_image=docker_images.get(lang.safename), 648 transport_security=scenario_config["transport_sec"], 649 ) 650 jobs.append(test_job) 651 jobset.message( 652 "IDLE", "Jobs to run: \n%s\n" % "\n".join(str(job) for job in jobs) 653 ) 654 num_failures, resultset = jobset.run( 655 jobs, newline_on_success=True, maxjobs=args.jobs 656 ) 657 report_utils.render_junit_xml_report(resultset, "sponge_log.xml") 658 if num_failures: 659 suppress_server_logs = False 660 jobset.message( 661 "FAILED", 662 "Scenario: %s. Some tests failed" % scenario_config["name"], 663 do_newline=True, 664 ) 665 else: 666 jobset.message( 667 "SUCCESS", 668 "Scenario: %s. All tests passed" % scenario_config["name"], 669 do_newline=True, 670 ) 671 return num_failures 672 finally: 673 # Check if servers are still running. 674 for server, job in list(server_jobs.items()): 675 if not job.is_running(): 676 print('Server "%s" has exited prematurely.' % server) 677 suppress_failure = suppress_server_logs and not args.verbose 678 dockerjob.finish_jobs( 679 [j for j in six.itervalues(server_jobs)], 680 suppress_failure=suppress_failure, 681 ) 682 683 684num_failures = 0 685with open(args.scenarios_file, "r") as scenarios_input: 686 all_scenarios = json.loads(scenarios_input.read()) 687 for scenario in all_scenarios: 688 if args.scenario_name: 689 if args.scenario_name != scenario["name"]: 690 jobset.message( 691 "IDLE", "Skipping scenario: %s" % scenario["name"] 692 ) 693 continue 694 num_failures += run_one_scenario(scenario) 695if num_failures == 0: 696 sys.exit(0) 697else: 698 sys.exit(1) 699