#!/usr/bin/env python3
# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run xDS integration tests on GCP using Traffic Director."""

import argparse
import datetime
import json
import logging
import os
import random
import re
import shlex
import socket
import subprocess
import sys
import tempfile
import time
import uuid

from google.protobuf import json_format
import googleapiclient.discovery
import grpc
from oauth2client.client import GoogleCredentials

import python_utils.jobset as jobset
import python_utils.report_utils as report_utils
from src.proto.grpc.health.v1 import health_pb2
from src.proto.grpc.health.v1 import health_pb2_grpc
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc

# Envoy protos are provided by the PyPI package xds-protos. The generated
# Python files must be imported (even if otherwise unused) so that their
# descriptors are registered.
try:
    from envoy.extensions.filters.common.fault.v3 import fault_pb2
    from envoy.extensions.filters.http.fault.v3 import fault_pb2
    from envoy.extensions.filters.http.router.v3 import router_pb2
    from envoy.extensions.filters.network.http_connection_manager.v3 import \
        http_connection_manager_pb2
    from envoy.service.status.v3 import csds_pb2
    from envoy.service.status.v3 import csds_pb2_grpc
except ImportError:
    # These protos are only required by the CSDS test case; don't fail the
    # entire script over a single test case.
    pass

logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
console_handler.setFormatter(formatter)
logger.handlers = []
logger.addHandler(console_handler)
logger.setLevel(logging.WARNING)

# Suppress excessive logs for gRPC Python
original_grpc_trace = os.environ.pop('GRPC_TRACE', None)
original_grpc_verbosity = os.environ.pop('GRPC_VERBOSITY', None)
# Suppress non-essential logs for GCP clients
logging.getLogger('google_auth_httplib2').setLevel(logging.WARNING)
logging.getLogger('googleapiclient.discovery').setLevel(logging.WARNING)

_TEST_CASES = [
    'backends_restart',
    'change_backend_service',
    'gentle_failover',
    'load_report_based_failover',
    'ping_pong',
    'remove_instance_group',
    'round_robin',
    'secondary_locality_gets_no_requests_on_partial_primary_failure',
    'secondary_locality_gets_requests_on_primary_failure',
    'traffic_splitting',
    'path_matching',
    'header_matching',
    'api_listener',
    'forwarding_rule_port_match',
    'forwarding_rule_default_port',
    'metadata_filter',
]

# Valid test cases that are not included in 'all'. These tests can only be
# run manually, and aren't enabled automatically for all languages.
#
# TODO: Move them into _TEST_CASES when support is ready in all languages.
_ADDITIONAL_TEST_CASES = [
    'circuit_breaking',
    'timeout',
    'fault_injection',
    'csds',
]

# Test cases that require the V3 API. Skipped in older runs.
_V3_TEST_CASES = frozenset(['timeout', 'fault_injection', 'csds'])

# Test cases that require the alpha API. Skipped for stable API runs.
_ALPHA_TEST_CASES = frozenset(['timeout'])


def parse_test_cases(arg):
    if arg == '':
        return []
    arg_split = arg.split(',')
    test_cases = set()
    all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES
    for arg in arg_split:
        if arg == "all":
            test_cases = test_cases.union(_TEST_CASES)
        else:
            test_cases = test_cases.union([arg])
    if not all([test_case in all_test_cases for test_case in test_cases]):
        raise Exception('Failed to parse test cases %s' % arg)
    # Preserve order.
    return [x for x in all_test_cases if x in test_cases]


def parse_port_range(port_arg):
    try:
        port = int(port_arg)
        return list(range(port, port + 1))
    except ValueError:
        port_min, port_max = port_arg.split(':')
        return list(range(int(port_min), int(port_max) + 1))
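

# Illustrative examples (not executed) of the two parsers above, assuming the
# default test lists defined earlier in this file:
#
#   parse_test_cases('all,timeout')
#   # -> every name in _TEST_CASES plus 'timeout', re-ordered to match
#   #    _TEST_CASES + _ADDITIONAL_TEST_CASES
#
#   parse_port_range('8080')       # -> [8080]
#   parse_port_range('8080:8082')  # -> [8080, 8081, 8082]
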

argp = argparse.ArgumentParser(description='Run xDS interop tests on GCP')
# TODO(zdapeng): remove default value of project_id and project_num
argp.add_argument('--project_id', default='grpc-testing',
                  help='GCP project id')
argp.add_argument('--project_num',
                  default='830293263384',
                  help='GCP project number')
argp.add_argument(
    '--gcp_suffix',
    default='',
    help='Optional suffix for all generated GCP resource names. Useful to '
    'ensure distinct names across test runs.')
argp.add_argument(
    '--test_case',
    default='ping_pong',
    type=parse_test_cases,
    help='Comma-separated list of test cases to run. Available tests: %s '
    '(or \'all\' to run every test). '
    'Additional tests not included in \'all\': %s' %
    (','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES)))
argp.add_argument(
    '--bootstrap_file',
    default='',
    help='File to reference via GRPC_XDS_BOOTSTRAP. Disables built-in '
    'bootstrap generation')
argp.add_argument(
    '--xds_v3_support',
    default=False,
    action='store_true',
    help='Support xDS v3 via GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. '
    'If a pre-created bootstrap file is provided via the --bootstrap_file '
    'parameter, it should include xds_v3 in its server_features field.')
argp.add_argument(
    '--client_cmd',
    default=None,
    help='Command to launch xDS test client. {server_uri}, {stats_port} and '
    '{qps} references will be replaced using str.format(). GRPC_XDS_BOOTSTRAP '
    'will be set for the command')
argp.add_argument(
    '--client_hosts',
    default=None,
    help='Comma-separated list of hosts running client processes. If set, '
    '--client_cmd is ignored and client processes are assumed to be running '
    'on the specified hosts.')
argp.add_argument('--zone', default='us-central1-a')
argp.add_argument('--secondary_zone',
                  default='us-west1-b',
                  help='Zone to use for secondary TD locality tests')
argp.add_argument('--qps', default=100, type=int, help='Client QPS')
argp.add_argument(
    '--wait_for_backend_sec',
    default=1200,
    type=int,
    help='Time limit for waiting for created backend services to report '
    'healthy when launching or updating GCP resources')
argp.add_argument(
    '--use_existing_gcp_resources',
    default=False,
    action='store_true',
    help='If set, find and use already created GCP resources instead of '
    'creating new ones.')
argp.add_argument(
    '--keep_gcp_resources',
    default=False,
    action='store_true',
    help='Leave GCP VMs and configuration running after test. Default '
    'behavior is to delete when tests complete.')
argp.add_argument('--halt_after_fail',
                  action='store_true',
                  help='Halt and save the resources when a test fails.')
argp.add_argument(
    '--compute_discovery_document',
    default=None,
    type=str,
    help='If provided, uses this file instead of retrieving via the GCP '
    'discovery API')
argp.add_argument(
    '--alpha_compute_discovery_document',
    default=None,
    type=str,
    help='If provided, uses this file instead of retrieving via the alpha GCP '
    'discovery API')
argp.add_argument('--network',
                  default='global/networks/default',
                  help='GCP network to use')
_DEFAULT_PORT_RANGE = '8080:8280'
argp.add_argument('--service_port_range',
                  default=_DEFAULT_PORT_RANGE,
                  type=parse_port_range,
                  help='Listening port for created gRPC backends. Specified as '
                  'either a single int or as a range in the format min:max, in '
                  'which case an available port p will be chosen s.t. min <= p '
                  '<= max')
argp.add_argument(
    '--stats_port',
    default=8079,
    type=int,
    help='Local port for the client process to expose the LB stats service')
argp.add_argument('--xds_server',
                  default='trafficdirector.googleapis.com:443',
                  help='xDS server')
argp.add_argument('--source_image',
                  default='projects/debian-cloud/global/images/family/debian-9',
                  help='Source image for VMs created during the test')
argp.add_argument('--path_to_server_binary',
                  default=None,
                  type=str,
                  help='If set, the server binary must already be pre-built on '
                  'the specified source image')
argp.add_argument('--machine_type',
                  default='e2-standard-2',
                  help='Machine type for VMs created during the test')
argp.add_argument(
    '--instance_group_size',
    default=2,
    type=int,
    help='Number of VMs to create per instance group. Certain test cases '
    '(e.g., round_robin) may not give meaningful results if this is set to a '
    'value less than 2.')
argp.add_argument('--verbose',
                  help='verbose log output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this param once the sponge-formatted log files are
# visible in all test environments.
argp.add_argument('--log_client_output',
                  help='Log captured client output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this flag once all test environments are verified
# to have access to the alpha compute APIs.
argp.add_argument('--only_stable_gcp_apis',
                  help='Do not use alpha compute APIs. Some tests may be '
                  'incompatible with this option (gRPC health checks are '
                  'currently alpha and required for simulating server '
                  'failure)',
                  default=False,
                  action='store_true')
args = argp.parse_args()
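
# Illustrative invocation (hypothetical values; flags as defined above):
#
#   python3 run_xds_tests.py \
#       --project_id=my-project --project_num=1234567890 \
#       --test_case=ping_pong,round_robin --gcp_suffix=-testrun
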
if args.verbose:
    logger.setLevel(logging.DEBUG)

CLIENT_HOSTS = []
if args.client_hosts:
    CLIENT_HOSTS = args.client_hosts.split(',')

# Each config propagation in the control plane should finish within 600s;
# otherwise, it indicates a bug in the control plane. Config propagation
# covers every kind of traffic config update: updating the urlMap, creating
# resources for the first time, updating a BackendService, and changing the
# status of endpoints in a BackendService.
_WAIT_FOR_URL_MAP_PATCH_SEC = 600
# In general, fetching load balancing stats only takes ~10s. However, a slow
# config update could lead to empty EDS or similar symptoms causing the
# connection to hang for a long period of time. So, we want to extend the
# stats wait time to be the same as the urlMap patch time.
_WAIT_FOR_STATS_SEC = _WAIT_FOR_URL_MAP_PATCH_SEC

_DEFAULT_SERVICE_PORT = 80
_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
_WAIT_FOR_OPERATION_SEC = 1200
_INSTANCE_GROUP_SIZE = args.instance_group_size
_NUM_TEST_RPCS = 10 * args.qps
_CONNECTION_TIMEOUT_SEC = 60
_GCP_API_RETRIES = 5
_BOOTSTRAP_TEMPLATE = """
{{
  "node": {{
    "id": "{node_id}",
    "metadata": {{
      "TRAFFICDIRECTOR_NETWORK_NAME": "%s",
      "com.googleapis.trafficdirector.config_time_trace": "TRUE"
    }},
    "locality": {{
      "zone": "%s"
    }}
  }},
  "xds_servers": [{{
    "server_uri": "%s",
    "channel_creds": [
      {{
        "type": "google_default",
        "config": {{}}
      }}
    ],
    "server_features": {server_features}
  }}]
}}""" % (args.network.split('/')[-1], args.zone, args.xds_server)
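
# With the default flags, the rendered bootstrap (after str.format() fills in
# node_id and server_features) looks roughly like this -- an illustration,
# not output captured from a real run; server_features is e.g. ["xds_v3"]
# when --xds_v3_support is set:
#
#   {
#     "node": {
#       "id": "<generated id>",
#       "metadata": {
#         "TRAFFICDIRECTOR_NETWORK_NAME": "default",
#         "com.googleapis.trafficdirector.config_time_trace": "TRUE"
#       },
#       "locality": {"zone": "us-central1-a"}
#     },
#     "xds_servers": [{
#       "server_uri": "trafficdirector.googleapis.com:443",
#       "channel_creds": [{"type": "google_default", "config": {}}],
#       "server_features": ["xds_v3"]
#     }]
#   }
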
# TODO(ericgribkoff) Add change_backend_service to this list once TD no longer
# sends an update with no localities when adding the MIG to the backend
# service, which can race with the URL map patch.
_TESTS_TO_FAIL_ON_RPC_FAILURE = ['ping_pong', 'round_robin']
# Tests that run UnaryCall and EmptyCall.
_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching']
# Tests that make UnaryCall with test metadata.
_TESTS_TO_SEND_METADATA = ['header_matching']
_TEST_METADATA_KEY = 'xds_md'
_TEST_METADATA_VALUE_UNARY = 'unary_yranu'
_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme'
# Extra RPC metadata whose value is a number, sent with UnaryCall only.
_TEST_METADATA_NUMERIC_KEY = 'xds_md_numeric'
_TEST_METADATA_NUMERIC_VALUE = '159'
_PATH_MATCHER_NAME = 'path-matcher'
_BASE_TEMPLATE_NAME = 'test-template'
_BASE_INSTANCE_GROUP_NAME = 'test-ig'
_BASE_HEALTH_CHECK_NAME = 'test-hc'
_BASE_FIREWALL_RULE_NAME = 'test-fw-rule'
_BASE_BACKEND_SERVICE_NAME = 'test-backend-service'
_BASE_URL_MAP_NAME = 'test-map'
_BASE_SERVICE_HOST = 'grpc-test'
_BASE_TARGET_PROXY_NAME = 'test-target-proxy'
_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule'
_TEST_LOG_BASE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                  '../../reports')
_SPONGE_LOG_NAME = 'sponge_log.log'
_SPONGE_XML_NAME = 'sponge_log.xml'


def get_client_stats(num_rpcs, timeout_sec):
    if CLIENT_HOSTS:
        hosts = CLIENT_HOSTS
    else:
        hosts = ['localhost']
    for host in hosts:
        with grpc.insecure_channel('%s:%d' %
                                   (host, args.stats_port)) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerStatsRequest()
            request.num_rpcs = num_rpcs
            request.timeout_sec = timeout_sec
            rpc_timeout = timeout_sec + _CONNECTION_TIMEOUT_SEC
            logger.debug('Invoking GetClientStats RPC to %s:%d:', host,
                         args.stats_port)
            response = stub.GetClientStats(request,
                                           wait_for_ready=True,
                                           timeout=rpc_timeout)
            logger.debug('Invoked GetClientStats RPC to %s: %s', host,
                         json_format.MessageToJson(response))
            return response


def get_client_accumulated_stats():
    if CLIENT_HOSTS:
        hosts = CLIENT_HOSTS
    else:
        hosts = ['localhost']
    for host in hosts:
        with grpc.insecure_channel('%s:%d' %
                                   (host, args.stats_port)) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerAccumulatedStatsRequest()
            logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:',
                         host, args.stats_port)
            response = stub.GetClientAccumulatedStats(
                request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s',
                         host, response)
            return response
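
# Shape of the responses above, as an illustration (field names come from the
# grpc.testing proto definitions used by this script; the values are made up):
#
#   GetClientStats            -> rpcs_by_peer = {'test-ig-abcd': 500,
#                                                'test-ig-efgh': 500}
#   GetClientAccumulatedStats -> num_rpcs_started_by_method['UnaryCall'],
#                                num_rpcs_succeeded_by_method['UnaryCall'],
#                                num_rpcs_failed_by_method['UnaryCall'], ...
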

def get_client_xds_config_dump():
    if CLIENT_HOSTS:
        hosts = CLIENT_HOSTS
    else:
        hosts = ['localhost']
    for host in hosts:
        server_address = '%s:%d' % (host, args.stats_port)
        with grpc.insecure_channel(server_address) as channel:
            stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(channel)
            logger.debug('Fetching xDS config dump from %s', server_address)
            response = stub.FetchClientStatus(csds_pb2.ClientStatusRequest(),
                                              wait_for_ready=True,
                                              timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Fetched xDS config dump from %s', server_address)
            if len(response.config) != 1:
                logger.error('Unexpected number of ClientConfigs %d: %s',
                             len(response.config), response)
                return None
            else:
                # Convert the ClientStatusResponse to JSON, because many
                # fields are packed in google.protobuf.Any; unpacking and
                # inspecting the proto messages directly would require a lot
                # of duplicated code.
                return json_format.MessageToDict(
                    response.config[0], preserving_proto_field_name=True)


def configure_client(rpc_types, metadata=[], timeout_sec=None):
    if CLIENT_HOSTS:
        hosts = CLIENT_HOSTS
    else:
        hosts = ['localhost']
    for host in hosts:
        with grpc.insecure_channel('%s:%d' %
                                   (host, args.stats_port)) as channel:
            stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel)
            request = messages_pb2.ClientConfigureRequest()
            request.types.extend(rpc_types)
            for rpc_type, md_key, md_value in metadata:
                md = request.metadata.add()
                md.type = rpc_type
                md.key = md_key
                md.value = md_value
            if timeout_sec:
                request.timeout_sec = timeout_sec
            logger.debug(
                'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s',
                host, args.stats_port, request)
            stub.Configure(request,
                           wait_for_ready=True,
                           timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s',
                         host)
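
# Illustrative call (not executed here): configure the test client to send
# only EmptyCall, attaching the test metadata used by the header-matching
# cases. The RpcType values follow the grpc.testing.ClientConfigureRequest
# enum.
#
#   configure_client(
#       rpc_types=[messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL],
#       metadata=[(messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
#                  _TEST_METADATA_KEY, _TEST_METADATA_VALUE_EMPTY)])
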

class RpcDistributionError(Exception):
    pass


def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs,
                                   allow_failures):
    start_time = time.time()
    error_msg = None
    logger.debug('Waiting for %d sec until backends %s receive load' %
                 (timeout_sec, backends))
    while time.time() - start_time <= timeout_sec:
        error_msg = None
        stats = get_client_stats(num_rpcs, timeout_sec)
        rpcs_by_peer = stats.rpcs_by_peer
        for backend in backends:
            if backend not in rpcs_by_peer:
                error_msg = 'Backend %s did not receive load' % backend
                break
        if not error_msg and len(rpcs_by_peer) > len(backends):
            error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer
        if not allow_failures and stats.num_failures > 0:
            error_msg = '%d RPCs failed' % stats.num_failures
        if not error_msg:
            return
    raise RpcDistributionError(error_msg)


def wait_until_all_rpcs_go_to_given_backends_or_fail(backends,
                                                     timeout_sec,
                                                     num_rpcs=_NUM_TEST_RPCS):
    _verify_rpcs_to_given_backends(backends,
                                   timeout_sec,
                                   num_rpcs,
                                   allow_failures=True)


def wait_until_all_rpcs_go_to_given_backends(backends,
                                             timeout_sec,
                                             num_rpcs=_NUM_TEST_RPCS):
    _verify_rpcs_to_given_backends(backends,
                                   timeout_sec,
                                   num_rpcs,
                                   allow_failures=False)


def wait_until_no_rpcs_go_to_given_backends(backends, timeout_sec):
    start_time = time.time()
    while time.time() - start_time <= timeout_sec:
        stats = get_client_stats(_NUM_TEST_RPCS, timeout_sec)
        error_msg = None
        rpcs_by_peer = stats.rpcs_by_peer
        for backend in backends:
            if backend in rpcs_by_peer:
                error_msg = 'Unexpected backend %s receives load' % backend
                break
        if not error_msg:
            return
    raise Exception('Unexpected RPCs going to given backends')


def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold):
    '''Block until the test client reaches a state where the given number of
    RPCs are stably outstanding.

    Args:
      rpc_type: A string indicating the RPC method to check for. Either
        'UnaryCall' or 'EmptyCall'.
      timeout_sec: Maximum number of seconds to wait until the desired state
        is reached.
      num_rpcs: Expected number of RPCs to be in-flight.
      threshold: Number within [0,100], the tolerable percentage by which
        the actual number of RPCs in-flight can differ from the expected
        number.
    '''
    if threshold < 0 or threshold > 100:
        raise ValueError('Value error: Threshold should be between 0 and 100')
    threshold_fraction = threshold / 100.0
    start_time = time.time()
    error_msg = None
    logger.debug(
        'Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight' %
        (timeout_sec, num_rpcs, rpc_type, threshold))
    while time.time() - start_time <= timeout_sec:
        error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
                                          threshold_fraction)
        if error_msg:
            logger.debug('Progress: %s', error_msg)
            time.sleep(2)
        else:
            break
    # Ensure the number of outstanding RPCs is stable.
    if not error_msg:
        time.sleep(5)
        error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
                                          threshold_fraction)
    if error_msg:
        raise Exception("Wrong number of %s RPCs in-flight: %s" %
                        (rpc_type, error_msg))


def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction):
    error_msg = None
    stats = get_client_accumulated_stats()
    rpcs_started = stats.num_rpcs_started_by_method[rpc_type]
    rpcs_succeeded = stats.num_rpcs_succeeded_by_method[rpc_type]
    rpcs_failed = stats.num_rpcs_failed_by_method[rpc_type]
    rpcs_in_flight = rpcs_started - rpcs_succeeded - rpcs_failed
    if rpcs_in_flight < (num_rpcs * (1 - threshold_fraction)):
        error_msg = ('actual(%d) < expected(%d - %d%%)' %
                     (rpcs_in_flight, num_rpcs, threshold))
    elif rpcs_in_flight > (num_rpcs * (1 + threshold_fraction)):
        error_msg = ('actual(%d) > expected(%d + %d%%)' %
                     (rpcs_in_flight, num_rpcs, threshold))
    return error_msg


def compare_distributions(actual_distribution, expected_distribution,
                          threshold):
    """Compare whether two distributions are similar.

    Args:
      actual_distribution: A list of floats, contains the actual distribution.
      expected_distribution: A list of floats, contains the expected
        distribution.
      threshold: Number within [0,100], the threshold percentage by which the
        actual distribution can differ from the expected distribution.

    Returns:
      The similarity between the distributions as a boolean. Returns true if
      the actual distribution lies within the threshold of the expected
      distribution, false otherwise.

    Raises:
      ValueError: if threshold is not within [0,100].
      Exception: containing detailed error messages.
    """
    if len(expected_distribution) != len(actual_distribution):
        raise Exception(
            'Error: expected and actual distributions have different size '
            '(%d vs %d)' %
            (len(expected_distribution), len(actual_distribution)))
    if threshold < 0 or threshold > 100:
        raise ValueError('Value error: Threshold should be between 0 and 100')
    threshold_fraction = threshold / 100.0
    for expected, actual in zip(expected_distribution, actual_distribution):
        if actual < (expected * (1 - threshold_fraction)):
            raise Exception("actual(%f) < expected(%f-%d%%)" %
                            (actual, expected, threshold))
        if actual > (expected * (1 + threshold_fraction)):
            raise Exception("actual(%f) > expected(%f+%d%%)" %
                            (actual, expected, threshold))
    return True
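
# Worked example for the two checks above, using thresholds the tests
# actually pass. With num_rpcs=100 and threshold=10, _check_rpcs_in_flight
# accepts an in-flight count in [90, 110]. With
# expected_distribution=[20, 80] and threshold=5, compare_distributions
# requires each actual value to fall within +/-5% *of the expected value*,
# i.e. 20 -> [19.0, 21.0] and 80 -> [76.0, 84.0].
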

def compare_expected_instances(stats, expected_instances):
    """Compare whether stats report the expected instances for each RPC type.

    Args:
      stats: LoadBalancerStatsResponse reported by interop client.
      expected_instances: a dict with key as the RPC type (string), value as
        the expected backend instances (list of strings).

    Returns:
      True if the peers match the expected instances for every RPC type;
      False otherwise.
    """
    for rpc_type, expected_peers in list(expected_instances.items()):
        rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type]
        rpcs_by_peer = (rpcs_by_peer_for_type.rpcs_by_peer
                        if rpcs_by_peer_for_type else {})
        logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer)
        peers = list(rpcs_by_peer.keys())
        if set(peers) != set(expected_peers):
            logger.info('unexpected peers for %s, got %s, want %s', rpc_type,
                        peers, expected_peers)
            return False
    return True


def test_backends_restart(gcp, backend_service, instance_group):
    logger.info('Running test_backends_restart')
    instance_names = get_instance_names(gcp, instance_group)
    num_instances = len(instance_names)
    start_time = time.time()
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    try:
        resize_instance_group(gcp, instance_group, 0)
        wait_until_all_rpcs_go_to_given_backends_or_fail(
            [], _WAIT_FOR_BACKEND_SEC)
    finally:
        resize_instance_group(gcp, instance_group, num_instances)
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    new_instance_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(new_instance_names,
                                             _WAIT_FOR_BACKEND_SEC)


def test_change_backend_service(gcp, original_backend_service, instance_group,
                                alternate_backend_service,
                                same_zone_instance_group):
    logger.info('Running test_change_backend_service')
    original_backend_instances = get_instance_names(gcp, instance_group)
    alternate_backend_instances = get_instance_names(gcp,
                                                     same_zone_instance_group)
    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)
    wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
                                             _WAIT_FOR_STATS_SEC)
    passed = True
    try:
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances,
                                                 _WAIT_FOR_URL_MAP_PATCH_SEC)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])


def test_gentle_failover(gcp,
                         backend_service,
                         primary_instance_group,
                         secondary_instance_group,
                         swapped_primary_and_secondary=False):
    logger.info('Running test_gentle_failover')
    num_primary_instances = len(get_instance_names(gcp,
                                                   primary_instance_group))
    min_instances_for_gentle_failover = 3  # Need >50% failure to start failover
    passed = True
    try:
        if num_primary_instances < min_instances_for_gentle_failover:
            resize_instance_group(gcp, primary_instance_group,
                                  min_instances_for_gentle_failover)
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        primary_instance_names = get_instance_names(gcp,
                                                    primary_instance_group)
        secondary_instance_names = get_instance_names(
            gcp, secondary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        instances_to_stop = primary_instance_names[:-1]
        remaining_instances = primary_instance_names[-1:]
        try:
            set_serving_status(instances_to_stop,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(
                remaining_instances + secondary_instance_names,
                _WAIT_FOR_BACKEND_SEC)
        finally:
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_gentle_failover(gcp,
                                 backend_service,
                                 secondary_instance_group,
                                 primary_instance_group,
                                 swapped_primary_and_secondary=True)
        else:
            passed = False
            raise e
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service,
                                  [primary_instance_group])
            resize_instance_group(gcp, primary_instance_group,
                                  num_primary_instances)
            instance_names = get_instance_names(gcp, primary_instance_group)
            wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                     _WAIT_FOR_BACKEND_SEC)


def test_load_report_based_failover(gcp, backend_service,
                                    primary_instance_group,
                                    secondary_instance_group):
    logger.info('Running test_load_report_based_failover')
    passed = True
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        primary_instance_names = get_instance_names(gcp,
                                                    primary_instance_group)
        secondary_instance_names = get_instance_names(
            gcp, secondary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Set the primary locality's balancing mode to RATE, with RPS capped
        # at 20% of the client's QPS. The secondary locality will be used.
        max_rate = int(args.qps * 1 / 5)
        logger.info('Patching backend service to RATE with %d max_rate',
                    max_rate)
        patch_backend_service(
            gcp,
            backend_service,
            [primary_instance_group, secondary_instance_group],
            balancing_mode='RATE',
            max_rate=max_rate)
        wait_until_all_rpcs_go_to_given_backends(
            primary_instance_names + secondary_instance_names,
            _WAIT_FOR_BACKEND_SEC)

        # Set the primary locality's balancing mode to RATE, with RPS capped
        # at 120% of the client's QPS. Only the primary locality will be used.
        max_rate = int(args.qps * 6 / 5)
        logger.info('Patching backend service to RATE with %d max_rate',
                    max_rate)
        patch_backend_service(
            gcp,
            backend_service,
            [primary_instance_group, secondary_instance_group],
            balancing_mode='RATE',
            max_rate=max_rate)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
        logger.info("success")
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service,
                                  [primary_instance_group])
            instance_names = get_instance_names(gcp, primary_instance_group)
            wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                     _WAIT_FOR_BACKEND_SEC)
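
# Worked example of the RATE math above with the default --qps=100: the first
# patch caps the primary locality at max_rate = 20 RPS, so roughly 80% of the
# load must spill over to the secondary locality; the second patch raises the
# cap to 120 RPS, which exceeds the client's QPS, so the primary locality
# absorbs all traffic again.
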

def test_ping_pong(gcp, backend_service, instance_group):
    logger.info('Running test_ping_pong')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)


def test_remove_instance_group(gcp, backend_service, instance_group,
                               same_zone_instance_group):
    logger.info('Running test_remove_instance_group')
    passed = True
    try:
        patch_backend_service(gcp,
                              backend_service,
                              [instance_group, same_zone_instance_group],
                              balancing_mode='RATE')
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  same_zone_instance_group)
        instance_names = get_instance_names(gcp, instance_group)
        same_zone_instance_names = get_instance_names(
            gcp, same_zone_instance_group)
        try:
            wait_until_all_rpcs_go_to_given_backends(
                instance_names + same_zone_instance_names,
                _WAIT_FOR_OPERATION_SEC)
            remaining_instance_group = same_zone_instance_group
            remaining_instance_names = same_zone_instance_names
        except RpcDistributionError as e:
            # If connected to TD in a different zone, we may route traffic to
            # only one instance group. Determine which group that is in order
            # to continue with the remainder of the test case.
            try:
                wait_until_all_rpcs_go_to_given_backends(
                    instance_names, _WAIT_FOR_STATS_SEC)
                remaining_instance_group = same_zone_instance_group
                remaining_instance_names = same_zone_instance_names
            except RpcDistributionError as e:
                wait_until_all_rpcs_go_to_given_backends(
                    same_zone_instance_names, _WAIT_FOR_STATS_SEC)
                remaining_instance_group = instance_group
                remaining_instance_names = instance_names
        patch_backend_service(gcp,
                              backend_service, [remaining_instance_group],
                              balancing_mode='RATE')
        wait_until_all_rpcs_go_to_given_backends(remaining_instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service, [instance_group])
            wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                     _WAIT_FOR_BACKEND_SEC)


def test_round_robin(gcp, backend_service, instance_group):
    logger.info('Running test_round_robin')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    threshold = 1
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    # TODO(ericgribkoff) Delayed config propagation from earlier tests
    # may result in briefly receiving an empty EDS update, resulting in failed
    # RPCs. Retry distribution validation if this occurs; long-term fix is
    # creating new backend resources for each individual test case.
    # Each attempt takes 10 seconds. Config propagation can take several
    # minutes.
    max_attempts = 40
    for i in range(max_attempts):
        stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
        requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer]
        total_requests_received = sum(requests_received)
        if total_requests_received != _NUM_TEST_RPCS:
            logger.info('Unexpected RPC failures, retrying: %s', stats)
            continue
        expected_requests = total_requests_received / len(instance_names)
        for instance in instance_names:
            if abs(stats.rpcs_by_peer[instance] -
                   expected_requests) > threshold:
                raise Exception(
                    'RPC peer distribution differs from expected by more than '
                    '%d for instance %s (%s)' % (threshold, instance, stats))
        return
    raise Exception('RPC failures persisted through %d retries' % max_attempts)
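
# Worked example of the round-robin check above with the defaults (--qps=100,
# so _NUM_TEST_RPCS=1000, and an instance group of 2): each instance is
# expected to receive 1000 / 2 = 500 RPCs, and the test fails if any peer's
# count differs from 500 by more than threshold=1.
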

def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    logger.info(
        'Running secondary_locality_gets_no_requests_on_partial_primary_failure'
    )
    passed = True
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service,
                                  primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp,
                                                    primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        instances_to_stop = primary_instance_names[:1]
        remaining_instances = primary_instance_names[1:]
        try:
            set_serving_status(instances_to_stop,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(remaining_instances,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            passed = False
            raise e
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service,
                                  [primary_instance_group])


def test_secondary_locality_gets_requests_on_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    logger.info('Running secondary_locality_gets_requests_on_primary_failure')
    passed = True
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service,
                                  primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp,
                                                    primary_instance_group)
        secondary_instance_names = get_instance_names(
            gcp, secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        try:
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(
                secondary_instance_names, _WAIT_FOR_BACKEND_SEC)
        finally:
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_requests_on_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            passed = False
            raise e
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service,
                                  [primary_instance_group])


def prepare_services_for_urlmap_tests(gcp, original_backend_service,
                                      instance_group,
                                      alternate_backend_service,
                                      same_zone_instance_group):
    '''Prepare the services for tests that modify URL maps.

    Returns:
      The original and alternate backend instance names, as two lists of
      strings.
    '''
    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    logger.info('waiting for alternate to become healthy')
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)

    original_backend_instances = get_instance_names(gcp, instance_group)
    logger.info('original backends instances: %s', original_backend_instances)

    alternate_backend_instances = get_instance_names(gcp,
                                                     same_zone_instance_group)
    logger.info('alternate backends instances: %s',
                alternate_backend_instances)

    # Start with all traffic going to original_backend_service.
    logger.info('waiting for traffic to all go to original backends')
    wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
                                             _WAIT_FOR_STATS_SEC)
    return original_backend_instances, alternate_backend_instances


def test_metadata_filter(gcp, original_backend_service, instance_group,
                         alternate_backend_service, same_zone_instance_group):
    logger.info("Running test_metadata_filter")
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    original_backend_instances = get_instance_names(gcp, instance_group)
    alternate_backend_instances = get_instance_names(gcp,
                                                     same_zone_instance_group)
    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)
    passed = True
    try:
        with open(bootstrap_path) as f:
            md = json.load(f)['node']['metadata']
            match_labels = []
            for k, v in list(md.items()):
                match_labels.append({'name': k, 'value': v})

        not_match_labels = [{'name': 'fake', 'value': 'fail'}]
        test_route_rules = [
            # test MATCH_ALL
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch': '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': not_match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch': '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
            ],
            # test mixing MATCH_ALL and MATCH_ANY;
            # for MATCH_ALL, a superset of the labels won't match
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch': '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': not_match_labels + match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch': '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': not_match_labels + match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
            ],
            # test MATCH_ANY
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch': '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': not_match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch': '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': not_match_labels + match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
            ],
            # test matching multiple route rules
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch': '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch': '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
            ]
        ]

        for route_rules in test_route_rules:
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances, _WAIT_FOR_STATS_SEC)
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)
            wait_until_no_rpcs_go_to_given_backends(
                original_backend_instances, _WAIT_FOR_STATS_SEC)
            wait_until_all_rpcs_go_to_given_backends(
                alternate_backend_instances, _WAIT_FOR_STATS_SEC)
            patch_url_map_backend_service(gcp, original_backend_service)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, alternate_backend_service, [])
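
# For illustration: with a bootstrap generated from _BOOTSTRAP_TEMPLATE and
# the default flags, the match_labels read back from the bootstrap file in
# the test above would look like
#
#   [{'name': 'TRAFFICDIRECTOR_NETWORK_NAME', 'value': 'default'},
#    {'name': 'com.googleapis.trafficdirector.config_time_trace',
#     'value': 'TRUE'}]
#
# so only clients in that network satisfy the MATCH_ALL filter.
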

def test_api_listener(gcp, backend_service, instance_group,
                      alternate_backend_service):
    logger.info("Running api_listener")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)
        # Create a second suite of url-map + target-proxy + forwarding-rule
        # with the same host name in the host rule. Proxyless validation has
        # to be disabled because it requires a `0.0.0.0` IP address in the
        # forwarding rule for proxyless, and we also violate ip:port
        # uniqueness for test purposes.
        # See https://github.com/grpc/grpc-java/issues/8009
        new_config_suffix = '2'
        url_map_2 = create_url_map(gcp, url_map_name + new_config_suffix,
                                   backend_service, service_host_name)
        target_proxy_2 = create_target_proxy(
            gcp, target_proxy_name + new_config_suffix, False, url_map_2)
        if not gcp.service_port:
            raise Exception(
                'Failed to find a valid port for the forwarding rule')
        potential_ip_addresses = []
        max_attempts = 10
        for i in range(max_attempts):
            potential_ip_addresses.append('10.10.10.%d' %
                                          (random.randint(0, 255)))
        create_global_forwarding_rule(gcp,
                                      forwarding_rule_name + new_config_suffix,
                                      [gcp.service_port],
                                      potential_ip_addresses, target_proxy_2)
        if gcp.service_port != _DEFAULT_SERVICE_PORT:
            patch_url_map_host_rule_with_port(gcp,
                                              url_map_name + new_config_suffix,
                                              backend_service,
                                              service_host_name)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)

        delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
        delete_target_proxy(gcp, gcp.target_proxies[0])
        delete_url_map(gcp, gcp.url_maps[0])
        verify_attempts = int(_WAIT_FOR_URL_MAP_PATCH_SEC / _NUM_TEST_RPCS *
                              args.qps)
        for i in range(verify_attempts):
            wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                     _WAIT_FOR_STATS_SEC)
        # Delete the host rule for the original host name.
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)

    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            delete_global_forwarding_rules(gcp)
            delete_target_proxies(gcp)
            delete_url_maps(gcp)
            create_url_map(gcp, url_map_name, backend_service,
                           service_host_name)
            create_target_proxy(gcp, target_proxy_name)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          potential_service_ports)
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(gcp, url_map_name,
                                                  backend_service,
                                                  service_host_name)
                server_uri = service_host_name + ':' + str(gcp.service_port)
            else:
                server_uri = service_host_name
            return server_uri
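
# Worked example for verify_attempts above with the defaults: each
# get_client_stats() call covers _NUM_TEST_RPCS / qps = 1000 / 100 = 10
# seconds of traffic, so int(600 / 1000 * 100) = 60 attempts together span
# roughly the full 600-second URL-map propagation window.
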

def test_forwarding_rule_port_match(gcp, backend_service, instance_group):
    logger.info("Running test_forwarding_rule_port_match")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)
        delete_global_forwarding_rules(gcp)
        create_global_forwarding_rule(gcp, forwarding_rule_name, [
            x for x in parse_port_range(_DEFAULT_PORT_RANGE)
            if x != gcp.service_port
        ])
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            delete_global_forwarding_rules(gcp)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          potential_service_ports)
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(gcp, url_map_name,
                                                  backend_service,
                                                  service_host_name)
                server_uri = service_host_name + ':' + str(gcp.service_port)
            else:
                server_uri = service_host_name
            return server_uri


def test_forwarding_rule_default_port(gcp, backend_service, instance_group):
    logger.info("Running test_forwarding_rule_default_port")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        if gcp.service_port == _DEFAULT_SERVICE_PORT:
            wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                     _WAIT_FOR_STATS_SEC)
            delete_global_forwarding_rules(gcp)
            create_global_forwarding_rule(
                gcp, forwarding_rule_name,
                parse_port_range(_DEFAULT_PORT_RANGE))
            patch_url_map_host_rule_with_port(gcp, url_map_name,
                                              backend_service,
                                              service_host_name)
            wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                    _WAIT_FOR_STATS_SEC)
        # Expect success when there is no port in the client request service
        # URI and no port in the url-map.
        delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
        delete_target_proxy(gcp, gcp.target_proxies[0])
        delete_url_map(gcp, gcp.url_maps[0])
        create_url_map(gcp, url_map_name, backend_service, service_host_name)
        create_target_proxy(gcp, target_proxy_name, False)
        potential_ip_addresses = []
        max_attempts = 10
        for i in range(max_attempts):
            potential_ip_addresses.append('10.10.10.%d' %
                                          (random.randint(0, 255)))
        create_global_forwarding_rule(gcp, forwarding_rule_name, [80],
                                      potential_ip_addresses)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)

        # Expect failure when there is no port in the client request URI, but
        # a port is specified in the url-map.
        patch_url_map_host_rule_with_port(gcp, url_map_name, backend_service,
                                          service_host_name)
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            delete_global_forwarding_rules(gcp)
            delete_target_proxies(gcp)
            delete_url_maps(gcp)
            create_url_map(gcp, url_map_name, backend_service,
                           service_host_name)
            create_target_proxy(gcp, target_proxy_name)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          potential_service_ports)
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(gcp, url_map_name,
                                                  backend_service,
                                                  service_host_name)
                server_uri = service_host_name + ':' + str(gcp.service_port)
            else:
                server_uri = service_host_name
            return server_uri


def test_traffic_splitting(gcp, original_backend_service, instance_group,
                           alternate_backend_service,
                           same_zone_instance_group):
    # This test starts with all traffic going to original_backend_service.
    # Then it updates the URL map to set the default action to traffic
    # splitting between original and alternate. It waits for all backends in
    # both services to receive traffic, then verifies that the weights are as
    # expected.
    logger.info('Running test_traffic_splitting')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    passed = True
    try:
        # Patch the url map, changing the route action to traffic splitting
        # between original and alternate.
        logger.info('patching url map with traffic splitting')
        original_service_percentage, alternate_service_percentage = 20, 80
        patch_url_map_backend_service(
            gcp,
            services_with_weights={
                original_backend_service: original_service_percentage,
                alternate_backend_service: alternate_service_percentage,
            })
        # Split percentage between instances: [20,80] -> [10,10,40,40].
        expected_instance_percentage = [
            original_service_percentage * 1.0 /
            len(original_backend_instances)
        ] * len(original_backend_instances) + [
            alternate_service_percentage * 1.0 /
            len(alternate_backend_instances)
        ] * len(alternate_backend_instances)

        # Wait for traffic to go to both services.
        logger.info(
            'waiting for traffic to go to all backends (including alternate)')
        wait_until_all_rpcs_go_to_given_backends(
            original_backend_instances + alternate_backend_instances,
            _WAIT_FOR_STATS_SEC)

        # Verify that the weights between the two services are as expected.
        retry_count = 10
        # Each attempt takes about 10 seconds, so 10 retries amount to a
        # 100-second timeout.
        for i in range(retry_count):
            stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
            got_instance_count = [
                stats.rpcs_by_peer[i] for i in original_backend_instances
            ] + [stats.rpcs_by_peer[i] for i in alternate_backend_instances]
            total_count = sum(got_instance_count)
            got_instance_percentage = [
                x * 100.0 / total_count for x in got_instance_count
            ]

            try:
                compare_distributions(got_instance_percentage,
                                      expected_instance_percentage, 5)
            except Exception as e:
                logger.info('attempt %d', i)
                logger.info('got percentage: %s', got_instance_percentage)
                logger.info('expected percentage: %s',
                            expected_instance_percentage)
                logger.info(e)
                if i == retry_count - 1:
                    raise Exception(
                        'RPC distribution (%s) differs from expected (%s)' %
                        (got_instance_percentage,
                         expected_instance_percentage))
            else:
                logger.info("success")
                break
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
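
# Worked example of the split above with the default instance group size of
# 2: weights [20, 80] across 2 + 2 instances give expected per-instance
# percentages [10.0, 10.0, 40.0, 40.0], and compare_distributions() with
# threshold=5 tolerates a 5% relative deviation from each expected value.
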

def test_path_matching(gcp, original_backend_service, instance_group,
                       alternate_backend_service, same_zone_instance_group):
    # This test starts with all traffic (UnaryCall and EmptyCall) going to
    # original_backend_service.
    #
    # Then it updates the URL map to add routes that send UnaryCall and
    # EmptyCall to different backends. It waits for all backends in both
    # services to receive traffic, then verifies that traffic goes to the
    # expected backends.
    logger.info('Running test_path_matching')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    passed = True
    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [{
                    'priority': 0,
                    # FullPath EmptyCall -> alternate_backend_service.
                    'matchRules': [{
                        'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Prefix UnaryCall -> alternate_backend_service.
                    'matchRules': [{
                        'prefixMatch': '/grpc.testing.TestService/Unary'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances
                }),
            (
                # This test case is similar to the one above (but with route
                # services swapped). This test has two routes (full_path and
                # the default) to match EmptyCall, and both routes set
                # alternative_backend_service as the action. This forces the
                # client to handle duplicate Clusters in the RDS response.
                [
                    {
                        'priority': 0,
                        # Prefix UnaryCall -> original_backend_service.
                        'matchRules': [{
                            'prefixMatch': '/grpc.testing.TestService/Unary'
                        }],
                        'service': original_backend_service.url
                    },
                    {
                        'priority': 1,
                        # FullPath EmptyCall -> alternate_backend_service.
                        'matchRules': [{
                            'fullPathMatch':
                                '/grpc.testing.TestService/EmptyCall'
                        }],
                        'service': alternate_backend_service.url
                    }
                ],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Regex UnaryCall -> alternate_backend_service.
                    'matchRules': [{
                        'regexMatch':
                            r'^\/.*\/UnaryCall$'  # Unary methods of any service.
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # ignoreCase EmptyCall -> alternate_backend_service.
                    'matchRules': [{
                        # Case-insensitive matching.
                        'fullPathMatch': '/gRpC.tEsTinG.tEstseRvice/empTycaLl',
                        'ignoreCase': True,
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances
                }),
        ]

        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s', route_rules)
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 80
            # Each attempt takes about 5 seconds, so 80 retries amount to a
            # 400-second timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client '
                        'stats service does not support this test case')
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    raise Exception(
                        'timeout waiting for RPCs to the expected instances: '
                        '%s' % expected_instances)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
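
# For illustration, the regex route above matches the Unary method of any
# service: r'^\/.*\/UnaryCall$' matches '/grpc.testing.TestService/UnaryCall'
# but not '/grpc.testing.TestService/EmptyCall', and the ignoreCase rule
# matches '/grpc.testing.TestService/EmptyCall' regardless of letter case.
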
1584 'matchRules': [{ 1585 'prefixMatch': 1586 '/', 1587 'headerMatches': [{ 1588 'headerName': _TEST_METADATA_KEY, 1589 'exactMatch': _TEST_METADATA_VALUE_EMPTY 1590 }] 1591 }], 1592 'service': alternate_backend_service.url 1593 }], 1594 { 1595 "EmptyCall": alternate_backend_instances, 1596 "UnaryCall": original_backend_instances 1597 }), 1598 ( 1599 [{ 1600 'priority': 0, 1601 # Header PrefixMatch -> alternate_backend_service. 1602 # UnaryCall is sent with the metadata. 1603 'matchRules': [{ 1604 'prefixMatch': 1605 '/', 1606 'headerMatches': [{ 1607 'headerName': _TEST_METADATA_KEY, 1608 'prefixMatch': _TEST_METADATA_VALUE_UNARY[:2] 1609 }] 1610 }], 1611 'service': alternate_backend_service.url 1612 }], 1613 { 1614 "EmptyCall": original_backend_instances, 1615 "UnaryCall": alternate_backend_instances 1616 }), 1617 ( 1618 [{ 1619 'priority': 0, 1620 # Header SuffixMatch -> alternate_backend_service. 1621 # EmptyCall is sent with the metadata. 1622 'matchRules': [{ 1623 'prefixMatch': 1624 '/', 1625 'headerMatches': [{ 1626 'headerName': _TEST_METADATA_KEY, 1627 'suffixMatch': _TEST_METADATA_VALUE_EMPTY[-2:] 1628 }] 1629 }], 1630 'service': alternate_backend_service.url 1631 }], 1632 { 1633 "EmptyCall": alternate_backend_instances, 1634 "UnaryCall": original_backend_instances 1635 }), 1636 ( 1637 [{ 1638 'priority': 0, 1639 # Header 'xds_md_numeric' present -> alternate_backend_service. 1640 # UnaryCall is sent with the metadata, so will be sent to alternative. 1641 'matchRules': [{ 1642 'prefixMatch': 1643 '/', 1644 'headerMatches': [{ 1645 'headerName': _TEST_METADATA_NUMERIC_KEY, 1646 'presentMatch': True 1647 }] 1648 }], 1649 'service': alternate_backend_service.url 1650 }], 1651 { 1652 "EmptyCall": original_backend_instances, 1653 "UnaryCall": alternate_backend_instances 1654 }), 1655 ( 1656 [{ 1657 'priority': 0, 1658 # Header invert ExactMatch -> alternate_backend_service. 1659 # UnaryCall is sent with the metadata, so will be sent to 1660 # original. EmptyCall will be sent to alternative. 1661 'matchRules': [{ 1662 'prefixMatch': 1663 '/', 1664 'headerMatches': [{ 1665 'headerName': _TEST_METADATA_KEY, 1666 'exactMatch': _TEST_METADATA_VALUE_UNARY, 1667 'invertMatch': True 1668 }] 1669 }], 1670 'service': alternate_backend_service.url 1671 }], 1672 { 1673 "EmptyCall": alternate_backend_instances, 1674 "UnaryCall": original_backend_instances 1675 }), 1676 ( 1677 [{ 1678 'priority': 0, 1679 # Header 'xds_md_numeric' range [100,200] -> alternate_backend_service. 1680 # UnaryCall is sent with the metadata in range. 1681 'matchRules': [{ 1682 'prefixMatch': 1683 '/', 1684 'headerMatches': [{ 1685 'headerName': _TEST_METADATA_NUMERIC_KEY, 1686 'rangeMatch': { 1687 'rangeStart': '100', 1688 'rangeEnd': '200' 1689 } 1690 }] 1691 }], 1692 'service': alternate_backend_service.url 1693 }], 1694 { 1695 "EmptyCall": original_backend_instances, 1696 "UnaryCall": alternate_backend_instances 1697 }), 1698 ( 1699 [{ 1700 'priority': 0, 1701 # Header RegexMatch -> alternate_backend_service. 1702 # EmptyCall is sent with the metadata. 
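                    # The regex below is assembled from the first two and the
                    # last two characters of _TEST_METADATA_VALUE_EMPTY, so it
                    # matches the full metadata value sent with EmptyCall.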
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName':
                                _TEST_METADATA_KEY,
                            'regexMatch':
                                "^%s.*%s$" % (_TEST_METADATA_VALUE_EMPTY[:2],
                                              _TEST_METADATA_VALUE_EMPTY[-2:])
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
        ]

        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s -> alternative',
                        route_rules[0]['matchRules'])
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 80
            # Each attempt takes about 5 seconds, so 80 retries amount to a
            # 400-second timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None; the interop client '
                        'stats service does not support this test case')
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    raise Exception(
                        'timeout waiting for RPCs to the expected instances: %s'
                        % expected_instances)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])


def test_circuit_breaking(gcp, original_backend_service, instance_group,
                          same_zone_instance_group):
    '''
    The backend service circuit_breakers configuration cannot be unset, which
    makes it hard to restore the validate_for_proxyless flag on the target
    proxy/global forwarding rule afterwards, so this test uses dedicated
    backend services. The url_map and backend services undergo the following
    state changes:

    Before test:
       original_backend_service -> [instance_group]
       extra_backend_service -> []
       more_extra_backend_service -> []

       url_map -> [original_backend_service]

    In test:
       extra_backend_service (with circuit_breakers) -> [instance_group]
       more_extra_backend_service (with circuit_breakers) -> [same_zone_instance_group]

       url_map -> [extra_backend_service, more_extra_backend_service]

    After test:
       original_backend_service -> [instance_group]
       extra_backend_service (with circuit_breakers) -> []
       more_extra_backend_service (with circuit_breakers) -> []

       url_map -> [original_backend_service]
    '''
    logger.info('Running test_circuit_breaking')
    additional_backend_services = []
    passed = True
    try:
        # TODO(chengyuanzhang): Dedicated backend services are created for the
        # circuit breaking test. Once the issue with unsetting backend service
        # circuit breakers is resolved, or configuring backend service circuit
        # breakers is allowed by config validation, these dedicated backend
        # services can be eliminated.
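        # Background for the assertions below: circuit_breakers.maxRequests
        # caps the number of concurrent requests a backend service will admit,
        # so once the client holds that many RPCs open, the number of RPCs in
        # flight should plateau at the configured threshold.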
        extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-extra' + gcp_suffix
        more_extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-more-extra' + gcp_suffix
        extra_backend_service = add_backend_service(gcp,
                                                    extra_backend_service_name)
        additional_backend_services.append(extra_backend_service)
        more_extra_backend_service = add_backend_service(
            gcp, more_extra_backend_service_name)
        additional_backend_services.append(more_extra_backend_service)
        # The config validation for proxyless gRPC doesn't allow setting
        # circuit_breakers, so disable validate_for_proxyless for this test.
        # This can be removed once validation accepts circuit_breakers.
        logger.info('disabling validate_for_proxyless in target proxy')
        set_validate_for_proxyless(gcp, False)
        extra_backend_service_max_requests = 500
        more_extra_backend_service_max_requests = 1000
        patch_backend_service(gcp,
                              extra_backend_service, [instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      extra_backend_service_max_requests
                              })
        logger.info('Waiting for extra backends to become healthy')
        wait_for_healthy_backends(gcp, extra_backend_service, instance_group)
        patch_backend_service(gcp,
                              more_extra_backend_service,
                              [same_zone_instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      more_extra_backend_service_max_requests
                              })
        logger.info('Waiting for more extra backends to become healthy')
        wait_for_healthy_backends(gcp, more_extra_backend_service,
                                  same_zone_instance_group)
        extra_backend_instances = get_instance_names(gcp, instance_group)
        more_extra_backend_instances = get_instance_names(
            gcp, same_zone_instance_group)
        route_rules = [
            {
                'priority': 0,
                # UnaryCall -> extra_backend_service
                'matchRules': [{
                    'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
                }],
                'service': extra_backend_service.url
            },
            {
                'priority': 1,
                # EmptyCall -> more_extra_backend_service
                'matchRules': [{
                    'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                }],
                'service': more_extra_backend_service.url
            },
        ]

        # Make the client send UNARY_CALL and EMPTY_CALL.
        configure_client([
            messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
        ])
        logger.info('Patching url map with %s', route_rules)
        patch_url_map_backend_service(gcp,
                                      extra_backend_service,
                                      route_rules=route_rules)
        logger.info('Waiting for traffic to go to all backends')
        wait_until_all_rpcs_go_to_given_backends(
            extra_backend_instances + more_extra_backend_instances,
            _WAIT_FOR_STATS_SEC)

        # Make all calls keep-open.
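        # The 'rpc-behavior: keep-open' metadata asks the test server to hold
        # each RPC open indefinitely, so in-flight RPCs accumulate until the
        # circuit breaker threshold is reached.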
        configure_client([
            messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
        ], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
             'rpc-behavior', 'keep-open'),
            (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
             'rpc-behavior', 'keep-open')])
        wait_until_rpcs_in_flight(
            'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
                           int(extra_backend_service_max_requests / args.qps)),
            extra_backend_service_max_requests, 1)
        logger.info('UNARY_CALL reached stable state (%d)',
                    extra_backend_service_max_requests)
        wait_until_rpcs_in_flight(
            'EMPTY_CALL',
            (_WAIT_FOR_BACKEND_SEC +
             int(more_extra_backend_service_max_requests / args.qps)),
            more_extra_backend_service_max_requests, 1)
        logger.info('EMPTY_CALL reached stable state (%d)',
                    more_extra_backend_service_max_requests)

        # Increase the circuit breakers maxRequests threshold.
        extra_backend_service_max_requests = 800
        patch_backend_service(gcp,
                              extra_backend_service, [instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      extra_backend_service_max_requests
                              })
        wait_until_rpcs_in_flight(
            'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
                           int(extra_backend_service_max_requests / args.qps)),
            extra_backend_service_max_requests, 1)
        logger.info('UNARY_CALL reached stable state after increase (%d)',
                    extra_backend_service_max_requests)
        logger.info('success')
        # Avoid leaving RPCs outstanding after the backend services are
        # restored (some test clients create dedicated threads for sending
        # RPCs).
        configure_client(
            [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL])
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, original_backend_service,
                                  [instance_group])
            for backend_service in additional_backend_services:
                delete_backend_service(gcp, backend_service)
            set_validate_for_proxyless(gcp, True)


def test_timeout(gcp, original_backend_service, instance_group):
    logger.info('Running test_timeout')

    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # UnaryCall -> maxStreamDuration:3s
    route_rules = [{
        'priority': 0,
        'matchRules': [{
            'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
        }],
        'service': original_backend_service.url,
        'routeAction': {
            'maxStreamDuration': {
                'seconds': 3,
            },
        },
    }]
    patch_url_map_backend_service(gcp,
                                  original_backend_service,
                                  route_rules=route_rules)
    # A list of tuples (testcase_name, {client_config}, {expected_results}).
    test_cases = [
        (
            'timeout_exceeded (UNARY_CALL), timeout_different_route (EMPTY_CALL)',
            # UnaryCall and EmptyCall both sleep-4.
            # UnaryCall times out; EmptyCall succeeds.
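            # Only UnaryCall matches the maxStreamDuration:3s route above;
            # EmptyCall falls through to the default route, which has no
            # stream duration limit, so its sleep-4 still completes.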
1952 { 1953 'rpc_types': [ 1954 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1955 messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, 1956 ], 1957 'metadata': [ 1958 (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1959 'rpc-behavior', 'sleep-4'), 1960 (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, 1961 'rpc-behavior', 'sleep-4'), 1962 ], 1963 }, 1964 { 1965 'UNARY_CALL': 4, # DEADLINE_EXCEEDED 1966 'EMPTY_CALL': 0, 1967 }, 1968 ), 1969 ( 1970 'app_timeout_exceeded', 1971 # UnaryCall only with sleep-2; timeout=1s; calls timeout. 1972 { 1973 'rpc_types': [ 1974 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1975 ], 1976 'metadata': [ 1977 (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1978 'rpc-behavior', 'sleep-2'), 1979 ], 1980 'timeout_sec': 1, 1981 }, 1982 { 1983 'UNARY_CALL': 4, # DEADLINE_EXCEEDED 1984 }, 1985 ), 1986 ( 1987 'timeout_not_exceeded', 1988 # UnaryCall only with no sleep; calls succeed. 1989 { 1990 'rpc_types': [ 1991 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1992 ], 1993 }, 1994 { 1995 'UNARY_CALL': 0, 1996 }, 1997 ) 1998 ] 1999 2000 passed = True 2001 try: 2002 first_case = True 2003 for (testcase_name, client_config, expected_results) in test_cases: 2004 logger.info('starting case %s', testcase_name) 2005 configure_client(**client_config) 2006 # wait a second to help ensure the client stops sending RPCs with 2007 # the old config. We will make multiple attempts if it is failing, 2008 # but this improves confidence that the test is valid if the 2009 # previous client_config would lead to the same results. 2010 time.sleep(1) 2011 # Each attempt takes 10 seconds; 20 attempts is equivalent to 200 2012 # second timeout. 2013 attempt_count = 20 2014 if first_case: 2015 attempt_count = 120 2016 first_case = False 2017 before_stats = get_client_accumulated_stats() 2018 if not before_stats.stats_per_method: 2019 raise ValueError( 2020 'stats.stats_per_method is None, the interop client stats service does not support this test case' 2021 ) 2022 for i in range(attempt_count): 2023 logger.info('%s: attempt %d', testcase_name, i) 2024 2025 test_runtime_secs = 10 2026 time.sleep(test_runtime_secs) 2027 after_stats = get_client_accumulated_stats() 2028 2029 success = True 2030 for rpc, status in list(expected_results.items()): 2031 qty = (after_stats.stats_per_method[rpc].result[status] - 2032 before_stats.stats_per_method[rpc].result[status]) 2033 want = test_runtime_secs * args.qps 2034 # Allow 10% deviation from expectation to reduce flakiness 2035 if qty < (want * .9) or qty > (want * 1.1): 2036 logger.info('%s: failed due to %s[%s]: got %d want ~%d', 2037 testcase_name, rpc, status, qty, want) 2038 success = False 2039 if success: 2040 logger.info('success') 2041 break 2042 logger.info('%s attempt %d failed', testcase_name, i) 2043 before_stats = after_stats 2044 else: 2045 raise Exception( 2046 '%s: timeout waiting for expected results: %s; got %s' % 2047 (testcase_name, expected_results, 2048 after_stats.stats_per_method)) 2049 except Exception: 2050 passed = False 2051 raise 2052 finally: 2053 if passed or not args.halt_after_fail: 2054 patch_url_map_backend_service(gcp, original_backend_service) 2055 2056 2057def test_fault_injection(gcp, original_backend_service, instance_group): 2058 logger.info('Running test_fault_injection') 2059 2060 logger.info('waiting for original backends to become healthy') 2061 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 2062 2063 testcase_header = 
'fi_testcase' 2064 2065 def _route(pri, name, fi_policy): 2066 return { 2067 'priority': pri, 2068 'matchRules': [{ 2069 'prefixMatch': 2070 '/', 2071 'headerMatches': [{ 2072 'headerName': testcase_header, 2073 'exactMatch': name, 2074 }], 2075 }], 2076 'service': original_backend_service.url, 2077 'routeAction': { 2078 'faultInjectionPolicy': fi_policy 2079 }, 2080 } 2081 2082 def _abort(pct): 2083 return { 2084 'abort': { 2085 'httpStatus': 401, 2086 'percentage': pct, 2087 } 2088 } 2089 2090 def _delay(pct): 2091 return { 2092 'delay': { 2093 'fixedDelay': { 2094 'seconds': '20' 2095 }, 2096 'percentage': pct, 2097 } 2098 } 2099 2100 zero_route = _abort(0) 2101 zero_route.update(_delay(0)) 2102 route_rules = [ 2103 _route(0, 'zero_percent_fault_injection', zero_route), 2104 _route(1, 'always_delay', _delay(100)), 2105 _route(2, 'always_abort', _abort(100)), 2106 _route(3, 'delay_half', _delay(50)), 2107 _route(4, 'abort_half', _abort(50)), 2108 { 2109 'priority': 5, 2110 'matchRules': [{ 2111 'prefixMatch': '/' 2112 }], 2113 'service': original_backend_service.url, 2114 }, 2115 ] 2116 set_validate_for_proxyless(gcp, False) 2117 patch_url_map_backend_service(gcp, 2118 original_backend_service, 2119 route_rules=route_rules) 2120 # A list of tuples (testcase_name, {client_config}, {code: percent}). Each 2121 # test case will set the testcase_header with the testcase_name for routing 2122 # to the appropriate config for the case, defined above. 2123 test_cases = [ 2124 ( 2125 'always_delay', 2126 { 2127 'timeout_sec': 2 2128 }, 2129 { 2130 4: 1 2131 }, # DEADLINE_EXCEEDED 2132 ), 2133 ( 2134 'always_abort', 2135 {}, 2136 { 2137 16: 1 2138 }, # UNAUTHENTICATED 2139 ), 2140 ( 2141 'delay_half', 2142 { 2143 'timeout_sec': 2 2144 }, 2145 { 2146 4: .5, 2147 0: .5 2148 }, # DEADLINE_EXCEEDED / OK: 50% / 50% 2149 ), 2150 ( 2151 'abort_half', 2152 {}, 2153 { 2154 16: .5, 2155 0: .5 2156 }, # UNAUTHENTICATED / OK: 50% / 50% 2157 ), 2158 ( 2159 'zero_percent_fault_injection', 2160 {}, 2161 { 2162 0: 1 2163 }, # OK 2164 ), 2165 ( 2166 'non_matching_fault_injection', # Not in route_rules, above. 2167 {}, 2168 { 2169 0: 1 2170 }, # OK 2171 ), 2172 ] 2173 2174 passed = True 2175 try: 2176 first_case = True 2177 for (testcase_name, client_config, expected_results) in test_cases: 2178 logger.info('starting case %s', testcase_name) 2179 2180 client_config['metadata'] = [ 2181 (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 2182 testcase_header, testcase_name) 2183 ] 2184 client_config['rpc_types'] = [ 2185 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 2186 ] 2187 configure_client(**client_config) 2188 # wait a second to help ensure the client stops sending RPCs with 2189 # the old config. We will make multiple attempts if it is failing, 2190 # but this improves confidence that the test is valid if the 2191 # previous client_config would lead to the same results. 2192 time.sleep(1) 2193 # Each attempt takes 10 seconds 2194 if first_case: 2195 # Give the first test case 600s for xDS config propagation. 
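                # (60 attempts at ~10 seconds each gives the 600-second
                # budget.)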
2196 attempt_count = 60 2197 first_case = False 2198 else: 2199 # The accumulated stats might include previous sub-test, running 2200 # the test multiple times to deflake 2201 attempt_count = 10 2202 before_stats = get_client_accumulated_stats() 2203 if not before_stats.stats_per_method: 2204 raise ValueError( 2205 'stats.stats_per_method is None, the interop client stats service does not support this test case' 2206 ) 2207 for i in range(attempt_count): 2208 logger.info('%s: attempt %d', testcase_name, i) 2209 2210 test_runtime_secs = 10 2211 time.sleep(test_runtime_secs) 2212 after_stats = get_client_accumulated_stats() 2213 2214 success = True 2215 for status, pct in list(expected_results.items()): 2216 rpc = 'UNARY_CALL' 2217 qty = (after_stats.stats_per_method[rpc].result[status] - 2218 before_stats.stats_per_method[rpc].result[status]) 2219 want = pct * args.qps * test_runtime_secs 2220 # Allow 10% deviation from expectation to reduce flakiness 2221 VARIANCE_ALLOWED = 0.1 2222 if abs(qty - want) > want * VARIANCE_ALLOWED: 2223 logger.info('%s: failed due to %s[%s]: got %d want ~%d', 2224 testcase_name, rpc, status, qty, want) 2225 success = False 2226 if success: 2227 logger.info('success') 2228 break 2229 logger.info('%s attempt %d failed', testcase_name, i) 2230 before_stats = after_stats 2231 else: 2232 raise Exception( 2233 '%s: timeout waiting for expected results: %s; got %s' % 2234 (testcase_name, expected_results, 2235 after_stats.stats_per_method)) 2236 except Exception: 2237 passed = False 2238 raise 2239 finally: 2240 if passed or not args.halt_after_fail: 2241 patch_url_map_backend_service(gcp, original_backend_service) 2242 set_validate_for_proxyless(gcp, True) 2243 2244 2245def test_csds(gcp, original_backend_service, instance_group, server_uri): 2246 test_csds_timeout_s = datetime.timedelta(minutes=5).total_seconds() 2247 sleep_interval_between_attempts_s = datetime.timedelta( 2248 seconds=2).total_seconds() 2249 logger.info('Running test_csds') 2250 2251 logger.info('waiting for original backends to become healthy') 2252 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 2253 2254 # Test case timeout: 5 minutes 2255 deadline = time.time() + test_csds_timeout_s 2256 cnt = 0 2257 while time.time() <= deadline: 2258 client_config = get_client_xds_config_dump() 2259 logger.info('test_csds attempt %d: received xDS config %s', cnt, 2260 json.dumps(client_config, indent=2)) 2261 if client_config is not None: 2262 # Got the xDS config dump, now validate it 2263 ok = True 2264 try: 2265 if client_config['node']['locality']['zone'] != args.zone: 2266 logger.info('Invalid zone %s != %s', 2267 client_config['node']['locality']['zone'], 2268 args.zone) 2269 ok = False 2270 seen = set() 2271 for xds_config in client_config.get('xds_config', []): 2272 if 'listener_config' in xds_config: 2273 listener_name = xds_config['listener_config'][ 2274 'dynamic_listeners'][0]['active_state']['listener'][ 2275 'name'] 2276 if listener_name != server_uri: 2277 logger.info('Invalid Listener name %s != %s', 2278 listener_name, server_uri) 2279 ok = False 2280 else: 2281 seen.add('lds') 2282 elif 'route_config' in xds_config: 2283 num_vh = len( 2284 xds_config['route_config']['dynamic_route_configs'] 2285 [0]['route_config']['virtual_hosts']) 2286 if num_vh <= 0: 2287 logger.info('Invalid number of VirtualHosts %s', 2288 num_vh) 2289 ok = False 2290 else: 2291 seen.add('rds') 2292 elif 'cluster_config' in xds_config: 2293 cluster_type = xds_config['cluster_config'][ 2294 
'dynamic_active_clusters'][0]['cluster']['type']
                        if cluster_type != 'EDS':
                            logger.info('Invalid cluster type %s != EDS',
                                        cluster_type)
                            ok = False
                        else:
                            seen.add('cds')
                    elif 'endpoint_config' in xds_config:
                        sub_zone = xds_config["endpoint_config"][
                            "dynamic_endpoint_configs"][0]["endpoint_config"][
                                "endpoints"][0]["locality"]["sub_zone"]
                        if args.zone not in sub_zone:
                            logger.info('Invalid endpoint sub_zone %s',
                                        sub_zone)
                            ok = False
                        else:
                            seen.add('eds')
                for generic_xds_config in client_config.get(
                        'generic_xds_configs', []):
                    if re.search(r'\.Listener$',
                                 generic_xds_config['type_url']):
                        seen.add('lds')
                        listener = generic_xds_config["xds_config"]
                        if listener['name'] != server_uri:
                            logger.info('Invalid Listener name %s != %s',
                                        listener['name'], server_uri)
                            ok = False
                    elif re.search(r'\.RouteConfiguration$',
                                   generic_xds_config['type_url']):
                        seen.add('rds')
                        route_config = generic_xds_config["xds_config"]
                        if not len(route_config['virtual_hosts']):
                            logger.info('Invalid number of VirtualHosts %s',
                                        len(route_config['virtual_hosts']))
                            ok = False
                    elif re.search(r'\.Cluster$',
                                   generic_xds_config['type_url']):
                        seen.add('cds')
                        cluster = generic_xds_config["xds_config"]
                        if cluster['type'] != 'EDS':
                            logger.info('Invalid cluster type %s != EDS',
                                        cluster['type'])
                            ok = False
                    elif re.search(r'\.ClusterLoadAssignment$',
                                   generic_xds_config['type_url']):
                        seen.add('eds')
                        endpoint = generic_xds_config["xds_config"]
                        if args.zone not in endpoint["endpoints"][0][
                                "locality"]["sub_zone"]:
                            logger.info(
                                'Invalid endpoint sub_zone %s',
                                endpoint["endpoints"][0]["locality"]
                                ["sub_zone"])
                            ok = False
                want = {'lds', 'rds', 'cds', 'eds'}
                if seen != want:
                    logger.info('Incomplete xDS config dump, seen=%s', seen)
                    ok = False
            except:
                logger.exception('Error in xDS config dump:')
                ok = False
            finally:
                if ok:
                    # Successfully fetched the xDS config, and it looks good.
                    logger.info('success')
                    return
        logger.info('test_csds attempt %d failed', cnt)
        # Give the client some time to fetch xDS resources.
        time.sleep(sleep_interval_between_attempts_s)
        cnt += 1

    raise RuntimeError('failed to receive a valid xDS config in %s seconds' %
                       test_csds_timeout_s)


def set_validate_for_proxyless(gcp, validate_for_proxyless):
    if not gcp.alpha_compute:
        logger.debug(
            'Not setting validateForProxyless because alpha is not enabled')
        return
    if len(gcp.global_forwarding_rules) != 1 or len(
            gcp.target_proxies) != 1 or len(gcp.url_maps) != 1:
        logger.debug(
            "Global forwarding rule, target proxy or url map not found.")
        return
    # This function deletes the global_forwarding_rule and target_proxy, then
    # recreates the target_proxy with the requested validateForProxyless
    # value. This is necessary because patching target_grpc_proxy isn't
    # supported.
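    # Ordering matters below: the forwarding rule must be deleted before the
    # target proxy it references, and recreated only after the new proxy
    # exists.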
    delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
    delete_target_proxy(gcp, gcp.target_proxies[0])
    create_target_proxy(gcp, target_proxy_name, validate_for_proxyless)
    create_global_forwarding_rule(gcp, forwarding_rule_name, [gcp.service_port])


def get_serving_status(instance, service_port):
    with grpc.insecure_channel('%s:%d' % (instance, service_port)) as channel:
        health_stub = health_pb2_grpc.HealthStub(channel)
        return health_stub.Check(health_pb2.HealthCheckRequest())


def set_serving_status(instances, service_port, serving):
    logger.info('setting %s serving status to %s', instances, serving)
    for instance in instances:
        with grpc.insecure_channel('%s:%d' %
                                   (instance, service_port)) as channel:
            logger.info('setting %s serving status to %s', instance, serving)
            stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel)
            retry_count = 5
            for i in range(retry_count):
                if serving:
                    stub.SetServing(empty_pb2.Empty())
                else:
                    stub.SetNotServing(empty_pb2.Empty())
                serving_status = get_serving_status(instance, service_port)
                logger.info('got instance service status %s', serving_status)
                want_status = health_pb2.HealthCheckResponse.SERVING if serving else health_pb2.HealthCheckResponse.NOT_SERVING
                if serving_status.status == want_status:
                    break
                if i == retry_count - 1:
                    raise Exception(
                        'failed to set instance service status after %d retries'
                        % retry_count)


def is_primary_instance_group(gcp, instance_group):
    # The client may connect to a TD instance in a different region than its
    # own, in which case primary/secondary assignments may not be based on
    # the client's actual locality.
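    # Heuristic used instead: the group is primary if every peer currently
    # receiving RPCs belongs to it.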
2420 instance_names = get_instance_names(gcp, instance_group) 2421 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 2422 return all( 2423 peer in instance_names for peer in list(stats.rpcs_by_peer.keys())) 2424 2425 2426def get_startup_script(path_to_server_binary, service_port): 2427 if path_to_server_binary: 2428 return 'nohup %s --port=%d 1>/dev/null &' % (path_to_server_binary, 2429 service_port) 2430 else: 2431 return """#!/bin/bash 2432sudo apt update 2433sudo apt install -y git default-jdk 2434mkdir java_server 2435pushd java_server 2436git clone https://github.com/grpc/grpc-java.git 2437pushd grpc-java 2438pushd interop-testing 2439../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true 2440 2441nohup build/install/grpc-interop-testing/bin/xds-test-server \ 2442 --port=%d 1>/dev/null &""" % service_port 2443 2444 2445def create_instance_template(gcp, name, network, source_image, machine_type, 2446 startup_script): 2447 config = { 2448 'name': name, 2449 'properties': { 2450 'tags': { 2451 'items': ['allow-health-checks'] 2452 }, 2453 'machineType': machine_type, 2454 'serviceAccounts': [{ 2455 'email': 'default', 2456 'scopes': ['https://www.googleapis.com/auth/cloud-platform',] 2457 }], 2458 'networkInterfaces': [{ 2459 'accessConfigs': [{ 2460 'type': 'ONE_TO_ONE_NAT' 2461 }], 2462 'network': network 2463 }], 2464 'disks': [{ 2465 'boot': True, 2466 'initializeParams': { 2467 'sourceImage': source_image 2468 }, 2469 'autoDelete': True 2470 }], 2471 'metadata': { 2472 'items': [{ 2473 'key': 'startup-script', 2474 'value': startup_script 2475 }] 2476 } 2477 } 2478 } 2479 2480 logger.debug('Sending GCP request with body=%s', config) 2481 result = gcp.compute.instanceTemplates().insert( 2482 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 2483 wait_for_global_operation(gcp, result['name']) 2484 gcp.instance_template = GcpResource(config['name'], result['targetLink']) 2485 2486 2487def add_instance_group(gcp, zone, name, size): 2488 config = { 2489 'name': name, 2490 'instanceTemplate': gcp.instance_template.url, 2491 'targetSize': size, 2492 'namedPorts': [{ 2493 'name': 'grpc', 2494 'port': gcp.service_port 2495 }] 2496 } 2497 2498 logger.debug('Sending GCP request with body=%s', config) 2499 result = gcp.compute.instanceGroupManagers().insert( 2500 project=gcp.project, zone=zone, 2501 body=config).execute(num_retries=_GCP_API_RETRIES) 2502 wait_for_zone_operation(gcp, zone, result['name']) 2503 result = gcp.compute.instanceGroupManagers().get( 2504 project=gcp.project, zone=zone, 2505 instanceGroupManager=config['name']).execute( 2506 num_retries=_GCP_API_RETRIES) 2507 instance_group = InstanceGroup(config['name'], result['instanceGroup'], 2508 zone) 2509 gcp.instance_groups.append(instance_group) 2510 wait_for_instance_group_to_reach_expected_size(gcp, instance_group, size, 2511 _WAIT_FOR_OPERATION_SEC) 2512 return instance_group 2513 2514 2515def create_health_check(gcp, name): 2516 if gcp.alpha_compute: 2517 config = { 2518 'name': name, 2519 'type': 'GRPC', 2520 'grpcHealthCheck': { 2521 'portSpecification': 'USE_SERVING_PORT' 2522 } 2523 } 2524 compute_to_use = gcp.alpha_compute 2525 else: 2526 config = { 2527 'name': name, 2528 'type': 'TCP', 2529 'tcpHealthCheck': { 2530 'portName': 'grpc' 2531 } 2532 } 2533 compute_to_use = gcp.compute 2534 logger.debug('Sending GCP request with body=%s', config) 2535 result = compute_to_use.healthChecks().insert( 2536 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 
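    # Compute API mutations return a long-running operation; the wait below
    # polls it to completion before the health check resource is recorded.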
2537 wait_for_global_operation(gcp, result['name']) 2538 gcp.health_check = GcpResource(config['name'], result['targetLink']) 2539 2540 2541def create_health_check_firewall_rule(gcp, name): 2542 config = { 2543 'name': name, 2544 'direction': 'INGRESS', 2545 'allowed': [{ 2546 'IPProtocol': 'tcp' 2547 }], 2548 'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'], 2549 'targetTags': ['allow-health-checks'], 2550 } 2551 logger.debug('Sending GCP request with body=%s', config) 2552 result = gcp.compute.firewalls().insert( 2553 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 2554 wait_for_global_operation(gcp, result['name']) 2555 gcp.health_check_firewall_rule = GcpResource(config['name'], 2556 result['targetLink']) 2557 2558 2559def add_backend_service(gcp, name): 2560 if gcp.alpha_compute: 2561 protocol = 'GRPC' 2562 compute_to_use = gcp.alpha_compute 2563 else: 2564 protocol = 'HTTP2' 2565 compute_to_use = gcp.compute 2566 config = { 2567 'name': name, 2568 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', 2569 'healthChecks': [gcp.health_check.url], 2570 'portName': 'grpc', 2571 'protocol': protocol 2572 } 2573 logger.debug('Sending GCP request with body=%s', config) 2574 result = compute_to_use.backendServices().insert( 2575 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 2576 wait_for_global_operation(gcp, result['name']) 2577 backend_service = GcpResource(config['name'], result['targetLink']) 2578 gcp.backend_services.append(backend_service) 2579 return backend_service 2580 2581 2582def create_url_map(gcp, name, backend_service, host_name): 2583 config = { 2584 'name': name, 2585 'defaultService': backend_service.url, 2586 'pathMatchers': [{ 2587 'name': _PATH_MATCHER_NAME, 2588 'defaultService': backend_service.url, 2589 }], 2590 'hostRules': [{ 2591 'hosts': [host_name], 2592 'pathMatcher': _PATH_MATCHER_NAME 2593 }] 2594 } 2595 logger.debug('Sending GCP request with body=%s', config) 2596 result = gcp.compute.urlMaps().insert( 2597 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 2598 wait_for_global_operation(gcp, result['name']) 2599 url_map = GcpResource(config['name'], result['targetLink']) 2600 gcp.url_maps.append(url_map) 2601 return url_map 2602 2603 2604def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name): 2605 config = { 2606 'hostRules': [{ 2607 'hosts': ['%s:%d' % (host_name, gcp.service_port)], 2608 'pathMatcher': _PATH_MATCHER_NAME 2609 }] 2610 } 2611 logger.debug('Sending GCP request with body=%s', config) 2612 result = gcp.compute.urlMaps().patch( 2613 project=gcp.project, urlMap=name, 2614 body=config).execute(num_retries=_GCP_API_RETRIES) 2615 wait_for_global_operation(gcp, result['name']) 2616 2617 2618def create_target_proxy(gcp, name, validate_for_proxyless=True, url_map=None): 2619 if url_map: 2620 arg_url_map_url = url_map.url 2621 else: 2622 arg_url_map_url = gcp.url_maps[0].url 2623 if gcp.alpha_compute: 2624 config = { 2625 'name': name, 2626 'url_map': arg_url_map_url, 2627 'validate_for_proxyless': validate_for_proxyless 2628 } 2629 logger.debug('Sending GCP request with body=%s', config) 2630 result = gcp.alpha_compute.targetGrpcProxies().insert( 2631 project=gcp.project, 2632 body=config).execute(num_retries=_GCP_API_RETRIES) 2633 else: 2634 config = { 2635 'name': name, 2636 'url_map': arg_url_map_url, 2637 } 2638 logger.debug('Sending GCP request with body=%s', config) 2639 result = gcp.compute.targetHttpProxies().insert( 2640 project=gcp.project, 2641 
body=config).execute(num_retries=_GCP_API_RETRIES) 2642 wait_for_global_operation(gcp, result['name']) 2643 target_proxy = GcpResource(config['name'], result['targetLink']) 2644 gcp.target_proxies.append(target_proxy) 2645 return target_proxy 2646 2647 2648def create_global_forwarding_rule(gcp, 2649 name, 2650 potential_ports, 2651 potential_ip_addresses=['0.0.0.0'], 2652 target_proxy=None): 2653 if target_proxy: 2654 arg_target_proxy_url = target_proxy.url 2655 else: 2656 arg_target_proxy_url = gcp.target_proxies[0].url 2657 if gcp.alpha_compute: 2658 compute_to_use = gcp.alpha_compute 2659 else: 2660 compute_to_use = gcp.compute 2661 for port in potential_ports: 2662 for ip_address in potential_ip_addresses: 2663 try: 2664 config = { 2665 'name': name, 2666 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', 2667 'portRange': str(port), 2668 'IPAddress': ip_address, 2669 'network': args.network, 2670 'target': arg_target_proxy_url, 2671 } 2672 logger.debug('Sending GCP request with body=%s', config) 2673 result = compute_to_use.globalForwardingRules().insert( 2674 project=gcp.project, 2675 body=config).execute(num_retries=_GCP_API_RETRIES) 2676 wait_for_global_operation(gcp, result['name']) 2677 global_forwarding_rule = GcpResource(config['name'], 2678 result['targetLink']) 2679 gcp.global_forwarding_rules.append(global_forwarding_rule) 2680 gcp.service_port = port 2681 return 2682 except googleapiclient.errors.HttpError as http_error: 2683 logger.warning( 2684 'Got error %s when attempting to create forwarding rule to ' 2685 '%s:%d. Retrying with another port.' % 2686 (http_error, ip_address, port)) 2687 2688 2689def get_health_check(gcp, health_check_name): 2690 try: 2691 result = gcp.compute.healthChecks().get( 2692 project=gcp.project, healthCheck=health_check_name).execute() 2693 gcp.health_check = GcpResource(health_check_name, result['selfLink']) 2694 except Exception as e: 2695 gcp.errors.append(e) 2696 gcp.health_check = GcpResource(health_check_name, None) 2697 2698 2699def get_health_check_firewall_rule(gcp, firewall_name): 2700 try: 2701 result = gcp.compute.firewalls().get(project=gcp.project, 2702 firewall=firewall_name).execute() 2703 gcp.health_check_firewall_rule = GcpResource(firewall_name, 2704 result['selfLink']) 2705 except Exception as e: 2706 gcp.errors.append(e) 2707 gcp.health_check_firewall_rule = GcpResource(firewall_name, None) 2708 2709 2710def get_backend_service(gcp, backend_service_name, record_error=True): 2711 try: 2712 result = gcp.compute.backendServices().get( 2713 project=gcp.project, backendService=backend_service_name).execute() 2714 backend_service = GcpResource(backend_service_name, result['selfLink']) 2715 except Exception as e: 2716 if record_error: 2717 gcp.errors.append(e) 2718 backend_service = GcpResource(backend_service_name, None) 2719 gcp.backend_services.append(backend_service) 2720 return backend_service 2721 2722 2723def get_url_map(gcp, url_map_name, record_error=True): 2724 try: 2725 result = gcp.compute.urlMaps().get(project=gcp.project, 2726 urlMap=url_map_name).execute() 2727 url_map = GcpResource(url_map_name, result['selfLink']) 2728 gcp.url_maps.append(url_map) 2729 except Exception as e: 2730 if record_error: 2731 gcp.errors.append(e) 2732 2733 2734def get_target_proxy(gcp, target_proxy_name, record_error=True): 2735 try: 2736 if gcp.alpha_compute: 2737 result = gcp.alpha_compute.targetGrpcProxies().get( 2738 project=gcp.project, 2739 targetGrpcProxy=target_proxy_name).execute() 2740 else: 2741 result = 
gcp.compute.targetHttpProxies().get( 2742 project=gcp.project, 2743 targetHttpProxy=target_proxy_name).execute() 2744 target_proxy = GcpResource(target_proxy_name, result['selfLink']) 2745 gcp.target_proxies.append(target_proxy) 2746 except Exception as e: 2747 if record_error: 2748 gcp.errors.append(e) 2749 2750 2751def get_global_forwarding_rule(gcp, forwarding_rule_name, record_error=True): 2752 try: 2753 result = gcp.compute.globalForwardingRules().get( 2754 project=gcp.project, forwardingRule=forwarding_rule_name).execute() 2755 global_forwarding_rule = GcpResource(forwarding_rule_name, 2756 result['selfLink']) 2757 gcp.global_forwarding_rules.append(global_forwarding_rule) 2758 except Exception as e: 2759 if record_error: 2760 gcp.errors.append(e) 2761 2762 2763def get_instance_template(gcp, template_name): 2764 try: 2765 result = gcp.compute.instanceTemplates().get( 2766 project=gcp.project, instanceTemplate=template_name).execute() 2767 gcp.instance_template = GcpResource(template_name, result['selfLink']) 2768 except Exception as e: 2769 gcp.errors.append(e) 2770 gcp.instance_template = GcpResource(template_name, None) 2771 2772 2773def get_instance_group(gcp, zone, instance_group_name): 2774 try: 2775 result = gcp.compute.instanceGroups().get( 2776 project=gcp.project, zone=zone, 2777 instanceGroup=instance_group_name).execute() 2778 gcp.service_port = result['namedPorts'][0]['port'] 2779 instance_group = InstanceGroup(instance_group_name, result['selfLink'], 2780 zone) 2781 except Exception as e: 2782 gcp.errors.append(e) 2783 instance_group = InstanceGroup(instance_group_name, None, zone) 2784 gcp.instance_groups.append(instance_group) 2785 return instance_group 2786 2787 2788def delete_global_forwarding_rule(gcp, forwarding_rule_to_delete=None): 2789 if not forwarding_rule_to_delete: 2790 return 2791 try: 2792 logger.debug('Deleting forwarding rule %s', 2793 forwarding_rule_to_delete.name) 2794 result = gcp.compute.globalForwardingRules().delete( 2795 project=gcp.project, 2796 forwardingRule=forwarding_rule_to_delete.name).execute( 2797 num_retries=_GCP_API_RETRIES) 2798 wait_for_global_operation(gcp, result['name']) 2799 if forwarding_rule_to_delete in gcp.global_forwarding_rules: 2800 gcp.global_forwarding_rules.remove(forwarding_rule_to_delete) 2801 else: 2802 logger.debug( 2803 'Forwarding rule %s does not exist in gcp.global_forwarding_rules', 2804 forwarding_rule_to_delete.name) 2805 except googleapiclient.errors.HttpError as http_error: 2806 logger.info('Delete failed: %s', http_error) 2807 2808 2809def delete_global_forwarding_rules(gcp): 2810 forwarding_rules_to_delete = gcp.global_forwarding_rules.copy() 2811 for forwarding_rule in forwarding_rules_to_delete: 2812 delete_global_forwarding_rule(gcp, forwarding_rule) 2813 2814 2815def delete_target_proxy(gcp, proxy_to_delete=None): 2816 if not proxy_to_delete: 2817 return 2818 try: 2819 if gcp.alpha_compute: 2820 logger.debug('Deleting grpc proxy %s', proxy_to_delete.name) 2821 result = gcp.alpha_compute.targetGrpcProxies().delete( 2822 project=gcp.project, 2823 targetGrpcProxy=proxy_to_delete.name).execute( 2824 num_retries=_GCP_API_RETRIES) 2825 else: 2826 logger.debug('Deleting http proxy %s', proxy_to_delete.name) 2827 result = gcp.compute.targetHttpProxies().delete( 2828 project=gcp.project, 2829 targetHttpProxy=proxy_to_delete.name).execute( 2830 num_retries=_GCP_API_RETRIES) 2831 wait_for_global_operation(gcp, result['name']) 2832 if proxy_to_delete in gcp.target_proxies: 2833 
gcp.target_proxies.remove(proxy_to_delete) 2834 else: 2835 logger.debug('Gcp proxy %s does not exist in gcp.target_proxies', 2836 proxy_to_delete.name) 2837 except googleapiclient.errors.HttpError as http_error: 2838 logger.info('Delete failed: %s', http_error) 2839 2840 2841def delete_target_proxies(gcp): 2842 target_proxies_to_delete = gcp.target_proxies.copy() 2843 for target_proxy in target_proxies_to_delete: 2844 delete_target_proxy(gcp, target_proxy) 2845 2846 2847def delete_url_map(gcp, url_map_to_delete=None): 2848 if not url_map_to_delete: 2849 return 2850 try: 2851 logger.debug('Deleting url map %s', url_map_to_delete.name) 2852 result = gcp.compute.urlMaps().delete( 2853 project=gcp.project, 2854 urlMap=url_map_to_delete.name).execute(num_retries=_GCP_API_RETRIES) 2855 wait_for_global_operation(gcp, result['name']) 2856 if url_map_to_delete in gcp.url_maps: 2857 gcp.url_maps.remove(url_map_to_delete) 2858 else: 2859 logger.debug('Url map %s does not exist in gcp.url_maps', 2860 url_map_to_delete.name) 2861 except googleapiclient.errors.HttpError as http_error: 2862 logger.info('Delete failed: %s', http_error) 2863 2864 2865def delete_url_maps(gcp): 2866 url_maps_to_delete = gcp.url_maps.copy() 2867 for url_map in url_maps_to_delete: 2868 delete_url_map(gcp, url_map) 2869 2870 2871def delete_backend_service(gcp, backend_service): 2872 try: 2873 logger.debug('Deleting backend service %s', backend_service.name) 2874 result = gcp.compute.backendServices().delete( 2875 project=gcp.project, backendService=backend_service.name).execute( 2876 num_retries=_GCP_API_RETRIES) 2877 wait_for_global_operation(gcp, result['name']) 2878 except googleapiclient.errors.HttpError as http_error: 2879 logger.info('Delete failed: %s', http_error) 2880 2881 2882def delete_backend_services(gcp): 2883 for backend_service in gcp.backend_services: 2884 delete_backend_service(gcp, backend_service) 2885 2886 2887def delete_firewall(gcp): 2888 try: 2889 logger.debug('Deleting firewall %s', 2890 gcp.health_check_firewall_rule.name) 2891 result = gcp.compute.firewalls().delete( 2892 project=gcp.project, 2893 firewall=gcp.health_check_firewall_rule.name).execute( 2894 num_retries=_GCP_API_RETRIES) 2895 wait_for_global_operation(gcp, result['name']) 2896 except googleapiclient.errors.HttpError as http_error: 2897 logger.info('Delete failed: %s', http_error) 2898 2899 2900def delete_health_check(gcp): 2901 try: 2902 logger.debug('Deleting health check %s', gcp.health_check.name) 2903 result = gcp.compute.healthChecks().delete( 2904 project=gcp.project, healthCheck=gcp.health_check.name).execute( 2905 num_retries=_GCP_API_RETRIES) 2906 wait_for_global_operation(gcp, result['name']) 2907 except googleapiclient.errors.HttpError as http_error: 2908 logger.info('Delete failed: %s', http_error) 2909 2910 2911def delete_instance_groups(gcp): 2912 for instance_group in gcp.instance_groups: 2913 try: 2914 logger.debug('Deleting instance group %s %s', instance_group.name, 2915 instance_group.zone) 2916 result = gcp.compute.instanceGroupManagers().delete( 2917 project=gcp.project, 2918 zone=instance_group.zone, 2919 instanceGroupManager=instance_group.name).execute( 2920 num_retries=_GCP_API_RETRIES) 2921 wait_for_zone_operation(gcp, 2922 instance_group.zone, 2923 result['name'], 2924 timeout_sec=_WAIT_FOR_BACKEND_SEC) 2925 except googleapiclient.errors.HttpError as http_error: 2926 logger.info('Delete failed: %s', http_error) 2927 2928 2929def delete_instance_template(gcp): 2930 try: 2931 logger.debug('Deleting instance 
template %s',
                     gcp.instance_template.name)
        result = gcp.compute.instanceTemplates().delete(
            project=gcp.project,
            instanceTemplate=gcp.instance_template.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, result['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)


def patch_backend_service(gcp,
                          backend_service,
                          instance_groups,
                          balancing_mode='UTILIZATION',
                          max_rate=1,
                          circuit_breakers=None):
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute
    config = {
        'backends': [{
            'group': instance_group.url,
            'balancingMode': balancing_mode,
            'maxRate': max_rate if balancing_mode == 'RATE' else None
        } for instance_group in instance_groups],
        'circuitBreakers': circuit_breakers,
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = compute_to_use.backendServices().patch(
        project=gcp.project, backendService=backend_service.name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp,
                              result['name'],
                              timeout_sec=_WAIT_FOR_BACKEND_SEC)


def resize_instance_group(gcp,
                          instance_group,
                          new_size,
                          timeout_sec=_WAIT_FOR_OPERATION_SEC):
    result = gcp.compute.instanceGroupManagers().resize(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroupManager=instance_group.name,
        size=new_size).execute(num_retries=_GCP_API_RETRIES)
    wait_for_zone_operation(gcp,
                            instance_group.zone,
                            result['name'],
                            timeout_sec=360)
    wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   new_size, timeout_sec)


def patch_url_map_backend_service(gcp,
                                  backend_service=None,
                                  services_with_weights=None,
                                  route_rules=None,
                                  url_map=None):
    '''Change the url_map's backend service.

    Only one of backend_service and services_with_weights may be not None.
    '''
    if url_map:
        url_map_name = url_map.name
    else:
        url_map_name = gcp.url_maps[0].name
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute

    if backend_service and services_with_weights:
        raise ValueError(
            'both backend_service and services_with_weights are not None.')

    default_service = backend_service.url if backend_service else None
    default_route_action = {
        'weightedBackendServices': [{
            'backendService': service.url,
            'weight': w,
        } for service, w in list(services_with_weights.items())]
    } if services_with_weights else None

    config = {
        'pathMatchers': [{
            'name': _PATH_MATCHER_NAME,
            'defaultService': default_service,
            'defaultRouteAction': default_route_action,
            'routeRules': route_rules,
        }]
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = compute_to_use.urlMaps().patch(
        project=gcp.project, urlMap=url_map_name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])


def wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   expected_size, timeout_sec):
    start_time = time.time()
    while True:
        current_size = len(get_instance_names(gcp, instance_group))
        if current_size == expected_size:
            break
        if time.time() - start_time > timeout_sec:
            raise Exception(
                'Instance group had expected size %d but actual size %d' %
                (expected_size, current_size))
        time.sleep(2)


def wait_for_global_operation(gcp,
                              operation,
                              timeout_sec=_WAIT_FOR_OPERATION_SEC):
    start_time = time.time()
    while time.time() - start_time <= timeout_sec:
        result = gcp.compute.globalOperations().get(
            project=gcp.project,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if result['status'] == 'DONE':
            if 'error' in result:
                raise Exception(result['error'])
            return
        time.sleep(2)
    raise Exception('Operation %s did not complete within %d seconds' %
                    (operation, timeout_sec))


def wait_for_zone_operation(gcp,
                            zone,
                            operation,
                            timeout_sec=_WAIT_FOR_OPERATION_SEC):
    start_time = time.time()
    while time.time() - start_time <= timeout_sec:
        result = gcp.compute.zoneOperations().get(
            project=gcp.project, zone=zone,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if result['status'] == 'DONE':
            if 'error' in result:
                raise Exception(result['error'])
            return
        time.sleep(2)
    raise Exception('Operation %s did not complete within %d seconds' %
                    (operation, timeout_sec))


def wait_for_healthy_backends(gcp,
                              backend_service,
                              instance_group,
                              timeout_sec=_WAIT_FOR_BACKEND_SEC):
    start_time = time.time()
    config = {'group': instance_group.url}
    instance_names = get_instance_names(gcp, instance_group)
    expected_size = len(instance_names)
    while time.time() - start_time <= timeout_sec:
        for instance_name in instance_names:
            try:
                status = get_serving_status(instance_name, gcp.service_port)
                logger.info('serving status response from %s: %s',
                            instance_name, status)
            except grpc.RpcError as rpc_error:
                logger.info('checking serving status of %s failed: %s',
                            instance_name, rpc_error)
        result = gcp.compute.backendServices().getHealth(
            project=gcp.project,
            backendService=backend_service.name,
            body=config).execute(num_retries=_GCP_API_RETRIES)
        if 'healthStatus' in
result: 3102 logger.info('received GCP healthStatus: %s', result['healthStatus']) 3103 healthy = True 3104 for instance in result['healthStatus']: 3105 if instance['healthState'] != 'HEALTHY': 3106 healthy = False 3107 break 3108 if healthy and expected_size == len(result['healthStatus']): 3109 return 3110 else: 3111 logger.info('no healthStatus received from GCP') 3112 time.sleep(5) 3113 raise Exception('Not all backends became healthy within %d seconds: %s' % 3114 (timeout_sec, result)) 3115 3116 3117def get_instance_names(gcp, instance_group): 3118 instance_names = [] 3119 result = gcp.compute.instanceGroups().listInstances( 3120 project=gcp.project, 3121 zone=instance_group.zone, 3122 instanceGroup=instance_group.name, 3123 body={ 3124 'instanceState': 'ALL' 3125 }).execute(num_retries=_GCP_API_RETRIES) 3126 if 'items' not in result: 3127 return [] 3128 for item in result['items']: 3129 # listInstances() returns the full URL of the instance, which ends with 3130 # the instance name. compute.instances().get() requires using the 3131 # instance name (not the full URL) to look up instance details, so we 3132 # just extract the name manually. 3133 instance_name = item['instance'].split('/')[-1] 3134 instance_names.append(instance_name) 3135 logger.info('retrieved instance names: %s', instance_names) 3136 return instance_names 3137 3138 3139def clean_up(gcp): 3140 delete_global_forwarding_rules(gcp) 3141 delete_target_proxies(gcp) 3142 delete_url_maps(gcp) 3143 delete_backend_services(gcp) 3144 if gcp.health_check_firewall_rule: 3145 delete_firewall(gcp) 3146 if gcp.health_check: 3147 delete_health_check(gcp) 3148 delete_instance_groups(gcp) 3149 if gcp.instance_template: 3150 delete_instance_template(gcp) 3151 3152 3153class InstanceGroup(object): 3154 3155 def __init__(self, name, url, zone): 3156 self.name = name 3157 self.url = url 3158 self.zone = zone 3159 3160 3161class GcpResource(object): 3162 3163 def __init__(self, name, url): 3164 self.name = name 3165 self.url = url 3166 3167 3168class GcpState(object): 3169 3170 def __init__(self, compute, alpha_compute, project, project_num): 3171 self.compute = compute 3172 self.alpha_compute = alpha_compute 3173 self.project = project 3174 self.project_num = project_num 3175 self.health_check = None 3176 self.health_check_firewall_rule = None 3177 self.backend_services = [] 3178 self.url_maps = [] 3179 self.target_proxies = [] 3180 self.global_forwarding_rules = [] 3181 self.service_port = None 3182 self.instance_template = None 3183 self.instance_groups = [] 3184 self.errors = [] 3185 3186 3187logging.debug( 3188 "script start time: %s", 3189 datetime.datetime.now( 3190 datetime.timezone.utc).astimezone().strftime("%Y-%m-%dT%H:%M:%S %Z")) 3191logging.debug("logging local timezone: %s", 3192 datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo) 3193alpha_compute = None 3194if args.compute_discovery_document: 3195 with open(args.compute_discovery_document, 'r') as discovery_doc: 3196 compute = googleapiclient.discovery.build_from_document( 3197 discovery_doc.read()) 3198 if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document: 3199 with open(args.alpha_compute_discovery_document, 'r') as discovery_doc: 3200 alpha_compute = googleapiclient.discovery.build_from_document( 3201 discovery_doc.read()) 3202else: 3203 compute = googleapiclient.discovery.build('compute', 'v1') 3204 if not args.only_stable_gcp_apis: 3205 alpha_compute = googleapiclient.discovery.build('compute', 'alpha') 3206 3207test_results = {} 
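# Illustrative sketch only (not called by the tests): several cases above,
# e.g. test_timeout and test_fault_injection, accept an observed RPC count if
# it is within 10% of qps * runtime. The hypothetical helper below restates
# that acceptance rule standalone.
def _example_within_tolerance(got, want, variance_allowed=0.1):
    # True if got deviates from want by at most variance_allowed (e.g. 10%).
    return abs(got - want) <= want * variance_allowed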
3208failed_tests = [] 3209try: 3210 gcp = GcpState(compute, alpha_compute, args.project_id, args.project_num) 3211 gcp_suffix = args.gcp_suffix 3212 health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix 3213 if not args.use_existing_gcp_resources: 3214 if args.keep_gcp_resources: 3215 # Auto-generating a unique suffix in case of conflict should not be 3216 # combined with --keep_gcp_resources, as the suffix actually used 3217 # for GCP resources will not match the provided --gcp_suffix value. 3218 num_attempts = 1 3219 else: 3220 num_attempts = 5 3221 for i in range(num_attempts): 3222 try: 3223 logger.info('Using GCP suffix %s', gcp_suffix) 3224 create_health_check(gcp, health_check_name) 3225 break 3226 except googleapiclient.errors.HttpError as http_error: 3227 gcp_suffix = '%s-%04d' % (gcp_suffix, random.randint(0, 9999)) 3228 health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix 3229 logger.exception('HttpError when creating health check') 3230 if gcp.health_check is None: 3231 raise Exception('Failed to create health check name after %d ' 3232 'attempts' % num_attempts) 3233 firewall_name = _BASE_FIREWALL_RULE_NAME + gcp_suffix 3234 backend_service_name = _BASE_BACKEND_SERVICE_NAME + gcp_suffix 3235 alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + gcp_suffix 3236 extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-extra' + gcp_suffix 3237 more_extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-more-extra' + gcp_suffix 3238 url_map_name = _BASE_URL_MAP_NAME + gcp_suffix 3239 url_map_name_2 = url_map_name + '2' 3240 service_host_name = _BASE_SERVICE_HOST + gcp_suffix 3241 target_proxy_name = _BASE_TARGET_PROXY_NAME + gcp_suffix 3242 target_proxy_name_2 = target_proxy_name + '2' 3243 forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + gcp_suffix 3244 forwarding_rule_name_2 = forwarding_rule_name + '2' 3245 template_name = _BASE_TEMPLATE_NAME + gcp_suffix 3246 instance_group_name = _BASE_INSTANCE_GROUP_NAME + gcp_suffix 3247 same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + gcp_suffix 3248 secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + gcp_suffix 3249 potential_service_ports = list(args.service_port_range) 3250 random.shuffle(potential_service_ports) 3251 if args.use_existing_gcp_resources: 3252 logger.info('Reusing existing GCP resources') 3253 get_health_check(gcp, health_check_name) 3254 get_health_check_firewall_rule(gcp, firewall_name) 3255 backend_service = get_backend_service(gcp, backend_service_name) 3256 alternate_backend_service = get_backend_service( 3257 gcp, alternate_backend_service_name) 3258 extra_backend_service = get_backend_service(gcp, 3259 extra_backend_service_name, 3260 record_error=False) 3261 more_extra_backend_service = get_backend_service( 3262 gcp, more_extra_backend_service_name, record_error=False) 3263 get_url_map(gcp, url_map_name) 3264 get_target_proxy(gcp, target_proxy_name) 3265 get_global_forwarding_rule(gcp, forwarding_rule_name) 3266 get_url_map(gcp, url_map_name_2, record_error=False) 3267 get_target_proxy(gcp, target_proxy_name_2, record_error=False) 3268 get_global_forwarding_rule(gcp, 3269 forwarding_rule_name_2, 3270 record_error=False) 3271 get_instance_template(gcp, template_name) 3272 instance_group = get_instance_group(gcp, args.zone, instance_group_name) 3273 same_zone_instance_group = get_instance_group( 3274 gcp, args.zone, same_zone_instance_group_name) 3275 secondary_zone_instance_group = get_instance_group( 3276 gcp, 
    if args.use_existing_gcp_resources:
        logger.info('Reusing existing GCP resources')
        get_health_check(gcp, health_check_name)
        get_health_check_firewall_rule(gcp, firewall_name)
        backend_service = get_backend_service(gcp, backend_service_name)
        alternate_backend_service = get_backend_service(
            gcp, alternate_backend_service_name)
        extra_backend_service = get_backend_service(gcp,
                                                    extra_backend_service_name,
                                                    record_error=False)
        more_extra_backend_service = get_backend_service(
            gcp, more_extra_backend_service_name, record_error=False)
        get_url_map(gcp, url_map_name)
        get_target_proxy(gcp, target_proxy_name)
        get_global_forwarding_rule(gcp, forwarding_rule_name)
        get_url_map(gcp, url_map_name_2, record_error=False)
        get_target_proxy(gcp, target_proxy_name_2, record_error=False)
        get_global_forwarding_rule(gcp,
                                   forwarding_rule_name_2,
                                   record_error=False)
        get_instance_template(gcp, template_name)
        instance_group = get_instance_group(gcp, args.zone,
                                            instance_group_name)
        same_zone_instance_group = get_instance_group(
            gcp, args.zone, same_zone_instance_group_name)
        secondary_zone_instance_group = get_instance_group(
            gcp, args.secondary_zone, secondary_zone_instance_group_name)
        if gcp.errors:
            raise Exception(gcp.errors)
    else:
        create_health_check_firewall_rule(gcp, firewall_name)
        backend_service = add_backend_service(gcp, backend_service_name)
        alternate_backend_service = add_backend_service(
            gcp, alternate_backend_service_name)
        create_url_map(gcp, url_map_name, backend_service, service_host_name)
        create_target_proxy(gcp, target_proxy_name)
        create_global_forwarding_rule(gcp, forwarding_rule_name,
                                      potential_service_ports)
        if not gcp.service_port:
            raise Exception(
                'Failed to find a valid ip:port for the forwarding rule')
        if gcp.service_port != _DEFAULT_SERVICE_PORT:
            patch_url_map_host_rule_with_port(gcp, url_map_name,
                                              backend_service,
                                              service_host_name)
        startup_script = get_startup_script(args.path_to_server_binary,
                                            gcp.service_port)
        create_instance_template(gcp, template_name, args.network,
                                 args.source_image, args.machine_type,
                                 startup_script)
        instance_group = add_instance_group(gcp, args.zone,
                                            instance_group_name,
                                            _INSTANCE_GROUP_SIZE)
        patch_backend_service(gcp, backend_service, [instance_group])
        same_zone_instance_group = add_instance_group(
            gcp, args.zone, same_zone_instance_group_name,
            _INSTANCE_GROUP_SIZE)
        secondary_zone_instance_group = add_instance_group(
            gcp, args.secondary_zone, secondary_zone_instance_group_name,
            _INSTANCE_GROUP_SIZE)

    wait_for_healthy_backends(gcp, backend_service, instance_group)

    if args.test_case:
        client_env = dict(os.environ)
        # Restore any gRPC tracing/verbosity settings that were stripped from
        # this script's environment, so they still apply to the test client.
        if original_grpc_trace:
            client_env['GRPC_TRACE'] = original_grpc_trace
        if original_grpc_verbosity:
            client_env['GRPC_VERBOSITY'] = original_grpc_verbosity
        bootstrap_server_features = []

        if gcp.service_port == _DEFAULT_SERVICE_PORT:
            server_uri = service_host_name
        else:
            server_uri = service_host_name + ':' + str(gcp.service_port)
        if args.xds_v3_support:
            client_env['GRPC_XDS_EXPERIMENTAL_V3_SUPPORT'] = 'true'
            bootstrap_server_features.append('xds_v3')
        if args.bootstrap_file:
            bootstrap_path = os.path.abspath(args.bootstrap_file)
        else:
            with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file:
                bootstrap_file.write(
                    _BOOTSTRAP_TEMPLATE.format(
                        node_id='projects/%s/networks/%s/nodes/%s' %
                        (gcp.project_num, args.network.split('/')[-1],
                         uuid.uuid1()),
                        server_features=json.dumps(
                            bootstrap_server_features)).encode('utf-8'))
                bootstrap_path = bootstrap_file.name
        client_env['GRPC_XDS_BOOTSTRAP'] = bootstrap_path
        client_env['GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING'] = 'true'
        client_env['GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT'] = 'true'
        client_env['GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION'] = 'true'
        for test_case in args.test_case:
            if test_case in _V3_TEST_CASES and not args.xds_v3_support:
                logger.info('skipping test %s due to missing v3 support',
                            test_case)
                continue
            if test_case in _ALPHA_TEST_CASES and not gcp.alpha_compute:
                logger.info('skipping test %s due to missing alpha support',
                            test_case)
                continue
            if test_case in [
                    'api_listener', 'forwarding_rule_port_match',
                    'forwarding_rule_default_port'
            ] and CLIENT_HOSTS:
                logger.info(
                    'skipping test %s because test configuration is '
                    'not compatible with client processes on existing '
                    'client hosts', test_case)
                continue
            if test_case == 'forwarding_rule_default_port':
                server_uri = service_host_name
            result = jobset.JobResult()
            log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case)
            if not os.path.exists(log_dir):
                os.makedirs(log_dir)
            test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME)
            test_log_file = open(test_log_filename, 'w+')
            client_process = None

            if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS:
                rpcs_to_send = '--rpc="UnaryCall,EmptyCall"'
            else:
                rpcs_to_send = '--rpc="UnaryCall"'

            if test_case in _TESTS_TO_SEND_METADATA:
                metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU},UnaryCall:{keyNU}:{valueNU}"'.format(
                    keyE=_TEST_METADATA_KEY,
                    valueE=_TEST_METADATA_VALUE_EMPTY,
                    keyU=_TEST_METADATA_KEY,
                    valueU=_TEST_METADATA_VALUE_UNARY,
                    keyNU=_TEST_METADATA_NUMERIC_KEY,
                    valueNU=_TEST_METADATA_NUMERIC_VALUE)
            else:
                # Setting the arg explicitly to empty with '--metadata=""'
                # makes the C# client fail
                # (see https://github.com/commandlineparser/commandline/issues/412),
                # so instead we rely on clients using the default when the
                # metadata arg is not specified.
                metadata_to_send = ''

            # TODO(ericgribkoff) Temporarily disable fail_on_failed_rpc checks
            # in the client. This means we will ignore intermittent RPC
            # failures (but this framework still checks that the final result
            # is as expected).
            #
            # The reason for disabling this is that the resources are shared
            # by multiple tests, and a change made by a previous test could be
            # delayed until the next test starts. That next test may then see
            # intermittent failures.
            #
            # A fix is to not share resources between tests (though that does
            # mean the tests will be significantly slower due to creating new
            # resources).
            fail_on_failed_rpc = ''

            try:
                if not CLIENT_HOSTS:
                    client_cmd_formatted = args.client_cmd.format(
                        server_uri=server_uri,
                        stats_port=args.stats_port,
                        qps=args.qps,
                        fail_on_failed_rpc=fail_on_failed_rpc,
                        rpcs_to_send=rpcs_to_send,
                        metadata_to_send=metadata_to_send)
                    logger.debug('running client: %s', client_cmd_formatted)
                    client_cmd = shlex.split(client_cmd_formatted)
                    client_process = subprocess.Popen(client_cmd,
                                                      env=client_env,
                                                      stderr=subprocess.STDOUT,
                                                      stdout=test_log_file)
                if test_case == 'backends_restart':
                    test_backends_restart(gcp, backend_service, instance_group)
                elif test_case == 'change_backend_service':
                    test_change_backend_service(gcp, backend_service,
                                                instance_group,
                                                alternate_backend_service,
                                                same_zone_instance_group)
                elif test_case == 'gentle_failover':
                    test_gentle_failover(gcp, backend_service, instance_group,
                                         secondary_zone_instance_group)
                elif test_case == 'load_report_based_failover':
                    test_load_report_based_failover(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'ping_pong':
                    test_ping_pong(gcp, backend_service, instance_group)
                elif test_case == 'remove_instance_group':
                    test_remove_instance_group(gcp, backend_service,
                                               instance_group,
                                               same_zone_instance_group)
                elif test_case == 'round_robin':
                    test_round_robin(gcp, backend_service, instance_group)
                elif test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure':
                    test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'secondary_locality_gets_requests_on_primary_failure':
                    test_secondary_locality_gets_requests_on_primary_failure(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'traffic_splitting':
                    test_traffic_splitting(gcp, backend_service, instance_group,
                                           alternate_backend_service,
                                           same_zone_instance_group)
                elif test_case == 'path_matching':
                    test_path_matching(gcp, backend_service, instance_group,
                                       alternate_backend_service,
                                       same_zone_instance_group)
                elif test_case == 'header_matching':
                    test_header_matching(gcp, backend_service, instance_group,
                                         alternate_backend_service,
                                         same_zone_instance_group)
                elif test_case == 'circuit_breaking':
                    test_circuit_breaking(gcp, backend_service, instance_group,
                                          same_zone_instance_group)
                elif test_case == 'timeout':
                    test_timeout(gcp, backend_service, instance_group)
                elif test_case == 'fault_injection':
                    test_fault_injection(gcp, backend_service, instance_group)
                elif test_case == 'api_listener':
                    server_uri = test_api_listener(gcp, backend_service,
                                                   instance_group,
                                                   alternate_backend_service)
                elif test_case == 'forwarding_rule_port_match':
                    server_uri = test_forwarding_rule_port_match(
                        gcp, backend_service, instance_group)
                elif test_case == 'forwarding_rule_default_port':
                    server_uri = test_forwarding_rule_default_port(
                        gcp, backend_service, instance_group)
                elif test_case == 'metadata_filter':
                    test_metadata_filter(gcp, backend_service, instance_group,
                                         alternate_backend_service,
                                         same_zone_instance_group)
                elif test_case == 'csds':
                    test_csds(gcp, backend_service, instance_group, server_uri)
                else:
                    logger.error('Unknown test case: %s', test_case)
                    sys.exit(1)
                if client_process and client_process.poll() is not None:
                    raise Exception(
                        'Client process exited prematurely with exit code %d' %
                        client_process.returncode)
                result.state = 'PASSED'
                result.returncode = 0
            except Exception as e:
                logger.exception('Test case %s failed', test_case)
                failed_tests.append(test_case)
                result.state = 'FAILED'
                result.message = str(e)
                if args.halt_after_fail:
                    # Stop the test suite if one case failed.
                    raise
            finally:
                if client_process:
                    if client_process.returncode:
                        logger.info('Client exited with code %d',
                                    client_process.returncode)
                    else:
                        client_process.terminate()
                test_log_file.close()
                # Workaround for Python 3, as report_utils will invoke
                # decode() on result.message, which has a default value of ''.
                result.message = result.message.encode('UTF-8')
                test_results[test_case] = [result]
                if args.log_client_output:
                    logger.info('Client output:')
                    with open(test_log_filename, 'r') as client_output:
                        logger.info(client_output.read())
        if not os.path.exists(_TEST_LOG_BASE_DIR):
            os.makedirs(_TEST_LOG_BASE_DIR)
        report_utils.render_junit_xml_report(test_results,
                                             os.path.join(
                                                 _TEST_LOG_BASE_DIR,
                                                 _SPONGE_XML_NAME),
                                             suite_name='xds_tests',
                                             multi_target=True)
        if failed_tests:
            logger.error('Test case(s) %s failed', failed_tests)
            sys.exit(1)
finally:
    keep_resources = args.keep_gcp_resources
    if args.halt_after_fail and failed_tests:
        logger.info(
            'Halt after fail triggered, exiting without cleaning up resources')
        keep_resources = True
    if not keep_resources:
        logger.info('Cleaning up GCP resources. This may take some time.')
        clean_up(gcp)
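# Illustrative invocation, assuming this file is saved as run_xds_tests.py.
# All values below are placeholders, not defaults, and the exact --client_cmd
# depends on the language under test; {server_uri}, {stats_port}, and {qps}
# are substituted via str.format() as documented above:
#
#   python3 run_xds_tests.py \
#       --project_id=my-project \
#       --project_num=1234567890 \
#       --gcp_suffix=-testrun \
#       --test_case=ping_pong,round_robin \
#       --client_cmd='bins/opt/xds_interop_client \
#           --server={server_uri} --stats_port={stats_port} --qps={qps}'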