1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15
16 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
17
18 #include <functional>
19 #include <map>
20 #include <memory>
21 #include <set>
22 #include <string>
23 #include <thread>
24 #include <vector>
25
26 #include <gmock/gmock.h>
27 #include <gtest/gtest.h>
28
29 #include "absl/memory/memory.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/str_format.h"
32 #include "absl/strings/str_join.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/types/optional.h"
35
36 #include <grpcpp/security/tls_certificate_provider.h>
37
38 #include "src/core/ext/filters/http/server/http_server_filter.h"
39 #include "src/core/ext/xds/xds_channel_args.h"
40 #include "src/core/ext/xds/xds_client_grpc.h"
41 #include "src/core/lib/gpr/tmpfile.h"
42 #include "src/core/lib/gprpp/env.h"
43 #include "src/core/lib/surface/server.h"
44 #include "src/proto/grpc/testing/xds/v3/router.grpc.pb.h"
45 #include "test/core/util/resolve_localhost_ip46.h"
46 #include "test/core/util/tls_utils.h"
47 #include "test/cpp/util/credentials.h"
48 #include "test/cpp/util/tls_test_utils.h"
49
50 namespace grpc {
51 namespace testing {
52
53 using ::envoy::config::core::v3::HealthStatus;
54 using ::envoy::service::discovery::v3::DiscoveryRequest;
55 using ::envoy::service::load_stats::v3::LoadStatsRequest;
56
57 using ::grpc::experimental::ExternalCertificateVerifier;
58 using ::grpc::experimental::IdentityKeyCertPair;
59 using ::grpc::experimental::ServerMetricRecorder;
60 using ::grpc::experimental::StaticDataCertificateProvider;
61
62 //
63 // XdsEnd2endTest::ServerThread::XdsServingStatusNotifier
64 //
65
66 void XdsEnd2endTest::ServerThread::XdsServingStatusNotifier::
OnServingStatusUpdate(std::string uri,ServingStatusUpdate update)67 OnServingStatusUpdate(std::string uri, ServingStatusUpdate update) {
68 grpc_core::MutexLock lock(&mu_);
69 status_map[uri] = update.status;
70 cond_.Signal();
71 }
72
73 void XdsEnd2endTest::ServerThread::XdsServingStatusNotifier::
WaitOnServingStatusChange(std::string uri,grpc::StatusCode expected_status)74 WaitOnServingStatusChange(std::string uri,
75 grpc::StatusCode expected_status) {
76 grpc_core::MutexLock lock(&mu_);
77 std::map<std::string, grpc::Status>::iterator it;
78 while ((it = status_map.find(uri)) == status_map.end() ||
79 it->second.error_code() != expected_status) {
80 cond_.Wait(&mu_);
81 }
82 }
83
84 //
85 // XdsEnd2endTest::ServerThread::XdsChannelArgsServerBuilderOption
86 //
87
88 namespace {
89
90 // Channel arg pointer vtable for storing xDS channel args in the parent
91 // channel's channel args.
ChannelArgsArgCopy(void * p)92 void* ChannelArgsArgCopy(void* p) {
93 auto* args = static_cast<grpc_channel_args*>(p);
94 return grpc_channel_args_copy(args);
95 }
ChannelArgsArgDestroy(void * p)96 void ChannelArgsArgDestroy(void* p) {
97 auto* args = static_cast<grpc_channel_args*>(p);
98 grpc_channel_args_destroy(args);
99 }
ChannelArgsArgCmp(void * a,void * b)100 int ChannelArgsArgCmp(void* a, void* b) {
101 auto* args_a = static_cast<grpc_channel_args*>(a);
102 auto* args_b = static_cast<grpc_channel_args*>(b);
103 return grpc_channel_args_compare(args_a, args_b);
104 }
105 const grpc_arg_pointer_vtable kChannelArgsArgVtable = {
106 ChannelArgsArgCopy, ChannelArgsArgDestroy, ChannelArgsArgCmp};
107
108 } // namespace
109
110 class XdsEnd2endTest::ServerThread::XdsChannelArgsServerBuilderOption
111 : public grpc::ServerBuilderOption {
112 public:
XdsChannelArgsServerBuilderOption(XdsEnd2endTest * test_obj)113 explicit XdsChannelArgsServerBuilderOption(XdsEnd2endTest* test_obj)
114 : test_obj_(test_obj) {}
115
UpdateArguments(grpc::ChannelArguments * args)116 void UpdateArguments(grpc::ChannelArguments* args) override {
117 args->SetString(GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG,
118 test_obj_->bootstrap_);
119 args->SetPointerWithVtable(
120 GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS,
121 &test_obj_->xds_channel_args_, &kChannelArgsArgVtable);
122 }
123
UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>> *)124 void UpdatePlugins(
125 std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/)
126 override {}
127
128 private:
129 XdsEnd2endTest* test_obj_;
130 };
131
132 //
133 // XdsEnd2endTest::ServerThread
134 //
135
Start()136 void XdsEnd2endTest::ServerThread::Start() {
137 gpr_log(GPR_INFO, "starting %s server on port %d", Type(), port_);
138 GPR_ASSERT(!running_);
139 running_ = true;
140 StartAllServices();
141 grpc_core::Mutex mu;
142 // We need to acquire the lock here in order to prevent the notify_one
143 // by ServerThread::Serve from firing before the wait below is hit.
144 grpc_core::MutexLock lock(&mu);
145 grpc_core::CondVar cond;
146 thread_ = std::make_unique<std::thread>(
147 std::bind(&ServerThread::Serve, this, &mu, &cond));
148 cond.Wait(&mu);
149 gpr_log(GPR_INFO, "%s server startup complete", Type());
150 }
151
Shutdown()152 void XdsEnd2endTest::ServerThread::Shutdown() {
153 if (!running_) return;
154 gpr_log(GPR_INFO, "%s about to shutdown", Type());
155 ShutdownAllServices();
156 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
157 thread_->join();
158 gpr_log(GPR_INFO, "%s shutdown completed", Type());
159 running_ = false;
160 }
161
StopListeningAndSendGoaways()162 void XdsEnd2endTest::ServerThread::StopListeningAndSendGoaways() {
163 gpr_log(GPR_INFO, "%s sending GOAWAYs", Type());
164 {
165 grpc_core::ExecCtx exec_ctx;
166 auto* server = grpc_core::Server::FromC(server_->c_server());
167 server->StopListening();
168 server->SendGoaways();
169 }
170 gpr_log(GPR_INFO, "%s done sending GOAWAYs", Type());
171 }
172
StopListening()173 void XdsEnd2endTest::ServerThread::StopListening() {
174 gpr_log(GPR_INFO, "%s about to stop listening", Type());
175 {
176 grpc_core::ExecCtx exec_ctx;
177 auto* server = grpc_core::Server::FromC(server_->c_server());
178 server->StopListening();
179 }
180 gpr_log(GPR_INFO, "%s stopped listening", Type());
181 }
182
Serve(grpc_core::Mutex * mu,grpc_core::CondVar * cond)183 void XdsEnd2endTest::ServerThread::Serve(grpc_core::Mutex* mu,
184 grpc_core::CondVar* cond) {
185 // We need to acquire the lock here in order to prevent the notify_one
186 // below from firing before its corresponding wait is executed.
187 grpc_core::MutexLock lock(mu);
188 std::string server_address = absl::StrCat("localhost:", port_);
189 if (use_xds_enabled_server_) {
190 XdsServerBuilder builder;
191 if (GetParam().bootstrap_source() ==
192 XdsTestType::kBootstrapFromChannelArg) {
193 builder.SetOption(
194 std::make_unique<XdsChannelArgsServerBuilderOption>(test_obj_));
195 }
196 builder.set_status_notifier(¬ifier_);
197 builder.experimental().set_drain_grace_time(
198 test_obj_->xds_drain_grace_time_ms_);
199 builder.AddListeningPort(server_address, Credentials());
200 // Allow gRPC Core's HTTP server to accept PUT requests for testing
201 // purposes.
202 if (allow_put_requests_) {
203 builder.AddChannelArgument(
204 GRPC_ARG_DO_NOT_USE_UNLESS_YOU_HAVE_PERMISSION_FROM_GRPC_TEAM_ALLOW_BROKEN_PUT_REQUESTS,
205 true);
206 }
207 RegisterAllServices(&builder);
208 server_ = builder.BuildAndStart();
209 } else {
210 ServerBuilder builder;
211 builder.AddListeningPort(server_address, Credentials());
212 RegisterAllServices(&builder);
213 server_ = builder.BuildAndStart();
214 }
215 cond->Signal();
216 }
217
218 //
219 // XdsEnd2endTest::BackendServerThread
220 //
221
BackendServerThread(XdsEnd2endTest * test_obj,bool use_xds_enabled_server)222 XdsEnd2endTest::BackendServerThread::BackendServerThread(
223 XdsEnd2endTest* test_obj, bool use_xds_enabled_server)
224 : ServerThread(test_obj, use_xds_enabled_server) {
225 if (use_xds_enabled_server) {
226 test_obj->SetServerListenerNameAndRouteConfiguration(
227 test_obj->balancer_.get(), test_obj->default_server_listener_, port(),
228 test_obj->default_server_route_config_);
229 }
230 }
231
232 std::shared_ptr<ServerCredentials>
Credentials()233 XdsEnd2endTest::BackendServerThread::Credentials() {
234 if (GetParam().use_xds_credentials()) {
235 if (use_xds_enabled_server()) {
236 // We are testing server's use of XdsServerCredentials
237 return XdsServerCredentials(InsecureServerCredentials());
238 } else {
239 // We are testing client's use of XdsCredentials
240 std::string root_cert = grpc_core::testing::GetFileContents(kCaCertPath);
241 std::string identity_cert =
242 grpc_core::testing::GetFileContents(kServerCertPath);
243 std::string private_key =
244 grpc_core::testing::GetFileContents(kServerKeyPath);
245 std::vector<experimental::IdentityKeyCertPair> identity_key_cert_pairs = {
246 {private_key, identity_cert}};
247 auto certificate_provider =
248 std::make_shared<grpc::experimental::StaticDataCertificateProvider>(
249 root_cert, identity_key_cert_pairs);
250 grpc::experimental::TlsServerCredentialsOptions options(
251 certificate_provider);
252 options.watch_root_certs();
253 options.watch_identity_key_cert_pairs();
254 options.set_cert_request_type(
255 GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY);
256 return grpc::experimental::TlsServerCredentials(options);
257 }
258 }
259 return ServerThread::Credentials();
260 }
261
RegisterAllServices(ServerBuilder * builder)262 void XdsEnd2endTest::BackendServerThread::RegisterAllServices(
263 ServerBuilder* builder) {
264 server_metric_recorder_ = ServerMetricRecorder::Create();
265 ServerBuilder::experimental_type(builder).EnableCallMetricRecording(
266 server_metric_recorder_.get());
267 builder->RegisterService(&backend_service_);
268 builder->RegisterService(&backend_service1_);
269 builder->RegisterService(&backend_service2_);
270 }
271
StartAllServices()272 void XdsEnd2endTest::BackendServerThread::StartAllServices() {
273 backend_service_.Start();
274 backend_service1_.Start();
275 backend_service2_.Start();
276 }
277
ShutdownAllServices()278 void XdsEnd2endTest::BackendServerThread::ShutdownAllServices() {
279 backend_service_.Shutdown();
280 backend_service1_.Shutdown();
281 backend_service2_.Shutdown();
282 }
283
284 //
285 // XdsEnd2endTest::BalancerServerThread
286 //
287
BalancerServerThread(XdsEnd2endTest * test_obj,absl::string_view debug_label)288 XdsEnd2endTest::BalancerServerThread::BalancerServerThread(
289 XdsEnd2endTest* test_obj, absl::string_view debug_label)
290 : ServerThread(test_obj, /*use_xds_enabled_server=*/false),
291 ads_service_(new AdsServiceImpl(
292 // First request must have node set with the right client
293 // features.
294 [&](const DiscoveryRequest& request) {
295 EXPECT_TRUE(request.has_node());
296 EXPECT_THAT(request.node().client_features(),
297 ::testing::UnorderedElementsAre(
298 "envoy.lb.does_not_support_overprovisioning",
299 "xds.config.resource-in-sotw"));
300 },
301 // NACKs must use the right status code.
__anon266f36600302(absl::StatusCode code) 302 [&](absl::StatusCode code) {
303 EXPECT_EQ(code, absl::StatusCode::kInvalidArgument);
304 },
305 debug_label)),
306 lrs_service_(new LrsServiceImpl(
307 (GetParam().enable_load_reporting() ? 20 : 0), {kDefaultClusterName},
308 // Fail if load reporting is used when not enabled.
__anon266f36600402() 309 [&]() { EXPECT_TRUE(GetParam().enable_load_reporting()); },
310 // Make sure we send the client feature saying that we support
311 // send_all_clusters.
__anon266f36600502(const LoadStatsRequest& request) 312 [&](const LoadStatsRequest& request) {
313 EXPECT_THAT(
314 request.node().client_features(),
315 ::testing::Contains("envoy.lrs.supports_send_all_clusters"));
316 },
317 debug_label)) {}
318
RegisterAllServices(ServerBuilder * builder)319 void XdsEnd2endTest::BalancerServerThread::RegisterAllServices(
320 ServerBuilder* builder) {
321 builder->RegisterService(ads_service_.get());
322 builder->RegisterService(lrs_service_.get());
323 }
324
StartAllServices()325 void XdsEnd2endTest::BalancerServerThread::StartAllServices() {
326 ads_service_->Start();
327 lrs_service_->Start();
328 }
329
ShutdownAllServices()330 void XdsEnd2endTest::BalancerServerThread::ShutdownAllServices() {
331 ads_service_->Shutdown();
332 lrs_service_->Shutdown();
333 }
334
335 //
336 // XdsEnd2endTest::RpcOptions
337 //
338
SetupRpc(ClientContext * context,EchoRequest * request) const339 void XdsEnd2endTest::RpcOptions::SetupRpc(ClientContext* context,
340 EchoRequest* request) const {
341 for (const auto& item : metadata) {
342 context->AddMetadata(item.first, item.second);
343 }
344 if (timeout_ms != 0) {
345 context->set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
346 }
347 if (wait_for_ready) context->set_wait_for_ready(true);
348 request->set_message(kRequestMessage);
349 if (server_fail) {
350 request->mutable_param()->mutable_expected_error()->set_code(
351 GRPC_STATUS_FAILED_PRECONDITION);
352 }
353 if (server_sleep_us != 0) {
354 request->mutable_param()->set_server_sleep_us(server_sleep_us);
355 }
356 if (client_cancel_after_us != 0) {
357 request->mutable_param()->set_client_cancel_after_us(
358 client_cancel_after_us);
359 }
360 if (skip_cancelled_check) {
361 request->mutable_param()->set_skip_cancelled_check(true);
362 }
363 if (backend_metrics.has_value()) {
364 *request->mutable_param()->mutable_backend_metrics() = *backend_metrics;
365 }
366 if (server_notify_client_when_started) {
367 request->mutable_param()->set_server_notify_client_when_started(true);
368 }
369 }
370
371 //
372 // XdsEnd2endTest
373 //
374
375 const char XdsEnd2endTest::kCaCertPath[] = "src/core/tsi/test_creds/ca.pem";
376 const char XdsEnd2endTest::kServerCertPath[] =
377 "src/core/tsi/test_creds/server1.pem";
378 const char XdsEnd2endTest::kServerKeyPath[] =
379 "src/core/tsi/test_creds/server1.key";
380
381 const char XdsEnd2endTest::kRequestMessage[] = "Live long and prosper.";
382
XdsEnd2endTest()383 XdsEnd2endTest::XdsEnd2endTest()
384 : balancer_(CreateAndStartBalancer("Default Balancer")) {
385 // Initialize default client-side xDS resources.
386 default_listener_ = XdsResourceUtils::DefaultListener();
387 default_route_config_ = XdsResourceUtils::DefaultRouteConfig();
388 default_cluster_ = XdsResourceUtils::DefaultCluster();
389 if (GetParam().enable_load_reporting()) {
390 default_cluster_.mutable_lrs_server()->mutable_self();
391 }
392 // Initialize client-side resources on balancer.
393 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
394 default_route_config_);
395 balancer_->ads_service()->SetCdsResource(default_cluster_);
396 // Initialize default server-side xDS resources.
397 default_server_route_config_ = XdsResourceUtils::DefaultServerRouteConfig();
398 default_server_listener_ = XdsResourceUtils::DefaultServerListener();
399 }
400
TearDown()401 void XdsEnd2endTest::TearDown() {
402 ShutdownAllBackends();
403 balancer_->Shutdown();
404 // Clear global xDS channel args, since they will go out of scope
405 // when this test object is destroyed.
406 grpc_core::internal::SetXdsChannelArgsForTest(nullptr);
407 grpc_core::UnsetEnv("GRPC_XDS_BOOTSTRAP");
408 grpc_core::UnsetEnv("GRPC_XDS_BOOTSTRAP_CONFIG");
409 if (bootstrap_file_ != nullptr) {
410 remove(bootstrap_file_);
411 gpr_free(bootstrap_file_);
412 }
413 }
414
415 std::unique_ptr<XdsEnd2endTest::BalancerServerThread>
CreateAndStartBalancer(absl::string_view debug_label)416 XdsEnd2endTest::CreateAndStartBalancer(absl::string_view debug_label) {
417 std::unique_ptr<BalancerServerThread> balancer =
418 std::make_unique<BalancerServerThread>(this, debug_label);
419 balancer->Start();
420 return balancer;
421 }
422
423 std::vector<XdsEnd2endTest::EdsResourceArgs::Endpoint>
CreateEndpointsForBackends(size_t start_index,size_t stop_index,HealthStatus health_status,int lb_weight)424 XdsEnd2endTest::CreateEndpointsForBackends(size_t start_index,
425 size_t stop_index,
426 HealthStatus health_status,
427 int lb_weight) {
428 if (stop_index == 0) stop_index = backends_.size();
429 std::vector<EdsResourceArgs::Endpoint> endpoints;
430 for (size_t i = start_index; i < stop_index; ++i) {
431 endpoints.emplace_back(CreateEndpoint(i, health_status, lb_weight));
432 }
433 return endpoints;
434 }
435
ResetBackendCounters(size_t start_index,size_t stop_index)436 void XdsEnd2endTest::ResetBackendCounters(size_t start_index,
437 size_t stop_index) {
438 if (stop_index == 0) stop_index = backends_.size();
439 for (size_t i = start_index; i < stop_index; ++i) {
440 backends_[i]->backend_service()->ResetCounters();
441 backends_[i]->backend_service1()->ResetCounters();
442 backends_[i]->backend_service2()->ResetCounters();
443 }
444 }
445
SeenBackend(size_t backend_idx,const RpcService rpc_service)446 bool XdsEnd2endTest::SeenBackend(size_t backend_idx,
447 const RpcService rpc_service) {
448 switch (rpc_service) {
449 case SERVICE_ECHO:
450 if (backends_[backend_idx]->backend_service()->request_count() == 0) {
451 return false;
452 }
453 break;
454 case SERVICE_ECHO1:
455 if (backends_[backend_idx]->backend_service1()->request_count() == 0) {
456 return false;
457 }
458 break;
459 case SERVICE_ECHO2:
460 if (backends_[backend_idx]->backend_service2()->request_count() == 0) {
461 return false;
462 }
463 break;
464 }
465 return true;
466 }
467
SeenAllBackends(size_t start_index,size_t stop_index,const RpcService rpc_service)468 bool XdsEnd2endTest::SeenAllBackends(size_t start_index, size_t stop_index,
469 const RpcService rpc_service) {
470 if (stop_index == 0) stop_index = backends_.size();
471 for (size_t i = start_index; i < stop_index; ++i) {
472 if (!SeenBackend(i, rpc_service)) {
473 return false;
474 }
475 }
476 return true;
477 }
478
GetBackendPorts(size_t start_index,size_t stop_index) const479 std::vector<int> XdsEnd2endTest::GetBackendPorts(size_t start_index,
480 size_t stop_index) const {
481 if (stop_index == 0) stop_index = backends_.size();
482 std::vector<int> backend_ports;
483 for (size_t i = start_index; i < stop_index; ++i) {
484 backend_ports.push_back(backends_[i]->port());
485 }
486 return backend_ports;
487 }
488
InitClient(absl::optional<XdsBootstrapBuilder> builder,std::string lb_expected_authority,int xds_resource_does_not_exist_timeout_ms)489 void XdsEnd2endTest::InitClient(absl::optional<XdsBootstrapBuilder> builder,
490 std::string lb_expected_authority,
491 int xds_resource_does_not_exist_timeout_ms) {
492 if (!builder.has_value()) {
493 builder = MakeBootstrapBuilder();
494 }
495 if (xds_resource_does_not_exist_timeout_ms > 0) {
496 xds_channel_args_to_add_.emplace_back(grpc_channel_arg_integer_create(
497 const_cast<char*>(GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS),
498 xds_resource_does_not_exist_timeout_ms));
499 }
500 if (!lb_expected_authority.empty()) {
501 constexpr char authority_const[] = "localhost:%d";
502 if (lb_expected_authority == authority_const) {
503 lb_expected_authority =
504 absl::StrFormat(authority_const, balancer_->port());
505 }
506 xds_channel_args_to_add_.emplace_back(grpc_channel_arg_string_create(
507 const_cast<char*>(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS),
508 const_cast<char*>(lb_expected_authority.c_str())));
509 }
510 xds_channel_args_.num_args = xds_channel_args_to_add_.size();
511 xds_channel_args_.args = xds_channel_args_to_add_.data();
512 bootstrap_ = builder->Build();
513 if (GetParam().bootstrap_source() == XdsTestType::kBootstrapFromEnvVar) {
514 grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP_CONFIG", bootstrap_.c_str());
515 } else if (GetParam().bootstrap_source() == XdsTestType::kBootstrapFromFile) {
516 FILE* out = gpr_tmpfile("xds_bootstrap_v3", &bootstrap_file_);
517 fputs(bootstrap_.c_str(), out);
518 fclose(out);
519 grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP", bootstrap_file_);
520 }
521 if (GetParam().bootstrap_source() != XdsTestType::kBootstrapFromChannelArg) {
522 // If getting bootstrap from channel arg, we'll pass these args in
523 // via the parent channel args in CreateChannel() instead.
524 grpc_core::internal::SetXdsChannelArgsForTest(&xds_channel_args_);
525 // Make sure each test creates a new XdsClient instance rather than
526 // reusing the one from the previous test. This avoids spurious failures
527 // caused when a load reporting test runs after a non-load reporting test
528 // and the XdsClient is still talking to the old LRS server, which fails
529 // because it's not expecting the client to connect. It also
530 // ensures that each test can independently set the global channel
531 // args for the xDS channel.
532 grpc_core::internal::UnsetGlobalXdsClientsForTest();
533 }
534 // Create channel and stub.
535 ResetStub();
536 }
537
ResetStub(int failover_timeout_ms,ChannelArguments * args)538 void XdsEnd2endTest::ResetStub(int failover_timeout_ms,
539 ChannelArguments* args) {
540 channel_ = CreateChannel(failover_timeout_ms, kServerName, "", args);
541 stub_ = grpc::testing::EchoTestService::NewStub(channel_);
542 stub1_ = grpc::testing::EchoTest1Service::NewStub(channel_);
543 stub2_ = grpc::testing::EchoTest2Service::NewStub(channel_);
544 }
545
CreateChannel(int failover_timeout_ms,const char * server_name,const char * xds_authority,ChannelArguments * args)546 std::shared_ptr<Channel> XdsEnd2endTest::CreateChannel(
547 int failover_timeout_ms, const char* server_name, const char* xds_authority,
548 ChannelArguments* args) {
549 ChannelArguments local_args;
550 if (args == nullptr) args = &local_args;
551 // TODO(roth): Remove this once we enable retries by default internally.
552 args->SetInt(GRPC_ARG_ENABLE_RETRIES, 1);
553 if (failover_timeout_ms > 0) {
554 args->SetInt(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS,
555 failover_timeout_ms * grpc_test_slowdown_factor());
556 }
557 if (GetParam().bootstrap_source() == XdsTestType::kBootstrapFromChannelArg) {
558 // We're getting the bootstrap from a channel arg, so we do the
559 // same thing for the response generator to use for the xDS
560 // channel and the xDS resource-does-not-exist timeout value.
561 args->SetString(GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG,
562 bootstrap_);
563 args->SetPointerWithVtable(
564 GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS,
565 &xds_channel_args_, &kChannelArgsArgVtable);
566 }
567 std::vector<absl::string_view> parts = {"xds:"};
568 if (xds_authority != nullptr && xds_authority[0] != '\0') {
569 parts.emplace_back("//");
570 parts.emplace_back(xds_authority);
571 parts.emplace_back("/");
572 }
573 parts.emplace_back(server_name);
574 std::string uri = absl::StrJoin(parts, "");
575 std::shared_ptr<ChannelCredentials> channel_creds =
576 GetParam().use_xds_credentials()
577 ? XdsCredentials(CreateTlsFallbackCredentials())
578 : std::make_shared<FakeTransportSecurityChannelCredentials>();
579 return grpc::CreateCustomChannel(uri, channel_creds, *args);
580 }
581
SendRpc(const RpcOptions & rpc_options,EchoResponse * response,std::multimap<std::string,std::string> * server_initial_metadata)582 Status XdsEnd2endTest::SendRpc(
583 const RpcOptions& rpc_options, EchoResponse* response,
584 std::multimap<std::string, std::string>* server_initial_metadata) {
585 EchoResponse local_response;
586 if (response == nullptr) response = &local_response;
587 ClientContext context;
588 EchoRequest request;
589 if (rpc_options.server_expected_error != StatusCode::OK) {
590 auto* error = request.mutable_param()->mutable_expected_error();
591 error->set_code(rpc_options.server_expected_error);
592 }
593 rpc_options.SetupRpc(&context, &request);
594 Status status;
595 switch (rpc_options.service) {
596 case SERVICE_ECHO:
597 status =
598 SendRpcMethod(stub_.get(), rpc_options, &context, request, response);
599 break;
600 case SERVICE_ECHO1:
601 status =
602 SendRpcMethod(stub1_.get(), rpc_options, &context, request, response);
603 break;
604 case SERVICE_ECHO2:
605 status =
606 SendRpcMethod(stub2_.get(), rpc_options, &context, request, response);
607 break;
608 }
609 if (server_initial_metadata != nullptr) {
610 for (const auto& it : context.GetServerInitialMetadata()) {
611 std::string header(it.first.data(), it.first.size());
612 // Guard against implementation-specific header case - RFC 2616
613 absl::AsciiStrToLower(&header);
614 server_initial_metadata->emplace(
615 header, std::string(it.second.data(), it.second.size()));
616 }
617 }
618 return status;
619 }
620
SendRpcsUntil(const grpc_core::DebugLocation & debug_location,std::function<bool (const RpcResult &)> continue_predicate,int timeout_ms,const RpcOptions & rpc_options)621 void XdsEnd2endTest::SendRpcsUntil(
622 const grpc_core::DebugLocation& debug_location,
623 std::function<bool(const RpcResult&)> continue_predicate, int timeout_ms,
624 const RpcOptions& rpc_options) {
625 absl::Time deadline = absl::InfiniteFuture();
626 if (timeout_ms != 0) {
627 deadline = absl::Now() +
628 (absl::Milliseconds(timeout_ms) * grpc_test_slowdown_factor());
629 }
630 while (true) {
631 RpcResult result;
632 result.status = SendRpc(rpc_options, &result.response);
633 if (!continue_predicate(result)) return;
634 EXPECT_LE(absl::Now(), deadline)
635 << debug_location.file() << ":" << debug_location.line();
636 if (absl::Now() >= deadline) break;
637 }
638 }
639
CheckRpcSendOk(const grpc_core::DebugLocation & debug_location,const size_t times,const RpcOptions & rpc_options)640 void XdsEnd2endTest::CheckRpcSendOk(
641 const grpc_core::DebugLocation& debug_location, const size_t times,
642 const RpcOptions& rpc_options) {
643 SendRpcsUntil(
644 debug_location,
645 [debug_location, times, n = size_t{0}](const RpcResult& result) mutable {
646 EXPECT_TRUE(result.status.ok())
647 << "code=" << result.status.error_code()
648 << " message=" << result.status.error_message() << " at "
649 << debug_location.file() << ":" << debug_location.line();
650 EXPECT_EQ(result.response.message(), kRequestMessage);
651 return ++n < times;
652 },
653 /*timeout_ms=*/0, rpc_options);
654 }
655
CheckRpcSendFailure(const grpc_core::DebugLocation & debug_location,StatusCode expected_status,absl::string_view expected_message_regex,const RpcOptions & rpc_options)656 void XdsEnd2endTest::CheckRpcSendFailure(
657 const grpc_core::DebugLocation& debug_location, StatusCode expected_status,
658 absl::string_view expected_message_regex, const RpcOptions& rpc_options) {
659 const Status status = SendRpc(rpc_options);
660 EXPECT_FALSE(status.ok())
661 << debug_location.file() << ":" << debug_location.line();
662 EXPECT_EQ(expected_status, status.error_code())
663 << debug_location.file() << ":" << debug_location.line();
664 EXPECT_THAT(status.error_message(),
665 ::testing::MatchesRegex(expected_message_regex))
666 << debug_location.file() << ":" << debug_location.line();
667 }
668
SendRpcsAndCountFailuresWithMessage(const grpc_core::DebugLocation & debug_location,size_t num_rpcs,StatusCode expected_status,absl::string_view expected_message_prefix,const RpcOptions & rpc_options)669 size_t XdsEnd2endTest::SendRpcsAndCountFailuresWithMessage(
670 const grpc_core::DebugLocation& debug_location, size_t num_rpcs,
671 StatusCode expected_status, absl::string_view expected_message_prefix,
672 const RpcOptions& rpc_options) {
673 size_t num_failed = 0;
674 SendRpcsUntil(
675 debug_location,
676 [&, n = size_t{0}](const RpcResult& result) mutable {
677 if (!result.status.ok()) {
678 EXPECT_EQ(result.status.error_code(), expected_status)
679 << debug_location.file() << ":" << debug_location.line();
680 EXPECT_THAT(result.status.error_message(),
681 ::testing::StartsWith(expected_message_prefix))
682 << debug_location.file() << ":" << debug_location.line();
683 ++num_failed;
684 }
685 return ++n < num_rpcs;
686 },
687 /*timeout_ms=*/0, rpc_options);
688 return num_failed;
689 }
690
StartRpc(grpc::testing::EchoTestService::Stub * stub,const RpcOptions & rpc_options)691 void XdsEnd2endTest::LongRunningRpc::StartRpc(
692 grpc::testing::EchoTestService::Stub* stub, const RpcOptions& rpc_options) {
693 sender_thread_ = std::thread([this, stub, rpc_options]() {
694 EchoRequest request;
695 EchoResponse response;
696 rpc_options.SetupRpc(&context_, &request);
697 status_ = stub->Echo(&context_, request, &response);
698 });
699 }
700
CancelRpc()701 void XdsEnd2endTest::LongRunningRpc::CancelRpc() {
702 context_.TryCancel();
703 if (sender_thread_.joinable()) sender_thread_.join();
704 }
705
GetStatus()706 Status XdsEnd2endTest::LongRunningRpc::GetStatus() {
707 if (sender_thread_.joinable()) sender_thread_.join();
708 return status_;
709 }
710
SendConcurrentRpcs(const grpc_core::DebugLocation & debug_location,grpc::testing::EchoTestService::Stub * stub,size_t num_rpcs,const RpcOptions & rpc_options)711 std::vector<XdsEnd2endTest::ConcurrentRpc> XdsEnd2endTest::SendConcurrentRpcs(
712 const grpc_core::DebugLocation& debug_location,
713 grpc::testing::EchoTestService::Stub* stub, size_t num_rpcs,
714 const RpcOptions& rpc_options) {
715 // Variables for RPCs.
716 std::vector<ConcurrentRpc> rpcs(num_rpcs);
717 EchoRequest request;
718 // Variables for synchronization
719 grpc_core::Mutex mu;
720 grpc_core::CondVar cv;
721 size_t completed = 0;
722 // Set-off callback RPCs
723 for (size_t i = 0; i < num_rpcs; i++) {
724 ConcurrentRpc* rpc = &rpcs[i];
725 rpc_options.SetupRpc(&rpc->context, &request);
726 grpc_core::Timestamp t0 = NowFromCycleCounter();
727 stub->async()->Echo(&rpc->context, &request, &rpc->response,
728 [rpc, &mu, &completed, &cv, num_rpcs, t0](Status s) {
729 rpc->status = s;
730 rpc->elapsed_time = NowFromCycleCounter() - t0;
731 bool done;
732 {
733 grpc_core::MutexLock lock(&mu);
734 done = (++completed) == num_rpcs;
735 }
736 if (done) cv.Signal();
737 });
738 }
739 {
740 grpc_core::MutexLock lock(&mu);
741 cv.Wait(&mu);
742 }
743 EXPECT_EQ(completed, num_rpcs)
744 << " at " << debug_location.file() << ":" << debug_location.line();
745 return rpcs;
746 }
747
WaitForAllBackends(const grpc_core::DebugLocation & debug_location,size_t start_index,size_t stop_index,std::function<void (const RpcResult &)> check_status,const WaitForBackendOptions & wait_options,const RpcOptions & rpc_options)748 size_t XdsEnd2endTest::WaitForAllBackends(
749 const grpc_core::DebugLocation& debug_location, size_t start_index,
750 size_t stop_index, std::function<void(const RpcResult&)> check_status,
751 const WaitForBackendOptions& wait_options, const RpcOptions& rpc_options) {
752 if (check_status == nullptr) {
753 check_status = [&](const RpcResult& result) {
754 EXPECT_TRUE(result.status.ok())
755 << "code=" << result.status.error_code()
756 << " message=" << result.status.error_message() << " at "
757 << debug_location.file() << ":" << debug_location.line();
758 };
759 }
760 gpr_log(GPR_INFO,
761 "========= WAITING FOR BACKENDS [%" PRIuPTR ", %" PRIuPTR
762 ") ==========",
763 start_index, stop_index);
764 size_t num_rpcs = 0;
765 SendRpcsUntil(
766 debug_location,
767 [&](const RpcResult& result) {
768 ++num_rpcs;
769 check_status(result);
770 return !SeenAllBackends(start_index, stop_index, rpc_options.service);
771 },
772 wait_options.timeout_ms, rpc_options);
773 if (wait_options.reset_counters) ResetBackendCounters();
774 gpr_log(GPR_INFO, "Backends up; sent %" PRIuPTR " warm up requests",
775 num_rpcs);
776 return num_rpcs;
777 }
778
WaitForNack(const grpc_core::DebugLocation & debug_location,std::function<absl::optional<AdsServiceImpl::ResponseState> ()> get_state,const RpcOptions & rpc_options,StatusCode expected_status)779 absl::optional<AdsServiceImpl::ResponseState> XdsEnd2endTest::WaitForNack(
780 const grpc_core::DebugLocation& debug_location,
781 std::function<absl::optional<AdsServiceImpl::ResponseState>()> get_state,
782 const RpcOptions& rpc_options, StatusCode expected_status) {
783 absl::optional<AdsServiceImpl::ResponseState> response_state;
784 auto deadline =
785 absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor());
786 auto continue_predicate = [&]() {
787 if (absl::Now() >= deadline) {
788 return false;
789 }
790 response_state = get_state();
791 return !response_state.has_value() ||
792 response_state->state != AdsServiceImpl::ResponseState::NACKED;
793 };
794 do {
795 const Status status = SendRpc(rpc_options);
796 EXPECT_EQ(expected_status, status.error_code())
797 << "code=" << status.error_code()
798 << " message=" << status.error_message() << " at "
799 << debug_location.file() << ":" << debug_location.line();
800 } while (continue_predicate());
801 return response_state;
802 }
803
SetProtoDuration(grpc_core::Duration duration,google::protobuf::Duration * duration_proto)804 void XdsEnd2endTest::SetProtoDuration(
805 grpc_core::Duration duration, google::protobuf::Duration* duration_proto) {
806 duration *= grpc_test_slowdown_factor();
807 gpr_timespec ts = duration.as_timespec();
808 duration_proto->set_seconds(ts.tv_sec);
809 duration_proto->set_nanos(ts.tv_nsec);
810 }
811
MakeConnectionFailureRegex(absl::string_view prefix)812 std::string XdsEnd2endTest::MakeConnectionFailureRegex(
813 absl::string_view prefix) {
814 return absl::StrCat(
815 prefix,
816 "(UNKNOWN|UNAVAILABLE): (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
817 "(Failed to connect to remote host: )?"
818 "(Connection refused|Connection reset by peer|"
819 "recvmsg:Connection reset by peer|"
820 "getsockopt\\(SO\\_ERROR\\): Connection reset by peer|"
821 "Socket closed|FD shutdown)");
822 }
823
ReadTlsIdentityPair(const char * key_path,const char * cert_path)824 grpc_core::PemKeyCertPairList XdsEnd2endTest::ReadTlsIdentityPair(
825 const char* key_path, const char* cert_path) {
826 return grpc_core::PemKeyCertPairList{grpc_core::PemKeyCertPair(
827 grpc_core::testing::GetFileContents(key_path),
828 grpc_core::testing::GetFileContents(cert_path))};
829 }
830
831 std::shared_ptr<ChannelCredentials>
CreateTlsFallbackCredentials()832 XdsEnd2endTest::CreateTlsFallbackCredentials() {
833 IdentityKeyCertPair key_cert_pair;
834 key_cert_pair.private_key =
835 grpc_core::testing::GetFileContents(kServerKeyPath);
836 key_cert_pair.certificate_chain =
837 grpc_core::testing::GetFileContents(kServerCertPath);
838 std::vector<IdentityKeyCertPair> identity_key_cert_pairs;
839 identity_key_cert_pairs.emplace_back(key_cert_pair);
840 auto certificate_provider = std::make_shared<StaticDataCertificateProvider>(
841 grpc_core::testing::GetFileContents(kCaCertPath),
842 identity_key_cert_pairs);
843 grpc::experimental::TlsChannelCredentialsOptions options;
844 options.set_certificate_provider(std::move(certificate_provider));
845 options.watch_root_certs();
846 options.watch_identity_key_cert_pairs();
847 auto verifier =
848 ExternalCertificateVerifier::Create<SyncCertificateVerifier>(true);
849 options.set_certificate_verifier(std::move(verifier));
850 options.set_verify_server_certs(true);
851 options.set_check_call_host(false);
852 auto channel_creds = grpc::experimental::TlsCredentials(options);
853 GPR_ASSERT(channel_creds.get() != nullptr);
854 return channel_creds;
855 }
856
857 } // namespace testing
858 } // namespace grpc
859