1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/load_balancing/pick_first/pick_first.h"
18
19 #include <stddef.h>
20
21 #include <algorithm>
22 #include <array>
23 #include <chrono>
24 #include <map>
25 #include <memory>
26 #include <utility>
27 #include <vector>
28
29 #include "absl/status/status.h"
30 #include "absl/strings/string_view.h"
31 #include "absl/synchronization/notification.h"
32 #include "absl/types/optional.h"
33 #include "absl/types/span.h"
34 #include "gmock/gmock.h"
35 #include "gtest/gtest.h"
36
37 #include <grpc/grpc.h>
38 #include <grpc/support/json.h>
39
40 #include "src/core/lib/channel/metrics.h"
41 #include "src/core/lib/gprpp/debug_location.h"
42 #include "src/core/lib/gprpp/orphanable.h"
43 #include "src/core/lib/gprpp/ref_counted_ptr.h"
44 #include "src/core/lib/gprpp/time.h"
45 #include "src/core/lib/gprpp/work_serializer.h"
46 #include "src/core/lib/iomgr/exec_ctx.h"
47 #include "src/core/lib/json/json.h"
48 #include "src/core/load_balancing/lb_policy.h"
49 #include "src/core/resolver/endpoint_addresses.h"
50 #include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
51 #include "test/core/util/fake_stats_plugin.h"
52 #include "test/core/util/test_config.h"
53
54 namespace grpc_core {
55 namespace testing {
56 namespace {
57
58 class PickFirstTest : public LoadBalancingPolicyTest {
59 protected:
PickFirstTest(ChannelArgs channel_args=ChannelArgs ())60 explicit PickFirstTest(ChannelArgs channel_args = ChannelArgs())
61 : LoadBalancingPolicyTest("pick_first", channel_args) {}
62
SetUp()63 void SetUp() override {
64 LoadBalancingPolicyTest::SetUp();
65 SetExpectedTimerDuration(std::chrono::milliseconds(250));
66 }
67
MakePickFirstConfig(absl::optional<bool> shuffle_address_list=absl::nullopt)68 static RefCountedPtr<LoadBalancingPolicy::Config> MakePickFirstConfig(
69 absl::optional<bool> shuffle_address_list = absl::nullopt) {
70 return MakeConfig(Json::FromArray({Json::FromObject(
71 {{"pick_first",
72 shuffle_address_list.has_value()
73 ? Json::FromObject({{"shuffleAddressList",
74 Json::FromBool(*shuffle_address_list)}})
75 : Json::FromObject({})}})}));
76 }
77
78 // Gets order the addresses are being picked. Return type is void so
79 // assertions can be used
GetOrderAddressesArePicked(absl::Span<const absl::string_view> addresses,std::vector<absl::string_view> * out_address_order)80 void GetOrderAddressesArePicked(
81 absl::Span<const absl::string_view> addresses,
82 std::vector<absl::string_view>* out_address_order) {
83 out_address_order->clear();
84 ExitIdle();
85 // Construct a map of subchannel to address.
86 // We will remove entries as each subchannel starts to connect.
87 std::map<SubchannelState*, absl::string_view> subchannels;
88 for (auto address : addresses) {
89 auto* subchannel = FindSubchannel(address);
90 ASSERT_NE(subchannel, nullptr);
91 subchannels.emplace(subchannel, address);
92 }
93 // Now process each subchannel in the order in which pick_first tries it.
94 while (!subchannels.empty()) {
95 // Find the subchannel that is being attempted.
96 SubchannelState* subchannel = nullptr;
97 for (const auto& p : subchannels) {
98 if (p.first->ConnectionRequested()) {
99 out_address_order->push_back(p.second);
100 subchannel = p.first;
101 break;
102 }
103 }
104 ASSERT_NE(subchannel, nullptr);
105 // The subchannel reports CONNECTING.
106 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
107 // If this is the first subchannel being attempted, expect a CONNECTING
108 // update.
109 if (subchannels.size() == addresses.size()) {
110 ExpectConnectingUpdate();
111 }
112 if (subchannels.size() > 1) {
113 // Not the last subchannel in the list. Connection attempt should fail.
114 subchannel->SetConnectivityState(
115 GRPC_CHANNEL_TRANSIENT_FAILURE,
116 absl::UnavailableError("failed to connect"));
117 subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
118 } else {
119 // Last subchannel in the list. Connection attempt should succeed.
120 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
121 auto picker = WaitForConnected();
122 ASSERT_NE(picker, nullptr);
123 EXPECT_EQ(ExpectPickComplete(picker.get()), out_address_order->back());
124 // Then it should become disconnected.
125 subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
126 ExpectReresolutionRequest();
127 // We would normally call ExpectStateAndQueueingPicker() here instead of
128 // just ExpectState(). However, calling the picker would also trigger
129 // exiting IDLE, which we don't want here, because if the test is going
130 // to send an address list update and call GetOrderAddressesArePicked()
131 // again, we don't want to trigger a connection attempt on any
132 // subchannel until after that next address list update is processed.
133 ExpectState(GRPC_CHANNEL_IDLE);
134 }
135 // Remove the subchannel from the map.
136 subchannels.erase(subchannel);
137 }
138 }
139 };
140
TEST_F(PickFirstTest,FirstAddressWorks)141 TEST_F(PickFirstTest, FirstAddressWorks) {
142 // Send an update containing two addresses.
143 constexpr std::array<absl::string_view, 2> kAddresses = {
144 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
145 absl::Status status = ApplyUpdate(
146 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
147 EXPECT_TRUE(status.ok()) << status;
148 // LB policy should have created a subchannel for both addresses.
149 auto* subchannel = FindSubchannel(kAddresses[0]);
150 ASSERT_NE(subchannel, nullptr);
151 auto* subchannel2 = FindSubchannel(kAddresses[1]);
152 ASSERT_NE(subchannel2, nullptr);
153 // When the LB policy receives the first subchannel's initial connectivity
154 // state notification (IDLE), it will request a connection.
155 EXPECT_TRUE(subchannel->ConnectionRequested());
156 // This causes the subchannel to start to connect, so it reports CONNECTING.
157 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
158 // LB policy should have reported CONNECTING state.
159 ExpectConnectingUpdate();
160 // The second subchannel should not be connecting.
161 EXPECT_FALSE(subchannel2->ConnectionRequested());
162 // When the first subchannel becomes connected, it reports READY.
163 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
164 // The LB policy will report CONNECTING some number of times (doesn't
165 // matter how many) and then report READY.
166 auto picker = WaitForConnected();
167 ASSERT_NE(picker, nullptr);
168 // Picker should return the same subchannel repeatedly.
169 for (size_t i = 0; i < 3; ++i) {
170 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
171 }
172 }
173
TEST_F(PickFirstTest,FirstAddressFails)174 TEST_F(PickFirstTest, FirstAddressFails) {
175 // Send an update containing two addresses.
176 constexpr std::array<absl::string_view, 2> kAddresses = {
177 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
178 absl::Status status = ApplyUpdate(
179 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
180 EXPECT_TRUE(status.ok()) << status;
181 // LB policy should have created a subchannel for both addresses.
182 auto* subchannel = FindSubchannel(kAddresses[0]);
183 ASSERT_NE(subchannel, nullptr);
184 auto* subchannel2 = FindSubchannel(kAddresses[1]);
185 ASSERT_NE(subchannel2, nullptr);
186 // When the LB policy receives the first subchannel's initial connectivity
187 // state notification (IDLE), it will request a connection.
188 EXPECT_TRUE(subchannel->ConnectionRequested());
189 // This causes the subchannel to start to connect, so it reports
190 // CONNECTING.
191 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
192 // LB policy should have reported CONNECTING state.
193 ExpectConnectingUpdate();
194 // The second subchannel should not be connecting.
195 EXPECT_FALSE(subchannel2->ConnectionRequested());
196 // The first subchannel's connection attempt fails.
197 subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
198 absl::UnavailableError("failed to connect"));
199 // The LB policy will start a connection attempt on the second subchannel.
200 EXPECT_TRUE(subchannel2->ConnectionRequested());
201 // This causes the subchannel to start to connect, so it reports
202 // CONNECTING.
203 subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
204 // The connection attempt succeeds.
205 subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
206 // The LB policy will report CONNECTING some number of times (doesn't
207 // matter how many) and then report READY.
208 auto picker = WaitForConnected();
209 ASSERT_NE(picker, nullptr);
210 // Picker should return the same subchannel repeatedly.
211 for (size_t i = 0; i < 3; ++i) {
212 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[1]);
213 }
214 }
215
TEST_F(PickFirstTest,FlattensEndpointAddressesList)216 TEST_F(PickFirstTest, FlattensEndpointAddressesList) {
217 // Send an update containing two endpoints, the first one with two addresses.
218 constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
219 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
220 constexpr std::array<absl::string_view, 1> kEndpoint2Addresses = {
221 "ipv4:127.0.0.1:445"};
222 const std::array<EndpointAddresses, 2> kEndpoints = {
223 MakeEndpointAddresses(kEndpoint1Addresses),
224 MakeEndpointAddresses(kEndpoint2Addresses)};
225 absl::Status status = ApplyUpdate(
226 BuildUpdate(kEndpoints, MakePickFirstConfig(false)), lb_policy_.get());
227 EXPECT_TRUE(status.ok()) << status;
228 // LB policy should have created a subchannel for all 3 addresses.
229 auto* subchannel = FindSubchannel(kEndpoint1Addresses[0]);
230 ASSERT_NE(subchannel, nullptr);
231 auto* subchannel2 = FindSubchannel(kEndpoint1Addresses[1]);
232 ASSERT_NE(subchannel2, nullptr);
233 auto* subchannel3 = FindSubchannel(kEndpoint2Addresses[0]);
234 ASSERT_NE(subchannel3, nullptr);
235 // When the LB policy receives the first subchannel's initial connectivity
236 // state notification (IDLE), it will request a connection.
237 EXPECT_TRUE(subchannel->ConnectionRequested());
238 // This causes the subchannel to start to connect, so it reports
239 // CONNECTING.
240 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
241 // LB policy should have reported CONNECTING state.
242 ExpectConnectingUpdate();
243 // The other subchannels should not be connecting.
244 EXPECT_FALSE(subchannel2->ConnectionRequested());
245 EXPECT_FALSE(subchannel3->ConnectionRequested());
246 // The first subchannel's connection attempt fails.
247 subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
248 absl::UnavailableError("failed to connect"));
249 // The LB policy will start a connection attempt on the second subchannel.
250 EXPECT_TRUE(subchannel2->ConnectionRequested());
251 EXPECT_FALSE(subchannel3->ConnectionRequested());
252 // This causes the subchannel to start to connect, so it reports
253 // CONNECTING.
254 subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
255 // The connection attempt fails.
256 subchannel2->SetConnectivityState(
257 GRPC_CHANNEL_TRANSIENT_FAILURE,
258 absl::UnavailableError("failed to connect"));
259 // The LB policy will start a connection attempt on the third subchannel.
260 EXPECT_TRUE(subchannel3->ConnectionRequested());
261 // This causes the subchannel to start to connect, so it reports
262 // CONNECTING.
263 subchannel3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
264 // This one succeeds.
265 subchannel3->SetConnectivityState(GRPC_CHANNEL_READY);
266 // The LB policy will report CONNECTING some number of times (doesn't
267 // matter how many) and then report READY.
268 auto picker = WaitForConnected();
269 ASSERT_NE(picker, nullptr);
270 // Picker should return the same subchannel repeatedly.
271 for (size_t i = 0; i < 3; ++i) {
272 EXPECT_EQ(ExpectPickComplete(picker.get()), kEndpoint2Addresses[0]);
273 }
274 }
275
TEST_F(PickFirstTest,FirstTwoAddressesInTransientFailureAtStart)276 TEST_F(PickFirstTest, FirstTwoAddressesInTransientFailureAtStart) {
277 // Send an update containing three addresses.
278 // The first two addresses are already in state TRANSIENT_FAILURE when the
279 // LB policy gets the update.
280 constexpr std::array<absl::string_view, 3> kAddresses = {
281 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445"};
282 auto* subchannel = CreateSubchannel(kAddresses[0]);
283 subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
284 absl::UnavailableError("failed to connect"),
285 /*validate_state_transition=*/false);
286 auto* subchannel2 = CreateSubchannel(kAddresses[1]);
287 subchannel2->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
288 absl::UnavailableError("failed to connect"),
289 /*validate_state_transition=*/false);
290 absl::Status status = ApplyUpdate(
291 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
292 EXPECT_TRUE(status.ok()) << status;
293 // LB policy should have created a subchannel for all addresses.
294 auto* subchannel3 = FindSubchannel(kAddresses[2]);
295 ASSERT_NE(subchannel3, nullptr);
296 // When the LB policy receives the first subchannel's initial connectivity
297 // state notification (TRANSIENT_FAILURE), it will move on to the second
298 // subchannel. The second subchannel is also in state TRANSIENT_FAILURE,
299 // so the LB policy will move on to the third subchannel. That
300 // subchannel is in state IDLE, so the LB policy will request a connection
301 // attempt on it.
302 EXPECT_TRUE(subchannel3->ConnectionRequested());
303 // This causes the subchannel to start to connect, so it reports
304 // CONNECTING.
305 subchannel3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
306 // LB policy should have reported CONNECTING state.
307 ExpectConnectingUpdate();
308 // The connection attempt succeeds.
309 subchannel3->SetConnectivityState(GRPC_CHANNEL_READY);
310 // The LB policy will report CONNECTING some number of times (doesn't
311 // matter how many) and then report READY.
312 auto picker = WaitForConnected();
313 ASSERT_NE(picker, nullptr);
314 // Picker should return the same subchannel repeatedly.
315 for (size_t i = 0; i < 3; ++i) {
316 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[2]);
317 }
318 }
319
TEST_F(PickFirstTest,AllAddressesInTransientFailureAtStart)320 TEST_F(PickFirstTest, AllAddressesInTransientFailureAtStart) {
321 // Send an update containing two addresses, both in TRANSIENT_FAILURE
322 // when the LB policy gets the update.
323 constexpr std::array<absl::string_view, 2> kAddresses = {
324 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
325 auto* subchannel = CreateSubchannel(kAddresses[0]);
326 subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
327 absl::UnavailableError("failed to connect"),
328 /*validate_state_transition=*/false);
329 auto* subchannel2 = CreateSubchannel(kAddresses[1]);
330 subchannel2->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
331 absl::UnavailableError("failed to connect"),
332 /*validate_state_transition=*/false);
333 absl::Status status = ApplyUpdate(
334 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
335 EXPECT_TRUE(status.ok()) << status;
336 // The LB policy should request re-resolution.
337 ExpectReresolutionRequest();
338 // The LB policy should report TRANSIENT_FAILURE.
339 WaitForConnectionFailed([&](const absl::Status& status) {
340 EXPECT_EQ(status, absl::UnavailableError(
341 "failed to connect to all addresses; "
342 "last error: UNAVAILABLE: failed to connect"));
343 });
344 // No connections should have been requested.
345 EXPECT_FALSE(subchannel->ConnectionRequested());
346 EXPECT_FALSE(subchannel2->ConnectionRequested());
347 // Now have the first subchannel report IDLE.
348 subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
349 // The policy will ask it to connect.
350 EXPECT_TRUE(subchannel->ConnectionRequested());
351 // This causes the subchannel to start to connect, so it reports
352 // CONNECTING.
353 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
354 // The connection attempt succeeds.
355 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
356 // The LB policy will report READY.
357 auto picker = ExpectState(GRPC_CHANNEL_READY);
358 ASSERT_NE(picker, nullptr);
359 // Picker should return the same subchannel repeatedly.
360 for (size_t i = 0; i < 3; ++i) {
361 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
362 }
363 }
364
TEST_F(PickFirstTest,StaysInTransientFailureAfterAddressListUpdate)365 TEST_F(PickFirstTest, StaysInTransientFailureAfterAddressListUpdate) {
366 // Send an update containing two addresses, both in TRANSIENT_FAILURE
367 // when the LB policy gets the update.
368 constexpr std::array<absl::string_view, 2> kAddresses = {
369 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
370 auto* subchannel = CreateSubchannel(kAddresses[0]);
371 subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
372 absl::UnavailableError("failed to connect"),
373 /*validate_state_transition=*/false);
374 auto* subchannel2 = CreateSubchannel(kAddresses[1]);
375 subchannel2->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
376 absl::UnavailableError("failed to connect"),
377 /*validate_state_transition=*/false);
378 absl::Status status = ApplyUpdate(
379 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
380 EXPECT_TRUE(status.ok()) << status;
381 // The LB policy should request re-resolution.
382 ExpectReresolutionRequest();
383 // The LB policy should report TRANSIENT_FAILURE.
384 WaitForConnectionFailed([&](const absl::Status& status) {
385 EXPECT_EQ(status, absl::UnavailableError(
386 "failed to connect to all addresses; "
387 "last error: UNAVAILABLE: failed to connect"));
388 });
389 // No connections should have been requested.
390 EXPECT_FALSE(subchannel->ConnectionRequested());
391 EXPECT_FALSE(subchannel2->ConnectionRequested());
392 // Now send an address list update. This contains the first address
393 // from the previous update plus a new address, whose subchannel will
394 // be in state IDLE.
395 constexpr std::array<absl::string_view, 2> kAddresses2 = {
396 kAddresses[0], "ipv4:127.0.0.1:445"};
397 status = ApplyUpdate(BuildUpdate(kAddresses2, MakePickFirstConfig(false)),
398 lb_policy());
399 EXPECT_TRUE(status.ok()) << status;
400 // The LB policy should have created a subchannel for the new address.
401 auto* subchannel3 = FindSubchannel(kAddresses2[1]);
402 ASSERT_NE(subchannel3, nullptr);
403 // The policy will ask it to connect.
404 EXPECT_TRUE(subchannel3->ConnectionRequested());
405 // This causes it to start to connect, so it reports CONNECTING.
406 subchannel3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
407 // The connection attempt succeeds.
408 subchannel3->SetConnectivityState(GRPC_CHANNEL_READY);
409 // The LB policy will report READY.
410 auto picker = ExpectState(GRPC_CHANNEL_READY);
411 ASSERT_NE(picker, nullptr);
412 // Picker should return the same subchannel repeatedly.
413 for (size_t i = 0; i < 3; ++i) {
414 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses2[1]);
415 }
416 }
417
418 // This tests a real-world bug in which PF ignored a resolver update if
419 // it had just created the subchannels but had not yet seen their
420 // initial connectivity state notification.
TEST_F(PickFirstTest,ResolverUpdateBeforeLeavingIdle)421 TEST_F(PickFirstTest, ResolverUpdateBeforeLeavingIdle) {
422 constexpr std::array<absl::string_view, 2> kAddresses = {
423 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
424 constexpr std::array<absl::string_view, 2> kNewAddresses = {
425 "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
426 // Send initial update containing two addresses.
427 absl::Status status = ApplyUpdate(
428 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
429 EXPECT_TRUE(status.ok()) << status;
430 // LB policy should have created a subchannel for both addresses.
431 auto* subchannel = FindSubchannel(kAddresses[0]);
432 ASSERT_NE(subchannel, nullptr);
433 auto* subchannel2 = FindSubchannel(kAddresses[1]);
434 ASSERT_NE(subchannel2, nullptr);
435 // When the LB policy receives the first subchannel's initial connectivity
436 // state notification (IDLE), it will request a connection.
437 EXPECT_TRUE(subchannel->ConnectionRequested());
438 // This causes the subchannel to start to connect, so it reports CONNECTING.
439 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
440 // LB policy should have reported CONNECTING state.
441 ExpectConnectingUpdate();
442 // The second subchannel should not be connecting.
443 EXPECT_FALSE(subchannel2->ConnectionRequested());
444 // When the first subchannel becomes connected, it reports READY.
445 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
446 // The LB policy will report CONNECTING some number of times (doesn't
447 // matter how many) and then report READY.
448 auto picker = WaitForConnected();
449 ASSERT_NE(picker, nullptr);
450 // Picker should return the same subchannel repeatedly.
451 for (size_t i = 0; i < 3; ++i) {
452 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
453 }
454 // Now the connection is closed, so we go IDLE.
455 subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
456 ExpectReresolutionRequest();
457 ExpectState(GRPC_CHANNEL_IDLE);
458 // Now we tell the LB policy to exit idle. This causes it to create a
459 // new subchannel list from the original update. However, before it
460 // can get the initial connectivity state notifications for those
461 // subchannels (i.e., before it can transition from IDLE to CONNECTING),
462 // we send a new update.
463 absl::Notification notification;
464 work_serializer_->Run(
465 [&]() {
466 // Inject second update into WorkSerializer queue before we
467 // exit idle, so that the second update gets run before the initial
468 // subchannel connectivity state notifications from the first update
469 // are delivered.
470 work_serializer_->Run(
471 [&]() {
472 // Second update.
473 absl::Status status = lb_policy()->UpdateLocked(
474 BuildUpdate(kNewAddresses, MakePickFirstConfig(false)));
475 EXPECT_TRUE(status.ok()) << status;
476 // Trigger notification once all connectivity state
477 // notifications have been delivered.
478 work_serializer_->Run([&]() { notification.Notify(); },
479 DEBUG_LOCATION);
480 },
481 DEBUG_LOCATION);
482 // Exit idle.
483 lb_policy()->ExitIdleLocked();
484 },
485 DEBUG_LOCATION);
486 notification.WaitForNotification();
487 // The LB policy should have created subchannels for the new addresses.
488 auto* subchannel3 = FindSubchannel(kNewAddresses[0]);
489 ASSERT_NE(subchannel3, nullptr);
490 auto* subchannel4 = FindSubchannel(kNewAddresses[1]);
491 ASSERT_NE(subchannel4, nullptr);
492 // The LB policy will request a connection on the first new subchannel,
493 // none of the others.
494 EXPECT_TRUE(subchannel3->ConnectionRequested());
495 EXPECT_FALSE(subchannel->ConnectionRequested());
496 EXPECT_FALSE(subchannel2->ConnectionRequested());
497 EXPECT_FALSE(subchannel4->ConnectionRequested());
498 // The subchannel starts a connection attempt.
499 subchannel3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
500 // The LB policy should now report CONNECTING.
501 ExpectConnectingUpdate();
502 // The connection attempt succeeds.
503 subchannel3->SetConnectivityState(GRPC_CHANNEL_READY);
504 // The LB policy will report CONNECTING some number of times (doesn't
505 // matter how many) and then report READY.
506 picker = WaitForConnected();
507 ASSERT_NE(picker, nullptr);
508 // Picker should return the same subchannel repeatedly.
509 for (size_t i = 0; i < 3; ++i) {
510 EXPECT_EQ(ExpectPickComplete(picker.get()), kNewAddresses[0]);
511 }
512 }
513
TEST_F(PickFirstTest,HappyEyeballs)514 TEST_F(PickFirstTest, HappyEyeballs) {
515 // Send an update containing three addresses.
516 constexpr std::array<absl::string_view, 3> kAddresses = {
517 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445"};
518 absl::Status status = ApplyUpdate(
519 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
520 EXPECT_TRUE(status.ok()) << status;
521 // LB policy should have created a subchannel for both addresses.
522 auto* subchannel = FindSubchannel(kAddresses[0]);
523 ASSERT_NE(subchannel, nullptr);
524 auto* subchannel2 = FindSubchannel(kAddresses[1]);
525 ASSERT_NE(subchannel2, nullptr);
526 auto* subchannel3 = FindSubchannel(kAddresses[2]);
527 ASSERT_NE(subchannel3, nullptr);
528 // When the LB policy receives the first subchannel's initial connectivity
529 // state notification (IDLE), it will request a connection.
530 EXPECT_TRUE(subchannel->ConnectionRequested());
531 // This causes the subchannel to start to connect, so it reports
532 // CONNECTING.
533 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
534 // LB policy should have reported CONNECTING state.
535 ExpectConnectingUpdate();
536 // The second subchannel should not be connecting.
537 EXPECT_FALSE(subchannel2->ConnectionRequested());
538 // The timer fires before the connection attempt completes.
539 IncrementTimeBy(Duration::Milliseconds(250));
540 // This causes the LB policy to start connecting to the second subchannel.
541 EXPECT_TRUE(subchannel2->ConnectionRequested());
542 subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
543 // The second subchannel fails before the timer fires.
544 subchannel2->SetConnectivityState(
545 GRPC_CHANNEL_TRANSIENT_FAILURE,
546 absl::UnavailableError("failed to connect"));
547 // This causes the LB policy to start connecting to the third subchannel.
548 EXPECT_TRUE(subchannel3->ConnectionRequested());
549 subchannel3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
550 // Incrementing the time here has no effect, because the LB policy
551 // does not use a timer for the last subchannel in the list.
552 // So if there are any queued updates at this point, they will be
553 // CONNECTING state.
554 IncrementTimeBy(Duration::Milliseconds(250));
555 DrainConnectingUpdates();
556 // The first subchannel becomes connected.
557 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
558 // The LB policy will report CONNECTING some number of times (doesn't
559 // matter how many) and then report READY.
560 auto picker = WaitForConnected();
561 ASSERT_NE(picker, nullptr);
562 // Picker should return the same subchannel repeatedly.
563 for (size_t i = 0; i < 3; ++i) {
564 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
565 }
566 }
567
TEST_F(PickFirstTest,HappyEyeballsCompletesWithoutSuccess)568 TEST_F(PickFirstTest, HappyEyeballsCompletesWithoutSuccess) {
569 // Send an update containing three addresses.
570 constexpr std::array<absl::string_view, 3> kAddresses = {
571 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445"};
572 absl::Status status = ApplyUpdate(
573 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
574 EXPECT_TRUE(status.ok()) << status;
575 // LB policy should have created a subchannel for both addresses.
576 auto* subchannel = FindSubchannel(kAddresses[0]);
577 ASSERT_NE(subchannel, nullptr);
578 auto* subchannel2 = FindSubchannel(kAddresses[1]);
579 ASSERT_NE(subchannel2, nullptr);
580 auto* subchannel3 = FindSubchannel(kAddresses[2]);
581 ASSERT_NE(subchannel3, nullptr);
582 // When the LB policy receives the first subchannel's initial connectivity
583 // state notification (IDLE), it will request a connection.
584 EXPECT_TRUE(subchannel->ConnectionRequested());
585 // This causes the subchannel to start to connect, so it reports
586 // CONNECTING.
587 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
588 // LB policy should have reported CONNECTING state.
589 ExpectConnectingUpdate();
590 // The second subchannel should not be connecting.
591 EXPECT_FALSE(subchannel2->ConnectionRequested());
592 // The timer fires before the connection attempt completes.
593 IncrementTimeBy(Duration::Milliseconds(250));
594 // This causes the LB policy to start connecting to the second subchannel.
595 EXPECT_TRUE(subchannel2->ConnectionRequested());
596 subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
597 // The second subchannel fails before the timer fires.
598 subchannel2->SetConnectivityState(
599 GRPC_CHANNEL_TRANSIENT_FAILURE,
600 absl::UnavailableError("failed to connect"));
601 // This causes the LB policy to start connecting to the third subchannel.
602 EXPECT_TRUE(subchannel3->ConnectionRequested());
603 subchannel3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
604 // Incrementing the time here has no effect, because the LB policy
605 // does not use a timer for the last subchannel in the list.
606 // So if there are any queued updates at this point, they will be
607 // CONNECTING state.
608 IncrementTimeBy(Duration::Milliseconds(250));
609 DrainConnectingUpdates();
610 // Set subchannel 2 back to IDLE, so it's already in that state when
611 // Happy Eyeballs fails.
612 subchannel2->SetConnectivityState(GRPC_CHANNEL_IDLE);
613 // Third subchannel fails to connect.
614 subchannel3->SetConnectivityState(
615 GRPC_CHANNEL_TRANSIENT_FAILURE,
616 absl::UnavailableError("failed to connect"));
617 ExpectQueueEmpty();
618 // Eventually, the first subchannel fails as well.
619 subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
620 absl::UnavailableError("failed to connect"));
621 // The LB policy should request re-resolution.
622 ExpectReresolutionRequest();
623 // The LB policy should report TRANSIENT_FAILURE.
624 WaitForConnectionFailed([&](const absl::Status& status) {
625 EXPECT_EQ(status, absl::UnavailableError(
626 "failed to connect to all addresses; "
627 "last error: UNAVAILABLE: failed to connect"));
628 });
629 // We are now done with the Happy Eyeballs pass, and we move into a
630 // mode where we try to connect to all subchannels in parallel.
631 // Subchannel 2 was already in state IDLE, so the LB policy will
632 // immediately trigger a connection request on it. It will not do so
633 // for subchannels 1 or 3, which are in TRANSIENT_FAILURE.
634 EXPECT_FALSE(subchannel->ConnectionRequested());
635 EXPECT_TRUE(subchannel2->ConnectionRequested());
636 EXPECT_FALSE(subchannel3->ConnectionRequested());
637 // Subchannel 2 reports CONNECTING.
638 subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
639 // Now subchannel 1 reports IDLE. This should trigger another
640 // connection attempt.
641 subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
642 EXPECT_TRUE(subchannel->ConnectionRequested());
643 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
644 // Now subchannel 1 reports TRANSIENT_FAILURE. This is the first failure
645 // since we finished Happy Eyeballs.
646 subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
647 absl::UnavailableError("failed to connect"));
648 EXPECT_FALSE(subchannel->ConnectionRequested());
649 // Now subchannel 3 reports IDLE. This should trigger another
650 // connection attempt.
651 subchannel3->SetConnectivityState(GRPC_CHANNEL_IDLE);
652 EXPECT_TRUE(subchannel3->ConnectionRequested());
653 subchannel3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
654 // Subchannel 2 reports TF. This is the second failure since we
655 // finished Happy Eyeballs.
656 subchannel2->SetConnectivityState(
657 GRPC_CHANNEL_TRANSIENT_FAILURE,
658 absl::UnavailableError("failed to connect"));
659 EXPECT_FALSE(subchannel2->ConnectionRequested());
660 // Finally, subchannel 3 reports TF. This is the third failure since
661 // we finished Happy Eyeballs, so the LB policy will request
662 // re-resolution and report TF again.
663 subchannel3->SetConnectivityState(
664 GRPC_CHANNEL_TRANSIENT_FAILURE,
665 absl::UnavailableError("failed to connect"));
666 EXPECT_FALSE(subchannel3->ConnectionRequested());
667 ExpectReresolutionRequest();
668 ExpectTransientFailureUpdate(
669 absl::UnavailableError("failed to connect to all addresses; "
670 "last error: UNAVAILABLE: failed to connect"));
671 // Now the second subchannel goes IDLE.
672 subchannel2->SetConnectivityState(GRPC_CHANNEL_IDLE);
673 // The LB policy asks it to connect.
674 EXPECT_TRUE(subchannel2->ConnectionRequested());
675 subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
676 // This time, the connection attempt succeeds.
677 subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
678 // The LB policy will report READY.
679 auto picker = ExpectState(GRPC_CHANNEL_READY);
680 ASSERT_NE(picker, nullptr);
681 // Picker should return the same subchannel repeatedly.
682 for (size_t i = 0; i < 3; ++i) {
683 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[1]);
684 }
685 }
686
TEST_F(PickFirstTest,HappyEyeballsLastSubchannelFailsWhileAnotherIsStillPending)687 TEST_F(PickFirstTest,
688 HappyEyeballsLastSubchannelFailsWhileAnotherIsStillPending) {
689 // Send an update containing three addresses.
690 constexpr std::array<absl::string_view, 2> kAddresses = {
691 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
692 absl::Status status = ApplyUpdate(
693 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
694 EXPECT_TRUE(status.ok()) << status;
695 // LB policy should have created a subchannel for both addresses.
696 auto* subchannel = FindSubchannel(kAddresses[0]);
697 ASSERT_NE(subchannel, nullptr);
698 auto* subchannel2 = FindSubchannel(kAddresses[1]);
699 ASSERT_NE(subchannel2, nullptr);
700 // When the LB policy receives the first subchannel's initial connectivity
701 // state notification (IDLE), it will request a connection.
702 EXPECT_TRUE(subchannel->ConnectionRequested());
703 // This causes the subchannel to start to connect, so it reports
704 // CONNECTING.
705 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
706 // LB policy should have reported CONNECTING state.
707 ExpectConnectingUpdate();
708 // The second subchannel should not be connecting.
709 EXPECT_FALSE(subchannel2->ConnectionRequested());
710 // The timer fires before the connection attempt completes.
711 IncrementTimeBy(Duration::Milliseconds(250));
712 // This causes the LB policy to start connecting to the second subchannel.
713 EXPECT_TRUE(subchannel2->ConnectionRequested());
714 subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
715 // The second subchannel fails.
716 subchannel2->SetConnectivityState(
717 GRPC_CHANNEL_TRANSIENT_FAILURE,
718 absl::UnavailableError("failed to connect"));
719 // The LB policy should not yet report TRANSIENT_FAILURE, because the
720 // first subchannel is still CONNECTING.
721 DrainConnectingUpdates();
722 // Set subchannel 2 back to IDLE, so it's already in that state when
723 // Happy Eyeballs fails.
724 subchannel2->SetConnectivityState(GRPC_CHANNEL_IDLE);
725 // Now the first subchannel fails.
726 subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
727 absl::UnavailableError("failed to connect"));
728 // The LB policy should request re-resolution.
729 ExpectReresolutionRequest();
730 // The LB policy should report TRANSIENT_FAILURE.
731 WaitForConnectionFailed([&](const absl::Status& status) {
732 EXPECT_EQ(status, absl::UnavailableError(
733 "failed to connect to all addresses; "
734 "last error: UNAVAILABLE: failed to connect"));
735 });
736 // We are now done with the Happy Eyeballs pass, and we move into a
737 // mode where we try to connect to all subchannels in parallel.
738 // Subchannel 2 was already in state IDLE, so the LB policy will
739 // immediately trigger a connection request on it. It will not do so
740 // for subchannel 1, which is still in TRANSIENT_FAILURE.
741 EXPECT_FALSE(subchannel->ConnectionRequested());
742 EXPECT_TRUE(subchannel2->ConnectionRequested());
743 // Subchannel 2 reports CONNECTING.
744 subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
745 // Subchannel 2 reports READY.
746 subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
747 // The LB policy will report READY.
748 auto picker = ExpectState(GRPC_CHANNEL_READY);
749 ASSERT_NE(picker, nullptr);
750 // Picker should return the same subchannel repeatedly.
751 for (size_t i = 0; i < 3; ++i) {
752 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[1]);
753 }
754 }
755
TEST_F(PickFirstTest,HappyEyeballsAddressInterleaving)756 TEST_F(PickFirstTest, HappyEyeballsAddressInterleaving) {
757 // Send an update containing four IPv4 addresses followed by two
758 // IPv6 addresses.
759 constexpr std::array<absl::string_view, 6> kAddresses = {
760 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445",
761 "ipv4:127.0.0.1:446", "ipv6:[::1]:444", "ipv6:[::1]:445"};
762 absl::Status status = ApplyUpdate(
763 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
764 EXPECT_TRUE(status.ok()) << status;
765 // LB policy should have created a subchannel for all addresses.
766 auto* subchannel_ipv4_1 = FindSubchannel(kAddresses[0]);
767 ASSERT_NE(subchannel_ipv4_1, nullptr);
768 auto* subchannel_ipv4_2 = FindSubchannel(kAddresses[1]);
769 ASSERT_NE(subchannel_ipv4_2, nullptr);
770 auto* subchannel_ipv4_3 = FindSubchannel(kAddresses[2]);
771 ASSERT_NE(subchannel_ipv4_3, nullptr);
772 auto* subchannel_ipv4_4 = FindSubchannel(kAddresses[3]);
773 ASSERT_NE(subchannel_ipv4_4, nullptr);
774 auto* subchannel_ipv6_1 = FindSubchannel(kAddresses[4]);
775 ASSERT_NE(subchannel_ipv6_1, nullptr);
776 auto* subchannel_ipv6_2 = FindSubchannel(kAddresses[5]);
777 ASSERT_NE(subchannel_ipv6_2, nullptr);
778 // When the LB policy receives the subchannels' initial connectivity
779 // state notifications (all IDLE), it will request a connection on the
780 // first IPv4 subchannel.
781 EXPECT_TRUE(subchannel_ipv4_1->ConnectionRequested());
782 subchannel_ipv4_1->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
783 // LB policy should have reported CONNECTING state.
784 ExpectConnectingUpdate();
785 // No other subchannels should be connecting.
786 EXPECT_FALSE(subchannel_ipv4_2->ConnectionRequested());
787 EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
788 EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
789 EXPECT_FALSE(subchannel_ipv6_1->ConnectionRequested());
790 EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested());
791 // The timer fires before the connection attempt completes.
792 IncrementTimeBy(Duration::Milliseconds(250));
793 // This causes the LB policy to start connecting to the first IPv6
794 // subchannel.
795 EXPECT_TRUE(subchannel_ipv6_1->ConnectionRequested());
796 subchannel_ipv6_1->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
797 // LB policy should have reported CONNECTING state.
798 ExpectConnectingUpdate();
799 // No other subchannels should be connecting.
800 EXPECT_FALSE(subchannel_ipv4_2->ConnectionRequested());
801 EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
802 EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
803 EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested());
804 // The timer fires before the connection attempt completes.
805 IncrementTimeBy(Duration::Milliseconds(250));
806 // This causes the LB policy to start connecting to the second IPv4
807 // subchannel.
808 EXPECT_TRUE(subchannel_ipv4_2->ConnectionRequested());
809 subchannel_ipv4_2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
810 // LB policy should have reported CONNECTING state.
811 ExpectConnectingUpdate();
812 // No other subchannels should be connecting.
813 EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
814 EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
815 EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested());
816 // The timer fires before the connection attempt completes.
817 IncrementTimeBy(Duration::Milliseconds(250));
818 // This causes the LB policy to start connecting to the second IPv6
819 // subchannel.
820 EXPECT_TRUE(subchannel_ipv6_2->ConnectionRequested());
821 subchannel_ipv6_2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
822 // LB policy should have reported CONNECTING state.
823 ExpectConnectingUpdate();
824 // No other subchannels should be connecting.
825 EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
826 EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
827 // The timer fires before the connection attempt completes.
828 IncrementTimeBy(Duration::Milliseconds(250));
829 // This causes the LB policy to start connecting to the third IPv4
830 // subchannel.
831 EXPECT_TRUE(subchannel_ipv4_3->ConnectionRequested());
832 subchannel_ipv4_3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
833 // LB policy should have reported CONNECTING state.
834 ExpectConnectingUpdate();
835 // No other subchannels should be connecting.
836 EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
837 // The timer fires before the connection attempt completes.
838 IncrementTimeBy(Duration::Milliseconds(250));
839 // This causes the LB policy to start connecting to the fourth IPv4
840 // subchannel.
841 EXPECT_TRUE(subchannel_ipv4_4->ConnectionRequested());
842 subchannel_ipv4_4->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
843 // LB policy should have reported CONNECTING state.
844 ExpectConnectingUpdate();
845 }
846
TEST_F(PickFirstTest,HappyEyeballsAddressInterleavingSecondFamilyHasMoreAddresses)847 TEST_F(PickFirstTest,
848 HappyEyeballsAddressInterleavingSecondFamilyHasMoreAddresses) {
849 // Send an update containing two IPv6 addresses followed by four IPv4
850 // addresses.
851 constexpr std::array<absl::string_view, 6> kAddresses = {
852 "ipv6:[::1]:444", "ipv6:[::1]:445", "ipv4:127.0.0.1:443",
853 "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
854 absl::Status status = ApplyUpdate(
855 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
856 EXPECT_TRUE(status.ok()) << status;
857 // LB policy should have created a subchannel for all addresses.
858 auto* subchannel_ipv6_1 = FindSubchannel(kAddresses[0]);
859 ASSERT_NE(subchannel_ipv6_1, nullptr);
860 auto* subchannel_ipv6_2 = FindSubchannel(kAddresses[1]);
861 ASSERT_NE(subchannel_ipv6_2, nullptr);
862 auto* subchannel_ipv4_1 = FindSubchannel(kAddresses[2]);
863 ASSERT_NE(subchannel_ipv4_1, nullptr);
864 auto* subchannel_ipv4_2 = FindSubchannel(kAddresses[3]);
865 ASSERT_NE(subchannel_ipv4_2, nullptr);
866 auto* subchannel_ipv4_3 = FindSubchannel(kAddresses[4]);
867 ASSERT_NE(subchannel_ipv4_3, nullptr);
868 auto* subchannel_ipv4_4 = FindSubchannel(kAddresses[5]);
869 ASSERT_NE(subchannel_ipv4_4, nullptr);
870 // When the LB policy receives the subchannels' initial connectivity
871 // state notifications (all IDLE), it will request a connection on the
872 // first IPv6 subchannel.
873 EXPECT_TRUE(subchannel_ipv6_1->ConnectionRequested());
874 subchannel_ipv6_1->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
875 // LB policy should have reported CONNECTING state.
876 ExpectConnectingUpdate();
877 // No other subchannels should be connecting.
878 EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested());
879 EXPECT_FALSE(subchannel_ipv4_1->ConnectionRequested());
880 EXPECT_FALSE(subchannel_ipv4_2->ConnectionRequested());
881 EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
882 EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
883 // The timer fires before the connection attempt completes.
884 IncrementTimeBy(Duration::Milliseconds(250));
885 // This causes the LB policy to start connecting to the first IPv4
886 // subchannel.
887 EXPECT_TRUE(subchannel_ipv4_1->ConnectionRequested());
888 subchannel_ipv4_1->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
889 // LB policy should have reported CONNECTING state.
890 ExpectConnectingUpdate();
891 // No other subchannels should be connecting.
892 EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested());
893 EXPECT_FALSE(subchannel_ipv4_2->ConnectionRequested());
894 EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
895 EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
896 // The timer fires before the connection attempt completes.
897 IncrementTimeBy(Duration::Milliseconds(250));
898 // This causes the LB policy to start connecting to the second IPv6
899 // subchannel.
900 EXPECT_TRUE(subchannel_ipv6_2->ConnectionRequested());
901 subchannel_ipv6_2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
902 // LB policy should have reported CONNECTING state.
903 ExpectConnectingUpdate();
904 // No other subchannels should be connecting.
905 EXPECT_FALSE(subchannel_ipv4_2->ConnectionRequested());
906 EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
907 EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
908 // The timer fires before the connection attempt completes.
909 IncrementTimeBy(Duration::Milliseconds(250));
910 // This causes the LB policy to start connecting to the second IPv4
911 // subchannel.
912 EXPECT_TRUE(subchannel_ipv4_2->ConnectionRequested());
913 subchannel_ipv4_2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
914 // LB policy should have reported CONNECTING state.
915 ExpectConnectingUpdate();
916 // No other subchannels should be connecting.
917 EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
918 EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
919 // The timer fires before the connection attempt completes.
920 IncrementTimeBy(Duration::Milliseconds(250));
921 // This causes the LB policy to start connecting to the third IPv4
922 // subchannel.
923 EXPECT_TRUE(subchannel_ipv4_3->ConnectionRequested());
924 subchannel_ipv4_3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
925 // LB policy should have reported CONNECTING state.
926 ExpectConnectingUpdate();
927 // No other subchannels should be connecting.
928 EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
929 // The timer fires before the connection attempt completes.
930 IncrementTimeBy(Duration::Milliseconds(250));
931 // This causes the LB policy to start connecting to the fourth IPv4
932 // subchannel.
933 EXPECT_TRUE(subchannel_ipv4_4->ConnectionRequested());
934 subchannel_ipv4_4->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
935 // LB policy should have reported CONNECTING state.
936 ExpectConnectingUpdate();
937 }
938
TEST_F(PickFirstTest,FirstAddressGoesIdleBeforeSecondOneFails)939 TEST_F(PickFirstTest, FirstAddressGoesIdleBeforeSecondOneFails) {
940 // Send an update containing two addresses.
941 constexpr std::array<absl::string_view, 2> kAddresses = {
942 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
943 absl::Status status = ApplyUpdate(
944 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
945 EXPECT_TRUE(status.ok()) << status;
946 // LB policy should have created a subchannel for both addresses.
947 auto* subchannel = FindSubchannel(kAddresses[0]);
948 ASSERT_NE(subchannel, nullptr);
949 auto* subchannel2 = FindSubchannel(kAddresses[1]);
950 ASSERT_NE(subchannel2, nullptr);
951 // When the LB policy receives the first subchannel's initial connectivity
952 // state notification (IDLE), it will request a connection.
953 EXPECT_TRUE(subchannel->ConnectionRequested());
954 // This causes the subchannel to start to connect, so it reports
955 // CONNECTING.
956 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
957 // LB policy should have reported CONNECTING state.
958 ExpectConnectingUpdate();
959 // The second subchannel should not be connecting.
960 EXPECT_FALSE(subchannel2->ConnectionRequested());
961 // The first subchannel's connection attempt fails.
962 subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
963 absl::UnavailableError("failed to connect"));
964 // The LB policy will start a connection attempt on the second subchannel.
965 EXPECT_TRUE(subchannel2->ConnectionRequested());
966 // This causes the subchannel to start to connect, so it reports
967 // CONNECTING.
968 subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
969 // LB policy should have reported CONNECTING state.
970 ExpectConnectingUpdate();
971 // Before the second subchannel's attempt completes, the first
972 // subchannel reports IDLE.
973 subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
974 // Now the connection attempt on the second subchannel fails.
975 subchannel2->SetConnectivityState(
976 GRPC_CHANNEL_TRANSIENT_FAILURE,
977 absl::UnavailableError("failed to connect"));
978 // The LB policy should request re-resolution.
979 ExpectReresolutionRequest();
980 // The LB policy will report TRANSIENT_FAILURE.
981 WaitForConnectionFailed([&](const absl::Status& status) {
982 EXPECT_EQ(status, absl::UnavailableError(
983 "failed to connect to all addresses; "
984 "last error: UNAVAILABLE: failed to connect"));
985 });
986 // It will then start connecting to the first address again.
987 EXPECT_TRUE(subchannel->ConnectionRequested());
988 // This time, the connection attempt succeeds.
989 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
990 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
991 // The LB policy will report READY.
992 auto picker = ExpectState(GRPC_CHANNEL_READY);
993 ASSERT_NE(picker, nullptr);
994 // Picker should return the same subchannel repeatedly.
995 for (size_t i = 0; i < 3; ++i) {
996 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
997 }
998 }
999
TEST_F(PickFirstTest,GoesIdleWhenConnectionFailsThenCanReconnect)1000 TEST_F(PickFirstTest, GoesIdleWhenConnectionFailsThenCanReconnect) {
1001 // Send an update containing two addresses.
1002 constexpr std::array<absl::string_view, 2> kAddresses = {
1003 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
1004 absl::Status status = ApplyUpdate(
1005 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
1006 EXPECT_TRUE(status.ok()) << status;
1007 // LB policy should have created a subchannel for both addresses.
1008 auto* subchannel = FindSubchannel(kAddresses[0]);
1009 ASSERT_NE(subchannel, nullptr);
1010 auto* subchannel2 = FindSubchannel(kAddresses[1]);
1011 ASSERT_NE(subchannel2, nullptr);
1012 // When the LB policy receives the first subchannel's initial connectivity
1013 // state notification (IDLE), it will request a connection.
1014 EXPECT_TRUE(subchannel->ConnectionRequested());
1015 // This causes the subchannel to start to connect, so it reports CONNECTING.
1016 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
1017 // LB policy should have reported CONNECTING state.
1018 ExpectConnectingUpdate();
1019 // The second subchannel should not be connecting.
1020 EXPECT_FALSE(subchannel2->ConnectionRequested());
1021 // When the first subchannel becomes connected, it reports READY.
1022 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
1023 // The LB policy will report CONNECTING some number of times (doesn't
1024 // matter how many) and then report READY.
1025 auto picker = WaitForConnected();
1026 ASSERT_NE(picker, nullptr);
1027 // Picker should return the same subchannel repeatedly.
1028 for (size_t i = 0; i < 3; ++i) {
1029 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
1030 }
1031 // Connection fails.
1032 subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
1033 // We should see a re-resolution request.
1034 ExpectReresolutionRequest();
1035 // LB policy reports IDLE with a queueing picker.
1036 ExpectStateAndQueuingPicker(GRPC_CHANNEL_IDLE);
1037 // By checking the picker, we told the LB policy to trigger a new
1038 // connection attempt, so it should start over with the first
1039 // subchannel.
1040 // Note that the picker will have enqueued the ExitIdle() call in the
1041 // WorkSerializer, so the first flush will execute that call. But
1042 // executing that call will result in enqueueing subchannel
1043 // connectivity state notifications, so we need to flush again to make
1044 // sure all of that work is done before we continue.
1045 WaitForWorkSerializerToFlush();
1046 WaitForWorkSerializerToFlush();
1047 EXPECT_TRUE(subchannel->ConnectionRequested());
1048 // The subchannel starts connecting.
1049 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
1050 // LB policy should have reported CONNECTING state.
1051 ExpectConnectingUpdate();
1052 // Subchannel succeeds in connecting.
1053 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
1054 // LB policy reports READY.
1055 picker = WaitForConnected();
1056 ASSERT_NE(picker, nullptr);
1057 // Picker should return the same subchannel repeatedly.
1058 for (size_t i = 0; i < 3; ++i) {
1059 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
1060 }
1061 }
1062
TEST_F(PickFirstTest,WithShuffle)1063 TEST_F(PickFirstTest, WithShuffle) {
1064 constexpr std::array<absl::string_view, 6> kAddresses = {
1065 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445",
1066 "ipv4:127.0.0.1:446", "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
1067 // 6 addresses have 6! = 720 permutations or roughly 0.14% chance that
1068 // the shuffle returns same permutation. We allow for several tries to
1069 // prevent flake test.
1070 constexpr size_t kMaxTries = 10;
1071 std::vector<absl::string_view> addresses_after_update;
1072 bool shuffled = false;
1073 for (size_t i = 0; i < kMaxTries; ++i) {
1074 absl::Status status = ApplyUpdate(
1075 BuildUpdate(kAddresses, MakePickFirstConfig(true)), lb_policy());
1076 EXPECT_TRUE(status.ok()) << status;
1077 GetOrderAddressesArePicked(kAddresses, &addresses_after_update);
1078 if (absl::MakeConstSpan(addresses_after_update) !=
1079 absl::MakeConstSpan(kAddresses)) {
1080 shuffled = true;
1081 break;
1082 }
1083 }
1084 ASSERT_TRUE(shuffled);
1085 // Address order should be stable between updates
1086 std::vector<absl::string_view> addresses_on_another_try;
1087 GetOrderAddressesArePicked(kAddresses, &addresses_on_another_try);
1088 EXPECT_EQ(addresses_on_another_try, addresses_after_update);
1089 }
1090
TEST_F(PickFirstTest,ShufflingDisabled)1091 TEST_F(PickFirstTest, ShufflingDisabled) {
1092 constexpr std::array<absl::string_view, 6> kAddresses = {
1093 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445",
1094 "ipv4:127.0.0.1:446", "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
1095 constexpr static size_t kMaxAttempts = 5;
1096 for (size_t attempt = 0; attempt < kMaxAttempts; ++attempt) {
1097 absl::Status status = ApplyUpdate(
1098 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
1099 EXPECT_TRUE(status.ok()) << status;
1100 std::vector<absl::string_view> address_order;
1101 GetOrderAddressesArePicked(kAddresses, &address_order);
1102 EXPECT_THAT(address_order, ::testing::ElementsAreArray(kAddresses));
1103 }
1104 }
1105
TEST_F(PickFirstTest,MetricDefinitionDisconnections)1106 TEST_F(PickFirstTest, MetricDefinitionDisconnections) {
1107 const auto* descriptor =
1108 GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
1109 "grpc.lb.pick_first.disconnections");
1110 ASSERT_NE(descriptor, nullptr);
1111 EXPECT_EQ(descriptor->value_type,
1112 GlobalInstrumentsRegistry::ValueType::kUInt64);
1113 EXPECT_EQ(descriptor->instrument_type,
1114 GlobalInstrumentsRegistry::InstrumentType::kCounter);
1115 EXPECT_EQ(descriptor->enable_by_default, false);
1116 EXPECT_EQ(descriptor->name, "grpc.lb.pick_first.disconnections");
1117 EXPECT_EQ(descriptor->unit, "{disconnection}");
1118 EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target"));
1119 EXPECT_THAT(descriptor->optional_label_keys, ::testing::ElementsAre());
1120 }
1121
TEST_F(PickFirstTest,MetricDefinitionConnectionAttemptsSucceeded)1122 TEST_F(PickFirstTest, MetricDefinitionConnectionAttemptsSucceeded) {
1123 const auto* descriptor =
1124 GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
1125 "grpc.lb.pick_first.connection_attempts_succeeded");
1126 ASSERT_NE(descriptor, nullptr);
1127 EXPECT_EQ(descriptor->value_type,
1128 GlobalInstrumentsRegistry::ValueType::kUInt64);
1129 EXPECT_EQ(descriptor->instrument_type,
1130 GlobalInstrumentsRegistry::InstrumentType::kCounter);
1131 EXPECT_EQ(descriptor->enable_by_default, false);
1132 EXPECT_EQ(descriptor->name,
1133 "grpc.lb.pick_first.connection_attempts_succeeded");
1134 EXPECT_EQ(descriptor->unit, "{attempt}");
1135 EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target"));
1136 EXPECT_THAT(descriptor->optional_label_keys, ::testing::ElementsAre());
1137 }
1138
TEST_F(PickFirstTest,MetricDefinitionConnectionAttemptsFailed)1139 TEST_F(PickFirstTest, MetricDefinitionConnectionAttemptsFailed) {
1140 const auto* descriptor =
1141 GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
1142 "grpc.lb.pick_first.connection_attempts_failed");
1143 ASSERT_NE(descriptor, nullptr);
1144 EXPECT_EQ(descriptor->value_type,
1145 GlobalInstrumentsRegistry::ValueType::kUInt64);
1146 EXPECT_EQ(descriptor->instrument_type,
1147 GlobalInstrumentsRegistry::InstrumentType::kCounter);
1148 EXPECT_EQ(descriptor->enable_by_default, false);
1149 EXPECT_EQ(descriptor->name, "grpc.lb.pick_first.connection_attempts_failed");
1150 EXPECT_EQ(descriptor->unit, "{attempt}");
1151 EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target"));
1152 EXPECT_THAT(descriptor->optional_label_keys, ::testing::ElementsAre());
1153 }
1154
TEST_F(PickFirstTest,MetricValues)1155 TEST_F(PickFirstTest, MetricValues) {
1156 const auto kDisconnections =
1157 GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
1158 "grpc.lb.pick_first.disconnections")
1159 .value();
1160 const auto kConnectionAttemptsSucceeded =
1161 GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
1162 "grpc.lb.pick_first.connection_attempts_succeeded")
1163 .value();
1164 const auto kConnectionAttemptsFailed =
1165 GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
1166 "grpc.lb.pick_first.connection_attempts_failed")
1167 .value();
1168 const absl::string_view kLabelValues[] = {target_};
1169 auto stats_plugin = std::make_shared<FakeStatsPlugin>(
1170 nullptr, /*use_disabled_by_default_metrics=*/true);
1171 stats_plugin_group_.AddStatsPlugin(stats_plugin, nullptr);
1172 // Send an update containing two addresses.
1173 constexpr std::array<absl::string_view, 2> kAddresses = {
1174 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
1175 absl::Status status = ApplyUpdate(
1176 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
1177 EXPECT_TRUE(status.ok()) << status;
1178 // LB policy should have created a subchannel for both addresses.
1179 auto* subchannel = FindSubchannel(kAddresses[0]);
1180 ASSERT_NE(subchannel, nullptr);
1181 auto* subchannel2 = FindSubchannel(kAddresses[1]);
1182 ASSERT_NE(subchannel2, nullptr);
1183 // When the LB policy receives the first subchannel's initial connectivity
1184 // state notification (IDLE), it will request a connection.
1185 EXPECT_TRUE(subchannel->ConnectionRequested());
1186 // This causes the subchannel to start to connect, so it reports
1187 // CONNECTING.
1188 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
1189 // LB policy should have reported CONNECTING state.
1190 ExpectConnectingUpdate();
1191 // The second subchannel should not be connecting.
1192 EXPECT_FALSE(subchannel2->ConnectionRequested());
1193 // The first subchannel's connection attempt fails.
1194 subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
1195 absl::UnavailableError("failed to connect"));
1196 EXPECT_THAT(stats_plugin->GetCounterValue(kConnectionAttemptsFailed,
1197 kLabelValues, {}),
1198 ::testing::Optional(1));
1199 // The LB policy will start a connection attempt on the second subchannel.
1200 EXPECT_TRUE(subchannel2->ConnectionRequested());
1201 // This causes the subchannel to start to connect, so it reports
1202 // CONNECTING.
1203 subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
1204 // The connection attempt succeeds.
1205 subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
1206 EXPECT_THAT(stats_plugin->GetCounterValue(kConnectionAttemptsSucceeded,
1207 kLabelValues, {}),
1208 ::testing::Optional(1));
1209 // The LB policy will report CONNECTING some number of times (doesn't
1210 // matter how many) and then report READY.
1211 auto picker = WaitForConnected();
1212 ASSERT_NE(picker, nullptr);
1213 // Picker should return the same subchannel repeatedly.
1214 for (size_t i = 0; i < 3; ++i) {
1215 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[1]);
1216 }
1217 // Now the subchannel becomes disconnected.
1218 subchannel2->SetConnectivityState(GRPC_CHANNEL_IDLE);
1219 ExpectReresolutionRequest();
1220 ExpectState(GRPC_CHANNEL_IDLE);
1221 EXPECT_THAT(stats_plugin->GetCounterValue(kDisconnections, kLabelValues, {}),
1222 ::testing::Optional(1));
1223 }
1224
1225 class PickFirstHealthCheckingEnabledTest : public PickFirstTest {
1226 protected:
PickFirstHealthCheckingEnabledTest()1227 PickFirstHealthCheckingEnabledTest()
1228 : PickFirstTest(ChannelArgs().Set(
1229 GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING, true)) {}
1230 };
1231
TEST_F(PickFirstHealthCheckingEnabledTest,UpdateWithReadyChannel)1232 TEST_F(PickFirstHealthCheckingEnabledTest, UpdateWithReadyChannel) {
1233 constexpr absl::string_view kAddress = "ipv4:127.0.0.1:443";
1234 LoadBalancingPolicy::UpdateArgs update =
1235 BuildUpdate({kAddress}, MakePickFirstConfig());
1236 absl::Status status = ApplyUpdate(update, lb_policy());
1237 EXPECT_TRUE(status.ok()) << status;
1238 // LB policy should have created a subchannel for the address.
1239 auto* subchannel = FindSubchannel(kAddress);
1240 ASSERT_NE(subchannel, nullptr);
1241 // When the LB policy receives the first subchannel's initial connectivity
1242 // state notification (IDLE), it will request a connection.
1243 EXPECT_TRUE(subchannel->ConnectionRequested());
1244 // This causes the subchannel to start to connect, so it reports CONNECTING.
1245 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
1246 // LB policy should have reported CONNECTING state.
1247 ExpectConnectingUpdate();
1248 // When the subchannel becomes connected, it reports READY.
1249 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
1250 // The LB policy will report CONNECTING some number of times (doesn't
1251 // matter how many) and then report READY.
1252 auto picker = WaitForConnected();
1253 ASSERT_NE(picker, nullptr);
1254 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddress);
1255 // Reapply the same update we did before. The the underlying
1256 // subchannel will immediately become ready.
1257 status =
1258 ApplyUpdate(BuildUpdate({kAddress}, MakePickFirstConfig()), lb_policy());
1259 EXPECT_TRUE(status.ok()) << status;
1260 picker = ExpectState(GRPC_CHANNEL_READY);
1261 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddress);
1262 // At this point, NumWatchers() should account for our
1263 // subchannel connectivity watcher and our health watcher.
1264 EXPECT_EQ(subchannel->NumWatchers(), 2);
1265 }
1266
1267 } // namespace
1268 } // namespace testing
1269 } // namespace grpc_core
1270
main(int argc,char ** argv)1271 int main(int argc, char** argv) {
1272 ::testing::InitGoogleTest(&argc, argv);
1273 grpc::testing::TestEnvironment env(&argc, argv);
1274 return RUN_ALL_TESTS();
1275 }
1276