1 //
2 // Copyright 2020 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 // FIXME: add tests:
18 // - cache eviction via cleanup timer (based on age)
19 // - RLS channel is down; wait_for_ready request is sent and RLS request fails
20 // and goes into backoff; RLS channel comes back up before backoff timer
21 // fires; request is processed at that point
22 // - find some deterministic way to exercise adaptive throttler code
23
24 #include <deque>
25 #include <map>
26 #include <thread>
27
28 #include <gmock/gmock.h>
29 #include <gtest/gtest.h>
30
31 #include "absl/strings/str_format.h"
32 #include "absl/strings/str_join.h"
33 #include "absl/types/optional.h"
34
35 #include <grpcpp/channel.h>
36 #include <grpcpp/create_channel.h>
37 #include <grpcpp/security/credentials.h>
38 #include <grpcpp/server.h>
39 #include <grpcpp/server_builder.h>
40 #include <grpcpp/support/channel_arguments.h>
41
42 #include "src/core/client_channel/backup_poller.h"
43 #include "src/core/lib/address_utils/parse_address.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/lib/config/config_vars.h"
46 #include "src/core/lib/gprpp/env.h"
47 #include "src/core/lib/gprpp/host_port.h"
48 #include "src/core/lib/gprpp/time.h"
49 #include "src/core/lib/iomgr/sockaddr.h"
50 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
51 #include "src/core/lib/uri/uri_parser.h"
52 #include "src/core/load_balancing/rls/rls.h"
53 #include "src/core/resolver/fake/fake_resolver.h"
54 #include "src/core/service_config/service_config_impl.h"
55 #include "src/cpp/server/secure_server_credentials.h"
56 #include "src/proto/grpc/lookup/v1/rls.grpc.pb.h"
57 #include "src/proto/grpc/lookup/v1/rls.pb.h"
58 #include "src/proto/grpc/testing/echo.grpc.pb.h"
59 #include "test/core/util/fake_stats_plugin.h"
60 #include "test/core/util/port.h"
61 #include "test/core/util/resolve_localhost_ip46.h"
62 #include "test/core/util/test_config.h"
63 #include "test/core/util/test_lb_policies.h"
64 #include "test/cpp/end2end/counted_service.h"
65 #include "test/cpp/end2end/rls_server.h"
66 #include "test/cpp/end2end/test_service_impl.h"
67 #include "test/cpp/util/credentials.h"
68 #include "test/cpp/util/test_config.h"
69
70 using ::grpc::lookup::v1::RouteLookupRequest;
71
72 namespace grpc {
73 namespace testing {
74 namespace {
75
// Shared string constants used by the fixture and the tests below.
// Declared as `const char* const` so that neither the characters nor the
// pointers themselves can be modified by a test.

// Server name used in the channel target URI and as the expected authority.
const char* const kServerName = "test.google.fr";
// Message placed in every Echo request sent by the tests.
const char* const kRequestMessage = "Live long and prosper.";
// Value passed via the test-only RLS instance ID channel arg.
const char* const kRlsInstanceUuid = "rls_instance_uuid";

// Metadata key/value attached by the test call credentials.
const char* const kCallCredsMdKey = "call_cred_name";
const char* const kCallCredsMdValue = "call_cred_value";

// Keys and values used when building RLS key builders and expected
// RLS requests.
const char* const kTestKey = "test_key";
const char* const kTestValue = "test_value";
const char* const kHostKey = "host_key";
const char* const kServiceKey = "service_key";
const char* const kServiceValue = "grpc.testing.EchoTestService";
const char* const kMethodKey = "method_key";
const char* const kMethodValue = "Echo";
const char* const kConstantKey = "constant_key";
const char* const kConstantValue = "constant_value";
92
93 using BackendService = CountedService<TestServiceImpl>;
94
95 // Subclass of TestServiceImpl that increments a request counter for
96 // every call to the Echo Rpc.
97 class MyTestServiceImpl : public BackendService {
98 public:
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)99 Status Echo(ServerContext* context, const EchoRequest* request,
100 EchoResponse* response) override {
101 // Backend should see call creds.
102 EXPECT_THAT(context->client_metadata(),
103 ::testing::Contains(
104 ::testing::Pair(kCallCredsMdKey, kCallCredsMdValue)));
105 IncreaseRequestCount();
106 auto client_metadata = context->client_metadata();
107 auto range = client_metadata.equal_range("x-google-rls-data");
108 {
109 grpc::internal::MutexLock lock(&mu_);
110 for (auto it = range.first; it != range.second; ++it) {
111 rls_header_data_.insert(
112 std::string(it->second.begin(), it->second.length()));
113 }
114 }
115 IncreaseResponseCount();
116 return TestServiceImpl::Echo(context, request, response);
117 }
118
rls_data()119 std::set<std::string> rls_data() {
120 grpc::internal::MutexLock lock(&mu_);
121 return std::move(rls_header_data_);
122 }
123
Start()124 void Start() {}
125
Shutdown()126 void Shutdown() {}
127
128 private:
129 grpc::internal::Mutex mu_;
130 std::set<std::string> rls_header_data_ ABSL_GUARDED_BY(&mu_);
131 };
132
133 class FakeResolverResponseGeneratorWrapper {
134 public:
FakeResolverResponseGeneratorWrapper()135 FakeResolverResponseGeneratorWrapper()
136 : response_generator_(grpc_core::MakeRefCounted<
137 grpc_core::FakeResolverResponseGenerator>()) {}
138
SetNextResolution(absl::string_view service_config_json)139 void SetNextResolution(absl::string_view service_config_json) {
140 grpc_core::ExecCtx exec_ctx;
141 response_generator_->SetResponseSynchronously(
142 BuildFakeResults(service_config_json));
143 }
144
Get() const145 grpc_core::FakeResolverResponseGenerator* Get() const {
146 return response_generator_.get();
147 }
148
149 private:
BuildFakeResults(absl::string_view service_config_json)150 static grpc_core::Resolver::Result BuildFakeResults(
151 absl::string_view service_config_json) {
152 grpc_core::Resolver::Result result;
153 result.service_config =
154 grpc_core::ServiceConfigImpl::Create(result.args, service_config_json);
155 EXPECT_TRUE(result.service_config.ok()) << result.service_config.status();
156 return result;
157 }
158
159 grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
160 response_generator_;
161 };
162
163 class RlsEnd2endTest : public ::testing::Test {
164 protected:
SetUpTestSuite()165 static void SetUpTestSuite() {
166 grpc_core::ConfigVars::Overrides overrides;
167 overrides.client_channel_backup_poll_interval_ms = 1;
168 grpc_core::ConfigVars::SetOverrides(overrides);
169 grpc_core::CoreConfiguration::RegisterBuilder(
170 grpc_core::RegisterFixedAddressLoadBalancingPolicy);
171 grpc_init();
172 }
173
TearDownTestSuite()174 static void TearDownTestSuite() {
175 grpc_shutdown_blocking();
176 grpc_core::CoreConfiguration::Reset();
177 }
178
SetUp()179 void SetUp() override {
180 rls_server_ = std::make_unique<ServerThread<RlsServiceImpl>>(
181 "rls", [](grpc::ServerContext* ctx) {
182 EXPECT_THAT(ctx->client_metadata(),
183 ::testing::Contains(
184 ::testing::Pair(kCallCredsMdKey, kCallCredsMdValue)));
185 EXPECT_EQ(ctx->ExperimentalGetAuthority(), kServerName);
186 });
187 rls_server_->Start();
188 rls_server_target_ = absl::StrFormat("localhost:%d", rls_server_->port_);
189 // Set up client.
190 resolver_response_generator_ =
191 std::make_unique<FakeResolverResponseGeneratorWrapper>();
192 ChannelArguments args;
193 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
194 resolver_response_generator_->Get());
195 args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, kServerName);
196 args.SetString(GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID, kRlsInstanceUuid);
197 grpc_channel_credentials* channel_creds =
198 grpc_fake_transport_security_credentials_create();
199 grpc_call_credentials* call_creds = grpc_md_only_test_credentials_create(
200 kCallCredsMdKey, kCallCredsMdValue);
201 auto creds = std::make_shared<TestCompositeChannelCredentials>(
202 channel_creds, call_creds);
203 call_creds->Unref();
204 channel_creds->Unref();
205 target_uri_ = absl::StrCat("fake:///", kServerName);
206 channel_ = grpc::CreateCustomChannel(target_uri_, creds, args);
207 stub_ = grpc::testing::EchoTestService::NewStub(channel_);
208 }
209
TearDown()210 void TearDown() override {
211 ShutdownBackends();
212 rls_server_->Shutdown();
213 }
214
ShutdownBackends()215 void ShutdownBackends() {
216 for (auto& server : backends_) {
217 server->Shutdown();
218 }
219 }
220
StartBackends(size_t num_servers)221 void StartBackends(size_t num_servers) {
222 backends_.clear();
223 for (size_t i = 0; i < num_servers; ++i) {
224 backends_.push_back(
225 std::make_unique<ServerThread<MyTestServiceImpl>>("backend"));
226 backends_.back()->Start();
227 }
228 }
229
230 struct RpcOptions {
231 int timeout_ms = 2000;
232 bool wait_for_ready = false;
233 std::vector<std::pair<std::string, std::string>> metadata;
234
RpcOptionsgrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::RpcOptions235 RpcOptions() {}
236
set_timeout_msgrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::RpcOptions237 RpcOptions& set_timeout_ms(int rpc_timeout_ms) {
238 timeout_ms = rpc_timeout_ms;
239 return *this;
240 }
241
set_wait_for_readygrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::RpcOptions242 RpcOptions& set_wait_for_ready(bool rpc_wait_for_ready) {
243 wait_for_ready = rpc_wait_for_ready;
244 return *this;
245 }
246
set_metadatagrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::RpcOptions247 RpcOptions& set_metadata(
248 std::vector<std::pair<std::string, std::string>> rpc_metadata) {
249 metadata = std::move(rpc_metadata);
250 return *this;
251 }
252
253 // Populates context.
SetupRpcgrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::RpcOptions254 void SetupRpc(ClientContext* context) const {
255 for (const auto& item : metadata) {
256 context->AddMetadata(item.first, item.second);
257 }
258 if (timeout_ms != 0) {
259 context->set_deadline(
260 grpc_timeout_milliseconds_to_deadline(timeout_ms));
261 }
262 if (wait_for_ready) context->set_wait_for_ready(true);
263 }
264 };
265
SendRpc(const RpcOptions & rpc_options=RpcOptions (),EchoResponse * response=nullptr)266 Status SendRpc(const RpcOptions& rpc_options = RpcOptions(),
267 EchoResponse* response = nullptr) {
268 EchoResponse local_response;
269 if (response == nullptr) response = &local_response;
270 ClientContext context;
271 rpc_options.SetupRpc(&context);
272 EchoRequest request;
273 request.set_message(kRequestMessage);
274 return stub_->Echo(&context, request, response);
275 }
276
CheckRpcSendOk(const grpc_core::DebugLocation & location,const RpcOptions & rpc_options=RpcOptions ())277 void CheckRpcSendOk(const grpc_core::DebugLocation& location,
278 const RpcOptions& rpc_options = RpcOptions()) {
279 EchoResponse response;
280 Status status = SendRpc(rpc_options, &response);
281 ASSERT_TRUE(status.ok()) << location.file() << ":" << location.line()
282 << ": RPC failed: " << status.error_code() << ": "
283 << status.error_message();
284 EXPECT_EQ(response.message(), kRequestMessage)
285 << location.file() << ":" << location.line();
286 }
287
CheckRpcSendFailure(const grpc_core::DebugLocation & location,StatusCode expected_code,absl::string_view expected_message,const RpcOptions & rpc_options=RpcOptions ())288 void CheckRpcSendFailure(const grpc_core::DebugLocation& location,
289 StatusCode expected_code,
290 absl::string_view expected_message,
291 const RpcOptions& rpc_options = RpcOptions()) {
292 Status status = SendRpc(rpc_options);
293 ASSERT_FALSE(status.ok()) << location.file() << ":" << location.line();
294 EXPECT_EQ(expected_code, status.error_code())
295 << location.file() << ":" << location.line();
296 EXPECT_EQ(expected_message, status.error_message())
297 << location.file() << ":" << location.line();
298 }
299
300 class ServiceConfigBuilder {
301 public:
ServiceConfigBuilder(absl::string_view rls_server_target)302 explicit ServiceConfigBuilder(absl::string_view rls_server_target)
303 : rls_server_target_(rls_server_target) {}
304
set_lookup_service_timeout(grpc_core::Duration timeout)305 ServiceConfigBuilder& set_lookup_service_timeout(
306 grpc_core::Duration timeout) {
307 lookup_service_timeout_ = timeout * grpc_test_slowdown_factor();
308 return *this;
309 }
310
set_default_target(std::string default_target)311 ServiceConfigBuilder& set_default_target(std::string default_target) {
312 default_target_ = std::move(default_target);
313 return *this;
314 }
315
set_max_age(grpc_core::Duration max_age)316 ServiceConfigBuilder& set_max_age(grpc_core::Duration max_age) {
317 max_age_ = max_age * grpc_test_slowdown_factor();
318 return *this;
319 }
320
set_stale_age(grpc_core::Duration stale_age)321 ServiceConfigBuilder& set_stale_age(grpc_core::Duration stale_age) {
322 stale_age_ = stale_age * grpc_test_slowdown_factor();
323 return *this;
324 }
325
set_cache_size_bytes(int64_t size)326 ServiceConfigBuilder& set_cache_size_bytes(int64_t size) {
327 cache_size_bytes_ = size;
328 return *this;
329 }
330
AddKeyBuilder(absl::string_view key_builder)331 ServiceConfigBuilder& AddKeyBuilder(absl::string_view key_builder) {
332 key_builders_.push_back(absl::StrCat("{", key_builder, "}"));
333 return *this;
334 }
335
Build()336 std::string Build() {
337 // First build parts of routeLookupConfig.
338 std::vector<std::string> route_lookup_config_parts;
339 route_lookup_config_parts.push_back(absl::StrFormat(
340 " \"lookupService\":\"%s\"", rls_server_target_));
341 if (lookup_service_timeout_ > grpc_core::Duration::Zero()) {
342 route_lookup_config_parts.push_back(
343 absl::StrFormat(" \"lookupServiceTimeout\":\"%fs\"",
344 lookup_service_timeout_.seconds()));
345 }
346 if (!default_target_.empty()) {
347 route_lookup_config_parts.push_back(absl::StrFormat(
348 " \"defaultTarget\":\"%s\"", default_target_));
349 }
350 route_lookup_config_parts.push_back(absl::StrFormat(
351 " \"cacheSizeBytes\":%" PRId64, cache_size_bytes_));
352 if (max_age_ > grpc_core::Duration::Zero()) {
353 route_lookup_config_parts.push_back(
354 absl::StrFormat(" \"maxAge\":\"%fs\"", max_age_.seconds()));
355 }
356 if (stale_age_ > grpc_core::Duration::Zero()) {
357 route_lookup_config_parts.push_back(absl::StrFormat(
358 " \"staleAge\":\"%fs\"", stale_age_.seconds()));
359 }
360 if (!key_builders_.empty()) {
361 route_lookup_config_parts.push_back(
362 absl::StrFormat(" \"grpcKeybuilders\":[%s]",
363 absl::StrJoin(key_builders_, ",")));
364 }
365 // Now build parts of RLS LB policy config.
366 std::vector<std::string> rls_config_parts;
367 if (!route_lookup_config_parts.empty()) {
368 rls_config_parts.push_back(absl::StrCat(
369 " \"routeLookupConfig\":{",
370 absl::StrJoin(route_lookup_config_parts, ","), " }"));
371 }
372 rls_config_parts.push_back(
373 " \"childPolicy\":[{"
374 " \"fixed_address_lb\":{}\n"
375 " }],\n"
376 " \"childPolicyConfigTargetFieldName\":\"address\"\n");
377 // Put it all together.
378 return absl::StrCat(
379 "{"
380 " \"loadBalancingConfig\":[{"
381 " \"rls_experimental\":{",
382 absl::StrJoin(rls_config_parts, ","),
383 " }"
384 " }]"
385 "}");
386 }
387
388 private:
389 absl::string_view rls_server_target_;
390 grpc_core::Duration lookup_service_timeout_;
391 std::string default_target_;
392 grpc_core::Duration max_age_;
393 grpc_core::Duration stale_age_;
394 int64_t cache_size_bytes_ = 10485760;
395 std::vector<std::string> key_builders_;
396 };
397
MakeServiceConfigBuilder()398 ServiceConfigBuilder MakeServiceConfigBuilder() {
399 return ServiceConfigBuilder(rls_server_target_);
400 }
401
SetNextResolution(absl::string_view service_config_json)402 void SetNextResolution(absl::string_view service_config_json) {
403 resolver_response_generator_->SetNextResolution(service_config_json);
404 }
405
406 template <typename T>
407 struct ServerThread {
408 template <typename... Args>
ServerThreadgrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::ServerThread409 explicit ServerThread(const grpc::string& type, Args&&... args)
410 : port_(grpc_pick_unused_port_or_die()),
411 type_(type),
412 service_(std::forward<Args>(args)...) {}
413
Startgrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::ServerThread414 void Start() {
415 gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
416 GPR_ASSERT(!running_);
417 running_ = true;
418 service_.Start();
419 grpc::internal::Mutex mu;
420 // We need to acquire the lock here in order to prevent the notify_one
421 // by ServerThread::Serve from firing before the wait below is hit.
422 grpc::internal::MutexLock lock(&mu);
423 grpc::internal::CondVar cond;
424 thread_ = std::make_unique<std::thread>(
425 std::bind(&ServerThread::Serve, this, &mu, &cond));
426 cond.Wait(&mu);
427 gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
428 }
429
Servegrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::ServerThread430 void Serve(grpc::internal::Mutex* mu, grpc::internal::CondVar* cond) {
431 // We need to acquire the lock here in order to prevent the notify_one
432 // below from firing before its corresponding wait is executed.
433 grpc::internal::MutexLock lock(mu);
434 ServerBuilder builder;
435 auto creds = std::make_shared<SecureServerCredentials>(
436 grpc_fake_transport_security_server_credentials_create());
437 builder.AddListeningPort(absl::StrCat("localhost:", port_),
438 std::move(creds));
439 builder.RegisterService(&service_);
440 server_ = builder.BuildAndStart();
441 cond->Signal();
442 }
443
Shutdowngrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::ServerThread444 void Shutdown() {
445 if (!running_) return;
446 gpr_log(GPR_INFO, "%s about to shutdown", type_.c_str());
447 service_.Shutdown();
448 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
449 thread_->join();
450 gpr_log(GPR_INFO, "%s shutdown completed", type_.c_str());
451 running_ = false;
452 }
453
454 const int port_;
455 grpc::string type_;
456 T service_;
457 std::unique_ptr<Server> server_;
458 std::unique_ptr<std::thread> thread_;
459 bool running_ = false;
460 };
461
462 std::vector<std::unique_ptr<ServerThread<MyTestServiceImpl>>> backends_;
463 std::string rls_server_target_;
464 std::unique_ptr<ServerThread<RlsServiceImpl>> rls_server_;
465 std::unique_ptr<FakeResolverResponseGeneratorWrapper>
466 resolver_response_generator_;
467 std::string target_uri_;
468 std::shared_ptr<grpc::Channel> channel_;
469 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
470 };
471
// One RPC whose "key1" header is mapped to kTestKey: expects a single RLS
// lookup and a single backend request.
TEST_F(RlsEnd2endTest, Basic) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  auto& backend_service = backends_[0]->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backend_service.request_count(), 1);
  // The RLS response carried no header data, so the backend must not have
  // recorded any RLS header.
  EXPECT_THAT(backend_service.rls_data(), ::testing::ElementsAre());
}
501
// Sends "key1" twice in one RPC and expects the RLS request to carry both
// values joined with a comma.
TEST_F(RlsEnd2endTest, DuplicateHeadersAreMerged) {
  const char* kTestValue2 = "test_value_2";
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string merged_value = absl::StrCat(kTestValue, ",", kTestValue2);
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, merged_value}}),
                          BuildRlsResponse({backend_uri}));
  // Same header present twice in the request.  Values should be merged.
  CheckRpcSendOk(
      DEBUG_LOCATION,
      RpcOptions().set_metadata({{"key1", kTestValue}, {"key1", kTestValue2}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
532
// The key builder lists "key1" and "key2"; the RPC only sends "key2",
// whose value is expected to end up in the RLS request.
TEST_F(RlsEnd2endTest, SecondHeaderUsed) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\", \"key2\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key2", kTestValue}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
560
// Two header entries map "key1" and "key2" to two distinct RLS keys; both
// are expected in the RLS request.
TEST_F(RlsEnd2endTest, MultipleHeaderKeys) {
  const char* kTestKey2 = "test_key_2";
  const char* kTestValue2 = "test_value_2";
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " },"
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key2\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey, kTestKey2);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  auto& backend_service = backends_[0]->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({
                              {kTestKey, kTestValue},
                              {kTestKey2, kTestValue2},
                          }),
                          BuildRlsResponse({backend_uri}));
  CheckRpcSendOk(
      DEBUG_LOCATION,
      RpcOptions().set_metadata({{"key1", kTestValue}, {"key2", kTestValue2}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backend_service.request_count(), 1);
  // The RLS response carried no header data, so the backend must not have
  // recorded any RLS header.
  EXPECT_THAT(backend_service.rls_data(), ::testing::ElementsAre());
}
602
// The RPC carries no "key1" header, so the RLS request is expected to have
// an empty key map.
TEST_F(RlsEnd2endTest, NoHeaderMatch) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({}),
                          BuildRlsResponse({backend_uri}));
  // Request does not have header "key1", so kTestKey will not be added.
  CheckRpcSendOk(DEBUG_LOCATION);
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
630
// The key builder names only the service (no method); the Echo RPC is
// still expected to produce an RLS request containing kTestKey.
TEST_F(RlsEnd2endTest, WildcardMethod) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
656
// The only key builder targets "some_other_method", so the Echo RPC is
// expected to produce an RLS request with an empty key map.
TEST_F(RlsEnd2endTest, NoKeyBuilderForMethod) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"some_other_method\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({}),
                          BuildRlsResponse({backend_uri}));
  CheckRpcSendOk(DEBUG_LOCATION);
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
683
// The RLS response includes header data, which the backend is expected to
// observe in the "x-google-rls-data" header.
TEST_F(RlsEnd2endTest, HeaderData) {
  const char* kHeaderData = "header_data";
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  auto& backend_service = backends_[0]->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}, kHeaderData));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backend_service.request_count(), 1);
  EXPECT_THAT(backend_service.rls_data(),
              ::testing::ElementsAre(kHeaderData));
}
715
// Configures extraKeys (host/service/method) and constantKeys in addition
// to a header key, and expects all of them in the RLS request.
TEST_F(RlsEnd2endTest, ExtraKeysAndConstantKeys) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\",\"key2\",\"key3\""
      " ]"
      " }"
      "],"
      "\"extraKeys\":{"
      " \"host\":\"%s\","
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "},"
      "\"constantKeys\":{"
      " \"%s\":\"%s\""
      "}",
      kServiceValue, kMethodValue, kTestKey, kHostKey, kServiceKey, kMethodKey,
      kConstantKey, kConstantValue);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({
                              {kTestKey, kTestValue},
                              {kHostKey, kServerName},
                              {kServiceKey, kServiceValue},
                              {kMethodKey, kMethodValue},
                              {kConstantKey, kConstantValue},
                          }),
                          BuildRlsResponse({backend_uri}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
759
// Two different key values resolve to the same backend: each value is
// expected to trigger its own RLS lookup, and both RPCs reach the backend.
TEST_F(RlsEnd2endTest, TwoCacheEntriesWithSameTarget) {
  const char* kTestValue2 = "test_value2";
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  auto& backend_service = backends_[0]->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}));
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue2}}),
                          BuildRlsResponse({backend_uri}));
  // First key value.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backend_service.request_count(), 1);
  // Second key value.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue2}}));
  EXPECT_EQ(rls_service.request_count(), 2);
  EXPECT_EQ(rls_service.response_count(), 2);
  EXPECT_EQ(backend_service.request_count(), 2);
}
796
// With no default target configured, a failed RLS lookup fails the data
// plane RPC; after the sleep below, a retry with a valid RLS response
// succeeds.
TEST_F(RlsEnd2endTest, FailedRlsRequestWithoutDefaultTarget) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  auto& rls_service = rls_server_->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  // The real test has one RLS RPC fail followed by one that should
  // succeed.  Once the first RPC fails, the adaptive throttling code will
  // throttle the second RPC with about 11% probability, which would make
  // the test flaky.  To avoid that, we seed the throttling state with two
  // successful RPCs before the real test starts; with those successes
  // outweighing the single failure, the throttling probability is negative
  // and the subsequent request is never throttled.
  const char* kTestValue2 = "test_value_2";
  const char* kTestValue3 = "test_value_3";
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue2}}),
                          BuildRlsResponse({backend_uri}));
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue3}}),
                          BuildRlsResponse({backend_uri}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue2}}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue3}}));
  // Now start the real test.
  // Send an RPC before we give the RLS server a response.
  // The RLS request will fail, and thus so will the data plane RPC.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "RLS request failed: INTERNAL: no response entry",
                      RpcOptions().set_metadata({{"key1", kTestValue}}));
  // TODO(roth): Change this to use ::testing::ProtoEquals()
  // once that becomes available in OSS.
  const std::string expected_unmatched_request =
      BuildRlsRequest({{kTestKey, kTestValue}}).DebugString();
  EXPECT_THAT(rls_service.GetUnmatchedRequests(),
              ::testing::ElementsAre(::testing::Property(
                  &RouteLookupRequest::DebugString,
                  expected_unmatched_request)));
  // Now give the RLS server the right response.
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}));
  // Sleep long enough for backoff to elapse, then try another RPC.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(3));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_service.request_count(), 4);
  EXPECT_EQ(rls_service.response_count(), 3);
  EXPECT_EQ(backends_[0]->service_.request_count(), 3);
}
862
// With a default target configured, a failed RLS lookup still lets the
// data plane RPC succeed by routing it to the default target.
TEST_F(RlsEnd2endTest, FailedRlsRequestWithDefaultTarget) {
  StartBackends(1);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  SetNextResolution(MakeServiceConfigBuilder()
                        .AddKeyBuilder(key_builder)
                        .set_default_target(backend_uri)
                        .Build());
  auto& rls_service = rls_server_->service_;
  // Don't give the RLS server a response, so the RLS request will fail.
  // The data plane RPC should be sent to the default target.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  // TODO(roth): Change this to use ::testing::ProtoEquals()
  // once that becomes available in OSS.
  const std::string expected_unmatched_request =
      BuildRlsRequest({{kTestKey, kTestValue}}).DebugString();
  EXPECT_THAT(rls_service.GetUnmatchedRequests(),
              ::testing::ElementsAre(::testing::Property(
                  &RouteLookupRequest::DebugString,
                  expected_unmatched_request)));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 0);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
898
// Verifies that an RLS response delayed beyond the configured lookup
// service timeout results in the RPC being sent to the default target.
TEST_F(RlsEnd2endTest, RlsRequestTimeout) {
  StartBackends(2);
  const std::string rls_target = grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string default_target = grpc_core::LocalIpUri(backends_[1]->port_);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(key_builder)
          .set_default_target(default_target)
          .set_lookup_service_timeout(grpc_core::Duration::Seconds(2))
          .Build());
  // The RLS server does have a response, but it is delayed by 3 seconds,
  // which is longer than the 2-second lookup timeout set above.
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}), BuildRlsResponse({rls_target}),
      /*response_delay=*/grpc_core::Duration::Seconds(3));
  // The data plane RPC should be sent to the default target.
  const RpcOptions rpc_options =
      RpcOptions().set_timeout_ms(4000).set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 0);
  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
}
931
// Verifies that a config update changing the default target takes effect:
// RPCs move to the new default target while the RLS server sees no
// additional request.
TEST_F(RlsEnd2endTest, UpdateConfig) {
  StartBackends(2);
  auto& rls_service = rls_server_->service_;
  auto& first_backend = backends_[0]->service_;
  auto& second_backend = backends_[1]->service_;
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  auto config_builder =
      MakeServiceConfigBuilder()
          .AddKeyBuilder(key_builder)
          .set_default_target(grpc_core::LocalIpUri(backends_[0]->port_));
  SetNextResolution(config_builder.Build());
  // Don't give the RLS server a response, so the RLS request will fail.
  // The data plane RPC should be sent to the default target.
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  const std::string expected_request =
      BuildRlsRequest({{kTestKey, kTestValue}}).DebugString();
  EXPECT_THAT(
      rls_service.GetUnmatchedRequests(),
      ::testing::ElementsAre(
          // TODO(roth): Change this to use ::testing::ProtoEquals()
          // once that becomes available in OSS.
          ::testing::Property(&RouteLookupRequest::DebugString,
                              expected_request)));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 0);
  EXPECT_EQ(first_backend.request_count(), 1);
  EXPECT_EQ(second_backend.request_count(), 0);
  // Now update the config to point to a new default target.
  config_builder.set_default_target(
      grpc_core::LocalIpUri(backends_[1]->port_));
  SetNextResolution(config_builder.Build());
  // Send another RPC, which should go to the new default target.
  // The RLS server will *not* see another request, because the cache
  // entry is still in backoff.
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 0);
  EXPECT_EQ(first_backend.request_count(), 1);
  EXPECT_EQ(second_backend.request_count(), 1);
}
981
// Verifies that two RPCs with the same key produce only a single RLS
// request, with both RPCs reaching the backend.
TEST_F(RlsEnd2endTest, CachedResponse) {
  StartBackends(1);
  auto& rls_service = rls_server_->service_;
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}));
  // Send the same RPC twice.
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key1", kTestValue}});
  for (int attempt = 0; attempt < 2; ++attempt) {
    CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  }
  // The RLS server should have seen only one request.
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
}
1013
// Verifies that once a cache entry is older than stale_age, an RPC is
// still routed using the cached value while a second RLS request (with
// REASON_STALE) is sent to the RLS server.
TEST_F(RlsEnd2endTest, StaleCacheEntry) {
  StartBackends(1);
  auto& rls_service = rls_server_->service_;
  auto& backend_service = backends_[0]->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(MakeServiceConfigBuilder()
                        .AddKeyBuilder(key_builder)
                        .set_max_age(grpc_core::Duration::Seconds(5))
                        .set_stale_age(grpc_core::Duration::Seconds(1))
                        .Build());
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}));
  // Send one RPC.  RLS server gets a request, and RPC goes to backend.
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backend_service.request_count(), 1);
  // Update RLS server to expect stale request: drop the original
  // response and register one keyed on REASON_STALE instead.
  rls_service.RemoveResponse(BuildRlsRequest({{kTestKey, kTestValue}}));
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}},
                                          RouteLookupRequest::REASON_STALE),
                          BuildRlsResponse({backend_uri}));
  // Wait longer than stale age.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
  // Send another RPC.  This should use the stale value but should
  // dispatch a second RLS request.
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(backend_service.request_count(), 2);
  // Wait for RLS server to receive the second request.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
  EXPECT_EQ(rls_service.request_count(), 2);
  EXPECT_EQ(rls_service.response_count(), 2);
}
1062
// Same scenario as a stale cache entry refresh, but the RLS response
// carries header data, and the stale request registered with the RLS
// server is built with that same header data.
TEST_F(RlsEnd2endTest, StaleCacheEntryWithHeaderData) {
  const char* kHeaderData = "header_data";
  StartBackends(1);
  auto& rls_service = rls_server_->service_;
  auto& backend_service = backends_[0]->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(MakeServiceConfigBuilder()
                        .AddKeyBuilder(key_builder)
                        .set_max_age(grpc_core::Duration::Seconds(5))
                        .set_stale_age(grpc_core::Duration::Seconds(1))
                        .Build());
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}, kHeaderData));
  // Send one RPC.  RLS server gets a request, and RPC goes to backend.
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backend_service.request_count(), 1);
  // Update RLS server to expect stale request.
  rls_service.RemoveResponse(BuildRlsRequest({{kTestKey, kTestValue}}));
  rls_service.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}},
                      RouteLookupRequest::REASON_STALE, kHeaderData),
      BuildRlsResponse({backend_uri}, kHeaderData));
  // Wait longer than stale age.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
  // Send another RPC.  This should use the stale value but should
  // dispatch a second RLS request.
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(backend_service.request_count(), 2);
  // Wait for RLS server to receive the second request.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
  EXPECT_EQ(rls_service.request_count(), 2);
  EXPECT_EQ(rls_service.response_count(), 2);
}
1114
// Verifies that after a cache entry has passed max_age, a new RPC
// triggers a fresh RLS request, and that the RPC fails if that request
// fails.
TEST_F(RlsEnd2endTest, ExpiredCacheEntry) {
  StartBackends(1);
  auto& rls_service = rls_server_->service_;
  auto& backend_service = backends_[0]->service_;
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(key_builder)
          .set_max_age(grpc_core::Duration::Seconds(1))
          .set_lookup_service_timeout(grpc_core::Duration::Seconds(1))
          .Build());
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({backend_uri}));
  // Send one RPC.  RLS server gets a request, and RPC goes to backend.
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backend_service.request_count(), 1);
  // Remove response from RLS server so that the next RLS request fails.
  rls_service.RemoveResponse(BuildRlsRequest({{kTestKey, kTestValue}}));
  // Wait for cache to be expired.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
  // Send another RPC.  This should trigger a second RLS request, but
  // that fails, so the RPC fails.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "RLS request failed: INTERNAL: no response entry",
                      rpc_options);
  EXPECT_EQ(rls_service.request_count(), 2);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backend_service.request_count(), 1);
}
1158
// Exercises the cache size limit with a 1-byte cache: entries survive
// while held by min_eviction_time, and are evicted afterwards when a new
// entry is added.
TEST_F(RlsEnd2endTest, CacheSizeLimit) {
  const char* kTestValue2 = "test_value_2";
  StartBackends(2);
  auto& rls_service = rls_server_->service_;
  auto& backend0 = backends_[0]->service_;
  auto& backend1 = backends_[1]->service_;
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(key_builder)
          .set_cache_size_bytes(1)  // Not even big enough for one entry.
          .Build());
  // Set RLS responses for both kTestValue and kTestValue2.
  rls_service.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
  rls_service.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue2}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[1]->port_)}));
  const RpcOptions rpc_value1 =
      RpcOptions().set_metadata({{"key1", kTestValue}});
  const RpcOptions rpc_value2 =
      RpcOptions().set_metadata({{"key1", kTestValue2}});
  // Send an RPC for kTestValue.
  // RLS server gets a request, and RPC goes to backend.
  CheckRpcSendOk(DEBUG_LOCATION, rpc_value1);
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backend0.request_count(), 1);
  EXPECT_EQ(backend1.request_count(), 0);
  // A second RPC for kTestValue should not generate another RLS
  // request, because the cache entry is held by min_eviction_time.
  CheckRpcSendOk(DEBUG_LOCATION, rpc_value1);
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backend0.request_count(), 2);
  EXPECT_EQ(backend1.request_count(), 0);
  // Wait for min_eviction_time to elapse.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(6));
  // Send a request for kTestValue2.
  // RLS server gets a request, and RPC goes to backend.
  // This causes the entry for kTestValue to be evicted.
  CheckRpcSendOk(DEBUG_LOCATION, rpc_value2);
  EXPECT_EQ(rls_service.request_count(), 2);
  EXPECT_EQ(rls_service.response_count(), 2);
  EXPECT_EQ(backend0.request_count(), 2);
  EXPECT_EQ(backend1.request_count(), 1);
  // Send another RPC for kTestValue.
  // This should now trigger a new RLS request.
  CheckRpcSendOk(DEBUG_LOCATION, rpc_value1);
  EXPECT_EQ(rls_service.request_count(), 3);
  EXPECT_EQ(rls_service.response_count(), 3);
  EXPECT_EQ(backend0.request_count(), 3);
  EXPECT_EQ(backend1.request_count(), 1);
  // Another RPC for kTestValue2 should still work due to min_eviction_time.
  CheckRpcSendOk(DEBUG_LOCATION, rpc_value2);
  EXPECT_EQ(rls_service.request_count(), 3);
  EXPECT_EQ(rls_service.response_count(), 3);
  EXPECT_EQ(backend0.request_count(), 3);
  EXPECT_EQ(backend1.request_count(), 2);
}
1230
// Verifies that when the RLS response lists a working target first and
// an invalid target second, the RPC succeeds on the first target.
TEST_F(RlsEnd2endTest, MultipleTargets) {
  StartBackends(1);
  auto& rls_service = rls_server_->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  rls_service.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse(
          // Second target will report TRANSIENT_FAILURE, but should
          // never be used.
          {backend_uri, "invalid_target"}));
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
1261
// Verifies that when the first target in the RLS response is invalid,
// the RPC still succeeds by using the second (working) target.
TEST_F(RlsEnd2endTest, MultipleTargetsFirstInTransientFailure) {
  StartBackends(1);
  auto& rls_service = rls_server_->service_;
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  rls_service.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse(
          // First target will report TRANSIENT_FAILURE.
          {"invalid_target", backend_uri}));
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
1291
// Verifies that the channel starts out IDLE and reports READY after a
// successful RPC, even though one of the two RLS targets is invalid.
TEST_F(RlsEnd2endTest, ConnectivityStateReady) {
  StartBackends(1);
  auto& rls_service = rls_server_->service_;
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const auto state_before_rpc = channel_->GetState(/*try_to_connect=*/false);
  EXPECT_EQ(GRPC_CHANNEL_IDLE, state_before_rpc);
  const std::string backend_uri = grpc_core::LocalIpUri(backends_[0]->port_);
  rls_service.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse(
          // One target in TRANSIENT_FAILURE, the other in READY.
          {"invalid_target", backend_uri}));
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  const auto state_after_rpc = channel_->GetState(/*try_to_connect=*/false);
  EXPECT_EQ(GRPC_CHANNEL_READY, state_after_rpc);
}
1323
// Verifies that the channel is IDLE before any RPC and is still IDLE
// after an RPC whose RLS request failed.
TEST_F(RlsEnd2endTest, ConnectivityStateIdle) {
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const auto state_before_rpc = channel_->GetState(/*try_to_connect=*/false);
  EXPECT_EQ(GRPC_CHANNEL_IDLE, state_before_rpc);
  // RLS server not given any responses, so the request will fail.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "RLS request failed: INTERNAL: no response entry");
  // No child policies, so should be IDLE.
  const auto state_after_rpc = channel_->GetState(/*try_to_connect=*/false);
  EXPECT_EQ(GRPC_CHANNEL_IDLE, state_after_rpc);
}
1348
// Verifies that when the only target returned by the RLS server is
// invalid, the RPC fails and the channel reports TRANSIENT_FAILURE.
TEST_F(RlsEnd2endTest, ConnectivityStateTransientFailure) {
  auto& rls_service = rls_server_->service_;
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const auto state_before_rpc = channel_->GetState(/*try_to_connect=*/false);
  EXPECT_EQ(GRPC_CHANNEL_IDLE, state_before_rpc);
  // The RLS lookup itself succeeds, but the single target is unusable.
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                          BuildRlsResponse({"invalid_target"}));
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key1", kTestValue}});
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      "empty address list: no address in fixed_address_lb policy",
      rpc_options);
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  const auto state_after_rpc = channel_->GetState(/*try_to_connect=*/false);
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, state_after_rpc);
}
1378
1379 class RlsMetricsEnd2endTest : public RlsEnd2endTest {
1380 protected:
SetUp()1381 void SetUp() override {
1382 // Register stats plugin before initializing client.
1383 stats_plugin_ = grpc_core::FakeStatsPluginBuilder()
1384 .UseDisabledByDefaultMetrics(true)
1385 .BuildAndRegister();
1386 RlsEnd2endTest::SetUp();
1387 }
1388
1389 std::shared_ptr<grpc_core::FakeStatsPlugin> stats_plugin_;
1390 };
1391
// Checks the registered descriptor for grpc.lb.rls.default_target_picks:
// value type, instrument type, default enablement, name, unit and labels.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionDefaultTargetPicks) {
  using grpc_core::GlobalInstrumentsRegistry;
  constexpr char kMetricName[] = "grpc.lb.rls.default_target_picks";
  const auto* metric =
      grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          kMetricName);
  // Abort the test if the metric is not registered at all.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, GlobalInstrumentsRegistry::ValueType::kUInt64);
  EXPECT_EQ(metric->instrument_type,
            GlobalInstrumentsRegistry::InstrumentType::kCounter);
  EXPECT_FALSE(metric->enable_by_default);
  EXPECT_EQ(metric->name, kMetricName);
  EXPECT_EQ(metric->unit, "{pick}");
  EXPECT_THAT(metric->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.data_plane_target",
                                     "grpc.lb.pick_result"));
  EXPECT_THAT(metric->optional_label_keys, ::testing::ElementsAre());
}
1410
// Checks the registered descriptor for grpc.lb.rls.target_picks:
// value type, instrument type, default enablement, name, unit and labels.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionTargetPicks) {
  using grpc_core::GlobalInstrumentsRegistry;
  constexpr char kMetricName[] = "grpc.lb.rls.target_picks";
  const auto* metric =
      grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          kMetricName);
  // Abort the test if the metric is not registered at all.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, GlobalInstrumentsRegistry::ValueType::kUInt64);
  EXPECT_EQ(metric->instrument_type,
            GlobalInstrumentsRegistry::InstrumentType::kCounter);
  EXPECT_FALSE(metric->enable_by_default);
  EXPECT_EQ(metric->name, kMetricName);
  EXPECT_EQ(metric->unit, "{pick}");
  EXPECT_THAT(metric->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.data_plane_target",
                                     "grpc.lb.pick_result"));
  EXPECT_THAT(metric->optional_label_keys, ::testing::ElementsAre());
}
1429
// Checks the registered descriptor for grpc.lb.rls.failed_picks:
// value type, instrument type, default enablement, name, unit and labels.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionFailedPicks) {
  using grpc_core::GlobalInstrumentsRegistry;
  constexpr char kMetricName[] = "grpc.lb.rls.failed_picks";
  const auto* metric =
      grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          kMetricName);
  // Abort the test if the metric is not registered at all.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, GlobalInstrumentsRegistry::ValueType::kUInt64);
  EXPECT_EQ(metric->instrument_type,
            GlobalInstrumentsRegistry::InstrumentType::kCounter);
  EXPECT_FALSE(metric->enable_by_default);
  EXPECT_EQ(metric->name, kMetricName);
  EXPECT_EQ(metric->unit, "{pick}");
  EXPECT_THAT(
      metric->label_keys,
      ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target"));
  EXPECT_THAT(metric->optional_label_keys, ::testing::ElementsAre());
}
1447
// Checks the registered descriptor for grpc.lb.rls.cache_entries:
// value type, instrument type, default enablement, name, unit and labels.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionCacheEntries) {
  using grpc_core::GlobalInstrumentsRegistry;
  constexpr char kMetricName[] = "grpc.lb.rls.cache_entries";
  const auto* metric =
      grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          kMetricName);
  // Abort the test if the metric is not registered at all.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, GlobalInstrumentsRegistry::ValueType::kInt64);
  EXPECT_EQ(metric->instrument_type,
            GlobalInstrumentsRegistry::InstrumentType::kCallbackGauge);
  EXPECT_FALSE(metric->enable_by_default);
  EXPECT_EQ(metric->name, kMetricName);
  EXPECT_EQ(metric->unit, "{entry}");
  EXPECT_THAT(metric->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.instance_uuid"));
  EXPECT_THAT(metric->optional_label_keys, ::testing::ElementsAre());
}
1466
// Checks the registered descriptor for grpc.lb.rls.cache_size:
// value type, instrument type, default enablement, name, unit and labels.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionCacheSize) {
  using grpc_core::GlobalInstrumentsRegistry;
  constexpr char kMetricName[] = "grpc.lb.rls.cache_size";
  const auto* metric =
      grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          kMetricName);
  // Abort the test if the metric is not registered at all.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, GlobalInstrumentsRegistry::ValueType::kInt64);
  EXPECT_EQ(metric->instrument_type,
            GlobalInstrumentsRegistry::InstrumentType::kCallbackGauge);
  EXPECT_FALSE(metric->enable_by_default);
  EXPECT_EQ(metric->name, kMetricName);
  EXPECT_EQ(metric->unit, "By");
  EXPECT_THAT(metric->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.instance_uuid"));
  EXPECT_THAT(metric->optional_label_keys, ::testing::ElementsAre());
}
1485
// Exercises the metric values exported through the fake stats plugin in
// three phases: a successful RPC to backend 0, a successful RPC to
// backend 1, and an RPC whose RLS request fails.  After each phase the
// per-target pick counters, the failed-pick counter, and the cache
// entry-count and size gauges are checked.
TEST_F(RlsMetricsEnd2endTest, MetricValues) {
  auto kMetricTargetPicks =
      grpc_core::GlobalInstrumentsRegistryTestPeer::
          FindUInt64CounterHandleByName("grpc.lb.rls.target_picks")
              .value();
  auto kMetricFailedPicks =
      grpc_core::GlobalInstrumentsRegistryTestPeer::
          FindUInt64CounterHandleByName("grpc.lb.rls.failed_picks")
              .value();
  auto kMetricCacheEntries =
      grpc_core::GlobalInstrumentsRegistryTestPeer::
          FindCallbackInt64GaugeHandleByName("grpc.lb.rls.cache_entries")
              .value();
  auto kMetricCacheSize =
      grpc_core::GlobalInstrumentsRegistryTestPeer::
          FindCallbackInt64GaugeHandleByName("grpc.lb.rls.cache_size")
              .value();
  StartBackends(2);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         " \"service\":\"%s\","
                                         " \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         " {"
                                         " \"key\":\"%s\","
                                         " \"names\":["
                                         " \"key1\""
                                         " ]"
                                         " }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .Build());
  const std::string rls_target0 = grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string rls_target1 = grpc_core::LocalIpUri(backends_[1]->port_);
  auto& rls_service = rls_server_->service_;
  // Local accessors for the metric values that are re-read after every
  // phase.  Each one performs a fresh lookup against the stats plugin.
  const auto target_picks = [&](const std::string& data_plane_target) {
    return stats_plugin_->GetCounterValue(
        kMetricTargetPicks,
        {target_uri_, rls_server_target_, data_plane_target, "complete"}, {});
  };
  const auto failed_picks = [&]() {
    return stats_plugin_->GetCounterValue(
        kMetricFailedPicks, {target_uri_, rls_server_target_}, {});
  };
  // The two gauge accessors are only called right after TriggerCallbacks().
  const auto cache_entries = [&]() {
    return stats_plugin_->GetCallbackGaugeValue(
        kMetricCacheEntries,
        {target_uri_, rls_server_target_, kRlsInstanceUuid}, {});
  };
  const auto cache_size_bytes = [&]() {
    return stats_plugin_->GetCallbackGaugeValue(
        kMetricCacheSize, {target_uri_, rls_server_target_, kRlsInstanceUuid},
        {});
  };
  // Send an RPC to the target for backend 0.
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, rls_target0}}),
                          BuildRlsResponse({rls_target0}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", rls_target0}}));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  EXPECT_EQ(backends_[1]->service_.request_count(), 0);
  // Check exported metrics.
  EXPECT_THAT(target_picks(rls_target0), ::testing::Optional(1));
  EXPECT_EQ(target_picks(rls_target1), absl::nullopt);
  EXPECT_EQ(failed_picks(), absl::nullopt);
  stats_plugin_->TriggerCallbacks();
  EXPECT_THAT(cache_entries(), ::testing::Optional(1));
  auto cache_size = cache_size_bytes();
  EXPECT_THAT(cache_size, ::testing::Optional(::testing::Ge(1)));
  // Send an RPC to the target for backend 1.
  rls_service.SetResponse(BuildRlsRequest({{kTestKey, rls_target1}}),
                          BuildRlsResponse({rls_target1}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", rls_target1}}));
  EXPECT_EQ(rls_service.request_count(), 2);
  EXPECT_EQ(rls_service.response_count(), 2);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
  // Check exported metrics.
  EXPECT_THAT(target_picks(rls_target0), ::testing::Optional(1));
  EXPECT_THAT(target_picks(rls_target1), ::testing::Optional(1));
  EXPECT_EQ(failed_picks(), absl::nullopt);
  stats_plugin_->TriggerCallbacks();
  EXPECT_THAT(cache_entries(), ::testing::Optional(2));
  auto cache_size2 = cache_size_bytes();
  EXPECT_THAT(cache_size2, ::testing::Optional(::testing::Ge(2)));
  if (cache_size.has_value() && cache_size2.has_value()) {
    EXPECT_GT(*cache_size2, *cache_size);
  }
  // Send an RPC for which the RLS server has no response, which means
  // that the RLS request will fail.  There is no default target, so the
  // data plane RPC will fail.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "RLS request failed: INTERNAL: no response entry",
                      RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_THAT(
      rls_service.GetUnmatchedRequests(),
      ::testing::ElementsAre(
          // TODO(roth): Change this to use ::testing::ProtoEquals()
          // once that becomes available in OSS.
          ::testing::Property(
              &RouteLookupRequest::DebugString,
              BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
  EXPECT_EQ(rls_service.request_count(), 3);
  EXPECT_EQ(rls_service.response_count(), 2);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
  // Check exported metrics.
  EXPECT_THAT(target_picks(rls_target0), ::testing::Optional(1));
  EXPECT_THAT(target_picks(rls_target1), ::testing::Optional(1));
  EXPECT_THAT(failed_picks(), ::testing::Optional(1));
  stats_plugin_->TriggerCallbacks();
  EXPECT_THAT(cache_entries(), ::testing::Optional(3));
  auto cache_size3 = cache_size_bytes();
  EXPECT_THAT(cache_size3, ::testing::Optional(::testing::Ge(3)));
  // Compare against the immediately preceding sample: the entry count
  // went from 2 to 3, so the reported size is expected to have grown
  // past the two-entry size, not merely past the one-entry size.
  if (cache_size2.has_value() && cache_size3.has_value()) {
    EXPECT_GT(*cache_size3, *cache_size2);
  }
}
1634
// Verifies that an RPC routed to the default target (because the RLS
// request failed) is counted in grpc.lb.rls.default_target_picks.
TEST_F(RlsMetricsEnd2endTest, MetricValuesDefaultTargetRpcs) {
  auto kMetricDefaultTargetPicks =
      grpc_core::GlobalInstrumentsRegistryTestPeer::
          FindUInt64CounterHandleByName("grpc.lb.rls.default_target_picks")
              .value();
  StartBackends(1);
  auto& rls_service = rls_server_->service_;
  const std::string default_target = grpc_core::LocalIpUri(backends_[0]->port_);
  const std::string key_builder = absl::StrFormat(
      "\"names\":[{"
      " \"service\":\"%s\","
      " \"method\":\"%s\""
      "}],"
      "\"headers\":["
      " {"
      " \"key\":\"%s\","
      " \"names\":["
      " \"key1\""
      " ]"
      " }"
      "]",
      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(MakeServiceConfigBuilder()
                        .AddKeyBuilder(key_builder)
                        .set_default_target(default_target)
                        .Build());
  // Don't give the RLS server a response, so the RLS request will fail.
  // The data plane RPC should be sent to the default target.
  const RpcOptions rpc_options =
      RpcOptions().set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  const std::string expected_request =
      BuildRlsRequest({{kTestKey, kTestValue}}).DebugString();
  EXPECT_THAT(
      rls_service.GetUnmatchedRequests(),
      ::testing::ElementsAre(
          // TODO(roth): Change this to use ::testing::ProtoEquals()
          // once that becomes available in OSS.
          ::testing::Property(&RouteLookupRequest::DebugString,
                              expected_request)));
  EXPECT_EQ(rls_service.request_count(), 1);
  EXPECT_EQ(rls_service.response_count(), 0);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // Check expected metrics.
  const auto default_target_picks = stats_plugin_->GetCounterValue(
      kMetricDefaultTargetPicks,
      {target_uri_, rls_server_target_, default_target, "complete"}, {});
  EXPECT_THAT(default_target_picks, ::testing::Optional(1));
}
1681
1682 } // namespace
1683 } // namespace testing
1684 } // namespace grpc
1685
main(int argc,char ** argv)1686 int main(int argc, char** argv) {
1687 ::testing::InitGoogleTest(&argc, argv);
1688 grpc::testing::TestEnvironment env(&argc, argv);
1689 return RUN_ALL_TESTS();
1690 }
1691