1 //
2 // Copyright 2023 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/load_balancing/ring_hash/ring_hash.h"
18
19 #include <stdint.h>
20
21 #include <algorithm>
22 #include <array>
23 #include <memory>
24 #include <string>
25 #include <vector>
26
27 #include "absl/status/status.h"
28 #include "absl/strings/str_cat.h"
29 #include "absl/strings/string_view.h"
30 #include "absl/strings/strip.h"
31 #include "absl/types/optional.h"
32 #include "gtest/gtest.h"
33
34 #include <grpc/grpc.h>
35 #include <grpc/support/json.h>
36
37 #include "src/core/lib/gprpp/ref_counted_ptr.h"
38 #include "src/core/lib/gprpp/xxhash_inline.h"
39 #include "src/core/lib/json/json.h"
40 #include "src/core/load_balancing/lb_policy.h"
41 #include "src/core/resolver/endpoint_addresses.h"
42 #include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
43 #include "test/core/util/test_config.h"
44
45 namespace grpc_core {
46 namespace testing {
47 namespace {
48
49 // TODO(roth): I created this file when I fixed a bug and wrote only a
50 // very basic test and the test needed for that bug. When we have time,
51 // we need a lot more tests here to cover all of the policy's functionality.
52
53 class RingHashTest : public LoadBalancingPolicyTest {
54 protected:
RingHashTest()55 RingHashTest() : LoadBalancingPolicyTest("ring_hash_experimental") {}
56
MakeRingHashConfig(int min_ring_size=0,int max_ring_size=0)57 static RefCountedPtr<LoadBalancingPolicy::Config> MakeRingHashConfig(
58 int min_ring_size = 0, int max_ring_size = 0) {
59 Json::Object fields;
60 if (min_ring_size > 0) {
61 fields["minRingSize"] = Json::FromString(absl::StrCat(min_ring_size));
62 }
63 if (max_ring_size > 0) {
64 fields["maxRingSize"] = Json::FromString(absl::StrCat(max_ring_size));
65 }
66 return MakeConfig(Json::FromArray({Json::FromObject(
67 {{"ring_hash_experimental", Json::FromObject(fields)}})}));
68 }
69
MakeHashAttribute(absl::string_view address)70 RequestHashAttribute* MakeHashAttribute(absl::string_view address) {
71 std::string hash_input =
72 absl::StrCat(absl::StripPrefix(address, "ipv4:"), "_0");
73 uint64_t hash = XXH64(hash_input.data(), hash_input.size(), 0);
74 attribute_storage_.emplace_back(
75 std::make_unique<RequestHashAttribute>(hash));
76 return attribute_storage_.back().get();
77 }
78
79 std::vector<std::unique_ptr<RequestHashAttribute>> attribute_storage_;
80 };
81
TEST_F(RingHashTest,Basic)82 TEST_F(RingHashTest, Basic) {
83 const std::array<absl::string_view, 3> kAddresses = {
84 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
85 EXPECT_EQ(
86 ApplyUpdate(BuildUpdate(kAddresses, MakeRingHashConfig()), lb_policy()),
87 absl::OkStatus());
88 auto picker = ExpectState(GRPC_CHANNEL_IDLE);
89 auto* address0_attribute = MakeHashAttribute(kAddresses[0]);
90 ExpectPickQueued(picker.get(), {address0_attribute});
91 WaitForWorkSerializerToFlush();
92 WaitForWorkSerializerToFlush();
93 auto* subchannel = FindSubchannel(kAddresses[0]);
94 ASSERT_NE(subchannel, nullptr);
95 EXPECT_TRUE(subchannel->ConnectionRequested());
96 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
97 picker = ExpectState(GRPC_CHANNEL_CONNECTING);
98 ExpectPickQueued(picker.get(), {address0_attribute});
99 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
100 picker = ExpectState(GRPC_CHANNEL_READY);
101 auto address = ExpectPickComplete(picker.get(), {address0_attribute});
102 EXPECT_EQ(address, kAddresses[0]);
103 }
104
TEST_F(RingHashTest,SameAddressListedMultipleTimes)105 TEST_F(RingHashTest, SameAddressListedMultipleTimes) {
106 const std::array<absl::string_view, 3> kAddresses = {
107 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:441"};
108 EXPECT_EQ(
109 ApplyUpdate(BuildUpdate(kAddresses, MakeRingHashConfig()), lb_policy()),
110 absl::OkStatus());
111 auto picker = ExpectState(GRPC_CHANNEL_IDLE);
112 auto* address0_attribute = MakeHashAttribute(kAddresses[0]);
113 ExpectPickQueued(picker.get(), {address0_attribute});
114 WaitForWorkSerializerToFlush();
115 WaitForWorkSerializerToFlush();
116 auto* subchannel = FindSubchannel(kAddresses[0]);
117 ASSERT_NE(subchannel, nullptr);
118 EXPECT_TRUE(subchannel->ConnectionRequested());
119 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
120 picker = ExpectState(GRPC_CHANNEL_CONNECTING);
121 ExpectPickQueued(picker.get(), {address0_attribute});
122 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
123 picker = ExpectState(GRPC_CHANNEL_READY);
124 auto address = ExpectPickComplete(picker.get(), {address0_attribute});
125 EXPECT_EQ(address, kAddresses[0]);
126 }
127
TEST_F(RingHashTest,MultipleAddressesPerEndpoint)128 TEST_F(RingHashTest, MultipleAddressesPerEndpoint) {
129 constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
130 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
131 constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
132 "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
133 const std::array<EndpointAddresses, 2> kEndpoints = {
134 MakeEndpointAddresses(kEndpoint1Addresses),
135 MakeEndpointAddresses(kEndpoint2Addresses)};
136 EXPECT_EQ(
137 ApplyUpdate(BuildUpdate(kEndpoints, MakeRingHashConfig()), lb_policy()),
138 absl::OkStatus());
139 auto picker = ExpectState(GRPC_CHANNEL_IDLE);
140 // Normal connection to first address of the first endpoint.
141 auto* address0_attribute = MakeHashAttribute(kEndpoint1Addresses[0]);
142 ExpectPickQueued(picker.get(), {address0_attribute});
143 WaitForWorkSerializerToFlush();
144 WaitForWorkSerializerToFlush();
145 auto* subchannel = FindSubchannel(kEndpoint1Addresses[0]);
146 ASSERT_NE(subchannel, nullptr);
147 EXPECT_TRUE(subchannel->ConnectionRequested());
148 auto* subchannel2 = FindSubchannel(kEndpoint1Addresses[1]);
149 ASSERT_NE(subchannel2, nullptr);
150 EXPECT_FALSE(subchannel2->ConnectionRequested());
151 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
152 picker = ExpectState(GRPC_CHANNEL_CONNECTING);
153 ExpectPickQueued(picker.get(), {address0_attribute});
154 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
155 picker = ExpectState(GRPC_CHANNEL_READY);
156 auto address = ExpectPickComplete(picker.get(), {address0_attribute});
157 EXPECT_EQ(address, kEndpoint1Addresses[0]);
158 // Now that connection fails.
159 subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
160 ExpectReresolutionRequest();
161 picker = ExpectState(GRPC_CHANNEL_IDLE);
162 EXPECT_FALSE(subchannel->ConnectionRequested());
163 EXPECT_FALSE(subchannel2->ConnectionRequested());
164 // The LB policy will try to reconnect when it gets another pick.
165 ExpectPickQueued(picker.get(), {address0_attribute});
166 WaitForWorkSerializerToFlush();
167 WaitForWorkSerializerToFlush();
168 EXPECT_TRUE(subchannel->ConnectionRequested());
169 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
170 picker = ExpectState(GRPC_CHANNEL_CONNECTING);
171 ExpectPickQueued(picker.get(), {address0_attribute});
172 // The connection attempt fails.
173 subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
174 absl::UnavailableError("ugh"));
175 // The PF child policy will try to connect to the second address for the
176 // endpoint.
177 EXPECT_TRUE(subchannel2->ConnectionRequested());
178 subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
179 picker = ExpectState(GRPC_CHANNEL_CONNECTING);
180 ExpectPickQueued(picker.get(), {address0_attribute});
181 subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
182 picker = ExpectState(GRPC_CHANNEL_READY);
183 address = ExpectPickComplete(picker.get(), {address0_attribute});
184 EXPECT_EQ(address, kEndpoint1Addresses[1]);
185 }
186
187 } // namespace
188 } // namespace testing
189 } // namespace grpc_core
190
main(int argc,char ** argv)191 int main(int argc, char** argv) {
192 ::testing::InitGoogleTest(&argc, argv);
193 grpc::testing::TestEnvironment env(&argc, argv);
194 return RUN_ALL_TESTS();
195 }
196