xref: /aosp_15_r20/external/grpc-grpc/tools/run_tests/run_xds_tests.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1#!/usr/bin/env python3
2# Copyright 2020 gRPC authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Run xDS integration tests on GCP using Traffic Director."""
16
17import argparse
18import datetime
19import json
20import logging
21import os
22import random
23import re
24import shlex
25import socket
26import subprocess
27import sys
28import tempfile
29import time
30import uuid
31
32from google.protobuf import json_format
33import googleapiclient.discovery
34import grpc
35from oauth2client.client import GoogleCredentials
36
37import python_utils.jobset as jobset
38import python_utils.report_utils as report_utils
39from src.proto.grpc.health.v1 import health_pb2
40from src.proto.grpc.health.v1 import health_pb2_grpc
41from src.proto.grpc.testing import empty_pb2
42from src.proto.grpc.testing import messages_pb2
43from src.proto.grpc.testing import test_pb2_grpc
44
45# Envoy protos provided by PyPI package xds-protos
46# Needs to import the generated Python file to load descriptors
47try:
48    from envoy.extensions.filters.common.fault.v3 import fault_pb2
49    from envoy.extensions.filters.http.fault.v3 import fault_pb2
50    from envoy.extensions.filters.http.router.v3 import router_pb2
51    from envoy.extensions.filters.network.http_connection_manager.v3 import (
52        http_connection_manager_pb2,
53    )
54    from envoy.service.status.v3 import csds_pb2
55    from envoy.service.status.v3 import csds_pb2_grpc
56except ImportError:
57    # These protos are required by CSDS test. We should not fail the entire
58    # script for one test case.
59    pass
60
# Configure the root logger with a single stream handler so all test output
# shares a uniform "<timestamp>: <level> <message>" format.
logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt="%(asctime)s: %(levelname)-8s %(message)s")
console_handler.setFormatter(formatter)
# Drop any handlers installed by earlier imports to avoid duplicate log lines.
logger.handlers = []
logger.addHandler(console_handler)
# WARNING by default; raised to DEBUG later when --verbose is passed.
logger.setLevel(logging.WARNING)

# Suppress excessive logs for gRPC Python
original_grpc_trace = os.environ.pop("GRPC_TRACE", None)
original_grpc_verbosity = os.environ.pop("GRPC_VERBOSITY", None)
# Suppress not-essential logs for GCP clients
logging.getLogger("google_auth_httplib2").setLevel(logging.WARNING)
logging.getLogger("googleapiclient.discovery").setLevel(logging.WARNING)
75
# Test cases selected by --test_case=all; expected to pass in all languages.
_TEST_CASES = [
    "backends_restart",
    "change_backend_service",
    "gentle_failover",
    "load_report_based_failover",
    "ping_pong",
    "remove_instance_group",
    "round_robin",
    "secondary_locality_gets_no_requests_on_partial_primary_failure",
    "secondary_locality_gets_requests_on_primary_failure",
    "traffic_splitting",
    "path_matching",
    "header_matching",
    "api_listener",
    "forwarding_rule_port_match",
    "forwarding_rule_default_port",
    "metadata_filter",
]

# Valid test cases, but not in all. So the tests can only run manually, and
# aren't enabled automatically for all languages.
#
# TODO: Move them into _TEST_CASES when support is ready in all languages.
_ADDITIONAL_TEST_CASES = [
    "circuit_breaking",
    "timeout",
    "fault_injection",
    "csds",
]

# Test cases that require the V3 API.  Skipped in older runs.
_V3_TEST_CASES = frozenset(["timeout", "fault_injection", "csds"])

# Test cases that require the alpha API.  Skipped for stable API runs.
_ALPHA_TEST_CASES = frozenset(["timeout"])
111
112
def parse_test_cases(arg):
    """Parse a comma-separated list of test case names.

    Args:
      arg: Comma-separated test case names. Each entry must be a known test
        case, or the special value 'all', which expands to every test in
        _TEST_CASES (but not _ADDITIONAL_TEST_CASES).

    Returns:
      The selected test cases as a list, de-duplicated and ordered as they
      appear in _TEST_CASES + _ADDITIONAL_TEST_CASES.

    Raises:
      Exception: if any entry is not a recognized test case name.
    """
    if arg == "":
        return []
    test_cases = set()
    all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES
    # Bug fix: the loop previously reused the name `arg`, clobbering the
    # original argument so the error message below only showed the last
    # token instead of the full --test_case value.
    for test_case_arg in arg.split(","):
        if test_case_arg == "all":
            test_cases = test_cases.union(_TEST_CASES)
        else:
            test_cases = test_cases.union([test_case_arg])
    if not all(test_case in all_test_cases for test_case in test_cases):
        raise Exception("Failed to parse test cases %s" % arg)
    # Preserve order.
    return [x for x in all_test_cases if x in test_cases]
128
129
def parse_port_range(port_arg):
    """Parse a port specification into a list of candidate ports.

    Args:
      port_arg: Either a single port ('8080') or an inclusive range in the
        form 'min:max' (e.g. '8080:8280').

    Returns:
      A list of every port in the range; a single-element list for a single
      port.
    """
    try:
        port = int(port_arg)
        return list(range(port, port + 1))
    except ValueError:
        # Not a plain integer, so treat the argument as a min:max range.
        # (The former bare `except:` also swallowed KeyboardInterrupt and
        # SystemExit; only int() conversion failures are expected here.)
        port_min, port_max = port_arg.split(":")
        return list(range(int(port_min), int(port_max) + 1))
137
138
# Command-line interface. Defaults target the shared grpc-testing project;
# most flags parameterize GCP resource creation for a single test run.
argp = argparse.ArgumentParser(description="Run xDS interop tests on GCP")
# TODO(zdapeng): remove default value of project_id and project_num
argp.add_argument("--project_id", default="grpc-testing", help="GCP project id")
argp.add_argument(
    "--project_num", default="830293263384", help="GCP project number"
)
argp.add_argument(
    "--gcp_suffix",
    default="",
    help=(
        "Optional suffix for all generated GCP resource names. Useful to "
        "ensure distinct names across test runs."
    ),
)
argp.add_argument(
    "--test_case",
    default="ping_pong",
    type=parse_test_cases,
    help=(
        "Comma-separated list of test cases to run. Available tests: %s, "
        "(or 'all' to run every test). "
        "Alternative tests not included in 'all': %s"
    )
    % (",".join(_TEST_CASES), ",".join(_ADDITIONAL_TEST_CASES)),
)
argp.add_argument(
    "--bootstrap_file",
    default="",
    help=(
        "File to reference via GRPC_XDS_BOOTSTRAP. Disables built-in "
        "bootstrap generation"
    ),
)
argp.add_argument(
    "--xds_v3_support",
    default=False,
    action="store_true",
    help=(
        "Support xDS v3 via GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. "
        "If a pre-created bootstrap file is provided via the --bootstrap_file "
        "parameter, it should include xds_v3 in its server_features field."
    ),
)
argp.add_argument(
    "--client_cmd",
    default=None,
    help=(
        "Command to launch xDS test client. {server_uri}, {stats_port} and"
        " {qps} references will be replaced using str.format()."
        " GRPC_XDS_BOOTSTRAP will be set for the command"
    ),
)
argp.add_argument(
    "--client_hosts",
    default=None,
    help=(
        "Comma-separated list of hosts running client processes. If set,"
        " --client_cmd is ignored and client processes are assumed to be"
        " running on the specified hosts."
    ),
)
argp.add_argument("--zone", default="us-central1-a")
argp.add_argument(
    "--secondary_zone",
    default="us-west1-b",
    help="Zone to use for secondary TD locality tests",
)
argp.add_argument("--qps", default=100, type=int, help="Client QPS")
argp.add_argument(
    "--wait_for_backend_sec",
    default=1200,
    type=int,
    help=(
        "Time limit for waiting for created backend services to report "
        "healthy when launching or updated GCP resources"
    ),
)
argp.add_argument(
    "--use_existing_gcp_resources",
    default=False,
    action="store_true",
    help=(
        "If set, find and use already created GCP resources instead of creating"
        " new ones."
    ),
)
argp.add_argument(
    "--keep_gcp_resources",
    default=False,
    action="store_true",
    help=(
        "Leave GCP VMs and configuration running after test. Default behavior"
        " is to delete when tests complete."
    ),
)
argp.add_argument(
    "--halt_after_fail",
    action="store_true",
    help="Halt and save the resources when test failed.",
)
argp.add_argument(
    "--compute_discovery_document",
    default=None,
    type=str,
    help=(
        "If provided, uses this file instead of retrieving via the GCP"
        " discovery API"
    ),
)
argp.add_argument(
    "--alpha_compute_discovery_document",
    default=None,
    type=str,
    help=(
        "If provided, uses this file instead of retrieving via the alpha GCP "
        "discovery API"
    ),
)
argp.add_argument(
    "--network", default="global/networks/default", help="GCP network to use"
)
_DEFAULT_PORT_RANGE = "8080:8280"
argp.add_argument(
    "--service_port_range",
    default=_DEFAULT_PORT_RANGE,
    type=parse_port_range,
    help=(
        "Listening port for created gRPC backends. Specified as "
        "either a single int or as a range in the format min:max, in "
        "which case an available port p will be chosen s.t. min <= p "
        "<= max"
    ),
)
argp.add_argument(
    "--stats_port",
    default=8079,
    type=int,
    help="Local port for the client process to expose the LB stats service",
)
argp.add_argument(
    "--xds_server",
    default="trafficdirector.googleapis.com:443",
    help="xDS server",
)
argp.add_argument(
    "--source_image",
    default="projects/debian-cloud/global/images/family/debian-9",
    help="Source image for VMs created during the test",
)
argp.add_argument(
    "--path_to_server_binary",
    default=None,
    type=str,
    help=(
        "If set, the server binary must already be pre-built on "
        "the specified source image"
    ),
)
argp.add_argument(
    "--machine_type",
    default="e2-standard-2",
    help="Machine type for VMs created during the test",
)
argp.add_argument(
    "--instance_group_size",
    default=2,
    type=int,
    help=(
        "Number of VMs to create per instance group. Certain test cases (e.g.,"
        " round_robin) may not give meaningful results if this is set to a"
        " value less than 2."
    ),
)
argp.add_argument(
    "--verbose", help="verbose log output", default=False, action="store_true"
)
# TODO(ericgribkoff) Remove this param once the sponge-formatted log files are
# visible in all test environments.
argp.add_argument(
    "--log_client_output",
    help="Log captured client output",
    default=False,
    action="store_true",
)
# TODO(ericgribkoff) Remove this flag once all test environments are verified to
# have access to the alpha compute APIs.
argp.add_argument(
    "--only_stable_gcp_apis",
    help=(
        "Do not use alpha compute APIs. Some tests may be "
        "incompatible with this option (gRPC health checks are "
        "currently alpha and required for simulating server failure"
    ),
    default=False,
    action="store_true",
)
args = argp.parse_args()
336
if args.verbose:
    logger.setLevel(logging.DEBUG)

# In grpc-testing, use non-legacy network.
if (
    args.project_id == "grpc-testing"
    and args.network
    and args.network == argp.get_default("network")
):
    args.network += "-vpc"

# Hosts running pre-launched clients to query for stats; when empty, a
# locally launched client on localhost is used instead.
CLIENT_HOSTS = []
if args.client_hosts:
    CLIENT_HOSTS = args.client_hosts.split(",")

# Each of the config propagation in the control plane should finish within 600s.
# Otherwise, it indicates a bug in the control plane. The config propagation
# includes all kinds of traffic config update, like updating urlMap, creating
# the resources for the first time, updating BackendService, and changing the
# status of endpoints in BackendService.
_WAIT_FOR_URL_MAP_PATCH_SEC = 600
# In general, fetching load balancing stats only takes ~10s. However, slow
# config update could lead to empty EDS or similar symptoms causing the
# connection to hang for a long period of time. So, we want to extend the stats
# wait time to be the same as urlMap patch time.
_WAIT_FOR_STATS_SEC = _WAIT_FOR_URL_MAP_PATCH_SEC

_DEFAULT_SERVICE_PORT = 80
_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
_WAIT_FOR_OPERATION_SEC = 1200
_INSTANCE_GROUP_SIZE = args.instance_group_size
_NUM_TEST_RPCS = 10 * args.qps
_CONNECTION_TIMEOUT_SEC = 60
_GCP_API_RETRIES = 5
# Bootstrap JSON template: str.format placeholders ({node_id},
# {server_features}) are filled per client later; the %s placeholders are
# filled once here from the parsed args. Double braces escape literal JSON
# braces for str.format.
_BOOTSTRAP_TEMPLATE = """
{{
  "node": {{
    "id": "{node_id}",
    "metadata": {{
      "TRAFFICDIRECTOR_NETWORK_NAME": "%s",
      "com.googleapis.trafficdirector.config_time_trace": "TRUE"
    }},
    "locality": {{
      "zone": "%s"
    }}
  }},
  "xds_servers": [{{
    "server_uri": "%s",
    "channel_creds": [
      {{
        "type": "google_default",
        "config": {{}}
      }}
    ],
    "server_features": {server_features}
  }}]
}}""" % (
    args.network.split("/")[-1],
    args.zone,
    args.xds_server,
)

# TODO(ericgribkoff) Add change_backend_service to this list once TD no longer
# sends an update with no localities when adding the MIG to the backend service
# can race with the URL map patch.
_TESTS_TO_FAIL_ON_RPC_FAILURE = ["ping_pong", "round_robin"]
# Tests that run UnaryCall and EmptyCall.
_TESTS_TO_RUN_MULTIPLE_RPCS = ["path_matching", "header_matching"]
# Tests that make UnaryCall with test metadata.
_TESTS_TO_SEND_METADATA = ["header_matching"]
_TEST_METADATA_KEY = "xds_md"
_TEST_METADATA_VALUE_UNARY = "unary_yranu"
_TEST_METADATA_VALUE_EMPTY = "empty_ytpme"
# Extra RPC metadata whose value is a number, sent with UnaryCall only.
_TEST_METADATA_NUMERIC_KEY = "xds_md_numeric"
_TEST_METADATA_NUMERIC_VALUE = "159"
# Base names for generated GCP resources; --gcp_suffix is appended elsewhere.
_PATH_MATCHER_NAME = "path-matcher"
_BASE_TEMPLATE_NAME = "test-template"
_BASE_INSTANCE_GROUP_NAME = "test-ig"
_BASE_HEALTH_CHECK_NAME = "test-hc"
_BASE_FIREWALL_RULE_NAME = "test-fw-rule"
_BASE_BACKEND_SERVICE_NAME = "test-backend-service"
_BASE_URL_MAP_NAME = "test-map"
_BASE_SERVICE_HOST = "grpc-test"
_BASE_TARGET_PROXY_NAME = "test-target-proxy"
_BASE_FORWARDING_RULE_NAME = "test-forwarding-rule"
_TEST_LOG_BASE_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), "../../reports"
)
_SPONGE_LOG_NAME = "sponge_log.log"
_SPONGE_XML_NAME = "sponge_log.xml"
428
429
def get_client_stats(num_rpcs, timeout_sec):
    """Ask the first test client for its load-balancer stats.

    Requests the distribution of the next `num_rpcs` RPCs from the client's
    LoadBalancerStatsService, waiting at most `timeout_sec` on the client
    side (plus a connection allowance on the RPC itself).
    """
    hosts = CLIENT_HOSTS if CLIENT_HOSTS else ["localhost"]
    for host in hosts:
        target = "%s:%d" % (host, args.stats_port)
        with grpc.insecure_channel(target) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerStatsRequest()
            request.num_rpcs = num_rpcs
            request.timeout_sec = timeout_sec
            # Give the RPC itself headroom beyond the stats window.
            rpc_timeout = timeout_sec + _CONNECTION_TIMEOUT_SEC
            logger.debug(
                "Invoking GetClientStats RPC to %s:%d:", host, args.stats_port
            )
            response = stub.GetClientStats(
                request, wait_for_ready=True, timeout=rpc_timeout
            )
            logger.debug(
                "Invoked GetClientStats RPC to %s: %s",
                host,
                json_format.MessageToJson(response),
            )
            # Only the first host's stats are used.
            return response
456
457
def get_client_accumulated_stats():
    """Fetch cumulative per-method RPC stats from the first test client."""
    hosts = CLIENT_HOSTS if CLIENT_HOSTS else ["localhost"]
    for host in hosts:
        target = "%s:%d" % (host, args.stats_port)
        with grpc.insecure_channel(target) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerAccumulatedStatsRequest()
            logger.debug(
                "Invoking GetClientAccumulatedStats RPC to %s:%d:",
                host,
                args.stats_port,
            )
            response = stub.GetClientAccumulatedStats(
                request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC
            )
            logger.debug(
                "Invoked GetClientAccumulatedStats RPC to %s: %s",
                host,
                response,
            )
            # Only the first host's stats are used.
            return response
483
484
def get_client_xds_config_dump():
    """Fetch the first test client's xDS config dump via CSDS.

    Returns:
      The single ClientConfig converted to a JSON-style dict (many fields
      are packed in google.protobuf.Any, so JSON is easier to inspect than
      unpacking protos), or None when the response does not contain exactly
      one ClientConfig.
    """
    hosts = CLIENT_HOSTS if CLIENT_HOSTS else ["localhost"]
    for host in hosts:
        server_address = "%s:%d" % (host, args.stats_port)
        with grpc.insecure_channel(server_address) as channel:
            stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(channel)
            logger.debug("Fetching xDS config dump from %s", server_address)
            response = stub.FetchClientStatus(
                csds_pb2.ClientStatusRequest(),
                wait_for_ready=True,
                timeout=_CONNECTION_TIMEOUT_SEC,
            )
            logger.debug("Fetched xDS config dump from %s", server_address)
            if len(response.config) == 1:
                return json_format.MessageToDict(
                    response.config[0], preserving_proto_field_name=True
                )
            logger.error(
                "Unexpected number of ClientConfigs %d: %s",
                len(response.config),
                response,
            )
            return None
515
516
def configure_client(rpc_types, metadata=None, timeout_sec=None):
    """Reconfigure the running test client's RPC behavior.

    Args:
      rpc_types: list of RPC type enum values the client should send.
      metadata: optional iterable of (rpc_type, md_key, md_value) tuples to
        attach to matching RPCs. Defaults to no metadata.
      timeout_sec: optional per-RPC timeout override for the client.
    """
    # Bug fix: the former `metadata=[]` default was a shared mutable default
    # argument; use the None sentinel instead (behavior unchanged).
    if metadata is None:
        metadata = []
    if CLIENT_HOSTS:
        hosts = CLIENT_HOSTS
    else:
        hosts = ["localhost"]
    for host in hosts:
        with grpc.insecure_channel(
            "%s:%d" % (host, args.stats_port)
        ) as channel:
            stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel)
            request = messages_pb2.ClientConfigureRequest()
            request.types.extend(rpc_types)
            for rpc_type, md_key, md_value in metadata:
                md = request.metadata.add()
                md.type = rpc_type
                md.key = md_key
                md.value = md_value
            if timeout_sec:
                request.timeout_sec = timeout_sec
            logger.debug(
                "Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s",
                host,
                args.stats_port,
                request,
            )
            stub.Configure(
                request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC
            )
            logger.debug(
                "Invoked XdsUpdateClientConfigureService RPC to %s", host
            )
548
549
class RpcDistributionError(Exception):
    """Raised when RPCs are not distributed across backends as expected
    within the allotted wait time."""

    pass
552
553
def _verify_rpcs_to_given_backends(
    backends, timeout_sec, num_rpcs, allow_failures
):
    """Poll client stats until exactly the given backends receive load.

    Args:
      backends: backend instance names that should (all, and only) receive
        load.
      timeout_sec: how long to keep polling before giving up; also used as
        the per-request stats window.
      num_rpcs: number of RPCs to sample per stats request.
      allow_failures: if False, any failed RPC in a sample is an error.

    Raises:
      RpcDistributionError: with the last observed problem, if the expected
        distribution is not reached before the timeout.
    """
    start_time = time.time()
    error_msg = None
    # Idiom fix: pass lazy %-style args to logger.debug instead of eagerly
    # formatting the string; output is identical when DEBUG is enabled.
    logger.debug(
        "Waiting for %d sec until backends %s receive load",
        timeout_sec,
        backends,
    )
    while time.time() - start_time <= timeout_sec:
        error_msg = None
        stats = get_client_stats(num_rpcs, timeout_sec)
        rpcs_by_peer = stats.rpcs_by_peer
        for backend in backends:
            if backend not in rpcs_by_peer:
                error_msg = "Backend %s did not receive load" % backend
                break
        if not error_msg and len(rpcs_by_peer) > len(backends):
            error_msg = "Unexpected backend received load: %s" % rpcs_by_peer
        if not allow_failures and stats.num_failures > 0:
            error_msg = "%d RPCs failed" % stats.num_failures
        if not error_msg:
            return
    raise RpcDistributionError(error_msg)
578
579
def wait_until_all_rpcs_go_to_given_backends_or_fail(
    backends, timeout_sec, num_rpcs=_NUM_TEST_RPCS
):
    """Wait until only `backends` receive load; individual RPC failures are
    tolerated. Raises RpcDistributionError on timeout."""
    _verify_rpcs_to_given_backends(
        backends, timeout_sec, num_rpcs, allow_failures=True
    )
586
587
def wait_until_all_rpcs_go_to_given_backends(
    backends, timeout_sec, num_rpcs=_NUM_TEST_RPCS
):
    """Wait until only `backends` receive load AND no RPCs fail. Raises
    RpcDistributionError on timeout or on any failed RPC."""
    _verify_rpcs_to_given_backends(
        backends, timeout_sec, num_rpcs, allow_failures=False
    )
594
595
def wait_until_no_rpcs_go_to_given_backends(backends, timeout_sec):
    """Poll client stats until none of `backends` receives any load.

    Raises a generic Exception if some listed backend is still receiving
    RPCs when the timeout elapses.
    """
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        stats = get_client_stats(_NUM_TEST_RPCS, timeout_sec)
        rpcs_by_peer = stats.rpcs_by_peer
        offending = [b for b in backends if b in rpcs_by_peer]
        if not offending:
            return
    raise Exception("Unexpected RPCs going to given backends")
609
610
def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold):
    """Block until the test client reaches the state with the given number
    of RPCs being outstanding stably.

    Args:
      rpc_type: A string indicating the RPC method to check for. Either
        'UnaryCall' or 'EmptyCall'.
      timeout_sec: Maximum number of seconds to wait until the desired state
        is reached.
      num_rpcs: Expected number of RPCs to be in-flight.
      threshold: Number within [0,100], the tolerable percentage by which
        the actual number of RPCs in-flight can differ from the expected number.

    Raises:
      ValueError: if threshold is outside [0,100].
      Exception: if the in-flight count never stabilizes within tolerance.
    """
    if threshold < 0 or threshold > 100:
        raise ValueError("Value error: Threshold should be between 0 to 100")
    threshold_fraction = threshold / 100.0
    start_time = time.time()
    error_msg = None
    # Idiom fix: lazy %-style logging args instead of eager string
    # formatting; rendered output is unchanged.
    logger.debug(
        "Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight",
        timeout_sec,
        num_rpcs,
        rpc_type,
        threshold,
    )
    while time.time() - start_time <= timeout_sec:
        error_msg = _check_rpcs_in_flight(
            rpc_type, num_rpcs, threshold, threshold_fraction
        )
        if error_msg:
            logger.debug("Progress: %s", error_msg)
            time.sleep(2)
        else:
            break
    # Ensure the number of outstanding RPCs is stable.
    if not error_msg:
        time.sleep(5)
        error_msg = _check_rpcs_in_flight(
            rpc_type, num_rpcs, threshold, threshold_fraction
        )
    if error_msg:
        raise Exception(
            "Wrong number of %s RPCs in-flight: %s" % (rpc_type, error_msg)
        )
652
653
def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction):
    """Return an error string when the in-flight count for `rpc_type` is
    outside the tolerance band around `num_rpcs`, or None when in range."""
    stats = get_client_accumulated_stats()
    started = stats.num_rpcs_started_by_method[rpc_type]
    succeeded = stats.num_rpcs_succeeded_by_method[rpc_type]
    failed = stats.num_rpcs_failed_by_method[rpc_type]
    # Outstanding RPCs = started minus everything that has finished.
    in_flight = started - succeeded - failed
    lower_bound = num_rpcs * (1 - threshold_fraction)
    upper_bound = num_rpcs * (1 + threshold_fraction)
    if in_flight < lower_bound:
        return "actual(%d) < expected(%d - %d%%)" % (
            in_flight,
            num_rpcs,
            threshold,
        )
    if in_flight > upper_bound:
        return "actual(%d) > expected(%d + %d%%)" % (
            in_flight,
            num_rpcs,
            threshold,
        )
    return None
674
675
def compare_distributions(
    actual_distribution, expected_distribution, threshold
):
    """Compare if two distributions are similar.

    Args:
      actual_distribution: A list of floats, contains the actual distribution.
      expected_distribution: A list of floats, contains the expected distribution.
      threshold: Number within [0,100], the threshold percentage by which the
        actual distribution can differ from the expected distribution.

    Returns:
      True when every actual value lies within threshold percent of its
      expected counterpart.

    Raises:
      ValueError: if threshold is not within [0,100].
      Exception: with a detailed message on size mismatch or any
        out-of-threshold element.
    """
    if len(expected_distribution) != len(actual_distribution):
        raise Exception(
            "Error: expected and actual distributions have different size (%d"
            " vs %d)" % (len(expected_distribution), len(actual_distribution))
        )
    if not 0 <= threshold <= 100:
        raise ValueError("Value error: Threshold should be between 0 to 100")
    tolerance = threshold / 100.0
    for expected, actual in zip(expected_distribution, actual_distribution):
        low = expected * (1 - tolerance)
        high = expected * (1 + tolerance)
        if actual < low:
            raise Exception(
                "actual(%f) < expected(%f-%d%%)" % (actual, expected, threshold)
            )
        if actual > high:
            raise Exception(
                "actual(%f) > expected(%f+%d%%)" % (actual, expected, threshold)
            )
    return True
714
715
def compare_expected_instances(stats, expected_instances):
    """Compare if stats have expected instances for each type of RPC.

    Args:
      stats: LoadBalancerStatsResponse reported by interop client.
      expected_instances: a dict with key as the RPC type (string), value as
        the expected backend instances (list of strings).

    Returns:
      Returns true if the instances are expected. False if not.
    """
    for rpc_type, expected_peers in list(expected_instances.items()):
        rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type]
        # Bug fix: fall back to an empty mapping instead of None — the
        # previous code set rpcs_by_peer = None and then crashed with
        # AttributeError on rpcs_by_peer.keys() below. An absent entry now
        # compares as "no peers seen" and returns False.
        rpcs_by_peer = (
            rpcs_by_peer_for_type.rpcs_by_peer if rpcs_by_peer_for_type else {}
        )
        logger.debug("rpc: %s, by_peer: %s", rpc_type, rpcs_by_peer)
        peers = list(rpcs_by_peer.keys())
        if set(peers) != set(expected_peers):
            logger.info(
                "unexpected peers for %s, got %s, want %s",
                rpc_type,
                peers,
                expected_peers,
            )
            return False
    return True
745
746
def test_backends_restart(gcp, backend_service, instance_group):
    """Verify traffic drains when backends disappear and resumes afterward.

    Scales the instance group to zero (expecting RPCs to stop reaching any
    backend), then restores the original size and checks that traffic
    returns to the (possibly renamed) instances.
    """
    logger.info("Running test_backends_restart")
    instance_names = get_instance_names(gcp, instance_group)
    num_instances = len(instance_names)
    # NOTE(review): start_time is never read below — presumably leftover.
    start_time = time.time()
    wait_until_all_rpcs_go_to_given_backends(
        instance_names, _WAIT_FOR_STATS_SEC
    )
    try:
        resize_instance_group(gcp, instance_group, 0)
        wait_until_all_rpcs_go_to_given_backends_or_fail(
            [], _WAIT_FOR_BACKEND_SEC
        )
    finally:
        # Always restore the original group size, even if the drain check
        # failed.
        resize_instance_group(gcp, instance_group, num_instances)
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    # Instance names may change after a resize; re-fetch before asserting.
    new_instance_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(
        new_instance_names, _WAIT_FOR_BACKEND_SEC
    )
767
768
def test_change_backend_service(
    gcp,
    original_backend_service,
    instance_group,
    alternate_backend_service,
    same_zone_instance_group,
):
    """Verify traffic moves to a new backend service after a URL map patch.

    Starts with all RPCs going to the original backend service, patches the
    URL map to point at the alternate service, and waits for traffic to
    shift to the alternate instances. The original configuration is
    restored on exit unless the test failed and --halt_after_fail is set.
    """
    logger.info("Running test_change_backend_service")
    original_backend_instances = get_instance_names(gcp, instance_group)
    alternate_backend_instances = get_instance_names(
        gcp, same_zone_instance_group
    )
    patch_backend_service(
        gcp, alternate_backend_service, [same_zone_instance_group]
    )
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    wait_for_healthy_backends(
        gcp, alternate_backend_service, same_zone_instance_group
    )
    wait_until_all_rpcs_go_to_given_backends(
        original_backend_instances, _WAIT_FOR_STATS_SEC
    )
    passed = True
    try:
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_all_rpcs_go_to_given_backends(
            alternate_backend_instances, _WAIT_FOR_URL_MAP_PATCH_SEC
        )
    except Exception:
        passed = False
        raise
    finally:
        # On failure with --halt_after_fail, leave resources in place for
        # debugging instead of restoring them.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
803            patch_backend_service(gcp, alternate_backend_service, [])
804
805
def test_gentle_failover(
    gcp,
    backend_service,
    primary_instance_group,
    secondary_instance_group,
    swapped_primary_and_secondary=False,
):
    """Verify failover to the secondary locality when most primaries stop.

    Grows the primary group to at least 3 instances, stops serving on all
    but one primary (more than 50% failure triggers gentle failover) and
    expects traffic on the remaining primary plus the secondaries. If the
    distribution check fails and TD reports the localities swapped, retries
    once with primary/secondary exchanged.
    """
    logger.info("Running test_gentle_failover")
    num_primary_instances = len(get_instance_names(gcp, primary_instance_group))
    min_instances_for_gentle_failover = 3  # Need >50% failure to start failover
    passed = True
    try:
        if num_primary_instances < min_instances_for_gentle_failover:
            resize_instance_group(
                gcp, primary_instance_group, min_instances_for_gentle_failover
            )
        patch_backend_service(
            gcp,
            backend_service,
            [primary_instance_group, secondary_instance_group],
        )
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(
            gcp, secondary_instance_group
        )
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(
            gcp, backend_service, secondary_instance_group
        )
        wait_until_all_rpcs_go_to_given_backends(
            primary_instance_names, _WAIT_FOR_STATS_SEC
        )
        # Stop serving on all primaries except one.
        instances_to_stop = primary_instance_names[:-1]
        remaining_instances = primary_instance_names[-1:]
        try:
            set_serving_status(
                instances_to_stop, gcp.service_port, serving=False
            )
            wait_until_all_rpcs_go_to_given_backends(
                remaining_instances + secondary_instance_names,
                _WAIT_FOR_BACKEND_SEC,
            )
        finally:
            # Re-enable serving on every primary regardless of outcome.
            set_serving_status(
                primary_instance_names, gcp.service_port, serving=True
            )
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
            gcp, secondary_instance_group
        ):
            # Swap expectation of primary and secondary instance groups.
            test_gentle_failover(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True,
            )
        else:
            passed = False
            raise e
    except Exception:
        passed = False
        raise
    finally:
        # Restore the original backend configuration and group size unless
        # the test failed and --halt_after_fail requests keeping the state.
        if passed or not args.halt_after_fail:
            patch_backend_service(
                gcp, backend_service, [primary_instance_group]
            )
            resize_instance_group(
                gcp, primary_instance_group, num_primary_instances
            )
            instance_names = get_instance_names(gcp, primary_instance_group)
            wait_until_all_rpcs_go_to_given_backends(
                instance_names, _WAIT_FOR_BACKEND_SEC
            )
882
883
def test_load_report_based_failover(
    gcp, backend_service, primary_instance_group, secondary_instance_group
):
    """Verify RATE balancing mode shifts traffic based on the load cap.

    First caps the primary locality's RPS below the client QPS so overflow
    spills into the secondary locality, then raises the cap above the client
    QPS so the primary locality absorbs all traffic again.
    """
    logger.info("Running test_load_report_based_failover")
    passed = True
    try:
        patch_backend_service(
            gcp,
            backend_service,
            [primary_instance_group, secondary_instance_group],
        )
        primary_names = get_instance_names(gcp, primary_instance_group)
        secondary_names = get_instance_names(gcp, secondary_instance_group)
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(
            gcp, backend_service, secondary_instance_group
        )
        wait_until_all_rpcs_go_to_given_backends(
            primary_names, _WAIT_FOR_STATS_SEC
        )
        # Two phases: (max_rate for the primary locality, backends expected to
        # receive traffic once the cap takes effect).
        phases = [
            # RPS capped at 20% of the client's QPS: the secondary locality
            # will be used.
            (int(args.qps * 1 / 5), primary_names + secondary_names),
            # RPS capped at 120% of the client's QPS: only the primary
            # locality will be used.
            (int(args.qps * 6 / 5), primary_names),
        ]
        for max_rate, expected_backends in phases:
            logger.info(
                "Patching backend service to RATE with %d max_rate", max_rate
            )
            patch_backend_service(
                gcp,
                backend_service,
                [primary_instance_group, secondary_instance_group],
                balancing_mode="RATE",
                max_rate=max_rate,
            )
            wait_until_all_rpcs_go_to_given_backends(
                expected_backends, _WAIT_FOR_BACKEND_SEC
            )
        logger.info("success")
    except Exception:
        passed = False
        raise
    finally:
        # Restore the single-group default configuration unless the run is
        # halting for post-mortem inspection.
        if passed or not args.halt_after_fail:
            patch_backend_service(
                gcp, backend_service, [primary_instance_group]
            )
            instance_names = get_instance_names(gcp, primary_instance_group)
            wait_until_all_rpcs_go_to_given_backends(
                instance_names, _WAIT_FOR_BACKEND_SEC
            )
953
954
def test_ping_pong(gcp, backend_service, instance_group):
    """Sanity check: every RPC eventually reaches one of the healthy backends."""
    logger.info("Running test_ping_pong")
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    backends = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(backends, _WAIT_FOR_STATS_SEC)
962
963
def test_remove_instance_group(
    gcp, backend_service, instance_group, same_zone_instance_group
):
    """Verify traffic shifts when an instance group is removed.

    Attaches both instance groups to the backend service, determines which
    group(s) actually receive traffic, removes the group that traffic should
    drain from, and verifies all RPCs land on the remaining group.
    """
    logger.info("Running test_remove_instance_group")
    passed = True
    try:
        patch_backend_service(
            gcp,
            backend_service,
            [instance_group, same_zone_instance_group],
            balancing_mode="RATE",
        )
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        wait_for_healthy_backends(
            gcp, backend_service, same_zone_instance_group
        )
        instance_names = get_instance_names(gcp, instance_group)
        same_zone_instance_names = get_instance_names(
            gcp, same_zone_instance_group
        )
        try:
            wait_until_all_rpcs_go_to_given_backends(
                instance_names + same_zone_instance_names,
                _WAIT_FOR_OPERATION_SEC,
            )
            remaining_instance_group = same_zone_instance_group
            remaining_instance_names = same_zone_instance_names
        except RpcDistributionError:
            # If connected to TD in a different zone, we may route traffic to
            # only one instance group. Determine which group that is to continue
            # with the remainder of the test case.
            try:
                wait_until_all_rpcs_go_to_given_backends(
                    instance_names, _WAIT_FOR_STATS_SEC
                )
                remaining_instance_group = same_zone_instance_group
                remaining_instance_names = same_zone_instance_names
            except RpcDistributionError:
                wait_until_all_rpcs_go_to_given_backends(
                    same_zone_instance_names, _WAIT_FOR_STATS_SEC
                )
                remaining_instance_group = instance_group
                remaining_instance_names = instance_names
        patch_backend_service(
            gcp,
            backend_service,
            [remaining_instance_group],
            balancing_mode="RATE",
        )
        wait_until_all_rpcs_go_to_given_backends(
            remaining_instance_names, _WAIT_FOR_BACKEND_SEC
        )
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service, [instance_group])
            # Recompute the names here instead of reusing the try-block local:
            # if an early call in the try raised, `instance_names` would be
            # unbound and the resulting NameError would mask the real failure.
            instance_names = get_instance_names(gcp, instance_group)
            wait_until_all_rpcs_go_to_given_backends(
                instance_names, _WAIT_FOR_BACKEND_SEC
            )
1025
1026
def test_round_robin(gcp, backend_service, instance_group):
    """Verify round_robin spreads RPCs near-evenly over all backends."""
    logger.info("Running test_round_robin")
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    threshold = 1
    wait_until_all_rpcs_go_to_given_backends(
        instance_names, _WAIT_FOR_STATS_SEC
    )
    # TODO(ericgribkoff) Delayed config propagation from earlier tests
    # may result in briefly receiving an empty EDS update, resulting in failed
    # RPCs. Retry distribution validation if this occurs; long-term fix is
    # creating new backend resources for each individual test case.
    # Each attempt takes 10 seconds. Config propagation can take several
    # minutes.
    max_attempts = 40
    for _ in range(max_attempts):
        stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
        total_requests_received = sum(
            stats.rpcs_by_peer[peer] for peer in stats.rpcs_by_peer
        )
        if total_requests_received != _NUM_TEST_RPCS:
            logger.info("Unexpected RPC failures, retrying: %s", stats)
            continue
        # Perfect balance would give each backend exactly this many RPCs.
        expected_requests = total_requests_received / len(instance_names)
        deviations = {
            instance: abs(stats.rpcs_by_peer[instance] - expected_requests)
            for instance in instance_names
        }
        for instance, deviation in deviations.items():
            if deviation > threshold:
                raise Exception(
                    "RPC peer distribution differs from expected by more than"
                    " %d for instance %s (%s)" % (threshold, instance, stats)
                )
        return
    raise Exception("RPC failures persisted through %d retries" % max_attempts)
1061
1062
def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
    gcp,
    backend_service,
    primary_instance_group,
    secondary_instance_group,
    swapped_primary_and_secondary=False,
):
    """Verify a partial primary-locality outage does not spill traffic.

    Stops serving on one primary instance and checks that the remaining
    primary instances absorb all traffic (the secondary locality stays idle).

    Args:
      swapped_primary_and_secondary: internal recursion flag, set when the
        test re-runs itself with the groups swapped after detecting that TD
        treats the nominal "secondary" group as the primary one.
    """
    logger.info(
        "Running secondary_locality_gets_no_requests_on_partial_primary_failure"
    )
    passed = True
    try:
        patch_backend_service(
            gcp,
            backend_service,
            [primary_instance_group, secondary_instance_group],
        )
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(
            gcp, backend_service, secondary_instance_group
        )
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(
            primary_instance_names, _WAIT_FOR_STATS_SEC
        )
        # Take down a single primary instance; the rest of the primary
        # locality must keep serving everything.
        instances_to_stop = primary_instance_names[:1]
        remaining_instances = primary_instance_names[1:]
        try:
            set_serving_status(
                instances_to_stop, gcp.service_port, serving=False
            )
            wait_until_all_rpcs_go_to_given_backends(
                remaining_instances, _WAIT_FOR_BACKEND_SEC
            )
        finally:
            # Always restore serving status, even if the check above failed.
            set_serving_status(
                primary_instance_names, gcp.service_port, serving=True
            )
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
            gcp, secondary_instance_group
        ):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True,
            )
        else:
            passed = False
            raise e
    except Exception:
        # Mark unexpected failures so cleanup is skipped under
        # --halt_after_fail, matching the other failover tests.
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(
                gcp, backend_service, [primary_instance_group]
            )
1121
1122
def test_secondary_locality_gets_requests_on_primary_failure(
    gcp,
    backend_service,
    primary_instance_group,
    secondary_instance_group,
    swapped_primary_and_secondary=False,
):
    """Verify a full primary-locality outage fails traffic over.

    Stops serving on every primary instance and checks that all traffic
    shifts to the secondary locality, then restores the primary.

    Args:
      swapped_primary_and_secondary: internal recursion flag, set when the
        test re-runs itself with the groups swapped after detecting that TD
        treats the nominal "secondary" group as the primary one.
    """
    logger.info("Running secondary_locality_gets_requests_on_primary_failure")
    passed = True
    try:
        patch_backend_service(
            gcp,
            backend_service,
            [primary_instance_group, secondary_instance_group],
        )
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(
            gcp, backend_service, secondary_instance_group
        )
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(
            gcp, secondary_instance_group
        )
        wait_until_all_rpcs_go_to_given_backends(
            primary_instance_names, _WAIT_FOR_STATS_SEC
        )
        try:
            set_serving_status(
                primary_instance_names, gcp.service_port, serving=False
            )
            wait_until_all_rpcs_go_to_given_backends(
                secondary_instance_names, _WAIT_FOR_BACKEND_SEC
            )
        finally:
            # Always restore serving status, even if the check above failed.
            set_serving_status(
                primary_instance_names, gcp.service_port, serving=True
            )
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
            gcp, secondary_instance_group
        ):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_requests_on_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True,
            )
        else:
            passed = False
            raise e
    except Exception:
        # Mark unexpected failures so cleanup is skipped under
        # --halt_after_fail, matching the other failover tests.
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(
                gcp, backend_service, [primary_instance_group]
            )
1180
1181
def prepare_services_for_urlmap_tests(
    gcp,
    original_backend_service,
    instance_group,
    alternate_backend_service,
    same_zone_instance_group,
):
    """Bring both backend services to a known-good state for URL-map tests.

    Attaches the same-zone instance group to the alternate backend service,
    waits for both services' backends to become healthy, and waits until all
    client RPCs land on the original service's backends.

    Returns:
      A pair (original, alternate) of backend instance-name lists.
    """
    logger.info("waiting for original backends to become healthy")
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    patch_backend_service(
        gcp, alternate_backend_service, [same_zone_instance_group]
    )
    logger.info("waiting for alternate to become healthy")
    wait_for_healthy_backends(
        gcp, alternate_backend_service, same_zone_instance_group
    )

    original_instances = get_instance_names(gcp, instance_group)
    logger.info("original backends instances: %s", original_instances)
    alternate_instances = get_instance_names(gcp, same_zone_instance_group)
    logger.info("alternate backends instances: %s", alternate_instances)

    # Start with all traffic going to original_backend_service.
    logger.info("waiting for traffic to all go to original backends")
    wait_until_all_rpcs_go_to_given_backends(
        original_instances, _WAIT_FOR_STATS_SEC
    )
    return original_instances, alternate_instances
1221
1222
def test_metadata_filter(
    gcp,
    original_backend_service,
    instance_group,
    alternate_backend_service,
    same_zone_instance_group,
):
    """Verify route-rule metadataFilters select the matching backend service.

    Builds several route-rule sets from the client's own node metadata (read
    from the xDS bootstrap file) plus a deliberately non-matching label set,
    then checks for each set that traffic moves from the original to the
    alternate backend service exactly when the filters match.
    """
    logger.info("Running test_metadata_filter")
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    original_backend_instances = get_instance_names(gcp, instance_group)
    alternate_backend_instances = get_instance_names(
        gcp, same_zone_instance_group
    )
    patch_backend_service(
        gcp, alternate_backend_service, [same_zone_instance_group]
    )
    wait_for_healthy_backends(
        gcp, alternate_backend_service, same_zone_instance_group
    )
    passed = True
    try:
        # bootstrap_path is a module-level global; presumably it points at the
        # client's xDS bootstrap JSON written earlier in the run -- confirm.
        with open(bootstrap_path) as f:
            md = json.load(f)["node"]["metadata"]
            match_labels = []
            for k, v in list(md.items()):
                match_labels.append({"name": k, "value": v})

        # Labels guaranteed not to appear in the client's node metadata.
        not_match_labels = [{"name": "fake", "value": "fail"}]
        test_route_rules = [
            # test MATCH_ALL
            [
                {
                    "priority": 0,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ALL",
                                    "filterLabels": not_match_labels,
                                }
                            ],
                        }
                    ],
                    "service": original_backend_service.url,
                },
                {
                    "priority": 1,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ALL",
                                    "filterLabels": match_labels,
                                }
                            ],
                        }
                    ],
                    "service": alternate_backend_service.url,
                },
            ],
            # test mixing MATCH_ALL and MATCH_ANY
            # test MATCH_ALL: super set labels won't match
            [
                {
                    "priority": 0,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ALL",
                                    "filterLabels": not_match_labels
                                    + match_labels,
                                }
                            ],
                        }
                    ],
                    "service": original_backend_service.url,
                },
                {
                    "priority": 1,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ANY",
                                    "filterLabels": not_match_labels
                                    + match_labels,
                                }
                            ],
                        }
                    ],
                    "service": alternate_backend_service.url,
                },
            ],
            # test MATCH_ANY
            [
                {
                    "priority": 0,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ANY",
                                    "filterLabels": not_match_labels,
                                }
                            ],
                        }
                    ],
                    "service": original_backend_service.url,
                },
                {
                    "priority": 1,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ANY",
                                    "filterLabels": not_match_labels
                                    + match_labels,
                                }
                            ],
                        }
                    ],
                    "service": alternate_backend_service.url,
                },
            ],
            # test match multiple route rules
            [
                {
                    "priority": 0,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ANY",
                                    "filterLabels": match_labels,
                                }
                            ],
                        }
                    ],
                    "service": alternate_backend_service.url,
                },
                {
                    "priority": 1,
                    "matchRules": [
                        {
                            "prefixMatch": "/",
                            "metadataFilters": [
                                {
                                    "filterMatchCriteria": "MATCH_ALL",
                                    "filterLabels": match_labels,
                                }
                            ],
                        }
                    ],
                    "service": original_backend_service.url,
                },
            ],
        ]

        # For each rule set: confirm the steady state (all traffic on the
        # original service), apply the rules, verify traffic fully shifts to
        # the alternate service, then restore the default URL map.
        for route_rules in test_route_rules:
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances, _WAIT_FOR_STATS_SEC
            )
            patch_url_map_backend_service(
                gcp, original_backend_service, route_rules=route_rules
            )
            wait_until_no_rpcs_go_to_given_backends(
                original_backend_instances, _WAIT_FOR_STATS_SEC
            )
            wait_until_all_rpcs_go_to_given_backends(
                alternate_backend_instances, _WAIT_FOR_STATS_SEC
            )
            patch_url_map_backend_service(gcp, original_backend_service)
    except Exception:
        passed = False
        raise
    finally:
        # Detach the alternate service's backends unless halting for debug.
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, alternate_backend_service, [])
1410
1411
def test_api_listener(
    gcp, backend_service, instance_group, alternate_backend_service
):
    """Verify a duplicate API listener with the same host name can coexist.

    Creates a second url-map + target-proxy + forwarding-rule suite sharing
    the host name, deletes the original suite, and confirms traffic keeps
    flowing through the duplicate; then removes the host rule and confirms
    traffic stops.

    Returns:
      The server URI the client should target after the original resources
      are recreated in the cleanup path (only when cleanup runs).
    """
    logger.info("Running api_listener")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        wait_until_all_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )
        # create a second suite of map+tp+fr with the same host name in host rule
        # and we have to disable proxyless validation because it needs `0.0.0.0`
        # ip address in fr for proxyless and also we violate ip:port uniqueness
        # for test purpose. See https://github.com/grpc/grpc-java/issues/8009
        new_config_suffix = "2"
        url_map_2 = create_url_map(
            gcp,
            url_map_name + new_config_suffix,
            backend_service,
            service_host_name,
        )
        target_proxy_2 = create_target_proxy(
            gcp, target_proxy_name + new_config_suffix, False, url_map_2
        )
        if not gcp.service_port:
            # Typo fix: was "Faied to find ...".
            raise Exception(
                "Failed to find a valid port for the forwarding rule"
            )
        # Random candidate IPs for the second forwarding rule; presumably
        # create_global_forwarding_rule tries them in turn -- confirm.
        potential_ip_addresses = []
        max_attempts = 10
        for i in range(max_attempts):
            potential_ip_addresses.append(
                "10.10.10.%d" % (random.randint(0, 255))
            )
        create_global_forwarding_rule(
            gcp,
            forwarding_rule_name + new_config_suffix,
            [gcp.service_port],
            potential_ip_addresses,
            target_proxy_2,
        )
        if gcp.service_port != _DEFAULT_SERVICE_PORT:
            patch_url_map_host_rule_with_port(
                gcp,
                url_map_name + new_config_suffix,
                backend_service,
                service_host_name,
            )
        wait_until_all_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )

        # Delete the ORIGINAL suite; traffic must survive on the duplicate.
        delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
        delete_target_proxy(gcp, gcp.target_proxies[0])
        delete_url_map(gcp, gcp.url_maps[0])
        verify_attempts = int(
            _WAIT_FOR_URL_MAP_PATCH_SEC / _NUM_TEST_RPCS * args.qps
        )
        for i in range(verify_attempts):
            wait_until_all_rpcs_go_to_given_backends(
                backend_instances, _WAIT_FOR_STATS_SEC
            )
        # delete host rule for the original host name
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_no_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )

    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Recreate the original url-map/proxy/forwarding-rule suite.
            delete_global_forwarding_rules(gcp)
            delete_target_proxies(gcp)
            delete_url_maps(gcp)
            create_url_map(
                gcp, url_map_name, backend_service, service_host_name
            )
            create_target_proxy(gcp, target_proxy_name)
            create_global_forwarding_rule(
                gcp, forwarding_rule_name, potential_service_ports
            )
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(
                    gcp, url_map_name, backend_service, service_host_name
                )
                server_uri = service_host_name + ":" + str(gcp.service_port)
            else:
                server_uri = service_host_name
            # NOTE(review): returning from a finally clause suppresses any
            # in-flight exception when this cleanup branch runs -- preserved
            # as-is since callers rely on the returned URI.
            return server_uri
1504
1505
def test_forwarding_rule_port_match(gcp, backend_service, instance_group):
    """Verify RPCs stop when the forwarding rule excludes the service port.

    Returns:
      The server URI the client should target after the original forwarding
      rule is recreated in the cleanup path (only when cleanup runs).
    """
    logger.info("Running test_forwarding_rule_port_match")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        wait_until_all_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )
        # Recreate the forwarding rule over every port in the default range
        # EXCEPT the port the service actually listens on.
        delete_global_forwarding_rules(gcp)
        non_matching_ports = [
            port
            for port in parse_port_range(_DEFAULT_PORT_RANGE)
            if port != gcp.service_port
        ]
        create_global_forwarding_rule(
            gcp, forwarding_rule_name, non_matching_ports
        )
        wait_until_no_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Restore the original forwarding rule and report the URI the
            # client should use against the restored configuration.
            delete_global_forwarding_rules(gcp)
            create_global_forwarding_rule(
                gcp, forwarding_rule_name, potential_service_ports
            )
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(
                    gcp, url_map_name, backend_service, service_host_name
                )
                server_uri = service_host_name + ":" + str(gcp.service_port)
            else:
                server_uri = service_host_name
            return server_uri
1545
1546
def test_forwarding_rule_default_port(gcp, backend_service, instance_group):
    """Check URL-map / forwarding-rule matching around the default port (80).

    Exercises three expectations in sequence: a port in the url-map host rule
    does not match a client URI without a port; a port-80 forwarding rule
    with no port in the host rule does match; re-adding a port to the host
    rule breaks matching again.

    Returns:
      The server URI the client should target after the original resources
      are recreated in the cleanup path (only when cleanup runs).
    """
    logger.info("Running test_forwarding_rule_default_port")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        # If the service already uses the default port, first move the
        # forwarding rule to the whole default range and put the port into
        # the host rule so the no-port client URI stops matching.
        if gcp.service_port == _DEFAULT_SERVICE_PORT:
            wait_until_all_rpcs_go_to_given_backends(
                backend_instances, _WAIT_FOR_STATS_SEC
            )
            delete_global_forwarding_rules(gcp)
            create_global_forwarding_rule(
                gcp, forwarding_rule_name, parse_port_range(_DEFAULT_PORT_RANGE)
            )
            patch_url_map_host_rule_with_port(
                gcp, url_map_name, backend_service, service_host_name
            )
        wait_until_no_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )
        # expect success when no port in client request service uri, and no port in url-map
        delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
        delete_target_proxy(gcp, gcp.target_proxies[0])
        delete_url_map(gcp, gcp.url_maps[0])
        create_url_map(gcp, url_map_name, backend_service, service_host_name)
        create_target_proxy(gcp, target_proxy_name, False)
        # Random candidate IPs for the port-80 forwarding rule; presumably
        # create_global_forwarding_rule tries them in turn -- confirm.
        potential_ip_addresses = []
        max_attempts = 10
        for i in range(max_attempts):
            potential_ip_addresses.append(
                "10.10.10.%d" % (random.randint(0, 255))
            )
        create_global_forwarding_rule(
            gcp, forwarding_rule_name, [80], potential_ip_addresses
        )
        wait_until_all_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )

        # expect failure when no port in client request uri, but specify port in url-map
        patch_url_map_host_rule_with_port(
            gcp, url_map_name, backend_service, service_host_name
        )
        wait_until_no_rpcs_go_to_given_backends(
            backend_instances, _WAIT_FOR_STATS_SEC
        )
    except Exception:
        passed = False
        raise
    finally:
        # Recreate the original url-map/proxy/forwarding-rule suite and
        # report the URI the client should use against it.
        if passed or not args.halt_after_fail:
            delete_global_forwarding_rules(gcp)
            delete_target_proxies(gcp)
            delete_url_maps(gcp)
            create_url_map(
                gcp, url_map_name, backend_service, service_host_name
            )
            create_target_proxy(gcp, target_proxy_name)
            create_global_forwarding_rule(
                gcp, forwarding_rule_name, potential_service_ports
            )
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(
                    gcp, url_map_name, backend_service, service_host_name
                )
                server_uri = service_host_name + ":" + str(gcp.service_port)
            else:
                server_uri = service_host_name
            # NOTE: returning from finally suppresses any in-flight exception
            # when this cleanup branch runs.
            return server_uri
1616
1617
def test_traffic_splitting(
    gcp,
    original_backend_service,
    instance_group,
    alternate_backend_service,
    same_zone_instance_group,
):
    """Verify weighted traffic splitting between two backend services.

    Starts with all traffic on original_backend_service, patches the URL
    map so the default action splits traffic 20/80 between the original
    and alternate services, waits until every backend in both services
    receives RPCs, and then checks that the observed per-instance RPC
    distribution matches the configured weights.
    """
    logger.info("Running test_traffic_splitting")

    (
        orig_instances,
        alt_instances,
    ) = prepare_services_for_urlmap_tests(
        gcp,
        original_backend_service,
        instance_group,
        alternate_backend_service,
        same_zone_instance_group,
    )

    passed = True
    try:
        # Reconfigure the URL map: route 20% of traffic to the original
        # service and 80% to the alternate one.
        logger.info("patching url map with traffic splitting")
        orig_pct, alt_pct = 20, 80
        patch_url_map_backend_service(
            gcp,
            services_with_weights={
                original_backend_service: orig_pct,
                alternate_backend_service: alt_pct,
            },
        )
        # Spread each service's share evenly across its instances, e.g.
        # [20, 80] with two instances per service -> [10, 10, 40, 40].
        want_pct = [orig_pct * 1.0 / len(orig_instances)] * len(
            orig_instances
        )
        want_pct += [alt_pct * 1.0 / len(alt_instances)] * len(alt_instances)

        # Both services must be receiving traffic before percentages are
        # meaningful.
        logger.info(
            "waiting for traffic to go to all backends (including alternate)"
        )
        wait_until_all_rpcs_go_to_given_backends(
            orig_instances + alt_instances,
            _WAIT_FOR_STATS_SEC,
        )

        # Each attempt takes roughly 10 seconds, so 10 retries gives the
        # distribution about 100 seconds to converge.
        retry_count = 10
        for attempt in range(retry_count):
            stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
            counts = [
                stats.rpcs_by_peer[name]
                for name in orig_instances + alt_instances
            ]
            total = sum(counts)
            got_pct = [c * 100.0 / total for c in counts]

            try:
                compare_distributions(got_pct, want_pct, 5)
            except Exception as e:
                logger.info("attempt %d", attempt)
                logger.info("got percentage: %s", got_pct)
                logger.info("expected percentage: %s", want_pct)
                logger.info(e)
                if attempt == retry_count - 1:
                    raise Exception(
                        "RPC distribution (%s) differs from expected (%s)"
                        % (got_pct, want_pct)
                    )
            else:
                logger.info("success")
                break
    except Exception:
        passed = False
        raise
    finally:
        # Restore the default URL map and drain the alternate service so
        # later tests start from a clean state; skipped when halting on
        # failure for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
1718
1719
def test_path_matching(
    gcp,
    original_backend_service,
    instance_group,
    alternate_backend_service,
    same_zone_instance_group,
):
    """Verify that path-based URL-map match rules route RPCs correctly.

    Starts with all traffic (UnaryCall and EmptyCall) going to
    original_backend_service, then patches the URL map with a series of
    route rules (full-path, prefix, duplicate-cluster, regex and
    case-insensitive matchers). For each rule it waits for all backends
    in both services to receive traffic and verifies each RPC method
    lands on the expected backend instances.
    """
    logger.info("Running test_path_matching")

    (
        original_backend_instances,
        alternate_backend_instances,
    ) = prepare_services_for_urlmap_tests(
        gcp,
        original_backend_service,
        instance_group,
        alternate_backend_service,
        same_zone_instance_group,
    )

    passed = True
    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [
                    {
                        "priority": 0,
                        # FullPath EmptyCall -> alternate_backend_service.
                        "matchRules": [
                            {
                                "fullPathMatch": (
                                    "/grpc.testing.TestService/EmptyCall"
                                )
                            }
                        ],
                        "service": alternate_backend_service.url,
                    }
                ],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances,
                },
            ),
            (
                [
                    {
                        "priority": 0,
                        # Prefix UnaryCall -> alternate_backend_service.
                        "matchRules": [
                            {"prefixMatch": "/grpc.testing.TestService/Unary"}
                        ],
                        "service": alternate_backend_service.url,
                    }
                ],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances,
                },
            ),
            (
                # This test case is similar to the one above (but with route
                # services swapped). This test has two routes (full_path and
                # the default) to match EmptyCall, and both routes set
                # alternative_backend_service as the action. This forces the
                # client to handle duplicate Clusters in the RDS response.
                [
                    {
                        "priority": 0,
                        # Prefix UnaryCall -> original_backend_service.
                        "matchRules": [
                            {"prefixMatch": "/grpc.testing.TestService/Unary"}
                        ],
                        "service": original_backend_service.url,
                    },
                    {
                        "priority": 1,
                        # FullPath EmptyCall -> alternate_backend_service.
                        "matchRules": [
                            {
                                "fullPathMatch": (
                                    "/grpc.testing.TestService/EmptyCall"
                                )
                            }
                        ],
                        "service": alternate_backend_service.url,
                    },
                ],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances,
                },
            ),
            (
                [
                    {
                        "priority": 0,
                        # Regex UnaryCall -> alternate_backend_service.
                        "matchRules": [
                            {
                                "regexMatch": (  # Unary methods with any services.
                                    # Raw string: "\/" was an invalid escape
                                    # sequence (SyntaxWarning since Python
                                    # 3.12); the pattern value is unchanged.
                                    r"^/.*/UnaryCall$"
                                )
                            }
                        ],
                        "service": alternate_backend_service.url,
                    }
                ],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances,
                },
            ),
            (
                [
                    {
                        "priority": 0,
                        # ignoreCase EmptyCall -> alternate_backend_service.
                        "matchRules": [
                            {
                                # Case insensitive matching.
                                "fullPathMatch": (
                                    "/gRpC.tEsTinG.tEstseRvice/empTycaLl"
                                ),
                                "ignoreCase": True,
                            }
                        ],
                        "service": alternate_backend_service.url,
                    }
                ],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances,
                },
            ),
        ]

        for route_rules, expected_instances in test_cases:
            logger.info("patching url map with %s", route_rules)
            patch_url_map_backend_service(
                gcp, original_backend_service, route_rules=route_rules
            )

            # Wait for traffic to go to both services.
            logger.info(
                "waiting for traffic to go to all backends (including"
                " alternate)"
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC,
            )

            retry_count = 80
            # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
            # seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        "stats.rpcs_by_method is None, the interop client stats"
                        " service does not support this test case"
                    )
                logger.info("attempt %d", i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    raise Exception(
                        "timeout waiting for RPCs to the expected instances: %s"
                        % expected_instances
                    )
    except Exception:
        passed = False
        raise
    finally:
        # Restore the default URL map and drain the alternate service so
        # later tests start from a clean state; skipped when halting on
        # failure for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
1906
1907
def test_header_matching(
    gcp,
    original_backend_service,
    instance_group,
    alternate_backend_service,
    same_zone_instance_group,
):
    """Verify that header-based URL-map match rules route RPCs correctly.

    Starts with all traffic (UnaryCall and EmptyCall) going to
    original_backend_service, then patches the URL map with a series of
    header match rules (exact, prefix, suffix, present, inverted exact,
    numeric range and regex). The test client attaches _TEST_METADATA_*
    headers to specific RPC methods, so each rule should redirect exactly
    one method to alternate_backend_service. For each rule it waits for
    all backends in both services to receive traffic, then verifies that
    traffic goes to the expected backends.
    """
    logger.info("Running test_header_matching")

    (
        original_backend_instances,
        alternate_backend_instances,
    ) = prepare_services_for_urlmap_tests(
        gcp,
        original_backend_service,
        instance_group,
        alternate_backend_service,
        same_zone_instance_group,
    )

    passed = True
    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [
                    {
                        "priority": 0,
                        # Header ExactMatch -> alternate_backend_service.
                        # EmptyCall is sent with the metadata.
                        "matchRules": [
                            {
                                "prefixMatch": "/",
                                "headerMatches": [
                                    {
                                        "headerName": _TEST_METADATA_KEY,
                                        "exactMatch": _TEST_METADATA_VALUE_EMPTY,
                                    }
                                ],
                            }
                        ],
                        "service": alternate_backend_service.url,
                    }
                ],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances,
                },
            ),
            (
                [
                    {
                        "priority": 0,
                        # Header PrefixMatch -> alternate_backend_service.
                        # UnaryCall is sent with the metadata.
                        "matchRules": [
                            {
                                "prefixMatch": "/",
                                "headerMatches": [
                                    {
                                        "headerName": _TEST_METADATA_KEY,
                                        # Match on just the first two chars of
                                        # the metadata value.
                                        "prefixMatch": _TEST_METADATA_VALUE_UNARY[
                                            :2
                                        ],
                                    }
                                ],
                            }
                        ],
                        "service": alternate_backend_service.url,
                    }
                ],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances,
                },
            ),
            (
                [
                    {
                        "priority": 0,
                        # Header SuffixMatch -> alternate_backend_service.
                        # EmptyCall is sent with the metadata.
                        "matchRules": [
                            {
                                "prefixMatch": "/",
                                "headerMatches": [
                                    {
                                        "headerName": _TEST_METADATA_KEY,
                                        # Match on just the last two chars of
                                        # the metadata value.
                                        "suffixMatch": _TEST_METADATA_VALUE_EMPTY[
                                            -2:
                                        ],
                                    }
                                ],
                            }
                        ],
                        "service": alternate_backend_service.url,
                    }
                ],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances,
                },
            ),
            (
                [
                    {
                        "priority": 0,
                        # Header 'xds_md_numeric' present -> alternate_backend_service.
                        # UnaryCall is sent with the metadata, so will be sent to alternative.
                        "matchRules": [
                            {
                                "prefixMatch": "/",
                                "headerMatches": [
                                    {
                                        "headerName": _TEST_METADATA_NUMERIC_KEY,
                                        "presentMatch": True,
                                    }
                                ],
                            }
                        ],
                        "service": alternate_backend_service.url,
                    }
                ],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances,
                },
            ),
            (
                [
                    {
                        "priority": 0,
                        # Header invert ExactMatch -> alternate_backend_service.
                        # UnaryCall is sent with the metadata, so will be sent to
                        # original. EmptyCall will be sent to alternative.
                        "matchRules": [
                            {
                                "prefixMatch": "/",
                                "headerMatches": [
                                    {
                                        "headerName": _TEST_METADATA_KEY,
                                        "exactMatch": _TEST_METADATA_VALUE_UNARY,
                                        "invertMatch": True,
                                    }
                                ],
                            }
                        ],
                        "service": alternate_backend_service.url,
                    }
                ],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances,
                },
            ),
            (
                [
                    {
                        "priority": 0,
                        # Header 'xds_md_numeric' range [100,200] -> alternate_backend_service.
                        # UnaryCall is sent with the metadata in range.
                        "matchRules": [
                            {
                                "prefixMatch": "/",
                                "headerMatches": [
                                    {
                                        "headerName": _TEST_METADATA_NUMERIC_KEY,
                                        "rangeMatch": {
                                            "rangeStart": "100",
                                            "rangeEnd": "200",
                                        },
                                    }
                                ],
                            }
                        ],
                        "service": alternate_backend_service.url,
                    }
                ],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances,
                },
            ),
            (
                [
                    {
                        "priority": 0,
                        # Header RegexMatch -> alternate_backend_service.
                        # EmptyCall is sent with the metadata.
                        "matchRules": [
                            {
                                "prefixMatch": "/",
                                "headerMatches": [
                                    {
                                        "headerName": _TEST_METADATA_KEY,
                                        # Anchored regex built from the first
                                        # and last two chars of the value.
                                        "regexMatch": "^%s.*%s$"
                                        % (
                                            _TEST_METADATA_VALUE_EMPTY[:2],
                                            _TEST_METADATA_VALUE_EMPTY[-2:],
                                        ),
                                    }
                                ],
                            }
                        ],
                        "service": alternate_backend_service.url,
                    }
                ],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances,
                },
            ),
        ]

        # Apply each rule in turn and verify the resulting routing.
        for route_rules, expected_instances in test_cases:
            logger.info(
                "patching url map with %s -> alternative",
                route_rules[0]["matchRules"],
            )
            patch_url_map_backend_service(
                gcp, original_backend_service, route_rules=route_rules
            )

            # Wait for traffic to go to both services.
            logger.info(
                "waiting for traffic to go to all backends (including"
                " alternate)"
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC,
            )

            retry_count = 80
            # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
            # seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    # Per-method stats are required to distinguish
                    # UnaryCall from EmptyCall traffic.
                    raise ValueError(
                        "stats.rpcs_by_method is None, the interop client stats"
                        " service does not support this test case"
                    )
                logger.info("attempt %d", i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    raise Exception(
                        "timeout waiting for RPCs to the expected instances: %s"
                        % expected_instances
                    )
    except Exception:
        passed = False
        raise
    finally:
        # Restore the default URL map and drain the alternate service so
        # later tests start from a clean state; skipped when halting on
        # failure for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
2174
2175
def test_circuit_breaking(
    gcp, original_backend_service, instance_group, same_zone_instance_group
):
    """
    Since backend service circuit_breakers configuration cannot be unset,
    which causes trouble for restoring validate_for_proxy flag in target
    proxy/global forwarding rule. This test uses dedicated backend services.
    The url_map and backend services undergo the following state changes:

    Before test:
       original_backend_service -> [instance_group]
       extra_backend_service -> []
       more_extra_backend_service -> []

       url_map -> [original_backend_service]

    In test:
       extra_backend_service (with circuit_breakers) -> [instance_group]
       more_extra_backend_service (with circuit_breakers) -> [same_zone_instance_group]

       url_map -> [extra_backend_service, more_extra_backend_service]

    After test:
       original_backend_service -> [instance_group]
       extra_backend_service (with circuit_breakers) -> []
       more_extra_backend_service (with circuit_breakers) -> []

       url_map -> [original_backend_service]
    """
    logger.info("Running test_circuit_breaking")
    # Backend services created by this test; deleted during cleanup.
    additional_backend_services = []
    passed = True
    try:
        # TODO(chengyuanzhang): Dedicated backend services created for circuit
        # breaking test. Once the issue for unsetting backend service circuit
        # breakers is resolved or configuring backend service circuit breakers is
        # enabled for config validation, these dedicated backend services can be
        # eliminated.
        extra_backend_service_name = (
            _BASE_BACKEND_SERVICE_NAME + "-extra" + gcp_suffix
        )
        more_extra_backend_service_name = (
            _BASE_BACKEND_SERVICE_NAME + "-more-extra" + gcp_suffix
        )
        extra_backend_service = add_backend_service(
            gcp, extra_backend_service_name
        )
        additional_backend_services.append(extra_backend_service)
        more_extra_backend_service = add_backend_service(
            gcp, more_extra_backend_service_name
        )
        additional_backend_services.append(more_extra_backend_service)
        # The config validation for proxyless doesn't allow setting
        # circuit_breakers. Disable validate validate_for_proxyless
        # for this test. This can be removed when validation
        # accepts circuit_breakers.
        logger.info("disabling validate_for_proxyless in target proxy")
        set_validate_for_proxyless(gcp, False)
        # Different maxRequests per service so the two limits can be
        # observed independently.
        extra_backend_service_max_requests = 500
        more_extra_backend_service_max_requests = 1000
        patch_backend_service(
            gcp,
            extra_backend_service,
            [instance_group],
            circuit_breakers={
                "maxRequests": extra_backend_service_max_requests
            },
        )
        logger.info("Waiting for extra backends to become healthy")
        wait_for_healthy_backends(gcp, extra_backend_service, instance_group)
        patch_backend_service(
            gcp,
            more_extra_backend_service,
            [same_zone_instance_group],
            circuit_breakers={
                "maxRequests": more_extra_backend_service_max_requests
            },
        )
        logger.info("Waiting for more extra backend to become healthy")
        wait_for_healthy_backends(
            gcp, more_extra_backend_service, same_zone_instance_group
        )
        extra_backend_instances = get_instance_names(gcp, instance_group)
        more_extra_backend_instances = get_instance_names(
            gcp, same_zone_instance_group
        )
        # Route each RPC method to its own circuit-broken service.
        route_rules = [
            {
                "priority": 0,
                # UnaryCall -> extra_backend_service
                "matchRules": [
                    {"fullPathMatch": "/grpc.testing.TestService/UnaryCall"}
                ],
                "service": extra_backend_service.url,
            },
            {
                "priority": 1,
                # EmptyCall -> more_extra_backend_service
                "matchRules": [
                    {"fullPathMatch": "/grpc.testing.TestService/EmptyCall"}
                ],
                "service": more_extra_backend_service.url,
            },
        ]

        # Make client send UNARY_CALL and EMPTY_CALL.
        configure_client(
            [
                messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
            ]
        )
        logger.info("Patching url map with %s", route_rules)
        patch_url_map_backend_service(
            gcp, extra_backend_service, route_rules=route_rules
        )
        logger.info("Waiting for traffic to go to all backends")
        wait_until_all_rpcs_go_to_given_backends(
            extra_backend_instances + more_extra_backend_instances,
            _WAIT_FOR_STATS_SEC,
        )

        # Make all calls keep-open.
        # With "keep-open" the server never completes RPCs, so in-flight
        # RPCs accumulate until the circuit breaker's maxRequests caps them.
        configure_client(
            [
                messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
            ],
            [
                (
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                    "rpc-behavior",
                    "keep-open",
                ),
                (
                    messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
                    "rpc-behavior",
                    "keep-open",
                ),
            ],
        )
        # Timeout scales with how long it takes to accumulate maxRequests
        # in-flight RPCs at the configured QPS.
        # NOTE(review): the trailing argument appears to be an allowed
        # error margin for wait_until_rpcs_in_flight — confirm against
        # that helper's definition.
        wait_until_rpcs_in_flight(
            "UNARY_CALL",
            (
                _WAIT_FOR_BACKEND_SEC
                + int(extra_backend_service_max_requests / args.qps)
            ),
            extra_backend_service_max_requests,
            1,
        )
        logger.info(
            "UNARY_CALL reached stable state (%d)",
            extra_backend_service_max_requests,
        )
        wait_until_rpcs_in_flight(
            "EMPTY_CALL",
            (
                _WAIT_FOR_BACKEND_SEC
                + int(more_extra_backend_service_max_requests / args.qps)
            ),
            more_extra_backend_service_max_requests,
            1,
        )
        logger.info(
            "EMPTY_CALL reached stable state (%d)",
            more_extra_backend_service_max_requests,
        )

        # Increment circuit breakers max_requests threshold.
        extra_backend_service_max_requests = 800
        patch_backend_service(
            gcp,
            extra_backend_service,
            [instance_group],
            circuit_breakers={
                "maxRequests": extra_backend_service_max_requests
            },
        )
        wait_until_rpcs_in_flight(
            "UNARY_CALL",
            (
                _WAIT_FOR_BACKEND_SEC
                + int(extra_backend_service_max_requests / args.qps)
            ),
            extra_backend_service_max_requests,
            1,
        )
        logger.info(
            "UNARY_CALL reached stable state after increase (%d)",
            extra_backend_service_max_requests,
        )
        logger.info("success")
        # Avoid new RPCs being outstanding (some test clients create threads
        # for sending RPCs) after restoring backend services.
        configure_client(
            [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL]
        )
    except Exception:
        passed = False
        raise
    finally:
        # Restore the pre-test state (see the docstring's "After test"
        # diagram); skipped when halting on failure for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(
                gcp, original_backend_service, [instance_group]
            )
            for backend_service in additional_backend_services:
                delete_backend_service(gcp, backend_service)
            set_validate_for_proxyless(gcp, True)
2385
2386
def test_timeout(gcp, original_backend_service, instance_group):
    """Verify xDS maxStreamDuration and application deadlines are enforced.

    Patches the URL map to give UnaryCall a 3s max stream duration, then
    drives the interop test client through three sub-cases:
      1. UnaryCall and EmptyCall both sleep 4s server-side; UnaryCall hits
         the route's 3s stream duration (DEADLINE_EXCEEDED) while EmptyCall,
         which matches a different route, succeeds.
      2. UnaryCall sleeps 2s with a 1s application timeout; calls time out.
      3. UnaryCall with no induced delay; calls succeed.
    Each sub-case polls accumulated client stats until the observed status
    distribution matches expectations (within 10%), or raises on timeout.
    The original URL map is restored on exit unless --halt_after_fail is set
    and the test failed.
    """
    logger.info("Running test_timeout")

    logger.info("waiting for original backends to become healthy")
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # UnaryCall -> maxStreamDuration:3s
    route_rules = [
        {
            "priority": 0,
            "matchRules": [
                {"fullPathMatch": "/grpc.testing.TestService/UnaryCall"}
            ],
            "service": original_backend_service.url,
            "routeAction": {
                "maxStreamDuration": {
                    "seconds": 3,
                },
            },
        }
    ]
    patch_url_map_backend_service(
        gcp, original_backend_service, route_rules=route_rules
    )
    # A list of tuples (testcase_name, {client_config}, {expected_results}).
    # expected_results maps RPC-type name -> gRPC status code (numeric).
    test_cases = [
        (
            (
                "timeout_exceeded (UNARY_CALL), timeout_different_route"
                " (EMPTY_CALL)"
            ),
            # UnaryCall and EmptyCall both sleep-4.
            # UnaryCall timeouts, EmptyCall succeeds.
            {
                "rpc_types": [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                    messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
                ],
                "metadata": [
                    (
                        messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                        "rpc-behavior",
                        "sleep-4",
                    ),
                    (
                        messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
                        "rpc-behavior",
                        "sleep-4",
                    ),
                ],
            },
            {
                "UNARY_CALL": 4,  # DEADLINE_EXCEEDED
                "EMPTY_CALL": 0,
            },
        ),
        (
            "app_timeout_exceeded",
            # UnaryCall only with sleep-2; timeout=1s; calls timeout.
            {
                "rpc_types": [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                ],
                "metadata": [
                    (
                        messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                        "rpc-behavior",
                        "sleep-2",
                    ),
                ],
                "timeout_sec": 1,
            },
            {
                "UNARY_CALL": 4,  # DEADLINE_EXCEEDED
            },
        ),
        (
            "timeout_not_exceeded",
            # UnaryCall only with no sleep; calls succeed.
            {
                "rpc_types": [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                ],
            },
            {
                "UNARY_CALL": 0,
            },
        ),
    ]

    passed = True
    try:
        first_case = True
        for testcase_name, client_config, expected_results in test_cases:
            logger.info("starting case %s", testcase_name)
            configure_client(**client_config)
            # wait a second to help ensure the client stops sending RPCs with
            # the old config.  We will make multiple attempts if it is failing,
            # but this improves confidence that the test is valid if the
            # previous client_config would lead to the same results.
            time.sleep(1)
            # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
            # second timeout.
            attempt_count = 20
            # The first sub-case gets extra attempts (120 * 10s) to allow the
            # patched xDS config to propagate to the client.
            if first_case:
                attempt_count = 120
                first_case = False
            before_stats = get_client_accumulated_stats()
            if not before_stats.stats_per_method:
                raise ValueError(
                    "stats.stats_per_method is None, the interop client stats"
                    " service does not support this test case"
                )
            for i in range(attempt_count):
                logger.info("%s: attempt %d", testcase_name, i)

                test_runtime_secs = 10
                time.sleep(test_runtime_secs)
                after_stats = get_client_accumulated_stats()

                success = True
                for rpc, status in list(expected_results.items()):
                    # Per-status delta of RPC results during this attempt.
                    qty = (
                        after_stats.stats_per_method[rpc].result[status]
                        - before_stats.stats_per_method[rpc].result[status]
                    )
                    want = test_runtime_secs * args.qps
                    # Allow 10% deviation from expectation to reduce flakiness
                    if qty < (want * 0.9) or qty > (want * 1.1):
                        logger.info(
                            "%s: failed due to %s[%s]: got %d want ~%d",
                            testcase_name,
                            rpc,
                            status,
                            qty,
                            want,
                        )
                        success = False
                if success:
                    logger.info("success")
                    break
                logger.info("%s attempt %d failed", testcase_name, i)
                before_stats = after_stats
            else:
                # for-else: reached only if no attempt succeeded (no break).
                raise Exception(
                    "%s: timeout waiting for expected results: %s; got %s"
                    % (
                        testcase_name,
                        expected_results,
                        after_stats.stats_per_method,
                    )
                )
    except Exception:
        passed = False
        raise
    finally:
        # Restore the original URL map unless halting for post-mortem debug.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
2545
2546
def test_fault_injection(gcp, original_backend_service, instance_group):
    """Verify xDS fault-injection policies (abort and delay) are applied.

    Installs six routes keyed on the "fi_testcase" request header: 0%/100%/50%
    abort (HTTP 401 -> UNAUTHENTICATED) and delay (20s fixed) policies plus a
    catch-all, then runs one sub-case per route and checks that the observed
    per-status RPC distribution matches the configured fault percentage
    (within 10%).  validateForProxyless is disabled while the fault-injection
    route rules are installed, and restored afterwards along with the
    original URL map (unless --halt_after_fail is set and the test failed).
    """
    logger.info("Running test_fault_injection")

    logger.info("waiting for original backends to become healthy")
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # Request header used to steer each sub-case onto its route below.
    testcase_header = "fi_testcase"

    def _route(pri, name, fi_policy):
        """Build a route rule matching header fi_testcase==name with fi_policy."""
        return {
            "priority": pri,
            "matchRules": [
                {
                    "prefixMatch": "/",
                    "headerMatches": [
                        {
                            "headerName": testcase_header,
                            "exactMatch": name,
                        }
                    ],
                }
            ],
            "service": original_backend_service.url,
            "routeAction": {"faultInjectionPolicy": fi_policy},
        }

    def _abort(pct):
        """Fault policy aborting pct% of RPCs with HTTP 401 (UNAUTHENTICATED)."""
        return {
            "abort": {
                "httpStatus": 401,
                "percentage": pct,
            }
        }

    def _delay(pct):
        """Fault policy delaying pct% of RPCs by a fixed 20 seconds."""
        return {
            "delay": {
                "fixedDelay": {"seconds": "20"},
                "percentage": pct,
            }
        }

    # Combined 0% abort + 0% delay policy: faults configured but never fire.
    zero_route = _abort(0)
    zero_route.update(_delay(0))
    route_rules = [
        _route(0, "zero_percent_fault_injection", zero_route),
        _route(1, "always_delay", _delay(100)),
        _route(2, "always_abort", _abort(100)),
        _route(3, "delay_half", _delay(50)),
        _route(4, "abort_half", _abort(50)),
        {
            "priority": 5,
            "matchRules": [{"prefixMatch": "/"}],
            "service": original_backend_service.url,
        },
    ]
    set_validate_for_proxyless(gcp, False)
    patch_url_map_backend_service(
        gcp, original_backend_service, route_rules=route_rules
    )
    # A list of tuples (testcase_name, {client_config}, {code: percent}).  Each
    # test case will set the testcase_header with the testcase_name for routing
    # to the appropriate config for the case, defined above.
    test_cases = [
        (
            "always_delay",
            {"timeout_sec": 2},
            {4: 1},  # DEADLINE_EXCEEDED
        ),
        (
            "always_abort",
            {},
            {16: 1},  # UNAUTHENTICATED
        ),
        (
            "delay_half",
            {"timeout_sec": 2},
            {4: 0.5, 0: 0.5},  # DEADLINE_EXCEEDED / OK: 50% / 50%
        ),
        (
            "abort_half",
            {},
            {16: 0.5, 0: 0.5},  # UNAUTHENTICATED / OK: 50% / 50%
        ),
        (
            "zero_percent_fault_injection",
            {},
            {0: 1},  # OK
        ),
        (
            "non_matching_fault_injection",  # Not in route_rules, above.
            {},
            {0: 1},  # OK
        ),
    ]

    passed = True
    try:
        first_case = True
        for testcase_name, client_config, expected_results in test_cases:
            logger.info("starting case %s", testcase_name)

            # Route this sub-case by sending its name in the fi_testcase header.
            client_config["metadata"] = [
                (
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                    testcase_header,
                    testcase_name,
                )
            ]
            client_config["rpc_types"] = [
                messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            ]
            configure_client(**client_config)
            # wait a second to help ensure the client stops sending RPCs with
            # the old config.  We will make multiple attempts if it is failing,
            # but this improves confidence that the test is valid if the
            # previous client_config would lead to the same results.
            time.sleep(1)
            # Each attempt takes 10 seconds
            if first_case:
                # Give the first test case 600s for xDS config propagation.
                attempt_count = 60
                first_case = False
            else:
                # The accumulated stats might include previous sub-test, running
                # the test multiple times to deflake
                attempt_count = 10
            before_stats = get_client_accumulated_stats()
            if not before_stats.stats_per_method:
                raise ValueError(
                    "stats.stats_per_method is None, the interop client stats"
                    " service does not support this test case"
                )
            for i in range(attempt_count):
                logger.info("%s: attempt %d", testcase_name, i)

                test_runtime_secs = 10
                time.sleep(test_runtime_secs)
                after_stats = get_client_accumulated_stats()

                success = True
                for status, pct in list(expected_results.items()):
                    rpc = "UNARY_CALL"
                    # Per-status delta of RPC results during this attempt.
                    qty = (
                        after_stats.stats_per_method[rpc].result[status]
                        - before_stats.stats_per_method[rpc].result[status]
                    )
                    want = pct * args.qps * test_runtime_secs
                    # Allow 10% deviation from expectation to reduce flakiness
                    VARIANCE_ALLOWED = 0.1
                    if abs(qty - want) > want * VARIANCE_ALLOWED:
                        logger.info(
                            "%s: failed due to %s[%s]: got %d want ~%d",
                            testcase_name,
                            rpc,
                            status,
                            qty,
                            want,
                        )
                        success = False
                if success:
                    logger.info("success")
                    break
                logger.info("%s attempt %d failed", testcase_name, i)
                before_stats = after_stats
            else:
                # for-else: reached only if no attempt succeeded (no break).
                raise Exception(
                    "%s: timeout waiting for expected results: %s; got %s"
                    % (
                        testcase_name,
                        expected_results,
                        after_stats.stats_per_method,
                    )
                )
    except Exception:
        passed = False
        raise
    finally:
        # Restore original config unless halting for post-mortem debug.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            set_validate_for_proxyless(gcp, True)
2728
2729
def test_csds(gcp, original_backend_service, instance_group, server_uri):
    """Validate the client's CSDS (Client Status Discovery Service) dump.

    Repeatedly fetches the client's xDS config dump and checks that the node
    zone matches, and that the LDS/RDS/CDS/EDS entries are all present and
    consistent with the expected Traffic Director configuration.  Supports
    both the per-type "xds_config" dump format and the newer
    "generic_xds_configs" format.

    Raises:
        RuntimeError: if no valid config dump is observed within 5 minutes.
    """
    test_csds_timeout_s = datetime.timedelta(minutes=5).total_seconds()
    sleep_interval_between_attempts_s = datetime.timedelta(
        seconds=2
    ).total_seconds()
    logger.info("Running test_csds")

    logger.info("waiting for original backends to become healthy")
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # Test case timeout: 5 minutes
    deadline = time.time() + test_csds_timeout_s
    cnt = 0
    while time.time() <= deadline:
        client_config = get_client_xds_config_dump()
        logger.info(
            "test_csds attempt %d: received xDS config %s",
            cnt,
            json.dumps(client_config, indent=2),
        )
        if client_config is not None:
            # Got the xDS config dump, now validate it
            ok = True
            try:
                if client_config["node"]["locality"]["zone"] != args.zone:
                    logger.info(
                        "Invalid zone %s != %s",
                        client_config["node"]["locality"]["zone"],
                        args.zone,
                    )
                    ok = False
                # Track which of the four xDS resource types were validated.
                seen = set()
                for xds_config in client_config.get("xds_config", []):
                    if "listener_config" in xds_config:
                        listener_name = xds_config["listener_config"][
                            "dynamic_listeners"
                        ][0]["active_state"]["listener"]["name"]
                        if listener_name != server_uri:
                            logger.info(
                                "Invalid Listener name %s != %s",
                                listener_name,
                                server_uri,
                            )
                            ok = False
                        else:
                            seen.add("lds")
                    elif "route_config" in xds_config:
                        num_vh = len(
                            xds_config["route_config"]["dynamic_route_configs"][
                                0
                            ]["route_config"]["virtual_hosts"]
                        )
                        if num_vh <= 0:
                            logger.info(
                                "Invalid number of VirtualHosts %s", num_vh
                            )
                            ok = False
                        else:
                            seen.add("rds")
                    elif "cluster_config" in xds_config:
                        cluster_type = xds_config["cluster_config"][
                            "dynamic_active_clusters"
                        ][0]["cluster"]["type"]
                        if cluster_type != "EDS":
                            logger.info(
                                "Invalid cluster type %s != EDS", cluster_type
                            )
                            ok = False
                        else:
                            seen.add("cds")
                    elif "endpoint_config" in xds_config:
                        sub_zone = xds_config["endpoint_config"][
                            "dynamic_endpoint_configs"
                        ][0]["endpoint_config"]["endpoints"][0]["locality"][
                            "sub_zone"
                        ]
                        if args.zone not in sub_zone:
                            logger.info(
                                "Invalid endpoint sub_zone %s", sub_zone
                            )
                            ok = False
                        else:
                            seen.add("eds")
                for generic_xds_config in client_config.get(
                    "generic_xds_configs", []
                ):
                    if re.search(
                        r"\.Listener$", generic_xds_config["type_url"]
                    ):
                        seen.add("lds")
                        listener = generic_xds_config["xds_config"]
                        if listener["name"] != server_uri:
                            # Fixed: previously logged stale/undefined
                            # listener_name from the xds_config loop above.
                            logger.info(
                                "Invalid Listener name %s != %s",
                                listener["name"],
                                server_uri,
                            )
                            ok = False
                    elif re.search(
                        r"\.RouteConfiguration$", generic_xds_config["type_url"]
                    ):
                        seen.add("rds")
                        route_config = generic_xds_config["xds_config"]
                        if not len(route_config["virtual_hosts"]):
                            # Fixed: previously logged stale/undefined num_vh.
                            logger.info(
                                "Invalid number of VirtualHosts %s",
                                len(route_config["virtual_hosts"]),
                            )
                            ok = False
                    elif re.search(
                        r"\.Cluster$", generic_xds_config["type_url"]
                    ):
                        seen.add("cds")
                        cluster = generic_xds_config["xds_config"]
                        if cluster["type"] != "EDS":
                            # Fixed: previously logged stale/undefined
                            # cluster_type.
                            logger.info(
                                "Invalid cluster type %s != EDS",
                                cluster["type"],
                            )
                            ok = False
                    elif re.search(
                        r"\.ClusterLoadAssignment$",
                        generic_xds_config["type_url"],
                    ):
                        seen.add("eds")
                        endpoint = generic_xds_config["xds_config"]
                        sub_zone = endpoint["endpoints"][0]["locality"][
                            "sub_zone"
                        ]
                        if args.zone not in sub_zone:
                            # Fixed: previously logged a possibly stale
                            # sub_zone from the xds_config loop above.
                            logger.info(
                                "Invalid endpoint sub_zone %s", sub_zone
                            )
                            ok = False
                want = {"lds", "rds", "cds", "eds"}
                if seen != want:
                    logger.info("Incomplete xDS config dump, seen=%s", seen)
                    ok = False
            except Exception:
                # Malformed dump (missing keys, wrong shapes): log and retry.
                logger.exception("Error in xDS config dump:")
                ok = False
            finally:
                if ok:
                    # Successfully fetched xDS config, and they looks good.
                    logger.info("success")
                    return
        logger.info("test_csds attempt %d failed", cnt)
        # Give the client some time to fetch xDS resources
        time.sleep(sleep_interval_between_attempts_s)
        cnt += 1

    raise RuntimeError(
        "failed to receive a valid xDS config in %s seconds"
        % test_csds_timeout_s
    )
2885
2886
def set_validate_for_proxyless(gcp, validate_for_proxyless):
    """Recreate the target proxy with the given validateForProxyless setting.

    Patching target_grpc_proxy isn't supported, so the global forwarding
    rule and target proxy are deleted and recreated with the new value.
    Silently returns when alpha compute is unavailable, or when the expected
    single forwarding rule / target proxy / url map are not present.
    """
    if not gcp.alpha_compute:
        logger.debug(
            "Not setting validateForProxy because alpha is not enabled"
        )
        return
    have_single_resources = (
        len(gcp.global_forwarding_rules) == 1
        and len(gcp.target_proxies) == 1
        and len(gcp.url_maps) == 1
    )
    if not have_single_resources:
        logger.debug(
            "Global forwarding rule, target proxy or url map not found."
        )
        return
    # Tear down and rebuild: delete forwarding rule and proxy, then recreate
    # the proxy with the requested validateForProxyless flag.
    delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
    delete_target_proxy(gcp, gcp.target_proxies[0])
    create_target_proxy(gcp, target_proxy_name, validate_for_proxyless)
    create_global_forwarding_rule(gcp, forwarding_rule_name, [gcp.service_port])
2909
2910
def get_serving_status(instance, service_port):
    """Query the standard gRPC health service of one backend instance."""
    target = "%s:%d" % (instance, service_port)
    with grpc.insecure_channel(target) as channel:
        stub = health_pb2_grpc.HealthStub(channel)
        return stub.Check(health_pb2.HealthCheckRequest())
2915
2916
def set_serving_status(instances, service_port, serving):
    """Set each backend's serving status via XdsUpdateHealthService.

    For every instance, calls SetServing/SetNotServing and polls the health
    service until the reported status matches the requested one, retrying up
    to retry_count times.

    Args:
        instances: iterable of instance host names.
        service_port: port the test server listens on.
        serving: True to mark SERVING, False for NOT_SERVING.

    Raises:
        Exception: if an instance fails to reach the desired status after
            all retries.
    """
    logger.info("setting %s serving status to %s", instances, serving)
    # Loop-invariant: the health status we expect the server to report.
    want_status = (
        health_pb2.HealthCheckResponse.SERVING
        if serving
        else health_pb2.HealthCheckResponse.NOT_SERVING
    )
    retry_count = 5
    for instance in instances:
        with grpc.insecure_channel(
            "%s:%d" % (instance, service_port)
        ) as channel:
            logger.info("setting %s serving status to %s", instance, serving)
            stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel)
            # Fixed: range(5) hard-coded the retry limit that retry_count
            # declares; keep them in sync via range(retry_count).
            for i in range(retry_count):
                if serving:
                    stub.SetServing(empty_pb2.Empty())
                else:
                    stub.SetNotServing(empty_pb2.Empty())
                serving_status = get_serving_status(instance, service_port)
                logger.info("got instance service status %s", serving_status)
                if serving_status.status == want_status:
                    break
                if i == retry_count - 1:
                    raise Exception(
                        "failed to set instance service status after %d retries"
                        % retry_count
                    )
2945
2946
def is_primary_instance_group(gcp, instance_group):
    """Return True iff all observed RPC peers belong to instance_group.

    Clients may connect to a TD instance in a different region than the
    client, in which case primary/secondary assignments may not be based on
    the client's actual locality.
    """
    group_members = set(get_instance_names(gcp, instance_group))
    stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
    observed_peers = set(stats.rpcs_by_peer.keys())
    return observed_peers.issubset(group_members)
2956
2957
def get_startup_script(path_to_server_binary, service_port):
    """Return the VM startup script that launches the xDS test server.

    When a prebuilt server binary path is supplied, the script simply runs
    it in the background on the given port.  Otherwise it returns a bash
    script that builds the grpc-java interop test server from source and
    runs it.
    """
    if path_to_server_binary:
        return "nohup %s --port=%d 1>/dev/null &" % (
            path_to_server_binary,
            service_port,
        )
    # No binary supplied: build grpc-java's xds-test-server from source.
    build_and_run_template = """#!/bin/bash
sudo apt update
sudo apt install -y git default-jdk
mkdir java_server
pushd java_server
git clone https://github.com/grpc/grpc-java.git
pushd grpc-java
pushd interop-testing
../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true

nohup build/install/grpc-interop-testing/bin/xds-test-server \
    --port=%d 1>/dev/null &"""
    return build_and_run_template % service_port
2980
2981
def create_instance_template(
    gcp, name, network, source_image, machine_type, startup_script
):
    """Create a GCE instance template and record it on the gcp state object.

    The template tags instances for health-check firewall access, grants the
    default service account cloud-platform scope, and injects the given
    startup script via instance metadata.
    """
    template_body = {
        "name": name,
        "properties": {
            "tags": {"items": ["allow-health-checks"]},
            "machineType": machine_type,
            "serviceAccounts": [
                {
                    "email": "default",
                    "scopes": [
                        "https://www.googleapis.com/auth/cloud-platform",
                    ],
                }
            ],
            "networkInterfaces": [
                {
                    "accessConfigs": [{"type": "ONE_TO_ONE_NAT"}],
                    "network": network,
                }
            ],
            "disks": [
                {
                    "boot": True,
                    "initializeParams": {"sourceImage": source_image},
                    "autoDelete": True,
                }
            ],
            "metadata": {
                "items": [{"key": "startup-script", "value": startup_script}]
            },
        },
    }

    logger.debug("Sending GCP request with body=%s", template_body)
    insert_request = gcp.compute.instanceTemplates().insert(
        project=gcp.project, body=template_body
    )
    operation = insert_request.execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, operation["name"])
    gcp.instance_template = GcpResource(
        template_body["name"], operation["targetLink"]
    )
3025
3026
def add_instance_group(gcp, zone, name, size):
    """Create a managed instance group and wait for it to reach `size`.

    Returns the InstanceGroup record, which is also appended to
    gcp.instance_groups.
    """
    body = {
        "name": name,
        "instanceTemplate": gcp.instance_template.url,
        "targetSize": size,
        "namedPorts": [{"name": "grpc", "port": gcp.service_port}],
    }

    logger.debug("Sending GCP request with body=%s", body)
    managers = gcp.compute.instanceGroupManagers()
    operation = managers.insert(
        project=gcp.project, zone=zone, body=body
    ).execute(num_retries=_GCP_API_RETRIES)
    wait_for_zone_operation(gcp, zone, operation["name"])
    # Fetch the created manager to learn the instanceGroup resource URL.
    manager = managers.get(
        project=gcp.project, zone=zone, instanceGroupManager=body["name"]
    ).execute(num_retries=_GCP_API_RETRIES)
    instance_group = InstanceGroup(body["name"], manager["instanceGroup"], zone)
    gcp.instance_groups.append(instance_group)
    wait_for_instance_group_to_reach_expected_size(
        gcp, instance_group, size, _WAIT_FOR_OPERATION_SEC
    )
    return instance_group
3057
3058
def create_health_check(gcp, name):
    """Create a health check and record it on the gcp state object.

    Uses a native gRPC health check when the alpha compute API is available;
    otherwise falls back to a TCP check against the named "grpc" port.
    """
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
        body = {
            "name": name,
            "type": "GRPC",
            "grpcHealthCheck": {"portSpecification": "USE_SERVING_PORT"},
        }
    else:
        compute_to_use = gcp.compute
        body = {
            "name": name,
            "type": "TCP",
            "tcpHealthCheck": {"portName": "grpc"},
        }
    logger.debug("Sending GCP request with body=%s", body)
    operation = (
        compute_to_use.healthChecks()
        .insert(project=gcp.project, body=body)
        .execute(num_retries=_GCP_API_RETRIES)
    )
    wait_for_global_operation(gcp, operation["name"])
    gcp.health_check = GcpResource(body["name"], operation["targetLink"])
3082
3083
def create_health_check_firewall_rule(gcp, name):
    """Create the ingress firewall rule that admits GCP health checkers.

    Allows TCP from the documented health-check source ranges to instances
    tagged "allow-health-checks"; records the rule on the gcp state object.
    """
    body = {
        "name": name,
        "direction": "INGRESS",
        "allowed": [{"IPProtocol": "tcp"}],
        "sourceRanges": ["35.191.0.0/16", "130.211.0.0/22"],
        "targetTags": ["allow-health-checks"],
    }
    logger.debug("Sending GCP request with body=%s", body)
    operation = (
        gcp.compute.firewalls()
        .insert(project=gcp.project, body=body)
        .execute(num_retries=_GCP_API_RETRIES)
    )
    wait_for_global_operation(gcp, operation["name"])
    gcp.health_check_firewall_rule = GcpResource(
        body["name"], operation["targetLink"]
    )
3102
3103
def add_backend_service(gcp, name):
    """Create a backend service and return its GcpResource record.

    Uses GRPC protocol on the alpha compute API when available, HTTP2
    otherwise; the service is also appended to gcp.backend_services.
    """
    compute_to_use = gcp.alpha_compute if gcp.alpha_compute else gcp.compute
    protocol = "GRPC" if gcp.alpha_compute else "HTTP2"
    body = {
        "name": name,
        "loadBalancingScheme": "INTERNAL_SELF_MANAGED",
        "healthChecks": [gcp.health_check.url],
        "portName": "grpc",
        "protocol": protocol,
    }
    logger.debug("Sending GCP request with body=%s", body)
    operation = (
        compute_to_use.backendServices()
        .insert(project=gcp.project, body=body)
        .execute(num_retries=_GCP_API_RETRIES)
    )
    wait_for_global_operation(gcp, operation["name"])
    backend_service = GcpResource(body["name"], operation["targetLink"])
    gcp.backend_services.append(backend_service)
    return backend_service
3128
3129
def create_url_map(gcp, name, backend_service, host_name):
    """Create a URL map routing host_name to backend_service.

    The map uses a single path matcher and host rule; the created resource
    is appended to gcp.url_maps and returned.
    """
    body = {
        "name": name,
        "defaultService": backend_service.url,
        "pathMatchers": [
            {
                "name": _PATH_MATCHER_NAME,
                "defaultService": backend_service.url,
            }
        ],
        "hostRules": [
            {"hosts": [host_name], "pathMatcher": _PATH_MATCHER_NAME}
        ],
    }
    logger.debug("Sending GCP request with body=%s", body)
    operation = (
        gcp.compute.urlMaps()
        .insert(project=gcp.project, body=body)
        .execute(num_retries=_GCP_API_RETRIES)
    )
    wait_for_global_operation(gcp, operation["name"])
    url_map = GcpResource(body["name"], operation["targetLink"])
    gcp.url_maps.append(url_map)
    return url_map
3154
3155
def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name):
    """Patch the URL map so its host rule matches host_name:service_port."""
    host_with_port = "%s:%d" % (host_name, gcp.service_port)
    body = {
        "hostRules": [
            {
                "hosts": [host_with_port],
                "pathMatcher": _PATH_MATCHER_NAME,
            }
        ]
    }
    logger.debug("Sending GCP request with body=%s", body)
    operation = (
        gcp.compute.urlMaps()
        .patch(project=gcp.project, urlMap=name, body=body)
        .execute(num_retries=_GCP_API_RETRIES)
    )
    wait_for_global_operation(gcp, operation["name"])
3172
3173
def create_target_proxy(gcp, name, validate_for_proxyless=True, url_map=None):
    """Create a target proxy for the given (or first recorded) URL map.

    Creates a target gRPC proxy (with validate_for_proxyless) on the alpha
    compute API when available, otherwise a plain target HTTP proxy.  The
    proxy is appended to gcp.target_proxies and returned.
    """
    url_map_url = url_map.url if url_map else gcp.url_maps[0].url
    if gcp.alpha_compute:
        body = {
            "name": name,
            "url_map": url_map_url,
            "validate_for_proxyless": validate_for_proxyless,
        }
        logger.debug("Sending GCP request with body=%s", body)
        operation = (
            gcp.alpha_compute.targetGrpcProxies()
            .insert(project=gcp.project, body=body)
            .execute(num_retries=_GCP_API_RETRIES)
        )
    else:
        body = {
            "name": name,
            "url_map": url_map_url,
        }
        logger.debug("Sending GCP request with body=%s", body)
        operation = (
            gcp.compute.targetHttpProxies()
            .insert(project=gcp.project, body=body)
            .execute(num_retries=_GCP_API_RETRIES)
        )
    wait_for_global_operation(gcp, operation["name"])
    target_proxy = GcpResource(body["name"], operation["targetLink"])
    gcp.target_proxies.append(target_proxy)
    return target_proxy
3206
3207
def create_global_forwarding_rule(
    gcp,
    name,
    potential_ports,
    potential_ip_addresses=None,
    target_proxy=None,
):
    """Create a global forwarding rule, trying (port, ip) combinations.

    Iterates over potential_ports x potential_ip_addresses until GCP accepts
    one combination; the winning port is stored on gcp.service_port and the
    rule is appended to gcp.global_forwarding_rules. If every combination
    fails, returns without creating anything (callers check
    gcp.service_port afterwards).

    Args:
      gcp: GcpState holding the compute clients and created resources.
      name: name for the forwarding rule.
      potential_ports: iterable of candidate ports to try.
      potential_ip_addresses: candidate IPs; defaults to ["0.0.0.0"].
      target_proxy: GcpResource to target; defaults to the first proxy
        previously recorded on gcp.target_proxies.
    """
    # Fix: the default used to be a shared mutable list literal
    # (["0.0.0.0"]); use the None-sentinel idiom instead.
    if potential_ip_addresses is None:
        potential_ip_addresses = ["0.0.0.0"]
    if target_proxy:
        arg_target_proxy_url = target_proxy.url
    else:
        arg_target_proxy_url = gcp.target_proxies[0].url
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute
    for port in potential_ports:
        for ip_address in potential_ip_addresses:
            try:
                config = {
                    "name": name,
                    "loadBalancingScheme": "INTERNAL_SELF_MANAGED",
                    "portRange": str(port),
                    "IPAddress": ip_address,
                    "network": args.network,
                    "target": arg_target_proxy_url,
                }
                logger.debug("Sending GCP request with body=%s", config)
                result = (
                    compute_to_use.globalForwardingRules()
                    .insert(project=gcp.project, body=config)
                    .execute(num_retries=_GCP_API_RETRIES)
                )
                wait_for_global_operation(gcp, result["name"])
                global_forwarding_rule = GcpResource(
                    config["name"], result["targetLink"]
                )
                gcp.global_forwarding_rules.append(global_forwarding_rule)
                gcp.service_port = port
                return
            except googleapiclient.errors.HttpError as http_error:
                # Lazy %-style logging args instead of eager formatting.
                logger.warning(
                    "Got error %s when attempting to create forwarding rule to "
                    "%s:%d. Retrying with another port.",
                    http_error,
                    ip_address,
                    port,
                )
3253
3254
def get_health_check(gcp, health_check_name):
    """Look up an existing health check and record it on gcp.health_check.

    On failure the exception is recorded in gcp.errors and a placeholder
    resource (url=None) is stored instead; nothing is raised.
    """
    try:
        request = gcp.compute.healthChecks().get(
            project=gcp.project, healthCheck=health_check_name
        )
        result = request.execute()
        gcp.health_check = GcpResource(health_check_name, result["selfLink"])
    except Exception as e:
        gcp.errors.append(e)
        gcp.health_check = GcpResource(health_check_name, None)
3266
3267
def get_health_check_firewall_rule(gcp, firewall_name):
    """Look up an existing firewall rule and record it on gcp.

    On failure the exception is recorded in gcp.errors and a placeholder
    resource (url=None) is stored instead; nothing is raised.
    """
    try:
        request = gcp.compute.firewalls().get(
            project=gcp.project, firewall=firewall_name
        )
        result = request.execute()
        gcp.health_check_firewall_rule = GcpResource(
            firewall_name, result["selfLink"]
        )
    except Exception as e:
        gcp.errors.append(e)
        gcp.health_check_firewall_rule = GcpResource(firewall_name, None)
3281
3282
def get_backend_service(gcp, backend_service_name, record_error=True):
    """Look up an existing backend service; always returns a GcpResource.

    On failure a placeholder (url=None) is returned and, when record_error
    is set, the exception is appended to gcp.errors. The result is also
    appended to gcp.backend_services.
    """
    self_link = None
    try:
        request = gcp.compute.backendServices().get(
            project=gcp.project, backendService=backend_service_name
        )
        self_link = request.execute()["selfLink"]
    except Exception as e:
        if record_error:
            gcp.errors.append(e)
    backend_service = GcpResource(backend_service_name, self_link)
    gcp.backend_services.append(backend_service)
    return backend_service
3297
3298
def get_url_map(gcp, url_map_name, record_error=True):
    """Look up an existing url map and append it to gcp.url_maps.

    On failure nothing is appended; the exception is recorded in
    gcp.errors when record_error is set.
    """
    try:
        request = gcp.compute.urlMaps().get(
            project=gcp.project, urlMap=url_map_name
        )
        self_link = request.execute()["selfLink"]
        gcp.url_maps.append(GcpResource(url_map_name, self_link))
    except Exception as e:
        if record_error:
            gcp.errors.append(e)
3311
3312
def get_target_proxy(gcp, target_proxy_name, record_error=True):
    """Look up an existing target proxy and append it to gcp.target_proxies.

    Queries the alpha targetGrpcProxies API when available, otherwise the
    v1 targetHttpProxies API. On failure nothing is appended; the exception
    is recorded in gcp.errors when record_error is set.
    """
    try:
        if gcp.alpha_compute:
            request = gcp.alpha_compute.targetGrpcProxies().get(
                project=gcp.project, targetGrpcProxy=target_proxy_name
            )
        else:
            request = gcp.compute.targetHttpProxies().get(
                project=gcp.project, targetHttpProxy=target_proxy_name
            )
        result = request.execute()
        target_proxy = GcpResource(target_proxy_name, result["selfLink"])
        gcp.target_proxies.append(target_proxy)
    except Exception as e:
        if record_error:
            gcp.errors.append(e)
3332
3333
def get_global_forwarding_rule(gcp, forwarding_rule_name, record_error=True):
    """Look up an existing forwarding rule and record it on gcp.

    On failure nothing is appended; the exception is recorded in
    gcp.errors when record_error is set.
    """
    try:
        request = gcp.compute.globalForwardingRules().get(
            project=gcp.project, forwardingRule=forwarding_rule_name
        )
        self_link = request.execute()["selfLink"]
        gcp.global_forwarding_rules.append(
            GcpResource(forwarding_rule_name, self_link)
        )
    except Exception as e:
        if record_error:
            gcp.errors.append(e)
3348
3349
def get_instance_template(gcp, template_name):
    """Look up an existing instance template and record it on gcp.

    On failure the exception is recorded in gcp.errors and a placeholder
    resource (url=None) is stored instead; nothing is raised.
    """
    try:
        request = gcp.compute.instanceTemplates().get(
            project=gcp.project, instanceTemplate=template_name
        )
        result = request.execute()
        gcp.instance_template = GcpResource(template_name, result["selfLink"])
    except Exception as e:
        gcp.errors.append(e)
        gcp.instance_template = GcpResource(template_name, None)
3361
3362
def get_instance_group(gcp, zone, instance_group_name):
    """Look up an existing instance group; always returns an InstanceGroup.

    On success the group's first named port is stored on gcp.service_port.
    On failure a placeholder (url=None) is returned and the exception is
    recorded in gcp.errors. The result is appended to gcp.instance_groups.
    """
    try:
        request = gcp.compute.instanceGroups().get(
            project=gcp.project,
            zone=zone,
            instanceGroup=instance_group_name,
        )
        result = request.execute()
        # The group's first named port is the serving port used by tests.
        gcp.service_port = result["namedPorts"][0]["port"]
        instance_group = InstanceGroup(
            instance_group_name, result["selfLink"], zone
        )
    except Exception as e:
        gcp.errors.append(e)
        instance_group = InstanceGroup(instance_group_name, None, zone)
    gcp.instance_groups.append(instance_group)
    return instance_group
3383
3384
def delete_global_forwarding_rule(gcp, forwarding_rule_to_delete=None):
    """Delete one forwarding rule and drop it from gcp's registry.

    No-op when given None. HTTP errors are logged and swallowed so
    cleanup can proceed with the remaining resources.
    """
    if not forwarding_rule_to_delete:
        return
    try:
        logger.debug(
            "Deleting forwarding rule %s", forwarding_rule_to_delete.name
        )
        request = gcp.compute.globalForwardingRules().delete(
            project=gcp.project,
            forwardingRule=forwarding_rule_to_delete.name,
        )
        result = request.execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, result["name"])
        if forwarding_rule_to_delete not in gcp.global_forwarding_rules:
            logger.debug(
                (
                    "Forwarding rule %s does not exist in"
                    " gcp.global_forwarding_rules"
                ),
                forwarding_rule_to_delete.name,
            )
        else:
            gcp.global_forwarding_rules.remove(forwarding_rule_to_delete)
    except googleapiclient.errors.HttpError as http_error:
        logger.info("Delete failed: %s", http_error)
3413
3414
def delete_global_forwarding_rules(gcp):
    """Delete every forwarding rule tracked on gcp."""
    # Snapshot first: deleting mutates gcp.global_forwarding_rules.
    for rule in list(gcp.global_forwarding_rules):
        delete_global_forwarding_rule(gcp, rule)
3419
3420
def delete_target_proxy(gcp, proxy_to_delete=None):
    """Delete one target proxy (gRPC on the alpha API, else HTTP).

    No-op when given None. HTTP errors are logged and swallowed.
    """
    if not proxy_to_delete:
        return
    try:
        if gcp.alpha_compute:
            logger.debug("Deleting grpc proxy %s", proxy_to_delete.name)
            request = gcp.alpha_compute.targetGrpcProxies().delete(
                project=gcp.project, targetGrpcProxy=proxy_to_delete.name
            )
        else:
            logger.debug("Deleting http proxy %s", proxy_to_delete.name)
            request = gcp.compute.targetHttpProxies().delete(
                project=gcp.project, targetHttpProxy=proxy_to_delete.name
            )
        result = request.execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, result["name"])
        if proxy_to_delete in gcp.target_proxies:
            gcp.target_proxies.remove(proxy_to_delete)
        else:
            logger.debug(
                "Gcp proxy %s does not exist in gcp.target_proxies",
                proxy_to_delete.name,
            )
    except googleapiclient.errors.HttpError as http_error:
        logger.info("Delete failed: %s", http_error)
3453
3454
def delete_target_proxies(gcp):
    """Delete every target proxy tracked on gcp."""
    # Snapshot first: deleting mutates gcp.target_proxies.
    for proxy in list(gcp.target_proxies):
        delete_target_proxy(gcp, proxy)
3459
3460
def delete_url_map(gcp, url_map_to_delete=None):
    """Delete one url map and drop it from gcp.url_maps.

    No-op when given None. HTTP errors are logged and swallowed.
    """
    if not url_map_to_delete:
        return
    try:
        logger.debug("Deleting url map %s", url_map_to_delete.name)
        request = gcp.compute.urlMaps().delete(
            project=gcp.project, urlMap=url_map_to_delete.name
        )
        result = request.execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, result["name"])
        if url_map_to_delete in gcp.url_maps:
            gcp.url_maps.remove(url_map_to_delete)
        else:
            logger.debug(
                "Url map %s does not exist in gcp.url_maps",
                url_map_to_delete.name,
            )
    except googleapiclient.errors.HttpError as http_error:
        logger.info("Delete failed: %s", http_error)
3481
3482
def delete_url_maps(gcp):
    """Delete every url map tracked on gcp."""
    # Snapshot first: deleting mutates gcp.url_maps.
    for url_map in list(gcp.url_maps):
        delete_url_map(gcp, url_map)
3487
3488
def delete_backend_service(gcp, backend_service):
    """Delete one backend service; HTTP errors are logged, not raised."""
    try:
        logger.debug("Deleting backend service %s", backend_service.name)
        request = gcp.compute.backendServices().delete(
            project=gcp.project, backendService=backend_service.name
        )
        result = request.execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, result["name"])
    except googleapiclient.errors.HttpError as http_error:
        logger.info("Delete failed: %s", http_error)
3500
3501
def delete_backend_services(gcp):
    """Delete every backend service tracked on gcp."""
    # delete_backend_service does not mutate the list, so iterating
    # the list directly is safe here.
    for service in gcp.backend_services:
        delete_backend_service(gcp, service)
3505
3506
def delete_firewall(gcp):
    """Delete the health-check firewall rule; errors are only logged."""
    try:
        logger.debug(
            "Deleting firewall %s", gcp.health_check_firewall_rule.name
        )
        request = gcp.compute.firewalls().delete(
            project=gcp.project,
            firewall=gcp.health_check_firewall_rule.name,
        )
        result = request.execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, result["name"])
    except googleapiclient.errors.HttpError as http_error:
        logger.info("Delete failed: %s", http_error)
3523
3524
def delete_health_check(gcp):
    """Delete the recorded health check; errors are only logged."""
    try:
        logger.debug("Deleting health check %s", gcp.health_check.name)
        request = gcp.compute.healthChecks().delete(
            project=gcp.project, healthCheck=gcp.health_check.name
        )
        result = request.execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, result["name"])
    except googleapiclient.errors.HttpError as http_error:
        logger.info("Delete failed: %s", http_error)
3536
3537
def delete_instance_groups(gcp):
    """Delete every tracked managed instance group, one at a time.

    Each failed delete is logged and skipped so the remaining groups
    are still attempted.
    """
    for instance_group in gcp.instance_groups:
        try:
            logger.debug(
                "Deleting instance group %s %s",
                instance_group.name,
                instance_group.zone,
            )
            request = gcp.compute.instanceGroupManagers().delete(
                project=gcp.project,
                zone=instance_group.zone,
                instanceGroupManager=instance_group.name,
            )
            result = request.execute(num_retries=_GCP_API_RETRIES)
            # Group deletion tears down VMs, so allow the longer
            # backend timeout for the zone operation.
            wait_for_zone_operation(
                gcp,
                instance_group.zone,
                result["name"],
                timeout_sec=_WAIT_FOR_BACKEND_SEC,
            )
        except googleapiclient.errors.HttpError as http_error:
            logger.info("Delete failed: %s", http_error)
3563
3564
def delete_instance_template(gcp):
    """Delete the recorded instance template; errors are only logged."""
    try:
        logger.debug(
            "Deleting instance template %s", gcp.instance_template.name
        )
        request = gcp.compute.instanceTemplates().delete(
            project=gcp.project, instanceTemplate=gcp.instance_template.name
        )
        result = request.execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, result["name"])
    except googleapiclient.errors.HttpError as http_error:
        logger.info("Delete failed: %s", http_error)
3580
3581
def patch_backend_service(
    gcp,
    backend_service,
    instance_groups,
    balancing_mode="UTILIZATION",
    max_rate=1,
    circuit_breakers=None,
):
    """Point backend_service at the given instance groups.

    maxRate only applies to RATE balancing; for other modes None is sent
    so GCP ignores it. Waits for the patch operation to finish.
    """
    compute_to_use = gcp.alpha_compute if gcp.alpha_compute else gcp.compute
    backends = []
    for instance_group in instance_groups:
        backends.append(
            {
                "group": instance_group.url,
                "balancingMode": balancing_mode,
                "maxRate": max_rate if balancing_mode == "RATE" else None,
            }
        )
    config = {
        "backends": backends,
        "circuitBreakers": circuit_breakers,
    }
    logger.debug("Sending GCP request with body=%s", config)
    request = compute_to_use.backendServices().patch(
        project=gcp.project,
        backendService=backend_service.name,
        body=config,
    )
    result = request.execute(num_retries=_GCP_API_RETRIES)
    # Backend propagation can be slow, so use the backend timeout.
    wait_for_global_operation(
        gcp, result["name"], timeout_sec=_WAIT_FOR_BACKEND_SEC
    )
3618
3619
def resize_instance_group(
    gcp, instance_group, new_size, timeout_sec=_WAIT_FOR_OPERATION_SEC
):
    """Resize the managed instance group and wait until it reaches new_size.

    timeout_sec bounds the wait for the group to actually reach new_size;
    the resize operation itself gets a fixed 360-second budget.
    """
    request = gcp.compute.instanceGroupManagers().resize(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroupManager=instance_group.name,
        size=new_size,
    )
    result = request.execute(num_retries=_GCP_API_RETRIES)
    wait_for_zone_operation(
        gcp, instance_group.zone, result["name"], timeout_sec=360
    )
    wait_for_instance_group_to_reach_expected_size(
        gcp, instance_group, new_size, timeout_sec
    )
3639
3640
def patch_url_map_backend_service(
    gcp,
    backend_service=None,
    services_with_weights=None,
    route_rules=None,
    url_map=None,
):
    """Change url_map's backend service.

    Only one of backend_service and services_with_weights can be not None.

    Fix: this docstring used to be a bare string expression placed after
    the first statements of the body, so it was never the function's
    __doc__; it has been moved to the top where it belongs.

    Args:
      gcp: GcpState holding the compute clients and created resources.
      backend_service: single default service for the path matcher.
      services_with_weights: dict of service -> weight for a weighted split.
      route_rules: optional routeRules list passed through verbatim.
      url_map: url map to patch; defaults to the first recorded map.
    """
    if url_map:
        url_map_name = url_map.name
    else:
        url_map_name = gcp.url_maps[0].name
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute

    if backend_service and services_with_weights:
        raise ValueError(
            "both backend_service and service_with_weights are not None."
        )

    # Either a single default service...
    default_service = backend_service.url if backend_service else None
    # ...or a weighted split across several services.
    default_route_action = (
        {
            "weightedBackendServices": [
                {
                    "backendService": service.url,
                    "weight": w,
                }
                for service, w in list(services_with_weights.items())
            ]
        }
        if services_with_weights
        else None
    )

    config = {
        "pathMatchers": [
            {
                "name": _PATH_MATCHER_NAME,
                "defaultService": default_service,
                "defaultRouteAction": default_route_action,
                "routeRules": route_rules,
            }
        ]
    }
    logger.debug("Sending GCP request with body=%s", config)
    result = (
        compute_to_use.urlMaps()
        .patch(project=gcp.project, urlMap=url_map_name, body=config)
        .execute(num_retries=_GCP_API_RETRIES)
    )
    wait_for_global_operation(gcp, result["name"])
3698
3699
def wait_for_instance_group_to_reach_expected_size(
    gcp, instance_group, expected_size, timeout_sec
):
    """Poll until the instance group contains exactly expected_size VMs.

    Raises Exception if the group has not reached that size within
    timeout_sec seconds.
    """
    deadline = time.time() + timeout_sec
    while True:
        current_size = len(get_instance_names(gcp, instance_group))
        if current_size == expected_size:
            return
        if time.time() > deadline:
            raise Exception(
                "Instance group had expected size %d but actual size %d"
                % (expected_size, current_size)
            )
        time.sleep(2)
3714
3715
def wait_for_global_operation(
    gcp, operation, timeout_sec=_WAIT_FOR_OPERATION_SEC
):
    """Block until the named global operation reports DONE.

    Raises Exception on an operation error or when timeout_sec elapses.
    """
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        op = (
            gcp.compute.globalOperations()
            .get(project=gcp.project, operation=operation)
            .execute(num_retries=_GCP_API_RETRIES)
        )
        if op["status"] == "DONE":
            if "error" in op:
                raise Exception(op["error"])
            return
        time.sleep(2)
    raise Exception(
        "Operation %s did not complete within %d" % (operation, timeout_sec)
    )
3734
3735
def wait_for_zone_operation(
    gcp, zone, operation, timeout_sec=_WAIT_FOR_OPERATION_SEC
):
    """Block until the named zonal operation reports DONE.

    Raises Exception on an operation error or when timeout_sec elapses.
    """
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        op = (
            gcp.compute.zoneOperations()
            .get(project=gcp.project, zone=zone, operation=operation)
            .execute(num_retries=_GCP_API_RETRIES)
        )
        if op["status"] == "DONE":
            if "error" in op:
                raise Exception(op["error"])
            return
        time.sleep(2)
    raise Exception(
        "Operation %s did not complete within %d" % (operation, timeout_sec)
    )
3754
3755
def wait_for_healthy_backends(
    gcp, backend_service, instance_group, timeout_sec=_WAIT_FOR_BACKEND_SEC
):
    """Poll GCP until every instance in instance_group reports HEALTHY.

    Each iteration also queries every instance's own gRPC serving status;
    that pass is purely diagnostic (logged, never acted on). Raises
    Exception if the backends do not all become healthy within timeout_sec.
    """
    start_time = time.time()
    config = {"group": instance_group.url}
    instance_names = get_instance_names(gcp, instance_group)
    expected_size = len(instance_names)
    while time.time() - start_time <= timeout_sec:
        # Diagnostic pass: RPC failures here are logged and ignored;
        # health is decided solely by the getHealth() call below.
        for instance_name in instance_names:
            try:
                status = get_serving_status(instance_name, gcp.service_port)
                logger.info(
                    "serving status response from %s: %s", instance_name, status
                )
            except grpc.RpcError as rpc_error:
                logger.info(
                    "checking serving status of %s failed: %s",
                    instance_name,
                    rpc_error,
                )
        result = (
            gcp.compute.backendServices()
            .getHealth(
                project=gcp.project,
                backendService=backend_service.name,
                body=config,
            )
            .execute(num_retries=_GCP_API_RETRIES)
        )
        if "healthStatus" in result:
            logger.info("received GCP healthStatus: %s", result["healthStatus"])
            healthy = True
            for instance in result["healthStatus"]:
                if instance["healthState"] != "HEALTHY":
                    healthy = False
                    break
            # Require not only all-HEALTHY entries but also that GCP reports
            # status for every expected instance.
            if healthy and expected_size == len(result["healthStatus"]):
                return
        else:
            logger.info("no healthStatus received from GCP")
        time.sleep(5)
    raise Exception(
        "Not all backends became healthy within %d seconds: %s"
        % (timeout_sec, result)
    )
3801
3802
def get_instance_names(gcp, instance_group):
    """Return the short names of all instances in instance_group."""
    request = gcp.compute.instanceGroups().listInstances(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroup=instance_group.name,
        body={"instanceState": "ALL"},
    )
    result = request.execute(num_retries=_GCP_API_RETRIES)
    if "items" not in result:
        return []
    # listInstances() returns the full URL of each instance, but
    # compute.instances().get() wants the bare name, so keep only the
    # last path segment.
    instance_names = [
        item["instance"].split("/")[-1] for item in result["items"]
    ]
    logger.info("retrieved instance names: %s", instance_names)
    return instance_names
3826
3827
def clean_up(gcp):
    """Tear down every GCP resource recorded on the GcpState.

    Deletion runs in reverse dependency order: forwarding rules before the
    proxies they target, proxies before url maps, url maps before backend
    services, and only then the firewall/health-check/instance resources.
    Resources that were never created (None) are skipped.
    """
    delete_global_forwarding_rules(gcp)
    delete_target_proxies(gcp)
    delete_url_maps(gcp)
    delete_backend_services(gcp)
    if gcp.health_check_firewall_rule:
        delete_firewall(gcp)
    if gcp.health_check:
        delete_health_check(gcp)
    delete_instance_groups(gcp)
    if gcp.instance_template:
        delete_instance_template(gcp)
3839        delete_instance_template(gcp)
3840
3841
class InstanceGroup(object):
    """Plain record for a GCE instance group: its name, URL, and zone."""

    def __init__(self, name, url, zone):
        # url is None for groups that failed to be created/located.
        self.name = name
        self.url = url
        self.zone = zone
3847
3848
class GcpResource(object):
    """Plain record for a named GCP resource and its self-link URL."""

    def __init__(self, name, url):
        # url is None for resources that failed to be created/located.
        self.name = name
        self.url = url
3853
3854
class GcpState(object):
    """Mutable registry of every GCP resource this test run creates or finds.

    Holds the compute API clients plus the created/located resources so
    that clean_up() can tear everything down, and accumulates lookup
    errors in `errors`.
    """

    def __init__(self, compute, alpha_compute, project, project_num):
        # API clients: alpha_compute is None when only stable APIs are used.
        self.compute = compute
        self.alpha_compute = alpha_compute
        self.project = project
        self.project_num = project_num
        # Singleton resources (None until created/located).
        self.health_check = None
        self.health_check_firewall_rule = None
        self.instance_template = None
        self.service_port = None
        # Resource collections, populated as resources are created/located.
        self.backend_services = []
        self.url_maps = []
        self.target_proxies = []
        self.global_forwarding_rules = []
        self.instance_groups = []
        # Exceptions recorded while locating pre-existing resources.
        self.errors = []
3871
3872
# Log the script start time and local timezone so log timelines from
# different machines can be aligned during debugging.
logging.debug(
    "script start time: %s",
    datetime.datetime.now(datetime.timezone.utc)
    .astimezone()
    .strftime("%Y-%m-%dT%H:%M:%S %Z"),
)
logging.debug(
    "logging local timezone: %s",
    datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo,
)
# Build the GCP compute API clients. A local discovery document may be
# supplied via flags (e.g. to test unreleased API surfaces); otherwise the
# published "compute" API is used. The alpha client is created only when
# non-stable GCP APIs are permitted.
alpha_compute = None
if args.compute_discovery_document:
    with open(args.compute_discovery_document, "r") as discovery_doc:
        compute = googleapiclient.discovery.build_from_document(
            discovery_doc.read()
        )
    if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document:
        with open(args.alpha_compute_discovery_document, "r") as discovery_doc:
            alpha_compute = googleapiclient.discovery.build_from_document(
                discovery_doc.read()
            )
else:
    compute = googleapiclient.discovery.build("compute", "v1")
    if not args.only_stable_gcp_apis:
        alpha_compute = googleapiclient.discovery.build("compute", "alpha")

# Per-test-case results and the list of failed test names, filled in by
# the test loop below.
test_results = {}
failed_tests = []
3901try:
3902    gcp = GcpState(compute, alpha_compute, args.project_id, args.project_num)
3903    gcp_suffix = args.gcp_suffix
3904    health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
3905    if not args.use_existing_gcp_resources:
3906        if args.keep_gcp_resources:
3907            # Auto-generating a unique suffix in case of conflict should not be
3908            # combined with --keep_gcp_resources, as the suffix actually used
3909            # for GCP resources will not match the provided --gcp_suffix value.
3910            num_attempts = 1
3911        else:
3912            num_attempts = 5
3913        for i in range(num_attempts):
3914            try:
3915                logger.info("Using GCP suffix %s", gcp_suffix)
3916                create_health_check(gcp, health_check_name)
3917                break
3918            except googleapiclient.errors.HttpError as http_error:
3919                gcp_suffix = "%s-%04d" % (gcp_suffix, random.randint(0, 9999))
3920                health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
3921                logger.exception("HttpError when creating health check")
3922        if gcp.health_check is None:
3923            raise Exception(
3924                "Failed to create health check name after %d attempts"
3925                % num_attempts
3926            )
3927    firewall_name = _BASE_FIREWALL_RULE_NAME + gcp_suffix
3928    backend_service_name = _BASE_BACKEND_SERVICE_NAME + gcp_suffix
3929    alternate_backend_service_name = (
3930        _BASE_BACKEND_SERVICE_NAME + "-alternate" + gcp_suffix
3931    )
3932    extra_backend_service_name = (
3933        _BASE_BACKEND_SERVICE_NAME + "-extra" + gcp_suffix
3934    )
3935    more_extra_backend_service_name = (
3936        _BASE_BACKEND_SERVICE_NAME + "-more-extra" + gcp_suffix
3937    )
3938    url_map_name = _BASE_URL_MAP_NAME + gcp_suffix
3939    url_map_name_2 = url_map_name + "2"
3940    service_host_name = _BASE_SERVICE_HOST + gcp_suffix
3941    target_proxy_name = _BASE_TARGET_PROXY_NAME + gcp_suffix
3942    target_proxy_name_2 = target_proxy_name + "2"
3943    forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + gcp_suffix
3944    forwarding_rule_name_2 = forwarding_rule_name + "2"
3945    template_name = _BASE_TEMPLATE_NAME + gcp_suffix
3946    instance_group_name = _BASE_INSTANCE_GROUP_NAME + gcp_suffix
3947    same_zone_instance_group_name = (
3948        _BASE_INSTANCE_GROUP_NAME + "-same-zone" + gcp_suffix
3949    )
3950    secondary_zone_instance_group_name = (
3951        _BASE_INSTANCE_GROUP_NAME + "-secondary-zone" + gcp_suffix
3952    )
3953    potential_service_ports = list(args.service_port_range)
3954    random.shuffle(potential_service_ports)
3955    if args.use_existing_gcp_resources:
3956        logger.info("Reusing existing GCP resources")
3957        get_health_check(gcp, health_check_name)
3958        get_health_check_firewall_rule(gcp, firewall_name)
3959        backend_service = get_backend_service(gcp, backend_service_name)
3960        alternate_backend_service = get_backend_service(
3961            gcp, alternate_backend_service_name
3962        )
3963        extra_backend_service = get_backend_service(
3964            gcp, extra_backend_service_name, record_error=False
3965        )
3966        more_extra_backend_service = get_backend_service(
3967            gcp, more_extra_backend_service_name, record_error=False
3968        )
3969        get_url_map(gcp, url_map_name)
3970        get_target_proxy(gcp, target_proxy_name)
3971        get_global_forwarding_rule(gcp, forwarding_rule_name)
3972        get_url_map(gcp, url_map_name_2, record_error=False)
3973        get_target_proxy(gcp, target_proxy_name_2, record_error=False)
3974        get_global_forwarding_rule(
3975            gcp, forwarding_rule_name_2, record_error=False
3976        )
3977        get_instance_template(gcp, template_name)
3978        instance_group = get_instance_group(gcp, args.zone, instance_group_name)
3979        same_zone_instance_group = get_instance_group(
3980            gcp, args.zone, same_zone_instance_group_name
3981        )
3982        secondary_zone_instance_group = get_instance_group(
3983            gcp, args.secondary_zone, secondary_zone_instance_group_name
3984        )
3985        if gcp.errors:
3986            raise Exception(gcp.errors)
    else:
        # Create the full set of GCP resources from scratch:
        # firewall rule -> backend services -> URL map -> target proxy ->
        # forwarding rule -> instance template -> managed instance groups.
        # The ordering matters: each step references resources created by
        # the previous ones.
        create_health_check_firewall_rule(gcp, firewall_name)
        backend_service = add_backend_service(gcp, backend_service_name)
        alternate_backend_service = add_backend_service(
            gcp, alternate_backend_service_name
        )
        create_url_map(gcp, url_map_name, backend_service, service_host_name)
        create_target_proxy(gcp, target_proxy_name)
        # The forwarding rule picks the first usable port from the
        # candidate list and records it on gcp.service_port.
        create_global_forwarding_rule(
            gcp, forwarding_rule_name, potential_service_ports
        )
        if not gcp.service_port:
            raise Exception(
                "Failed to find a valid ip:port for the forwarding rule"
            )
        if gcp.service_port != _DEFAULT_SERVICE_PORT:
            # A non-default port must appear explicitly in the URL map's
            # host rule, since host matching includes the port.
            patch_url_map_host_rule_with_port(
                gcp, url_map_name, backend_service, service_host_name
            )
        startup_script = get_startup_script(
            args.path_to_server_binary, gcp.service_port
        )
        create_instance_template(
            gcp,
            template_name,
            args.network,
            args.source_image,
            args.machine_type,
            startup_script,
        )
        instance_group = add_instance_group(
            gcp, args.zone, instance_group_name, _INSTANCE_GROUP_SIZE
        )
        # Only the primary instance group is attached to the backend
        # service up front; individual tests attach the others as needed.
        patch_backend_service(gcp, backend_service, [instance_group])
        same_zone_instance_group = add_instance_group(
            gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE
        )
        secondary_zone_instance_group = add_instance_group(
            gcp,
            args.secondary_zone,
            secondary_zone_instance_group_name,
            _INSTANCE_GROUP_SIZE,
        )

    # Block until the primary backends pass health checks, regardless of
    # whether resources were created or merely verified above.
    wait_for_healthy_backends(gcp, backend_service, instance_group)

4033    if args.test_case:
4034        client_env = dict(os.environ)
4035        if original_grpc_trace:
4036            client_env["GRPC_TRACE"] = original_grpc_trace
4037        if original_grpc_verbosity:
4038            client_env["GRPC_VERBOSITY"] = original_grpc_verbosity
4039        bootstrap_server_features = []
4040
4041        if gcp.service_port == _DEFAULT_SERVICE_PORT:
4042            server_uri = service_host_name
4043        else:
4044            server_uri = service_host_name + ":" + str(gcp.service_port)
4045        if args.xds_v3_support:
4046            client_env["GRPC_XDS_EXPERIMENTAL_V3_SUPPORT"] = "true"
4047            bootstrap_server_features.append("xds_v3")
4048        if args.bootstrap_file:
4049            bootstrap_path = os.path.abspath(args.bootstrap_file)
4050        else:
4051            with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file:
4052                bootstrap_file.write(
4053                    _BOOTSTRAP_TEMPLATE.format(
4054                        node_id="projects/%s/networks/%s/nodes/%s"
4055                        % (
4056                            gcp.project_num,
4057                            args.network.split("/")[-1],
4058                            uuid.uuid1(),
4059                        ),
4060                        server_features=json.dumps(bootstrap_server_features),
4061                    ).encode("utf-8")
4062                )
4063                bootstrap_path = bootstrap_file.name
4064        client_env["GRPC_XDS_BOOTSTRAP"] = bootstrap_path
4065        client_env["GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"] = "true"
4066        client_env["GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"] = "true"
4067        client_env["GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"] = "true"
4068        for test_case in args.test_case:
4069            if test_case in _V3_TEST_CASES and not args.xds_v3_support:
4070                logger.info(
4071                    "skipping test %s due to missing v3 support", test_case
4072                )
4073                continue
4074            if test_case in _ALPHA_TEST_CASES and not gcp.alpha_compute:
4075                logger.info(
4076                    "skipping test %s due to missing alpha support", test_case
4077                )
4078                continue
4079            if (
4080                test_case
4081                in [
4082                    "api_listener",
4083                    "forwarding_rule_port_match",
4084                    "forwarding_rule_default_port",
4085                ]
4086                and CLIENT_HOSTS
4087            ):
4088                logger.info(
4089                    (
4090                        "skipping test %s because test configuration is"
4091                        "not compatible with client processes on existing"
4092                        "client hosts"
4093                    ),
4094                    test_case,
4095                )
4096                continue
4097            if test_case == "forwarding_rule_default_port":
4098                server_uri = service_host_name
4099            result = jobset.JobResult()
4100            log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case)
4101            if not os.path.exists(log_dir):
4102                os.makedirs(log_dir)
4103            test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME)
4104            test_log_file = open(test_log_filename, "w+")
4105            client_process = None
4106
4107            if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS:
4108                rpcs_to_send = '--rpc="UnaryCall,EmptyCall"'
4109            else:
4110                rpcs_to_send = '--rpc="UnaryCall"'
4111
4112            if test_case in _TESTS_TO_SEND_METADATA:
4113                metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU},UnaryCall:{keyNU}:{valueNU}"'.format(
4114                    keyE=_TEST_METADATA_KEY,
4115                    valueE=_TEST_METADATA_VALUE_EMPTY,
4116                    keyU=_TEST_METADATA_KEY,
4117                    valueU=_TEST_METADATA_VALUE_UNARY,
4118                    keyNU=_TEST_METADATA_NUMERIC_KEY,
4119                    valueNU=_TEST_METADATA_NUMERIC_VALUE,
4120                )
4121            else:
4122                # Setting the arg explicitly to empty with '--metadata=""'
4123                # makes C# client fail
4124                # (see https://github.com/commandlineparser/commandline/issues/412),
4125                # so instead we just rely on clients using the default when
4126                # metadata arg is not specified.
4127                metadata_to_send = ""
4128
4129            # TODO(ericgribkoff) Temporarily disable fail_on_failed_rpc checks
4130            # in the client. This means we will ignore intermittent RPC
4131            # failures (but this framework still checks that the final result
4132            # is as expected).
4133            #
4134            # Reason for disabling this is, the resources are shared by
4135            # multiple tests, and a change in previous test could be delayed
4136            # until the second test starts. The second test may see
4137            # intermittent failures because of that.
4138            #
4139            # A fix is to not share resources between tests (though that does
4140            # mean the tests will be significantly slower due to creating new
4141            # resources).
4142            fail_on_failed_rpc = ""
4143
4144            try:
4145                if not CLIENT_HOSTS:
4146                    client_cmd_formatted = args.client_cmd.format(
4147                        server_uri=server_uri,
4148                        stats_port=args.stats_port,
4149                        qps=args.qps,
4150                        fail_on_failed_rpc=fail_on_failed_rpc,
4151                        rpcs_to_send=rpcs_to_send,
4152                        metadata_to_send=metadata_to_send,
4153                    )
4154                    logger.debug("running client: %s", client_cmd_formatted)
4155                    client_cmd = shlex.split(client_cmd_formatted)
4156                    client_process = subprocess.Popen(
4157                        client_cmd,
4158                        env=client_env,
4159                        stderr=subprocess.STDOUT,
4160                        stdout=test_log_file,
4161                    )
4162                if test_case == "backends_restart":
4163                    test_backends_restart(gcp, backend_service, instance_group)
4164                elif test_case == "change_backend_service":
4165                    test_change_backend_service(
4166                        gcp,
4167                        backend_service,
4168                        instance_group,
4169                        alternate_backend_service,
4170                        same_zone_instance_group,
4171                    )
4172                elif test_case == "gentle_failover":
4173                    test_gentle_failover(
4174                        gcp,
4175                        backend_service,
4176                        instance_group,
4177                        secondary_zone_instance_group,
4178                    )
4179                elif test_case == "load_report_based_failover":
4180                    test_load_report_based_failover(
4181                        gcp,
4182                        backend_service,
4183                        instance_group,
4184                        secondary_zone_instance_group,
4185                    )
4186                elif test_case == "ping_pong":
4187                    test_ping_pong(gcp, backend_service, instance_group)
4188                elif test_case == "remove_instance_group":
4189                    test_remove_instance_group(
4190                        gcp,
4191                        backend_service,
4192                        instance_group,
4193                        same_zone_instance_group,
4194                    )
4195                elif test_case == "round_robin":
4196                    test_round_robin(gcp, backend_service, instance_group)
4197                elif (
4198                    test_case
4199                    == "secondary_locality_gets_no_requests_on_partial_primary_failure"
4200                ):
4201                    test_secondary_locality_gets_no_requests_on_partial_primary_failure(
4202                        gcp,
4203                        backend_service,
4204                        instance_group,
4205                        secondary_zone_instance_group,
4206                    )
4207                elif (
4208                    test_case
4209                    == "secondary_locality_gets_requests_on_primary_failure"
4210                ):
4211                    test_secondary_locality_gets_requests_on_primary_failure(
4212                        gcp,
4213                        backend_service,
4214                        instance_group,
4215                        secondary_zone_instance_group,
4216                    )
4217                elif test_case == "traffic_splitting":
4218                    test_traffic_splitting(
4219                        gcp,
4220                        backend_service,
4221                        instance_group,
4222                        alternate_backend_service,
4223                        same_zone_instance_group,
4224                    )
4225                elif test_case == "path_matching":
4226                    test_path_matching(
4227                        gcp,
4228                        backend_service,
4229                        instance_group,
4230                        alternate_backend_service,
4231                        same_zone_instance_group,
4232                    )
4233                elif test_case == "header_matching":
4234                    test_header_matching(
4235                        gcp,
4236                        backend_service,
4237                        instance_group,
4238                        alternate_backend_service,
4239                        same_zone_instance_group,
4240                    )
4241                elif test_case == "circuit_breaking":
4242                    test_circuit_breaking(
4243                        gcp,
4244                        backend_service,
4245                        instance_group,
4246                        same_zone_instance_group,
4247                    )
4248                elif test_case == "timeout":
4249                    test_timeout(gcp, backend_service, instance_group)
4250                elif test_case == "fault_injection":
4251                    test_fault_injection(gcp, backend_service, instance_group)
4252                elif test_case == "api_listener":
4253                    server_uri = test_api_listener(
4254                        gcp,
4255                        backend_service,
4256                        instance_group,
4257                        alternate_backend_service,
4258                    )
4259                elif test_case == "forwarding_rule_port_match":
4260                    server_uri = test_forwarding_rule_port_match(
4261                        gcp, backend_service, instance_group
4262                    )
4263                elif test_case == "forwarding_rule_default_port":
4264                    server_uri = test_forwarding_rule_default_port(
4265                        gcp, backend_service, instance_group
4266                    )
4267                elif test_case == "metadata_filter":
4268                    test_metadata_filter(
4269                        gcp,
4270                        backend_service,
4271                        instance_group,
4272                        alternate_backend_service,
4273                        same_zone_instance_group,
4274                    )
4275                elif test_case == "csds":
4276                    test_csds(gcp, backend_service, instance_group, server_uri)
4277                else:
4278                    logger.error("Unknown test case: %s", test_case)
4279                    sys.exit(1)
4280                if client_process and client_process.poll() is not None:
4281                    raise Exception(
4282                        "Client process exited prematurely with exit code %d"
4283                        % client_process.returncode
4284                    )
4285                result.state = "PASSED"
4286                result.returncode = 0
4287            except Exception as e:
4288                logger.exception("Test case %s failed", test_case)
4289                failed_tests.append(test_case)
4290                result.state = "FAILED"
4291                result.message = str(e)
4292                if args.halt_after_fail:
4293                    # Stop the test suite if one case failed.
4294                    raise
4295            finally:
4296                if client_process:
4297                    if client_process.returncode:
4298                        logger.info(
4299                            "Client exited with code %d"
4300                            % client_process.returncode
4301                        )
4302                    else:
4303                        client_process.terminate()
4304                test_log_file.close()
4305                # Workaround for Python 3, as report_utils will invoke decode() on
4306                # result.message, which has a default value of ''.
4307                result.message = result.message.encode("UTF-8")
4308                test_results[test_case] = [result]
4309                if args.log_client_output:
4310                    logger.info("Client output:")
4311                    with open(test_log_filename, "r") as client_output:
4312                        logger.info(client_output.read())
4313        if not os.path.exists(_TEST_LOG_BASE_DIR):
4314            os.makedirs(_TEST_LOG_BASE_DIR)
4315        report_utils.render_junit_xml_report(
4316            test_results,
4317            os.path.join(_TEST_LOG_BASE_DIR, _SPONGE_XML_NAME),
4318            suite_name="xds_tests",
4319            multi_target=True,
4320        )
4321        if failed_tests:
4322            logger.error("Test case(s) %s failed", failed_tests)
4323            sys.exit(1)
finally:
    # Always runs, whether setup or the test loop raised or not.
    keep_resources = args.keep_gcp_resources
    if args.halt_after_fail and failed_tests:
        # Preserve resources for post-mortem debugging when the suite was
        # halted by a failure (the failing test re-raised above).
        logger.info(
            "Halt after fail triggered, exiting without cleaning up resources"
        )
        keep_resources = True
    if not keep_resources:
        logger.info("Cleaning up GCP resources. This may take some time.")
        clean_up(gcp)
4334