#!/usr/bin/env python3
# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run xDS integration tests on GCP using Traffic Director."""

import argparse
import datetime
import json
import logging
import os
import random
import re
import shlex
import socket
import subprocess
import sys
import tempfile
import time
import uuid

from google.protobuf import json_format
import googleapiclient.discovery
import grpc
from oauth2client.client import GoogleCredentials

import python_utils.jobset as jobset
import python_utils.report_utils as report_utils
from src.proto.grpc.health.v1 import health_pb2
from src.proto.grpc.health.v1 import health_pb2_grpc
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc

# Envoy protos provided by the PyPI package xds-protos.
# The generated Python files must be imported to load their descriptors.
try:
    from envoy.extensions.filters.common.fault.v3 import fault_pb2
    from envoy.extensions.filters.http.fault.v3 import fault_pb2
    from envoy.extensions.filters.http.router.v3 import router_pb2
    from envoy.extensions.filters.network.http_connection_manager.v3 import (
        http_connection_manager_pb2,
    )
    from envoy.service.status.v3 import csds_pb2
    from envoy.service.status.v3 import csds_pb2_grpc
except ImportError:
    # These protos are only required by the CSDS test case. We should not
    # fail the entire script for one test case.
    pass

logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt="%(asctime)s: %(levelname)-8s %(message)s")
console_handler.setFormatter(formatter)
logger.handlers = []
logger.addHandler(console_handler)
logger.setLevel(logging.WARNING)

# Suppress excessive logs for gRPC Python
original_grpc_trace = os.environ.pop("GRPC_TRACE", None)
original_grpc_verbosity = os.environ.pop("GRPC_VERBOSITY", None)
# Suppress non-essential logs for GCP clients
logging.getLogger("google_auth_httplib2").setLevel(logging.WARNING)
logging.getLogger("googleapiclient.discovery").setLevel(logging.WARNING)

_TEST_CASES = [
    "backends_restart",
    "change_backend_service",
    "gentle_failover",
    "load_report_based_failover",
    "ping_pong",
    "remove_instance_group",
    "round_robin",
    "secondary_locality_gets_no_requests_on_partial_primary_failure",
    "secondary_locality_gets_requests_on_primary_failure",
    "traffic_splitting",
    "path_matching",
    "header_matching",
    "api_listener",
    "forwarding_rule_port_match",
    "forwarding_rule_default_port",
    "metadata_filter",
]

# Valid test cases that are not part of _TEST_CASES, so they can only be run
# manually and aren't enabled automatically for all languages.
#
# TODO: Move them into _TEST_CASES when support is ready in all languages.
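# To run one of these alongside the full suite, list it explicitly, e.g.
# --test_case=all,circuit_breaking: parse_test_cases() below expands "all"
# into _TEST_CASES and unions any individually named cases.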
_ADDITIONAL_TEST_CASES = [
    "circuit_breaking",
    "timeout",
    "fault_injection",
    "csds",
]

# Test cases that require the xDS v3 API. Skipped in older runs.
_V3_TEST_CASES = frozenset(["timeout", "fault_injection", "csds"])

# Test cases that require the alpha API. Skipped for stable API runs.
_ALPHA_TEST_CASES = frozenset(["timeout"])


def parse_test_cases(arg):
    if arg == "":
        return []
    arg_split = arg.split(",")
    test_cases = set()
    all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES
    for arg in arg_split:
        if arg == "all":
            test_cases = test_cases.union(_TEST_CASES)
        else:
            test_cases = test_cases.union([arg])
    if not all([test_case in all_test_cases for test_case in test_cases]):
        raise Exception("Failed to parse test cases %s" % arg)
    # Preserve order.
    return [x for x in all_test_cases if x in test_cases]


def parse_port_range(port_arg):
    try:
        port = int(port_arg)
        return list(range(port, port + 1))
    except ValueError:
        port_min, port_max = port_arg.split(":")
        return list(range(int(port_min), int(port_max) + 1))
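

# Example invocation (a sketch; the project values and client command are
# illustrative, not defaults you can rely on):
#   python3 run_xds_tests.py --test_case=ping_pong,round_robin \
#       --project_id=my-project --project_num=1234567890 \
#       --client_cmd='/path/to/xds_client --server=xds:///{server_uri} \
#           --stats_port={stats_port} --qps={qps}'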
argp = argparse.ArgumentParser(description="Run xDS interop tests on GCP")
# TODO(zdapeng): remove default value of project_id and project_num
argp.add_argument("--project_id", default="grpc-testing", help="GCP project id")
argp.add_argument(
    "--project_num", default="830293263384", help="GCP project number"
)
argp.add_argument(
    "--gcp_suffix",
    default="",
    help=(
        "Optional suffix for all generated GCP resource names. Useful to "
        "ensure distinct names across test runs."
    ),
)
argp.add_argument(
    "--test_case",
    default="ping_pong",
    type=parse_test_cases,
    help=(
        "Comma-separated list of test cases to run. Available tests: %s "
        "(or 'all' to run every test). "
        "Additional tests not included in 'all': %s"
    )
    % (",".join(_TEST_CASES), ",".join(_ADDITIONAL_TEST_CASES)),
)
argp.add_argument(
    "--bootstrap_file",
    default="",
    help=(
        "File to reference via GRPC_XDS_BOOTSTRAP. Disables built-in "
        "bootstrap generation"
    ),
)
argp.add_argument(
    "--xds_v3_support",
    default=False,
    action="store_true",
    help=(
        "Support xDS v3 via GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. "
        "If a pre-created bootstrap file is provided via the --bootstrap_file "
        "parameter, it should include xds_v3 in its server_features field."
    ),
)
argp.add_argument(
    "--client_cmd",
    default=None,
    help=(
        "Command to launch the xDS test client. {server_uri}, {stats_port} and"
        " {qps} references will be replaced using str.format()."
        " GRPC_XDS_BOOTSTRAP will be set for the command"
    ),
)
argp.add_argument(
    "--client_hosts",
    default=None,
    help=(
        "Comma-separated list of hosts running client processes. If set,"
        " --client_cmd is ignored and client processes are assumed to be"
        " running on the specified hosts."
    ),
)
argp.add_argument("--zone", default="us-central1-a")
argp.add_argument(
    "--secondary_zone",
    default="us-west1-b",
    help="Zone to use for secondary TD locality tests",
)
argp.add_argument("--qps", default=100, type=int, help="Client QPS")
argp.add_argument(
    "--wait_for_backend_sec",
    default=1200,
    type=int,
    help=(
        "Time limit for waiting for created backend services to report "
        "healthy when launching or updating GCP resources"
    ),
)
argp.add_argument(
    "--use_existing_gcp_resources",
    default=False,
    action="store_true",
    help=(
        "If set, find and use already created GCP resources instead of creating"
        " new ones."
    ),
)
argp.add_argument(
    "--keep_gcp_resources",
    default=False,
    action="store_true",
    help=(
        "Leave GCP VMs and configuration running after the test. Default"
        " behavior is to delete when tests complete."
    ),
)
argp.add_argument(
    "--halt_after_fail",
    action="store_true",
    help="Halt and save the resources when a test fails.",
)
argp.add_argument(
    "--compute_discovery_document",
    default=None,
    type=str,
    help=(
        "If provided, uses this file instead of retrieving via the GCP"
        " discovery API"
    ),
)
argp.add_argument(
    "--alpha_compute_discovery_document",
    default=None,
    type=str,
    help=(
        "If provided, uses this file instead of retrieving via the alpha GCP "
        "discovery API"
    ),
)
argp.add_argument(
    "--network", default="global/networks/default", help="GCP network to use"
)
_DEFAULT_PORT_RANGE = "8080:8280"
argp.add_argument(
    "--service_port_range",
    default=_DEFAULT_PORT_RANGE,
    type=parse_port_range,
    help=(
        "Listening port for created gRPC backends. Specified as "
        "either a single int or as a range in the format min:max, in "
        "which case an available port p will be chosen s.t. min <= p "
        "<= max"
    ),
)
argp.add_argument(
    "--stats_port",
    default=8079,
    type=int,
    help="Local port for the client process to expose the LB stats service",
)
argp.add_argument(
    "--xds_server",
    default="trafficdirector.googleapis.com:443",
    help="xDS server",
)
argp.add_argument(
    "--source_image",
    default="projects/debian-cloud/global/images/family/debian-9",
    help="Source image for VMs created during the test",
)
argp.add_argument(
    "--path_to_server_binary",
    default=None,
    type=str,
    help=(
        "If set, the server binary must already be pre-built on "
        "the specified source image"
    ),
)
argp.add_argument(
    "--machine_type",
    default="e2-standard-2",
    help="Machine type for VMs created during the test",
)
argp.add_argument(
    "--instance_group_size",
    default=2,
    type=int,
    help=(
        "Number of VMs to create per instance group. Certain test cases (e.g.,"
        " round_robin) may not give meaningful results if this is set to a"
        " value less than 2."
    ),
)
argp.add_argument(
    "--verbose", help="verbose log output", default=False, action="store_true"
)
# TODO(ericgribkoff) Remove this param once the sponge-formatted log files are
# visible in all test environments.
argp.add_argument(
    "--log_client_output",
    help="Log captured client output",
    default=False,
    action="store_true",
)
# TODO(ericgribkoff) Remove this flag once all test environments are verified
# to have access to the alpha compute APIs.
argp.add_argument(
    "--only_stable_gcp_apis",
    help=(
        "Do not use alpha compute APIs. Some tests may be "
        "incompatible with this option (gRPC health checks are "
        "currently alpha and required for simulating server failure)"
    ),
    default=False,
    action="store_true",
)
args = argp.parse_args()

if args.verbose:
    logger.setLevel(logging.DEBUG)

# In grpc-testing, use the non-legacy network.
if (
    args.project_id == "grpc-testing"
    and args.network
    and args.network == argp.get_default("network")
):
    args.network += "-vpc"

CLIENT_HOSTS = []
if args.client_hosts:
    CLIENT_HOSTS = args.client_hosts.split(",")

# Each config propagation in the control plane should finish within 600s;
# otherwise it indicates a bug in the control plane. Config propagation
# covers all kinds of traffic config updates: updating the urlMap, creating
# resources for the first time, updating a BackendService, and changing the
# status of endpoints in a BackendService.
_WAIT_FOR_URL_MAP_PATCH_SEC = 600
# In general, fetching load balancing stats only takes ~10s. However, a slow
# config update could lead to an empty EDS response or similar symptoms,
# causing the connection to hang for a long period of time. So we extend the
# stats wait time to match the urlMap patch time.
_WAIT_FOR_STATS_SEC = _WAIT_FOR_URL_MAP_PATCH_SEC

_DEFAULT_SERVICE_PORT = 80
_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
_WAIT_FOR_OPERATION_SEC = 1200
_INSTANCE_GROUP_SIZE = args.instance_group_size
_NUM_TEST_RPCS = 10 * args.qps
_CONNECTION_TIMEOUT_SEC = 60
_GCP_API_RETRIES = 5
_BOOTSTRAP_TEMPLATE = """
{{
  "node": {{
    "id": "{node_id}",
    "metadata": {{
      "TRAFFICDIRECTOR_NETWORK_NAME": "%s",
      "com.googleapis.trafficdirector.config_time_trace": "TRUE"
    }},
    "locality": {{
      "zone": "%s"
    }}
  }},
  "xds_servers": [{{
    "server_uri": "%s",
    "channel_creds": [
      {{
        "type": "google_default",
        "config": {{}}
      }}
    ],
    "server_features": {server_features}
  }}]
}}""" % (
    args.network.split("/")[-1],
    args.zone,
    args.xds_server,
)
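# Two-stage substitution: the %-formatting above bakes in the network, zone,
# and xDS server address at startup, while {node_id} and {server_features}
# (hence the doubled {{ }} braces) are left for a later str.format() when
# each bootstrap file is generated.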

# TODO(ericgribkoff) Add change_backend_service to this list once TD no longer
# sends an update with no localities when adding the MIG to the backend
# service races with the URL map patch.
_TESTS_TO_FAIL_ON_RPC_FAILURE = ["ping_pong", "round_robin"]
# Tests that run UnaryCall and EmptyCall.
_TESTS_TO_RUN_MULTIPLE_RPCS = ["path_matching", "header_matching"]
# Tests that make UnaryCall with test metadata.
_TESTS_TO_SEND_METADATA = ["header_matching"]
_TEST_METADATA_KEY = "xds_md"
_TEST_METADATA_VALUE_UNARY = "unary_yranu"
_TEST_METADATA_VALUE_EMPTY = "empty_ytpme"
# Extra RPC metadata whose value is a number, sent with UnaryCall only.
_TEST_METADATA_NUMERIC_KEY = "xds_md_numeric"
_TEST_METADATA_NUMERIC_VALUE = "159"
_PATH_MATCHER_NAME = "path-matcher"
_BASE_TEMPLATE_NAME = "test-template"
_BASE_INSTANCE_GROUP_NAME = "test-ig"
_BASE_HEALTH_CHECK_NAME = "test-hc"
_BASE_FIREWALL_RULE_NAME = "test-fw-rule"
_BASE_BACKEND_SERVICE_NAME = "test-backend-service"
_BASE_URL_MAP_NAME = "test-map"
_BASE_SERVICE_HOST = "grpc-test"
_BASE_TARGET_PROXY_NAME = "test-target-proxy"
_BASE_FORWARDING_RULE_NAME = "test-forwarding-rule"
_TEST_LOG_BASE_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), "../../reports"
)
_SPONGE_LOG_NAME = "sponge_log.log"
_SPONGE_XML_NAME = "sponge_log.xml"


def get_client_stats(num_rpcs, timeout_sec):
    if CLIENT_HOSTS:
        hosts = CLIENT_HOSTS
    else:
        hosts = ["localhost"]
    for host in hosts:
        with grpc.insecure_channel(
            "%s:%d" % (host, args.stats_port)
        ) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerStatsRequest()
            request.num_rpcs = num_rpcs
            request.timeout_sec = timeout_sec
            rpc_timeout = timeout_sec + _CONNECTION_TIMEOUT_SEC
            logger.debug(
                "Invoking GetClientStats RPC to %s:%d:", host, args.stats_port
            )
            response = stub.GetClientStats(
                request, wait_for_ready=True, timeout=rpc_timeout
            )
            logger.debug(
                "Invoked GetClientStats RPC to %s: %s",
                host,
                json_format.MessageToJson(response),
            )
            return response


def get_client_accumulated_stats():
    if CLIENT_HOSTS:
        hosts = CLIENT_HOSTS
    else:
        hosts = ["localhost"]
    for host in hosts:
        with grpc.insecure_channel(
            "%s:%d" % (host, args.stats_port)
        ) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerAccumulatedStatsRequest()
            logger.debug(
                "Invoking GetClientAccumulatedStats RPC to %s:%d:",
                host,
                args.stats_port,
            )
            response = stub.GetClientAccumulatedStats(
                request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC
            )
            logger.debug(
                "Invoked GetClientAccumulatedStats RPC to %s: %s",
                host,
                response,
            )
            return response


def get_client_xds_config_dump():
    if CLIENT_HOSTS:
        hosts = CLIENT_HOSTS
    else:
        hosts = ["localhost"]
    for host in hosts:
        server_address = "%s:%d" % (host, args.stats_port)
        with grpc.insecure_channel(server_address) as channel:
            stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(channel)
            logger.debug("Fetching xDS config dump from %s", server_address)
            response = stub.FetchClientStatus(
                csds_pb2.ClientStatusRequest(),
                wait_for_ready=True,
                timeout=_CONNECTION_TIMEOUT_SEC,
            )
            logger.debug("Fetched xDS config dump from %s", server_address)
            if len(response.config) != 1:
                logger.error(
                    "Unexpected number of ClientConfigs %d: %s",
                    len(response.config),
                    response,
                )
                return None
            else:
                # Convert the ClientStatusResponse into JSON, because many
                # fields are packed in google.protobuf.Any; unpacking each
                # proto message to inspect the values would require a lot of
                # duplicated code.
                return json_format.MessageToDict(
                    response.config[0], preserving_proto_field_name=True
                )
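

# configure_client() drives the client's XdsUpdateClientConfigureService to
# change which RPC methods it sends and with what metadata. A sketch (the
# RpcType enum values come from the grpc.testing messages proto):
#   configure_client(
#       [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL],
#       metadata=[(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
#                  _TEST_METADATA_KEY, _TEST_METADATA_VALUE_UNARY)])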
def configure_client(rpc_types, metadata=[], timeout_sec=None):
    if CLIENT_HOSTS:
        hosts = CLIENT_HOSTS
    else:
        hosts = ["localhost"]
    for host in hosts:
        with grpc.insecure_channel(
            "%s:%d" % (host, args.stats_port)
        ) as channel:
            stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel)
            request = messages_pb2.ClientConfigureRequest()
            request.types.extend(rpc_types)
            for rpc_type, md_key, md_value in metadata:
                md = request.metadata.add()
                md.type = rpc_type
                md.key = md_key
                md.value = md_value
            if timeout_sec:
                request.timeout_sec = timeout_sec
            logger.debug(
                "Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s",
                host,
                args.stats_port,
                request,
            )
            stub.Configure(
                request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC
            )
            logger.debug(
                "Invoked XdsUpdateClientConfigureService RPC to %s", host
            )


class RpcDistributionError(Exception):
    pass


def _verify_rpcs_to_given_backends(
    backends, timeout_sec, num_rpcs, allow_failures
):
    start_time = time.time()
    error_msg = None
    logger.debug(
        "Waiting for %d sec until backends %s receive load"
        % (timeout_sec, backends)
    )
    while time.time() - start_time <= timeout_sec:
        error_msg = None
        stats = get_client_stats(num_rpcs, timeout_sec)
        rpcs_by_peer = stats.rpcs_by_peer
        for backend in backends:
            if backend not in rpcs_by_peer:
                error_msg = "Backend %s did not receive load" % backend
                break
        if not error_msg and len(rpcs_by_peer) > len(backends):
            error_msg = "Unexpected backend received load: %s" % rpcs_by_peer
        if not allow_failures and stats.num_failures > 0:
            error_msg = "%d RPCs failed" % stats.num_failures
        if not error_msg:
            return
    raise RpcDistributionError(error_msg)


def wait_until_all_rpcs_go_to_given_backends_or_fail(
    backends, timeout_sec, num_rpcs=_NUM_TEST_RPCS
):
    _verify_rpcs_to_given_backends(
        backends, timeout_sec, num_rpcs, allow_failures=True
    )


def wait_until_all_rpcs_go_to_given_backends(
    backends, timeout_sec, num_rpcs=_NUM_TEST_RPCS
):
    _verify_rpcs_to_given_backends(
        backends, timeout_sec, num_rpcs, allow_failures=False
    )


def wait_until_no_rpcs_go_to_given_backends(backends, timeout_sec):
    start_time = time.time()
    while time.time() - start_time <= timeout_sec:
        stats = get_client_stats(_NUM_TEST_RPCS, timeout_sec)
        error_msg = None
        rpcs_by_peer = stats.rpcs_by_peer
        for backend in backends:
            if backend in rpcs_by_peer:
                error_msg = "Unexpected backend %s receives load" % backend
                break
        if not error_msg:
            return
    raise Exception("Unexpected RPCs going to given backends")
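

# Used by test cases (e.g. circuit_breaking) that intentionally keep RPCs
# outstanding. "In-flight" is derived from the accumulated stats as
# started - succeeded - failed, and is re-checked after a 5-second settle
# period to make sure the count is stable.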
def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold):
    """Block until the test client reaches a state where the given number
    of RPCs is stably outstanding.

    Args:
      rpc_type: A string indicating the RPC method to check for. Either
        'UnaryCall' or 'EmptyCall'.
      timeout_sec: Maximum number of seconds to wait until the desired state
        is reached.
      num_rpcs: Expected number of RPCs to be in-flight.
      threshold: Number within [0,100], the tolerable percentage by which
        the actual number of RPCs in-flight can differ from the expected
        number.
    """
    if threshold < 0 or threshold > 100:
        raise ValueError("Value error: Threshold should be between 0 and 100")
    threshold_fraction = threshold / 100.0
    start_time = time.time()
    error_msg = None
    logger.debug(
        "Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight"
        % (timeout_sec, num_rpcs, rpc_type, threshold)
    )
    while time.time() - start_time <= timeout_sec:
        error_msg = _check_rpcs_in_flight(
            rpc_type, num_rpcs, threshold, threshold_fraction
        )
        if error_msg:
            logger.debug("Progress: %s", error_msg)
            time.sleep(2)
        else:
            break
    # Ensure the number of outstanding RPCs is stable.
    if not error_msg:
        time.sleep(5)
        error_msg = _check_rpcs_in_flight(
            rpc_type, num_rpcs, threshold, threshold_fraction
        )
    if error_msg:
        raise Exception(
            "Wrong number of %s RPCs in-flight: %s" % (rpc_type, error_msg)
        )


def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction):
    error_msg = None
    stats = get_client_accumulated_stats()
    rpcs_started = stats.num_rpcs_started_by_method[rpc_type]
    rpcs_succeeded = stats.num_rpcs_succeeded_by_method[rpc_type]
    rpcs_failed = stats.num_rpcs_failed_by_method[rpc_type]
    rpcs_in_flight = rpcs_started - rpcs_succeeded - rpcs_failed
    if rpcs_in_flight < (num_rpcs * (1 - threshold_fraction)):
        error_msg = "actual(%d) < expected(%d - %d%%)" % (
            rpcs_in_flight,
            num_rpcs,
            threshold,
        )
    elif rpcs_in_flight > (num_rpcs * (1 + threshold_fraction)):
        error_msg = "actual(%d) > expected(%d + %d%%)" % (
            rpcs_in_flight,
            num_rpcs,
            threshold,
        )
    return error_msg


def compare_distributions(
    actual_distribution, expected_distribution, threshold
):
    """Compare whether two distributions are similar.

    Args:
      actual_distribution: A list of floats, the actual distribution.
      expected_distribution: A list of floats, the expected distribution.
      threshold: Number within [0,100], the threshold percentage by which the
        actual distribution can differ from the expected distribution.

    Returns:
      The similarity between the distributions as a boolean. Returns true if
      the actual distribution lies within the threshold of the expected
      distribution, false otherwise.

    Raises:
      ValueError: if threshold is not within [0,100].
      Exception: containing detailed error messages.
    """
    if len(expected_distribution) != len(actual_distribution):
        raise Exception(
            "Error: expected and actual distributions have different size (%d"
            " vs %d)" % (len(expected_distribution), len(actual_distribution))
        )
    if threshold < 0 or threshold > 100:
        raise ValueError("Value error: Threshold should be between 0 and 100")
    threshold_fraction = threshold / 100.0
    for expected, actual in zip(expected_distribution, actual_distribution):
        if actual < (expected * (1 - threshold_fraction)):
            raise Exception(
                "actual(%f) < expected(%f-%d%%)" % (actual, expected, threshold)
            )
        if actual > (expected * (1 + threshold_fraction)):
            raise Exception(
                "actual(%f) > expected(%f+%d%%)" % (actual, expected, threshold)
            )
    return True
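

# Worked example: with threshold=5, an actual share of 52% against an expected
# 50% passes, since 50*(1-0.05)=47.5 <= 52 <= 50*(1+0.05)=52.5.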
def compare_expected_instances(stats, expected_instances):
    """Compare whether stats have the expected instances for each RPC type.

    Args:
      stats: LoadBalancerStatsResponse reported by the interop client.
      expected_instances: a dict with the RPC type (string) as key and the
        expected backend instances (list of strings) as value.

    Returns:
      True if the instances are as expected, False if not.
    """
    for rpc_type, expected_peers in list(expected_instances.items()):
        rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type]
        rpcs_by_peer = (
            rpcs_by_peer_for_type.rpcs_by_peer
            if rpcs_by_peer_for_type
            else None
        )
        logger.debug("rpc: %s, by_peer: %s", rpc_type, rpcs_by_peer)
        peers = list(rpcs_by_peer.keys())
        if set(peers) != set(expected_peers):
            logger.info(
                "unexpected peers for %s, got %s, want %s",
                rpc_type,
                peers,
                expected_peers,
            )
            return False
    return True


def test_backends_restart(gcp, backend_service, instance_group):
    logger.info("Running test_backends_restart")
    instance_names = get_instance_names(gcp, instance_group)
    num_instances = len(instance_names)
    start_time = time.time()
    wait_until_all_rpcs_go_to_given_backends(
        instance_names, _WAIT_FOR_STATS_SEC
    )
    try:
        resize_instance_group(gcp, instance_group, 0)
        wait_until_all_rpcs_go_to_given_backends_or_fail(
            [], _WAIT_FOR_BACKEND_SEC
        )
    finally:
        resize_instance_group(gcp, instance_group, num_instances)
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    new_instance_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(
        new_instance_names, _WAIT_FOR_BACKEND_SEC
    )


def test_change_backend_service(
    gcp,
    original_backend_service,
    instance_group,
    alternate_backend_service,
    same_zone_instance_group,
):
    logger.info("Running test_change_backend_service")
    original_backend_instances = get_instance_names(gcp, instance_group)
    alternate_backend_instances = get_instance_names(
        gcp, same_zone_instance_group
    )
    patch_backend_service(
        gcp, alternate_backend_service, [same_zone_instance_group]
    )
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    wait_for_healthy_backends(
        gcp, alternate_backend_service, same_zone_instance_group
    )
    wait_until_all_rpcs_go_to_given_backends(
        original_backend_instances, _WAIT_FOR_STATS_SEC
    )
    passed = True
    try:
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_all_rpcs_go_to_given_backends(
            alternate_backend_instances, _WAIT_FOR_URL_MAP_PATCH_SEC
        )
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
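

# Gentle failover: with all but one primary instance unserving (>50% of the
# primary locality failed), TD should start sending some traffic to the
# secondary locality while the one healthy primary instance keeps receiving
# requests.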
def test_gentle_failover(
    gcp,
    backend_service,
    primary_instance_group,
    secondary_instance_group,
    swapped_primary_and_secondary=False,
):
    logger.info("Running test_gentle_failover")
    num_primary_instances = len(get_instance_names(gcp, primary_instance_group))
    min_instances_for_gentle_failover = 3  # Need >50% failure to start failover
    passed = True
    try:
        if num_primary_instances < min_instances_for_gentle_failover:
            resize_instance_group(
                gcp, primary_instance_group, min_instances_for_gentle_failover
            )
        patch_backend_service(
            gcp,
            backend_service,
            [primary_instance_group, secondary_instance_group],
        )
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(
            gcp, secondary_instance_group
        )
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(
            gcp, backend_service, secondary_instance_group
        )
        wait_until_all_rpcs_go_to_given_backends(
            primary_instance_names, _WAIT_FOR_STATS_SEC
        )
        instances_to_stop = primary_instance_names[:-1]
        remaining_instances = primary_instance_names[-1:]
        try:
            set_serving_status(
                instances_to_stop, gcp.service_port, serving=False
            )
            wait_until_all_rpcs_go_to_given_backends(
                remaining_instances + secondary_instance_names,
                _WAIT_FOR_BACKEND_SEC,
            )
        finally:
            set_serving_status(
                primary_instance_names, gcp.service_port, serving=True
            )
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
            gcp, secondary_instance_group
        ):
            # Swap expectation of primary and secondary instance groups.
            test_gentle_failover(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True,
            )
        else:
            passed = False
            raise e
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(
                gcp, backend_service, [primary_instance_group]
            )
            resize_instance_group(
                gcp, primary_instance_group, num_primary_instances
            )
            instance_names = get_instance_names(gcp, primary_instance_group)
            wait_until_all_rpcs_go_to_given_backends(
                instance_names, _WAIT_FOR_BACKEND_SEC
            )


def test_load_report_based_failover(
    gcp, backend_service, primary_instance_group, secondary_instance_group
):
    logger.info("Running test_load_report_based_failover")
    passed = True
    try:
        patch_backend_service(
            gcp,
            backend_service,
            [primary_instance_group, secondary_instance_group],
        )
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(
            gcp, secondary_instance_group
        )
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(
            gcp, backend_service, secondary_instance_group
        )
        wait_until_all_rpcs_go_to_given_backends(
            primary_instance_names, _WAIT_FOR_STATS_SEC
        )
        # Set the primary locality's balancing mode to RATE, with RPS at 20%
        # of the client's QPS. The secondary locality will be used.
        max_rate = int(args.qps * 1 / 5)
        logger.info(
            "Patching backend service to RATE with %d max_rate", max_rate
        )
        patch_backend_service(
            gcp,
            backend_service,
            [primary_instance_group, secondary_instance_group],
            balancing_mode="RATE",
            max_rate=max_rate,
        )
        wait_until_all_rpcs_go_to_given_backends(
            primary_instance_names + secondary_instance_names,
            _WAIT_FOR_BACKEND_SEC,
        )

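        # With the default --qps=100, the RATE patch above caps the primary
        # locality at max_rate=20 RPS, so TD spills the remaining ~80 RPS to
        # the secondary locality.
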
        # Set the primary locality's balancing mode to RATE, with RPS at 120%
        # of the client's QPS. Only the primary locality will be used.
        max_rate = int(args.qps * 6 / 5)
        logger.info(
            "Patching backend service to RATE with %d max_rate", max_rate
        )
        patch_backend_service(
            gcp,
            backend_service,
            [primary_instance_group, secondary_instance_group],
            balancing_mode="RATE",
            max_rate=max_rate,
        )
        wait_until_all_rpcs_go_to_given_backends(
            primary_instance_names, _WAIT_FOR_BACKEND_SEC
        )
        logger.info("success")
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(
                gcp, backend_service, [primary_instance_group]
            )
            instance_names = get_instance_names(gcp, primary_instance_group)
            wait_until_all_rpcs_go_to_given_backends(
                instance_names, _WAIT_FOR_BACKEND_SEC
            )


def test_ping_pong(gcp, backend_service, instance_group):
    logger.info("Running test_ping_pong")
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(
        instance_names, _WAIT_FOR_STATS_SEC
    )


def test_remove_instance_group(
    gcp, backend_service, instance_group, same_zone_instance_group
):
    logger.info("Running test_remove_instance_group")
    passed = True
    try:
        patch_backend_service(
            gcp,
            backend_service,
            [instance_group, same_zone_instance_group],
            balancing_mode="RATE",
        )
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        wait_for_healthy_backends(
            gcp, backend_service, same_zone_instance_group
        )
        instance_names = get_instance_names(gcp, instance_group)
        same_zone_instance_names = get_instance_names(
            gcp, same_zone_instance_group
        )
        try:
            wait_until_all_rpcs_go_to_given_backends(
                instance_names + same_zone_instance_names,
                _WAIT_FOR_OPERATION_SEC,
            )
            remaining_instance_group = same_zone_instance_group
            remaining_instance_names = same_zone_instance_names
        except RpcDistributionError as e:
            # If connected to TD in a different zone, we may route traffic to
            # only one instance group. Determine which group that is to
            # continue with the remainder of the test case.
            try:
                wait_until_all_rpcs_go_to_given_backends(
                    instance_names, _WAIT_FOR_STATS_SEC
                )
                remaining_instance_group = same_zone_instance_group
                remaining_instance_names = same_zone_instance_names
            except RpcDistributionError as e:
                wait_until_all_rpcs_go_to_given_backends(
                    same_zone_instance_names, _WAIT_FOR_STATS_SEC
                )
                remaining_instance_group = instance_group
                remaining_instance_names = instance_names
        patch_backend_service(
            gcp,
            backend_service,
            [remaining_instance_group],
            balancing_mode="RATE",
        )
        wait_until_all_rpcs_go_to_given_backends(
            remaining_instance_names, _WAIT_FOR_BACKEND_SEC
        )
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service, [instance_group])
            wait_until_all_rpcs_go_to_given_backends(
                instance_names, _WAIT_FOR_BACKEND_SEC
            )


def test_round_robin(gcp, backend_service, instance_group):
    logger.info("Running test_round_robin")
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    threshold = 1
    wait_until_all_rpcs_go_to_given_backends(
        instance_names, _WAIT_FOR_STATS_SEC
    )
    # TODO(ericgribkoff) Delayed config propagation from earlier tests
    # may result in briefly receiving an empty EDS update, resulting in failed
    # RPCs. Retry distribution validation if this occurs; long-term fix is
    # creating new backend resources for each individual test case.
    # Each attempt takes 10 seconds. Config propagation can take several
    # minutes.
    max_attempts = 40
    for i in range(max_attempts):
        stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
        requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer]
        total_requests_received = sum(requests_received)
        if total_requests_received != _NUM_TEST_RPCS:
            logger.info("Unexpected RPC failures, retrying: %s", stats)
            continue
        expected_requests = total_requests_received / len(instance_names)
        for instance in instance_names:
            if (
                abs(stats.rpcs_by_peer[instance] - expected_requests)
                > threshold
            ):
                raise Exception(
                    "RPC peer distribution differs from expected by more than"
                    " %d for instance %s (%s)" % (threshold, instance, stats)
                )
        return
    raise Exception("RPC failures persisted through %d retries" % max_attempts)


def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
    gcp,
    backend_service,
    primary_instance_group,
    secondary_instance_group,
    swapped_primary_and_secondary=False,
):
    logger.info(
        "Running secondary_locality_gets_no_requests_on_partial_primary_failure"
    )
    passed = True
    try:
        patch_backend_service(
            gcp,
            backend_service,
            [primary_instance_group, secondary_instance_group],
        )
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(
            gcp, backend_service, secondary_instance_group
        )
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(
            primary_instance_names, _WAIT_FOR_STATS_SEC
        )
        instances_to_stop = primary_instance_names[:1]
        remaining_instances = primary_instance_names[1:]
        try:
            set_serving_status(
                instances_to_stop,
                gcp.service_port, serving=False
            )
            wait_until_all_rpcs_go_to_given_backends(
                remaining_instances, _WAIT_FOR_BACKEND_SEC
            )
        finally:
            set_serving_status(
                primary_instance_names, gcp.service_port, serving=True
            )
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
            gcp, secondary_instance_group
        ):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True,
            )
        else:
            passed = False
            raise e
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(
                gcp, backend_service, [primary_instance_group]
            )


def test_secondary_locality_gets_requests_on_primary_failure(
    gcp,
    backend_service,
    primary_instance_group,
    secondary_instance_group,
    swapped_primary_and_secondary=False,
):
    logger.info("Running secondary_locality_gets_requests_on_primary_failure")
    passed = True
    try:
        patch_backend_service(
            gcp,
            backend_service,
            [primary_instance_group, secondary_instance_group],
        )
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(
            gcp, backend_service, secondary_instance_group
        )
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(
            gcp, secondary_instance_group
        )
        wait_until_all_rpcs_go_to_given_backends(
            primary_instance_names, _WAIT_FOR_STATS_SEC
        )
        try:
            set_serving_status(
                primary_instance_names, gcp.service_port, serving=False
            )
            wait_until_all_rpcs_go_to_given_backends(
                secondary_instance_names, _WAIT_FOR_BACKEND_SEC
            )
        finally:
            set_serving_status(
                primary_instance_names, gcp.service_port, serving=True
            )
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
            gcp, secondary_instance_group
        ):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_requests_on_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True,
            )
        else:
            passed = False
            raise e
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(
                gcp, backend_service, [primary_instance_group]
            )
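

# Shared setup for the urlmap-based tests below (traffic_splitting,
# path_matching, header_matching): bring both backend services up and start
# with all traffic on the original service.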
def prepare_services_for_urlmap_tests(
    gcp,
    original_backend_service,
    instance_group,
    alternate_backend_service,
    same_zone_instance_group,
):
    """
    Prepare the services for tests that modify urlmaps.

    Returns:
      Original and alternate backend instance names, as lists of strings.
    """
    logger.info("waiting for original backends to become healthy")
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    patch_backend_service(
        gcp, alternate_backend_service, [same_zone_instance_group]
    )
    logger.info("waiting for alternate to become healthy")
    wait_for_healthy_backends(
        gcp, alternate_backend_service, same_zone_instance_group
    )

    original_backend_instances = get_instance_names(gcp, instance_group)
    logger.info("original backends instances: %s", original_backend_instances)

    alternate_backend_instances = get_instance_names(
        gcp, same_zone_instance_group
    )
    logger.info("alternate backends instances: %s", alternate_backend_instances)

    # Start with all traffic going to original_backend_service.
    logger.info("waiting for traffic to all go to original backends")
    wait_until_all_rpcs_go_to_given_backends(
        original_backend_instances, _WAIT_FOR_STATS_SEC
    )
    return original_backend_instances, alternate_backend_instances


def test_metadata_filter(
    gcp,
    original_backend_service,
    instance_group,
    alternate_backend_service,
    same_zone_instance_group,
):
    logger.info("Running test_metadata_filter")
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    original_backend_instances = get_instance_names(gcp, instance_group)
    alternate_backend_instances = get_instance_names(
        gcp, same_zone_instance_group
    )
    patch_backend_service(
        gcp, alternate_backend_service, [same_zone_instance_group]
    )
    wait_for_healthy_backends(
        gcp, alternate_backend_service, same_zone_instance_group
    )
    passed = True
    try:
        with open(bootstrap_path) as f:
            md = json.load(f)["node"]["metadata"]
            match_labels = []
            for k, v in list(md.items()):
                match_labels.append({"name": k, "value": v})

        not_match_labels = [{"name": "fake", "value": "fail"}]
        test_route_rules = [
            # test MATCH_ALL
            [
                {
                    "priority": 0,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ALL",
                                    "filterLabels": not_match_labels,
                                }
                            ],
                        }
                    ],
                    "service": original_backend_service.url,
                },
                {
                    "priority": 1,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ALL",
                                    "filterLabels": match_labels,
                                }
                            ],
                        }
                    ],
                    "service": alternate_backend_service.url,
                },
            ],
            # test mixing MATCH_ALL and MATCH_ANY
            # test MATCH_ALL: a superset of the labels won't match
            [
                {
                    "priority": 0,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ALL",
                                    "filterLabels": not_match_labels
                                    + match_labels,
                                }
                            ],
                        }
                    ],
                    "service": original_backend_service.url,
                },
                {
                    "priority": 1,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ANY",
                                    "filterLabels": not_match_labels
                                    + match_labels,
                                }
                            ],
                        }
                    ],
                    "service": alternate_backend_service.url,
                },
            ],
            # test MATCH_ANY
            [
                {
                    "priority": 0,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ANY",
                                    "filterLabels": not_match_labels,
                                }
                            ],
                        }
                    ],
                    "service": original_backend_service.url,
                },
                {
                    "priority": 1,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ANY",
                                    "filterLabels": not_match_labels
                                    + match_labels,
                                }
                            ],
                        }
                    ],
                    "service": alternate_backend_service.url,
                },
            ],
            # test matching multiple route rules
            [
                {
                    "priority": 0,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ANY",
                                    "filterLabels": match_labels,
                                }
                            ],
                        }
                    ],
                    "service": alternate_backend_service.url,
                },
                {
                    "priority": 1,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ALL",
                                    "filterLabels": match_labels,
                                }
                            ],
                        }
                    ],
                    "service": original_backend_service.url,
                },
            ],
        ]

        for route_rules in test_route_rules:
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances, _WAIT_FOR_STATS_SEC
            )
            patch_url_map_backend_service(
                gcp, original_backend_service, route_rules=route_rules
            )
            wait_until_no_rpcs_go_to_given_backends(
                original_backend_instances, _WAIT_FOR_STATS_SEC
            )
            wait_until_all_rpcs_go_to_given_backends(
                alternate_backend_instances, _WAIT_FOR_STATS_SEC
            )
            patch_url_map_backend_service(gcp, original_backend_service)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, alternate_backend_service, [])
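

# api_listener: brings up a second url-map/target-proxy/forwarding-rule suite
# for the same host, then tears down the original suite, verifying that
# traffic keeps flowing throughout; finally it removes the host rule and
# expects traffic to stop.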
def test_api_listener(
    gcp, backend_service, instance_group, alternate_backend_service
):
    logger.info("Running api_listener")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        wait_until_all_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )
        # Create a second suite of url-map + target-proxy + forwarding-rule
        # with the same host name in the host rule. We have to disable
        # proxyless validation because it requires a `0.0.0.0` IP address in
        # the forwarding rule for proxyless, and we also violate ip:port
        # uniqueness for test purposes.
        # See https://github.com/grpc/grpc-java/issues/8009
        new_config_suffix = "2"
        url_map_2 = create_url_map(
            gcp,
            url_map_name + new_config_suffix,
            backend_service,
            service_host_name,
        )
        target_proxy_2 = create_target_proxy(
            gcp, target_proxy_name + new_config_suffix, False, url_map_2
        )
        if not gcp.service_port:
            raise Exception(
                "Failed to find a valid port for the forwarding rule"
            )
        potential_ip_addresses = []
        max_attempts = 10
        for i in range(max_attempts):
            potential_ip_addresses.append(
                "10.10.10.%d" % (random.randint(0, 255))
            )
        create_global_forwarding_rule(
            gcp,
            forwarding_rule_name + new_config_suffix,
            [gcp.service_port],
            potential_ip_addresses,
            target_proxy_2,
        )
        if gcp.service_port != _DEFAULT_SERVICE_PORT:
            patch_url_map_host_rule_with_port(
                gcp,
                url_map_name + new_config_suffix,
                backend_service,
                service_host_name,
            )
        wait_until_all_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )

        delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
        delete_target_proxy(gcp, gcp.target_proxies[0])
        delete_url_map(gcp, gcp.url_maps[0])
        verify_attempts = int(
            _WAIT_FOR_URL_MAP_PATCH_SEC / _NUM_TEST_RPCS * args.qps
        )
        for i in range(verify_attempts):
            wait_until_all_rpcs_go_to_given_backends(
                backend_instances, _WAIT_FOR_STATS_SEC
            )
        # Delete the host rule for the original host name.
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_no_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )

    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            delete_global_forwarding_rules(gcp)
            delete_target_proxies(gcp)
            delete_url_maps(gcp)
            create_url_map(
                gcp, url_map_name, backend_service, service_host_name
            )
            create_target_proxy(gcp, target_proxy_name)
            create_global_forwarding_rule(
                gcp, forwarding_rule_name, potential_service_ports
            )
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(
                    gcp, url_map_name, backend_service, service_host_name
                )
                server_uri = service_host_name + ":" + str(gcp.service_port)
            else:
                server_uri = service_host_name
            return server_uri


def test_forwarding_rule_port_match(gcp, backend_service, instance_group):
    logger.info("Running test_forwarding_rule_port_match")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        wait_until_all_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )
        delete_global_forwarding_rules(gcp)
        create_global_forwarding_rule(
            gcp,
            forwarding_rule_name,
            [
                x
                for x in parse_port_range(_DEFAULT_PORT_RANGE)
                if x != gcp.service_port
            ],
        )
        wait_until_no_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            delete_global_forwarding_rules(gcp)
            create_global_forwarding_rule(
                gcp, forwarding_rule_name, potential_service_ports
            )
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(
                    gcp, url_map_name, backend_service, service_host_name
                )
                server_uri = service_host_name + ":" + str(gcp.service_port)
            else:
                server_uri = service_host_name
            return server_uri


def test_forwarding_rule_default_port(gcp, backend_service, instance_group):
    logger.info("Running test_forwarding_rule_default_port")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        if gcp.service_port == _DEFAULT_SERVICE_PORT:
            wait_until_all_rpcs_go_to_given_backends(
                backend_instances, _WAIT_FOR_STATS_SEC
            )
            delete_global_forwarding_rules(gcp)
            create_global_forwarding_rule(
                gcp, forwarding_rule_name, parse_port_range(_DEFAULT_PORT_RANGE)
            )
            patch_url_map_host_rule_with_port(
                gcp, url_map_name, backend_service, service_host_name
            )
        wait_until_no_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )
        # Expect success when the client request's service uri has no port
        # and the url-map has no port.
        delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
        delete_target_proxy(gcp, gcp.target_proxies[0])
        delete_url_map(gcp, gcp.url_maps[0])
        create_url_map(gcp, url_map_name, backend_service, service_host_name)
        create_target_proxy(gcp, target_proxy_name, False)
        potential_ip_addresses = []
        max_attempts = 10
        for i in range(max_attempts):
            potential_ip_addresses.append(
                "10.10.10.%d" % (random.randint(0, 255))
            )
        create_global_forwarding_rule(
            gcp, forwarding_rule_name, [80], potential_ip_addresses
        )
        wait_until_all_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )

        # Expect failure when the client request's uri has no port but the
        # url-map specifies a port.
        patch_url_map_host_rule_with_port(
            gcp, url_map_name, backend_service, service_host_name
        )
        wait_until_no_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            delete_global_forwarding_rules(gcp)
            delete_target_proxies(gcp)
            delete_url_maps(gcp)
            create_url_map(
                gcp, url_map_name, backend_service, service_host_name
            )
            create_target_proxy(gcp, target_proxy_name)
            create_global_forwarding_rule(
                gcp, forwarding_rule_name, potential_service_ports
            )
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(
                    gcp, url_map_name, backend_service, service_host_name
                )
                server_uri = service_host_name + ":" + str(gcp.service_port)
            else:
                server_uri = service_host_name
            return server_uri


def test_traffic_splitting(
    gcp,
    original_backend_service,
    instance_group,
    alternate_backend_service,
    same_zone_instance_group,
):
    # This test starts with all traffic going to original_backend_service.
    # Then it updates the URL-map to set the default action to traffic
    # splitting between original and alternate. It waits for all backends in
    # both services to receive traffic, then verifies that the weights are as
    # expected.
    logger.info("Running test_traffic_splitting")

    (
        original_backend_instances,
        alternate_backend_instances,
    ) = prepare_services_for_urlmap_tests(
        gcp,
        original_backend_service,
        instance_group,
        alternate_backend_service,
        same_zone_instance_group,
    )

    passed = True
    try:
        # Patch the urlmap, changing the route action to traffic splitting
        # between original and alternate.
        logger.info("patching url map with traffic splitting")
        original_service_percentage, alternate_service_percentage = 20, 80
        patch_url_map_backend_service(
            gcp,
            services_with_weights={
                original_backend_service: original_service_percentage,
                alternate_backend_service: alternate_service_percentage,
            },
        )
        # Split the percentage between instances: [20,80] -> [10,10,40,40].
        expected_instance_percentage = [
            original_service_percentage * 1.0 / len(original_backend_instances)
        ] * len(original_backend_instances) + [
            alternate_service_percentage
            * 1.0
            / len(alternate_backend_instances)
        ] * len(alternate_backend_instances)

        # Wait for traffic to go to both services.
        logger.info(
            "waiting for traffic to go to all backends (including alternate)"
        )
        wait_until_all_rpcs_go_to_given_backends(
            original_backend_instances + alternate_backend_instances,
            _WAIT_FOR_STATS_SEC,
        )

        # Verify that the weights between the two services are as expected.
        retry_count = 10
        # Each attempt takes about 10 seconds; 10 retries is equivalent to a
        # 100-second timeout.
        for i in range(retry_count):
            stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
            got_instance_count = [
                stats.rpcs_by_peer[i] for i in original_backend_instances
            ] + [stats.rpcs_by_peer[i] for i in alternate_backend_instances]
            total_count = sum(got_instance_count)
            got_instance_percentage = [
                x * 100.0 / total_count for x in got_instance_count
            ]

            try:
                compare_distributions(
                    got_instance_percentage, expected_instance_percentage, 5
                )
            except Exception as e:
                logger.info("attempt %d", i)
                logger.info("got percentage: %s", got_instance_percentage)
                logger.info(
                    "expected percentage: %s", expected_instance_percentage
                )
                logger.info(e)
                if i == retry_count - 1:
                    raise Exception(
                        "RPC distribution (%s) differs from expected (%s)"
                        % (
                            got_instance_percentage,
                            expected_instance_percentage,
                        )
                    )
            else:
                logger.info("success")
                break
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])


def test_path_matching(
    gcp,
    original_backend_service,
    instance_group,
    alternate_backend_service,
    same_zone_instance_group,
):
    # This test starts with all traffic (UnaryCall and EmptyCall) going to
    # original_backend_service.
    #
    # Then it updates the URL-map to add routes that send UnaryCall and
    # EmptyCall to different backends. It waits for all backends in both
    # services to receive traffic, then verifies that traffic goes to the
    # expected backends.
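    #
    # Note: routeRules are evaluated in priority order (lowest priority value
    # first); the duplicate-cluster case below relies on that ordering.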
    logger.info("Running test_path_matching")

    (
        original_backend_instances,
        alternate_backend_instances,
    ) = prepare_services_for_urlmap_tests(
        gcp,
        original_backend_service,
        instance_group,
        alternate_backend_service,
        same_zone_instance_group,
    )

    passed = True
    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [
                    {
                        "priority": 0,
                        # FullPath EmptyCall -> alternate_backend_service.
                        "matchRules": [
                            {
                                "fullPathMatch": (
                                    "/grpc.testing.TestService/EmptyCall"
                                )
                            }
                        ],
                        "service": alternate_backend_service.url,
                    }
                ],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances,
                },
            ),
            (
                [
                    {
                        "priority": 0,
                        # Prefix UnaryCall -> alternate_backend_service.
                        "matchRules": [
                            {"prefixMatch": "/grpc.testing.TestService/Unary"}
                        ],
                        "service": alternate_backend_service.url,
                    }
                ],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances,
                },
            ),
            (
                # This test case is similar to the one above (but with the
                # route services swapped). This test has two routes (full_path
                # and the default) to match EmptyCall, and both routes set
                # alternative_backend_service as the action. This forces the
                # client to handle duplicate Clusters in the RDS response.
                [
                    {
                        "priority": 0,
                        # Prefix UnaryCall -> original_backend_service.
                        "matchRules": [
                            {"prefixMatch": "/grpc.testing.TestService/Unary"}
                        ],
                        "service": original_backend_service.url,
                    },
                    {
                        "priority": 1,
                        # FullPath EmptyCall -> alternate_backend_service.
                        "matchRules": [
                            {
                                "fullPathMatch": (
                                    "/grpc.testing.TestService/EmptyCall"
                                )
                            }
                        ],
                        "service": alternate_backend_service.url,
                    },
                ],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances,
                },
            ),
            (
                [
                    {
                        "priority": 0,
                        # Regex UnaryCall -> alternate_backend_service.
                        "matchRules": [
                            {
                                # Unary methods with any service.
                                "regexMatch": "^/.*/UnaryCall$"
                            }
                        ],
                        "service": alternate_backend_service.url,
                    }
                ],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances,
                },
            ),
            (
                [
                    {
                        "priority": 0,
                        # ignoreCase EmptyCall -> alternate_backend_service.
                        "matchRules": [
                            {
                                # Case-insensitive matching.
                                "fullPathMatch": (
                                    "/gRpC.tEsTinG.tEstseRvice/empTycaLl"
                                ),
                                "ignoreCase": True,
                            }
                        ],
                        "service": alternate_backend_service.url,
                    }
                ],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances,
                },
            ),
        ]

        for route_rules, expected_instances in test_cases:
            logger.info("patching url map with %s", route_rules)
            patch_url_map_backend_service(
                gcp, original_backend_service, route_rules=route_rules
            )

            # Wait for traffic to go to both services.
1871 logger.info(
1872 "waiting for traffic to go to all backends (including"
1873 " alternate)"
1874 )
1875 wait_until_all_rpcs_go_to_given_backends(
1876 original_backend_instances + alternate_backend_instances,
1877 _WAIT_FOR_STATS_SEC,
1878 )
1879
1880 retry_count = 80
1881 # Each attempt takes about 5 seconds, so 80 retries amounts to a
1882 # 400-second timeout.
1883 for i in range(retry_count):
1884 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
1885 if not stats.rpcs_by_method:
1886 raise ValueError(
1887 "stats.rpcs_by_method is None, the interop client stats"
1888 " service does not support this test case"
1889 )
1890 logger.info("attempt %d", i)
1891 if compare_expected_instances(stats, expected_instances):
1892 logger.info("success")
1893 break
1894 elif i == retry_count - 1:
1895 raise Exception(
1896 "timeout waiting for RPCs to the expected instances: %s"
1897 % expected_instances
1898 )
1899 except Exception:
1900 passed = False
1901 raise
1902 finally:
1903 if passed or not args.halt_after_fail:
1904 patch_url_map_backend_service(gcp, original_backend_service)
1905 patch_backend_service(gcp, alternate_backend_service, [])
1906
1907
1908 def test_header_matching(
1909 gcp,
1910 original_backend_service,
1911 instance_group,
1912 alternate_backend_service,
1913 same_zone_instance_group,
1914 ):
1915 # This test starts with all traffic (UnaryCall and EmptyCall) going to
1916 # original_backend_service.
1917 #
1918 # Then it updates the URL map, adding routes so that RPCs with test
1919 # headers go to different backends. It waits for all backends in both
1920 # services to receive traffic, then verifies that traffic goes to the
1921 # expected backends.
1922 logger.info("Running test_header_matching")
1923
1924 (
1925 original_backend_instances,
1926 alternate_backend_instances,
1927 ) = prepare_services_for_urlmap_tests(
1928 gcp,
1929 original_backend_service,
1930 instance_group,
1931 alternate_backend_service,
1932 same_zone_instance_group,
1933 )
1934
1935 passed = True
1936 try:
1937 # A list of tuples (route_rules, expected_instances).
1938 test_cases = [
1939 (
1940 [
1941 {
1942 "priority": 0,
1943 # Header ExactMatch -> alternate_backend_service.
1944 # EmptyCall is sent with the metadata.
1945 "matchRules": [
1946 {
1947 "prefixMatch": "/",
1948 "headerMatches": [
1949 {
1950 "headerName": _TEST_METADATA_KEY,
1951 "exactMatch": _TEST_METADATA_VALUE_EMPTY,
1952 }
1953 ],
1954 }
1955 ],
1956 "service": alternate_backend_service.url,
1957 }
1958 ],
1959 {
1960 "EmptyCall": alternate_backend_instances,
1961 "UnaryCall": original_backend_instances,
1962 },
1963 ),
1964 (
1965 [
1966 {
1967 "priority": 0,
1968 # Header PrefixMatch -> alternate_backend_service.
1969 # UnaryCall is sent with the metadata.
1970 "matchRules": [
1971 {
1972 "prefixMatch": "/",
1973 "headerMatches": [
1974 {
1975 "headerName": _TEST_METADATA_KEY,
1976 "prefixMatch": _TEST_METADATA_VALUE_UNARY[
1977 :2
1978 ],
1979 }
1980 ],
1981 }
1982 ],
1983 "service": alternate_backend_service.url,
1984 }
1985 ],
1986 {
1987 "EmptyCall": original_backend_instances,
1988 "UnaryCall": alternate_backend_instances,
1989 },
1990 ),
1991 (
1992 [
1993 {
1994 "priority": 0,
1995 # Header SuffixMatch -> alternate_backend_service.
1996 # EmptyCall is sent with the metadata.
1997 "matchRules": [ 1998 { 1999 "prefixMatch": "/", 2000 "headerMatches": [ 2001 { 2002 "headerName": _TEST_METADATA_KEY, 2003 "suffixMatch": _TEST_METADATA_VALUE_EMPTY[ 2004 -2: 2005 ], 2006 } 2007 ], 2008 } 2009 ], 2010 "service": alternate_backend_service.url, 2011 } 2012 ], 2013 { 2014 "EmptyCall": alternate_backend_instances, 2015 "UnaryCall": original_backend_instances, 2016 }, 2017 ), 2018 ( 2019 [ 2020 { 2021 "priority": 0, 2022 # Header 'xds_md_numeric' present -> alternate_backend_service. 2023 # UnaryCall is sent with the metadata, so will be sent to alternative. 2024 "matchRules": [ 2025 { 2026 "prefixMatch": "/", 2027 "headerMatches": [ 2028 { 2029 "headerName": _TEST_METADATA_NUMERIC_KEY, 2030 "presentMatch": True, 2031 } 2032 ], 2033 } 2034 ], 2035 "service": alternate_backend_service.url, 2036 } 2037 ], 2038 { 2039 "EmptyCall": original_backend_instances, 2040 "UnaryCall": alternate_backend_instances, 2041 }, 2042 ), 2043 ( 2044 [ 2045 { 2046 "priority": 0, 2047 # Header invert ExactMatch -> alternate_backend_service. 2048 # UnaryCall is sent with the metadata, so will be sent to 2049 # original. EmptyCall will be sent to alternative. 2050 "matchRules": [ 2051 { 2052 "prefixMatch": "/", 2053 "headerMatches": [ 2054 { 2055 "headerName": _TEST_METADATA_KEY, 2056 "exactMatch": _TEST_METADATA_VALUE_UNARY, 2057 "invertMatch": True, 2058 } 2059 ], 2060 } 2061 ], 2062 "service": alternate_backend_service.url, 2063 } 2064 ], 2065 { 2066 "EmptyCall": alternate_backend_instances, 2067 "UnaryCall": original_backend_instances, 2068 }, 2069 ), 2070 ( 2071 [ 2072 { 2073 "priority": 0, 2074 # Header 'xds_md_numeric' range [100,200] -> alternate_backend_service. 2075 # UnaryCall is sent with the metadata in range. 2076 "matchRules": [ 2077 { 2078 "prefixMatch": "/", 2079 "headerMatches": [ 2080 { 2081 "headerName": _TEST_METADATA_NUMERIC_KEY, 2082 "rangeMatch": { 2083 "rangeStart": "100", 2084 "rangeEnd": "200", 2085 }, 2086 } 2087 ], 2088 } 2089 ], 2090 "service": alternate_backend_service.url, 2091 } 2092 ], 2093 { 2094 "EmptyCall": original_backend_instances, 2095 "UnaryCall": alternate_backend_instances, 2096 }, 2097 ), 2098 ( 2099 [ 2100 { 2101 "priority": 0, 2102 # Header RegexMatch -> alternate_backend_service. 2103 # EmptyCall is sent with the metadata. 2104 "matchRules": [ 2105 { 2106 "prefixMatch": "/", 2107 "headerMatches": [ 2108 { 2109 "headerName": _TEST_METADATA_KEY, 2110 "regexMatch": "^%s.*%s$" 2111 % ( 2112 _TEST_METADATA_VALUE_EMPTY[:2], 2113 _TEST_METADATA_VALUE_EMPTY[-2:], 2114 ), 2115 } 2116 ], 2117 } 2118 ], 2119 "service": alternate_backend_service.url, 2120 } 2121 ], 2122 { 2123 "EmptyCall": alternate_backend_instances, 2124 "UnaryCall": original_backend_instances, 2125 }, 2126 ), 2127 ] 2128 2129 for route_rules, expected_instances in test_cases: 2130 logger.info( 2131 "patching url map with %s -> alternative", 2132 route_rules[0]["matchRules"], 2133 ) 2134 patch_url_map_backend_service( 2135 gcp, original_backend_service, route_rules=route_rules 2136 ) 2137 2138 # Wait for traffic to go to both services. 2139 logger.info( 2140 "waiting for traffic to go to all backends (including" 2141 " alternate)" 2142 ) 2143 wait_until_all_rpcs_go_to_given_backends( 2144 original_backend_instances + alternate_backend_instances, 2145 _WAIT_FOR_STATS_SEC, 2146 ) 2147 2148 retry_count = 80 2149 # Each attempt takes about 5 seconds, 80 retries is equivalent to 400 2150 # seconds timeout. 
2151 for i in range(retry_count):
2152 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
2153 if not stats.rpcs_by_method:
2154 raise ValueError(
2155 "stats.rpcs_by_method is None, the interop client stats"
2156 " service does not support this test case"
2157 )
2158 logger.info("attempt %d", i)
2159 if compare_expected_instances(stats, expected_instances):
2160 logger.info("success")
2161 break
2162 elif i == retry_count - 1:
2163 raise Exception(
2164 "timeout waiting for RPCs to the expected instances: %s"
2165 % expected_instances
2166 )
2167 except Exception:
2168 passed = False
2169 raise
2170 finally:
2171 if passed or not args.halt_after_fail:
2172 patch_url_map_backend_service(gcp, original_backend_service)
2173 patch_backend_service(gcp, alternate_backend_service, [])
2174
2175
2176 def test_circuit_breaking(
2177 gcp, original_backend_service, instance_group, same_zone_instance_group
2178 ):
2179 """
2180 Because a backend service's circuit_breakers configuration cannot be
2181 unset, restoring the validate_for_proxyless flag on the target
2182 proxy/global forwarding rule is problematic. This test therefore uses
2183 dedicated backend services. The url_map and backend services undergo the following state changes:
2184
2185 Before test:
2186 original_backend_service -> [instance_group]
2187 extra_backend_service -> []
2188 more_extra_backend_service -> []
2189
2190 url_map -> [original_backend_service]
2191
2192 In test:
2193 extra_backend_service (with circuit_breakers) -> [instance_group]
2194 more_extra_backend_service (with circuit_breakers) -> [same_zone_instance_group]
2195
2196 url_map -> [extra_backend_service, more_extra_backend_service]
2197
2198 After test:
2199 original_backend_service -> [instance_group]
2200 extra_backend_service (with circuit_breakers) -> []
2201 more_extra_backend_service (with circuit_breakers) -> []
2202
2203 url_map -> [original_backend_service]
2204 """
2205 logger.info("Running test_circuit_breaking")
2206 additional_backend_services = []
2207 passed = True
2208 try:
2209 # TODO(chengyuanzhang): Dedicated backend services created for circuit
2210 # breaking test. Once the issue for unsetting backend service circuit
2211 # breakers is resolved or configuring backend service circuit breakers is
2212 # enabled for config validation, these dedicated backend services can be
2213 # eliminated.
2214 extra_backend_service_name = (
2215 _BASE_BACKEND_SERVICE_NAME + "-extra" + gcp_suffix
2216 )
2217 more_extra_backend_service_name = (
2218 _BASE_BACKEND_SERVICE_NAME + "-more-extra" + gcp_suffix
2219 )
2220 extra_backend_service = add_backend_service(
2221 gcp, extra_backend_service_name
2222 )
2223 additional_backend_services.append(extra_backend_service)
2224 more_extra_backend_service = add_backend_service(
2225 gcp, more_extra_backend_service_name
2226 )
2227 additional_backend_services.append(more_extra_backend_service)
2228 # The config validation for proxyless doesn't allow setting
2229 # circuit_breakers. Disable validate_for_proxyless
2230 # for this test. This can be removed when validation
2231 # accepts circuit_breakers.
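        # Sketch of the circuit-breaker config patched below: per the GCP
        # backend service API, circuit_breakers={"maxRequests": N} caps
        # concurrent in-flight requests; with keep-open RPCs the in-flight
        # count is expected to plateau at N while excess RPCs fail fast.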
2232 logger.info("disabling validate_for_proxyless in target proxy") 2233 set_validate_for_proxyless(gcp, False) 2234 extra_backend_service_max_requests = 500 2235 more_extra_backend_service_max_requests = 1000 2236 patch_backend_service( 2237 gcp, 2238 extra_backend_service, 2239 [instance_group], 2240 circuit_breakers={ 2241 "maxRequests": extra_backend_service_max_requests 2242 }, 2243 ) 2244 logger.info("Waiting for extra backends to become healthy") 2245 wait_for_healthy_backends(gcp, extra_backend_service, instance_group) 2246 patch_backend_service( 2247 gcp, 2248 more_extra_backend_service, 2249 [same_zone_instance_group], 2250 circuit_breakers={ 2251 "maxRequests": more_extra_backend_service_max_requests 2252 }, 2253 ) 2254 logger.info("Waiting for more extra backend to become healthy") 2255 wait_for_healthy_backends( 2256 gcp, more_extra_backend_service, same_zone_instance_group 2257 ) 2258 extra_backend_instances = get_instance_names(gcp, instance_group) 2259 more_extra_backend_instances = get_instance_names( 2260 gcp, same_zone_instance_group 2261 ) 2262 route_rules = [ 2263 { 2264 "priority": 0, 2265 # UnaryCall -> extra_backend_service 2266 "matchRules": [ 2267 {"fullPathMatch": "/grpc.testing.TestService/UnaryCall"} 2268 ], 2269 "service": extra_backend_service.url, 2270 }, 2271 { 2272 "priority": 1, 2273 # EmptyCall -> more_extra_backend_service 2274 "matchRules": [ 2275 {"fullPathMatch": "/grpc.testing.TestService/EmptyCall"} 2276 ], 2277 "service": more_extra_backend_service.url, 2278 }, 2279 ] 2280 2281 # Make client send UNARY_CALL and EMPTY_CALL. 2282 configure_client( 2283 [ 2284 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 2285 messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, 2286 ] 2287 ) 2288 logger.info("Patching url map with %s", route_rules) 2289 patch_url_map_backend_service( 2290 gcp, extra_backend_service, route_rules=route_rules 2291 ) 2292 logger.info("Waiting for traffic to go to all backends") 2293 wait_until_all_rpcs_go_to_given_backends( 2294 extra_backend_instances + more_extra_backend_instances, 2295 _WAIT_FOR_STATS_SEC, 2296 ) 2297 2298 # Make all calls keep-open. 2299 configure_client( 2300 [ 2301 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 2302 messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, 2303 ], 2304 [ 2305 ( 2306 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 2307 "rpc-behavior", 2308 "keep-open", 2309 ), 2310 ( 2311 messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, 2312 "rpc-behavior", 2313 "keep-open", 2314 ), 2315 ], 2316 ) 2317 wait_until_rpcs_in_flight( 2318 "UNARY_CALL", 2319 ( 2320 _WAIT_FOR_BACKEND_SEC 2321 + int(extra_backend_service_max_requests / args.qps) 2322 ), 2323 extra_backend_service_max_requests, 2324 1, 2325 ) 2326 logger.info( 2327 "UNARY_CALL reached stable state (%d)", 2328 extra_backend_service_max_requests, 2329 ) 2330 wait_until_rpcs_in_flight( 2331 "EMPTY_CALL", 2332 ( 2333 _WAIT_FOR_BACKEND_SEC 2334 + int(more_extra_backend_service_max_requests / args.qps) 2335 ), 2336 more_extra_backend_service_max_requests, 2337 1, 2338 ) 2339 logger.info( 2340 "EMPTY_CALL reached stable state (%d)", 2341 more_extra_backend_service_max_requests, 2342 ) 2343 2344 # Increment circuit breakers max_requests threshold. 
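        # Expected steady state after the patch: in-flight UNARY_CALLs climb
        # to the new cap. The wait bound below allows _WAIT_FOR_BACKEND_SEC
        # plus roughly max_requests / qps seconds to accumulate that many
        # keep-open RPCs, e.g. 800 / 100 qps ~= 8s (illustrative numbers).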
2345 extra_backend_service_max_requests = 800 2346 patch_backend_service( 2347 gcp, 2348 extra_backend_service, 2349 [instance_group], 2350 circuit_breakers={ 2351 "maxRequests": extra_backend_service_max_requests 2352 }, 2353 ) 2354 wait_until_rpcs_in_flight( 2355 "UNARY_CALL", 2356 ( 2357 _WAIT_FOR_BACKEND_SEC 2358 + int(extra_backend_service_max_requests / args.qps) 2359 ), 2360 extra_backend_service_max_requests, 2361 1, 2362 ) 2363 logger.info( 2364 "UNARY_CALL reached stable state after increase (%d)", 2365 extra_backend_service_max_requests, 2366 ) 2367 logger.info("success") 2368 # Avoid new RPCs being outstanding (some test clients create threads 2369 # for sending RPCs) after restoring backend services. 2370 configure_client( 2371 [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL] 2372 ) 2373 except Exception: 2374 passed = False 2375 raise 2376 finally: 2377 if passed or not args.halt_after_fail: 2378 patch_url_map_backend_service(gcp, original_backend_service) 2379 patch_backend_service( 2380 gcp, original_backend_service, [instance_group] 2381 ) 2382 for backend_service in additional_backend_services: 2383 delete_backend_service(gcp, backend_service) 2384 set_validate_for_proxyless(gcp, True) 2385 2386 2387def test_timeout(gcp, original_backend_service, instance_group): 2388 logger.info("Running test_timeout") 2389 2390 logger.info("waiting for original backends to become healthy") 2391 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 2392 2393 # UnaryCall -> maxStreamDuration:3s 2394 route_rules = [ 2395 { 2396 "priority": 0, 2397 "matchRules": [ 2398 {"fullPathMatch": "/grpc.testing.TestService/UnaryCall"} 2399 ], 2400 "service": original_backend_service.url, 2401 "routeAction": { 2402 "maxStreamDuration": { 2403 "seconds": 3, 2404 }, 2405 }, 2406 } 2407 ] 2408 patch_url_map_backend_service( 2409 gcp, original_backend_service, route_rules=route_rules 2410 ) 2411 # A list of tuples (testcase_name, {client_config}, {expected_results}) 2412 test_cases = [ 2413 ( 2414 ( 2415 "timeout_exceeded (UNARY_CALL), timeout_different_route" 2416 " (EMPTY_CALL)" 2417 ), 2418 # UnaryCall and EmptyCall both sleep-4. 2419 # UnaryCall timeouts, EmptyCall succeeds. 2420 { 2421 "rpc_types": [ 2422 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 2423 messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, 2424 ], 2425 "metadata": [ 2426 ( 2427 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 2428 "rpc-behavior", 2429 "sleep-4", 2430 ), 2431 ( 2432 messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, 2433 "rpc-behavior", 2434 "sleep-4", 2435 ), 2436 ], 2437 }, 2438 { 2439 "UNARY_CALL": 4, # DEADLINE_EXCEEDED 2440 "EMPTY_CALL": 0, 2441 }, 2442 ), 2443 ( 2444 "app_timeout_exceeded", 2445 # UnaryCall only with sleep-2; timeout=1s; calls timeout. 2446 { 2447 "rpc_types": [ 2448 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 2449 ], 2450 "metadata": [ 2451 ( 2452 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 2453 "rpc-behavior", 2454 "sleep-2", 2455 ), 2456 ], 2457 "timeout_sec": 1, 2458 }, 2459 { 2460 "UNARY_CALL": 4, # DEADLINE_EXCEEDED 2461 }, 2462 ), 2463 ( 2464 "timeout_not_exceeded", 2465 # UnaryCall only with no sleep; calls succeed. 
2466 { 2467 "rpc_types": [ 2468 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 2469 ], 2470 }, 2471 { 2472 "UNARY_CALL": 0, 2473 }, 2474 ), 2475 ] 2476 2477 passed = True 2478 try: 2479 first_case = True 2480 for testcase_name, client_config, expected_results in test_cases: 2481 logger.info("starting case %s", testcase_name) 2482 configure_client(**client_config) 2483 # wait a second to help ensure the client stops sending RPCs with 2484 # the old config. We will make multiple attempts if it is failing, 2485 # but this improves confidence that the test is valid if the 2486 # previous client_config would lead to the same results. 2487 time.sleep(1) 2488 # Each attempt takes 10 seconds; 20 attempts is equivalent to 200 2489 # second timeout. 2490 attempt_count = 20 2491 if first_case: 2492 attempt_count = 120 2493 first_case = False 2494 before_stats = get_client_accumulated_stats() 2495 if not before_stats.stats_per_method: 2496 raise ValueError( 2497 "stats.stats_per_method is None, the interop client stats" 2498 " service does not support this test case" 2499 ) 2500 for i in range(attempt_count): 2501 logger.info("%s: attempt %d", testcase_name, i) 2502 2503 test_runtime_secs = 10 2504 time.sleep(test_runtime_secs) 2505 after_stats = get_client_accumulated_stats() 2506 2507 success = True 2508 for rpc, status in list(expected_results.items()): 2509 qty = ( 2510 after_stats.stats_per_method[rpc].result[status] 2511 - before_stats.stats_per_method[rpc].result[status] 2512 ) 2513 want = test_runtime_secs * args.qps 2514 # Allow 10% deviation from expectation to reduce flakiness 2515 if qty < (want * 0.9) or qty > (want * 1.1): 2516 logger.info( 2517 "%s: failed due to %s[%s]: got %d want ~%d", 2518 testcase_name, 2519 rpc, 2520 status, 2521 qty, 2522 want, 2523 ) 2524 success = False 2525 if success: 2526 logger.info("success") 2527 break 2528 logger.info("%s attempt %d failed", testcase_name, i) 2529 before_stats = after_stats 2530 else: 2531 raise Exception( 2532 "%s: timeout waiting for expected results: %s; got %s" 2533 % ( 2534 testcase_name, 2535 expected_results, 2536 after_stats.stats_per_method, 2537 ) 2538 ) 2539 except Exception: 2540 passed = False 2541 raise 2542 finally: 2543 if passed or not args.halt_after_fail: 2544 patch_url_map_backend_service(gcp, original_backend_service) 2545 2546 2547def test_fault_injection(gcp, original_backend_service, instance_group): 2548 logger.info("Running test_fault_injection") 2549 2550 logger.info("waiting for original backends to become healthy") 2551 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 2552 2553 testcase_header = "fi_testcase" 2554 2555 def _route(pri, name, fi_policy): 2556 return { 2557 "priority": pri, 2558 "matchRules": [ 2559 { 2560 "prefixMatch": "/", 2561 "headerMatches": [ 2562 { 2563 "headerName": testcase_header, 2564 "exactMatch": name, 2565 } 2566 ], 2567 } 2568 ], 2569 "service": original_backend_service.url, 2570 "routeAction": {"faultInjectionPolicy": fi_policy}, 2571 } 2572 2573 def _abort(pct): 2574 return { 2575 "abort": { 2576 "httpStatus": 401, 2577 "percentage": pct, 2578 } 2579 } 2580 2581 def _delay(pct): 2582 return { 2583 "delay": { 2584 "fixedDelay": {"seconds": "20"}, 2585 "percentage": pct, 2586 } 2587 } 2588 2589 zero_route = _abort(0) 2590 zero_route.update(_delay(0)) 2591 route_rules = [ 2592 _route(0, "zero_percent_fault_injection", zero_route), 2593 _route(1, "always_delay", _delay(100)), 2594 _route(2, "always_abort", _abort(100)), 2595 _route(3, "delay_half", 
_delay(50)), 2596 _route(4, "abort_half", _abort(50)), 2597 { 2598 "priority": 5, 2599 "matchRules": [{"prefixMatch": "/"}], 2600 "service": original_backend_service.url, 2601 }, 2602 ] 2603 set_validate_for_proxyless(gcp, False) 2604 patch_url_map_backend_service( 2605 gcp, original_backend_service, route_rules=route_rules 2606 ) 2607 # A list of tuples (testcase_name, {client_config}, {code: percent}). Each 2608 # test case will set the testcase_header with the testcase_name for routing 2609 # to the appropriate config for the case, defined above. 2610 test_cases = [ 2611 ( 2612 "always_delay", 2613 {"timeout_sec": 2}, 2614 {4: 1}, # DEADLINE_EXCEEDED 2615 ), 2616 ( 2617 "always_abort", 2618 {}, 2619 {16: 1}, # UNAUTHENTICATED 2620 ), 2621 ( 2622 "delay_half", 2623 {"timeout_sec": 2}, 2624 {4: 0.5, 0: 0.5}, # DEADLINE_EXCEEDED / OK: 50% / 50% 2625 ), 2626 ( 2627 "abort_half", 2628 {}, 2629 {16: 0.5, 0: 0.5}, # UNAUTHENTICATED / OK: 50% / 50% 2630 ), 2631 ( 2632 "zero_percent_fault_injection", 2633 {}, 2634 {0: 1}, # OK 2635 ), 2636 ( 2637 "non_matching_fault_injection", # Not in route_rules, above. 2638 {}, 2639 {0: 1}, # OK 2640 ), 2641 ] 2642 2643 passed = True 2644 try: 2645 first_case = True 2646 for testcase_name, client_config, expected_results in test_cases: 2647 logger.info("starting case %s", testcase_name) 2648 2649 client_config["metadata"] = [ 2650 ( 2651 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 2652 testcase_header, 2653 testcase_name, 2654 ) 2655 ] 2656 client_config["rpc_types"] = [ 2657 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 2658 ] 2659 configure_client(**client_config) 2660 # wait a second to help ensure the client stops sending RPCs with 2661 # the old config. We will make multiple attempts if it is failing, 2662 # but this improves confidence that the test is valid if the 2663 # previous client_config would lead to the same results. 2664 time.sleep(1) 2665 # Each attempt takes 10 seconds 2666 if first_case: 2667 # Give the first test case 600s for xDS config propagation. 
2668 attempt_count = 60 2669 first_case = False 2670 else: 2671 # The accumulated stats might include previous sub-test, running 2672 # the test multiple times to deflake 2673 attempt_count = 10 2674 before_stats = get_client_accumulated_stats() 2675 if not before_stats.stats_per_method: 2676 raise ValueError( 2677 "stats.stats_per_method is None, the interop client stats" 2678 " service does not support this test case" 2679 ) 2680 for i in range(attempt_count): 2681 logger.info("%s: attempt %d", testcase_name, i) 2682 2683 test_runtime_secs = 10 2684 time.sleep(test_runtime_secs) 2685 after_stats = get_client_accumulated_stats() 2686 2687 success = True 2688 for status, pct in list(expected_results.items()): 2689 rpc = "UNARY_CALL" 2690 qty = ( 2691 after_stats.stats_per_method[rpc].result[status] 2692 - before_stats.stats_per_method[rpc].result[status] 2693 ) 2694 want = pct * args.qps * test_runtime_secs 2695 # Allow 10% deviation from expectation to reduce flakiness 2696 VARIANCE_ALLOWED = 0.1 2697 if abs(qty - want) > want * VARIANCE_ALLOWED: 2698 logger.info( 2699 "%s: failed due to %s[%s]: got %d want ~%d", 2700 testcase_name, 2701 rpc, 2702 status, 2703 qty, 2704 want, 2705 ) 2706 success = False 2707 if success: 2708 logger.info("success") 2709 break 2710 logger.info("%s attempt %d failed", testcase_name, i) 2711 before_stats = after_stats 2712 else: 2713 raise Exception( 2714 "%s: timeout waiting for expected results: %s; got %s" 2715 % ( 2716 testcase_name, 2717 expected_results, 2718 after_stats.stats_per_method, 2719 ) 2720 ) 2721 except Exception: 2722 passed = False 2723 raise 2724 finally: 2725 if passed or not args.halt_after_fail: 2726 patch_url_map_backend_service(gcp, original_backend_service) 2727 set_validate_for_proxyless(gcp, True) 2728 2729 2730def test_csds(gcp, original_backend_service, instance_group, server_uri): 2731 test_csds_timeout_s = datetime.timedelta(minutes=5).total_seconds() 2732 sleep_interval_between_attempts_s = datetime.timedelta( 2733 seconds=2 2734 ).total_seconds() 2735 logger.info("Running test_csds") 2736 2737 logger.info("waiting for original backends to become healthy") 2738 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 2739 2740 # Test case timeout: 5 minutes 2741 deadline = time.time() + test_csds_timeout_s 2742 cnt = 0 2743 while time.time() <= deadline: 2744 client_config = get_client_xds_config_dump() 2745 logger.info( 2746 "test_csds attempt %d: received xDS config %s", 2747 cnt, 2748 json.dumps(client_config, indent=2), 2749 ) 2750 if client_config is not None: 2751 # Got the xDS config dump, now validate it 2752 ok = True 2753 try: 2754 if client_config["node"]["locality"]["zone"] != args.zone: 2755 logger.info( 2756 "Invalid zone %s != %s", 2757 client_config["node"]["locality"]["zone"], 2758 args.zone, 2759 ) 2760 ok = False 2761 seen = set() 2762 for xds_config in client_config.get("xds_config", []): 2763 if "listener_config" in xds_config: 2764 listener_name = xds_config["listener_config"][ 2765 "dynamic_listeners" 2766 ][0]["active_state"]["listener"]["name"] 2767 if listener_name != server_uri: 2768 logger.info( 2769 "Invalid Listener name %s != %s", 2770 listener_name, 2771 server_uri, 2772 ) 2773 ok = False 2774 else: 2775 seen.add("lds") 2776 elif "route_config" in xds_config: 2777 num_vh = len( 2778 xds_config["route_config"]["dynamic_route_configs"][ 2779 0 2780 ]["route_config"]["virtual_hosts"] 2781 ) 2782 if num_vh <= 0: 2783 logger.info( 2784 "Invalid number of VirtualHosts %s", num_vh 2785 
)
2786 ok = False
2787 else:
2788 seen.add("rds")
2789 elif "cluster_config" in xds_config:
2790 cluster_type = xds_config["cluster_config"][
2791 "dynamic_active_clusters"
2792 ][0]["cluster"]["type"]
2793 if cluster_type != "EDS":
2794 logger.info(
2795 "Invalid cluster type %s != EDS", cluster_type
2796 )
2797 ok = False
2798 else:
2799 seen.add("cds")
2800 elif "endpoint_config" in xds_config:
2801 sub_zone = xds_config["endpoint_config"][
2802 "dynamic_endpoint_configs"
2803 ][0]["endpoint_config"]["endpoints"][0]["locality"][
2804 "sub_zone"
2805 ]
2806 if args.zone not in sub_zone:
2807 logger.info(
2808 "Invalid endpoint sub_zone %s", sub_zone
2809 )
2810 ok = False
2811 else:
2812 seen.add("eds")
2813 for generic_xds_config in client_config.get(
2814 "generic_xds_configs", []
2815 ):
2816 if re.search(
2817 r"\.Listener$", generic_xds_config["type_url"]
2818 ):
2819 seen.add("lds")
2820 listener = generic_xds_config["xds_config"]
2821 if listener["name"] != server_uri:
2822 logger.info(
2823 "Invalid Listener name %s != %s",
2824 listener["name"],
2825 server_uri,
2826 )
2827 ok = False
2828 elif re.search(
2829 r"\.RouteConfiguration$", generic_xds_config["type_url"]
2830 ):
2831 seen.add("rds")
2832 route_config = generic_xds_config["xds_config"]
2833 if not len(route_config["virtual_hosts"]):
2834 logger.info(
2835 "Invalid number of VirtualHosts %s", len(route_config["virtual_hosts"])
2836 )
2837 ok = False
2838 elif re.search(
2839 r"\.Cluster$", generic_xds_config["type_url"]
2840 ):
2841 seen.add("cds")
2842 cluster = generic_xds_config["xds_config"]
2843 if cluster["type"] != "EDS":
2844 logger.info(
2845 "Invalid cluster type %s != EDS", cluster["type"]
2846 )
2847 ok = False
2848 elif re.search(
2849 r"\.ClusterLoadAssignment$",
2850 generic_xds_config["type_url"],
2851 ):
2852 seen.add("eds")
2853 endpoint = generic_xds_config["xds_config"]
2854 if (
2855 args.zone
2856 not in endpoint["endpoints"][0]["locality"][
2857 "sub_zone"
2858 ]
2859 ):
2860 logger.info(
2861 "Invalid endpoint sub_zone %s", endpoint["endpoints"][0]["locality"]["sub_zone"]
2862 )
2863 ok = False
2864 want = {"lds", "rds", "cds", "eds"}
2865 if seen != want:
2866 logger.info("Incomplete xDS config dump, seen=%s", seen)
2867 ok = False
2868 except:
2869 logger.exception("Error in xDS config dump:")
2870 ok = False
2871 finally:
2872 if ok:
2873 # Successfully fetched the xDS config, and it looks good.
2874 logger.info("success")
2875 return
2876 logger.info("test_csds attempt %d failed", cnt)
2877 # Give the client some time to fetch xDS resources
2878 time.sleep(sleep_interval_between_attempts_s)
2879 cnt += 1
2880
2881 raise RuntimeError(
2882 "failed to receive a valid xDS config in %s seconds"
2883 % test_csds_timeout_s
2884 )
2885
2886
2887 def set_validate_for_proxyless(gcp, validate_for_proxyless):
2888 if not gcp.alpha_compute:
2889 logger.debug(
2890 "Not setting validateForProxyless because alpha is not enabled"
2891 )
2892 return
2893 if (
2894 len(gcp.global_forwarding_rules) != 1
2895 or len(gcp.target_proxies) != 1
2896 or len(gcp.url_maps) != 1
2897 ):
2898 logger.debug(
2899 "Global forwarding rule, target proxy or url map not found."
2900 )
2901 return
2902 # This function deletes global_forwarding_rule and target_proxy, then
2903 # recreates target_proxy with validateForProxyless=False. This is necessary
2904 # because patching target_grpc_proxy isn't supported.
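    # Deletion below runs in dependency order: the forwarding rule that
    # references the target proxy is removed first, then the proxy; creation
    # happens in reverse (proxy, then forwarding rule), since GCP rejects
    # deleting a target proxy that a forwarding rule still points at.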
2905 delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
2906 delete_target_proxy(gcp, gcp.target_proxies[0])
2907 create_target_proxy(gcp, target_proxy_name, validate_for_proxyless)
2908 create_global_forwarding_rule(gcp, forwarding_rule_name, [gcp.service_port])
2909
2910
2911 def get_serving_status(instance, service_port):
2912 with grpc.insecure_channel("%s:%d" % (instance, service_port)) as channel:
2913 health_stub = health_pb2_grpc.HealthStub(channel)
2914 return health_stub.Check(health_pb2.HealthCheckRequest())
2915
2916
2917 def set_serving_status(instances, service_port, serving):
2918 logger.info("setting %s serving status to %s", instances, serving)
2919 for instance in instances:
2920 with grpc.insecure_channel(
2921 "%s:%d" % (instance, service_port)
2922 ) as channel:
2923 logger.info("setting %s serving status to %s", instance, serving)
2924 stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel)
2925 retry_count = 5
2926 for i in range(retry_count):
2927 if serving:
2928 stub.SetServing(empty_pb2.Empty())
2929 else:
2930 stub.SetNotServing(empty_pb2.Empty())
2931 serving_status = get_serving_status(instance, service_port)
2932 logger.info("got instance service status %s", serving_status)
2933 want_status = (
2934 health_pb2.HealthCheckResponse.SERVING
2935 if serving
2936 else health_pb2.HealthCheckResponse.NOT_SERVING
2937 )
2938 if serving_status.status == want_status:
2939 break
2940 if i == retry_count - 1:
2941 raise Exception(
2942 "failed to set instance service status after %d retries"
2943 % retry_count
2944 )
2945
2946
2947 def is_primary_instance_group(gcp, instance_group):
2948 # Clients may connect to a TD instance in a different region than the
2949 # client, in which case primary/secondary assignments may not be based on
2950 # the client's actual locality.
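    # Illustrative check: if get_instance_names() returns ["vm-1", "vm-2"]
    # (hypothetical names) and every peer key in stats.rpcs_by_peer is one of
    # them, the group is treated as the primary locality from the client's
    # point of view.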
2951 instance_names = get_instance_names(gcp, instance_group) 2952 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 2953 return all( 2954 peer in instance_names for peer in list(stats.rpcs_by_peer.keys()) 2955 ) 2956 2957 2958def get_startup_script(path_to_server_binary, service_port): 2959 if path_to_server_binary: 2960 return "nohup %s --port=%d 1>/dev/null &" % ( 2961 path_to_server_binary, 2962 service_port, 2963 ) 2964 else: 2965 return ( 2966 """#!/bin/bash 2967sudo apt update 2968sudo apt install -y git default-jdk 2969mkdir java_server 2970pushd java_server 2971git clone https://github.com/grpc/grpc-java.git 2972pushd grpc-java 2973pushd interop-testing 2974../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true 2975 2976nohup build/install/grpc-interop-testing/bin/xds-test-server \ 2977 --port=%d 1>/dev/null &""" 2978 % service_port 2979 ) 2980 2981 2982def create_instance_template( 2983 gcp, name, network, source_image, machine_type, startup_script 2984): 2985 config = { 2986 "name": name, 2987 "properties": { 2988 "tags": {"items": ["allow-health-checks"]}, 2989 "machineType": machine_type, 2990 "serviceAccounts": [ 2991 { 2992 "email": "default", 2993 "scopes": [ 2994 "https://www.googleapis.com/auth/cloud-platform", 2995 ], 2996 } 2997 ], 2998 "networkInterfaces": [ 2999 { 3000 "accessConfigs": [{"type": "ONE_TO_ONE_NAT"}], 3001 "network": network, 3002 } 3003 ], 3004 "disks": [ 3005 { 3006 "boot": True, 3007 "initializeParams": {"sourceImage": source_image}, 3008 "autoDelete": True, 3009 } 3010 ], 3011 "metadata": { 3012 "items": [{"key": "startup-script", "value": startup_script}] 3013 }, 3014 }, 3015 } 3016 3017 logger.debug("Sending GCP request with body=%s", config) 3018 result = ( 3019 gcp.compute.instanceTemplates() 3020 .insert(project=gcp.project, body=config) 3021 .execute(num_retries=_GCP_API_RETRIES) 3022 ) 3023 wait_for_global_operation(gcp, result["name"]) 3024 gcp.instance_template = GcpResource(config["name"], result["targetLink"]) 3025 3026 3027def add_instance_group(gcp, zone, name, size): 3028 config = { 3029 "name": name, 3030 "instanceTemplate": gcp.instance_template.url, 3031 "targetSize": size, 3032 "namedPorts": [{"name": "grpc", "port": gcp.service_port}], 3033 } 3034 3035 logger.debug("Sending GCP request with body=%s", config) 3036 result = ( 3037 gcp.compute.instanceGroupManagers() 3038 .insert(project=gcp.project, zone=zone, body=config) 3039 .execute(num_retries=_GCP_API_RETRIES) 3040 ) 3041 wait_for_zone_operation(gcp, zone, result["name"]) 3042 result = ( 3043 gcp.compute.instanceGroupManagers() 3044 .get( 3045 project=gcp.project, zone=zone, instanceGroupManager=config["name"] 3046 ) 3047 .execute(num_retries=_GCP_API_RETRIES) 3048 ) 3049 instance_group = InstanceGroup( 3050 config["name"], result["instanceGroup"], zone 3051 ) 3052 gcp.instance_groups.append(instance_group) 3053 wait_for_instance_group_to_reach_expected_size( 3054 gcp, instance_group, size, _WAIT_FOR_OPERATION_SEC 3055 ) 3056 return instance_group 3057 3058 3059def create_health_check(gcp, name): 3060 if gcp.alpha_compute: 3061 config = { 3062 "name": name, 3063 "type": "GRPC", 3064 "grpcHealthCheck": {"portSpecification": "USE_SERVING_PORT"}, 3065 } 3066 compute_to_use = gcp.alpha_compute 3067 else: 3068 config = { 3069 "name": name, 3070 "type": "TCP", 3071 "tcpHealthCheck": {"portName": "grpc"}, 3072 } 3073 compute_to_use = gcp.compute 3074 logger.debug("Sending GCP request with body=%s", config) 3075 result = ( 3076 compute_to_use.healthChecks() 
3077 .insert(project=gcp.project, body=config) 3078 .execute(num_retries=_GCP_API_RETRIES) 3079 ) 3080 wait_for_global_operation(gcp, result["name"]) 3081 gcp.health_check = GcpResource(config["name"], result["targetLink"]) 3082 3083 3084def create_health_check_firewall_rule(gcp, name): 3085 config = { 3086 "name": name, 3087 "direction": "INGRESS", 3088 "allowed": [{"IPProtocol": "tcp"}], 3089 "sourceRanges": ["35.191.0.0/16", "130.211.0.0/22"], 3090 "targetTags": ["allow-health-checks"], 3091 } 3092 logger.debug("Sending GCP request with body=%s", config) 3093 result = ( 3094 gcp.compute.firewalls() 3095 .insert(project=gcp.project, body=config) 3096 .execute(num_retries=_GCP_API_RETRIES) 3097 ) 3098 wait_for_global_operation(gcp, result["name"]) 3099 gcp.health_check_firewall_rule = GcpResource( 3100 config["name"], result["targetLink"] 3101 ) 3102 3103 3104def add_backend_service(gcp, name): 3105 if gcp.alpha_compute: 3106 protocol = "GRPC" 3107 compute_to_use = gcp.alpha_compute 3108 else: 3109 protocol = "HTTP2" 3110 compute_to_use = gcp.compute 3111 config = { 3112 "name": name, 3113 "loadBalancingScheme": "INTERNAL_SELF_MANAGED", 3114 "healthChecks": [gcp.health_check.url], 3115 "portName": "grpc", 3116 "protocol": protocol, 3117 } 3118 logger.debug("Sending GCP request with body=%s", config) 3119 result = ( 3120 compute_to_use.backendServices() 3121 .insert(project=gcp.project, body=config) 3122 .execute(num_retries=_GCP_API_RETRIES) 3123 ) 3124 wait_for_global_operation(gcp, result["name"]) 3125 backend_service = GcpResource(config["name"], result["targetLink"]) 3126 gcp.backend_services.append(backend_service) 3127 return backend_service 3128 3129 3130def create_url_map(gcp, name, backend_service, host_name): 3131 config = { 3132 "name": name, 3133 "defaultService": backend_service.url, 3134 "pathMatchers": [ 3135 { 3136 "name": _PATH_MATCHER_NAME, 3137 "defaultService": backend_service.url, 3138 } 3139 ], 3140 "hostRules": [ 3141 {"hosts": [host_name], "pathMatcher": _PATH_MATCHER_NAME} 3142 ], 3143 } 3144 logger.debug("Sending GCP request with body=%s", config) 3145 result = ( 3146 gcp.compute.urlMaps() 3147 .insert(project=gcp.project, body=config) 3148 .execute(num_retries=_GCP_API_RETRIES) 3149 ) 3150 wait_for_global_operation(gcp, result["name"]) 3151 url_map = GcpResource(config["name"], result["targetLink"]) 3152 gcp.url_maps.append(url_map) 3153 return url_map 3154 3155 3156def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name): 3157 config = { 3158 "hostRules": [ 3159 { 3160 "hosts": ["%s:%d" % (host_name, gcp.service_port)], 3161 "pathMatcher": _PATH_MATCHER_NAME, 3162 } 3163 ] 3164 } 3165 logger.debug("Sending GCP request with body=%s", config) 3166 result = ( 3167 gcp.compute.urlMaps() 3168 .patch(project=gcp.project, urlMap=name, body=config) 3169 .execute(num_retries=_GCP_API_RETRIES) 3170 ) 3171 wait_for_global_operation(gcp, result["name"]) 3172 3173 3174def create_target_proxy(gcp, name, validate_for_proxyless=True, url_map=None): 3175 if url_map: 3176 arg_url_map_url = url_map.url 3177 else: 3178 arg_url_map_url = gcp.url_maps[0].url 3179 if gcp.alpha_compute: 3180 config = { 3181 "name": name, 3182 "url_map": arg_url_map_url, 3183 "validate_for_proxyless": validate_for_proxyless, 3184 } 3185 logger.debug("Sending GCP request with body=%s", config) 3186 result = ( 3187 gcp.alpha_compute.targetGrpcProxies() 3188 .insert(project=gcp.project, body=config) 3189 .execute(num_retries=_GCP_API_RETRIES) 3190 ) 3191 else: 3192 config = { 3193 
"name": name, 3194 "url_map": arg_url_map_url, 3195 } 3196 logger.debug("Sending GCP request with body=%s", config) 3197 result = ( 3198 gcp.compute.targetHttpProxies() 3199 .insert(project=gcp.project, body=config) 3200 .execute(num_retries=_GCP_API_RETRIES) 3201 ) 3202 wait_for_global_operation(gcp, result["name"]) 3203 target_proxy = GcpResource(config["name"], result["targetLink"]) 3204 gcp.target_proxies.append(target_proxy) 3205 return target_proxy 3206 3207 3208def create_global_forwarding_rule( 3209 gcp, 3210 name, 3211 potential_ports, 3212 potential_ip_addresses=["0.0.0.0"], 3213 target_proxy=None, 3214): 3215 if target_proxy: 3216 arg_target_proxy_url = target_proxy.url 3217 else: 3218 arg_target_proxy_url = gcp.target_proxies[0].url 3219 if gcp.alpha_compute: 3220 compute_to_use = gcp.alpha_compute 3221 else: 3222 compute_to_use = gcp.compute 3223 for port in potential_ports: 3224 for ip_address in potential_ip_addresses: 3225 try: 3226 config = { 3227 "name": name, 3228 "loadBalancingScheme": "INTERNAL_SELF_MANAGED", 3229 "portRange": str(port), 3230 "IPAddress": ip_address, 3231 "network": args.network, 3232 "target": arg_target_proxy_url, 3233 } 3234 logger.debug("Sending GCP request with body=%s", config) 3235 result = ( 3236 compute_to_use.globalForwardingRules() 3237 .insert(project=gcp.project, body=config) 3238 .execute(num_retries=_GCP_API_RETRIES) 3239 ) 3240 wait_for_global_operation(gcp, result["name"]) 3241 global_forwarding_rule = GcpResource( 3242 config["name"], result["targetLink"] 3243 ) 3244 gcp.global_forwarding_rules.append(global_forwarding_rule) 3245 gcp.service_port = port 3246 return 3247 except googleapiclient.errors.HttpError as http_error: 3248 logger.warning( 3249 "Got error %s when attempting to create forwarding rule to " 3250 "%s:%d. Retrying with another port." 
3251 % (http_error, ip_address, port) 3252 ) 3253 3254 3255def get_health_check(gcp, health_check_name): 3256 try: 3257 result = ( 3258 gcp.compute.healthChecks() 3259 .get(project=gcp.project, healthCheck=health_check_name) 3260 .execute() 3261 ) 3262 gcp.health_check = GcpResource(health_check_name, result["selfLink"]) 3263 except Exception as e: 3264 gcp.errors.append(e) 3265 gcp.health_check = GcpResource(health_check_name, None) 3266 3267 3268def get_health_check_firewall_rule(gcp, firewall_name): 3269 try: 3270 result = ( 3271 gcp.compute.firewalls() 3272 .get(project=gcp.project, firewall=firewall_name) 3273 .execute() 3274 ) 3275 gcp.health_check_firewall_rule = GcpResource( 3276 firewall_name, result["selfLink"] 3277 ) 3278 except Exception as e: 3279 gcp.errors.append(e) 3280 gcp.health_check_firewall_rule = GcpResource(firewall_name, None) 3281 3282 3283def get_backend_service(gcp, backend_service_name, record_error=True): 3284 try: 3285 result = ( 3286 gcp.compute.backendServices() 3287 .get(project=gcp.project, backendService=backend_service_name) 3288 .execute() 3289 ) 3290 backend_service = GcpResource(backend_service_name, result["selfLink"]) 3291 except Exception as e: 3292 if record_error: 3293 gcp.errors.append(e) 3294 backend_service = GcpResource(backend_service_name, None) 3295 gcp.backend_services.append(backend_service) 3296 return backend_service 3297 3298 3299def get_url_map(gcp, url_map_name, record_error=True): 3300 try: 3301 result = ( 3302 gcp.compute.urlMaps() 3303 .get(project=gcp.project, urlMap=url_map_name) 3304 .execute() 3305 ) 3306 url_map = GcpResource(url_map_name, result["selfLink"]) 3307 gcp.url_maps.append(url_map) 3308 except Exception as e: 3309 if record_error: 3310 gcp.errors.append(e) 3311 3312 3313def get_target_proxy(gcp, target_proxy_name, record_error=True): 3314 try: 3315 if gcp.alpha_compute: 3316 result = ( 3317 gcp.alpha_compute.targetGrpcProxies() 3318 .get(project=gcp.project, targetGrpcProxy=target_proxy_name) 3319 .execute() 3320 ) 3321 else: 3322 result = ( 3323 gcp.compute.targetHttpProxies() 3324 .get(project=gcp.project, targetHttpProxy=target_proxy_name) 3325 .execute() 3326 ) 3327 target_proxy = GcpResource(target_proxy_name, result["selfLink"]) 3328 gcp.target_proxies.append(target_proxy) 3329 except Exception as e: 3330 if record_error: 3331 gcp.errors.append(e) 3332 3333 3334def get_global_forwarding_rule(gcp, forwarding_rule_name, record_error=True): 3335 try: 3336 result = ( 3337 gcp.compute.globalForwardingRules() 3338 .get(project=gcp.project, forwardingRule=forwarding_rule_name) 3339 .execute() 3340 ) 3341 global_forwarding_rule = GcpResource( 3342 forwarding_rule_name, result["selfLink"] 3343 ) 3344 gcp.global_forwarding_rules.append(global_forwarding_rule) 3345 except Exception as e: 3346 if record_error: 3347 gcp.errors.append(e) 3348 3349 3350def get_instance_template(gcp, template_name): 3351 try: 3352 result = ( 3353 gcp.compute.instanceTemplates() 3354 .get(project=gcp.project, instanceTemplate=template_name) 3355 .execute() 3356 ) 3357 gcp.instance_template = GcpResource(template_name, result["selfLink"]) 3358 except Exception as e: 3359 gcp.errors.append(e) 3360 gcp.instance_template = GcpResource(template_name, None) 3361 3362 3363def get_instance_group(gcp, zone, instance_group_name): 3364 try: 3365 result = ( 3366 gcp.compute.instanceGroups() 3367 .get( 3368 project=gcp.project, 3369 zone=zone, 3370 instanceGroup=instance_group_name, 3371 ) 3372 .execute() 3373 ) 3374 gcp.service_port = 
result["namedPorts"][0]["port"] 3375 instance_group = InstanceGroup( 3376 instance_group_name, result["selfLink"], zone 3377 ) 3378 except Exception as e: 3379 gcp.errors.append(e) 3380 instance_group = InstanceGroup(instance_group_name, None, zone) 3381 gcp.instance_groups.append(instance_group) 3382 return instance_group 3383 3384 3385def delete_global_forwarding_rule(gcp, forwarding_rule_to_delete=None): 3386 if not forwarding_rule_to_delete: 3387 return 3388 try: 3389 logger.debug( 3390 "Deleting forwarding rule %s", forwarding_rule_to_delete.name 3391 ) 3392 result = ( 3393 gcp.compute.globalForwardingRules() 3394 .delete( 3395 project=gcp.project, 3396 forwardingRule=forwarding_rule_to_delete.name, 3397 ) 3398 .execute(num_retries=_GCP_API_RETRIES) 3399 ) 3400 wait_for_global_operation(gcp, result["name"]) 3401 if forwarding_rule_to_delete in gcp.global_forwarding_rules: 3402 gcp.global_forwarding_rules.remove(forwarding_rule_to_delete) 3403 else: 3404 logger.debug( 3405 ( 3406 "Forwarding rule %s does not exist in" 3407 " gcp.global_forwarding_rules" 3408 ), 3409 forwarding_rule_to_delete.name, 3410 ) 3411 except googleapiclient.errors.HttpError as http_error: 3412 logger.info("Delete failed: %s", http_error) 3413 3414 3415def delete_global_forwarding_rules(gcp): 3416 forwarding_rules_to_delete = gcp.global_forwarding_rules.copy() 3417 for forwarding_rule in forwarding_rules_to_delete: 3418 delete_global_forwarding_rule(gcp, forwarding_rule) 3419 3420 3421def delete_target_proxy(gcp, proxy_to_delete=None): 3422 if not proxy_to_delete: 3423 return 3424 try: 3425 if gcp.alpha_compute: 3426 logger.debug("Deleting grpc proxy %s", proxy_to_delete.name) 3427 result = ( 3428 gcp.alpha_compute.targetGrpcProxies() 3429 .delete( 3430 project=gcp.project, targetGrpcProxy=proxy_to_delete.name 3431 ) 3432 .execute(num_retries=_GCP_API_RETRIES) 3433 ) 3434 else: 3435 logger.debug("Deleting http proxy %s", proxy_to_delete.name) 3436 result = ( 3437 gcp.compute.targetHttpProxies() 3438 .delete( 3439 project=gcp.project, targetHttpProxy=proxy_to_delete.name 3440 ) 3441 .execute(num_retries=_GCP_API_RETRIES) 3442 ) 3443 wait_for_global_operation(gcp, result["name"]) 3444 if proxy_to_delete in gcp.target_proxies: 3445 gcp.target_proxies.remove(proxy_to_delete) 3446 else: 3447 logger.debug( 3448 "Gcp proxy %s does not exist in gcp.target_proxies", 3449 proxy_to_delete.name, 3450 ) 3451 except googleapiclient.errors.HttpError as http_error: 3452 logger.info("Delete failed: %s", http_error) 3453 3454 3455def delete_target_proxies(gcp): 3456 target_proxies_to_delete = gcp.target_proxies.copy() 3457 for target_proxy in target_proxies_to_delete: 3458 delete_target_proxy(gcp, target_proxy) 3459 3460 3461def delete_url_map(gcp, url_map_to_delete=None): 3462 if not url_map_to_delete: 3463 return 3464 try: 3465 logger.debug("Deleting url map %s", url_map_to_delete.name) 3466 result = ( 3467 gcp.compute.urlMaps() 3468 .delete(project=gcp.project, urlMap=url_map_to_delete.name) 3469 .execute(num_retries=_GCP_API_RETRIES) 3470 ) 3471 wait_for_global_operation(gcp, result["name"]) 3472 if url_map_to_delete in gcp.url_maps: 3473 gcp.url_maps.remove(url_map_to_delete) 3474 else: 3475 logger.debug( 3476 "Url map %s does not exist in gcp.url_maps", 3477 url_map_to_delete.name, 3478 ) 3479 except googleapiclient.errors.HttpError as http_error: 3480 logger.info("Delete failed: %s", http_error) 3481 3482 3483def delete_url_maps(gcp): 3484 url_maps_to_delete = gcp.url_maps.copy() 3485 for url_map in url_maps_to_delete: 
3486 delete_url_map(gcp, url_map) 3487 3488 3489def delete_backend_service(gcp, backend_service): 3490 try: 3491 logger.debug("Deleting backend service %s", backend_service.name) 3492 result = ( 3493 gcp.compute.backendServices() 3494 .delete(project=gcp.project, backendService=backend_service.name) 3495 .execute(num_retries=_GCP_API_RETRIES) 3496 ) 3497 wait_for_global_operation(gcp, result["name"]) 3498 except googleapiclient.errors.HttpError as http_error: 3499 logger.info("Delete failed: %s", http_error) 3500 3501 3502def delete_backend_services(gcp): 3503 for backend_service in gcp.backend_services: 3504 delete_backend_service(gcp, backend_service) 3505 3506 3507def delete_firewall(gcp): 3508 try: 3509 logger.debug( 3510 "Deleting firewall %s", gcp.health_check_firewall_rule.name 3511 ) 3512 result = ( 3513 gcp.compute.firewalls() 3514 .delete( 3515 project=gcp.project, 3516 firewall=gcp.health_check_firewall_rule.name, 3517 ) 3518 .execute(num_retries=_GCP_API_RETRIES) 3519 ) 3520 wait_for_global_operation(gcp, result["name"]) 3521 except googleapiclient.errors.HttpError as http_error: 3522 logger.info("Delete failed: %s", http_error) 3523 3524 3525def delete_health_check(gcp): 3526 try: 3527 logger.debug("Deleting health check %s", gcp.health_check.name) 3528 result = ( 3529 gcp.compute.healthChecks() 3530 .delete(project=gcp.project, healthCheck=gcp.health_check.name) 3531 .execute(num_retries=_GCP_API_RETRIES) 3532 ) 3533 wait_for_global_operation(gcp, result["name"]) 3534 except googleapiclient.errors.HttpError as http_error: 3535 logger.info("Delete failed: %s", http_error) 3536 3537 3538def delete_instance_groups(gcp): 3539 for instance_group in gcp.instance_groups: 3540 try: 3541 logger.debug( 3542 "Deleting instance group %s %s", 3543 instance_group.name, 3544 instance_group.zone, 3545 ) 3546 result = ( 3547 gcp.compute.instanceGroupManagers() 3548 .delete( 3549 project=gcp.project, 3550 zone=instance_group.zone, 3551 instanceGroupManager=instance_group.name, 3552 ) 3553 .execute(num_retries=_GCP_API_RETRIES) 3554 ) 3555 wait_for_zone_operation( 3556 gcp, 3557 instance_group.zone, 3558 result["name"], 3559 timeout_sec=_WAIT_FOR_BACKEND_SEC, 3560 ) 3561 except googleapiclient.errors.HttpError as http_error: 3562 logger.info("Delete failed: %s", http_error) 3563 3564 3565def delete_instance_template(gcp): 3566 try: 3567 logger.debug( 3568 "Deleting instance template %s", gcp.instance_template.name 3569 ) 3570 result = ( 3571 gcp.compute.instanceTemplates() 3572 .delete( 3573 project=gcp.project, instanceTemplate=gcp.instance_template.name 3574 ) 3575 .execute(num_retries=_GCP_API_RETRIES) 3576 ) 3577 wait_for_global_operation(gcp, result["name"]) 3578 except googleapiclient.errors.HttpError as http_error: 3579 logger.info("Delete failed: %s", http_error) 3580 3581 3582def patch_backend_service( 3583 gcp, 3584 backend_service, 3585 instance_groups, 3586 balancing_mode="UTILIZATION", 3587 max_rate=1, 3588 circuit_breakers=None, 3589): 3590 if gcp.alpha_compute: 3591 compute_to_use = gcp.alpha_compute 3592 else: 3593 compute_to_use = gcp.compute 3594 config = { 3595 "backends": [ 3596 { 3597 "group": instance_group.url, 3598 "balancingMode": balancing_mode, 3599 "maxRate": max_rate if balancing_mode == "RATE" else None, 3600 } 3601 for instance_group in instance_groups 3602 ], 3603 "circuitBreakers": circuit_breakers, 3604 } 3605 logger.debug("Sending GCP request with body=%s", config) 3606 result = ( 3607 compute_to_use.backendServices() 3608 .patch( 3609 project=gcp.project, 
3610 backendService=backend_service.name, 3611 body=config, 3612 ) 3613 .execute(num_retries=_GCP_API_RETRIES) 3614 ) 3615 wait_for_global_operation( 3616 gcp, result["name"], timeout_sec=_WAIT_FOR_BACKEND_SEC 3617 ) 3618 3619 3620def resize_instance_group( 3621 gcp, instance_group, new_size, timeout_sec=_WAIT_FOR_OPERATION_SEC 3622): 3623 result = ( 3624 gcp.compute.instanceGroupManagers() 3625 .resize( 3626 project=gcp.project, 3627 zone=instance_group.zone, 3628 instanceGroupManager=instance_group.name, 3629 size=new_size, 3630 ) 3631 .execute(num_retries=_GCP_API_RETRIES) 3632 ) 3633 wait_for_zone_operation( 3634 gcp, instance_group.zone, result["name"], timeout_sec=360 3635 ) 3636 wait_for_instance_group_to_reach_expected_size( 3637 gcp, instance_group, new_size, timeout_sec 3638 ) 3639 3640 3641def patch_url_map_backend_service( 3642 gcp, 3643 backend_service=None, 3644 services_with_weights=None, 3645 route_rules=None, 3646 url_map=None, 3647): 3648 if url_map: 3649 url_map_name = url_map.name 3650 else: 3651 url_map_name = gcp.url_maps[0].name 3652 """change url_map's backend service 3653 3654 Only one of backend_service and service_with_weights can be not None. 3655 """ 3656 if gcp.alpha_compute: 3657 compute_to_use = gcp.alpha_compute 3658 else: 3659 compute_to_use = gcp.compute 3660 3661 if backend_service and services_with_weights: 3662 raise ValueError( 3663 "both backend_service and service_with_weights are not None." 3664 ) 3665 3666 default_service = backend_service.url if backend_service else None 3667 default_route_action = ( 3668 { 3669 "weightedBackendServices": [ 3670 { 3671 "backendService": service.url, 3672 "weight": w, 3673 } 3674 for service, w in list(services_with_weights.items()) 3675 ] 3676 } 3677 if services_with_weights 3678 else None 3679 ) 3680 3681 config = { 3682 "pathMatchers": [ 3683 { 3684 "name": _PATH_MATCHER_NAME, 3685 "defaultService": default_service, 3686 "defaultRouteAction": default_route_action, 3687 "routeRules": route_rules, 3688 } 3689 ] 3690 } 3691 logger.debug("Sending GCP request with body=%s", config) 3692 result = ( 3693 compute_to_use.urlMaps() 3694 .patch(project=gcp.project, urlMap=url_map_name, body=config) 3695 .execute(num_retries=_GCP_API_RETRIES) 3696 ) 3697 wait_for_global_operation(gcp, result["name"]) 3698 3699 3700def wait_for_instance_group_to_reach_expected_size( 3701 gcp, instance_group, expected_size, timeout_sec 3702): 3703 start_time = time.time() 3704 while True: 3705 current_size = len(get_instance_names(gcp, instance_group)) 3706 if current_size == expected_size: 3707 break 3708 if time.time() - start_time > timeout_sec: 3709 raise Exception( 3710 "Instance group had expected size %d but actual size %d" 3711 % (expected_size, current_size) 3712 ) 3713 time.sleep(2) 3714 3715 3716def wait_for_global_operation( 3717 gcp, operation, timeout_sec=_WAIT_FOR_OPERATION_SEC 3718): 3719 start_time = time.time() 3720 while time.time() - start_time <= timeout_sec: 3721 result = ( 3722 gcp.compute.globalOperations() 3723 .get(project=gcp.project, operation=operation) 3724 .execute(num_retries=_GCP_API_RETRIES) 3725 ) 3726 if result["status"] == "DONE": 3727 if "error" in result: 3728 raise Exception(result["error"]) 3729 return 3730 time.sleep(2) 3731 raise Exception( 3732 "Operation %s did not complete within %d" % (operation, timeout_sec) 3733 ) 3734 3735 3736def wait_for_zone_operation( 3737 gcp, zone, operation, timeout_sec=_WAIT_FOR_OPERATION_SEC 3738): 3739 start_time = time.time() 3740 while time.time() - start_time <= 
timeout_sec: 3741 result = ( 3742 gcp.compute.zoneOperations() 3743 .get(project=gcp.project, zone=zone, operation=operation) 3744 .execute(num_retries=_GCP_API_RETRIES) 3745 ) 3746 if result["status"] == "DONE": 3747 if "error" in result: 3748 raise Exception(result["error"]) 3749 return 3750 time.sleep(2) 3751 raise Exception( 3752 "Operation %s did not complete within %d" % (operation, timeout_sec) 3753 ) 3754 3755 3756def wait_for_healthy_backends( 3757 gcp, backend_service, instance_group, timeout_sec=_WAIT_FOR_BACKEND_SEC 3758): 3759 start_time = time.time() 3760 config = {"group": instance_group.url} 3761 instance_names = get_instance_names(gcp, instance_group) 3762 expected_size = len(instance_names) 3763 while time.time() - start_time <= timeout_sec: 3764 for instance_name in instance_names: 3765 try: 3766 status = get_serving_status(instance_name, gcp.service_port) 3767 logger.info( 3768 "serving status response from %s: %s", instance_name, status 3769 ) 3770 except grpc.RpcError as rpc_error: 3771 logger.info( 3772 "checking serving status of %s failed: %s", 3773 instance_name, 3774 rpc_error, 3775 ) 3776 result = ( 3777 gcp.compute.backendServices() 3778 .getHealth( 3779 project=gcp.project, 3780 backendService=backend_service.name, 3781 body=config, 3782 ) 3783 .execute(num_retries=_GCP_API_RETRIES) 3784 ) 3785 if "healthStatus" in result: 3786 logger.info("received GCP healthStatus: %s", result["healthStatus"]) 3787 healthy = True 3788 for instance in result["healthStatus"]: 3789 if instance["healthState"] != "HEALTHY": 3790 healthy = False 3791 break 3792 if healthy and expected_size == len(result["healthStatus"]): 3793 return 3794 else: 3795 logger.info("no healthStatus received from GCP") 3796 time.sleep(5) 3797 raise Exception( 3798 "Not all backends became healthy within %d seconds: %s" 3799 % (timeout_sec, result) 3800 ) 3801 3802 3803def get_instance_names(gcp, instance_group): 3804 instance_names = [] 3805 result = ( 3806 gcp.compute.instanceGroups() 3807 .listInstances( 3808 project=gcp.project, 3809 zone=instance_group.zone, 3810 instanceGroup=instance_group.name, 3811 body={"instanceState": "ALL"}, 3812 ) 3813 .execute(num_retries=_GCP_API_RETRIES) 3814 ) 3815 if "items" not in result: 3816 return [] 3817 for item in result["items"]: 3818 # listInstances() returns the full URL of the instance, which ends with 3819 # the instance name. compute.instances().get() requires using the 3820 # instance name (not the full URL) to look up instance details, so we 3821 # just extract the name manually. 
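        # e.g. ".../zones/us-central1-a/instances/vm-1" -> "vm-1"
        # (hypothetical URL; only the trailing path segment is kept).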
3822 instance_name = item["instance"].split("/")[-1] 3823 instance_names.append(instance_name) 3824 logger.info("retrieved instance names: %s", instance_names) 3825 return instance_names 3826 3827 3828def clean_up(gcp): 3829 delete_global_forwarding_rules(gcp) 3830 delete_target_proxies(gcp) 3831 delete_url_maps(gcp) 3832 delete_backend_services(gcp) 3833 if gcp.health_check_firewall_rule: 3834 delete_firewall(gcp) 3835 if gcp.health_check: 3836 delete_health_check(gcp) 3837 delete_instance_groups(gcp) 3838 if gcp.instance_template: 3839 delete_instance_template(gcp) 3840 3841 3842class InstanceGroup(object): 3843 def __init__(self, name, url, zone): 3844 self.name = name 3845 self.url = url 3846 self.zone = zone 3847 3848 3849class GcpResource(object): 3850 def __init__(self, name, url): 3851 self.name = name 3852 self.url = url 3853 3854 3855class GcpState(object): 3856 def __init__(self, compute, alpha_compute, project, project_num): 3857 self.compute = compute 3858 self.alpha_compute = alpha_compute 3859 self.project = project 3860 self.project_num = project_num 3861 self.health_check = None 3862 self.health_check_firewall_rule = None 3863 self.backend_services = [] 3864 self.url_maps = [] 3865 self.target_proxies = [] 3866 self.global_forwarding_rules = [] 3867 self.service_port = None 3868 self.instance_template = None 3869 self.instance_groups = [] 3870 self.errors = [] 3871 3872 3873logging.debug( 3874 "script start time: %s", 3875 datetime.datetime.now(datetime.timezone.utc) 3876 .astimezone() 3877 .strftime("%Y-%m-%dT%H:%M:%S %Z"), 3878) 3879logging.debug( 3880 "logging local timezone: %s", 3881 datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo, 3882) 3883alpha_compute = None 3884if args.compute_discovery_document: 3885 with open(args.compute_discovery_document, "r") as discovery_doc: 3886 compute = googleapiclient.discovery.build_from_document( 3887 discovery_doc.read() 3888 ) 3889 if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document: 3890 with open(args.alpha_compute_discovery_document, "r") as discovery_doc: 3891 alpha_compute = googleapiclient.discovery.build_from_document( 3892 discovery_doc.read() 3893 ) 3894else: 3895 compute = googleapiclient.discovery.build("compute", "v1") 3896 if not args.only_stable_gcp_apis: 3897 alpha_compute = googleapiclient.discovery.build("compute", "alpha") 3898 3899test_results = {} 3900failed_tests = [] 3901try: 3902 gcp = GcpState(compute, alpha_compute, args.project_id, args.project_num) 3903 gcp_suffix = args.gcp_suffix 3904 health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix 3905 if not args.use_existing_gcp_resources: 3906 if args.keep_gcp_resources: 3907 # Auto-generating a unique suffix in case of conflict should not be 3908 # combined with --keep_gcp_resources, as the suffix actually used 3909 # for GCP resources will not match the provided --gcp_suffix value. 
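            # e.g. a --gcp_suffix of "-run1" could become "-run1-0042" after
            # a name collision (random digits; example is illustrative).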
            num_attempts = 1
        else:
            num_attempts = 5
        for i in range(num_attempts):
            try:
                logger.info("Using GCP suffix %s", gcp_suffix)
                create_health_check(gcp, health_check_name)
                break
            except googleapiclient.errors.HttpError as http_error:
                gcp_suffix = "%s-%04d" % (gcp_suffix, random.randint(0, 9999))
                health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
                logger.exception("HttpError when creating health check")
        if gcp.health_check is None:
            raise Exception(
                "Failed to create health check after %d attempts"
                % num_attempts
            )
    firewall_name = _BASE_FIREWALL_RULE_NAME + gcp_suffix
    backend_service_name = _BASE_BACKEND_SERVICE_NAME + gcp_suffix
    alternate_backend_service_name = (
        _BASE_BACKEND_SERVICE_NAME + "-alternate" + gcp_suffix
    )
    extra_backend_service_name = (
        _BASE_BACKEND_SERVICE_NAME + "-extra" + gcp_suffix
    )
    more_extra_backend_service_name = (
        _BASE_BACKEND_SERVICE_NAME + "-more-extra" + gcp_suffix
    )
    url_map_name = _BASE_URL_MAP_NAME + gcp_suffix
    url_map_name_2 = url_map_name + "2"
    service_host_name = _BASE_SERVICE_HOST + gcp_suffix
    target_proxy_name = _BASE_TARGET_PROXY_NAME + gcp_suffix
    target_proxy_name_2 = target_proxy_name + "2"
    forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + gcp_suffix
    forwarding_rule_name_2 = forwarding_rule_name + "2"
    template_name = _BASE_TEMPLATE_NAME + gcp_suffix
    instance_group_name = _BASE_INSTANCE_GROUP_NAME + gcp_suffix
    same_zone_instance_group_name = (
        _BASE_INSTANCE_GROUP_NAME + "-same-zone" + gcp_suffix
    )
    secondary_zone_instance_group_name = (
        _BASE_INSTANCE_GROUP_NAME + "-secondary-zone" + gcp_suffix
    )
    potential_service_ports = list(args.service_port_range)
    random.shuffle(potential_service_ports)
    if args.use_existing_gcp_resources:
        logger.info("Reusing existing GCP resources")
        get_health_check(gcp, health_check_name)
        get_health_check_firewall_rule(gcp, firewall_name)
        backend_service = get_backend_service(gcp, backend_service_name)
        alternate_backend_service = get_backend_service(
            gcp, alternate_backend_service_name
        )
        extra_backend_service = get_backend_service(
            gcp, extra_backend_service_name, record_error=False
        )
        more_extra_backend_service = get_backend_service(
            gcp, more_extra_backend_service_name, record_error=False
        )
        get_url_map(gcp, url_map_name)
        get_target_proxy(gcp, target_proxy_name)
        get_global_forwarding_rule(gcp, forwarding_rule_name)
        get_url_map(gcp, url_map_name_2, record_error=False)
        get_target_proxy(gcp, target_proxy_name_2, record_error=False)
        get_global_forwarding_rule(
            gcp, forwarding_rule_name_2, record_error=False
        )
        get_instance_template(gcp, template_name)
        instance_group = get_instance_group(gcp, args.zone, instance_group_name)
        same_zone_instance_group = get_instance_group(
            gcp, args.zone, same_zone_instance_group_name
        )
        secondary_zone_instance_group = get_instance_group(
            gcp, args.secondary_zone, secondary_zone_instance_group_name
        )
        if gcp.errors:
            raise Exception(gcp.errors)
    else:
        create_health_check_firewall_rule(gcp, firewall_name)
        backend_service = add_backend_service(gcp, backend_service_name)
        alternate_backend_service = add_backend_service(
            gcp, alternate_backend_service_name
        )
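        # The load balancer resources form a dependency chain: the URL map
        # routes to a backend service, the target proxy points at the URL map,
        # and the global forwarding rule points at the target proxy, so they
        # are created in that order below.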
        create_url_map(gcp, url_map_name, backend_service, service_host_name)
        create_target_proxy(gcp, target_proxy_name)
        create_global_forwarding_rule(
            gcp, forwarding_rule_name, potential_service_ports
        )
        if not gcp.service_port:
            raise Exception(
                "Failed to find a valid ip:port for the forwarding rule"
            )
        if gcp.service_port != _DEFAULT_SERVICE_PORT:
            patch_url_map_host_rule_with_port(
                gcp, url_map_name, backend_service, service_host_name
            )
        startup_script = get_startup_script(
            args.path_to_server_binary, gcp.service_port
        )
        create_instance_template(
            gcp,
            template_name,
            args.network,
            args.source_image,
            args.machine_type,
            startup_script,
        )
        instance_group = add_instance_group(
            gcp, args.zone, instance_group_name, _INSTANCE_GROUP_SIZE
        )
        patch_backend_service(gcp, backend_service, [instance_group])
        same_zone_instance_group = add_instance_group(
            gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE
        )
        secondary_zone_instance_group = add_instance_group(
            gcp,
            args.secondary_zone,
            secondary_zone_instance_group_name,
            _INSTANCE_GROUP_SIZE,
        )

    wait_for_healthy_backends(gcp, backend_service, instance_group)

    if args.test_case:
        client_env = dict(os.environ)
        if original_grpc_trace:
            client_env["GRPC_TRACE"] = original_grpc_trace
        if original_grpc_verbosity:
            client_env["GRPC_VERBOSITY"] = original_grpc_verbosity
        bootstrap_server_features = []

        if gcp.service_port == _DEFAULT_SERVICE_PORT:
            server_uri = service_host_name
        else:
            server_uri = service_host_name + ":" + str(gcp.service_port)
        if args.xds_v3_support:
            client_env["GRPC_XDS_EXPERIMENTAL_V3_SUPPORT"] = "true"
            bootstrap_server_features.append("xds_v3")
        if args.bootstrap_file:
            bootstrap_path = os.path.abspath(args.bootstrap_file)
        else:
            with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file:
                bootstrap_file.write(
                    _BOOTSTRAP_TEMPLATE.format(
                        node_id="projects/%s/networks/%s/nodes/%s"
                        % (
                            gcp.project_num,
                            args.network.split("/")[-1],
                            uuid.uuid1(),
                        ),
                        server_features=json.dumps(bootstrap_server_features),
                    ).encode("utf-8")
                )
                bootstrap_path = bootstrap_file.name
        client_env["GRPC_XDS_BOOTSTRAP"] = bootstrap_path
        client_env["GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"] = "true"
        client_env["GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"] = "true"
        client_env["GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"] = "true"
        for test_case in args.test_case:
            if test_case in _V3_TEST_CASES and not args.xds_v3_support:
                logger.info(
                    "skipping test %s due to missing v3 support", test_case
                )
                continue
            if test_case in _ALPHA_TEST_CASES and not gcp.alpha_compute:
                logger.info(
                    "skipping test %s due to missing alpha support", test_case
                )
                continue
            if (
                test_case
                in [
                    "api_listener",
                    "forwarding_rule_port_match",
                    "forwarding_rule_default_port",
                ]
                and CLIENT_HOSTS
            ):
                logger.info(
                    (
                        "skipping test %s because test configuration is"
                        " not compatible with client processes on existing"
                        " client hosts"
                    ),
                    test_case,
                )
                continue
            if test_case == "forwarding_rule_default_port":
                server_uri = service_host_name
            result = jobset.JobResult()
            log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case)
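            # Each test case writes client output to its own log directory so
            # the output can be attached per test to the report rendered below.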
            if not os.path.exists(log_dir):
                os.makedirs(log_dir)
            test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME)
            test_log_file = open(test_log_filename, "w+")
            client_process = None

            if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS:
                rpcs_to_send = '--rpc="UnaryCall,EmptyCall"'
            else:
                rpcs_to_send = '--rpc="UnaryCall"'

            if test_case in _TESTS_TO_SEND_METADATA:
                metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU},UnaryCall:{keyNU}:{valueNU}"'.format(
                    keyE=_TEST_METADATA_KEY,
                    valueE=_TEST_METADATA_VALUE_EMPTY,
                    keyU=_TEST_METADATA_KEY,
                    valueU=_TEST_METADATA_VALUE_UNARY,
                    keyNU=_TEST_METADATA_NUMERIC_KEY,
                    valueNU=_TEST_METADATA_NUMERIC_VALUE,
                )
            else:
                # Setting the arg explicitly to empty with '--metadata=""'
                # makes the C# client fail
                # (see https://github.com/commandlineparser/commandline/issues/412),
                # so instead we just rely on clients using the default when
                # the metadata arg is not specified.
                metadata_to_send = ""

            # TODO(ericgribkoff) Temporarily disable fail_on_failed_rpc checks
            # in the client. This means we will ignore intermittent RPC
            # failures (but this framework still checks that the final result
            # is as expected).
            #
            # The reason for disabling this is that the resources are shared
            # by multiple tests, and a change made in a previous test could be
            # delayed until the next test starts. That next test may then see
            # intermittent failures as a result.
            #
            # A fix is to not share resources between tests (though that does
            # mean the tests will be significantly slower due to creating new
            # resources).
            fail_on_failed_rpc = ""

            try:
                if not CLIENT_HOSTS:
                    client_cmd_formatted = args.client_cmd.format(
                        server_uri=server_uri,
                        stats_port=args.stats_port,
                        qps=args.qps,
                        fail_on_failed_rpc=fail_on_failed_rpc,
                        rpcs_to_send=rpcs_to_send,
                        metadata_to_send=metadata_to_send,
                    )
                    logger.debug("running client: %s", client_cmd_formatted)
                    client_cmd = shlex.split(client_cmd_formatted)
                    client_process = subprocess.Popen(
                        client_cmd,
                        env=client_env,
                        stderr=subprocess.STDOUT,
                        stdout=test_log_file,
                    )
                if test_case == "backends_restart":
                    test_backends_restart(gcp, backend_service, instance_group)
                elif test_case == "change_backend_service":
                    test_change_backend_service(
                        gcp,
                        backend_service,
                        instance_group,
                        alternate_backend_service,
                        same_zone_instance_group,
                    )
                elif test_case == "gentle_failover":
                    test_gentle_failover(
                        gcp,
                        backend_service,
                        instance_group,
                        secondary_zone_instance_group,
                    )
                elif test_case == "load_report_based_failover":
                    test_load_report_based_failover(
                        gcp,
                        backend_service,
                        instance_group,
                        secondary_zone_instance_group,
                    )
                elif test_case == "ping_pong":
                    test_ping_pong(gcp, backend_service, instance_group)
                elif test_case == "remove_instance_group":
                    test_remove_instance_group(
                        gcp,
                        backend_service,
                        instance_group,
                        same_zone_instance_group,
                    )
                elif test_case == "round_robin":
                    test_round_robin(gcp, backend_service, instance_group)
                elif (
                    test_case
                    == "secondary_locality_gets_no_requests_on_partial_primary_failure"
                ):
                    test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                        gcp,
                        backend_service,
                        instance_group,
                        secondary_zone_instance_group,
                    )
                elif (
                    test_case
                    == "secondary_locality_gets_requests_on_primary_failure"
                ):
                    test_secondary_locality_gets_requests_on_primary_failure(
                        gcp,
                        backend_service,
                        instance_group,
                        secondary_zone_instance_group,
                    )
                elif test_case == "traffic_splitting":
                    test_traffic_splitting(
                        gcp,
                        backend_service,
                        instance_group,
                        alternate_backend_service,
                        same_zone_instance_group,
                    )
                elif test_case == "path_matching":
                    test_path_matching(
                        gcp,
                        backend_service,
                        instance_group,
                        alternate_backend_service,
                        same_zone_instance_group,
                    )
                elif test_case == "header_matching":
                    test_header_matching(
                        gcp,
                        backend_service,
                        instance_group,
                        alternate_backend_service,
                        same_zone_instance_group,
                    )
                elif test_case == "circuit_breaking":
                    test_circuit_breaking(
                        gcp,
                        backend_service,
                        instance_group,
                        same_zone_instance_group,
                    )
                elif test_case == "timeout":
                    test_timeout(gcp, backend_service, instance_group)
                elif test_case == "fault_injection":
                    test_fault_injection(gcp, backend_service, instance_group)
                elif test_case == "api_listener":
                    server_uri = test_api_listener(
                        gcp,
                        backend_service,
                        instance_group,
                        alternate_backend_service,
                    )
                elif test_case == "forwarding_rule_port_match":
                    server_uri = test_forwarding_rule_port_match(
                        gcp, backend_service, instance_group
                    )
                elif test_case == "forwarding_rule_default_port":
                    server_uri = test_forwarding_rule_default_port(
                        gcp, backend_service, instance_group
                    )
                elif test_case == "metadata_filter":
                    test_metadata_filter(
                        gcp,
                        backend_service,
                        instance_group,
                        alternate_backend_service,
                        same_zone_instance_group,
                    )
                elif test_case == "csds":
                    test_csds(gcp, backend_service, instance_group, server_uri)
                else:
                    logger.error("Unknown test case: %s", test_case)
                    sys.exit(1)
                if client_process and client_process.poll() is not None:
                    raise Exception(
                        "Client process exited prematurely with exit code %d"
                        % client_process.returncode
                    )
                result.state = "PASSED"
                result.returncode = 0
            except Exception as e:
                logger.exception("Test case %s failed", test_case)
                failed_tests.append(test_case)
                result.state = "FAILED"
                result.message = str(e)
                if args.halt_after_fail:
                    # Stop the test suite if one case failed.
                    raise
            finally:
                if client_process:
                    if client_process.returncode:
                        logger.info(
                            "Client exited with code %d",
                            client_process.returncode,
                        )
                    else:
                        client_process.terminate()
                test_log_file.close()
                # Workaround for Python 3, as report_utils will invoke
                # decode() on result.message, which has a default value of ''.
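                # (str has no decode() in Python 3, so the message must be
                # bytes by the time the report is rendered.)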
                result.message = result.message.encode("UTF-8")
                test_results[test_case] = [result]
                if args.log_client_output:
                    logger.info("Client output:")
                    with open(test_log_filename, "r") as client_output:
                        logger.info(client_output.read())
        if not os.path.exists(_TEST_LOG_BASE_DIR):
            os.makedirs(_TEST_LOG_BASE_DIR)
        report_utils.render_junit_xml_report(
            test_results,
            os.path.join(_TEST_LOG_BASE_DIR, _SPONGE_XML_NAME),
            suite_name="xds_tests",
            multi_target=True,
        )
        if failed_tests:
            logger.error("Test case(s) %s failed", failed_tests)
            sys.exit(1)
finally:
    keep_resources = args.keep_gcp_resources
    if args.halt_after_fail and failed_tests:
        logger.info(
            "Halt after fail triggered, exiting without cleaning up resources"
        )
        keep_resources = True
    if not keep_resources:
        logger.info("Cleaning up GCP resources. This may take some time.")
        clean_up(gcp)
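
# Illustrative invocation (the script path, client binary path, and flag
# values below are placeholders, not defaults of this script):
#
#   python3 run_xds_tests.py \
#       --project_id=my-project \
#       --project_num=1234567890 \
#       --gcp_suffix=-dev \
#       --test_case=ping_pong,round_robin \
#       --client_cmd='/path/to/xds_client --server={server_uri} \
#           --stats_port={stats_port} --qps={qps} {fail_on_failed_rpc} \
#           {rpcs_to_send} {metadata_to_send}'
#
# The {server_uri}, {stats_port}, {qps}, {fail_on_failed_rpc}, {rpcs_to_send}
# and {metadata_to_send} placeholders are substituted via str.format() before
# the client is launched (see client_cmd_formatted above).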