xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/rls_end2end_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2020 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 // FIXME: add tests:
18 // - cache eviction via cleanup timer (based on age)
19 // - RLS channel is down; wait_for_ready request is sent and RLS request fails
20 //   and goes into backoff; RLS channel comes back up before backoff timer
21 //   fires; request is processed at that point
22 // - find some deterministic way to exercise adaptive throttler code
23 
#include <deque>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <utility>
#include <vector>
27 
28 #include <gmock/gmock.h>
29 #include <gtest/gtest.h>
30 
31 #include "absl/strings/str_format.h"
32 #include "absl/strings/str_join.h"
33 #include "absl/types/optional.h"
34 
35 #include <grpcpp/channel.h>
36 #include <grpcpp/create_channel.h>
37 #include <grpcpp/security/credentials.h>
38 #include <grpcpp/server.h>
39 #include <grpcpp/server_builder.h>
40 #include <grpcpp/support/channel_arguments.h>
41 
42 #include "src/core/client_channel/backup_poller.h"
43 #include "src/core/lib/address_utils/parse_address.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/lib/config/config_vars.h"
46 #include "src/core/lib/gprpp/env.h"
47 #include "src/core/lib/gprpp/host_port.h"
48 #include "src/core/lib/gprpp/time.h"
49 #include "src/core/lib/iomgr/sockaddr.h"
50 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
51 #include "src/core/lib/uri/uri_parser.h"
52 #include "src/core/load_balancing/rls/rls.h"
53 #include "src/core/resolver/fake/fake_resolver.h"
54 #include "src/core/service_config/service_config_impl.h"
55 #include "src/cpp/server/secure_server_credentials.h"
56 #include "src/proto/grpc/lookup/v1/rls.grpc.pb.h"
57 #include "src/proto/grpc/lookup/v1/rls.pb.h"
58 #include "src/proto/grpc/testing/echo.grpc.pb.h"
59 #include "test/core/util/fake_stats_plugin.h"
60 #include "test/core/util/port.h"
61 #include "test/core/util/resolve_localhost_ip46.h"
62 #include "test/core/util/test_config.h"
63 #include "test/core/util/test_lb_policies.h"
64 #include "test/cpp/end2end/counted_service.h"
65 #include "test/cpp/end2end/rls_server.h"
66 #include "test/cpp/end2end/test_service_impl.h"
67 #include "test/cpp/util/credentials.h"
68 #include "test/cpp/util/test_config.h"
69 
70 using ::grpc::lookup::v1::RouteLookupRequest;
71 
72 namespace grpc {
73 namespace testing {
74 namespace {
75 
76 const char* kServerName = "test.google.fr";
77 const char* kRequestMessage = "Live long and prosper.";
78 const char* kRlsInstanceUuid = "rls_instance_uuid";
79 
80 const char* kCallCredsMdKey = "call_cred_name";
81 const char* kCallCredsMdValue = "call_cred_value";
82 
83 const char* kTestKey = "test_key";
84 const char* kTestValue = "test_value";
85 const char* kHostKey = "host_key";
86 const char* kServiceKey = "service_key";
87 const char* kServiceValue = "grpc.testing.EchoTestService";
88 const char* kMethodKey = "method_key";
89 const char* kMethodValue = "Echo";
90 const char* kConstantKey = "constant_key";
91 const char* kConstantValue = "constant_value";
92 
93 using BackendService = CountedService<TestServiceImpl>;
94 
95 // Subclass of TestServiceImpl that increments a request counter for
96 // every call to the Echo Rpc.
97 class MyTestServiceImpl : public BackendService {
98  public:
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)99   Status Echo(ServerContext* context, const EchoRequest* request,
100               EchoResponse* response) override {
101     // Backend should see call creds.
102     EXPECT_THAT(context->client_metadata(),
103                 ::testing::Contains(
104                     ::testing::Pair(kCallCredsMdKey, kCallCredsMdValue)));
105     IncreaseRequestCount();
106     auto client_metadata = context->client_metadata();
107     auto range = client_metadata.equal_range("x-google-rls-data");
108     {
109       grpc::internal::MutexLock lock(&mu_);
110       for (auto it = range.first; it != range.second; ++it) {
111         rls_header_data_.insert(
112             std::string(it->second.begin(), it->second.length()));
113       }
114     }
115     IncreaseResponseCount();
116     return TestServiceImpl::Echo(context, request, response);
117   }
118 
rls_data()119   std::set<std::string> rls_data() {
120     grpc::internal::MutexLock lock(&mu_);
121     return std::move(rls_header_data_);
122   }
123 
Start()124   void Start() {}
125 
Shutdown()126   void Shutdown() {}
127 
128  private:
129   grpc::internal::Mutex mu_;
130   std::set<std::string> rls_header_data_ ABSL_GUARDED_BY(&mu_);
131 };
132 
133 class FakeResolverResponseGeneratorWrapper {
134  public:
FakeResolverResponseGeneratorWrapper()135   FakeResolverResponseGeneratorWrapper()
136       : response_generator_(grpc_core::MakeRefCounted<
137                             grpc_core::FakeResolverResponseGenerator>()) {}
138 
SetNextResolution(absl::string_view service_config_json)139   void SetNextResolution(absl::string_view service_config_json) {
140     grpc_core::ExecCtx exec_ctx;
141     response_generator_->SetResponseSynchronously(
142         BuildFakeResults(service_config_json));
143   }
144 
Get() const145   grpc_core::FakeResolverResponseGenerator* Get() const {
146     return response_generator_.get();
147   }
148 
149  private:
BuildFakeResults(absl::string_view service_config_json)150   static grpc_core::Resolver::Result BuildFakeResults(
151       absl::string_view service_config_json) {
152     grpc_core::Resolver::Result result;
153     result.service_config =
154         grpc_core::ServiceConfigImpl::Create(result.args, service_config_json);
155     EXPECT_TRUE(result.service_config.ok()) << result.service_config.status();
156     return result;
157   }
158 
159   grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
160       response_generator_;
161 };
162 
163 class RlsEnd2endTest : public ::testing::Test {
164  protected:
SetUpTestSuite()165   static void SetUpTestSuite() {
166     grpc_core::ConfigVars::Overrides overrides;
167     overrides.client_channel_backup_poll_interval_ms = 1;
168     grpc_core::ConfigVars::SetOverrides(overrides);
169     grpc_core::CoreConfiguration::RegisterBuilder(
170         grpc_core::RegisterFixedAddressLoadBalancingPolicy);
171     grpc_init();
172   }
173 
TearDownTestSuite()174   static void TearDownTestSuite() {
175     grpc_shutdown_blocking();
176     grpc_core::CoreConfiguration::Reset();
177   }
178 
SetUp()179   void SetUp() override {
180     rls_server_ = std::make_unique<ServerThread<RlsServiceImpl>>(
181         "rls", [](grpc::ServerContext* ctx) {
182           EXPECT_THAT(ctx->client_metadata(),
183                       ::testing::Contains(
184                           ::testing::Pair(kCallCredsMdKey, kCallCredsMdValue)));
185           EXPECT_EQ(ctx->ExperimentalGetAuthority(), kServerName);
186         });
187     rls_server_->Start();
188     rls_server_target_ = absl::StrFormat("localhost:%d", rls_server_->port_);
189     // Set up client.
190     resolver_response_generator_ =
191         std::make_unique<FakeResolverResponseGeneratorWrapper>();
192     ChannelArguments args;
193     args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
194                     resolver_response_generator_->Get());
195     args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, kServerName);
196     args.SetString(GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID, kRlsInstanceUuid);
197     grpc_channel_credentials* channel_creds =
198         grpc_fake_transport_security_credentials_create();
199     grpc_call_credentials* call_creds = grpc_md_only_test_credentials_create(
200         kCallCredsMdKey, kCallCredsMdValue);
201     auto creds = std::make_shared<TestCompositeChannelCredentials>(
202         channel_creds, call_creds);
203     call_creds->Unref();
204     channel_creds->Unref();
205     target_uri_ = absl::StrCat("fake:///", kServerName);
206     channel_ = grpc::CreateCustomChannel(target_uri_, creds, args);
207     stub_ = grpc::testing::EchoTestService::NewStub(channel_);
208   }
209 
TearDown()210   void TearDown() override {
211     ShutdownBackends();
212     rls_server_->Shutdown();
213   }
214 
ShutdownBackends()215   void ShutdownBackends() {
216     for (auto& server : backends_) {
217       server->Shutdown();
218     }
219   }
220 
StartBackends(size_t num_servers)221   void StartBackends(size_t num_servers) {
222     backends_.clear();
223     for (size_t i = 0; i < num_servers; ++i) {
224       backends_.push_back(
225           std::make_unique<ServerThread<MyTestServiceImpl>>("backend"));
226       backends_.back()->Start();
227     }
228   }
229 
230   struct RpcOptions {
231     int timeout_ms = 2000;
232     bool wait_for_ready = false;
233     std::vector<std::pair<std::string, std::string>> metadata;
234 
RpcOptionsgrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::RpcOptions235     RpcOptions() {}
236 
set_timeout_msgrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::RpcOptions237     RpcOptions& set_timeout_ms(int rpc_timeout_ms) {
238       timeout_ms = rpc_timeout_ms;
239       return *this;
240     }
241 
set_wait_for_readygrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::RpcOptions242     RpcOptions& set_wait_for_ready(bool rpc_wait_for_ready) {
243       wait_for_ready = rpc_wait_for_ready;
244       return *this;
245     }
246 
set_metadatagrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::RpcOptions247     RpcOptions& set_metadata(
248         std::vector<std::pair<std::string, std::string>> rpc_metadata) {
249       metadata = std::move(rpc_metadata);
250       return *this;
251     }
252 
253     // Populates context.
SetupRpcgrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::RpcOptions254     void SetupRpc(ClientContext* context) const {
255       for (const auto& item : metadata) {
256         context->AddMetadata(item.first, item.second);
257       }
258       if (timeout_ms != 0) {
259         context->set_deadline(
260             grpc_timeout_milliseconds_to_deadline(timeout_ms));
261       }
262       if (wait_for_ready) context->set_wait_for_ready(true);
263     }
264   };
265 
SendRpc(const RpcOptions & rpc_options=RpcOptions (),EchoResponse * response=nullptr)266   Status SendRpc(const RpcOptions& rpc_options = RpcOptions(),
267                  EchoResponse* response = nullptr) {
268     EchoResponse local_response;
269     if (response == nullptr) response = &local_response;
270     ClientContext context;
271     rpc_options.SetupRpc(&context);
272     EchoRequest request;
273     request.set_message(kRequestMessage);
274     return stub_->Echo(&context, request, response);
275   }
276 
CheckRpcSendOk(const grpc_core::DebugLocation & location,const RpcOptions & rpc_options=RpcOptions ())277   void CheckRpcSendOk(const grpc_core::DebugLocation& location,
278                       const RpcOptions& rpc_options = RpcOptions()) {
279     EchoResponse response;
280     Status status = SendRpc(rpc_options, &response);
281     ASSERT_TRUE(status.ok()) << location.file() << ":" << location.line()
282                              << ": RPC failed: " << status.error_code() << ": "
283                              << status.error_message();
284     EXPECT_EQ(response.message(), kRequestMessage)
285         << location.file() << ":" << location.line();
286   }
287 
CheckRpcSendFailure(const grpc_core::DebugLocation & location,StatusCode expected_code,absl::string_view expected_message,const RpcOptions & rpc_options=RpcOptions ())288   void CheckRpcSendFailure(const grpc_core::DebugLocation& location,
289                            StatusCode expected_code,
290                            absl::string_view expected_message,
291                            const RpcOptions& rpc_options = RpcOptions()) {
292     Status status = SendRpc(rpc_options);
293     ASSERT_FALSE(status.ok()) << location.file() << ":" << location.line();
294     EXPECT_EQ(expected_code, status.error_code())
295         << location.file() << ":" << location.line();
296     EXPECT_EQ(expected_message, status.error_message())
297         << location.file() << ":" << location.line();
298   }
299 
300   class ServiceConfigBuilder {
301    public:
ServiceConfigBuilder(absl::string_view rls_server_target)302     explicit ServiceConfigBuilder(absl::string_view rls_server_target)
303         : rls_server_target_(rls_server_target) {}
304 
set_lookup_service_timeout(grpc_core::Duration timeout)305     ServiceConfigBuilder& set_lookup_service_timeout(
306         grpc_core::Duration timeout) {
307       lookup_service_timeout_ = timeout * grpc_test_slowdown_factor();
308       return *this;
309     }
310 
set_default_target(std::string default_target)311     ServiceConfigBuilder& set_default_target(std::string default_target) {
312       default_target_ = std::move(default_target);
313       return *this;
314     }
315 
set_max_age(grpc_core::Duration max_age)316     ServiceConfigBuilder& set_max_age(grpc_core::Duration max_age) {
317       max_age_ = max_age * grpc_test_slowdown_factor();
318       return *this;
319     }
320 
set_stale_age(grpc_core::Duration stale_age)321     ServiceConfigBuilder& set_stale_age(grpc_core::Duration stale_age) {
322       stale_age_ = stale_age * grpc_test_slowdown_factor();
323       return *this;
324     }
325 
set_cache_size_bytes(int64_t size)326     ServiceConfigBuilder& set_cache_size_bytes(int64_t size) {
327       cache_size_bytes_ = size;
328       return *this;
329     }
330 
AddKeyBuilder(absl::string_view key_builder)331     ServiceConfigBuilder& AddKeyBuilder(absl::string_view key_builder) {
332       key_builders_.push_back(absl::StrCat("{", key_builder, "}"));
333       return *this;
334     }
335 
Build()336     std::string Build() {
337       // First build parts of routeLookupConfig.
338       std::vector<std::string> route_lookup_config_parts;
339       route_lookup_config_parts.push_back(absl::StrFormat(
340           "        \"lookupService\":\"%s\"", rls_server_target_));
341       if (lookup_service_timeout_ > grpc_core::Duration::Zero()) {
342         route_lookup_config_parts.push_back(
343             absl::StrFormat("        \"lookupServiceTimeout\":\"%fs\"",
344                             lookup_service_timeout_.seconds()));
345       }
346       if (!default_target_.empty()) {
347         route_lookup_config_parts.push_back(absl::StrFormat(
348             "        \"defaultTarget\":\"%s\"", default_target_));
349       }
350       route_lookup_config_parts.push_back(absl::StrFormat(
351           "        \"cacheSizeBytes\":%" PRId64, cache_size_bytes_));
352       if (max_age_ > grpc_core::Duration::Zero()) {
353         route_lookup_config_parts.push_back(
354             absl::StrFormat("        \"maxAge\":\"%fs\"", max_age_.seconds()));
355       }
356       if (stale_age_ > grpc_core::Duration::Zero()) {
357         route_lookup_config_parts.push_back(absl::StrFormat(
358             "        \"staleAge\":\"%fs\"", stale_age_.seconds()));
359       }
360       if (!key_builders_.empty()) {
361         route_lookup_config_parts.push_back(
362             absl::StrFormat("        \"grpcKeybuilders\":[%s]",
363                             absl::StrJoin(key_builders_, ",")));
364       }
365       // Now build parts of RLS LB policy config.
366       std::vector<std::string> rls_config_parts;
367       if (!route_lookup_config_parts.empty()) {
368         rls_config_parts.push_back(absl::StrCat(
369             "      \"routeLookupConfig\":{",
370             absl::StrJoin(route_lookup_config_parts, ","), "      }"));
371       }
372       rls_config_parts.push_back(
373           "      \"childPolicy\":[{"
374           "        \"fixed_address_lb\":{}\n"
375           "      }],\n"
376           "      \"childPolicyConfigTargetFieldName\":\"address\"\n");
377       // Put it all together.
378       return absl::StrCat(
379           "{"
380           "  \"loadBalancingConfig\":[{"
381           "    \"rls_experimental\":{",
382           absl::StrJoin(rls_config_parts, ","),
383           "    }"
384           "  }]"
385           "}");
386     }
387 
388    private:
389     absl::string_view rls_server_target_;
390     grpc_core::Duration lookup_service_timeout_;
391     std::string default_target_;
392     grpc_core::Duration max_age_;
393     grpc_core::Duration stale_age_;
394     int64_t cache_size_bytes_ = 10485760;
395     std::vector<std::string> key_builders_;
396   };
397 
MakeServiceConfigBuilder()398   ServiceConfigBuilder MakeServiceConfigBuilder() {
399     return ServiceConfigBuilder(rls_server_target_);
400   }
401 
SetNextResolution(absl::string_view service_config_json)402   void SetNextResolution(absl::string_view service_config_json) {
403     resolver_response_generator_->SetNextResolution(service_config_json);
404   }
405 
406   template <typename T>
407   struct ServerThread {
408     template <typename... Args>
ServerThreadgrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::ServerThread409     explicit ServerThread(const grpc::string& type, Args&&... args)
410         : port_(grpc_pick_unused_port_or_die()),
411           type_(type),
412           service_(std::forward<Args>(args)...) {}
413 
Startgrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::ServerThread414     void Start() {
415       gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
416       GPR_ASSERT(!running_);
417       running_ = true;
418       service_.Start();
419       grpc::internal::Mutex mu;
420       // We need to acquire the lock here in order to prevent the notify_one
421       // by ServerThread::Serve from firing before the wait below is hit.
422       grpc::internal::MutexLock lock(&mu);
423       grpc::internal::CondVar cond;
424       thread_ = std::make_unique<std::thread>(
425           std::bind(&ServerThread::Serve, this, &mu, &cond));
426       cond.Wait(&mu);
427       gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
428     }
429 
Servegrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::ServerThread430     void Serve(grpc::internal::Mutex* mu, grpc::internal::CondVar* cond) {
431       // We need to acquire the lock here in order to prevent the notify_one
432       // below from firing before its corresponding wait is executed.
433       grpc::internal::MutexLock lock(mu);
434       ServerBuilder builder;
435       auto creds = std::make_shared<SecureServerCredentials>(
436           grpc_fake_transport_security_server_credentials_create());
437       builder.AddListeningPort(absl::StrCat("localhost:", port_),
438                                std::move(creds));
439       builder.RegisterService(&service_);
440       server_ = builder.BuildAndStart();
441       cond->Signal();
442     }
443 
Shutdowngrpc::testing::__anond59ed2ae0111::RlsEnd2endTest::ServerThread444     void Shutdown() {
445       if (!running_) return;
446       gpr_log(GPR_INFO, "%s about to shutdown", type_.c_str());
447       service_.Shutdown();
448       server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
449       thread_->join();
450       gpr_log(GPR_INFO, "%s shutdown completed", type_.c_str());
451       running_ = false;
452     }
453 
454     const int port_;
455     grpc::string type_;
456     T service_;
457     std::unique_ptr<Server> server_;
458     std::unique_ptr<std::thread> thread_;
459     bool running_ = false;
460   };
461 
462   std::vector<std::unique_ptr<ServerThread<MyTestServiceImpl>>> backends_;
463   std::string rls_server_target_;
464   std::unique_ptr<ServerThread<RlsServiceImpl>> rls_server_;
465   std::unique_ptr<FakeResolverResponseGeneratorWrapper>
466       resolver_response_generator_;
467   std::string target_uri_;
468   std::shared_ptr<grpc::Channel> channel_;
469   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
470 };
471 
TEST_F(RlsEnd2endTest,Basic)472 TEST_F(RlsEnd2endTest, Basic) {
473   StartBackends(1);
474   SetNextResolution(
475       MakeServiceConfigBuilder()
476           .AddKeyBuilder(absl::StrFormat("\"names\":[{"
477                                          "  \"service\":\"%s\","
478                                          "  \"method\":\"%s\""
479                                          "}],"
480                                          "\"headers\":["
481                                          "  {"
482                                          "    \"key\":\"%s\","
483                                          "    \"names\":["
484                                          "      \"key1\""
485                                          "    ]"
486                                          "  }"
487                                          "]",
488                                          kServiceValue, kMethodValue, kTestKey))
489           .Build());
490   rls_server_->service_.SetResponse(
491       BuildRlsRequest({{kTestKey, kTestValue}}),
492       BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
493   CheckRpcSendOk(DEBUG_LOCATION,
494                  RpcOptions().set_metadata({{"key1", kTestValue}}));
495   EXPECT_EQ(rls_server_->service_.request_count(), 1);
496   EXPECT_EQ(rls_server_->service_.response_count(), 1);
497   EXPECT_EQ(backends_[0]->service_.request_count(), 1);
498   // No RLS header seen by the backend, since the RLS response didn't set any.
499   EXPECT_THAT(backends_[0]->service_.rls_data(), ::testing::ElementsAre());
500 }
501 
TEST_F(RlsEnd2endTest,DuplicateHeadersAreMerged)502 TEST_F(RlsEnd2endTest, DuplicateHeadersAreMerged) {
503   const char* kTestValue2 = "test_value_2";
504   StartBackends(1);
505   SetNextResolution(
506       MakeServiceConfigBuilder()
507           .AddKeyBuilder(absl::StrFormat("\"names\":[{"
508                                          "  \"service\":\"%s\","
509                                          "  \"method\":\"%s\""
510                                          "}],"
511                                          "\"headers\":["
512                                          "  {"
513                                          "    \"key\":\"%s\","
514                                          "    \"names\":["
515                                          "      \"key1\""
516                                          "    ]"
517                                          "  }"
518                                          "]",
519                                          kServiceValue, kMethodValue, kTestKey))
520           .Build());
521   rls_server_->service_.SetResponse(
522       BuildRlsRequest({{kTestKey, absl::StrCat(kTestValue, ",", kTestValue2)}}),
523       BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
524   // Same header present twice in the request.  Values should be merged.
525   CheckRpcSendOk(
526       DEBUG_LOCATION,
527       RpcOptions().set_metadata({{"key1", kTestValue}, {"key1", kTestValue2}}));
528   EXPECT_EQ(rls_server_->service_.request_count(), 1);
529   EXPECT_EQ(rls_server_->service_.response_count(), 1);
530   EXPECT_EQ(backends_[0]->service_.request_count(), 1);
531 }
532 
TEST_F(RlsEnd2endTest,SecondHeaderUsed)533 TEST_F(RlsEnd2endTest, SecondHeaderUsed) {
534   StartBackends(1);
535   SetNextResolution(
536       MakeServiceConfigBuilder()
537           .AddKeyBuilder(absl::StrFormat("\"names\":[{"
538                                          "  \"service\":\"%s\","
539                                          "  \"method\":\"%s\""
540                                          "}],"
541                                          "\"headers\":["
542                                          "  {"
543                                          "    \"key\":\"%s\","
544                                          "    \"names\":["
545                                          "      \"key1\", \"key2\""
546                                          "    ]"
547                                          "  }"
548                                          "]",
549                                          kServiceValue, kMethodValue, kTestKey))
550           .Build());
551   rls_server_->service_.SetResponse(
552       BuildRlsRequest({{kTestKey, kTestValue}}),
553       BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
554   CheckRpcSendOk(DEBUG_LOCATION,
555                  RpcOptions().set_metadata({{"key2", kTestValue}}));
556   EXPECT_EQ(rls_server_->service_.request_count(), 1);
557   EXPECT_EQ(rls_server_->service_.response_count(), 1);
558   EXPECT_EQ(backends_[0]->service_.request_count(), 1);
559 }
560 
TEST_F(RlsEnd2endTest,MultipleHeaderKeys)561 TEST_F(RlsEnd2endTest, MultipleHeaderKeys) {
562   const char* kTestKey2 = "test_key_2";
563   const char* kTestValue2 = "test_value_2";
564   StartBackends(1);
565   SetNextResolution(MakeServiceConfigBuilder()
566                         .AddKeyBuilder(absl::StrFormat(
567                             "\"names\":[{"
568                             "  \"service\":\"%s\","
569                             "  \"method\":\"%s\""
570                             "}],"
571                             "\"headers\":["
572                             "  {"
573                             "    \"key\":\"%s\","
574                             "    \"names\":["
575                             "      \"key1\""
576                             "    ]"
577                             "  },"
578                             "  {"
579                             "    \"key\":\"%s\","
580                             "    \"names\":["
581                             "      \"key2\""
582                             "    ]"
583                             "  }"
584                             "]",
585                             kServiceValue, kMethodValue, kTestKey, kTestKey2))
586                         .Build());
587   rls_server_->service_.SetResponse(
588       BuildRlsRequest({
589           {kTestKey, kTestValue},
590           {kTestKey2, kTestValue2},
591       }),
592       BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
593   CheckRpcSendOk(
594       DEBUG_LOCATION,
595       RpcOptions().set_metadata({{"key1", kTestValue}, {"key2", kTestValue2}}));
596   EXPECT_EQ(rls_server_->service_.request_count(), 1);
597   EXPECT_EQ(rls_server_->service_.response_count(), 1);
598   EXPECT_EQ(backends_[0]->service_.request_count(), 1);
599   // No RLS header seen by the backend, since the RLS response didn't set any.
600   EXPECT_THAT(backends_[0]->service_.rls_data(), ::testing::ElementsAre());
601 }
602 
TEST_F(RlsEnd2endTest,NoHeaderMatch)603 TEST_F(RlsEnd2endTest, NoHeaderMatch) {
604   StartBackends(1);
605   SetNextResolution(
606       MakeServiceConfigBuilder()
607           .AddKeyBuilder(absl::StrFormat("\"names\":[{"
608                                          "  \"service\":\"%s\","
609                                          "  \"method\":\"%s\""
610                                          "}],"
611                                          "\"headers\":["
612                                          "  {"
613                                          "    \"key\":\"%s\","
614                                          "    \"names\":["
615                                          "      \"key1\""
616                                          "    ]"
617                                          "  }"
618                                          "]",
619                                          kServiceValue, kMethodValue, kTestKey))
620           .Build());
621   rls_server_->service_.SetResponse(
622       BuildRlsRequest({}),
623       BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
624   // Request does not have header "key1", so kTestKey will not be added.
625   CheckRpcSendOk(DEBUG_LOCATION);
626   EXPECT_EQ(rls_server_->service_.request_count(), 1);
627   EXPECT_EQ(rls_server_->service_.response_count(), 1);
628   EXPECT_EQ(backends_[0]->service_.request_count(), 1);
629 }
630 
TEST_F(RlsEnd2endTest,WildcardMethod)631 TEST_F(RlsEnd2endTest, WildcardMethod) {
632   StartBackends(1);
633   SetNextResolution(MakeServiceConfigBuilder()
634                         .AddKeyBuilder(absl::StrFormat("\"names\":[{"
635                                                        "  \"service\":\"%s\""
636                                                        "}],"
637                                                        "\"headers\":["
638                                                        "  {"
639                                                        "    \"key\":\"%s\","
640                                                        "    \"names\":["
641                                                        "      \"key1\""
642                                                        "    ]"
643                                                        "  }"
644                                                        "]",
645                                                        kServiceValue, kTestKey))
646                         .Build());
647   rls_server_->service_.SetResponse(
648       BuildRlsRequest({{kTestKey, kTestValue}}),
649       BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
650   CheckRpcSendOk(DEBUG_LOCATION,
651                  RpcOptions().set_metadata({{"key1", kTestValue}}));
652   EXPECT_EQ(rls_server_->service_.request_count(), 1);
653   EXPECT_EQ(rls_server_->service_.response_count(), 1);
654   EXPECT_EQ(backends_[0]->service_.request_count(), 1);
655 }
656 
TEST_F(RlsEnd2endTest,NoKeyBuilderForMethod)657 TEST_F(RlsEnd2endTest, NoKeyBuilderForMethod) {
658   StartBackends(1);
659   SetNextResolution(
660       MakeServiceConfigBuilder()
661           .AddKeyBuilder(absl::StrFormat("\"names\":[{"
662                                          "  \"service\":\"%s\","
663                                          "  \"method\":\"some_other_method\""
664                                          "}],"
665                                          "\"headers\":["
666                                          "  {"
667                                          "    \"key\":\"%s\","
668                                          "    \"names\":["
669                                          "      \"key1\""
670                                          "    ]"
671                                          "  }"
672                                          "]",
673                                          kServiceValue, kTestKey))
674           .Build());
675   rls_server_->service_.SetResponse(
676       BuildRlsRequest({}),
677       BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
678   CheckRpcSendOk(DEBUG_LOCATION);
679   EXPECT_EQ(rls_server_->service_.request_count(), 1);
680   EXPECT_EQ(rls_server_->service_.response_count(), 1);
681   EXPECT_EQ(backends_[0]->service_.request_count(), 1);
682 }
683 
// The RLS response carries header data in addition to the target.  After a
// successful RPC, the backend is expected to have recorded exactly that
// header data value (checked via rls_data()).
TEST_F(RlsEnd2endTest, HeaderData) {
  const char* kHeaderData = "header_data";
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)},
                       kHeaderData));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // The backend must have seen the header data exactly once.
  EXPECT_THAT(backends_[0]->service_.rls_data(),
              ::testing::ElementsAre(kHeaderData));
}
715 
// Configures a key builder with "extraKeys" (host, service, method) and
// "constantKeys" in addition to a header key.  The RLS server only has a
// response for a request containing all five key/value pairs, so the RPC
// succeeding shows that the lookup request carried every one of them.
TEST_F(RlsEnd2endTest, ExtraKeysAndConstantKeys) {
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\",\"key2\",\"key3\""
                                         "    ]"
                                         "  }"
                                         "],"
                                         "\"extraKeys\":{"
                                         "  \"host\":\"%s\","
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "},"
                                         "\"constantKeys\":{"
                                         "  \"%s\":\"%s\""
                                         "}",
                                         kServiceValue, kMethodValue, kTestKey,
                                         kHostKey, kServiceKey, kMethodKey,
                                         kConstantKey, kConstantValue))
          .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({
          {kTestKey, kTestValue},
          {kHostKey, kServerName},
          {kServiceKey, kServiceValue},
          {kMethodKey, kMethodValue},
          {kConstantKey, kConstantValue},
      }),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
  // Only "key1" of the three configured header names is sent by the client.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
759 
// Two different key values (kTestValue and kTestValue2) are mapped by the
// RLS server to the same backend.  Each value is expected to trigger its own
// RLS lookup, and both RPCs are expected to land on that single backend.
TEST_F(RlsEnd2endTest, TwoCacheEntriesWithSameTarget) {
  const char* kTestValue2 = "test_value2";
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue2}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
  // First key value: one lookup, one backend request.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // Second key value: a second lookup, same backend.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue2}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 2);
  EXPECT_EQ(rls_server_->service_.response_count(), 2);
  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
}
796 
TEST_F(RlsEnd2endTest,FailedRlsRequestWithoutDefaultTarget)797 TEST_F(RlsEnd2endTest, FailedRlsRequestWithoutDefaultTarget) {
798   StartBackends(1);
799   SetNextResolution(
800       MakeServiceConfigBuilder()
801           .AddKeyBuilder(absl::StrFormat("\"names\":[{"
802                                          "  \"service\":\"%s\","
803                                          "  \"method\":\"%s\""
804                                          "}],"
805                                          "\"headers\":["
806                                          "  {"
807                                          "    \"key\":\"%s\","
808                                          "    \"names\":["
809                                          "      \"key1\""
810                                          "    ]"
811                                          "  }"
812                                          "]",
813                                          kServiceValue, kMethodValue, kTestKey))
814           .Build());
815   // The test below has one RLS RPC fail and then a subsequent one that
816   // should succeed.  However, once the first RPC fails, the adaptive
817   // throttling code will throttle the second RPC with about 11% probability,
818   // which would cause the test to be flaky.  To avoid that, we seed the
819   // throttling state by sending two successful RPCs before we start the
820   // real test, which ensures that the second RPC of the real test will
821   // not be throttled (with 3 successes and 1 failure, the throttling
822   // probability will be negative, so the subsequent request will never be
823   // throttled).
824   const char* kTestValue2 = "test_value_2";
825   const char* kTestValue3 = "test_value_3";
826   rls_server_->service_.SetResponse(
827       BuildRlsRequest({{kTestKey, kTestValue2}}),
828       BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
829   rls_server_->service_.SetResponse(
830       BuildRlsRequest({{kTestKey, kTestValue3}}),
831       BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
832   CheckRpcSendOk(DEBUG_LOCATION,
833                  RpcOptions().set_metadata({{"key1", kTestValue2}}));
834   CheckRpcSendOk(DEBUG_LOCATION,
835                  RpcOptions().set_metadata({{"key1", kTestValue3}}));
836   // Now start the real test.
837   // Send an RPC before we give the RLS server a response.
838   // The RLS request will fail, and thus so will the data plane RPC.
839   CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
840                       "RLS request failed: INTERNAL: no response entry",
841                       RpcOptions().set_metadata({{"key1", kTestValue}}));
842   EXPECT_THAT(
843       rls_server_->service_.GetUnmatchedRequests(),
844       ::testing::ElementsAre(
845           // TODO(roth): Change this to use ::testing::ProtoEquals()
846           // once that becomes available in OSS.
847           ::testing::Property(
848               &RouteLookupRequest::DebugString,
849               BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
850   // Now give the RLS server the right response.
851   rls_server_->service_.SetResponse(
852       BuildRlsRequest({{kTestKey, kTestValue}}),
853       BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
854   // Sleep long enough for backoff to elapse, then try another RPC.
855   gpr_sleep_until(grpc_timeout_seconds_to_deadline(3));
856   CheckRpcSendOk(DEBUG_LOCATION,
857                  RpcOptions().set_metadata({{"key1", kTestValue}}));
858   EXPECT_EQ(rls_server_->service_.request_count(), 4);
859   EXPECT_EQ(rls_server_->service_.response_count(), 3);
860   EXPECT_EQ(backends_[0]->service_.request_count(), 3);
861 }
862 
// With a default target configured, a failed RLS lookup must not fail the
// data plane RPC: the RPC is expected to reach the default target while the
// RLS server records one unmatched (unanswered) request.
TEST_F(RlsEnd2endTest, FailedRlsRequestWithDefaultTarget) {
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .set_default_target(grpc_core::LocalIpUri(backends_[0]->port_))
          .Build());
  // Don't give the RLS server a response, so the RLS request will fail.
  // The data plane RPC should be sent to the default target.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_THAT(
      rls_server_->service_.GetUnmatchedRequests(),
      ::testing::ElementsAre(
          // TODO(roth): Change this to use ::testing::ProtoEquals()
          // once that becomes available in OSS.
          ::testing::Property(
              &RouteLookupRequest::DebugString,
              BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 0);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
898 
// The RLS server delays its response (3s) beyond the configured lookup
// service timeout (2s).  The data plane RPC, sent with a 4s deadline, is
// expected to be routed to the default target (backend 1) rather than to the
// target named in the late RLS response (backend 0).
TEST_F(RlsEnd2endTest, RlsRequestTimeout) {
  StartBackends(2);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .set_default_target(grpc_core::LocalIpUri(backends_[1]->port_))
          .set_lookup_service_timeout(grpc_core::Duration::Seconds(2))
          .Build());
  // RLS server will send a response, but it's longer than the timeout.
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}),
      /*response_delay=*/grpc_core::Duration::Seconds(3));
  // The data plane RPC should be sent to the default target.
  CheckRpcSendOk(DEBUG_LOCATION, RpcOptions().set_timeout_ms(4000).set_metadata(
                                     {{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 0);
  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
}
931 
// Pushes a second service config that only changes the default target.
// RLS lookups fail throughout (the server has no response), so the first RPC
// must reach the original default target (backend 0) and the RPC sent after
// the update must reach the new one (backend 1), without a new RLS request.
TEST_F(RlsEnd2endTest, UpdateConfig) {
  StartBackends(2);
  auto service_config_builder =
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .set_default_target(grpc_core::LocalIpUri(backends_[0]->port_));
  SetNextResolution(service_config_builder.Build());
  // Don't give the RLS server a response, so the RLS request will fail.
  // The data plane RPC should be sent to the default target.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_THAT(
      rls_server_->service_.GetUnmatchedRequests(),
      ::testing::ElementsAre(
          // TODO(roth): Change this to use ::testing::ProtoEquals()
          // once that becomes available in OSS.
          ::testing::Property(
              &RouteLookupRequest::DebugString,
              BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 0);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  EXPECT_EQ(backends_[1]->service_.request_count(), 0);
  // Now update the config to point to a new default target.
  service_config_builder.set_default_target(
      grpc_core::LocalIpUri(backends_[1]->port_));
  SetNextResolution(service_config_builder.Build());
  // Send another RPC, which should go to the new default target.
  // The RLS server will *not* see another request, because the cache
  // entry is still in backoff.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 0);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  EXPECT_EQ(backends_[1]->service_.request_count(), 1);
}
981 
// Sends two RPCs with the same key.  Only the first is expected to produce
// an RLS request; both are expected to reach the backend.
TEST_F(RlsEnd2endTest, CachedResponse) {
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
  // Send two RPCs.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  // The RLS server should have seen only one request.
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
}
1013 
// Configures max_age=5s and stale_age=1s.  After the cache entry becomes
// stale, a second RPC is still expected to succeed, and the RLS server is
// expected to receive a second request marked REASON_STALE.
TEST_F(RlsEnd2endTest, StaleCacheEntry) {
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .set_max_age(grpc_core::Duration::Seconds(5))
          .set_stale_age(grpc_core::Duration::Seconds(1))
          .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
  // Send one RPC.  RLS server gets a request, and RPC goes to backend.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // Update RLS server to expect stale request.
  rls_server_->service_.RemoveResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}));
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}},
                      RouteLookupRequest::REASON_STALE),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
  // Wait longer than stale age.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
  // Send another RPC.  This should use the stale value but should
  // dispatch a second RLS request.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
  // Wait for RLS server to receive the second request.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
  EXPECT_EQ(rls_server_->service_.request_count(), 2);
  EXPECT_EQ(rls_server_->service_.response_count(), 2);
}
1062 
// Same scenario as a stale cache entry (max_age=5s, stale_age=1s), but the
// RLS response carries header data.  The refresh request the RLS server is
// primed for is marked REASON_STALE and includes that header data.
TEST_F(RlsEnd2endTest, StaleCacheEntryWithHeaderData) {
  const char* kHeaderData = "header_data";
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .set_max_age(grpc_core::Duration::Seconds(5))
          .set_stale_age(grpc_core::Duration::Seconds(1))
          .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)},
                       kHeaderData));
  // Send one RPC.  RLS server gets a request, and RPC goes to backend.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // Update RLS server to expect stale request.
  rls_server_->service_.RemoveResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}));
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}},
                      RouteLookupRequest::REASON_STALE, kHeaderData),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)},
                       kHeaderData));
  // Wait longer than stale age.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
  // Send another RPC.  This should use the stale value but should
  // dispatch a second RLS request.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(backends_[0]->service_.request_count(), 2);
  // Wait for RLS server to receive the second request.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
  EXPECT_EQ(rls_server_->service_.request_count(), 2);
  EXPECT_EQ(rls_server_->service_.response_count(), 2);
}
1114 
// Configures max_age=1s.  After the cache entry has expired and the RLS
// server's response has been removed, the next RPC is expected to trigger a
// fresh RLS request that fails, which in turn fails the RPC with UNAVAILABLE.
TEST_F(RlsEnd2endTest, ExpiredCacheEntry) {
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .set_max_age(grpc_core::Duration::Seconds(1))
          .set_lookup_service_timeout(grpc_core::Duration::Seconds(1))
          .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
  // Send one RPC.  RLS server gets a request, and RPC goes to backend.
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // Remove response from RLS server so that the next RLS request fails.
  rls_server_->service_.RemoveResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}));
  // Wait for cache to be expired.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(2));
  // Send another RPC.  This should trigger a second RLS request, but
  // that fails, so the RPC fails.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "RLS request failed: INTERNAL: no response entry",
                      RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 2);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
1158 
TEST_F(RlsEnd2endTest,CacheSizeLimit)1159 TEST_F(RlsEnd2endTest, CacheSizeLimit) {
1160   const char* kTestValue2 = "test_value_2";
1161   StartBackends(2);
1162   SetNextResolution(
1163       MakeServiceConfigBuilder()
1164           .AddKeyBuilder(absl::StrFormat("\"names\":[{"
1165                                          "  \"service\":\"%s\","
1166                                          "  \"method\":\"%s\""
1167                                          "}],"
1168                                          "\"headers\":["
1169                                          "  {"
1170                                          "    \"key\":\"%s\","
1171                                          "    \"names\":["
1172                                          "      \"key1\""
1173                                          "    ]"
1174                                          "  }"
1175                                          "]",
1176                                          kServiceValue, kMethodValue,
1177                                          kTestKey))
1178           .set_cache_size_bytes(1)  // Not even big enough for one entry.
1179           .Build());
1180   // Set RLS responses for both kTestValue and kTestValue2.
1181   rls_server_->service_.SetResponse(
1182       BuildRlsRequest({{kTestKey, kTestValue}}),
1183       BuildRlsResponse({grpc_core::LocalIpUri(backends_[0]->port_)}));
1184   rls_server_->service_.SetResponse(
1185       BuildRlsRequest({{kTestKey, kTestValue2}}),
1186       BuildRlsResponse({grpc_core::LocalIpUri(backends_[1]->port_)}));
1187   // Send an RPC for kTestValue.
1188   // RLS server gets a request, and RPC goes to backend.
1189   CheckRpcSendOk(DEBUG_LOCATION,
1190                  RpcOptions().set_metadata({{"key1", kTestValue}}));
1191   EXPECT_EQ(rls_server_->service_.request_count(), 1);
1192   EXPECT_EQ(rls_server_->service_.response_count(), 1);
1193   EXPECT_EQ(backends_[0]->service_.request_count(), 1);
1194   EXPECT_EQ(backends_[1]->service_.request_count(), 0);
1195   // A second RPC for kTestValue should not generate another RLS
1196   // request, because the cache entry is held by min_eviction_time.
1197   CheckRpcSendOk(DEBUG_LOCATION,
1198                  RpcOptions().set_metadata({{"key1", kTestValue}}));
1199   EXPECT_EQ(rls_server_->service_.request_count(), 1);
1200   EXPECT_EQ(rls_server_->service_.response_count(), 1);
1201   EXPECT_EQ(backends_[0]->service_.request_count(), 2);
1202   EXPECT_EQ(backends_[1]->service_.request_count(), 0);
1203   // Wait for min_eviction_time to elapse.
1204   gpr_sleep_until(grpc_timeout_seconds_to_deadline(6));
1205   // Send a request for kTestValue2.
1206   // RLS server gets a request, and RPC goes to backend.
1207   // This causes the entry for kTestValue to be evicted.
1208   CheckRpcSendOk(DEBUG_LOCATION,
1209                  RpcOptions().set_metadata({{"key1", kTestValue2}}));
1210   EXPECT_EQ(rls_server_->service_.request_count(), 2);
1211   EXPECT_EQ(rls_server_->service_.response_count(), 2);
1212   EXPECT_EQ(backends_[0]->service_.request_count(), 2);
1213   EXPECT_EQ(backends_[1]->service_.request_count(), 1);
1214   // Send another RPC for kTestValue.
1215   // This should now trigger a new RLS request.
1216   CheckRpcSendOk(DEBUG_LOCATION,
1217                  RpcOptions().set_metadata({{"key1", kTestValue}}));
1218   EXPECT_EQ(rls_server_->service_.request_count(), 3);
1219   EXPECT_EQ(rls_server_->service_.response_count(), 3);
1220   EXPECT_EQ(backends_[0]->service_.request_count(), 3);
1221   EXPECT_EQ(backends_[1]->service_.request_count(), 1);
1222   // Another RPC for kTestValue2 should still work due to min_eviction_time.
1223   CheckRpcSendOk(DEBUG_LOCATION,
1224                  RpcOptions().set_metadata({{"key1", kTestValue2}}));
1225   EXPECT_EQ(rls_server_->service_.request_count(), 3);
1226   EXPECT_EQ(rls_server_->service_.response_count(), 3);
1227   EXPECT_EQ(backends_[0]->service_.request_count(), 3);
1228   EXPECT_EQ(backends_[1]->service_.request_count(), 2);
1229 }
1230 
// The RLS response lists two targets: a working backend followed by
// "invalid_target".  The RPC is expected to succeed on the first target.
TEST_F(RlsEnd2endTest, MultipleTargets) {
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse(
          // Second target will report TRANSIENT_FAILURE, but should
          // never be used.
          {grpc_core::LocalIpUri(backends_[0]->port_), "invalid_target"}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
1261 
// The RLS response lists "invalid_target" first and a working backend
// second.  The RPC is expected to succeed by reaching the second target.
TEST_F(RlsEnd2endTest, MultipleTargetsFirstInTransientFailure) {
  StartBackends(1);
  SetNextResolution(
      MakeServiceConfigBuilder()
          .AddKeyBuilder(absl::StrFormat("\"names\":[{"
                                         "  \"service\":\"%s\","
                                         "  \"method\":\"%s\""
                                         "}],"
                                         "\"headers\":["
                                         "  {"
                                         "    \"key\":\"%s\","
                                         "    \"names\":["
                                         "      \"key1\""
                                         "    ]"
                                         "  }"
                                         "]",
                                         kServiceValue, kMethodValue, kTestKey))
          .Build());
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse(
          // First target will report TRANSIENT_FAILURE.
          {"invalid_target", grpc_core::LocalIpUri(backends_[0]->port_)}));
  CheckRpcSendOk(DEBUG_LOCATION,
                 RpcOptions().set_metadata({{"key1", kTestValue}}));
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
1291 
// The channel starts out IDLE and is expected to report READY once an RPC has
// been routed to a working backend, even though the RLS response also lists a
// target that is in TRANSIENT_FAILURE.
TEST_F(RlsEnd2endTest, ConnectivityStateReady) {
  StartBackends(1);
  // Key builder fragment of the RLS config: matches the test service/method
  // and extracts header "key1" under the key kTestKey.
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      "  \"service\":\"%s\","
                      "  \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      "  {"
                      "    \"key\":\"%s\","
                      "    \"names\":["
                      "      \"key1\""
                      "    ]"
                      "  }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  // No RPC has been sent yet, so the channel must still be IDLE.
  const auto state_before_rpc = channel_->GetState(/*try_to_connect=*/false);
  EXPECT_EQ(GRPC_CHANNEL_IDLE, state_before_rpc);
  // One target in TRANSIENT_FAILURE, the other in READY.
  const std::string backend_target =
      grpc_core::LocalIpUri(backends_[0]->port_);
  rls_server_->service_.SetResponse(
      BuildRlsRequest({{kTestKey, kTestValue}}),
      BuildRlsResponse({"invalid_target", backend_target}));
  RpcOptions rpc_options;
  rpc_options.set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  const auto state_after_rpc = channel_->GetState(/*try_to_connect=*/false);
  EXPECT_EQ(GRPC_CHANNEL_READY, state_after_rpc);
}
1323 
// When the RLS request fails and no child policy is ever created, the channel
// is expected to stay IDLE both before and after the failed RPC.
TEST_F(RlsEnd2endTest, ConnectivityStateIdle) {
  // Key builder fragment of the RLS config: matches the test service/method
  // and extracts header "key1" under the key kTestKey.
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      "  \"service\":\"%s\","
                      "  \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      "  {"
                      "    \"key\":\"%s\","
                      "    \"names\":["
                      "      \"key1\""
                      "    ]"
                      "  }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const auto state_before_rpc = channel_->GetState(/*try_to_connect=*/false);
  EXPECT_EQ(GRPC_CHANNEL_IDLE, state_before_rpc);
  // RLS server not given any responses, so the request will fail.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "RLS request failed: INTERNAL: no response entry");
  // No child policies, so should be IDLE.
  const auto state_after_rpc = channel_->GetState(/*try_to_connect=*/false);
  EXPECT_EQ(GRPC_CHANNEL_IDLE, state_after_rpc);
}
1348 
// When the only target returned by the RLS server is unusable, the RPC is
// expected to fail with UNAVAILABLE and the channel to move from IDLE to
// TRANSIENT_FAILURE.
TEST_F(RlsEnd2endTest, ConnectivityStateTransientFailure) {
  // Key builder fragment of the RLS config: matches the test service/method
  // and extracts header "key1" under the key kTestKey.
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      "  \"service\":\"%s\","
                      "  \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      "  {"
                      "    \"key\":\"%s\","
                      "    \"names\":["
                      "      \"key1\""
                      "    ]"
                      "  }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(
      MakeServiceConfigBuilder().AddKeyBuilder(key_builder).Build());
  const auto state_before_rpc = channel_->GetState(/*try_to_connect=*/false);
  EXPECT_EQ(GRPC_CHANNEL_IDLE, state_before_rpc);
  // The RLS server answers with a single target that cannot be used.
  rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
                                    BuildRlsResponse({"invalid_target"}));
  RpcOptions rpc_options;
  rpc_options.set_metadata({{"key1", kTestValue}});
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      "empty address list: no address in fixed_address_lb policy",
      rpc_options);
  // The RLS lookup itself succeeded; only the data plane RPC failed.
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 1);
  const auto state_after_rpc = channel_->GetState(/*try_to_connect=*/false);
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, state_after_rpc);
}
1378 
1379 class RlsMetricsEnd2endTest : public RlsEnd2endTest {
1380  protected:
SetUp()1381   void SetUp() override {
1382     // Register stats plugin before initializing client.
1383     stats_plugin_ = grpc_core::FakeStatsPluginBuilder()
1384                         .UseDisabledByDefaultMetrics(true)
1385                         .BuildAndRegister();
1386     RlsEnd2endTest::SetUp();
1387   }
1388 
1389   std::shared_ptr<grpc_core::FakeStatsPlugin> stats_plugin_;
1390 };
1391 
// Checks the registered definition of "grpc.lb.rls.default_target_picks":
// a uint64 counter, disabled by default, measured in "{pick}", with four
// required label keys and no optional ones.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionDefaultTargetPicks) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  using RegistryPeer = grpc_core::GlobalInstrumentsRegistryTestPeer;
  constexpr char kMetricName[] = "grpc.lb.rls.default_target_picks";
  const auto* metric = RegistryPeer::FindMetricDescriptorByName(kMetricName);
  // Nothing else can be checked if the metric is not registered at all.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, Registry::ValueType::kUInt64);
  EXPECT_EQ(metric->instrument_type, Registry::InstrumentType::kCounter);
  EXPECT_FALSE(metric->enable_by_default);
  EXPECT_EQ(metric->name, kMetricName);
  EXPECT_EQ(metric->unit, "{pick}");
  EXPECT_THAT(metric->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.data_plane_target",
                                     "grpc.lb.pick_result"));
  EXPECT_THAT(metric->optional_label_keys, ::testing::IsEmpty());
}
1410 
// Checks the registered definition of "grpc.lb.rls.target_picks": a uint64
// counter, disabled by default, measured in "{pick}", with four required
// label keys and no optional ones.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionTargetPicks) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  using RegistryPeer = grpc_core::GlobalInstrumentsRegistryTestPeer;
  constexpr char kMetricName[] = "grpc.lb.rls.target_picks";
  const auto* metric = RegistryPeer::FindMetricDescriptorByName(kMetricName);
  // Nothing else can be checked if the metric is not registered at all.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, Registry::ValueType::kUInt64);
  EXPECT_EQ(metric->instrument_type, Registry::InstrumentType::kCounter);
  EXPECT_FALSE(metric->enable_by_default);
  EXPECT_EQ(metric->name, kMetricName);
  EXPECT_EQ(metric->unit, "{pick}");
  EXPECT_THAT(metric->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.data_plane_target",
                                     "grpc.lb.pick_result"));
  EXPECT_THAT(metric->optional_label_keys, ::testing::IsEmpty());
}
1429 
// Checks the registered definition of "grpc.lb.rls.failed_picks": a uint64
// counter, disabled by default, measured in "{pick}", with two required
// label keys and no optional ones.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionFailedPicks) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  using RegistryPeer = grpc_core::GlobalInstrumentsRegistryTestPeer;
  constexpr char kMetricName[] = "grpc.lb.rls.failed_picks";
  const auto* metric = RegistryPeer::FindMetricDescriptorByName(kMetricName);
  // Nothing else can be checked if the metric is not registered at all.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, Registry::ValueType::kUInt64);
  EXPECT_EQ(metric->instrument_type, Registry::InstrumentType::kCounter);
  EXPECT_FALSE(metric->enable_by_default);
  EXPECT_EQ(metric->name, kMetricName);
  EXPECT_EQ(metric->unit, "{pick}");
  EXPECT_THAT(
      metric->label_keys,
      ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target"));
  EXPECT_THAT(metric->optional_label_keys, ::testing::IsEmpty());
}
1447 
// Checks the registered definition of "grpc.lb.rls.cache_entries": an int64
// callback gauge, disabled by default, measured in "{entry}", with three
// required label keys and no optional ones.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionCacheEntries) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  using RegistryPeer = grpc_core::GlobalInstrumentsRegistryTestPeer;
  constexpr char kMetricName[] = "grpc.lb.rls.cache_entries";
  const auto* metric = RegistryPeer::FindMetricDescriptorByName(kMetricName);
  // Nothing else can be checked if the metric is not registered at all.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, Registry::ValueType::kInt64);
  EXPECT_EQ(metric->instrument_type,
            Registry::InstrumentType::kCallbackGauge);
  EXPECT_FALSE(metric->enable_by_default);
  EXPECT_EQ(metric->name, kMetricName);
  EXPECT_EQ(metric->unit, "{entry}");
  EXPECT_THAT(metric->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.instance_uuid"));
  EXPECT_THAT(metric->optional_label_keys, ::testing::IsEmpty());
}
1466 
// Checks the registered definition of "grpc.lb.rls.cache_size": an int64
// callback gauge, disabled by default, measured in "By", with three required
// label keys and no optional ones.
TEST_F(RlsMetricsEnd2endTest, MetricDefinitionCacheSize) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  using RegistryPeer = grpc_core::GlobalInstrumentsRegistryTestPeer;
  constexpr char kMetricName[] = "grpc.lb.rls.cache_size";
  const auto* metric = RegistryPeer::FindMetricDescriptorByName(kMetricName);
  // Nothing else can be checked if the metric is not registered at all.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, Registry::ValueType::kInt64);
  EXPECT_EQ(metric->instrument_type,
            Registry::InstrumentType::kCallbackGauge);
  EXPECT_FALSE(metric->enable_by_default);
  EXPECT_EQ(metric->name, kMetricName);
  EXPECT_EQ(metric->unit, "By");
  EXPECT_THAT(metric->label_keys,
              ::testing::ElementsAre("grpc.target", "grpc.lb.rls.server_target",
                                     "grpc.lb.rls.instance_uuid"));
  EXPECT_THAT(metric->optional_label_keys, ::testing::IsEmpty());
}
1485 
TEST_F(RlsMetricsEnd2endTest,MetricValues)1486 TEST_F(RlsMetricsEnd2endTest, MetricValues) {
1487   auto kMetricTargetPicks =
1488       grpc_core::GlobalInstrumentsRegistryTestPeer::
1489           FindUInt64CounterHandleByName("grpc.lb.rls.target_picks")
1490               .value();
1491   auto kMetricFailedPicks =
1492       grpc_core::GlobalInstrumentsRegistryTestPeer::
1493           FindUInt64CounterHandleByName("grpc.lb.rls.failed_picks")
1494               .value();
1495   auto kMetricCacheEntries =
1496       grpc_core::GlobalInstrumentsRegistryTestPeer::
1497           FindCallbackInt64GaugeHandleByName("grpc.lb.rls.cache_entries")
1498               .value();
1499   auto kMetricCacheSize =
1500       grpc_core::GlobalInstrumentsRegistryTestPeer::
1501           FindCallbackInt64GaugeHandleByName("grpc.lb.rls.cache_size")
1502               .value();
1503   StartBackends(2);
1504   SetNextResolution(
1505       MakeServiceConfigBuilder()
1506           .AddKeyBuilder(absl::StrFormat("\"names\":[{"
1507                                          "  \"service\":\"%s\","
1508                                          "  \"method\":\"%s\""
1509                                          "}],"
1510                                          "\"headers\":["
1511                                          "  {"
1512                                          "    \"key\":\"%s\","
1513                                          "    \"names\":["
1514                                          "      \"key1\""
1515                                          "    ]"
1516                                          "  }"
1517                                          "]",
1518                                          kServiceValue, kMethodValue, kTestKey))
1519           .Build());
1520   const std::string rls_target0 = grpc_core::LocalIpUri(backends_[0]->port_);
1521   const std::string rls_target1 = grpc_core::LocalIpUri(backends_[1]->port_);
1522   // Send an RPC to the target for backend 0.
1523   rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, rls_target0}}),
1524                                     BuildRlsResponse({rls_target0}));
1525   CheckRpcSendOk(DEBUG_LOCATION,
1526                  RpcOptions().set_metadata({{"key1", rls_target0}}));
1527   EXPECT_EQ(rls_server_->service_.request_count(), 1);
1528   EXPECT_EQ(rls_server_->service_.response_count(), 1);
1529   EXPECT_EQ(backends_[0]->service_.request_count(), 1);
1530   EXPECT_EQ(backends_[1]->service_.request_count(), 0);
1531   // Check exported metrics.
1532   EXPECT_THAT(
1533       stats_plugin_->GetCounterValue(
1534           kMetricTargetPicks,
1535           {target_uri_, rls_server_target_, rls_target0, "complete"}, {}),
1536       ::testing::Optional(1));
1537   EXPECT_THAT(
1538       stats_plugin_->GetCounterValue(
1539           kMetricTargetPicks,
1540           {target_uri_, rls_server_target_, rls_target1, "complete"}, {}),
1541       absl::nullopt);
1542   EXPECT_EQ(stats_plugin_->GetCounterValue(
1543                 kMetricFailedPicks, {target_uri_, rls_server_target_}, {}),
1544             absl::nullopt);
1545   stats_plugin_->TriggerCallbacks();
1546   EXPECT_THAT(stats_plugin_->GetCallbackGaugeValue(
1547                   kMetricCacheEntries,
1548                   {target_uri_, rls_server_target_, kRlsInstanceUuid}, {}),
1549               ::testing::Optional(1));
1550   auto cache_size = stats_plugin_->GetCallbackGaugeValue(
1551       kMetricCacheSize, {target_uri_, rls_server_target_, kRlsInstanceUuid},
1552       {});
1553   EXPECT_THAT(cache_size, ::testing::Optional(::testing::Ge(1)));
1554   // Send an RPC to the target for backend 1.
1555   rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, rls_target1}}),
1556                                     BuildRlsResponse({rls_target1}));
1557   CheckRpcSendOk(DEBUG_LOCATION,
1558                  RpcOptions().set_metadata({{"key1", rls_target1}}));
1559   EXPECT_EQ(rls_server_->service_.request_count(), 2);
1560   EXPECT_EQ(rls_server_->service_.response_count(), 2);
1561   EXPECT_EQ(backends_[0]->service_.request_count(), 1);
1562   EXPECT_EQ(backends_[1]->service_.request_count(), 1);
1563   // Check exported metrics.
1564   EXPECT_THAT(
1565       stats_plugin_->GetCounterValue(
1566           kMetricTargetPicks,
1567           {target_uri_, rls_server_target_, rls_target0, "complete"}, {}),
1568       ::testing::Optional(1));
1569   EXPECT_THAT(
1570       stats_plugin_->GetCounterValue(
1571           kMetricTargetPicks,
1572           {target_uri_, rls_server_target_, rls_target1, "complete"}, {}),
1573       ::testing::Optional(1));
1574   EXPECT_EQ(stats_plugin_->GetCounterValue(
1575                 kMetricFailedPicks, {target_uri_, rls_server_target_}, {}),
1576             absl::nullopt);
1577   stats_plugin_->TriggerCallbacks();
1578   EXPECT_THAT(stats_plugin_->GetCallbackGaugeValue(
1579                   kMetricCacheEntries,
1580                   {target_uri_, rls_server_target_, kRlsInstanceUuid}, {}),
1581               ::testing::Optional(2));
1582   auto cache_size2 = stats_plugin_->GetCallbackGaugeValue(
1583       kMetricCacheSize, {target_uri_, rls_server_target_, kRlsInstanceUuid},
1584       {});
1585   EXPECT_THAT(cache_size2, ::testing::Optional(::testing::Ge(2)));
1586   if (cache_size.has_value() && cache_size2.has_value()) {
1587     EXPECT_GT(*cache_size2, *cache_size);
1588   }
1589   // Send an RPC for which the RLS server has no response, which means
1590   // that the RLS request will fail.  There is no default target, so the
1591   // data plane RPC will fail.
1592   CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
1593                       "RLS request failed: INTERNAL: no response entry",
1594                       RpcOptions().set_metadata({{"key1", kTestValue}}));
1595   EXPECT_THAT(
1596       rls_server_->service_.GetUnmatchedRequests(),
1597       ::testing::ElementsAre(
1598           // TODO(roth): Change this to use ::testing::ProtoEquals()
1599           // once that becomes available in OSS.
1600           ::testing::Property(
1601               &RouteLookupRequest::DebugString,
1602               BuildRlsRequest({{kTestKey, kTestValue}}).DebugString())));
1603   EXPECT_EQ(rls_server_->service_.request_count(), 3);
1604   EXPECT_EQ(rls_server_->service_.response_count(), 2);
1605   EXPECT_EQ(backends_[0]->service_.request_count(), 1);
1606   EXPECT_EQ(backends_[1]->service_.request_count(), 1);
1607   // Check exported metrics.
1608   EXPECT_THAT(
1609       stats_plugin_->GetCounterValue(
1610           kMetricTargetPicks,
1611           {target_uri_, rls_server_target_, rls_target0, "complete"}, {}),
1612       ::testing::Optional(1));
1613   EXPECT_THAT(
1614       stats_plugin_->GetCounterValue(
1615           kMetricTargetPicks,
1616           {target_uri_, rls_server_target_, rls_target1, "complete"}, {}),
1617       ::testing::Optional(1));
1618   EXPECT_THAT(stats_plugin_->GetCounterValue(
1619                   kMetricFailedPicks, {target_uri_, rls_server_target_}, {}),
1620               ::testing::Optional(1));
1621   stats_plugin_->TriggerCallbacks();
1622   EXPECT_THAT(stats_plugin_->GetCallbackGaugeValue(
1623                   kMetricCacheEntries,
1624                   {target_uri_, rls_server_target_, kRlsInstanceUuid}, {}),
1625               ::testing::Optional(3));
1626   auto cache_size3 = stats_plugin_->GetCallbackGaugeValue(
1627       kMetricCacheSize, {target_uri_, rls_server_target_, kRlsInstanceUuid},
1628       {});
1629   EXPECT_THAT(cache_size3, ::testing::Optional(::testing::Ge(3)));
1630   if (cache_size.has_value() && cache_size3.has_value()) {
1631     EXPECT_GT(*cache_size3, *cache_size);
1632   }
1633 }
1634 
// With a default target configured and no RLS response available, the data
// plane RPC is expected to go to the default target and to be counted once
// in "grpc.lb.rls.default_target_picks".
TEST_F(RlsMetricsEnd2endTest, MetricValuesDefaultTargetRpcs) {
  using RegistryPeer = grpc_core::GlobalInstrumentsRegistryTestPeer;
  auto kMetricDefaultTargetPicks =
      RegistryPeer::FindUInt64CounterHandleByName(
          "grpc.lb.rls.default_target_picks")
          .value();
  StartBackends(1);
  const std::string default_target = grpc_core::LocalIpUri(backends_[0]->port_);
  // Key builder fragment of the RLS config: matches the test service/method
  // and extracts header "key1" under the key kTestKey.
  const std::string key_builder =
      absl::StrFormat("\"names\":[{"
                      "  \"service\":\"%s\","
                      "  \"method\":\"%s\""
                      "}],"
                      "\"headers\":["
                      "  {"
                      "    \"key\":\"%s\","
                      "    \"names\":["
                      "      \"key1\""
                      "    ]"
                      "  }"
                      "]",
                      kServiceValue, kMethodValue, kTestKey);
  SetNextResolution(MakeServiceConfigBuilder()
                        .AddKeyBuilder(key_builder)
                        .set_default_target(default_target)
                        .Build());
  // Don't give the RLS server a response, so the RLS request will fail.
  // The data plane RPC should be sent to the default target.
  RpcOptions rpc_options;
  rpc_options.set_metadata({{"key1", kTestValue}});
  CheckRpcSendOk(DEBUG_LOCATION, rpc_options);
  // TODO(roth): Change this to use ::testing::ProtoEquals()
  // once that becomes available in OSS.
  const std::string expected_request =
      BuildRlsRequest({{kTestKey, kTestValue}}).DebugString();
  EXPECT_THAT(rls_server_->service_.GetUnmatchedRequests(),
              ::testing::ElementsAre(::testing::Property(
                  &RouteLookupRequest::DebugString, expected_request)));
  // The lookup reached the RLS server but got no response; the backend
  // acting as default target served the RPC.
  EXPECT_EQ(rls_server_->service_.request_count(), 1);
  EXPECT_EQ(rls_server_->service_.response_count(), 0);
  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
  // Check expected metrics.
  const auto default_target_picks = stats_plugin_->GetCounterValue(
      kMetricDefaultTargetPicks,
      {target_uri_, rls_server_target_, default_target, "complete"}, {});
  EXPECT_THAT(default_target_picks, ::testing::Optional(1));
}
1681 
1682 }  // namespace
1683 }  // namespace testing
1684 }  // namespace grpc
1685 
main(int argc,char ** argv)1686 int main(int argc, char** argv) {
1687   ::testing::InitGoogleTest(&argc, argv);
1688   grpc::testing::TestEnvironment env(&argc, argv);
1689   return RUN_ALL_TESTS();
1690 }
1691