// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <string>
#include <vector>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"

#include <grpc/event_engine/endpoint_config.h>

#include "src/core/client_channel/backup_poller.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/load_balancing/xds/xds_channel_args.h"
#include "src/core/resolver/fake/fake_resolver.h"
#include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
#include "test/core/util/resolve_localhost_ip46.h"
#include "test/cpp/end2end/connection_attempt_injector.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"

namespace grpc {
namespace testing {
namespace {

using ::envoy::config::cluster::v3::CustomClusterType;
using ::envoy::config::core::v3::HealthStatus;
using ::envoy::extensions::clusters::aggregate::v3::ClusterConfig;

class RingHashTest : public XdsEnd2endTest {
 protected:
  void SetUp() override {
    logical_dns_cluster_resolver_response_generator_ =
        grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
    InitClient();
    SetUpChannel();
  }

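  // Installs the fake resolver response generator into the channel args, so
  // that any LOGICAL_DNS cluster used by a test resolves to addresses
  // supplied via logical_dns_cluster_resolver_response_generator_, and then
  // recreates the stub with those args.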
  void SetUpChannel(ChannelArguments* args = nullptr) {
    ChannelArguments local_args;
    if (args == nullptr) args = &local_args;
    args->SetPointerWithVtable(
        GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR,
        logical_dns_cluster_resolver_response_generator_.get(),
        &grpc_core::FakeResolverResponseGenerator::kChannelArgPointerVtable);
    ResetStub(/*failover_timeout_ms=*/0, args);
  }

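  // Builds the address list that the fake logical-DNS resolver will return,
  // with one endpoint per backend port on the local IP.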
  grpc_core::EndpointAddressesList CreateAddressListFromPortList(
      const std::vector<int>& ports) {
    grpc_core::EndpointAddressesList addresses;
    for (int port : ports) {
      absl::StatusOr<grpc_core::URI> lb_uri =
          grpc_core::URI::Parse(grpc_core::LocalIpUri(port));
      GPR_ASSERT(lb_uri.ok());
      grpc_resolved_address address;
      GPR_ASSERT(grpc_parse_uri(*lb_uri, &address));
      addresses.emplace_back(address, grpc_core::ChannelArgs());
    }
    return addresses;
  }

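  // These helpers rely on the ring_hash policy placing each endpoint on the
  // ring at hashes of "<address>_<entry index>", so returning the endpoint's
  // address with a "_0" suffix yields a request hash key that maps
  // deterministically to that backend.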
  std::string CreateMetadataValueThatHashesToBackendPort(int port) {
    return absl::StrCat(grpc_core::LocalIp(), ":", port, "_0");
  }

  std::string CreateMetadataValueThatHashesToBackend(int index) {
    return CreateMetadataValueThatHashesToBackendPort(
        backends_[index]->port());
  }

  grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
      logical_dns_cluster_resolver_response_generator_;
};

// Run both with and without load reporting, just for test coverage.
INSTANTIATE_TEST_SUITE_P(
    XdsTest, RingHashTest,
    ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
    &XdsTestType::Name);

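// Tests that, at startup, an aggregate cluster falls back from a RING_HASH
// child whose endpoints are unreachable to the next child cluster, and that
// ring hash still routes all RPCs to a single backend.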
TEST_P(RingHashTest, AggregateClusterFallBackFromRingHashAtStartup) {
  CreateAndStartBackends(2);
  const char* kNewCluster1Name = "new_cluster_1";
  const char* kNewEdsService1Name = "new_eds_service_name_1";
  const char* kNewCluster2Name = "new_cluster_2";
  const char* kNewEdsService2Name = "new_eds_service_name_2";
  // Populate new EDS resources.
  EdsResourceArgs args1({
      {"locality0", {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()}},
  });
  EdsResourceArgs args2({
      {"locality0", CreateEndpointsForBackends()},
  });
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(args1, kNewEdsService1Name));
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(args2, kNewEdsService2Name));
  // Populate new CDS resources.
  Cluster new_cluster1 = default_cluster_;
  new_cluster1.set_name(kNewCluster1Name);
  new_cluster1.mutable_eds_cluster_config()->set_service_name(
      kNewEdsService1Name);
  new_cluster1.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(new_cluster1);
  Cluster new_cluster2 = default_cluster_;
  new_cluster2.set_name(kNewCluster2Name);
  new_cluster2.mutable_eds_cluster_config()->set_service_name(
      kNewEdsService2Name);
  new_cluster2.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(new_cluster2);
  // Create Aggregate Cluster
  auto cluster = default_cluster_;
  CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
  custom_cluster->set_name("envoy.clusters.aggregate");
  ClusterConfig cluster_config;
  cluster_config.add_clusters(kNewCluster1Name);
  cluster_config.add_clusters(kNewCluster2Name);
  custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Set up route with channel id hashing
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  // Verify that we are using ring hash: a single backend should receive all
  // of the traffic.
  CheckRpcSendOk(DEBUG_LOCATION, 100, RpcOptions().set_timeout_ms(5000));
  bool found = false;
  for (size_t i = 0; i < backends_.size(); ++i) {
    if (backends_[i]->backend_service()->request_count() > 0) {
      EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
          << "backend " << i;
      EXPECT_FALSE(found) << "backend " << i;
      found = true;
    }
  }
  EXPECT_TRUE(found);
}

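// Tests that, at startup, an aggregate cluster falls back from a RING_HASH
// EDS cluster whose endpoints are unreachable to a LOGICAL_DNS cluster.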
TEST_P(RingHashTest,
       AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup) {
  CreateAndStartBackends(1);
  const char* kEdsClusterName = "eds_cluster";
  const char* kLogicalDNSClusterName = "logical_dns_cluster";
  // Populate EDS resource.
  EdsResourceArgs args({
      {"locality0",
       {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
       kDefaultLocalityWeight,
       0},
      {"locality1",
       {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
       kDefaultLocalityWeight,
       1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Populate new CDS resources.
  Cluster eds_cluster = default_cluster_;
  eds_cluster.set_name(kEdsClusterName);
  eds_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(eds_cluster);
  // Populate LOGICAL_DNS cluster.
  auto logical_dns_cluster = default_cluster_;
  logical_dns_cluster.set_name(kLogicalDNSClusterName);
  logical_dns_cluster.set_type(Cluster::LOGICAL_DNS);
  auto* address = logical_dns_cluster.mutable_load_assignment()
                      ->add_endpoints()
                      ->add_lb_endpoints()
                      ->mutable_endpoint()
                      ->mutable_address()
                      ->mutable_socket_address();
  address->set_address(kServerName);
  address->set_port_value(443);
  balancer_->ads_service()->SetCdsResource(logical_dns_cluster);
  // Create Aggregate Cluster
  auto cluster = default_cluster_;
  CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
  custom_cluster->set_name("envoy.clusters.aggregate");
  ClusterConfig cluster_config;
  cluster_config.add_clusters(kEdsClusterName);
  cluster_config.add_clusters(kLogicalDNSClusterName);
  custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Set up route with channel id hashing
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  // Set Logical DNS result
  {
    grpc_core::ExecCtx exec_ctx;
    grpc_core::Resolver::Result result;
    result.addresses = CreateAddressListFromPortList(GetBackendPorts());
    logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
        std::move(result));
  }
  // Inject connection delay to make this act more realistically.
  ConnectionAttemptInjector injector;
  injector.SetDelay(grpc_core::Duration::Milliseconds(500) *
                    grpc_test_slowdown_factor());
  // Send RPC. Need the timeout to be long enough to account for the
  // subchannel connection delays.
  CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(5000));
}

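// Same as above, but verifies that no RPCs fail while the channel falls back
// from the RING_HASH EDS cluster to the LOGICAL_DNS cluster.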
TEST_P(RingHashTest,
       AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupNoFailedRpcs) {
  CreateAndStartBackends(1);
  const char* kEdsClusterName = "eds_cluster";
  const char* kLogicalDNSClusterName = "logical_dns_cluster";
  // Populate EDS resource.
  EdsResourceArgs args({
      {"locality0",
       {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
       kDefaultLocalityWeight,
       0},
      {"locality1",
       {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
       kDefaultLocalityWeight,
       1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Populate new CDS resources.
  Cluster eds_cluster = default_cluster_;
  eds_cluster.set_name(kEdsClusterName);
  eds_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(eds_cluster);
  // Populate LOGICAL_DNS cluster.
  auto logical_dns_cluster = default_cluster_;
  logical_dns_cluster.set_name(kLogicalDNSClusterName);
  logical_dns_cluster.set_type(Cluster::LOGICAL_DNS);
  auto* address = logical_dns_cluster.mutable_load_assignment()
                      ->add_endpoints()
                      ->add_lb_endpoints()
                      ->mutable_endpoint()
                      ->mutable_address()
                      ->mutable_socket_address();
  address->set_address(kServerName);
  address->set_port_value(443);
  balancer_->ads_service()->SetCdsResource(logical_dns_cluster);
  // Create Aggregate Cluster
  auto cluster = default_cluster_;
  CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
  custom_cluster->set_name("envoy.clusters.aggregate");
  ClusterConfig cluster_config;
  cluster_config.add_clusters(kEdsClusterName);
  cluster_config.add_clusters(kLogicalDNSClusterName);
  custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Set up route with channel id hashing
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  // Set Logical DNS result
  {
    grpc_core::ExecCtx exec_ctx;
    grpc_core::Resolver::Result result;
    result.addresses = CreateAddressListFromPortList(GetBackendPorts());
    logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
        std::move(result));
  }
  // Set up connection attempt injector.
  ConnectionAttemptInjector injector;
  auto hold = injector.AddHold(backends_[0]->port());
  // Increase subchannel backoff time, so that subchannels stay in
  // TRANSIENT_FAILURE for long enough to trigger potential problems.
  ChannelArguments channel_args;
  channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
                      10000 * grpc_test_slowdown_factor());
  SetUpChannel(&channel_args);
  // Start an RPC in the background.
  LongRunningRpc rpc;
  rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(5000));
  // Wait for connection attempt to the backend.
  hold->Wait();
  // Channel should report CONNECTING here, and any RPC should be queued.
  EXPECT_EQ(channel_->GetState(false), GRPC_CHANNEL_CONNECTING);
  // Start a second RPC at this point, which should be queued as well.
  // This will fail if the priority policy fails to update the picker to
  // point to the LOGICAL_DNS child; if it leaves it pointing to the EDS
  // priority 1, then the RPC will fail, because all subchannels are in
  // TRANSIENT_FAILURE.
  // Note that sending only the first RPC does not catch this case,
  // because if the priority policy fails to update the picker, then the
  // pick for the first RPC will not be retried.
  LongRunningRpc rpc2;
  rpc2.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(5000));
  // Allow the connection attempt to complete.
  hold->Resume();
  // Now the RPCs should complete successfully.
  gpr_log(GPR_INFO, "=== WAITING FOR FIRST RPC TO FINISH ===");
  Status status = rpc.GetStatus();
  gpr_log(GPR_INFO, "=== FIRST RPC FINISHED ===");
  EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                           << " message=" << status.error_message();
  gpr_log(GPR_INFO, "=== WAITING FOR SECOND RPC TO FINISH ===");
  status = rpc2.GetStatus();
  gpr_log(GPR_INFO, "=== SECOND RPC FINISHED ===");
  EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                           << " message=" << status.error_message();
}

// Tests that a ring hash policy that hashes using the channel id sends all
// RPCs to one particular backend.
TEST_P(RingHashTest, ChannelIdHashing) {
  CreateAndStartBackends(4);
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  CheckRpcSendOk(DEBUG_LOCATION, 100, RpcOptions().set_timeout_ms(5000));
  bool found = false;
  for (size_t i = 0; i < backends_.size(); ++i) {
    if (backends_[i]->backend_service()->request_count() > 0) {
      EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
          << "backend " << i;
      EXPECT_FALSE(found) << "backend " << i;
      found = true;
    }
  }
  EXPECT_TRUE(found);
}

// Tests that a ring hash policy that hashes using a header value can spread
// RPCs across all the backends.
TEST_P(RingHashTest, HeaderHashing) {
  CreateAndStartBackends(4);
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Note that each type of RPC contains a header value that will always hash
  // to a specific backend, because the header value matches the value used to
  // create that backend's entry in the ring.
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
  std::vector<std::pair<std::string, std::string>> metadata1 = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
  std::vector<std::pair<std::string, std::string>> metadata2 = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
  std::vector<std::pair<std::string, std::string>> metadata3 = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
  const auto rpc_options =
      RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
  const auto rpc_options1 =
      RpcOptions().set_metadata(std::move(metadata1)).set_timeout_ms(5000);
  const auto rpc_options2 =
      RpcOptions().set_metadata(std::move(metadata2)).set_timeout_ms(5000);
  const auto rpc_options3 =
      RpcOptions().set_metadata(std::move(metadata3)).set_timeout_ms(5000);
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options1);
  WaitForBackend(DEBUG_LOCATION, 2, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options2);
  WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options3);
  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options1);
  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options2);
  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options3);
  for (size_t i = 0; i < backends_.size(); ++i) {
    EXPECT_EQ(100, backends_[i]->backend_service()->request_count());
  }
}

// Tests that a ring hash policy that hashes using a header value with a regex
// rewrite aggregates RPCs onto one backend.
TEST_P(RingHashTest, HeaderHashingWithRegexRewrite) {
  CreateAndStartBackends(4);
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("address_hash");
  hash_policy->mutable_header()
      ->mutable_regex_rewrite()
      ->mutable_pattern()
      ->set_regex("[0-9]+");
  hash_policy->mutable_header()->mutable_regex_rewrite()->set_substitution(
      "foo");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
  std::vector<std::pair<std::string, std::string>> metadata1 = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
  std::vector<std::pair<std::string, std::string>> metadata2 = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
  std::vector<std::pair<std::string, std::string>> metadata3 = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
  const auto rpc_options =
      RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
  const auto rpc_options1 =
      RpcOptions().set_metadata(std::move(metadata1)).set_timeout_ms(5000);
  const auto rpc_options2 =
      RpcOptions().set_metadata(std::move(metadata2)).set_timeout_ms(5000);
  const auto rpc_options3 =
      RpcOptions().set_metadata(std::move(metadata3)).set_timeout_ms(5000);
  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options1);
  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options2);
  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options3);
  bool found = false;
  for (size_t i = 0; i < backends_.size(); ++i) {
    if (backends_[i]->backend_service()->request_count() > 0) {
      EXPECT_EQ(backends_[i]->backend_service()->request_count(), 400)
          << "backend " << i;
      EXPECT_FALSE(found) << "backend " << i;
      found = true;
    }
  }
  EXPECT_TRUE(found);
}

// Tests that, with no hash policy configured, the ring hash policy hashes
// using a random value.
TEST_P(RingHashTest, NoHashPolicy) {
  CreateAndStartBackends(2);
  const double kDistribution50Percent = 0.5;
  const double kErrorTolerance = 0.05;
  const uint32_t kRpcTimeoutMs = 10000;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
  auto cluster = default_cluster_;
  // Increasing min ring size for random distribution.
  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
      100000);
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  const int request_count_1 = backends_[0]->backend_service()->request_count();
  const int request_count_2 = backends_[1]->backend_service()->request_count();
  EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
  EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
}

// Tests that we observe endpoint weights.
TEST_P(RingHashTest, EndpointWeights) {
  CreateAndStartBackends(3);
  const double kDistribution50Percent = 0.5;
  const double kDistribution25Percent = 0.25;
  const double kErrorTolerance = 0.05;
  const uint32_t kRpcTimeoutMs = 10000;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
  auto cluster = default_cluster_;
  // Increasing min ring size for random distribution.
  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
      100000);
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Endpoint 0 has weight 0, will be treated as weight 1.
  // Endpoint 1 has weight 1.
  // Endpoint 2 has weight 2.
  EdsResourceArgs args(
      {{"locality0",
        {CreateEndpoint(0, ::envoy::config::core::v3::HealthStatus::UNKNOWN, 0),
         CreateEndpoint(1, ::envoy::config::core::v3::HealthStatus::UNKNOWN, 1),
         CreateEndpoint(2, ::envoy::config::core::v3::HealthStatus::UNKNOWN,
                        2)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, 3, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  // Endpoint 2 should see 50% of traffic, and endpoints 0 and 1 should
  // each see 25% of traffic.
  const int request_count_0 = backends_[0]->backend_service()->request_count();
  const int request_count_1 = backends_[1]->backend_service()->request_count();
  const int request_count_2 = backends_[2]->backend_service()->request_count();
  EXPECT_THAT(static_cast<double>(request_count_0) / kNumRpcs,
              ::testing::DoubleNear(kDistribution25Percent, kErrorTolerance));
  EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
              ::testing::DoubleNear(kDistribution25Percent, kErrorTolerance));
  EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
}

// Test that ring hash policy evaluation will continue past the terminal
// policy if no results are produced yet.
TEST_P(RingHashTest, ContinuesPastTerminalPolicyThatDoesNotProduceResult) {
  CreateAndStartBackends(2);
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("header_not_present");
  hash_policy->set_terminal(true);
  auto* hash_policy2 = route->mutable_route()->add_hash_policy();
  hash_policy2->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
  const auto rpc_options =
      RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
  EXPECT_EQ(backends_[0]->backend_service()->request_count(), 100);
  EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0);
}

// Tests that a random hash is used when header hashing specifies a header
// field that the RPC does not have.
TEST_P(RingHashTest, HashOnHeaderThatIsNotPresent) {
  CreateAndStartBackends(2);
  const double kDistribution50Percent = 0.5;
  const double kErrorTolerance = 0.05;
  const uint32_t kRpcTimeoutMs = 10000;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
  auto cluster = default_cluster_;
  // Increasing min ring size for random distribution.
  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
      100000);
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("header_not_present");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"unmatched_header", absl::StrFormat("%" PRIu32, rand())},
  };
  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs, rpc_options);
  const int request_count_1 = backends_[0]->backend_service()->request_count();
  const int request_count_2 = backends_[1]->backend_service()->request_count();
  EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
  EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
}

// Tests that a random hash is used when only unsupported hash policies are
// configured.
TEST_P(RingHashTest, UnsupportedHashPolicyDefaultToRandomHashing) {
  CreateAndStartBackends(2);
  const double kDistribution50Percent = 0.5;
  const double kErrorTolerance = 0.05;
  const uint32_t kRpcTimeoutMs = 10000;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
  auto cluster = default_cluster_;
  // Increasing min ring size for random distribution.
  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
      100000);
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy();
  hash_policy_unsupported_1->mutable_cookie()->set_name("cookie");
  auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy();
  hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
      true);
  auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy();
  hash_policy_unsupported_3->mutable_query_parameter()->set_name(
      "query_parameter");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  const int request_count_1 = backends_[0]->backend_service()->request_count();
  const int request_count_2 = backends_[1]->backend_service()->request_count();
  EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
  EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
}

// Tests that a ring hash policy that hashes using a random value can spread
// RPCs across all the backends according to endpoint weight.
TEST_P(RingHashTest, RandomHashingDistributionAccordingToEndpointWeight) {
  CreateAndStartBackends(2);
  const size_t kWeight1 = 1;
  const size_t kWeight2 = 2;
  const size_t kWeightTotal = kWeight1 + kWeight2;
  const double kWeight33Percent = static_cast<double>(kWeight1) / kWeightTotal;
  const double kWeight66Percent = static_cast<double>(kWeight2) / kWeightTotal;
  const double kErrorTolerance = 0.05;
  const uint32_t kRpcTimeoutMs = 10000;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kWeight33Percent, kErrorTolerance);
  auto cluster = default_cluster_;
  // Increasing min ring size for random distribution.
  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
      100000);
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  EdsResourceArgs args({{"locality0",
                         {CreateEndpoint(0, HealthStatus::UNKNOWN, 1),
                          CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  const int weight_33_request_count =
      backends_[0]->backend_service()->request_count();
  const int weight_66_request_count =
      backends_[1]->backend_service()->request_count();
  EXPECT_THAT(static_cast<double>(weight_33_request_count) / kNumRpcs,
              ::testing::DoubleNear(kWeight33Percent, kErrorTolerance));
  EXPECT_THAT(static_cast<double>(weight_66_request_count) / kNumRpcs,
              ::testing::DoubleNear(kWeight66Percent, kErrorTolerance));
}

// Tests that a ring hash policy that hashes using a random value can spread
// RPCs across all the backends according to locality and endpoint weight.
TEST_P(RingHashTest,
       RandomHashingDistributionAccordingToLocalityAndEndpointWeight) {
  CreateAndStartBackends(2);
  const size_t kWeight1 = 1 * 1;
  const size_t kWeight2 = 2 * 2;
  const size_t kWeightTotal = kWeight1 + kWeight2;
  const double kWeight20Percent = static_cast<double>(kWeight1) / kWeightTotal;
  const double kWeight80Percent = static_cast<double>(kWeight2) / kWeightTotal;
  const double kErrorTolerance = 0.05;
  const uint32_t kRpcTimeoutMs = 10000;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kWeight20Percent, kErrorTolerance);
  auto cluster = default_cluster_;
  // Increasing min ring size for random distribution.
  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
      100000);
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  EdsResourceArgs args(
      {{"locality0", {CreateEndpoint(0, HealthStatus::UNKNOWN, 1)}, 1},
       {"locality1", {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}, 2}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  const int weight_20_request_count =
      backends_[0]->backend_service()->request_count();
  const int weight_80_request_count =
      backends_[1]->backend_service()->request_count();
  EXPECT_THAT(static_cast<double>(weight_20_request_count) / kNumRpcs,
              ::testing::DoubleNear(kWeight20Percent, kErrorTolerance));
  EXPECT_THAT(static_cast<double>(weight_80_request_count) / kNumRpcs,
              ::testing::DoubleNear(kWeight80Percent, kErrorTolerance));
}

// Tests that a ring hash policy that hashes using a fixed string sends all
// RPCs to one particular backend, and that subsequent hash policies are
// ignored because the policy is marked terminal.
TEST_P(RingHashTest, FixedHashingTerminalPolicy) {
  CreateAndStartBackends(2);
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("fixed_string");
  hash_policy->set_terminal(true);
  auto* hash_policy_to_be_ignored = route->mutable_route()->add_hash_policy();
  hash_policy_to_be_ignored->mutable_header()->set_header_name(
      "random_string");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"fixed_string", "fixed_value"},
      {"random_string", absl::StrFormat("%" PRIu32, rand())},
  };
  const auto rpc_options =
      RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
  bool found = false;
  for (size_t i = 0; i < backends_.size(); ++i) {
    if (backends_[i]->backend_service()->request_count() > 0) {
      EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
          << "backend " << i;
      EXPECT_FALSE(found) << "backend " << i;
      found = true;
    }
  }
  EXPECT_TRUE(found);
}

// Test that the channel will go from IDLE to READY via CONNECTING (though it
// is not possible to catch the CONNECTING state before it moves to READY).
TEST_P(RingHashTest, IdleToReady) {
  CreateAndStartBackends(1);
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
  CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(5000));
  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
}

// Test that the channel will transition to READY once it starts
// connecting even if there are no RPCs being sent to the picker.
TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) {
  // Create EDS resource.
  CreateAndStartBackends(1);
  auto non_existant_endpoint = MakeNonExistantEndpoint();
  EdsResourceArgs args(
      {{"locality0", {non_existant_endpoint, CreateEndpoint(0)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Change CDS resource to use RING_HASH.
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Add hash policy to RDS resource.
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  // Start connection attempt injector and add a hold for the P0
  // connection attempt.
  ConnectionAttemptInjector injector;
  auto hold = injector.AddHold(non_existant_endpoint.port);
  // A long-running RPC, just used to send the RPC in another thread.
  LongRunningRpc rpc;
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"address_hash",
       CreateMetadataValueThatHashesToBackendPort(non_existant_endpoint.port)}};
  rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
                                std::move(metadata)));
  // Wait for the RPC to trigger the P0 connection attempt, then cancel it,
  // and then allow the connection attempt to complete.
  hold->Wait();
  rpc.CancelRpc();
  EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
  hold->Resume();
  // Wait for channel to become connected without any pending RPC.
  EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(5)));
  // Make sure the backend did not get any requests.
  EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
}

// Tests that when we trigger internal connection attempts without
// picks, we do so for only one subchannel at a time.
TEST_P(RingHashTest, ContinuesConnectingWithoutPicksOneSubchannelAtATime) {
  // Create EDS resource.
  CreateAndStartBackends(1);
  auto non_existant_endpoint0 = MakeNonExistantEndpoint();
  auto non_existant_endpoint1 = MakeNonExistantEndpoint();
  auto non_existant_endpoint2 = MakeNonExistantEndpoint();
  EdsResourceArgs args({{"locality0",
                         {non_existant_endpoint0, non_existant_endpoint1,
                          non_existant_endpoint2, CreateEndpoint(0)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Change CDS resource to use RING_HASH.
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Add hash policy to RDS resource.
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  // Start connection attempt injector.
  ConnectionAttemptInjector injector;
  auto hold_non_existant0 = injector.AddHold(non_existant_endpoint0.port);
  auto hold_non_existant1 = injector.AddHold(non_existant_endpoint1.port);
  auto hold_non_existant2 = injector.AddHold(non_existant_endpoint2.port);
  auto hold_good = injector.AddHold(backends_[0]->port());
  // A long-running RPC, just used to send the RPC in another thread.
  LongRunningRpc rpc;
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"address_hash", CreateMetadataValueThatHashesToBackendPort(
                           non_existant_endpoint0.port)}};
  rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
                                std::move(metadata)));
  // Wait for the RPC to trigger a connection attempt to the first address,
  // then cancel the RPC. No other connection attempts should be started yet.
  hold_non_existant0->Wait();
  rpc.CancelRpc();
  EXPECT_FALSE(hold_non_existant1->IsStarted());
  EXPECT_FALSE(hold_non_existant2->IsStarted());
  EXPECT_FALSE(hold_good->IsStarted());
  // Allow the connection attempt to the first address to resume and wait
  // for the attempt for the second address. No other connection
  // attempts should be started yet.
  auto hold_non_existant0_again = injector.AddHold(non_existant_endpoint0.port);
  hold_non_existant0->Resume();
  hold_non_existant1->Wait();
  EXPECT_FALSE(hold_non_existant0_again->IsStarted());
  EXPECT_FALSE(hold_non_existant2->IsStarted());
  EXPECT_FALSE(hold_good->IsStarted());
  // Allow the connection attempt to the second address to resume and wait
  // for the attempt for the third address. No other connection
  // attempts should be started yet.
  auto hold_non_existant1_again = injector.AddHold(non_existant_endpoint1.port);
  hold_non_existant1->Resume();
  hold_non_existant2->Wait();
  EXPECT_FALSE(hold_non_existant0_again->IsStarted());
  EXPECT_FALSE(hold_non_existant1_again->IsStarted());
  EXPECT_FALSE(hold_good->IsStarted());
  // Allow the connection attempt to the third address to resume and wait
  // for the attempt for the final address. No other connection
  // attempts should be started yet.
  auto hold_non_existant2_again = injector.AddHold(non_existant_endpoint2.port);
  hold_non_existant2->Resume();
  hold_good->Wait();
  EXPECT_FALSE(hold_non_existant0_again->IsStarted());
  EXPECT_FALSE(hold_non_existant1_again->IsStarted());
  EXPECT_FALSE(hold_non_existant2_again->IsStarted());
  // Allow the final attempt to resume.
  hold_good->Resume();
  // Wait for channel to become connected without any pending RPC.
  EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(10)));
  // No other connection attempts should have been started.
  EXPECT_FALSE(hold_non_existant0_again->IsStarted());
  EXPECT_FALSE(hold_non_existant1_again->IsStarted());
  EXPECT_FALSE(hold_non_existant2_again->IsStarted());
  // RPC should have been cancelled.
  EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
  // Make sure the backend did not get any requests.
  EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
}

// Test that when the first picked endpoint is down, causing a transient
// failure, we will move on to the next ring hash entry.
TEST_P(RingHashTest, TransientFailureCheckNextOne) {
  CreateAndStartBackends(1);
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  std::vector<EdsResourceArgs::Endpoint> endpoints;
  const int unused_port = grpc_pick_unused_port_or_die();
  endpoints.emplace_back(unused_port);
  endpoints.emplace_back(backends_[0]->port());
  EdsResourceArgs args({{"locality0", std::move(endpoints)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"address_hash",
       CreateMetadataValueThatHashesToBackendPort(unused_port)}};
  const auto rpc_options =
      RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
}

// Test that when a backend goes down, we will move on to the next subchannel
// (with a lower priority). When the backend comes back up, traffic will move
// back.
TEST_P(RingHashTest, SwitchToLowerPriorityAndThenBack) {
  CreateAndStartBackends(2);
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
       0},
      {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
       1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
  const auto rpc_options =
      RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  backends_[0]->StopListeningAndSendGoaways();
  WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  ShutdownBackend(0);
  StartBackend(0);
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
  EXPECT_EQ(100, backends_[0]->backend_service()->request_count());
  EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
}

// Test that when all backends are down, we will keep reattempting.
TEST_P(RingHashTest, ReattemptWhenAllEndpointsUnreachable) {
  CreateAndStartBackends(1);
  const uint32_t kConnectionTimeoutMilliseconds = 5000;
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args(
      {{"locality0", {MakeNonExistantEndpoint(), CreateEndpoint(0)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
  ShutdownBackend(0);
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex(
          "ring hash cannot find a connected endpoint; first failure: "),
      RpcOptions().set_metadata(std::move(metadata)));
  StartBackend(0);
  // Ensure we are actively connecting without any traffic.
  EXPECT_TRUE(channel_->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
}

// Test that when all backends are down and then come back up, we may
// initially pick a backend in TRANSIENT_FAILURE but will then jump to a
// READY backend.
TEST_P(RingHashTest, TransientFailureSkipToAvailableReady) {
  CreateBackends(2);
  const uint32_t kConnectionTimeoutMilliseconds = 5000;
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  // Make sure we include some unused ports to fill the ring.
  EdsResourceArgs args({
      {"locality0",
       {CreateEndpoint(0), CreateEndpoint(1), MakeNonExistantEndpoint(),
        MakeNonExistantEndpoint()}},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
  const auto rpc_options = RpcOptions()
                               .set_metadata(std::move(metadata))
                               .set_timeout_ms(kConnectionTimeoutMilliseconds);
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
  gpr_log(GPR_INFO, "=== SENDING FIRST RPC ===");
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex(
          "ring hash cannot find a connected endpoint; first failure: "),
      rpc_options);
  gpr_log(GPR_INFO, "=== DONE WITH FIRST RPC ===");
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
  // Bring up backend 0. The channel should become connected without
  // any picks, because in TF, we are always trying to connect to at
  // least one backend at all times.
  gpr_log(GPR_INFO, "=== STARTING BACKEND 0 ===");
  StartBackend(0);
  gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL TO BECOME READY ===");
  EXPECT_TRUE(channel_->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
  // RPCs should go to backend 0.
  gpr_log(GPR_INFO, "=== WAITING FOR BACKEND 0 ===");
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
  // Bring down backend 0 and bring up backend 1.
  // Note the RPC contains a header value that will always be hashed to
  // backend 0. So by purposely bringing down backend 0 and bringing up
  // another backend, we ensure that the picker's first choice of backend 0
  // will fail and that it will go through the remaining subchannels to find
  // one in READY. Since the entries in the ring are fairly well distributed
  // and we have unused ports to fill the ring, it is almost guaranteed that
  // the picker will go through some non-READY entries and skip them as per
  // design.
  gpr_log(GPR_INFO, "=== SHUTTING DOWN BACKEND 0 ===");
  ShutdownBackend(0);
  gpr_log(GPR_INFO, "=== WAITING FOR STATE CHANGE ===");
  EXPECT_TRUE(channel_->WaitForStateChange(
      GRPC_CHANNEL_READY,
      grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
  gpr_log(GPR_INFO, "=== SENDING SECOND RPC ===");
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex(
          "ring hash cannot find a connected endpoint; first failure: "),
      rpc_options);
  gpr_log(GPR_INFO, "=== STARTING BACKEND 1 ===");
  StartBackend(1);
  gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL TO BECOME READY ===");
  EXPECT_TRUE(channel_->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
  gpr_log(GPR_INFO, "=== WAITING FOR BACKEND 1 ===");
  WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  gpr_log(GPR_INFO, "=== DONE ===");
}

// This tests a bug seen in the wild where ring_hash started with no
// endpoints and reported TRANSIENT_FAILURE, then got an update with
// endpoints and reported IDLE, but the picker update was squelched, so
// it failed to ever get reconnected.
TEST_P(RingHashTest, ReattemptWhenGoingFromTransientFailureToIdle) {
  CreateAndStartBackends(1);
  const uint32_t kConnectionTimeoutMilliseconds = 5000;
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto new_route_config = default_route_config_;
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  // Send empty EDS update.
  EdsResourceArgs args(
      {{"locality0", std::vector<EdsResourceArgs::Endpoint>()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
  // Channel should fail RPCs and go into TRANSIENT_FAILURE.
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      "empty address list: EDS resource eds_service_name contains empty "
      "localities: \\[\\{region=\"xds_default_locality_region\", "
      "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]",
      RpcOptions().set_timeout_ms(kConnectionTimeoutMilliseconds));
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
  // Send EDS update with 1 backend.
  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // A wait_for_ready RPC should succeed, and the channel should report READY.
  CheckRpcSendOk(DEBUG_LOCATION, 1,
                 RpcOptions()
                     .set_timeout_ms(kConnectionTimeoutMilliseconds)
                     .set_wait_for_ready(true));
  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
}

// Tests that unsupported hash policy types are all ignored before a supported
// policy.
TEST_P(RingHashTest, UnsupportedHashPolicyUntilChannelIdHashing) {
  CreateAndStartBackends(2);
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy();
  hash_policy_unsupported_1->mutable_cookie()->set_name("cookie");
  auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy();
  hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
      true);
  auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy();
  hash_policy_unsupported_3->mutable_query_parameter()->set_name(
      "query_parameter");
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  CheckRpcSendOk(DEBUG_LOCATION, 100, RpcOptions().set_timeout_ms(5000));
  bool found = false;
  for (size_t i = 0; i < backends_.size(); ++i) {
    if (backends_[i]->backend_service()->request_count() > 0) {
      EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
          << "backend " << i;
      EXPECT_FALSE(found) << "backend " << i;
      found = true;
    }
  }
  EXPECT_TRUE(found);
}

}  // namespace
}  // namespace testing
}  // namespace grpc

int main(int argc, char** argv) {
  grpc::testing::TestEnvironment env(&argc, argv);
  ::testing::InitGoogleTest(&argc, argv);
  // Make the backup poller poll very frequently in order to pick up
  // updates from all the subchannels' FDs.
  grpc_core::ConfigVars::Overrides overrides;
  overrides.client_channel_backup_poll_interval_ms = 1;
  grpc_core::ConfigVars::SetOverrides(overrides);
#if TARGET_OS_IPHONE
  // Workaround Apple CFStream bug
  grpc_core::SetEnv("grpc_cfstream", "0");
#endif
  grpc_init();
  grpc::testing::ConnectionAttemptInjector::Init();
  const auto result = RUN_ALL_TESTS();
  grpc_shutdown();
  return result;
}