# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""URL map tests for header-based affinity with the RING_HASH LB policy."""
import logging
from typing import Tuple

from absl import flags
from absl.testing import absltest

from framework import xds_url_map_testcase
from framework.helpers import skips
from framework.infrastructure import traffic_director
from framework.rpc import grpc_channelz
from framework.test_app import client_app

# Type aliases
HostRule = xds_url_map_testcase.HostRule
PathMatcher = xds_url_map_testcase.PathMatcher
GcpResourceManager = xds_url_map_testcase.GcpResourceManager
DumpedXdsConfig = xds_url_map_testcase.DumpedXdsConfig
RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall
RpcTypeEmptyCall = xds_url_map_testcase.RpcTypeEmptyCall
XdsTestClient = client_app.XdsTestClient
_Lang = skips.Lang

logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_url_map_testcase)

_NUM_RPCS = 150
# Number of distinct header values ("0", "1", ...) tried for UnaryCall while
# looking for one that hashes to a backend other than the EmptyCall one.
_NUM_HEADER_VALUES_TO_TRY = 30
_TEST_METADATA_KEY = traffic_director.TEST_AFFINITY_METADATA_KEY
_TEST_METADATA_VALUE_UNARY = 'unary_yranu'
_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme'
_TEST_METADATA_NUMERIC_KEY = 'xds_md_numeric'
_TEST_METADATA_NUMERIC_VALUE = '159'

_TEST_METADATA = (
    (RpcTypeUnaryCall, _TEST_METADATA_KEY, _TEST_METADATA_VALUE_UNARY),
    (RpcTypeEmptyCall, _TEST_METADATA_KEY, _TEST_METADATA_VALUE_EMPTY),
    (RpcTypeUnaryCall, _TEST_METADATA_NUMERIC_KEY,
     _TEST_METADATA_NUMERIC_VALUE),
)

_ChannelzChannelState = grpc_channelz.ChannelState


def _is_supported(config: skips.TestConfig) -> bool:
    """Tell whether the client under test is expected to support ring hash.

    Args:
        config: The test configuration; only ``client_lang`` and
            ``version_gte`` are consulted.

    Returns:
        True when the affinity tests should run for this client.
    """
    # Per "Ring hash" in
    # https://github.com/grpc/grpc/blob/master/doc/grpc_xds_features.md
    if config.client_lang in _Lang.CPP | _Lang.JAVA:
        return config.version_gte('v1.40.x')
    elif config.client_lang == _Lang.GO:
        return config.version_gte('v1.41.x')
    elif config.client_lang == _Lang.PYTHON:
        # TODO(https://github.com/grpc/grpc/issues/27430): supported after
        #      the issue is fixed.
        return False
    elif config.client_lang == _Lang.NODE:
        return False
    return True


def _first_peer(rpc_distribution, rpc_method: str) -> str:
    """Return the first peer listed for ``rpc_method`` in the raw stats.

    Args:
        rpc_distribution: Result of ``configure_and_send``; its ``raw``
            mapping is read at ``['rpcsByMethod'][rpc_method]['rpcsByPeer']``.
        rpc_method: Method name as keyed in the stats, e.g. 'EmptyCall'.

    Returns:
        The first key of the per-peer mapping for that method.
    """
    rpcs_by_peer = (
        rpc_distribution.raw['rpcsByMethod'][rpc_method]['rpcsByPeer'])
    return next(iter(rpcs_by_peer))


class TestHeaderBasedAffinity(xds_url_map_testcase.XdsUrlMapTestCase):
    """RPCs carrying the affinity header stick to a single backend."""

    @staticmethod
    def is_supported(config: skips.TestConfig) -> bool:
        """Delegate to the module-level support check."""
        return _is_supported(config)

    @staticmethod
    def client_init_config(rpc: str, metadata: str) -> Tuple[str, str]:
        """Return the RPC type and metadata used for the client's init RPCs.

        The ``rpc`` and ``metadata`` arguments are ignored; fixed values are
        returned instead.
        """
        # Config the init RPCs to send with the same set of metadata. Without
        # this, the init RPCs will not have headers, and will pick random
        # backends (behavior of RING_HASH). This is necessary so that only
        # one sub-channel is picked and used from the beginning, thus the
        # channel will only create this one sub-channel.
        init_metadata = (
            f'EmptyCall:{_TEST_METADATA_KEY}:{_TEST_METADATA_VALUE_EMPTY}')
        return 'EmptyCall', init_metadata

    @staticmethod
    def url_map_change(
            host_rule: HostRule,
            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
        """Point the path matcher's default service at the affinity service."""
        # Update default service to the affinity service.
        path_matcher["defaultService"] = GcpResourceManager(
        ).affinity_backend_service()
        return host_rule, path_matcher

    def xds_config_validate(self, xds_config: DumpedXdsConfig) -> None:
        """Check endpoint count, header hash policy and RING_HASH LB policy."""
        # 3 endpoints in the affinity backend service.
        self.assertNumEndpoints(xds_config, 3)
        first_route = xds_config.rds['virtualHosts'][0]['routes'][0]['route']
        self.assertEqual(first_route['hashPolicy'][0]['header']['headerName'],
                         _TEST_METADATA_KEY)
        self.assertEqual(xds_config.cds[0]['lbPolicy'], 'RING_HASH')

    def rpc_distribution_validate(self, test_client: XdsTestClient) -> None:
        """Check affinity with headers, then spreading without headers."""
        rpc_distribution = self.configure_and_send(test_client,
                                                   rpc_types=[RpcTypeEmptyCall],
                                                   metadata=_TEST_METADATA,
                                                   num_rpcs=_NUM_RPCS)
        # Only one backend should receive traffic, even though there are 3
        # backends.
        self.assertEqual(1, rpc_distribution.num_peers)
        self.assertLen(
            test_client.find_subchannels_with_state(
                _ChannelzChannelState.READY),
            1,
        )
        self.assertLen(
            test_client.find_subchannels_with_state(_ChannelzChannelState.IDLE),
            2,
        )
        # Send _NUM_RPCS RPCs without headers. RPCs without headers will pick
        # random backends. After this, we expect to see all backends to be
        # connected.
        rpc_distribution = self.configure_and_send(
            test_client,
            rpc_types=[RpcTypeEmptyCall, RpcTypeUnaryCall],
            num_rpcs=_NUM_RPCS)
        self.assertEqual(3, rpc_distribution.num_peers)
        self.assertLen(
            test_client.find_subchannels_with_state(
                _ChannelzChannelState.READY),
            3,
        )


class TestHeaderBasedAffinityMultipleHeaders(
        xds_url_map_testcase.XdsUrlMapTestCase):
    """Different header values can select different backends."""

    @staticmethod
    def is_supported(config: skips.TestConfig) -> bool:
        """Delegate to the module-level support check."""
        return _is_supported(config)

    @staticmethod
    def client_init_config(rpc: str, metadata: str) -> Tuple[str, str]:
        """Return the RPC type and metadata used for the client's init RPCs.

        The ``rpc`` and ``metadata`` arguments are ignored; fixed values are
        returned instead.
        """
        # Config the init RPCs to send with the same set of metadata. Without
        # this, the init RPCs will not have headers, and will pick random
        # backends (behavior of RING_HASH). This is necessary so that only
        # one sub-channel is picked and used from the beginning, thus the
        # channel will only create this one sub-channel.
        init_metadata = (
            f'EmptyCall:{_TEST_METADATA_KEY}:{_TEST_METADATA_VALUE_EMPTY}')
        return 'EmptyCall', init_metadata

    @staticmethod
    def url_map_change(
            host_rule: HostRule,
            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
        """Point the path matcher's default service at the affinity service."""
        # Update default service to the affinity service.
        path_matcher["defaultService"] = GcpResourceManager(
        ).affinity_backend_service()
        return host_rule, path_matcher

    def xds_config_validate(self, xds_config: DumpedXdsConfig) -> None:
        """Check endpoint count, header hash policy and RING_HASH LB policy."""
        # 3 endpoints in the affinity backend service.
        self.assertNumEndpoints(xds_config, 3)
        first_route = xds_config.rds['virtualHosts'][0]['routes'][0]['route']
        self.assertEqual(first_route['hashPolicy'][0]['header']['headerName'],
                         _TEST_METADATA_KEY)
        self.assertEqual(xds_config.cds[0]['lbPolicy'], 'RING_HASH')

    def rpc_distribution_validate(self, test_client: XdsTestClient) -> None:
        """Check that some other header value lands on a second backend."""
        rpc_distribution = self.configure_and_send(test_client,
                                                   rpc_types=[RpcTypeEmptyCall],
                                                   metadata=_TEST_METADATA,
                                                   num_rpcs=_NUM_RPCS)
        # Only one backend should receive traffic, even though there are 3
        # backends.
        self.assertEqual(1, rpc_distribution.num_peers)
        self.assertLen(
            test_client.find_subchannels_with_state(
                _ChannelzChannelState.READY),
            1,
        )
        self.assertLen(
            test_client.find_subchannels_with_state(_ChannelzChannelState.IDLE),
            2,
        )
        empty_call_peer = _first_peer(rpc_distribution, 'EmptyCall')
        # Send RPCs with a different metadata value, try different values to
        # verify that the client will pick a different backend.
        #
        # EmptyCalls will be sent with the same metadata as before, and
        # UnaryCalls will be sent with headers "0", "1", ... up to
        # _NUM_HEADER_VALUES_TO_TRY values. We check the endpoint picked for
        # UnaryCall, and stop as soon as one different from the EmptyCall peer
        # is picked.
        #
        # Note that there's a small chance all the headers would still pick the
        # same backend used by EmptyCall. But there will be over a thousand
        # nodes on the ring (default min size is 1024), and the probability of
        # picking the same backend should be fairly small.
        different_peer_picked = False
        for header_value in range(_NUM_HEADER_VALUES_TO_TRY):
            new_metadata = (
                (RpcTypeEmptyCall, _TEST_METADATA_KEY,
                 _TEST_METADATA_VALUE_EMPTY),
                (RpcTypeUnaryCall, _TEST_METADATA_KEY, str(header_value)),
            )
            rpc_distribution = self.configure_and_send(
                test_client,
                rpc_types=[RpcTypeEmptyCall, RpcTypeUnaryCall],
                metadata=new_metadata,
                num_rpcs=_NUM_RPCS)
            unary_call_peer = _first_peer(rpc_distribution, 'UnaryCall')
            if unary_call_peer != empty_call_peer:
                different_peer_picked = True
                break
        self.assertTrue(
            different_peer_picked,
            ("the same endpoint was picked for all the headers, expect a "
             "different endpoint to be picked"))
        # Two backends are now in use (one per RPC type); the third one was
        # never picked and stays idle.
        self.assertLen(
            test_client.find_subchannels_with_state(
                _ChannelzChannelState.READY),
            2,
        )
        self.assertLen(
            test_client.find_subchannels_with_state(_ChannelzChannelState.IDLE),
            1,
        )


# TODO: add more test cases
# 1. based on the basic test, turn down the backend in use, then verify that all
#    RPCs are sent to another backend

if __name__ == '__main__':
    absltest.main()