xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/xds/xds_end2end_test_lib.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 
16 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
17 
18 #include <functional>
19 #include <map>
20 #include <memory>
21 #include <set>
22 #include <string>
23 #include <thread>
24 #include <vector>
25 
26 #include <gmock/gmock.h>
27 #include <gtest/gtest.h>
28 
29 #include "absl/memory/memory.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/str_format.h"
32 #include "absl/strings/str_join.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/types/optional.h"
35 
36 #include <grpcpp/security/tls_certificate_provider.h>
37 
38 #include "src/core/ext/filters/http/server/http_server_filter.h"
39 #include "src/core/ext/xds/xds_channel_args.h"
40 #include "src/core/ext/xds/xds_client_grpc.h"
41 #include "src/core/lib/gpr/tmpfile.h"
42 #include "src/core/lib/gprpp/env.h"
43 #include "src/core/lib/surface/server.h"
44 #include "src/proto/grpc/testing/xds/v3/router.grpc.pb.h"
45 #include "test/core/util/resolve_localhost_ip46.h"
46 #include "test/core/util/tls_utils.h"
47 #include "test/cpp/util/credentials.h"
48 #include "test/cpp/util/tls_test_utils.h"
49 
50 namespace grpc {
51 namespace testing {
52 
53 using ::envoy::config::core::v3::HealthStatus;
54 using ::envoy::service::discovery::v3::DiscoveryRequest;
55 using ::envoy::service::load_stats::v3::LoadStatsRequest;
56 
57 using ::grpc::experimental::ExternalCertificateVerifier;
58 using ::grpc::experimental::IdentityKeyCertPair;
59 using ::grpc::experimental::ServerMetricRecorder;
60 using ::grpc::experimental::StaticDataCertificateProvider;
61 
62 //
63 // XdsEnd2endTest::ServerThread::XdsServingStatusNotifier
64 //
65 
66 void XdsEnd2endTest::ServerThread::XdsServingStatusNotifier::
OnServingStatusUpdate(std::string uri,ServingStatusUpdate update)67     OnServingStatusUpdate(std::string uri, ServingStatusUpdate update) {
68   grpc_core::MutexLock lock(&mu_);
69   status_map[uri] = update.status;
70   cond_.Signal();
71 }
72 
73 void XdsEnd2endTest::ServerThread::XdsServingStatusNotifier::
WaitOnServingStatusChange(std::string uri,grpc::StatusCode expected_status)74     WaitOnServingStatusChange(std::string uri,
75                               grpc::StatusCode expected_status) {
76   grpc_core::MutexLock lock(&mu_);
77   std::map<std::string, grpc::Status>::iterator it;
78   while ((it = status_map.find(uri)) == status_map.end() ||
79          it->second.error_code() != expected_status) {
80     cond_.Wait(&mu_);
81   }
82 }
83 
84 //
85 // XdsEnd2endTest::ServerThread::XdsChannelArgsServerBuilderOption
86 //
87 
88 namespace {
89 
90 // Channel arg pointer vtable for storing xDS channel args in the parent
91 // channel's channel args.
ChannelArgsArgCopy(void * p)92 void* ChannelArgsArgCopy(void* p) {
93   auto* args = static_cast<grpc_channel_args*>(p);
94   return grpc_channel_args_copy(args);
95 }
ChannelArgsArgDestroy(void * p)96 void ChannelArgsArgDestroy(void* p) {
97   auto* args = static_cast<grpc_channel_args*>(p);
98   grpc_channel_args_destroy(args);
99 }
ChannelArgsArgCmp(void * a,void * b)100 int ChannelArgsArgCmp(void* a, void* b) {
101   auto* args_a = static_cast<grpc_channel_args*>(a);
102   auto* args_b = static_cast<grpc_channel_args*>(b);
103   return grpc_channel_args_compare(args_a, args_b);
104 }
105 const grpc_arg_pointer_vtable kChannelArgsArgVtable = {
106     ChannelArgsArgCopy, ChannelArgsArgDestroy, ChannelArgsArgCmp};
107 
108 }  // namespace
109 
110 class XdsEnd2endTest::ServerThread::XdsChannelArgsServerBuilderOption
111     : public grpc::ServerBuilderOption {
112  public:
XdsChannelArgsServerBuilderOption(XdsEnd2endTest * test_obj)113   explicit XdsChannelArgsServerBuilderOption(XdsEnd2endTest* test_obj)
114       : test_obj_(test_obj) {}
115 
UpdateArguments(grpc::ChannelArguments * args)116   void UpdateArguments(grpc::ChannelArguments* args) override {
117     args->SetString(GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG,
118                     test_obj_->bootstrap_);
119     args->SetPointerWithVtable(
120         GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS,
121         &test_obj_->xds_channel_args_, &kChannelArgsArgVtable);
122   }
123 
UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>> *)124   void UpdatePlugins(
125       std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/)
126       override {}
127 
128  private:
129   XdsEnd2endTest* test_obj_;
130 };
131 
132 //
133 // XdsEnd2endTest::ServerThread
134 //
135 
Start()136 void XdsEnd2endTest::ServerThread::Start() {
137   gpr_log(GPR_INFO, "starting %s server on port %d", Type(), port_);
138   GPR_ASSERT(!running_);
139   running_ = true;
140   StartAllServices();
141   grpc_core::Mutex mu;
142   // We need to acquire the lock here in order to prevent the notify_one
143   // by ServerThread::Serve from firing before the wait below is hit.
144   grpc_core::MutexLock lock(&mu);
145   grpc_core::CondVar cond;
146   thread_ = std::make_unique<std::thread>(
147       std::bind(&ServerThread::Serve, this, &mu, &cond));
148   cond.Wait(&mu);
149   gpr_log(GPR_INFO, "%s server startup complete", Type());
150 }
151 
Shutdown()152 void XdsEnd2endTest::ServerThread::Shutdown() {
153   if (!running_) return;
154   gpr_log(GPR_INFO, "%s about to shutdown", Type());
155   ShutdownAllServices();
156   server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
157   thread_->join();
158   gpr_log(GPR_INFO, "%s shutdown completed", Type());
159   running_ = false;
160 }
161 
StopListeningAndSendGoaways()162 void XdsEnd2endTest::ServerThread::StopListeningAndSendGoaways() {
163   gpr_log(GPR_INFO, "%s sending GOAWAYs", Type());
164   {
165     grpc_core::ExecCtx exec_ctx;
166     auto* server = grpc_core::Server::FromC(server_->c_server());
167     server->StopListening();
168     server->SendGoaways();
169   }
170   gpr_log(GPR_INFO, "%s done sending GOAWAYs", Type());
171 }
172 
StopListening()173 void XdsEnd2endTest::ServerThread::StopListening() {
174   gpr_log(GPR_INFO, "%s about to stop listening", Type());
175   {
176     grpc_core::ExecCtx exec_ctx;
177     auto* server = grpc_core::Server::FromC(server_->c_server());
178     server->StopListening();
179   }
180   gpr_log(GPR_INFO, "%s stopped listening", Type());
181 }
182 
Serve(grpc_core::Mutex * mu,grpc_core::CondVar * cond)183 void XdsEnd2endTest::ServerThread::Serve(grpc_core::Mutex* mu,
184                                          grpc_core::CondVar* cond) {
185   // We need to acquire the lock here in order to prevent the notify_one
186   // below from firing before its corresponding wait is executed.
187   grpc_core::MutexLock lock(mu);
188   std::string server_address = absl::StrCat("localhost:", port_);
189   if (use_xds_enabled_server_) {
190     XdsServerBuilder builder;
191     if (GetParam().bootstrap_source() ==
192         XdsTestType::kBootstrapFromChannelArg) {
193       builder.SetOption(
194           std::make_unique<XdsChannelArgsServerBuilderOption>(test_obj_));
195     }
196     builder.set_status_notifier(&notifier_);
197     builder.experimental().set_drain_grace_time(
198         test_obj_->xds_drain_grace_time_ms_);
199     builder.AddListeningPort(server_address, Credentials());
200     // Allow gRPC Core's HTTP server to accept PUT requests for testing
201     // purposes.
202     if (allow_put_requests_) {
203       builder.AddChannelArgument(
204           GRPC_ARG_DO_NOT_USE_UNLESS_YOU_HAVE_PERMISSION_FROM_GRPC_TEAM_ALLOW_BROKEN_PUT_REQUESTS,
205           true);
206     }
207     RegisterAllServices(&builder);
208     server_ = builder.BuildAndStart();
209   } else {
210     ServerBuilder builder;
211     builder.AddListeningPort(server_address, Credentials());
212     RegisterAllServices(&builder);
213     server_ = builder.BuildAndStart();
214   }
215   cond->Signal();
216 }
217 
218 //
219 // XdsEnd2endTest::BackendServerThread
220 //
221 
BackendServerThread(XdsEnd2endTest * test_obj,bool use_xds_enabled_server)222 XdsEnd2endTest::BackendServerThread::BackendServerThread(
223     XdsEnd2endTest* test_obj, bool use_xds_enabled_server)
224     : ServerThread(test_obj, use_xds_enabled_server) {
225   if (use_xds_enabled_server) {
226     test_obj->SetServerListenerNameAndRouteConfiguration(
227         test_obj->balancer_.get(), test_obj->default_server_listener_, port(),
228         test_obj->default_server_route_config_);
229   }
230 }
231 
232 std::shared_ptr<ServerCredentials>
Credentials()233 XdsEnd2endTest::BackendServerThread::Credentials() {
234   if (GetParam().use_xds_credentials()) {
235     if (use_xds_enabled_server()) {
236       // We are testing server's use of XdsServerCredentials
237       return XdsServerCredentials(InsecureServerCredentials());
238     } else {
239       // We are testing client's use of XdsCredentials
240       std::string root_cert = grpc_core::testing::GetFileContents(kCaCertPath);
241       std::string identity_cert =
242           grpc_core::testing::GetFileContents(kServerCertPath);
243       std::string private_key =
244           grpc_core::testing::GetFileContents(kServerKeyPath);
245       std::vector<experimental::IdentityKeyCertPair> identity_key_cert_pairs = {
246           {private_key, identity_cert}};
247       auto certificate_provider =
248           std::make_shared<grpc::experimental::StaticDataCertificateProvider>(
249               root_cert, identity_key_cert_pairs);
250       grpc::experimental::TlsServerCredentialsOptions options(
251           certificate_provider);
252       options.watch_root_certs();
253       options.watch_identity_key_cert_pairs();
254       options.set_cert_request_type(
255           GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY);
256       return grpc::experimental::TlsServerCredentials(options);
257     }
258   }
259   return ServerThread::Credentials();
260 }
261 
RegisterAllServices(ServerBuilder * builder)262 void XdsEnd2endTest::BackendServerThread::RegisterAllServices(
263     ServerBuilder* builder) {
264   server_metric_recorder_ = ServerMetricRecorder::Create();
265   ServerBuilder::experimental_type(builder).EnableCallMetricRecording(
266       server_metric_recorder_.get());
267   builder->RegisterService(&backend_service_);
268   builder->RegisterService(&backend_service1_);
269   builder->RegisterService(&backend_service2_);
270 }
271 
StartAllServices()272 void XdsEnd2endTest::BackendServerThread::StartAllServices() {
273   backend_service_.Start();
274   backend_service1_.Start();
275   backend_service2_.Start();
276 }
277 
ShutdownAllServices()278 void XdsEnd2endTest::BackendServerThread::ShutdownAllServices() {
279   backend_service_.Shutdown();
280   backend_service1_.Shutdown();
281   backend_service2_.Shutdown();
282 }
283 
284 //
285 // XdsEnd2endTest::BalancerServerThread
286 //
287 
BalancerServerThread(XdsEnd2endTest * test_obj,absl::string_view debug_label)288 XdsEnd2endTest::BalancerServerThread::BalancerServerThread(
289     XdsEnd2endTest* test_obj, absl::string_view debug_label)
290     : ServerThread(test_obj, /*use_xds_enabled_server=*/false),
291       ads_service_(new AdsServiceImpl(
292           // First request must have node set with the right client
293           // features.
294           [&](const DiscoveryRequest& request) {
295             EXPECT_TRUE(request.has_node());
296             EXPECT_THAT(request.node().client_features(),
297                         ::testing::UnorderedElementsAre(
298                             "envoy.lb.does_not_support_overprovisioning",
299                             "xds.config.resource-in-sotw"));
300           },
301           // NACKs must use the right status code.
__anon266f36600302(absl::StatusCode code) 302           [&](absl::StatusCode code) {
303             EXPECT_EQ(code, absl::StatusCode::kInvalidArgument);
304           },
305           debug_label)),
306       lrs_service_(new LrsServiceImpl(
307           (GetParam().enable_load_reporting() ? 20 : 0), {kDefaultClusterName},
308           // Fail if load reporting is used when not enabled.
__anon266f36600402() 309           [&]() { EXPECT_TRUE(GetParam().enable_load_reporting()); },
310           // Make sure we send the client feature saying that we support
311           // send_all_clusters.
__anon266f36600502(const LoadStatsRequest& request) 312           [&](const LoadStatsRequest& request) {
313             EXPECT_THAT(
314                 request.node().client_features(),
315                 ::testing::Contains("envoy.lrs.supports_send_all_clusters"));
316           },
317           debug_label)) {}
318 
RegisterAllServices(ServerBuilder * builder)319 void XdsEnd2endTest::BalancerServerThread::RegisterAllServices(
320     ServerBuilder* builder) {
321   builder->RegisterService(ads_service_.get());
322   builder->RegisterService(lrs_service_.get());
323 }
324 
StartAllServices()325 void XdsEnd2endTest::BalancerServerThread::StartAllServices() {
326   ads_service_->Start();
327   lrs_service_->Start();
328 }
329 
ShutdownAllServices()330 void XdsEnd2endTest::BalancerServerThread::ShutdownAllServices() {
331   ads_service_->Shutdown();
332   lrs_service_->Shutdown();
333 }
334 
335 //
336 // XdsEnd2endTest::RpcOptions
337 //
338 
SetupRpc(ClientContext * context,EchoRequest * request) const339 void XdsEnd2endTest::RpcOptions::SetupRpc(ClientContext* context,
340                                           EchoRequest* request) const {
341   for (const auto& item : metadata) {
342     context->AddMetadata(item.first, item.second);
343   }
344   if (timeout_ms != 0) {
345     context->set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
346   }
347   if (wait_for_ready) context->set_wait_for_ready(true);
348   request->set_message(kRequestMessage);
349   if (server_fail) {
350     request->mutable_param()->mutable_expected_error()->set_code(
351         GRPC_STATUS_FAILED_PRECONDITION);
352   }
353   if (server_sleep_us != 0) {
354     request->mutable_param()->set_server_sleep_us(server_sleep_us);
355   }
356   if (client_cancel_after_us != 0) {
357     request->mutable_param()->set_client_cancel_after_us(
358         client_cancel_after_us);
359   }
360   if (skip_cancelled_check) {
361     request->mutable_param()->set_skip_cancelled_check(true);
362   }
363   if (backend_metrics.has_value()) {
364     *request->mutable_param()->mutable_backend_metrics() = *backend_metrics;
365   }
366   if (server_notify_client_when_started) {
367     request->mutable_param()->set_server_notify_client_when_started(true);
368   }
369 }
370 
371 //
372 // XdsEnd2endTest
373 //
374 
375 const char XdsEnd2endTest::kCaCertPath[] = "src/core/tsi/test_creds/ca.pem";
376 const char XdsEnd2endTest::kServerCertPath[] =
377     "src/core/tsi/test_creds/server1.pem";
378 const char XdsEnd2endTest::kServerKeyPath[] =
379     "src/core/tsi/test_creds/server1.key";
380 
381 const char XdsEnd2endTest::kRequestMessage[] = "Live long and prosper.";
382 
XdsEnd2endTest()383 XdsEnd2endTest::XdsEnd2endTest()
384     : balancer_(CreateAndStartBalancer("Default Balancer")) {
385   // Initialize default client-side xDS resources.
386   default_listener_ = XdsResourceUtils::DefaultListener();
387   default_route_config_ = XdsResourceUtils::DefaultRouteConfig();
388   default_cluster_ = XdsResourceUtils::DefaultCluster();
389   if (GetParam().enable_load_reporting()) {
390     default_cluster_.mutable_lrs_server()->mutable_self();
391   }
392   // Initialize client-side resources on balancer.
393   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
394                                    default_route_config_);
395   balancer_->ads_service()->SetCdsResource(default_cluster_);
396   // Initialize default server-side xDS resources.
397   default_server_route_config_ = XdsResourceUtils::DefaultServerRouteConfig();
398   default_server_listener_ = XdsResourceUtils::DefaultServerListener();
399 }
400 
TearDown()401 void XdsEnd2endTest::TearDown() {
402   ShutdownAllBackends();
403   balancer_->Shutdown();
404   // Clear global xDS channel args, since they will go out of scope
405   // when this test object is destroyed.
406   grpc_core::internal::SetXdsChannelArgsForTest(nullptr);
407   grpc_core::UnsetEnv("GRPC_XDS_BOOTSTRAP");
408   grpc_core::UnsetEnv("GRPC_XDS_BOOTSTRAP_CONFIG");
409   if (bootstrap_file_ != nullptr) {
410     remove(bootstrap_file_);
411     gpr_free(bootstrap_file_);
412   }
413 }
414 
415 std::unique_ptr<XdsEnd2endTest::BalancerServerThread>
CreateAndStartBalancer(absl::string_view debug_label)416 XdsEnd2endTest::CreateAndStartBalancer(absl::string_view debug_label) {
417   std::unique_ptr<BalancerServerThread> balancer =
418       std::make_unique<BalancerServerThread>(this, debug_label);
419   balancer->Start();
420   return balancer;
421 }
422 
423 std::vector<XdsEnd2endTest::EdsResourceArgs::Endpoint>
CreateEndpointsForBackends(size_t start_index,size_t stop_index,HealthStatus health_status,int lb_weight)424 XdsEnd2endTest::CreateEndpointsForBackends(size_t start_index,
425                                            size_t stop_index,
426                                            HealthStatus health_status,
427                                            int lb_weight) {
428   if (stop_index == 0) stop_index = backends_.size();
429   std::vector<EdsResourceArgs::Endpoint> endpoints;
430   for (size_t i = start_index; i < stop_index; ++i) {
431     endpoints.emplace_back(CreateEndpoint(i, health_status, lb_weight));
432   }
433   return endpoints;
434 }
435 
ResetBackendCounters(size_t start_index,size_t stop_index)436 void XdsEnd2endTest::ResetBackendCounters(size_t start_index,
437                                           size_t stop_index) {
438   if (stop_index == 0) stop_index = backends_.size();
439   for (size_t i = start_index; i < stop_index; ++i) {
440     backends_[i]->backend_service()->ResetCounters();
441     backends_[i]->backend_service1()->ResetCounters();
442     backends_[i]->backend_service2()->ResetCounters();
443   }
444 }
445 
SeenBackend(size_t backend_idx,const RpcService rpc_service)446 bool XdsEnd2endTest::SeenBackend(size_t backend_idx,
447                                  const RpcService rpc_service) {
448   switch (rpc_service) {
449     case SERVICE_ECHO:
450       if (backends_[backend_idx]->backend_service()->request_count() == 0) {
451         return false;
452       }
453       break;
454     case SERVICE_ECHO1:
455       if (backends_[backend_idx]->backend_service1()->request_count() == 0) {
456         return false;
457       }
458       break;
459     case SERVICE_ECHO2:
460       if (backends_[backend_idx]->backend_service2()->request_count() == 0) {
461         return false;
462       }
463       break;
464   }
465   return true;
466 }
467 
SeenAllBackends(size_t start_index,size_t stop_index,const RpcService rpc_service)468 bool XdsEnd2endTest::SeenAllBackends(size_t start_index, size_t stop_index,
469                                      const RpcService rpc_service) {
470   if (stop_index == 0) stop_index = backends_.size();
471   for (size_t i = start_index; i < stop_index; ++i) {
472     if (!SeenBackend(i, rpc_service)) {
473       return false;
474     }
475   }
476   return true;
477 }
478 
GetBackendPorts(size_t start_index,size_t stop_index) const479 std::vector<int> XdsEnd2endTest::GetBackendPorts(size_t start_index,
480                                                  size_t stop_index) const {
481   if (stop_index == 0) stop_index = backends_.size();
482   std::vector<int> backend_ports;
483   for (size_t i = start_index; i < stop_index; ++i) {
484     backend_ports.push_back(backends_[i]->port());
485   }
486   return backend_ports;
487 }
488 
InitClient(absl::optional<XdsBootstrapBuilder> builder,std::string lb_expected_authority,int xds_resource_does_not_exist_timeout_ms)489 void XdsEnd2endTest::InitClient(absl::optional<XdsBootstrapBuilder> builder,
490                                 std::string lb_expected_authority,
491                                 int xds_resource_does_not_exist_timeout_ms) {
492   if (!builder.has_value()) {
493     builder = MakeBootstrapBuilder();
494   }
495   if (xds_resource_does_not_exist_timeout_ms > 0) {
496     xds_channel_args_to_add_.emplace_back(grpc_channel_arg_integer_create(
497         const_cast<char*>(GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS),
498         xds_resource_does_not_exist_timeout_ms));
499   }
500   if (!lb_expected_authority.empty()) {
501     constexpr char authority_const[] = "localhost:%d";
502     if (lb_expected_authority == authority_const) {
503       lb_expected_authority =
504           absl::StrFormat(authority_const, balancer_->port());
505     }
506     xds_channel_args_to_add_.emplace_back(grpc_channel_arg_string_create(
507         const_cast<char*>(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS),
508         const_cast<char*>(lb_expected_authority.c_str())));
509   }
510   xds_channel_args_.num_args = xds_channel_args_to_add_.size();
511   xds_channel_args_.args = xds_channel_args_to_add_.data();
512   bootstrap_ = builder->Build();
513   if (GetParam().bootstrap_source() == XdsTestType::kBootstrapFromEnvVar) {
514     grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP_CONFIG", bootstrap_.c_str());
515   } else if (GetParam().bootstrap_source() == XdsTestType::kBootstrapFromFile) {
516     FILE* out = gpr_tmpfile("xds_bootstrap_v3", &bootstrap_file_);
517     fputs(bootstrap_.c_str(), out);
518     fclose(out);
519     grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP", bootstrap_file_);
520   }
521   if (GetParam().bootstrap_source() != XdsTestType::kBootstrapFromChannelArg) {
522     // If getting bootstrap from channel arg, we'll pass these args in
523     // via the parent channel args in CreateChannel() instead.
524     grpc_core::internal::SetXdsChannelArgsForTest(&xds_channel_args_);
525     // Make sure each test creates a new XdsClient instance rather than
526     // reusing the one from the previous test.  This avoids spurious failures
527     // caused when a load reporting test runs after a non-load reporting test
528     // and the XdsClient is still talking to the old LRS server, which fails
529     // because it's not expecting the client to connect.  It also
530     // ensures that each test can independently set the global channel
531     // args for the xDS channel.
532     grpc_core::internal::UnsetGlobalXdsClientsForTest();
533   }
534   // Create channel and stub.
535   ResetStub();
536 }
537 
ResetStub(int failover_timeout_ms,ChannelArguments * args)538 void XdsEnd2endTest::ResetStub(int failover_timeout_ms,
539                                ChannelArguments* args) {
540   channel_ = CreateChannel(failover_timeout_ms, kServerName, "", args);
541   stub_ = grpc::testing::EchoTestService::NewStub(channel_);
542   stub1_ = grpc::testing::EchoTest1Service::NewStub(channel_);
543   stub2_ = grpc::testing::EchoTest2Service::NewStub(channel_);
544 }
545 
CreateChannel(int failover_timeout_ms,const char * server_name,const char * xds_authority,ChannelArguments * args)546 std::shared_ptr<Channel> XdsEnd2endTest::CreateChannel(
547     int failover_timeout_ms, const char* server_name, const char* xds_authority,
548     ChannelArguments* args) {
549   ChannelArguments local_args;
550   if (args == nullptr) args = &local_args;
551   // TODO(roth): Remove this once we enable retries by default internally.
552   args->SetInt(GRPC_ARG_ENABLE_RETRIES, 1);
553   if (failover_timeout_ms > 0) {
554     args->SetInt(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS,
555                  failover_timeout_ms * grpc_test_slowdown_factor());
556   }
557   if (GetParam().bootstrap_source() == XdsTestType::kBootstrapFromChannelArg) {
558     // We're getting the bootstrap from a channel arg, so we do the
559     // same thing for the response generator to use for the xDS
560     // channel and the xDS resource-does-not-exist timeout value.
561     args->SetString(GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG,
562                     bootstrap_);
563     args->SetPointerWithVtable(
564         GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS,
565         &xds_channel_args_, &kChannelArgsArgVtable);
566   }
567   std::vector<absl::string_view> parts = {"xds:"};
568   if (xds_authority != nullptr && xds_authority[0] != '\0') {
569     parts.emplace_back("//");
570     parts.emplace_back(xds_authority);
571     parts.emplace_back("/");
572   }
573   parts.emplace_back(server_name);
574   std::string uri = absl::StrJoin(parts, "");
575   std::shared_ptr<ChannelCredentials> channel_creds =
576       GetParam().use_xds_credentials()
577           ? XdsCredentials(CreateTlsFallbackCredentials())
578           : std::make_shared<FakeTransportSecurityChannelCredentials>();
579   return grpc::CreateCustomChannel(uri, channel_creds, *args);
580 }
581 
SendRpc(const RpcOptions & rpc_options,EchoResponse * response,std::multimap<std::string,std::string> * server_initial_metadata)582 Status XdsEnd2endTest::SendRpc(
583     const RpcOptions& rpc_options, EchoResponse* response,
584     std::multimap<std::string, std::string>* server_initial_metadata) {
585   EchoResponse local_response;
586   if (response == nullptr) response = &local_response;
587   ClientContext context;
588   EchoRequest request;
589   if (rpc_options.server_expected_error != StatusCode::OK) {
590     auto* error = request.mutable_param()->mutable_expected_error();
591     error->set_code(rpc_options.server_expected_error);
592   }
593   rpc_options.SetupRpc(&context, &request);
594   Status status;
595   switch (rpc_options.service) {
596     case SERVICE_ECHO:
597       status =
598           SendRpcMethod(stub_.get(), rpc_options, &context, request, response);
599       break;
600     case SERVICE_ECHO1:
601       status =
602           SendRpcMethod(stub1_.get(), rpc_options, &context, request, response);
603       break;
604     case SERVICE_ECHO2:
605       status =
606           SendRpcMethod(stub2_.get(), rpc_options, &context, request, response);
607       break;
608   }
609   if (server_initial_metadata != nullptr) {
610     for (const auto& it : context.GetServerInitialMetadata()) {
611       std::string header(it.first.data(), it.first.size());
612       // Guard against implementation-specific header case - RFC 2616
613       absl::AsciiStrToLower(&header);
614       server_initial_metadata->emplace(
615           header, std::string(it.second.data(), it.second.size()));
616     }
617   }
618   return status;
619 }
620 
SendRpcsUntil(const grpc_core::DebugLocation & debug_location,std::function<bool (const RpcResult &)> continue_predicate,int timeout_ms,const RpcOptions & rpc_options)621 void XdsEnd2endTest::SendRpcsUntil(
622     const grpc_core::DebugLocation& debug_location,
623     std::function<bool(const RpcResult&)> continue_predicate, int timeout_ms,
624     const RpcOptions& rpc_options) {
625   absl::Time deadline = absl::InfiniteFuture();
626   if (timeout_ms != 0) {
627     deadline = absl::Now() +
628                (absl::Milliseconds(timeout_ms) * grpc_test_slowdown_factor());
629   }
630   while (true) {
631     RpcResult result;
632     result.status = SendRpc(rpc_options, &result.response);
633     if (!continue_predicate(result)) return;
634     EXPECT_LE(absl::Now(), deadline)
635         << debug_location.file() << ":" << debug_location.line();
636     if (absl::Now() >= deadline) break;
637   }
638 }
639 
CheckRpcSendOk(const grpc_core::DebugLocation & debug_location,const size_t times,const RpcOptions & rpc_options)640 void XdsEnd2endTest::CheckRpcSendOk(
641     const grpc_core::DebugLocation& debug_location, const size_t times,
642     const RpcOptions& rpc_options) {
643   SendRpcsUntil(
644       debug_location,
645       [debug_location, times, n = size_t{0}](const RpcResult& result) mutable {
646         EXPECT_TRUE(result.status.ok())
647             << "code=" << result.status.error_code()
648             << " message=" << result.status.error_message() << " at "
649             << debug_location.file() << ":" << debug_location.line();
650         EXPECT_EQ(result.response.message(), kRequestMessage);
651         return ++n < times;
652       },
653       /*timeout_ms=*/0, rpc_options);
654 }
655 
CheckRpcSendFailure(const grpc_core::DebugLocation & debug_location,StatusCode expected_status,absl::string_view expected_message_regex,const RpcOptions & rpc_options)656 void XdsEnd2endTest::CheckRpcSendFailure(
657     const grpc_core::DebugLocation& debug_location, StatusCode expected_status,
658     absl::string_view expected_message_regex, const RpcOptions& rpc_options) {
659   const Status status = SendRpc(rpc_options);
660   EXPECT_FALSE(status.ok())
661       << debug_location.file() << ":" << debug_location.line();
662   EXPECT_EQ(expected_status, status.error_code())
663       << debug_location.file() << ":" << debug_location.line();
664   EXPECT_THAT(status.error_message(),
665               ::testing::MatchesRegex(expected_message_regex))
666       << debug_location.file() << ":" << debug_location.line();
667 }
668 
SendRpcsAndCountFailuresWithMessage(const grpc_core::DebugLocation & debug_location,size_t num_rpcs,StatusCode expected_status,absl::string_view expected_message_prefix,const RpcOptions & rpc_options)669 size_t XdsEnd2endTest::SendRpcsAndCountFailuresWithMessage(
670     const grpc_core::DebugLocation& debug_location, size_t num_rpcs,
671     StatusCode expected_status, absl::string_view expected_message_prefix,
672     const RpcOptions& rpc_options) {
673   size_t num_failed = 0;
674   SendRpcsUntil(
675       debug_location,
676       [&, n = size_t{0}](const RpcResult& result) mutable {
677         if (!result.status.ok()) {
678           EXPECT_EQ(result.status.error_code(), expected_status)
679               << debug_location.file() << ":" << debug_location.line();
680           EXPECT_THAT(result.status.error_message(),
681                       ::testing::StartsWith(expected_message_prefix))
682               << debug_location.file() << ":" << debug_location.line();
683           ++num_failed;
684         }
685         return ++n < num_rpcs;
686       },
687       /*timeout_ms=*/0, rpc_options);
688   return num_failed;
689 }
690 
StartRpc(grpc::testing::EchoTestService::Stub * stub,const RpcOptions & rpc_options)691 void XdsEnd2endTest::LongRunningRpc::StartRpc(
692     grpc::testing::EchoTestService::Stub* stub, const RpcOptions& rpc_options) {
693   sender_thread_ = std::thread([this, stub, rpc_options]() {
694     EchoRequest request;
695     EchoResponse response;
696     rpc_options.SetupRpc(&context_, &request);
697     status_ = stub->Echo(&context_, request, &response);
698   });
699 }
700 
CancelRpc()701 void XdsEnd2endTest::LongRunningRpc::CancelRpc() {
702   context_.TryCancel();
703   if (sender_thread_.joinable()) sender_thread_.join();
704 }
705 
GetStatus()706 Status XdsEnd2endTest::LongRunningRpc::GetStatus() {
707   if (sender_thread_.joinable()) sender_thread_.join();
708   return status_;
709 }
710 
SendConcurrentRpcs(const grpc_core::DebugLocation & debug_location,grpc::testing::EchoTestService::Stub * stub,size_t num_rpcs,const RpcOptions & rpc_options)711 std::vector<XdsEnd2endTest::ConcurrentRpc> XdsEnd2endTest::SendConcurrentRpcs(
712     const grpc_core::DebugLocation& debug_location,
713     grpc::testing::EchoTestService::Stub* stub, size_t num_rpcs,
714     const RpcOptions& rpc_options) {
715   // Variables for RPCs.
716   std::vector<ConcurrentRpc> rpcs(num_rpcs);
717   EchoRequest request;
718   // Variables for synchronization
719   grpc_core::Mutex mu;
720   grpc_core::CondVar cv;
721   size_t completed = 0;
722   // Set-off callback RPCs
723   for (size_t i = 0; i < num_rpcs; i++) {
724     ConcurrentRpc* rpc = &rpcs[i];
725     rpc_options.SetupRpc(&rpc->context, &request);
726     grpc_core::Timestamp t0 = NowFromCycleCounter();
727     stub->async()->Echo(&rpc->context, &request, &rpc->response,
728                         [rpc, &mu, &completed, &cv, num_rpcs, t0](Status s) {
729                           rpc->status = s;
730                           rpc->elapsed_time = NowFromCycleCounter() - t0;
731                           bool done;
732                           {
733                             grpc_core::MutexLock lock(&mu);
734                             done = (++completed) == num_rpcs;
735                           }
736                           if (done) cv.Signal();
737                         });
738   }
739   {
740     grpc_core::MutexLock lock(&mu);
741     cv.Wait(&mu);
742   }
743   EXPECT_EQ(completed, num_rpcs)
744       << " at " << debug_location.file() << ":" << debug_location.line();
745   return rpcs;
746 }
747 
WaitForAllBackends(const grpc_core::DebugLocation & debug_location,size_t start_index,size_t stop_index,std::function<void (const RpcResult &)> check_status,const WaitForBackendOptions & wait_options,const RpcOptions & rpc_options)748 size_t XdsEnd2endTest::WaitForAllBackends(
749     const grpc_core::DebugLocation& debug_location, size_t start_index,
750     size_t stop_index, std::function<void(const RpcResult&)> check_status,
751     const WaitForBackendOptions& wait_options, const RpcOptions& rpc_options) {
752   if (check_status == nullptr) {
753     check_status = [&](const RpcResult& result) {
754       EXPECT_TRUE(result.status.ok())
755           << "code=" << result.status.error_code()
756           << " message=" << result.status.error_message() << " at "
757           << debug_location.file() << ":" << debug_location.line();
758     };
759   }
760   gpr_log(GPR_INFO,
761           "========= WAITING FOR BACKENDS [%" PRIuPTR ", %" PRIuPTR
762           ") ==========",
763           start_index, stop_index);
764   size_t num_rpcs = 0;
765   SendRpcsUntil(
766       debug_location,
767       [&](const RpcResult& result) {
768         ++num_rpcs;
769         check_status(result);
770         return !SeenAllBackends(start_index, stop_index, rpc_options.service);
771       },
772       wait_options.timeout_ms, rpc_options);
773   if (wait_options.reset_counters) ResetBackendCounters();
774   gpr_log(GPR_INFO, "Backends up; sent %" PRIuPTR " warm up requests",
775           num_rpcs);
776   return num_rpcs;
777 }
778 
WaitForNack(const grpc_core::DebugLocation & debug_location,std::function<absl::optional<AdsServiceImpl::ResponseState> ()> get_state,const RpcOptions & rpc_options,StatusCode expected_status)779 absl::optional<AdsServiceImpl::ResponseState> XdsEnd2endTest::WaitForNack(
780     const grpc_core::DebugLocation& debug_location,
781     std::function<absl::optional<AdsServiceImpl::ResponseState>()> get_state,
782     const RpcOptions& rpc_options, StatusCode expected_status) {
783   absl::optional<AdsServiceImpl::ResponseState> response_state;
784   auto deadline =
785       absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor());
786   auto continue_predicate = [&]() {
787     if (absl::Now() >= deadline) {
788       return false;
789     }
790     response_state = get_state();
791     return !response_state.has_value() ||
792            response_state->state != AdsServiceImpl::ResponseState::NACKED;
793   };
794   do {
795     const Status status = SendRpc(rpc_options);
796     EXPECT_EQ(expected_status, status.error_code())
797         << "code=" << status.error_code()
798         << " message=" << status.error_message() << " at "
799         << debug_location.file() << ":" << debug_location.line();
800   } while (continue_predicate());
801   return response_state;
802 }
803 
SetProtoDuration(grpc_core::Duration duration,google::protobuf::Duration * duration_proto)804 void XdsEnd2endTest::SetProtoDuration(
805     grpc_core::Duration duration, google::protobuf::Duration* duration_proto) {
806   duration *= grpc_test_slowdown_factor();
807   gpr_timespec ts = duration.as_timespec();
808   duration_proto->set_seconds(ts.tv_sec);
809   duration_proto->set_nanos(ts.tv_nsec);
810 }
811 
MakeConnectionFailureRegex(absl::string_view prefix)812 std::string XdsEnd2endTest::MakeConnectionFailureRegex(
813     absl::string_view prefix) {
814   return absl::StrCat(
815       prefix,
816       "(UNKNOWN|UNAVAILABLE): (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
817       "(Failed to connect to remote host: )?"
818       "(Connection refused|Connection reset by peer|"
819       "recvmsg:Connection reset by peer|"
820       "getsockopt\\(SO\\_ERROR\\): Connection reset by peer|"
821       "Socket closed|FD shutdown)");
822 }
823 
ReadTlsIdentityPair(const char * key_path,const char * cert_path)824 grpc_core::PemKeyCertPairList XdsEnd2endTest::ReadTlsIdentityPair(
825     const char* key_path, const char* cert_path) {
826   return grpc_core::PemKeyCertPairList{grpc_core::PemKeyCertPair(
827       grpc_core::testing::GetFileContents(key_path),
828       grpc_core::testing::GetFileContents(cert_path))};
829 }
830 
831 std::shared_ptr<ChannelCredentials>
CreateTlsFallbackCredentials()832 XdsEnd2endTest::CreateTlsFallbackCredentials() {
833   IdentityKeyCertPair key_cert_pair;
834   key_cert_pair.private_key =
835       grpc_core::testing::GetFileContents(kServerKeyPath);
836   key_cert_pair.certificate_chain =
837       grpc_core::testing::GetFileContents(kServerCertPath);
838   std::vector<IdentityKeyCertPair> identity_key_cert_pairs;
839   identity_key_cert_pairs.emplace_back(key_cert_pair);
840   auto certificate_provider = std::make_shared<StaticDataCertificateProvider>(
841       grpc_core::testing::GetFileContents(kCaCertPath),
842       identity_key_cert_pairs);
843   grpc::experimental::TlsChannelCredentialsOptions options;
844   options.set_certificate_provider(std::move(certificate_provider));
845   options.watch_root_certs();
846   options.watch_identity_key_cert_pairs();
847   auto verifier =
848       ExternalCertificateVerifier::Create<SyncCertificateVerifier>(true);
849   options.set_certificate_verifier(std::move(verifier));
850   options.set_verify_server_certs(true);
851   options.set_check_call_host(false);
852   auto channel_creds = grpc::experimental::TlsCredentials(options);
853   GPR_ASSERT(channel_creds.get() != nullptr);
854   return channel_creds;
855 }
856 
857 }  // namespace testing
858 }  // namespace grpc
859