xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 
16 #include <string>
17 #include <vector>
18 
19 #include <gmock/gmock.h>
20 #include <gtest/gtest.h>
21 
22 #include "absl/strings/str_cat.h"
23 #include "absl/strings/str_format.h"
24 
25 #include <grpc/event_engine/endpoint_config.h>
26 
27 #include "src/core/client_channel/backup_poller.h"
28 #include "src/core/lib/address_utils/sockaddr_utils.h"
29 #include "src/core/lib/config/config_vars.h"
30 #include "src/core/lib/gprpp/env.h"
31 #include "src/core/load_balancing/xds/xds_channel_args.h"
32 #include "src/core/resolver/fake/fake_resolver.h"
33 #include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h"
34 #include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
35 #include "test/core/util/resolve_localhost_ip46.h"
36 #include "test/cpp/end2end/connection_attempt_injector.h"
37 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
38 
39 namespace grpc {
40 namespace testing {
41 namespace {
42 
43 using ::envoy::config::cluster::v3::CustomClusterType;
44 using ::envoy::config::core::v3::HealthStatus;
45 using ::envoy::extensions::clusters::aggregate::v3::ClusterConfig;
46 
47 class RingHashTest : public XdsEnd2endTest {
48  protected:
SetUp()49   void SetUp() override {
50     logical_dns_cluster_resolver_response_generator_ =
51         grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
52     InitClient();
53     SetUpChannel();
54   }
55 
SetUpChannel(ChannelArguments * args=nullptr)56   void SetUpChannel(ChannelArguments* args = nullptr) {
57     ChannelArguments local_args;
58     if (args == nullptr) args = &local_args;
59     args->SetPointerWithVtable(
60         GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR,
61         logical_dns_cluster_resolver_response_generator_.get(),
62         &grpc_core::FakeResolverResponseGenerator::kChannelArgPointerVtable);
63     ResetStub(/*failover_timeout_ms=*/0, args);
64   }
65 
CreateAddressListFromPortList(const std::vector<int> & ports)66   grpc_core::EndpointAddressesList CreateAddressListFromPortList(
67       const std::vector<int>& ports) {
68     grpc_core::EndpointAddressesList addresses;
69     for (int port : ports) {
70       absl::StatusOr<grpc_core::URI> lb_uri =
71           grpc_core::URI::Parse(grpc_core::LocalIpUri(port));
72       GPR_ASSERT(lb_uri.ok());
73       grpc_resolved_address address;
74       GPR_ASSERT(grpc_parse_uri(*lb_uri, &address));
75       addresses.emplace_back(address, grpc_core::ChannelArgs());
76     }
77     return addresses;
78   }
79 
CreateMetadataValueThatHashesToBackendPort(int port)80   std::string CreateMetadataValueThatHashesToBackendPort(int port) {
81     return absl::StrCat(grpc_core::LocalIp(), ":", port, "_0");
82   }
83 
CreateMetadataValueThatHashesToBackend(int index)84   std::string CreateMetadataValueThatHashesToBackend(int index) {
85     return CreateMetadataValueThatHashesToBackendPort(backends_[index]->port());
86   }
87 
88   grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
89       logical_dns_cluster_resolver_response_generator_;
90 };
91 
92 // Run both with and without load reporting, just for test coverage.
93 INSTANTIATE_TEST_SUITE_P(
94     XdsTest, RingHashTest,
95     ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
96     &XdsTestType::Name);
97 
TEST_P(RingHashTest,AggregateClusterFallBackFromRingHashAtStartup)98 TEST_P(RingHashTest, AggregateClusterFallBackFromRingHashAtStartup) {
99   CreateAndStartBackends(2);
100   const char* kNewCluster1Name = "new_cluster_1";
101   const char* kNewEdsService1Name = "new_eds_service_name_1";
102   const char* kNewCluster2Name = "new_cluster_2";
103   const char* kNewEdsService2Name = "new_eds_service_name_2";
104   // Populate new EDS resources.
105   EdsResourceArgs args1({
106       {"locality0", {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()}},
107   });
108   EdsResourceArgs args2({
109       {"locality0", CreateEndpointsForBackends()},
110   });
111   balancer_->ads_service()->SetEdsResource(
112       BuildEdsResource(args1, kNewEdsService1Name));
113   balancer_->ads_service()->SetEdsResource(
114       BuildEdsResource(args2, kNewEdsService2Name));
115   // Populate new CDS resources.
116   Cluster new_cluster1 = default_cluster_;
117   new_cluster1.set_name(kNewCluster1Name);
118   new_cluster1.mutable_eds_cluster_config()->set_service_name(
119       kNewEdsService1Name);
120   new_cluster1.set_lb_policy(Cluster::RING_HASH);
121   balancer_->ads_service()->SetCdsResource(new_cluster1);
122   Cluster new_cluster2 = default_cluster_;
123   new_cluster2.set_name(kNewCluster2Name);
124   new_cluster2.mutable_eds_cluster_config()->set_service_name(
125       kNewEdsService2Name);
126   new_cluster2.set_lb_policy(Cluster::RING_HASH);
127   balancer_->ads_service()->SetCdsResource(new_cluster2);
128   // Create Aggregate Cluster
129   auto cluster = default_cluster_;
130   CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
131   custom_cluster->set_name("envoy.clusters.aggregate");
132   ClusterConfig cluster_config;
133   cluster_config.add_clusters(kNewCluster1Name);
134   cluster_config.add_clusters(kNewCluster2Name);
135   custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
136   balancer_->ads_service()->SetCdsResource(cluster);
137   // Set up route with channel id hashing
138   auto new_route_config = default_route_config_;
139   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
140   auto* hash_policy = route->mutable_route()->add_hash_policy();
141   hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
142   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
143                                    new_route_config);
144   // Verifying that we are using ring hash as only 1 endpoint is receiving all
145   // the traffic.
146   CheckRpcSendOk(DEBUG_LOCATION, 100, RpcOptions().set_timeout_ms(5000));
147   bool found = false;
148   for (size_t i = 0; i < backends_.size(); ++i) {
149     if (backends_[i]->backend_service()->request_count() > 0) {
150       EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
151           << "backend " << i;
152       EXPECT_FALSE(found) << "backend " << i;
153       found = true;
154     }
155   }
156   EXPECT_TRUE(found);
157 }
158 
TEST_P(RingHashTest,AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup)159 TEST_P(RingHashTest,
160        AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup) {
161   CreateAndStartBackends(1);
162   const char* kEdsClusterName = "eds_cluster";
163   const char* kLogicalDNSClusterName = "logical_dns_cluster";
164   // Populate EDS resource.
165   EdsResourceArgs args({
166       {"locality0",
167        {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
168        kDefaultLocalityWeight,
169        0},
170       {"locality1",
171        {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
172        kDefaultLocalityWeight,
173        1},
174   });
175   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
176   // Populate new CDS resources.
177   Cluster eds_cluster = default_cluster_;
178   eds_cluster.set_name(kEdsClusterName);
179   eds_cluster.set_lb_policy(Cluster::RING_HASH);
180   balancer_->ads_service()->SetCdsResource(eds_cluster);
181   // Populate LOGICAL_DNS cluster.
182   auto logical_dns_cluster = default_cluster_;
183   logical_dns_cluster.set_name(kLogicalDNSClusterName);
184   logical_dns_cluster.set_type(Cluster::LOGICAL_DNS);
185   auto* address = logical_dns_cluster.mutable_load_assignment()
186                       ->add_endpoints()
187                       ->add_lb_endpoints()
188                       ->mutable_endpoint()
189                       ->mutable_address()
190                       ->mutable_socket_address();
191   address->set_address(kServerName);
192   address->set_port_value(443);
193   balancer_->ads_service()->SetCdsResource(logical_dns_cluster);
194   // Create Aggregate Cluster
195   auto cluster = default_cluster_;
196   CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
197   custom_cluster->set_name("envoy.clusters.aggregate");
198   ClusterConfig cluster_config;
199   cluster_config.add_clusters(kEdsClusterName);
200   cluster_config.add_clusters(kLogicalDNSClusterName);
201   custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
202   balancer_->ads_service()->SetCdsResource(cluster);
203   // Set up route with channel id hashing
204   auto new_route_config = default_route_config_;
205   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
206   auto* hash_policy = route->mutable_route()->add_hash_policy();
207   hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
208   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
209                                    new_route_config);
210   // Set Logical DNS result
211   {
212     grpc_core::ExecCtx exec_ctx;
213     grpc_core::Resolver::Result result;
214     result.addresses = CreateAddressListFromPortList(GetBackendPorts());
215     logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
216         std::move(result));
217   }
218   // Inject connection delay to make this act more realistically.
219   ConnectionAttemptInjector injector;
220   injector.SetDelay(grpc_core::Duration::Milliseconds(500) *
221                     grpc_test_slowdown_factor());
222   // Send RPC.  Need the timeout to be long enough to account for the
223   // subchannel connection delays.
224   CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(5000));
225 }
226 
TEST_P(RingHashTest,AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupNoFailedRpcs)227 TEST_P(RingHashTest,
228        AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupNoFailedRpcs) {
229   CreateAndStartBackends(1);
230   const char* kEdsClusterName = "eds_cluster";
231   const char* kLogicalDNSClusterName = "logical_dns_cluster";
232   // Populate EDS resource.
233   EdsResourceArgs args({
234       {"locality0",
235        {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
236        kDefaultLocalityWeight,
237        0},
238       {"locality1",
239        {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
240        kDefaultLocalityWeight,
241        1},
242   });
243   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
244   // Populate new CDS resources.
245   Cluster eds_cluster = default_cluster_;
246   eds_cluster.set_name(kEdsClusterName);
247   eds_cluster.set_lb_policy(Cluster::RING_HASH);
248   balancer_->ads_service()->SetCdsResource(eds_cluster);
249   // Populate LOGICAL_DNS cluster.
250   auto logical_dns_cluster = default_cluster_;
251   logical_dns_cluster.set_name(kLogicalDNSClusterName);
252   logical_dns_cluster.set_type(Cluster::LOGICAL_DNS);
253   auto* address = logical_dns_cluster.mutable_load_assignment()
254                       ->add_endpoints()
255                       ->add_lb_endpoints()
256                       ->mutable_endpoint()
257                       ->mutable_address()
258                       ->mutable_socket_address();
259   address->set_address(kServerName);
260   address->set_port_value(443);
261   balancer_->ads_service()->SetCdsResource(logical_dns_cluster);
262   // Create Aggregate Cluster
263   auto cluster = default_cluster_;
264   CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
265   custom_cluster->set_name("envoy.clusters.aggregate");
266   ClusterConfig cluster_config;
267   cluster_config.add_clusters(kEdsClusterName);
268   cluster_config.add_clusters(kLogicalDNSClusterName);
269   custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
270   balancer_->ads_service()->SetCdsResource(cluster);
271   // Set up route with channel id hashing
272   auto new_route_config = default_route_config_;
273   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
274   auto* hash_policy = route->mutable_route()->add_hash_policy();
275   hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
276   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
277                                    new_route_config);
278   // Set Logical DNS result
279   {
280     grpc_core::ExecCtx exec_ctx;
281     grpc_core::Resolver::Result result;
282     result.addresses = CreateAddressListFromPortList(GetBackendPorts());
283     logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
284         std::move(result));
285   }
286   // Set up connection attempt injector.
287   ConnectionAttemptInjector injector;
288   auto hold = injector.AddHold(backends_[0]->port());
289   // Increase subchannel backoff time, so that subchannels stay in
290   // TRANSIENT_FAILURE for long enough to trigger potential problems.
291   ChannelArguments channel_args;
292   channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
293                       10000 * grpc_test_slowdown_factor());
294   SetUpChannel(&channel_args);
295   // Start an RPC in the background.
296   LongRunningRpc rpc;
297   rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(5000));
298   // Wait for connection attempt to the backend.
299   hold->Wait();
300   // Channel should report CONNECTING here, and any RPC should be queued.
301   EXPECT_EQ(channel_->GetState(false), GRPC_CHANNEL_CONNECTING);
302   // Start a second RPC at this point, which should be queued as well.
303   // This will fail if the priority policy fails to update the picker to
304   // point to the LOGICAL_DNS child; if it leaves it pointing to the EDS
305   // priority 1, then the RPC will fail, because all subchannels are in
306   // TRANSIENT_FAILURE.
307   // Note that sending only the first RPC does not catch this case,
308   // because if the priority policy fails to update the picker, then the
309   // pick for the first RPC will not be retried.
310   LongRunningRpc rpc2;
311   rpc2.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(5000));
312   // Allow the connection attempt to complete.
313   hold->Resume();
314   // Now the RPCs should complete successfully.
315   gpr_log(GPR_INFO, "=== WAITING FOR FIRST RPC TO FINISH ===");
316   Status status = rpc.GetStatus();
317   gpr_log(GPR_INFO, "=== FIRST RPC FINISHED ===");
318   EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
319                            << " message=" << status.error_message();
320   gpr_log(GPR_INFO, "=== WAITING FOR SECOND RPC TO FINISH ===");
321   status = rpc2.GetStatus();
322   gpr_log(GPR_INFO, "=== SECOND RPC FINISHED ===");
323   EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
324                            << " message=" << status.error_message();
325 }
326 
// Tests that a ring hash policy that hashes on the channel id sends all RPCs
// to one particular backend.
TEST_P(RingHashTest, ChannelIdHashing) {
  CreateAndStartBackends(4);
  // CDS: switch the default cluster to ring_hash.
  Cluster cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // RDS: hash every request on the channel id.
  auto route_config = default_route_config_;
  route_config.mutable_virtual_hosts(0)
      ->mutable_routes(0)
      ->mutable_route()
      ->add_hash_policy()
      ->mutable_filter_state()
      ->set_key("io.grpc.channel_id");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  CheckRpcSendOk(DEBUG_LOCATION, 100, RpcOptions().set_timeout_ms(5000));
  // Exactly one backend should have received all 100 RPCs.
  bool saw_backend_with_traffic = false;
  for (size_t i = 0; i < backends_.size(); ++i) {
    const auto num_requests = backends_[i]->backend_service()->request_count();
    if (num_requests > 0) {
      EXPECT_EQ(num_requests, 100) << "backend " << i;
      EXPECT_FALSE(saw_backend_with_traffic) << "backend " << i;
      saw_backend_with_traffic = true;
    }
  }
  EXPECT_TRUE(saw_backend_with_traffic);
}
354 
355 // Tests that ring hash policy that hashes using a header value can spread
356 // RPCs across all the backends.
TEST_P(RingHashTest, HeaderHashing) {
  CreateAndStartBackends(4);
  // CDS: switch the default cluster to ring_hash.
  Cluster cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // RDS: hash on the "address_hash" request header.
  auto route_config = default_route_config_;
  route_config.mutable_virtual_hosts(0)
      ->mutable_routes(0)
      ->mutable_route()
      ->add_hash_policy()
      ->mutable_header()
      ->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Build one set of RPC options per backend. Each carries an "address_hash"
  // header whose value matches the value used to create that backend's ring
  // entry, so RPCs sent with it always hash to that backend.
  std::vector<RpcOptions> per_backend_options;
  per_backend_options.reserve(backends_.size());
  for (size_t i = 0; i < backends_.size(); ++i) {
    std::vector<std::pair<std::string, std::string>> metadata = {
        {"address_hash",
         CreateMetadataValueThatHashesToBackend(static_cast<int>(i))}};
    per_backend_options.push_back(
        RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000));
  }
  for (size_t i = 0; i < per_backend_options.size(); ++i) {
    WaitForBackend(DEBUG_LOCATION, i, /*check_status=*/nullptr,
                   WaitForBackendOptions(), per_backend_options[i]);
  }
  for (const RpcOptions& options : per_backend_options) {
    CheckRpcSendOk(DEBUG_LOCATION, 100, options);
  }
  // Each backend should have received exactly its own 100 RPCs.
  for (size_t i = 0; i < backends_.size(); ++i) {
    EXPECT_EQ(100, backends_[i]->backend_service()->request_count());
  }
}
405 
// Tests that a ring hash policy that hashes on a header value with a regex
// rewrite can aggregate RPCs onto 1 backend.
TEST_P(RingHashTest, HeaderHashingWithRegexRewrite) {
  CreateAndStartBackends(4);
  // CDS: switch the default cluster to ring_hash.
  Cluster cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // RDS: hash on the "address_hash" header after rewriting every run of
  // digits to "foo". The per-backend header values differ only in their
  // port digits, so they all rewrite to the same hash key.
  auto route_config = default_route_config_;
  auto* header_hash = route_config.mutable_virtual_hosts(0)
                          ->mutable_routes(0)
                          ->mutable_route()
                          ->add_hash_policy()
                          ->mutable_header();
  header_hash->set_header_name("address_hash");
  auto* regex_rewrite = header_hash->mutable_regex_rewrite();
  regex_rewrite->mutable_pattern()->set_regex("[0-9]+");
  regex_rewrite->set_substitution("foo");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Send 100 RPCs carrying each backend's header value.
  for (size_t i = 0; i < backends_.size(); ++i) {
    std::vector<std::pair<std::string, std::string>> metadata = {
        {"address_hash",
         CreateMetadataValueThatHashesToBackend(static_cast<int>(i))}};
    CheckRpcSendOk(
        DEBUG_LOCATION, 100,
        RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000));
  }
  // Exactly one backend should have received all 400 RPCs.
  bool saw_backend_with_traffic = false;
  for (size_t i = 0; i < backends_.size(); ++i) {
    const auto num_requests = backends_[i]->backend_service()->request_count();
    if (num_requests > 0) {
      EXPECT_EQ(num_requests, 400) << "backend " << i;
      EXPECT_FALSE(saw_backend_with_traffic) << "backend " << i;
      saw_backend_with_traffic = true;
    }
  }
  EXPECT_TRUE(saw_backend_with_traffic);
}
458 
// Tests that, with no hash policy configured, ring hash uses a random hash
// and spreads RPCs evenly across the backends.
TEST_P(RingHashTest, NoHashPolicy) {
  CreateAndStartBackends(2);
  constexpr double kDistribution50Percent = 0.5;
  constexpr double kErrorTolerance = 0.05;
  constexpr uint32_t kRpcTimeoutMs = 10000;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
  // CDS: ring_hash with a large minimum ring size, so that random hashing
  // spreads RPCs evenly enough to meet the tolerance.
  Cluster cluster = default_cluster_;
  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
      100000);
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  // Each backend should get about half of the traffic.
  for (size_t i = 0; i < backends_.size(); ++i) {
    const int num_requests = backends_[i]->backend_service()->request_count();
    EXPECT_THAT(static_cast<double>(num_requests) / kNumRpcs,
                ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
  }
}
488 
489 // Tests that we observe endpoint weights.
TEST_P(RingHashTest, EndpointWeights) {
  constexpr size_t kNumBackends = 3;
  CreateAndStartBackends(kNumBackends);
  constexpr double kDistribution50Percent = 0.5;
  constexpr double kDistribution25Percent = 0.25;
  constexpr double kErrorTolerance = 0.05;
  constexpr uint32_t kRpcTimeoutMs = 10000;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
  // CDS: ring_hash with a large minimum ring size, so that random hashing
  // tracks the endpoint weights closely.
  Cluster cluster = default_cluster_;
  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
      100000);
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // EDS endpoint weights 0, 1, 2. A weight of 0 is treated as 1, so the
  // effective weights are 1, 1, 2.
  EdsResourceArgs eds_args(
      {{"locality0",
        {CreateEndpoint(0, HealthStatus::UNKNOWN, 0),
         CreateEndpoint(1, HealthStatus::UNKNOWN, 1),
         CreateEndpoint(2, HealthStatus::UNKNOWN, 2)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, kNumBackends,
                     /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  // Expected share of traffic per backend: 25%, 25%, 50%.
  const double kExpectedShare[kNumBackends] = {
      kDistribution25Percent, kDistribution25Percent, kDistribution50Percent};
  for (size_t i = 0; i < kNumBackends; ++i) {
    const int num_requests = backends_[i]->backend_service()->request_count();
    EXPECT_THAT(static_cast<double>(num_requests) / kNumRpcs,
                ::testing::DoubleNear(kExpectedShare[i], kErrorTolerance));
  }
}
532 
533 // Test that ring hash policy evaluation will continue past the terminal
534 // policy if no results are produced yet.
TEST_P(RingHashTest, ContinuesPastTerminalPolicyThatDoesNotProduceResult) {
  CreateAndStartBackends(2);
  // CDS: switch the default cluster to ring_hash.
  Cluster cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto route_config = default_route_config_;
  auto* route_action =
      route_config.mutable_virtual_hosts(0)->mutable_routes(0)->mutable_route();
  // First policy: terminal, but it hashes on a header the RPCs never send.
  auto* missing_header_policy = route_action->add_hash_policy();
  missing_header_policy->mutable_header()->set_header_name(
      "header_not_present");
  missing_header_policy->set_terminal(true);
  // Second policy: hashes on a header the RPCs do send.
  route_action->add_hash_policy()->mutable_header()->set_header_name(
      "address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
  CheckRpcSendOk(
      DEBUG_LOCATION, 100,
      RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000));
  // The second policy must have been used, pinning all RPCs to backend 0.
  EXPECT_EQ(backends_[0]->backend_service()->request_count(), 100);
  EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0);
}
559 
// Tests that a random hash is used when header hashing specifies a header
// field that the RPC does not have.
TEST_P(RingHashTest, HashOnHeaderThatIsNotPresent) {
  CreateAndStartBackends(2);
  const double kDistribution50Percent = 0.5;
  const double kErrorTolerance = 0.05;
  const uint32_t kRpcTimeoutMs = 10000;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
  auto cluster = default_cluster_;
  // Increase the minimum ring size so that random hashing spreads RPCs
  // evenly enough to meet the tolerance.
  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
      100000);
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Hash on a header that the RPCs below never send.
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("header_not_present");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // The RPCs carry metadata, just not the header being hashed on.
  // rand() returns a non-negative int, so StrCat produces the same text that
  // "%" PRIu32 did, without a uint32_t format applied to an int and without
  // depending on <cinttypes> being included transitively.
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"unmatched_header", absl::StrCat(rand())},
  };
  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs, rpc_options);
  // With no hash available, traffic should split roughly 50/50.
  const int request_count_1 = backends_[0]->backend_service()->request_count();
  const int request_count_2 = backends_[1]->backend_service()->request_count();
  EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
  EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
}
600 
601 // Test random hash is used when only unsupported hash policies are
602 // configured.
TEST_P(RingHashTest, UnsupportedHashPolicyDefaultToRandomHashing) {
  CreateAndStartBackends(2);
  constexpr double kDistribution50Percent = 0.5;
  constexpr double kErrorTolerance = 0.05;
  constexpr uint32_t kRpcTimeoutMs = 10000;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
  // CDS: ring_hash with a large minimum ring size, so that random hashing
  // spreads RPCs evenly enough to meet the tolerance.
  Cluster cluster = default_cluster_;
  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
      100000);
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // RDS: only hash policy kinds that this test treats as unsupported
  // (cookie, connection properties, query parameter).
  auto route_config = default_route_config_;
  auto* route_action =
      route_config.mutable_virtual_hosts(0)->mutable_routes(0)->mutable_route();
  route_action->add_hash_policy()->mutable_cookie()->set_name("cookie");
  route_action->add_hash_policy()
      ->mutable_connection_properties()
      ->set_source_ip(true);
  route_action->add_hash_policy()->mutable_query_parameter()->set_name(
      "query_parameter");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  // Falling back to random hashing should split traffic roughly 50/50.
  for (size_t i = 0; i < backends_.size(); ++i) {
    const int num_requests = backends_[i]->backend_service()->request_count();
    EXPECT_THAT(static_cast<double>(num_requests) / kNumRpcs,
                ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
  }
}
643 
// Tests that ring hash policy that hashes using a random value can spread
// RPCs across all the backends according to endpoint weight.
TEST_P(RingHashTest, RandomHashingDistributionAccordingToEndpointWeight) {
  CreateAndStartBackends(2);
  // Endpoint weights.  These same constants are used both to build the EDS
  // resource and to compute the expected traffic ratios, so the
  // configuration and the assertions cannot drift apart.
  const size_t kWeight1 = 1;
  const size_t kWeight2 = 2;
  const size_t kWeightTotal = kWeight1 + kWeight2;
  const double kWeight33Percent = static_cast<double>(kWeight1) / kWeightTotal;
  const double kWeight66Percent = static_cast<double>(kWeight2) / kWeightTotal;
  const double kErrorTolerance = 0.05;
  const uint32_t kRpcTimeoutMs = 10000;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kWeight33Percent, kErrorTolerance);
  auto cluster = default_cluster_;
  // Increasing min ring size for random distribution.
  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
      100000);
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // One locality holding both backends, weighted kWeight1 : kWeight2.
  EdsResourceArgs args(
      {{"locality0",
        {CreateEndpoint(0, HealthStatus::UNKNOWN, kWeight1),
         CreateEndpoint(1, HealthStatus::UNKNOWN, kWeight2)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  const int weight_33_request_count =
      backends_[0]->backend_service()->request_count();
  const int weight_66_request_count =
      backends_[1]->backend_service()->request_count();
  EXPECT_THAT(static_cast<double>(weight_33_request_count) / kNumRpcs,
              ::testing::DoubleNear(kWeight33Percent, kErrorTolerance));
  EXPECT_THAT(static_cast<double>(weight_66_request_count) / kNumRpcs,
              ::testing::DoubleNear(kWeight66Percent, kErrorTolerance));
}
682 
// Tests that ring hash policy that hashes using a random value can spread
// RPCs across all the backends according to locality and endpoint weight.
TEST_P(RingHashTest,
       RandomHashingDistributionAccordingToLocalityAndEndpointWeight) {
  CreateAndStartBackends(2);
  // Per-locality and per-endpoint weights.  The test expects each backend's
  // share of traffic to follow the product of its locality weight and its
  // endpoint weight.  The expected ratios are derived from the same
  // constants that build the EDS resource, so the two cannot drift apart.
  const size_t kLocalityWeight1 = 1;
  const size_t kEndpointWeight1 = 1;
  const size_t kLocalityWeight2 = 2;
  const size_t kEndpointWeight2 = 2;
  const size_t kWeight1 = kLocalityWeight1 * kEndpointWeight1;
  const size_t kWeight2 = kLocalityWeight2 * kEndpointWeight2;
  const size_t kWeightTotal = kWeight1 + kWeight2;
  const double kWeight20Percent = static_cast<double>(kWeight1) / kWeightTotal;
  const double kWeight80Percent = static_cast<double>(kWeight2) / kWeightTotal;
  const double kErrorTolerance = 0.05;
  const uint32_t kRpcTimeoutMs = 10000;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kWeight20Percent, kErrorTolerance);
  auto cluster = default_cluster_;
  // Increasing min ring size for random distribution.
  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
      100000);
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Two localities, each holding one backend.
  EdsResourceArgs args(
      {{"locality0",
        {CreateEndpoint(0, HealthStatus::UNKNOWN, kEndpointWeight1)},
        kLocalityWeight1},
       {"locality1",
        {CreateEndpoint(1, HealthStatus::UNKNOWN, kEndpointWeight2)},
        kLocalityWeight2}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  const int weight_20_request_count =
      backends_[0]->backend_service()->request_count();
  const int weight_80_request_count =
      backends_[1]->backend_service()->request_count();
  EXPECT_THAT(static_cast<double>(weight_20_request_count) / kNumRpcs,
              ::testing::DoubleNear(kWeight20Percent, kErrorTolerance));
  EXPECT_THAT(static_cast<double>(weight_80_request_count) / kNumRpcs,
              ::testing::DoubleNear(kWeight80Percent, kErrorTolerance));
}
722 
// Tests that ring hash policy that hashes using a fixed string ensures all
// RPCs go to 1 particular backend, and that subsequent hashing policies are
// ignored due to the terminal setting.
TEST_P(RingHashTest, FixedHashingTerminalPolicy) {
  CreateAndStartBackends(2);
  // Number of RPCs sent.  All of them are expected to land on one backend.
  const size_t kNumRpcs = 100;
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // RDS: a terminal header policy on "fixed_string", followed by a policy on
  // "random_string" that must be ignored because the first one is terminal.
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("fixed_string");
  hash_policy->set_terminal(true);
  auto* hash_policy_to_be_ignored = route->mutable_route()->add_hash_policy();
  hash_policy_to_be_ignored->mutable_header()->set_header_name("random_string");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"fixed_string", "fixed_value"},
      {"random_string", absl::StrFormat("%" PRIu32, rand())},
  };
  const auto rpc_options =
      RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs, rpc_options);
  // Exactly one backend must have received every RPC.
  bool found = false;
  for (size_t i = 0; i < backends_.size(); ++i) {
    const auto count = backends_[i]->backend_service()->request_count();
    if (count > 0) {
      EXPECT_EQ(count, kNumRpcs) << "backend " << i;
      EXPECT_FALSE(found) << "backend " << i;
      found = true;
    }
  }
  EXPECT_TRUE(found);
}
760 
// Test that the channel will go from idle to ready via connecting
// (though it is not possible to catch the connecting state before it moves
// to ready).
TEST_P(RingHashTest, IdleToReady) {
  CreateAndStartBackends(1);
  // CDS: switch the default cluster to ring_hash.
  auto ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // RDS: hash on the "io.grpc.channel_id" filter state.
  auto route_config = default_route_config_;
  route_config.mutable_virtual_hosts(0)
      ->mutable_routes(0)
      ->mutable_route()
      ->add_hash_policy()
      ->mutable_filter_state()
      ->set_key("io.grpc.channel_id");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // EDS: the single backend.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Nothing has been sent yet, so the channel is still idle; one RPC must
  // bring it to READY.
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
  const auto rpc_options = RpcOptions().set_timeout_ms(5000);
  CheckRpcSendOk(DEBUG_LOCATION, 1, rpc_options);
  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
}
781 
782 // Test that the channel will transition to READY once it starts
783 // connecting even if there are no RPCs being sent to the picker.
TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) {
  // Create EDS resource: one endpoint from MakeNonExistantEndpoint()
  // (referred to below as P0) followed by the real backend.
  CreateAndStartBackends(1);
  auto non_existant_endpoint = MakeNonExistantEndpoint();
  EdsResourceArgs args(
      {{"locality0", {non_existant_endpoint, CreateEndpoint(0)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Change CDS resource to use RING_HASH.
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Add hash policy to RDS resource: hash on the "address_hash" header.
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  // Start connection attempt injector and add a hold for the P0
  // connection attempt.
  ConnectionAttemptInjector injector;
  auto hold = injector.AddHold(non_existant_endpoint.port);
  // A long-running RPC, just used to send the RPC in another thread.
  LongRunningRpc rpc;
  // Header value chosen via CreateMetadataValueThatHashesToBackendPort() so
  // that the RPC targets P0.
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"address_hash",
       CreateMetadataValueThatHashesToBackendPort(non_existant_endpoint.port)}};
  rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
                                std::move(metadata)));
  // Wait for the RPC to trigger the P0 connection attempt, then cancel it,
  // and then allow the connection attempt to complete.
  hold->Wait();
  rpc.CancelRpc();
  EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
  hold->Resume();
  // Wait for channel to become connected without any pending RPC.  The only
  // RPC was cancelled above, so any further connection attempt must have been
  // started by the LB policy itself.
  EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(5)));
  // Make sure the backend did not get any requests.
  EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
}
824 
825 // Tests that when we trigger internal connection attempts without
826 // picks, we do so for only one subchannel at a time.
TEST_P(RingHashTest, ContinuesConnectingWithoutPicksOneSubchannelAtATime) {
  // Create EDS resource: three endpoints from MakeNonExistantEndpoint()
  // followed by the real backend.
  CreateAndStartBackends(1);
  auto non_existant_endpoint0 = MakeNonExistantEndpoint();
  auto non_existant_endpoint1 = MakeNonExistantEndpoint();
  auto non_existant_endpoint2 = MakeNonExistantEndpoint();
  EdsResourceArgs args({{"locality0",
                         {non_existant_endpoint0, non_existant_endpoint1,
                          non_existant_endpoint2, CreateEndpoint(0)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Change CDS resource to use RING_HASH.
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Add hash policy to RDS resource: hash on the "address_hash" header.
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  // Start connection attempt injector.  Every address gets a hold so the test
  // can observe exactly which connection attempts have started.
  ConnectionAttemptInjector injector;
  auto hold_non_existant0 = injector.AddHold(non_existant_endpoint0.port);
  auto hold_non_existant1 = injector.AddHold(non_existant_endpoint1.port);
  auto hold_non_existant2 = injector.AddHold(non_existant_endpoint2.port);
  auto hold_good = injector.AddHold(backends_[0]->port());
  // A long-running RPC, just used to send the RPC in another thread.
  LongRunningRpc rpc;
  // Header value chosen so that the RPC targets the first non-existent
  // endpoint.
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"address_hash", CreateMetadataValueThatHashesToBackendPort(
                           non_existant_endpoint0.port)}};
  rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
                                std::move(metadata)));
  // Wait for the RPC to trigger a connection attempt to the first address,
  // then cancel the RPC.  No other connection attempts should be started yet.
  hold_non_existant0->Wait();
  rpc.CancelRpc();
  EXPECT_FALSE(hold_non_existant1->IsStarted());
  EXPECT_FALSE(hold_non_existant2->IsStarted());
  EXPECT_FALSE(hold_good->IsStarted());
  // Allow the connection attempt to the first address to resume and wait
  // for the attempt for the second address.  No other connection
  // attempts should be started yet.  The "_again" hold is added before the
  // resume so that a second attempt to the same address would be detected.
  auto hold_non_existant0_again = injector.AddHold(non_existant_endpoint0.port);
  hold_non_existant0->Resume();
  hold_non_existant1->Wait();
  EXPECT_FALSE(hold_non_existant0_again->IsStarted());
  EXPECT_FALSE(hold_non_existant2->IsStarted());
  EXPECT_FALSE(hold_good->IsStarted());
  // Allow the connection attempt to the second address to resume and wait
  // for the attempt for the third address.  No other connection
  // attempts should be started yet.
  auto hold_non_existant1_again = injector.AddHold(non_existant_endpoint1.port);
  hold_non_existant1->Resume();
  hold_non_existant2->Wait();
  EXPECT_FALSE(hold_non_existant0_again->IsStarted());
  EXPECT_FALSE(hold_non_existant1_again->IsStarted());
  EXPECT_FALSE(hold_good->IsStarted());
  // Allow the connection attempt to the third address to resume and wait
  // for the attempt for the final address.  No other connection
  // attempts should be started yet.
  auto hold_non_existant2_again = injector.AddHold(non_existant_endpoint2.port);
  hold_non_existant2->Resume();
  hold_good->Wait();
  EXPECT_FALSE(hold_non_existant0_again->IsStarted());
  EXPECT_FALSE(hold_non_existant1_again->IsStarted());
  EXPECT_FALSE(hold_non_existant2_again->IsStarted());
  // Allow the final attempt to resume.
  hold_good->Resume();
  // Wait for channel to become connected without any pending RPC.
  EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(10)));
  // No other connection attempts should have been started.
  EXPECT_FALSE(hold_non_existant0_again->IsStarted());
  EXPECT_FALSE(hold_non_existant1_again->IsStarted());
  EXPECT_FALSE(hold_non_existant2_again->IsStarted());
  // RPC should have been cancelled.
  EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
  // Make sure the backend did not get any requests.
  EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
}
908 
909 // Test that when the first pick is down leading to a transient failure, we
910 // will move on to the next ring hash entry.
TEST_P(RingHashTest, TransientFailureCheckNextOne) {
  CreateAndStartBackends(1);
  // CDS: switch the default cluster to ring_hash.
  auto ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // RDS: hash on the value of the "address_hash" header.
  auto route_config = default_route_config_;
  auto* route_action =
      route_config.mutable_virtual_hosts(0)->mutable_routes(0)->mutable_route();
  route_action->add_hash_policy()->mutable_header()->set_header_name(
      "address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // EDS: an unused port (no server is started on it), then the real backend.
  const int dead_port = grpc_pick_unused_port_or_die();
  std::vector<EdsResourceArgs::Endpoint> locality_endpoints;
  locality_endpoints.emplace_back(dead_port);
  locality_endpoints.emplace_back(backends_[0]->port());
  EdsResourceArgs args({{"locality0", std::move(locality_endpoints)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Every RPC hashes to the unused port's ring entry, yet must still end up
  // on backend 0.
  std::vector<std::pair<std::string, std::string>> headers = {
      {"address_hash", CreateMetadataValueThatHashesToBackendPort(dead_port)}};
  const auto rpc_options =
      RpcOptions().set_metadata(std::move(headers)).set_timeout_ms(5000);
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
}
937 
938 // Test that when a backend goes down, we will move on to the next subchannel
939 // (with a lower priority).  When the backend comes back up, traffic will move
940 // back.
TEST_P(RingHashTest, SwitchToLowerPrioirtyAndThenBack) {
  CreateAndStartBackends(2);
  // CDS: switch the default cluster to ring_hash.
  auto ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // RDS: hash on the value of the "address_hash" header.
  auto route_config = default_route_config_;
  auto* route_action =
      route_config.mutable_virtual_hosts(0)->mutable_routes(0)->mutable_route();
  route_action->add_hash_policy()->mutable_header()->set_header_name(
      "address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // EDS: backend 0 in the priority-0 locality, backend 1 in priority 1.
  const int kPrimaryPriority = 0;
  const int kFallbackPriority = 1;
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
       kPrimaryPriority},
      {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
       kFallbackPriority},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // All RPCs carry a header value that hashes to backend 0.
  std::vector<std::pair<std::string, std::string>> headers = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
  const auto rpc_options =
      RpcOptions().set_metadata(std::move(headers)).set_timeout_ms(5000);
  // Traffic starts on backend 0.
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  // Once backend 0 stops listening, traffic moves to backend 1.
  backends_[0]->StopListeningAndSendGoaways();
  WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  // Restart backend 0; traffic should move back to it.
  ShutdownBackend(0);
  StartBackend(0);
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
  EXPECT_EQ(100, backends_[0]->backend_service()->request_count());
  EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
}
976 
977 // Test that when all backends are down, we will keep reattempting.
TEST_P(RingHashTest, ReattemptWhenAllEndpointsUnreachable) {
  CreateAndStartBackends(1);
  const uint32_t kConnectionTimeoutMilliseconds = 5000;
  // CDS: switch the default cluster to ring_hash.
  auto ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // RDS: hash on the value of the "address_hash" header.
  auto route_config = default_route_config_;
  auto* route_action =
      route_config.mutable_virtual_hosts(0)->mutable_routes(0)->mutable_route();
  route_action->add_hash_policy()->mutable_header()->set_header_name(
      "address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // EDS: a non-existent endpoint plus the real backend.
  EdsResourceArgs args(
      {{"locality0", {MakeNonExistantEndpoint(), CreateEndpoint(0)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // The RPC's header value hashes to backend 0.
  std::vector<std::pair<std::string, std::string>> headers = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
  const auto rpc_options = RpcOptions().set_metadata(std::move(headers));
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
  // With backend 0 down, neither endpoint is reachable, so the RPC fails.
  ShutdownBackend(0);
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex(
          "ring hash cannot find a connected endpoint; first failure: "),
      rpc_options);
  // Bring backend 0 back; the channel must connect with no RPC in flight.
  StartBackend(0);
  EXPECT_TRUE(channel_->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
}
1007 
// Test that when all backends are down and then up, we may pick a TF backend
// and will then jump to a READY backend.
TEST_P(RingHashTest, TransientFailureSkipToAvailableReady) {
  // Backends are created but not started yet; backend 0 and backend 1 are
  // started later in the test.
  CreateBackends(2);
  const uint32_t kConnectionTimeoutMilliseconds = 5000;
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Hash on the value of the "address_hash" header.
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  // Make sure we include some unused ports to fill the ring.
  EdsResourceArgs args({
      {"locality0",
       {CreateEndpoint(0), CreateEndpoint(1), MakeNonExistantEndpoint(),
        MakeNonExistantEndpoint()}},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Every RPC in this test carries a header value that hashes to backend 0.
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
  const auto rpc_options = RpcOptions()
                               .set_metadata(std::move(metadata))
                               .set_timeout_ms(kConnectionTimeoutMilliseconds);
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
  // No backend is running yet, so the first RPC fails and the channel goes
  // to TRANSIENT_FAILURE.
  gpr_log(GPR_INFO, "=== SENDING FIRST RPC ===");
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex(
          "ring hash cannot find a connected endpoint; first failure: "),
      rpc_options);
  gpr_log(GPR_INFO, "=== DONE WITH FIRST RPC ===");
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
  // Bring up backend 0.  The channel should become connected without
  // any picks, because in TF, we are always trying to connect to at
  // least one backend at all times.
  gpr_log(GPR_INFO, "=== STARTING BACKEND 0 ===");
  StartBackend(0);
  gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL TO BECOME READY ===");
  EXPECT_TRUE(channel_->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
  // RPCs should go to backend 0.
  gpr_log(GPR_INFO, "=== WAITING FOR BACKEND 0 ===");
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
  // Bring down backend 0 and bring up backend 1.
  // Note the RPC contains a header value that will always be hashed to
  // backend 0. So by purposely bringing down backend 0 and bringing up another
  // backend, this will ensure Picker's first choice of backend 0 will fail
  // and it will go through the remaining subchannels to find one in READY.
  // Since the entries in the ring are pretty distributed and we have
  // unused ports to fill the ring, it is almost guaranteed that the Picker
  // will go through some non-READY entries and skip them as per design.
  gpr_log(GPR_INFO, "=== SHUTTING DOWN BACKEND 0 ===");
  ShutdownBackend(0);
  gpr_log(GPR_INFO, "=== WAITING FOR STATE CHANGE ===");
  EXPECT_TRUE(channel_->WaitForStateChange(
      GRPC_CHANNEL_READY,
      grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
  // With no backend running again, RPCs fail once more.
  gpr_log(GPR_INFO, "=== SENDING SECOND RPC ===");
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex(
          "ring hash cannot find a connected endpoint; first failure: "),
      rpc_options);
  // Start backend 1; the channel should reconnect on its own, and RPCs that
  // still hash to backend 0 should be served by backend 1.
  gpr_log(GPR_INFO, "=== STARTING BACKEND 1 ===");
  StartBackend(1);
  gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL TO BECOME READY ===");
  EXPECT_TRUE(channel_->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
  gpr_log(GPR_INFO, "=== WAITING FOR BACKEND 1 ===");
  WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  gpr_log(GPR_INFO, "=== DONE ===");
}
1087 
1088 // This tests a bug seen in the wild where ring_hash started with no
1089 // endpoints and reported TRANSIENT_FAILURE, then got an update with
1090 // endpoints and reported IDLE, but the picker update was squelched, so
1091 // it failed to ever get reconnected.
TEST_P(RingHashTest, ReattemptWhenGoingFromTransientFailureToIdle) {
  CreateAndStartBackends(1);
  const uint32_t kConnectionTimeoutMilliseconds = 5000;
  // CDS: ring_hash.  LDS/RDS: the default listener and route configuration
  // with no hash policy added.
  auto ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   default_route_config_);
  // Phase 1: an EDS update whose only locality has no endpoints.
  EdsResourceArgs empty_args(
      {{"locality0", std::vector<EdsResourceArgs::Endpoint>()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(empty_args));
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
  // Channel should fail RPCs and go into TRANSIENT_FAILURE.
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      "empty address list: EDS resource eds_service_name contains empty "
      "localities: \\[\\{region=\"xds_default_locality_region\", "
      "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]",
      RpcOptions().set_timeout_ms(kConnectionTimeoutMilliseconds));
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
  // Phase 2: an EDS update containing the single backend.
  EdsResourceArgs filled_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(filled_args));
  // A wait_for_ready RPC should succeed, and the channel should report READY.
  const auto wait_for_ready_options =
      RpcOptions()
          .set_timeout_ms(kConnectionTimeoutMilliseconds)
          .set_wait_for_ready(true);
  CheckRpcSendOk(DEBUG_LOCATION, 1, wait_for_ready_options);
  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
}
1124 
// Test that unsupported hash policy types are all ignored when they precede
// a supported policy.
TEST_P(RingHashTest, UnsupportedHashPolicyUntilChannelIdHashing) {
  CreateAndStartBackends(2);
  // Number of RPCs sent.  All of them are expected to land on one backend.
  const size_t kNumRpcs = 100;
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // RDS: three unsupported hash policy types (cookie, connection source IP,
  // query parameter), followed by a channel-ID filter-state policy.
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy();
  hash_policy_unsupported_1->mutable_cookie()->set_name("cookie");
  auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy();
  hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
      true);
  auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy();
  hash_policy_unsupported_3->mutable_query_parameter()->set_name(
      "query_parameter");
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs, RpcOptions().set_timeout_ms(5000));
  // Exactly one backend must have received every RPC.
  bool found = false;
  for (size_t i = 0; i < backends_.size(); ++i) {
    const auto count = backends_[i]->backend_service()->request_count();
    if (count > 0) {
      EXPECT_EQ(count, kNumRpcs) << "backend " << i;
      EXPECT_FALSE(found) << "backend " << i;
      found = true;
    }
  }
  EXPECT_TRUE(found);
}
1160 
1161 }  // namespace
1162 }  // namespace testing
1163 }  // namespace grpc
1164 
// Test entry point: sets up the test environment and gtest, applies config
// overrides, initializes gRPC and the connection-attempt injector, runs all
// tests, then shuts gRPC down.
int main(int argc, char** argv) {
  grpc::testing::TestEnvironment env(&argc, argv);
  ::testing::InitGoogleTest(&argc, argv);
  // Make the backup poller poll very frequently in order to pick up
  // updates from all the subchannels' FDs.  The override is applied before
  // grpc_init() below.
  grpc_core::ConfigVars::Overrides overrides;
  overrides.client_channel_backup_poll_interval_ms = 1;
  grpc_core::ConfigVars::SetOverrides(overrides);
#if TARGET_OS_IPHONE
  // Workaround Apple CFStream bug
  grpc_core::SetEnv("grpc_cfstream", "0");
#endif
  grpc_init();
  // Installed after grpc_init() and before any test runs; tests above rely
  // on ConnectionAttemptInjector to hold and resume connection attempts.
  grpc::testing::ConnectionAttemptInjector::Init();
  const auto result = RUN_ALL_TESTS();
  grpc_shutdown();
  return result;
}
1183