1# Copyright 2021 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import logging
15from typing import Tuple
16
17from absl import flags
18from absl.testing import absltest
19
20from framework import xds_url_map_testcase
21from framework.helpers import skips
22from framework.infrastructure import traffic_director
23from framework.rpc import grpc_channelz
24from framework.test_app import client_app
25
# Type aliases
HostRule = xds_url_map_testcase.HostRule
PathMatcher = xds_url_map_testcase.PathMatcher
GcpResourceManager = xds_url_map_testcase.GcpResourceManager
DumpedXdsConfig = xds_url_map_testcase.DumpedXdsConfig
RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall
RpcTypeEmptyCall = xds_url_map_testcase.RpcTypeEmptyCall
XdsTestClient = client_app.XdsTestClient
_Lang = skips.Lang

logger = logging.getLogger(__name__)
# Adopt the key flags of xds_url_map_testcase as key flags of this module.
flags.adopt_module_key_flags(xds_url_map_testcase)

# Number of RPCs requested in each configure_and_send() call below.
_NUM_RPCS = 150
# Metadata key used for affinity; the tests assert that it is the header name
# of the first hash policy in the route configuration.
_TEST_METADATA_KEY = traffic_director.TEST_AFFINITY_METADATA_KEY
# Metadata values sent under the affinity key, one per RPC type.
_TEST_METADATA_VALUE_UNARY = 'unary_yranu'
_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme'
# An additional metadata key/value pair attached to UnaryCall only.
_TEST_METADATA_NUMERIC_KEY = 'xds_md_numeric'
_TEST_METADATA_NUMERIC_VALUE = '159'

# Each entry is (RPC type, metadata key, metadata value).
_TEST_METADATA = (
    (RpcTypeUnaryCall, _TEST_METADATA_KEY, _TEST_METADATA_VALUE_UNARY),
    (RpcTypeEmptyCall, _TEST_METADATA_KEY, _TEST_METADATA_VALUE_EMPTY),
    (RpcTypeUnaryCall, _TEST_METADATA_NUMERIC_KEY,
     _TEST_METADATA_NUMERIC_VALUE),
)

# Alias for the channelz state enum used when counting sub-channels.
_ChannelzChannelState = grpc_channelz.ChannelState
54
55
def _is_supported(config: skips.TestConfig) -> bool:
    """Report whether the client under test can run the affinity tests.

    The version thresholds follow the "Ring hash" entry in
    https://github.com/grpc/grpc/blob/master/doc/grpc_xds_features.md

    Args:
        config: Test configuration providing ``client_lang`` and a
            ``version_gte`` check.

    Returns:
        The result of the minimum-version check for C++, Java and Go clients;
        False for Python and Node clients; True for any other client language.
    """
    lang = config.client_lang
    if lang in _Lang.CPP | _Lang.JAVA:
        min_version = 'v1.40.x'
    elif lang == _Lang.GO:
        min_version = 'v1.41.x'
    elif lang == _Lang.PYTHON:
        # TODO(https://github.com/grpc/grpc/issues/27430): supported after
        #      the issue is fixed.
        return False
    elif lang == _Lang.NODE:
        return False
    else:
        return True
    return config.version_gte(min_version)
70
71
class TestHeaderBasedAffinity(xds_url_map_testcase.XdsUrlMapTestCase):
    """Header-based affinity test against the affinity backend service.

    The URL map's default service is pointed at the affinity backend service.
    The test then checks that RPCs carrying the affinity metadata all reach a
    single backend, and that RPCs sent without metadata reach all three
    backends.
    """

    @staticmethod
    def is_supported(config: skips.TestConfig) -> bool:
        """Return whether this test applies to the given client config."""
        return _is_supported(config)

    @staticmethod
    def client_init_config(rpc: str, metadata: str):
        """Return the RPC type and metadata string for the client's init RPCs.

        Both arguments are ignored. The init RPCs are configured to carry the
        same metadata as the EmptyCall test RPCs: without it, the init RPCs
        would have no headers and would pick random backends (behavior of
        RING_HASH). Sending the header from the beginning ensures that only
        one sub-channel is picked and used, so the channel creates only that
        one sub-channel.
        """
        init_metadata = 'EmptyCall:%s:%s' % (_TEST_METADATA_KEY,
                                             _TEST_METADATA_VALUE_EMPTY)
        return 'EmptyCall', init_metadata

    @staticmethod
    def url_map_change(
            host_rule: HostRule,
            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
        """Make the affinity backend service the path matcher's default."""
        resource_manager = GcpResourceManager()
        affinity_service = resource_manager.affinity_backend_service()
        path_matcher["defaultService"] = affinity_service
        return host_rule, path_matcher

    def xds_config_validate(self, xds_config: DumpedXdsConfig):
        """Check endpoint count, hash-policy header and LB policy."""
        # The affinity backend service has 3 endpoints.
        self.assertNumEndpoints(xds_config, 3)
        # The first route must hash on the affinity metadata key.
        first_route = xds_config.rds['virtualHosts'][0]['routes'][0]['route']
        hash_header = first_route['hashPolicy'][0]['header']['headerName']
        self.assertEqual(hash_header, _TEST_METADATA_KEY)
        lb_policy = xds_config.cds[0]['lbPolicy']
        self.assertEqual(lb_policy, 'RING_HASH')

    def rpc_distribution_validate(self, test_client: XdsTestClient):
        """Check that affinity headers pin traffic to a single backend."""
        with_headers = self.configure_and_send(test_client,
                                               rpc_types=[RpcTypeEmptyCall],
                                               metadata=_TEST_METADATA,
                                               num_rpcs=_NUM_RPCS)
        # There are 3 backends, but only one of them should receive traffic.
        self.assertEqual(1, with_headers.num_peers)
        ready_subchannels = test_client.find_subchannels_with_state(
            _ChannelzChannelState.READY)
        self.assertLen(ready_subchannels, 1)
        idle_subchannels = test_client.find_subchannels_with_state(
            _ChannelzChannelState.IDLE)
        self.assertLen(idle_subchannels, 2)
        # Now send _NUM_RPCS RPCs without headers. Such RPCs pick random
        # backends, so afterwards every backend is expected to be connected.
        without_headers = self.configure_and_send(
            test_client,
            rpc_types=[RpcTypeEmptyCall, RpcTypeUnaryCall],
            num_rpcs=_NUM_RPCS)
        self.assertEqual(3, without_headers.num_peers)
        ready_subchannels = test_client.find_subchannels_with_state(
            _ChannelzChannelState.READY)
        self.assertLen(ready_subchannels, 3)
134
135
class TestHeaderBasedAffinityMultipleHeaders(
        xds_url_map_testcase.XdsUrlMapTestCase):
    """Affinity test that varies the header value between RPC types.

    The URL map's default service is pointed at the affinity backend service.
    The test first checks that RPCs carrying the affinity metadata reach a
    single backend, then sends UnaryCalls with different metadata values
    until one of them is routed to a backend other than the one serving
    EmptyCall.
    """

    @staticmethod
    def is_supported(config: skips.TestConfig) -> bool:
        """Return whether this test applies to the given client config."""
        return _is_supported(config)

    @staticmethod
    def client_init_config(rpc: str, metadata: str):
        """Return the RPC type and metadata string for the client's init RPCs.

        Both arguments are ignored. The init RPCs are configured to carry the
        same metadata as the EmptyCall test RPCs: without it, the init RPCs
        would have no headers and would pick random backends (behavior of
        RING_HASH). Sending the header from the beginning ensures that only
        one sub-channel is picked and used, so the channel creates only that
        one sub-channel.
        """
        init_metadata = 'EmptyCall:%s:%s' % (_TEST_METADATA_KEY,
                                             _TEST_METADATA_VALUE_EMPTY)
        return 'EmptyCall', init_metadata

    @staticmethod
    def url_map_change(
            host_rule: HostRule,
            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
        """Make the affinity backend service the path matcher's default."""
        resource_manager = GcpResourceManager()
        affinity_service = resource_manager.affinity_backend_service()
        path_matcher["defaultService"] = affinity_service
        return host_rule, path_matcher

    def xds_config_validate(self, xds_config: DumpedXdsConfig):
        """Check endpoint count, hash-policy header and LB policy."""
        # The affinity backend service has 3 endpoints.
        self.assertNumEndpoints(xds_config, 3)
        # The first route must hash on the affinity metadata key.
        first_route = xds_config.rds['virtualHosts'][0]['routes'][0]['route']
        hash_header = first_route['hashPolicy'][0]['header']['headerName']
        self.assertEqual(hash_header, _TEST_METADATA_KEY)
        lb_policy = xds_config.cds[0]['lbPolicy']
        self.assertEqual(lb_policy, 'RING_HASH')

    def rpc_distribution_validate(self, test_client: XdsTestClient):
        """Check that a different header value can select another backend."""
        baseline = self.configure_and_send(test_client,
                                           rpc_types=[RpcTypeEmptyCall],
                                           metadata=_TEST_METADATA,
                                           num_rpcs=_NUM_RPCS)
        # There are 3 backends, but only one of them should receive traffic.
        self.assertEqual(1, baseline.num_peers)
        ready_subchannels = test_client.find_subchannels_with_state(
            _ChannelzChannelState.READY)
        self.assertLen(ready_subchannels, 1)
        idle_subchannels = test_client.find_subchannels_with_state(
            _ChannelzChannelState.IDLE)
        self.assertLen(idle_subchannels, 2)
        empty_call_peers = (
            baseline.raw['rpcsByMethod']['EmptyCall']['rpcsByPeer'])
        empty_call_peer = list(empty_call_peers.keys())[0]
        # Try a series of different metadata values to verify that the client
        # will pick a different backend for a different value.
        #
        # EmptyCalls keep the metadata used above, while UnaryCalls are sent
        # with header values "0" through "29". After each round the endpoint
        # picked for UnaryCall is inspected, and the loop stops as soon as it
        # differs from the EmptyCall peer.
        #
        # Note that there's a small chance that every header value still
        # picks the backend used by EmptyCall. But there will be over a
        # thousand nodes on the ring (default min size is 1024), so the
        # probability of that should be fairly small.
        different_peer_picked = False
        for header_value in range(30):
            round_metadata = (
                (RpcTypeEmptyCall, _TEST_METADATA_KEY,
                 _TEST_METADATA_VALUE_EMPTY),
                (RpcTypeUnaryCall, _TEST_METADATA_KEY, str(header_value)),
            )
            round_distribution = self.configure_and_send(
                test_client,
                rpc_types=[RpcTypeEmptyCall, RpcTypeUnaryCall],
                metadata=round_metadata,
                num_rpcs=_NUM_RPCS)
            unary_call_peers = (
                round_distribution.raw['rpcsByMethod']['UnaryCall']
                ['rpcsByPeer'])
            unary_call_peer = list(unary_call_peers.keys())[0]
            different_peer_picked = unary_call_peer != empty_call_peer
            if different_peer_picked:
                break
        self.assertTrue(
            different_peer_picked,
            ("the same endpoint was picked for all the headers, expect a "
             "different endpoint to be picked"))
        # Two backends have now been used; the third stays idle.
        ready_subchannels = test_client.find_subchannels_with_state(
            _ChannelzChannelState.READY)
        self.assertLen(ready_subchannels, 2)
        idle_subchannels = test_client.find_subchannels_with_state(
            _ChannelzChannelState.IDLE)
        self.assertLen(idle_subchannels, 1)
231
232
233# TODO: add more test cases
234# 1. based on the basic test, turn down the backend in use, then verify that all
235#    RPCs are sent to another backend
236
# Hand control to absltest when this file is executed as a script.
if __name__ == '__main__':
    absltest.main()
239