1# Copyright 2021 The gRPC Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import collections 16from typing import List 17 18from absl import flags 19from absl import logging 20from absl.testing import absltest 21from google.protobuf import json_format 22 23from framework import xds_k8s_testcase 24from framework import xds_url_map_testcase 25from framework.helpers import skips 26 27flags.adopt_module_key_flags(xds_k8s_testcase) 28 29# Type aliases 30_XdsTestServer = xds_k8s_testcase.XdsTestServer 31_XdsTestClient = xds_k8s_testcase.XdsTestClient 32 33_SUBSET_SIZE = 4 34_NUM_BACKENDS = 8 35_NUM_CLIENTS = 3 36 37 38class SubsettingTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): 39 40 @staticmethod 41 def is_supported(config: skips.TestConfig) -> bool: 42 # Subsetting is an experimental feature where most work is done on the 43 # server-side. We limit it to only run on master branch to save 44 # resources. 45 return config.version_gte('master') 46 47 def test_subsetting_basic(self) -> None: 48 with self.subTest('00_create_health_check'): 49 self.td.create_health_check() 50 51 with self.subTest('01_create_backend_services'): 52 self.td.create_backend_service(subset_size=_SUBSET_SIZE) 53 54 with self.subTest('02_create_url_map'): 55 self.td.create_url_map(self.server_xds_host, self.server_xds_port) 56 57 with self.subTest('03_create_target_proxy'): 58 self.td.create_target_proxy() 59 60 with self.subTest('04_create_forwarding_rule'): 61 self.td.create_forwarding_rule(self.server_xds_port) 62 63 test_servers: List[_XdsTestServer] 64 with self.subTest('05_start_test_servers'): 65 test_servers = self.startTestServers(replica_count=_NUM_BACKENDS) 66 67 with self.subTest('06_add_server_backends_to_backend_services'): 68 self.setupServerBackends() 69 70 rpc_distribution = collections.defaultdict(int) 71 with self.subTest('07_start_test_client'): 72 for i in range(_NUM_CLIENTS): 73 # Clean created client pods if there is any. 74 if self.client_runner.time_start_requested: 75 # TODO(sergiitk): Speed up by reusing the namespace. 76 self.client_runner.cleanup() 77 78 # Create a test client 79 test_client: _XdsTestClient = self.startTestClient( 80 test_servers[0]) 81 # Validate the number of received endpoints 82 config = test_client.csds.fetch_client_status( 83 log_level=logging.INFO) 84 self.assertIsNotNone(config) 85 json_config = json_format.MessageToDict(config) 86 parsed = xds_url_map_testcase.DumpedXdsConfig(json_config) 87 logging.info('Client %d received endpoints (len=%s): %s', i, 88 len(parsed.endpoints), parsed.endpoints) 89 self.assertLen(parsed.endpoints, _SUBSET_SIZE) 90 # Record RPC stats 91 lb_stats = self.getClientRpcStats(test_client, 92 _NUM_BACKENDS * 25) 93 for key, value in lb_stats.rpcs_by_peer.items(): 94 rpc_distribution[key] += value 95 96 with self.subTest('08_log_rpc_distribution'): 97 server_entries = sorted(rpc_distribution.items(), 98 key=lambda x: -x[1]) 99 # Validate if clients are receiving different sets of backends (3 100 # client received a total of 4 unique backends == FAIL, a total of 5 101 # unique backends == PASS) 102 self.assertGreater(len(server_entries), _SUBSET_SIZE) 103 logging.info('RPC distribution (len=%s): %s', len(server_entries), 104 server_entries) 105 peak = server_entries[0][1] 106 mean = sum(map(lambda x: x[1], 107 server_entries)) / len(server_entries) 108 logging.info('Peak=%d Mean=%.1f Peak-to-Mean-Ratio=%.2f', peak, 109 mean, peak / mean) 110 111 112if __name__ == '__main__': 113 absltest.main(failfast=True) 114