1 // Copyright 2017 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 // 15 16 #ifndef GRPC_TEST_CPP_END2END_XDS_XDS_END2END_TEST_LIB_H 17 #define GRPC_TEST_CPP_END2END_XDS_XDS_END2END_TEST_LIB_H 18 19 #include <memory> 20 #include <set> 21 #include <string> 22 #include <thread> 23 #include <vector> 24 25 #include <gmock/gmock.h> 26 #include <gtest/gtest.h> 27 28 #include "absl/status/statusor.h" 29 #include "absl/strings/str_cat.h" 30 #include "absl/strings/string_view.h" 31 #include "absl/types/optional.h" 32 33 #include <grpc/grpc.h> 34 #include <grpc/grpc_security.h> 35 #include <grpcpp/channel.h> 36 #include <grpcpp/client_context.h> 37 #include <grpcpp/ext/call_metric_recorder.h> 38 #include <grpcpp/ext/server_metric_recorder.h> 39 #include <grpcpp/xds_server_builder.h> 40 41 #include "src/core/lib/security/credentials/fake/fake_credentials.h" 42 #include "src/core/lib/security/security_connector/ssl_utils.h" 43 #include "src/cpp/server/secure_server_credentials.h" 44 #include "src/proto/grpc/testing/echo.grpc.pb.h" 45 #include "src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.h" 46 #include "src/proto/grpc/testing/xds/v3/http_filter_rbac.grpc.pb.h" 47 #include "src/proto/grpc/testing/xds/v3/orca_load_report.pb.h" 48 #include "src/proto/grpc/testing/xds/v3/rbac.pb.h" 49 #include "test/core/util/port.h" 50 #include "test/cpp/end2end/counted_service.h" 51 #include "test/cpp/end2end/test_service_impl.h" 52 
#include "test/cpp/end2end/xds/xds_server.h"
#include "test/cpp/end2end/xds/xds_utils.h"

namespace grpc {
namespace testing {

// The parameter type for INSTANTIATE_TEST_SUITE_P().
//
// Each instance describes one test variation.  The set_*() methods
// return *this so that a variation can be built up by chaining calls.
class XdsTestType {
 public:
  // Where the HTTP filter config is placed.
  enum HttpFilterConfigLocation {
    // Set the HTTP filter config directly in LDS.
    kHttpFilterConfigInListener,
    // Enable the HTTP filter in LDS, but override the filter config in route.
    kHttpFilterConfigInRoute,
  };

  // Where the xDS bootstrap config comes from.
  enum BootstrapSource {
    kBootstrapFromChannelArg,
    kBootstrapFromFile,
    kBootstrapFromEnvVar,
  };

  //
  // Chainable setters.  The flag setters take no argument and simply
  // turn the corresponding flag on.
  //

  XdsTestType& set_enable_load_reporting() {
    enable_load_reporting_ = true;
    return *this;
  }

  XdsTestType& set_enable_rds_testing() {
    enable_rds_testing_ = true;
    return *this;
  }

  XdsTestType& set_use_xds_credentials() {
    use_xds_credentials_ = true;
    return *this;
  }

  XdsTestType& set_use_csds_streaming() {
    use_csds_streaming_ = true;
    return *this;
  }

  XdsTestType& set_filter_config_setup(HttpFilterConfigLocation setup) {
    filter_config_setup_ = setup;
    return *this;
  }

  XdsTestType& set_bootstrap_source(BootstrapSource bootstrap_source) {
    bootstrap_source_ = bootstrap_source;
    return *this;
  }

  XdsTestType& set_rbac_action(::envoy::config::rbac::v3::RBAC_Action action) {
    rbac_action_ = action;
    return *this;
  }

  XdsTestType& set_rbac_audit_condition(
      ::envoy::config::rbac::v3::RBAC_AuditLoggingOptions_AuditCondition
          audit_condition) {
    rbac_audit_condition_ = audit_condition;
    return *this;
  }

  //
  // Accessors.
  //

  bool enable_load_reporting() const { return enable_load_reporting_; }
  bool enable_rds_testing() const { return enable_rds_testing_; }
  bool use_xds_credentials() const { return use_xds_credentials_; }
  bool use_csds_streaming() const { return use_csds_streaming_; }
  HttpFilterConfigLocation filter_config_setup() const {
    return filter_config_setup_;
  }
  BootstrapSource bootstrap_source() const { return bootstrap_source_; }
  ::envoy::config::rbac::v3::RBAC_Action rbac_action() const {
    return rbac_action_;
  }
  ::envoy::config::rbac::v3::RBAC_AuditLoggingOptions_AuditCondition
  rbac_audit_condition() const {
    return rbac_audit_condition_;
  }

  // Builds a name for this variation: "V3" followed by one suffix for
  // each setting that differs from its default.
  std::string AsString() const {
    namespace rbac_v3 = ::envoy::config::rbac::v3;
    std::string name = "V3";
    if (enable_load_reporting_) name += "WithLoadReporting";
    if (enable_rds_testing_) name += "Rds";
    if (use_xds_credentials_) name += "XdsCreds";
    if (use_csds_streaming_) name += "CsdsStreaming";
    if (filter_config_setup_ == kHttpFilterConfigInRoute) {
      name += "FilterPerRouteOverride";
    }
    switch (bootstrap_source_) {
      case kBootstrapFromFile:
        name += "BootstrapFromFile";
        break;
      case kBootstrapFromEnvVar:
        name += "BootstrapFromEnvVar";
        break;
      default:
        // kBootstrapFromChannelArg contributes no suffix.
        break;
    }
    switch (rbac_action_) {
      case rbac_v3::RBAC_Action_ALLOW:
        name += "RbacAllow";
        break;
      case rbac_v3::RBAC_Action_DENY:
        name += "RbacDeny";
        break;
      default:
        // Any other action (including the default, LOG) adds nothing.
        break;
    }
    if (rbac_audit_condition_ !=
        rbac_v3::RBAC_AuditLoggingOptions_AuditCondition_NONE) {
      name += absl::StrCat(
          "AuditCondition",
          rbac_v3::RBAC_AuditLoggingOptions_AuditCondition_Name(
              rbac_audit_condition_));
    }
    return name;
  }

  // For use as the final parameter in INSTANTIATE_TEST_SUITE_P().
  static std::string Name(const ::testing::TestParamInfo<XdsTestType>& info) {
    return info.param.AsString();
  }

 private:
  bool enable_load_reporting_ = false;
  bool enable_rds_testing_ = false;
  bool use_xds_credentials_ = false;
  bool use_csds_streaming_ = false;
  HttpFilterConfigLocation filter_config_setup_ = kHttpFilterConfigInListener;
  BootstrapSource bootstrap_source_ = kBootstrapFromChannelArg;
  ::envoy::config::rbac::v3::RBAC_Action rbac_action_ =
      ::envoy::config::rbac::v3::RBAC_Action_LOG;
  ::envoy::config::rbac::v3::RBAC_AuditLoggingOptions_AuditCondition
      rbac_audit_condition_ = ::envoy::config::rbac::v3::
          RBAC_AuditLoggingOptions_AuditCondition_NONE;
};

// A base class for xDS end-to-end tests.
//
// An xDS server is provided in balancer_.  It is automatically started
// for every test.  Additional xDS servers can be started if needed by
// calling CreateAndStartBalancer().
//
// A default set of LDS, RDS, and CDS resources are created for gRPC
// clients, available in default_listener_, default_route_config_, and
// default_cluster_.  These resources are automatically loaded into
// balancer_ but can be modified by individual tests.  No EDS resource
// is provided by default.  There are also default LDS and RDS resources
// for the gRPC server side in default_server_listener_ and
// default_server_route_config_.  Methods are provided for constructing new
// resources that can be added to the xDS server as needed.
//
// This class provides a mechanism for running backend servers, which will
// be stored in backends_.  No servers are created or started by default,
// but tests can call CreateAndStartBackends() to start however many
// backends they want.
There are also a number of methods for accessing 200 // backends by index, which is the index into the backends_ vector. 201 // For methods that take a start_index and stop_index, this refers to 202 // the indexes in the range [start_index, stop_index). If stop_index 203 // is 0, backends_.size() is used. Backends may or may not be 204 // xDS-enabled, at the discretion of the test. 205 class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType>, 206 public XdsResourceUtils { 207 protected: 208 // TLS certificate paths. 209 static const char kCaCertPath[]; 210 static const char kServerCertPath[]; 211 static const char kServerKeyPath[]; 212 213 // Message used in EchoRequest to the backend. 214 static const char kRequestMessage[]; 215 216 // A base class for server threads. 217 class ServerThread { 218 public: 219 // A status notifier for xDS-enabled servers. 220 class XdsServingStatusNotifier 221 : public grpc::XdsServerServingStatusNotifierInterface { 222 public: 223 void OnServingStatusUpdate(std::string uri, 224 ServingStatusUpdate update) override; 225 226 void WaitOnServingStatusChange(std::string uri, 227 grpc::StatusCode expected_status); 228 229 private: 230 grpc_core::Mutex mu_; 231 grpc_core::CondVar cond_; 232 std::map<std::string, grpc::Status> status_map ABSL_GUARDED_BY(mu_); 233 }; 234 235 // If use_xds_enabled_server is true, the server will use xDS. 236 explicit ServerThread(XdsEnd2endTest* test_obj, 237 bool use_xds_enabled_server = false) test_obj_(test_obj)238 : test_obj_(test_obj), 239 port_(grpc_pick_unused_port_or_die()), 240 use_xds_enabled_server_(use_xds_enabled_server) {} 241 ~ServerThread()242 virtual ~ServerThread() { 243 // Shutdown should be called manually. Shutdown calls virtual methods and 244 // can't be called from the base class destructor. 
245 GPR_ASSERT(!running_); 246 } 247 248 void Start(); 249 void Shutdown(); 250 Credentials()251 virtual std::shared_ptr<ServerCredentials> Credentials() { 252 return std::make_shared<SecureServerCredentials>( 253 grpc_fake_transport_security_server_credentials_create()); 254 } 255 target()256 std::string target() const { return absl::StrCat("localhost:", port_); } 257 port()258 int port() const { return port_; } 259 use_xds_enabled_server()260 bool use_xds_enabled_server() const { return use_xds_enabled_server_; } 261 notifier()262 XdsServingStatusNotifier* notifier() { return ¬ifier_; } 263 set_allow_put_requests(bool allow_put_requests)264 void set_allow_put_requests(bool allow_put_requests) { 265 allow_put_requests_ = allow_put_requests; 266 } 267 268 void StopListening(); 269 270 void StopListeningAndSendGoaways(); 271 272 private: 273 class XdsChannelArgsServerBuilderOption; 274 275 virtual const char* Type() = 0; 276 virtual void RegisterAllServices(ServerBuilder* builder) = 0; 277 virtual void StartAllServices() = 0; 278 virtual void ShutdownAllServices() = 0; 279 280 void Serve(grpc_core::Mutex* mu, grpc_core::CondVar* cond); 281 282 XdsEnd2endTest* test_obj_; 283 const int port_; 284 std::unique_ptr<Server> server_; 285 XdsServingStatusNotifier notifier_; 286 std::unique_ptr<std::thread> thread_; 287 bool running_ = false; 288 const bool use_xds_enabled_server_; 289 bool allow_put_requests_ = false; 290 }; 291 292 // A server thread for a backend server. 293 class BackendServerThread : public ServerThread { 294 public: 295 // A wrapper around the backend echo test service impl that counts 296 // requests and responses. 
297 template <typename RpcService> 298 class BackendServiceImpl 299 : public CountedService<TestMultipleServiceImpl<RpcService>> { 300 public: BackendServiceImpl()301 BackendServiceImpl() {} 302 Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)303 Status Echo(ServerContext* context, const EchoRequest* request, 304 EchoResponse* response) override { 305 auto peer_identity = context->auth_context()->GetPeerIdentity(); 306 CountedService< 307 TestMultipleServiceImpl<RpcService>>::IncreaseRequestCount(); 308 { 309 grpc_core::MutexLock lock(&mu_); 310 clients_.insert(context->peer()); 311 last_peer_identity_.clear(); 312 for (const auto& entry : peer_identity) { 313 last_peer_identity_.emplace_back(entry.data(), entry.size()); 314 } 315 } 316 if (request->has_param() && request->param().has_backend_metrics()) { 317 const auto& request_metrics = request->param().backend_metrics(); 318 auto* recorder = context->ExperimentalGetCallMetricRecorder(); 319 for (const auto& p : request_metrics.named_metrics()) { 320 char* key = static_cast<char*>( 321 grpc_call_arena_alloc(context->c_call(), p.first.size() + 1)); 322 strncpy(key, p.first.data(), p.first.size()); 323 key[p.first.size()] = '\0'; 324 recorder->RecordNamedMetric(key, p.second); 325 } 326 } 327 const auto status = TestMultipleServiceImpl<RpcService>::Echo( 328 context, request, response); 329 CountedService< 330 TestMultipleServiceImpl<RpcService>>::IncreaseResponseCount(); 331 return status; 332 } 333 Echo1(ServerContext * context,const EchoRequest * request,EchoResponse * response)334 Status Echo1(ServerContext* context, const EchoRequest* request, 335 EchoResponse* response) override { 336 return Echo(context, request, response); 337 } 338 Echo2(ServerContext * context,const EchoRequest * request,EchoResponse * response)339 Status Echo2(ServerContext* context, const EchoRequest* request, 340 EchoResponse* response) override { 341 return Echo(context, request, response); 342 } 343 
Start()344 void Start() {} Shutdown()345 void Shutdown() {} 346 clients()347 std::set<std::string> clients() { 348 grpc_core::MutexLock lock(&mu_); 349 return clients_; 350 } 351 last_peer_identity()352 const std::vector<std::string>& last_peer_identity() { 353 grpc_core::MutexLock lock(&mu_); 354 return last_peer_identity_; 355 } 356 357 private: 358 grpc_core::Mutex mu_; 359 std::set<std::string> clients_ ABSL_GUARDED_BY(&mu_); 360 std::vector<std::string> last_peer_identity_ ABSL_GUARDED_BY(&mu_); 361 }; 362 363 // If use_xds_enabled_server is true, the server will use xDS. 364 BackendServerThread(XdsEnd2endTest* test_obj, bool use_xds_enabled_server); 365 366 BackendServiceImpl<grpc::testing::EchoTestService::Service>* backend_service()367 backend_service() { 368 return &backend_service_; 369 } 370 BackendServiceImpl<grpc::testing::EchoTest1Service::Service>* backend_service1()371 backend_service1() { 372 return &backend_service1_; 373 } 374 BackendServiceImpl<grpc::testing::EchoTest2Service::Service>* backend_service2()375 backend_service2() { 376 return &backend_service2_; 377 } server_metric_recorder()378 grpc::experimental::ServerMetricRecorder* server_metric_recorder() const { 379 return server_metric_recorder_.get(); 380 } 381 382 // If XdsTestType::use_xds_credentials() and use_xds_enabled_server() 383 // are both true, returns XdsServerCredentials. 384 // Otherwise, if XdsTestType::use_xds_credentials() is true and 385 // use_xds_enabled_server() is false, returns TlsServerCredentials. 386 // Otherwise, returns fake credentials. 
387 std::shared_ptr<ServerCredentials> Credentials() override; 388 389 private: Type()390 const char* Type() override { return "Backend"; } 391 void RegisterAllServices(ServerBuilder* builder) override; 392 void StartAllServices() override; 393 void ShutdownAllServices() override; 394 395 BackendServiceImpl<grpc::testing::EchoTestService::Service> 396 backend_service_; 397 BackendServiceImpl<grpc::testing::EchoTest1Service::Service> 398 backend_service1_; 399 BackendServiceImpl<grpc::testing::EchoTest2Service::Service> 400 backend_service2_; 401 std::unique_ptr<experimental::ServerMetricRecorder> server_metric_recorder_; 402 }; 403 404 // A server thread for the xDS server. 405 class BalancerServerThread : public ServerThread { 406 public: 407 explicit BalancerServerThread(XdsEnd2endTest* test_obj, 408 absl::string_view debug_label); 409 ads_service()410 AdsServiceImpl* ads_service() { return ads_service_.get(); } lrs_service()411 LrsServiceImpl* lrs_service() { return lrs_service_.get(); } 412 413 private: Type()414 const char* Type() override { return "Balancer"; } 415 void RegisterAllServices(ServerBuilder* builder) override; 416 void StartAllServices() override; 417 void ShutdownAllServices() override; 418 419 std::shared_ptr<AdsServiceImpl> ads_service_; 420 std::shared_ptr<LrsServiceImpl> lrs_service_; 421 }; 422 423 // RPC services used to talk to the backends. 424 enum RpcService { 425 SERVICE_ECHO, 426 SERVICE_ECHO1, 427 SERVICE_ECHO2, 428 }; 429 430 // RPC methods used to talk to the backends. 431 enum RpcMethod { 432 METHOD_ECHO, 433 METHOD_ECHO1, 434 METHOD_ECHO2, 435 }; 436 437 XdsEnd2endTest(); 438 SetUp()439 void SetUp() override { InitClient(); } 440 void TearDown() override; 441 442 // 443 // xDS server management 444 // 445 446 // Creates and starts a new balancer, running in its own thread. 447 // Most tests will not need to call this; instead, they can use 448 // balancer_, which is already populated with default resources. 
449 std::unique_ptr<BalancerServerThread> CreateAndStartBalancer( 450 absl::string_view debug_label = ""); 451 452 // Sets the Listener and RouteConfiguration resource on the specified 453 // balancer. If RDS is in use, they will be set as separate resources; 454 // otherwise, the RouteConfig will be inlined into the Listener. 455 void SetListenerAndRouteConfiguration( 456 BalancerServerThread* balancer, Listener listener, 457 const RouteConfiguration& route_config, 458 const HcmAccessor& hcm_accessor = ClientHcmAccessor()) { 459 XdsResourceUtils::SetListenerAndRouteConfiguration( 460 balancer->ads_service(), std::move(listener), route_config, 461 GetParam().enable_rds_testing(), hcm_accessor); 462 } 463 464 // A convenient wrapper for setting the Listener and 465 // RouteConfiguration resources on the server side. SetServerListenerNameAndRouteConfiguration(BalancerServerThread * balancer,Listener listener,int port,const RouteConfiguration & route_config)466 void SetServerListenerNameAndRouteConfiguration( 467 BalancerServerThread* balancer, Listener listener, int port, 468 const RouteConfiguration& route_config) { 469 SetListenerAndRouteConfiguration( 470 balancer, PopulateServerListenerNameAndPort(listener, port), 471 route_config, ServerHcmAccessor()); 472 } 473 474 // Sets the RouteConfiguration resource on the specified balancer. 475 // If RDS is in use, it will be set directly as an independent 476 // resource; otherwise, it will be inlined into a Listener resource 477 // (either listener_to_copy, or if that is null, default_listener_). 478 void SetRouteConfiguration(BalancerServerThread* balancer, 479 const RouteConfiguration& route_config, 480 const Listener* listener_to_copy = nullptr) { 481 XdsResourceUtils::SetRouteConfiguration( 482 balancer->ads_service(), route_config, GetParam().enable_rds_testing(), 483 listener_to_copy); 484 } 485 486 // Helper method for generating an endpoint for a backend, for use in 487 // constructing an EDS resource. 
488 EdsResourceArgs::Endpoint CreateEndpoint( 489 size_t backend_idx, 490 ::envoy::config::core::v3::HealthStatus health_status = 491 ::envoy::config::core::v3::HealthStatus::UNKNOWN, 492 int lb_weight = 1, std::vector<size_t> additional_backend_indxees = {}) { 493 std::vector<int> additional_ports; 494 additional_ports.reserve(additional_backend_indxees.size()); 495 for (size_t idx : additional_backend_indxees) { 496 additional_ports.push_back(backends_[idx]->port()); 497 } 498 return EdsResourceArgs::Endpoint(backends_[backend_idx]->port(), 499 health_status, lb_weight, 500 additional_ports); 501 } 502 503 // Creates a vector of endpoints for a specified range of backends, 504 // for use in constructing an EDS resource. 505 std::vector<EdsResourceArgs::Endpoint> CreateEndpointsForBackends( 506 size_t start_index = 0, size_t stop_index = 0, 507 ::envoy::config::core::v3::HealthStatus health_status = 508 ::envoy::config::core::v3::HealthStatus::UNKNOWN, 509 int lb_weight = 1); 510 511 // Returns an endpoint for an unused port, for use in constructing an 512 // EDS resource. MakeNonExistantEndpoint()513 EdsResourceArgs::Endpoint MakeNonExistantEndpoint() { 514 return EdsResourceArgs::Endpoint(grpc_pick_unused_port_or_die()); 515 } 516 517 // 518 // Backend management 519 // 520 521 // Creates num_backends backends and stores them in backends_, but 522 // does not actually start them. If xds_enabled is true, the backends 523 // are xDS-enabled. 524 void CreateBackends(size_t num_backends, bool xds_enabled = false) { 525 for (size_t i = 0; i < num_backends; ++i) { 526 backends_.emplace_back(new BackendServerThread(this, xds_enabled)); 527 } 528 } 529 530 // Starts all backends in backends_. StartAllBackends()531 void StartAllBackends() { 532 for (auto& backend : backends_) backend->Start(); 533 } 534 535 // Same as CreateBackends(), but also starts the backends. 
536 void CreateAndStartBackends(size_t num_backends, bool xds_enabled = false) { 537 CreateBackends(num_backends, xds_enabled); 538 StartAllBackends(); 539 } 540 541 // Starts the backend at backends_[index]. StartBackend(size_t index)542 void StartBackend(size_t index) { backends_[index]->Start(); } 543 544 // Shuts down all backends in backends_. ShutdownAllBackends()545 void ShutdownAllBackends() { 546 for (auto& backend : backends_) backend->Shutdown(); 547 } 548 549 // Shuts down the backend at backends_[index]. ShutdownBackend(size_t index)550 void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); } 551 552 // Resets the request counters for backends in the specified range. 553 void ResetBackendCounters(size_t start_index = 0, size_t stop_index = 0); 554 555 // Returns true if the specified backend has received requests for the 556 // specified service. 557 bool SeenBackend(size_t backend_idx, 558 const RpcService rpc_service = SERVICE_ECHO); 559 560 // Returns true if all backends in the specified range have received 561 // requests for the specified service. 562 bool SeenAllBackends(size_t start_index = 0, size_t stop_index = 0, 563 const RpcService rpc_service = SERVICE_ECHO); 564 565 // Returns a vector containing the port for every backend in the 566 // specified range. 567 std::vector<int> GetBackendPorts(size_t start_index = 0, 568 size_t stop_index = 0) const; 569 570 // 571 // Client management 572 // 573 574 // Initializes global state for the client, such as the bootstrap file 575 // and channel args for the XdsClient. Then calls ResetStub(). 576 // All tests must call this exactly once at the start of the test. 
577 void InitClient(absl::optional<XdsBootstrapBuilder> builder = absl::nullopt, 578 std::string lb_expected_authority = "", 579 int xds_resource_does_not_exist_timeout_ms = 0); 580 MakeBootstrapBuilder()581 XdsBootstrapBuilder MakeBootstrapBuilder() { 582 return XdsBootstrapBuilder().SetServers( 583 {absl::StrCat("localhost:", balancer_->port())}); 584 } 585 586 // Sets channel_, stub_, stub1_, and stub2_. 587 void ResetStub(int failover_timeout_ms = 0, ChannelArguments* args = nullptr); 588 589 // Creates a new client channel. Requires that InitClient() has 590 // already been called. 591 std::shared_ptr<Channel> CreateChannel(int failover_timeout_ms = 0, 592 const char* server_name = kServerName, 593 const char* xds_authority = "", 594 ChannelArguments* args = nullptr); 595 596 // 597 // Sending RPCs 598 // 599 600 // Options used for sending an RPC. 601 struct RpcOptions { 602 RpcService service = SERVICE_ECHO; 603 RpcMethod method = METHOD_ECHO; 604 // Will be multiplied by grpc_test_slowdown_factor(). 605 int timeout_ms = 5000; 606 bool wait_for_ready = false; 607 std::vector<std::pair<std::string, std::string>> metadata; 608 // These options are used by the backend service impl. 
609 bool server_fail = false; 610 int server_sleep_us = 0; 611 int client_cancel_after_us = 0; 612 bool skip_cancelled_check = false; 613 StatusCode server_expected_error = StatusCode::OK; 614 absl::optional<xds::data::orca::v3::OrcaLoadReport> backend_metrics; 615 bool server_notify_client_when_started = false; 616 RpcOptionsRpcOptions617 RpcOptions() {} 618 set_rpc_serviceRpcOptions619 RpcOptions& set_rpc_service(RpcService rpc_service) { 620 service = rpc_service; 621 return *this; 622 } 623 set_rpc_methodRpcOptions624 RpcOptions& set_rpc_method(RpcMethod rpc_method) { 625 method = rpc_method; 626 return *this; 627 } 628 set_timeout_msRpcOptions629 RpcOptions& set_timeout_ms(int rpc_timeout_ms) { 630 timeout_ms = rpc_timeout_ms; 631 return *this; 632 } 633 set_timeoutRpcOptions634 RpcOptions& set_timeout(grpc_core::Duration timeout) { 635 timeout_ms = timeout.millis(); 636 return *this; 637 } 638 set_wait_for_readyRpcOptions639 RpcOptions& set_wait_for_ready(bool rpc_wait_for_ready) { 640 wait_for_ready = rpc_wait_for_ready; 641 return *this; 642 } 643 set_metadataRpcOptions644 RpcOptions& set_metadata( 645 std::vector<std::pair<std::string, std::string>> rpc_metadata) { 646 metadata = std::move(rpc_metadata); 647 return *this; 648 } 649 set_server_failRpcOptions650 RpcOptions& set_server_fail(bool rpc_server_fail) { 651 server_fail = rpc_server_fail; 652 return *this; 653 } 654 set_server_sleep_usRpcOptions655 RpcOptions& set_server_sleep_us(int rpc_server_sleep_us) { 656 server_sleep_us = rpc_server_sleep_us; 657 return *this; 658 } 659 set_client_cancel_after_usRpcOptions660 RpcOptions& set_client_cancel_after_us(int rpc_client_cancel_after_us) { 661 client_cancel_after_us = rpc_client_cancel_after_us; 662 return *this; 663 } 664 set_skip_cancelled_checkRpcOptions665 RpcOptions& set_skip_cancelled_check(bool rpc_skip_cancelled_check) { 666 skip_cancelled_check = rpc_skip_cancelled_check; 667 return *this; 668 } 669 set_server_expected_errorRpcOptions670 
RpcOptions& set_server_expected_error(StatusCode code) { 671 server_expected_error = code; 672 return *this; 673 } 674 set_backend_metricsRpcOptions675 RpcOptions& set_backend_metrics( 676 absl::optional<xds::data::orca::v3::OrcaLoadReport> metrics) { 677 backend_metrics = std::move(metrics); 678 return *this; 679 } 680 set_server_notify_client_when_startedRpcOptions681 RpcOptions& set_server_notify_client_when_started( 682 bool rpc_server_notify_client_when_started) { 683 server_notify_client_when_started = rpc_server_notify_client_when_started; 684 return *this; 685 } 686 687 // Populates context and request. 688 void SetupRpc(ClientContext* context, EchoRequest* request) const; 689 }; 690 691 // Sends an RPC with the specified options. 692 // If response is non-null, it will be populated with the response. 693 // Returns the status of the RPC. 694 Status SendRpc(const RpcOptions& rpc_options = RpcOptions(), 695 EchoResponse* response = nullptr, 696 std::multimap<std::string, std::string>* 697 server_initial_metadata = nullptr); 698 699 // Internal helper function for SendRpc(). 700 template <typename Stub> SendRpcMethod(Stub * stub,const RpcOptions & rpc_options,ClientContext * context,EchoRequest & request,EchoResponse * response)701 static Status SendRpcMethod(Stub* stub, const RpcOptions& rpc_options, 702 ClientContext* context, EchoRequest& request, 703 EchoResponse* response) { 704 switch (rpc_options.method) { 705 case METHOD_ECHO: 706 return stub->Echo(context, request, response); 707 case METHOD_ECHO1: 708 return stub->Echo1(context, request, response); 709 case METHOD_ECHO2: 710 return stub->Echo2(context, request, response); 711 } 712 GPR_UNREACHABLE_CODE(return grpc::Status::OK); 713 } 714 715 // Send RPCs in a loop until either continue_predicate() returns false 716 // or timeout_ms elapses. 
717 struct RpcResult { 718 Status status; 719 EchoResponse response; 720 }; 721 void SendRpcsUntil(const grpc_core::DebugLocation& debug_location, 722 std::function<bool(const RpcResult&)> continue_predicate, 723 int timeout_ms = 15000, 724 const RpcOptions& rpc_options = RpcOptions()); 725 726 // Sends the specified number of RPCs and fails if the RPC fails. 727 void CheckRpcSendOk(const grpc_core::DebugLocation& debug_location, 728 const size_t times = 1, 729 const RpcOptions& rpc_options = RpcOptions()); 730 731 // Sends one RPC, which must fail with the specified status code and 732 // a message matching the specified regex. 733 void CheckRpcSendFailure(const grpc_core::DebugLocation& debug_location, 734 StatusCode expected_status, 735 absl::string_view expected_message_regex, 736 const RpcOptions& rpc_options = RpcOptions()); 737 738 // Sends num_rpcs RPCs, counting how many of them fail with a message 739 // matching the specfied expected_message_prefix. 740 // Any failure with a non-matching status or message is a test failure. 741 size_t SendRpcsAndCountFailuresWithMessage( 742 const grpc_core::DebugLocation& debug_location, size_t num_rpcs, 743 StatusCode expected_status, absl::string_view expected_message_prefix, 744 const RpcOptions& rpc_options = RpcOptions()); 745 746 // A class for running a long-running RPC in its own thread. 747 // TODO(roth): Maybe consolidate this and SendConcurrentRpcs() 748 // somehow? LongRunningRpc has a cleaner API, but SendConcurrentRpcs() 749 // uses the callback API, which is probably better. 750 class LongRunningRpc { 751 public: 752 // Starts the RPC. 753 void StartRpc(grpc::testing::EchoTestService::Stub* stub, 754 const RpcOptions& rpc_options = 755 RpcOptions().set_timeout_ms(0).set_client_cancel_after_us( 756 1 * 1000 * 1000)); 757 758 // Cancels the RPC. 759 void CancelRpc(); 760 761 // Gets the RPC's status. Blocks if the RPC is not yet complete. 
762 Status GetStatus(); 763 764 private: 765 std::thread sender_thread_; 766 ClientContext context_; 767 Status status_; 768 }; 769 770 // Starts a set of concurrent RPCs. 771 struct ConcurrentRpc { 772 ClientContext context; 773 Status status; 774 grpc_core::Duration elapsed_time; 775 EchoResponse response; 776 }; 777 std::vector<ConcurrentRpc> SendConcurrentRpcs( 778 const grpc_core::DebugLocation& debug_location, 779 grpc::testing::EchoTestService::Stub* stub, size_t num_rpcs, 780 const RpcOptions& rpc_options); 781 782 // 783 // Waiting for individual backends to be seen by the client 784 // 785 786 struct WaitForBackendOptions { 787 // If true, resets the backend counters before returning. 788 bool reset_counters = true; 789 // How long to wait for the backend(s) to see requests. 790 // Will be multiplied by grpc_test_slowdown_factor(). 791 int timeout_ms = 15000; 792 WaitForBackendOptionsWaitForBackendOptions793 WaitForBackendOptions() {} 794 set_reset_countersWaitForBackendOptions795 WaitForBackendOptions& set_reset_counters(bool enable) { 796 reset_counters = enable; 797 return *this; 798 } 799 set_timeout_msWaitForBackendOptions800 WaitForBackendOptions& set_timeout_ms(int ms) { 801 timeout_ms = ms; 802 return *this; 803 } 804 }; 805 806 // Sends RPCs until all of the backends in the specified range see requests. 807 // The check_status callback will be invoked to check the status of 808 // every RPC; if null, the default is to check that the RPC succeeded. 809 // Returns the total number of RPCs sent. 810 size_t WaitForAllBackends( 811 const grpc_core::DebugLocation& debug_location, size_t start_index = 0, 812 size_t stop_index = 0, 813 std::function<void(const RpcResult&)> check_status = nullptr, 814 const WaitForBackendOptions& wait_options = WaitForBackendOptions(), 815 const RpcOptions& rpc_options = RpcOptions()); 816 817 // Sends RPCs until the backend at index backend_idx sees requests. 
818 void WaitForBackend( 819 const grpc_core::DebugLocation& debug_location, size_t backend_idx, 820 std::function<void(const RpcResult&)> check_status = nullptr, 821 const WaitForBackendOptions& wait_options = WaitForBackendOptions(), 822 const RpcOptions& rpc_options = RpcOptions()) { 823 WaitForAllBackends(debug_location, backend_idx, backend_idx + 1, 824 check_status, wait_options, rpc_options); 825 } 826 827 // 828 // Waiting for xDS NACKs 829 // 830 // These methods send RPCs in a loop until they see a NACK from the 831 // xDS server, or until a timeout expires. 832 833 // Sends RPCs until get_state() returns a response. 834 absl::optional<AdsServiceImpl::ResponseState> WaitForNack( 835 const grpc_core::DebugLocation& debug_location, 836 std::function<absl::optional<AdsServiceImpl::ResponseState>()> get_state, 837 const RpcOptions& rpc_options = RpcOptions(), 838 StatusCode expected_status = StatusCode::UNAVAILABLE); 839 840 // Sends RPCs until an LDS NACK is seen. 841 absl::optional<AdsServiceImpl::ResponseState> WaitForLdsNack( 842 const grpc_core::DebugLocation& debug_location, 843 const RpcOptions& rpc_options = RpcOptions(), 844 StatusCode expected_status = StatusCode::UNAVAILABLE) { 845 return WaitForNack( 846 debug_location, 847 [&]() { return balancer_->ads_service()->lds_response_state(); }, 848 rpc_options, expected_status); 849 } 850 851 // Sends RPCs until an RDS NACK is seen. 852 absl::optional<AdsServiceImpl::ResponseState> WaitForRdsNack( 853 const grpc_core::DebugLocation& debug_location, 854 const RpcOptions& rpc_options = RpcOptions(), 855 StatusCode expected_status = StatusCode::UNAVAILABLE) { 856 return WaitForNack( 857 debug_location, 858 [&]() { return RouteConfigurationResponseState(balancer_.get()); }, 859 rpc_options, expected_status); 860 } 861 862 // Sends RPCs until a CDS NACK is seen. 
863 absl::optional<AdsServiceImpl::ResponseState> WaitForCdsNack( 864 const grpc_core::DebugLocation& debug_location, 865 const RpcOptions& rpc_options = RpcOptions(), 866 StatusCode expected_status = StatusCode::UNAVAILABLE) { 867 return WaitForNack( 868 debug_location, 869 [&]() { return balancer_->ads_service()->cds_response_state(); }, 870 rpc_options, expected_status); 871 } 872 873 // Sends RPCs until an EDS NACK is seen. 874 absl::optional<AdsServiceImpl::ResponseState> WaitForEdsNack( 875 const grpc_core::DebugLocation& debug_location, 876 const RpcOptions& rpc_options = RpcOptions()) { 877 return WaitForNack( 878 debug_location, 879 [&]() { return balancer_->ads_service()->eds_response_state(); }, 880 rpc_options); 881 } 882 883 // Convenient front-end to wait for RouteConfiguration to be NACKed, 884 // regardless of whether it's sent in LDS or RDS. 885 absl::optional<AdsServiceImpl::ResponseState> WaitForRouteConfigNack( 886 const grpc_core::DebugLocation& debug_location, 887 const RpcOptions& rpc_options = RpcOptions(), 888 StatusCode expected_status = StatusCode::UNAVAILABLE) { 889 if (GetParam().enable_rds_testing()) { 890 return WaitForRdsNack(debug_location, rpc_options, expected_status); 891 } 892 return WaitForLdsNack(debug_location, rpc_options, expected_status); 893 } 894 895 // Convenient front-end for accessing xDS response state for a 896 // RouteConfiguration, regardless of whether it's sent in LDS or RDS. 
RouteConfigurationResponseState(BalancerServerThread * balancer)897 absl::optional<AdsServiceImpl::ResponseState> RouteConfigurationResponseState( 898 BalancerServerThread* balancer) const { 899 AdsServiceImpl* ads_service = balancer->ads_service(); 900 if (GetParam().enable_rds_testing()) { 901 return ads_service->rds_response_state(); 902 } 903 return ads_service->lds_response_state(); 904 } 905 906 // 907 // Miscellaneous helper methods 908 // 909 910 // There is slight difference between time fetched by GPR and by C++ system 911 // clock API. It's unclear if they are using the same syscall, but we do know 912 // GPR round the number at millisecond-level. This creates a 1ms difference, 913 // which could cause flake. NowFromCycleCounter()914 static grpc_core::Timestamp NowFromCycleCounter() { 915 return grpc_core::Timestamp::FromTimespecRoundDown( 916 gpr_now(GPR_CLOCK_MONOTONIC)); 917 } 918 919 // Sets duration_proto to duration times grpc_test_slowdown_factor(). 920 static void SetProtoDuration(grpc_core::Duration duration, 921 google::protobuf::Duration* duration_proto); 922 923 // Returns the number of RPCs needed to pass error_tolerance at 99.99994% 924 // chance. Rolling dices in drop/fault-injection generates a binomial 925 // distribution (if our code is not horribly wrong). Let's make "n" the number 926 // of samples, "p" the probability. If we have np>5 & n(1-p)>5, we can 927 // approximately treat the binomial distribution as a normal distribution. 928 // 929 // For normal distribution, we can easily look up how many standard deviation 930 // we need to reach 99.995%. Based on Wiki's table 931 // https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule, we need 5.00 932 // sigma (standard deviation) to cover the probability area of 99.99994%. In 933 // another word, for a sample with size "n" probability "p" error-tolerance 934 // "k", we want the error always land within 5.00 sigma. 
The sigma of 935 // binominal distribution and be computed as sqrt(np(1-p)). Hence, we have 936 // the equation: 937 // 938 // kn <= 5.00 * sqrt(np(1-p)) 939 // TODO(yashykt): The above explanation assumes a normal distribution, but we 940 // use a uniform distribution instead. We need a better estimate of how many 941 // RPCs are needed with what error tolerance. ComputeIdealNumRpcs(double p,double error_tolerance)942 static size_t ComputeIdealNumRpcs(double p, double error_tolerance) { 943 GPR_ASSERT(p >= 0 && p <= 1); 944 size_t num_rpcs = 945 ceil(p * (1 - p) * 5.00 * 5.00 / error_tolerance / error_tolerance); 946 num_rpcs += 1000; // Add 1K as a buffer to avoid flakiness. 947 gpr_log(GPR_INFO, 948 "Sending %" PRIuPTR 949 " RPCs for percentage=%.3f error_tolerance=%.3f", 950 num_rpcs, p, error_tolerance); 951 return num_rpcs; 952 } 953 954 // Returns a regex that can be matched against an RPC failure status 955 // message for a connection failure. 956 static std::string MakeConnectionFailureRegex(absl::string_view prefix); 957 958 // Returns a private key pair, read from local files. 959 static grpc_core::PemKeyCertPairList ReadTlsIdentityPair( 960 const char* key_path, const char* cert_path); 961 962 // Returns client credentials suitable for using as fallback 963 // credentials for XdsCredentials. 964 static std::shared_ptr<ChannelCredentials> CreateTlsFallbackCredentials(); 965 966 std::unique_ptr<BalancerServerThread> balancer_; 967 968 std::shared_ptr<Channel> channel_; 969 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; 970 std::unique_ptr<grpc::testing::EchoTest1Service::Stub> stub1_; 971 std::unique_ptr<grpc::testing::EchoTest2Service::Stub> stub2_; 972 973 std::vector<std::unique_ptr<BackendServerThread>> backends_; 974 975 // Default xDS resources. 
976 Listener default_listener_; 977 RouteConfiguration default_route_config_; 978 Listener default_server_listener_; 979 RouteConfiguration default_server_route_config_; 980 Cluster default_cluster_; 981 982 int xds_drain_grace_time_ms_ = 10 * 60 * 1000; // 10 mins 983 984 bool bootstrap_contents_from_env_var_; 985 std::string bootstrap_; 986 char* bootstrap_file_ = nullptr; 987 absl::InlinedVector<grpc_arg, 3> xds_channel_args_to_add_; 988 grpc_channel_args xds_channel_args_; 989 }; 990 991 } // namespace testing 992 } // namespace grpc 993 994 #endif // GRPC_TEST_CPP_END2END_XDS_XDS_END2END_TEST_LIB_H 995