1# Copyright 2021 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import collections
16from typing import List
17
18from absl import flags
19from absl import logging
20from absl.testing import absltest
21from google.protobuf import json_format
22
23from framework import xds_k8s_testcase
24from framework import xds_url_map_testcase
25from framework.helpers import skips
26
# Re-declare the key flags of xds_k8s_testcase as key flags of this module.
flags.adopt_module_key_flags(xds_k8s_testcase)

# Test sizing.
# Number of test server replicas started behind the backend service.
_NUM_BACKENDS = 8
# Number of test clients started one after another.
_NUM_CLIENTS = 3
# Subset size configured on the backend service; also the number of endpoints
# every client is expected to receive.
_SUBSET_SIZE = 4

# Short names for the framework types used in annotations below.
_XdsTestClient = xds_k8s_testcase.XdsTestClient
_XdsTestServer = xds_k8s_testcase.XdsTestServer
36
37
class SubsettingTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
    """End-to-end check of backend subsetting.

    A backend service is created with ``subset_size=_SUBSET_SIZE`` in front of
    ``_NUM_BACKENDS`` test servers. ``_NUM_CLIENTS`` clients are then started
    one after another; each must receive exactly ``_SUBSET_SIZE`` endpoints,
    and together they must reach more than ``_SUBSET_SIZE`` distinct peers.
    """

    @staticmethod
    def is_supported(config: skips.TestConfig) -> bool:
        """Return whether this test should run for the given test config.

        Subsetting is an experimental feature where most of the work is done
        on the server side, so the test is limited to the master branch to
        save resources.
        """
        return config.version_gte('master')

    def test_subsetting_basic(self) -> None:
        """Set up the xDS resources, run the clients, and check the subsets."""
        # Traffic Director resources, created one after another. Every step is
        # wrapped in a lambda so that it is only evaluated inside its own
        # subTest.
        td_setup_steps = (
            ('00_create_health_check',
             lambda: self.td.create_health_check()),
            ('01_create_backend_services',
             lambda: self.td.create_backend_service(
                 subset_size=_SUBSET_SIZE)),
            ('02_create_url_map',
             lambda: self.td.create_url_map(self.server_xds_host,
                                            self.server_xds_port)),
            ('03_create_target_proxy',
             lambda: self.td.create_target_proxy()),
            ('04_create_forwarding_rule',
             lambda: self.td.create_forwarding_rule(self.server_xds_port)),
        )
        for step_name, run_step in td_setup_steps:
            with self.subTest(step_name):
                run_step()

        with self.subTest('05_start_test_servers'):
            test_servers: List[_XdsTestServer] = self.startTestServers(
                replica_count=_NUM_BACKENDS)

        with self.subTest('06_add_server_backends_to_backend_services'):
            self.setupServerBackends()

        # Completed RPCs per peer, accumulated over all clients.
        rpcs_per_peer = collections.defaultdict(int)
        with self.subTest('07_start_test_client'):
            for client_index in range(_NUM_CLIENTS):
                # Remove the client pods left over from the previous
                # iteration, if there are any.
                if self.client_runner.time_start_requested:
                    # TODO(sergiitk): Speed up by reusing the namespace.
                    self.client_runner.cleanup()

                test_client: _XdsTestClient = self.startTestClient(
                    test_servers[0])

                # The client must have been given exactly _SUBSET_SIZE
                # endpoints.
                client_status = test_client.csds.fetch_client_status(
                    log_level=logging.INFO)
                self.assertIsNotNone(client_status)
                xds_config = xds_url_map_testcase.DumpedXdsConfig(
                    json_format.MessageToDict(client_status))
                logging.info('Client %d received endpoints (len=%s): %s',
                             client_index, len(xds_config.endpoints),
                             xds_config.endpoints)
                self.assertLen(xds_config.endpoints, _SUBSET_SIZE)

                # Fold this client's per-peer RPC counts into the totals.
                num_rpcs = _NUM_BACKENDS * 25
                lb_stats = self.getClientRpcStats(test_client, num_rpcs)
                for peer, rpc_count in lb_stats.rpcs_by_peer.items():
                    rpcs_per_peer[peer] += rpc_count

        with self.subTest('08_log_rpc_distribution'):
            # Busiest peer first; the sort is stable, so ties keep their
            # insertion order.
            ranked_peers = sorted(rpcs_per_peer.items(),
                                  key=lambda entry: entry[1],
                                  reverse=True)
            # The clients must not all have been given the same subset: with
            # 3 clients, a total of 4 unique backends is a FAIL, a total of 5
            # unique backends is a PASS.
            self.assertGreater(len(ranked_peers), _SUBSET_SIZE)
            logging.info('RPC distribution (len=%s): %s', len(ranked_peers),
                         ranked_peers)
            peak = ranked_peers[0][1]
            total_rpcs = sum(rpc_count for _, rpc_count in ranked_peers)
            mean = total_rpcs / len(ranked_peers)
            logging.info('Peak=%d Mean=%.1f Peak-to-Mean-Ratio=%.2f', peak,
                         mean, peak / mean)
111
# Script entry point: run the tests through absltest, stopping at the first
# failure.
if __name__ == '__main__':
    absltest.main(failfast=True)
114