xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/xds/xds_end2end_test_lib.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 
16 #ifndef GRPC_TEST_CPP_END2END_XDS_XDS_END2END_TEST_LIB_H
17 #define GRPC_TEST_CPP_END2END_XDS_XDS_END2END_TEST_LIB_H
18 
#include <cstring>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <utility>
#include <vector>
24 
25 #include <gmock/gmock.h>
26 #include <gtest/gtest.h>
27 
28 #include "absl/status/statusor.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/string_view.h"
31 #include "absl/types/optional.h"
32 
33 #include <grpc/grpc.h>
34 #include <grpc/grpc_security.h>
35 #include <grpcpp/channel.h>
36 #include <grpcpp/client_context.h>
37 #include <grpcpp/ext/call_metric_recorder.h>
38 #include <grpcpp/ext/server_metric_recorder.h>
39 #include <grpcpp/xds_server_builder.h>
40 
41 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
42 #include "src/core/lib/security/security_connector/ssl_utils.h"
43 #include "src/cpp/server/secure_server_credentials.h"
44 #include "src/proto/grpc/testing/echo.grpc.pb.h"
45 #include "src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.h"
46 #include "src/proto/grpc/testing/xds/v3/http_filter_rbac.grpc.pb.h"
47 #include "src/proto/grpc/testing/xds/v3/orca_load_report.pb.h"
48 #include "src/proto/grpc/testing/xds/v3/rbac.pb.h"
49 #include "test/core/util/port.h"
50 #include "test/cpp/end2end/counted_service.h"
51 #include "test/cpp/end2end/test_service_impl.h"
52 #include "test/cpp/end2end/xds/xds_server.h"
53 #include "test/cpp/end2end/xds/xds_utils.h"
54 
55 namespace grpc {
56 namespace testing {
57 
58 // The parameter type for INSTANTIATE_TEST_SUITE_P().
59 class XdsTestType {
60  public:
61   enum HttpFilterConfigLocation {
62     // Set the HTTP filter config directly in LDS.
63     kHttpFilterConfigInListener,
64     // Enable the HTTP filter in LDS, but override the filter config in route.
65     kHttpFilterConfigInRoute,
66   };
67 
68   enum BootstrapSource {
69     kBootstrapFromChannelArg,
70     kBootstrapFromFile,
71     kBootstrapFromEnvVar,
72   };
73 
set_enable_load_reporting()74   XdsTestType& set_enable_load_reporting() {
75     enable_load_reporting_ = true;
76     return *this;
77   }
78 
set_enable_rds_testing()79   XdsTestType& set_enable_rds_testing() {
80     enable_rds_testing_ = true;
81     return *this;
82   }
83 
set_use_xds_credentials()84   XdsTestType& set_use_xds_credentials() {
85     use_xds_credentials_ = true;
86     return *this;
87   }
88 
set_use_csds_streaming()89   XdsTestType& set_use_csds_streaming() {
90     use_csds_streaming_ = true;
91     return *this;
92   }
93 
set_filter_config_setup(HttpFilterConfigLocation setup)94   XdsTestType& set_filter_config_setup(HttpFilterConfigLocation setup) {
95     filter_config_setup_ = setup;
96     return *this;
97   }
98 
set_bootstrap_source(BootstrapSource bootstrap_source)99   XdsTestType& set_bootstrap_source(BootstrapSource bootstrap_source) {
100     bootstrap_source_ = bootstrap_source;
101     return *this;
102   }
103 
set_rbac_action(::envoy::config::rbac::v3::RBAC_Action action)104   XdsTestType& set_rbac_action(::envoy::config::rbac::v3::RBAC_Action action) {
105     rbac_action_ = action;
106     return *this;
107   }
108 
set_rbac_audit_condition(::envoy::config::rbac::v3::RBAC_AuditLoggingOptions_AuditCondition audit_condition)109   XdsTestType& set_rbac_audit_condition(
110       ::envoy::config::rbac::v3::RBAC_AuditLoggingOptions_AuditCondition
111           audit_condition) {
112     rbac_audit_condition_ = audit_condition;
113     return *this;
114   }
115 
enable_load_reporting()116   bool enable_load_reporting() const { return enable_load_reporting_; }
enable_rds_testing()117   bool enable_rds_testing() const { return enable_rds_testing_; }
use_xds_credentials()118   bool use_xds_credentials() const { return use_xds_credentials_; }
use_csds_streaming()119   bool use_csds_streaming() const { return use_csds_streaming_; }
filter_config_setup()120   HttpFilterConfigLocation filter_config_setup() const {
121     return filter_config_setup_;
122   }
bootstrap_source()123   BootstrapSource bootstrap_source() const { return bootstrap_source_; }
rbac_action()124   ::envoy::config::rbac::v3::RBAC_Action rbac_action() const {
125     return rbac_action_;
126   }
127   ::envoy::config::rbac::v3::RBAC_AuditLoggingOptions_AuditCondition
rbac_audit_condition()128   rbac_audit_condition() const {
129     return rbac_audit_condition_;
130   }
131 
AsString()132   std::string AsString() const {
133     std::string retval = "V3";
134     if (enable_load_reporting_) retval += "WithLoadReporting";
135     if (enable_rds_testing_) retval += "Rds";
136     if (use_xds_credentials_) retval += "XdsCreds";
137     if (use_csds_streaming_) retval += "CsdsStreaming";
138     if (filter_config_setup_ == kHttpFilterConfigInRoute) {
139       retval += "FilterPerRouteOverride";
140     }
141     if (bootstrap_source_ == kBootstrapFromFile) {
142       retval += "BootstrapFromFile";
143     } else if (bootstrap_source_ == kBootstrapFromEnvVar) {
144       retval += "BootstrapFromEnvVar";
145     }
146     if (rbac_action_ == ::envoy::config::rbac::v3::RBAC_Action_ALLOW) {
147       retval += "RbacAllow";
148     } else if (rbac_action_ == ::envoy::config::rbac::v3::RBAC_Action_DENY) {
149       retval += "RbacDeny";
150     }
151     if (rbac_audit_condition_ !=
152         ::envoy::config::rbac::v3::
153             RBAC_AuditLoggingOptions_AuditCondition_NONE) {
154       retval += absl::StrCat("AuditCondition",
155                              ::envoy::config::rbac::v3::
156                                  RBAC_AuditLoggingOptions_AuditCondition_Name(
157                                      rbac_audit_condition_));
158     }
159     return retval;
160   }
161 
162   // For use as the final parameter in INSTANTIATE_TEST_SUITE_P().
Name(const::testing::TestParamInfo<XdsTestType> & info)163   static std::string Name(const ::testing::TestParamInfo<XdsTestType>& info) {
164     return info.param.AsString();
165   }
166 
167  private:
168   bool enable_load_reporting_ = false;
169   bool enable_rds_testing_ = false;
170   bool use_xds_credentials_ = false;
171   bool use_csds_streaming_ = false;
172   HttpFilterConfigLocation filter_config_setup_ = kHttpFilterConfigInListener;
173   BootstrapSource bootstrap_source_ = kBootstrapFromChannelArg;
174   ::envoy::config::rbac::v3::RBAC_Action rbac_action_ =
175       ::envoy::config::rbac::v3::RBAC_Action_LOG;
176   ::envoy::config::rbac::v3::RBAC_AuditLoggingOptions_AuditCondition
177       rbac_audit_condition_ = ::envoy::config::rbac::v3::
178           RBAC_AuditLoggingOptions_AuditCondition_NONE;
179 };
180 
181 // A base class for xDS end-to-end tests.
182 //
183 // An xDS server is provided in balancer_.  It is automatically started
184 // for every test.  Additional xDS servers can be started if needed by
185 // calling CreateAndStartBalancer().
186 //
187 // A default set of LDS, RDS, and CDS resources are created for gRPC
188 // clients, available in default_listener_, default_route_config_, and
189 // default_cluster_.  These resources are automatically loaded into
190 // balancer_ but can be modified by individual tests.  No EDS resource
191 // is provided by default.  There are also default LDS and RDS resources
192 // for the gRPC server side in default_server_listener_ and
193 // default_server_route_config_.  Methods are provided for constructing new
194 // resources that can be added to the xDS server as needed.
195 //
196 // This class provides a mechanism for running backend servers, which will
197 // be stored in backends_.  No servers are created or started by default,
198 // but tests can call CreateAndStartBackends() to start however many
199 // backends they want.  There are also a number of methods for accessing
200 // backends by index, which is the index into the backends_ vector.
201 // For methods that take a start_index and stop_index, this refers to
202 // the indexes in the range [start_index, stop_index).  If stop_index
203 // is 0, backends_.size() is used.  Backends may or may not be
204 // xDS-enabled, at the discretion of the test.
205 class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType>,
206                        public XdsResourceUtils {
207  protected:
208   // TLS certificate paths.
209   static const char kCaCertPath[];
210   static const char kServerCertPath[];
211   static const char kServerKeyPath[];
212 
213   // Message used in EchoRequest to the backend.
214   static const char kRequestMessage[];
215 
216   // A base class for server threads.
217   class ServerThread {
218    public:
219     // A status notifier for xDS-enabled servers.
220     class XdsServingStatusNotifier
221         : public grpc::XdsServerServingStatusNotifierInterface {
222      public:
223       void OnServingStatusUpdate(std::string uri,
224                                  ServingStatusUpdate update) override;
225 
226       void WaitOnServingStatusChange(std::string uri,
227                                      grpc::StatusCode expected_status);
228 
229      private:
230       grpc_core::Mutex mu_;
231       grpc_core::CondVar cond_;
232       std::map<std::string, grpc::Status> status_map ABSL_GUARDED_BY(mu_);
233     };
234 
235     // If use_xds_enabled_server is true, the server will use xDS.
236     explicit ServerThread(XdsEnd2endTest* test_obj,
237                           bool use_xds_enabled_server = false)
test_obj_(test_obj)238         : test_obj_(test_obj),
239           port_(grpc_pick_unused_port_or_die()),
240           use_xds_enabled_server_(use_xds_enabled_server) {}
241 
~ServerThread()242     virtual ~ServerThread() {
243       // Shutdown should be called manually. Shutdown calls virtual methods and
244       // can't be called from the base class destructor.
245       GPR_ASSERT(!running_);
246     }
247 
248     void Start();
249     void Shutdown();
250 
Credentials()251     virtual std::shared_ptr<ServerCredentials> Credentials() {
252       return std::make_shared<SecureServerCredentials>(
253           grpc_fake_transport_security_server_credentials_create());
254     }
255 
target()256     std::string target() const { return absl::StrCat("localhost:", port_); }
257 
port()258     int port() const { return port_; }
259 
use_xds_enabled_server()260     bool use_xds_enabled_server() const { return use_xds_enabled_server_; }
261 
notifier()262     XdsServingStatusNotifier* notifier() { return &notifier_; }
263 
set_allow_put_requests(bool allow_put_requests)264     void set_allow_put_requests(bool allow_put_requests) {
265       allow_put_requests_ = allow_put_requests;
266     }
267 
268     void StopListening();
269 
270     void StopListeningAndSendGoaways();
271 
272    private:
273     class XdsChannelArgsServerBuilderOption;
274 
275     virtual const char* Type() = 0;
276     virtual void RegisterAllServices(ServerBuilder* builder) = 0;
277     virtual void StartAllServices() = 0;
278     virtual void ShutdownAllServices() = 0;
279 
280     void Serve(grpc_core::Mutex* mu, grpc_core::CondVar* cond);
281 
282     XdsEnd2endTest* test_obj_;
283     const int port_;
284     std::unique_ptr<Server> server_;
285     XdsServingStatusNotifier notifier_;
286     std::unique_ptr<std::thread> thread_;
287     bool running_ = false;
288     const bool use_xds_enabled_server_;
289     bool allow_put_requests_ = false;
290   };
291 
292   // A server thread for a backend server.
293   class BackendServerThread : public ServerThread {
294    public:
295     // A wrapper around the backend echo test service impl that counts
296     // requests and responses.
297     template <typename RpcService>
298     class BackendServiceImpl
299         : public CountedService<TestMultipleServiceImpl<RpcService>> {
300      public:
BackendServiceImpl()301       BackendServiceImpl() {}
302 
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)303       Status Echo(ServerContext* context, const EchoRequest* request,
304                   EchoResponse* response) override {
305         auto peer_identity = context->auth_context()->GetPeerIdentity();
306         CountedService<
307             TestMultipleServiceImpl<RpcService>>::IncreaseRequestCount();
308         {
309           grpc_core::MutexLock lock(&mu_);
310           clients_.insert(context->peer());
311           last_peer_identity_.clear();
312           for (const auto& entry : peer_identity) {
313             last_peer_identity_.emplace_back(entry.data(), entry.size());
314           }
315         }
316         if (request->has_param() && request->param().has_backend_metrics()) {
317           const auto& request_metrics = request->param().backend_metrics();
318           auto* recorder = context->ExperimentalGetCallMetricRecorder();
319           for (const auto& p : request_metrics.named_metrics()) {
320             char* key = static_cast<char*>(
321                 grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));
322             strncpy(key, p.first.data(), p.first.size());
323             key[p.first.size()] = '\0';
324             recorder->RecordNamedMetric(key, p.second);
325           }
326         }
327         const auto status = TestMultipleServiceImpl<RpcService>::Echo(
328             context, request, response);
329         CountedService<
330             TestMultipleServiceImpl<RpcService>>::IncreaseResponseCount();
331         return status;
332       }
333 
Echo1(ServerContext * context,const EchoRequest * request,EchoResponse * response)334       Status Echo1(ServerContext* context, const EchoRequest* request,
335                    EchoResponse* response) override {
336         return Echo(context, request, response);
337       }
338 
Echo2(ServerContext * context,const EchoRequest * request,EchoResponse * response)339       Status Echo2(ServerContext* context, const EchoRequest* request,
340                    EchoResponse* response) override {
341         return Echo(context, request, response);
342       }
343 
Start()344       void Start() {}
Shutdown()345       void Shutdown() {}
346 
clients()347       std::set<std::string> clients() {
348         grpc_core::MutexLock lock(&mu_);
349         return clients_;
350       }
351 
last_peer_identity()352       const std::vector<std::string>& last_peer_identity() {
353         grpc_core::MutexLock lock(&mu_);
354         return last_peer_identity_;
355       }
356 
357      private:
358       grpc_core::Mutex mu_;
359       std::set<std::string> clients_ ABSL_GUARDED_BY(&mu_);
360       std::vector<std::string> last_peer_identity_ ABSL_GUARDED_BY(&mu_);
361     };
362 
363     // If use_xds_enabled_server is true, the server will use xDS.
364     BackendServerThread(XdsEnd2endTest* test_obj, bool use_xds_enabled_server);
365 
366     BackendServiceImpl<grpc::testing::EchoTestService::Service>*
backend_service()367     backend_service() {
368       return &backend_service_;
369     }
370     BackendServiceImpl<grpc::testing::EchoTest1Service::Service>*
backend_service1()371     backend_service1() {
372       return &backend_service1_;
373     }
374     BackendServiceImpl<grpc::testing::EchoTest2Service::Service>*
backend_service2()375     backend_service2() {
376       return &backend_service2_;
377     }
server_metric_recorder()378     grpc::experimental::ServerMetricRecorder* server_metric_recorder() const {
379       return server_metric_recorder_.get();
380     }
381 
382     // If XdsTestType::use_xds_credentials() and use_xds_enabled_server()
383     // are both true, returns XdsServerCredentials.
384     // Otherwise, if XdsTestType::use_xds_credentials() is true and
385     // use_xds_enabled_server() is false, returns TlsServerCredentials.
386     // Otherwise, returns fake credentials.
387     std::shared_ptr<ServerCredentials> Credentials() override;
388 
389    private:
Type()390     const char* Type() override { return "Backend"; }
391     void RegisterAllServices(ServerBuilder* builder) override;
392     void StartAllServices() override;
393     void ShutdownAllServices() override;
394 
395     BackendServiceImpl<grpc::testing::EchoTestService::Service>
396         backend_service_;
397     BackendServiceImpl<grpc::testing::EchoTest1Service::Service>
398         backend_service1_;
399     BackendServiceImpl<grpc::testing::EchoTest2Service::Service>
400         backend_service2_;
401     std::unique_ptr<experimental::ServerMetricRecorder> server_metric_recorder_;
402   };
403 
404   // A server thread for the xDS server.
405   class BalancerServerThread : public ServerThread {
406    public:
407     explicit BalancerServerThread(XdsEnd2endTest* test_obj,
408                                   absl::string_view debug_label);
409 
ads_service()410     AdsServiceImpl* ads_service() { return ads_service_.get(); }
lrs_service()411     LrsServiceImpl* lrs_service() { return lrs_service_.get(); }
412 
413    private:
Type()414     const char* Type() override { return "Balancer"; }
415     void RegisterAllServices(ServerBuilder* builder) override;
416     void StartAllServices() override;
417     void ShutdownAllServices() override;
418 
419     std::shared_ptr<AdsServiceImpl> ads_service_;
420     std::shared_ptr<LrsServiceImpl> lrs_service_;
421   };
422 
423   // RPC services used to talk to the backends.
424   enum RpcService {
425     SERVICE_ECHO,
426     SERVICE_ECHO1,
427     SERVICE_ECHO2,
428   };
429 
430   // RPC methods used to talk to the backends.
431   enum RpcMethod {
432     METHOD_ECHO,
433     METHOD_ECHO1,
434     METHOD_ECHO2,
435   };
436 
437   XdsEnd2endTest();
438 
SetUp()439   void SetUp() override { InitClient(); }
440   void TearDown() override;
441 
442   //
443   // xDS server management
444   //
445 
446   // Creates and starts a new balancer, running in its own thread.
447   // Most tests will not need to call this; instead, they can use
448   // balancer_, which is already populated with default resources.
449   std::unique_ptr<BalancerServerThread> CreateAndStartBalancer(
450       absl::string_view debug_label = "");
451 
452   // Sets the Listener and RouteConfiguration resource on the specified
453   // balancer.  If RDS is in use, they will be set as separate resources;
454   // otherwise, the RouteConfig will be inlined into the Listener.
455   void SetListenerAndRouteConfiguration(
456       BalancerServerThread* balancer, Listener listener,
457       const RouteConfiguration& route_config,
458       const HcmAccessor& hcm_accessor = ClientHcmAccessor()) {
459     XdsResourceUtils::SetListenerAndRouteConfiguration(
460         balancer->ads_service(), std::move(listener), route_config,
461         GetParam().enable_rds_testing(), hcm_accessor);
462   }
463 
464   // A convenient wrapper for setting the Listener and
465   // RouteConfiguration resources on the server side.
SetServerListenerNameAndRouteConfiguration(BalancerServerThread * balancer,Listener listener,int port,const RouteConfiguration & route_config)466   void SetServerListenerNameAndRouteConfiguration(
467       BalancerServerThread* balancer, Listener listener, int port,
468       const RouteConfiguration& route_config) {
469     SetListenerAndRouteConfiguration(
470         balancer, PopulateServerListenerNameAndPort(listener, port),
471         route_config, ServerHcmAccessor());
472   }
473 
474   // Sets the RouteConfiguration resource on the specified balancer.
475   // If RDS is in use, it will be set directly as an independent
476   // resource; otherwise, it will be inlined into a Listener resource
477   // (either listener_to_copy, or if that is null, default_listener_).
478   void SetRouteConfiguration(BalancerServerThread* balancer,
479                              const RouteConfiguration& route_config,
480                              const Listener* listener_to_copy = nullptr) {
481     XdsResourceUtils::SetRouteConfiguration(
482         balancer->ads_service(), route_config, GetParam().enable_rds_testing(),
483         listener_to_copy);
484   }
485 
486   // Helper method for generating an endpoint for a backend, for use in
487   // constructing an EDS resource.
488   EdsResourceArgs::Endpoint CreateEndpoint(
489       size_t backend_idx,
490       ::envoy::config::core::v3::HealthStatus health_status =
491           ::envoy::config::core::v3::HealthStatus::UNKNOWN,
492       int lb_weight = 1, std::vector<size_t> additional_backend_indxees = {}) {
493     std::vector<int> additional_ports;
494     additional_ports.reserve(additional_backend_indxees.size());
495     for (size_t idx : additional_backend_indxees) {
496       additional_ports.push_back(backends_[idx]->port());
497     }
498     return EdsResourceArgs::Endpoint(backends_[backend_idx]->port(),
499                                      health_status, lb_weight,
500                                      additional_ports);
501   }
502 
503   // Creates a vector of endpoints for a specified range of backends,
504   // for use in constructing an EDS resource.
505   std::vector<EdsResourceArgs::Endpoint> CreateEndpointsForBackends(
506       size_t start_index = 0, size_t stop_index = 0,
507       ::envoy::config::core::v3::HealthStatus health_status =
508           ::envoy::config::core::v3::HealthStatus::UNKNOWN,
509       int lb_weight = 1);
510 
511   // Returns an endpoint for an unused port, for use in constructing an
512   // EDS resource.
MakeNonExistantEndpoint()513   EdsResourceArgs::Endpoint MakeNonExistantEndpoint() {
514     return EdsResourceArgs::Endpoint(grpc_pick_unused_port_or_die());
515   }
516 
517   //
518   // Backend management
519   //
520 
521   // Creates num_backends backends and stores them in backends_, but
522   // does not actually start them.  If xds_enabled is true, the backends
523   // are xDS-enabled.
524   void CreateBackends(size_t num_backends, bool xds_enabled = false) {
525     for (size_t i = 0; i < num_backends; ++i) {
526       backends_.emplace_back(new BackendServerThread(this, xds_enabled));
527     }
528   }
529 
530   // Starts all backends in backends_.
StartAllBackends()531   void StartAllBackends() {
532     for (auto& backend : backends_) backend->Start();
533   }
534 
535   // Same as CreateBackends(), but also starts the backends.
536   void CreateAndStartBackends(size_t num_backends, bool xds_enabled = false) {
537     CreateBackends(num_backends, xds_enabled);
538     StartAllBackends();
539   }
540 
541   // Starts the backend at backends_[index].
StartBackend(size_t index)542   void StartBackend(size_t index) { backends_[index]->Start(); }
543 
544   // Shuts down all backends in backends_.
ShutdownAllBackends()545   void ShutdownAllBackends() {
546     for (auto& backend : backends_) backend->Shutdown();
547   }
548 
549   // Shuts down the backend at backends_[index].
ShutdownBackend(size_t index)550   void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); }
551 
552   // Resets the request counters for backends in the specified range.
553   void ResetBackendCounters(size_t start_index = 0, size_t stop_index = 0);
554 
555   // Returns true if the specified backend has received requests for the
556   // specified service.
557   bool SeenBackend(size_t backend_idx,
558                    const RpcService rpc_service = SERVICE_ECHO);
559 
560   // Returns true if all backends in the specified range have received
561   // requests for the specified service.
562   bool SeenAllBackends(size_t start_index = 0, size_t stop_index = 0,
563                        const RpcService rpc_service = SERVICE_ECHO);
564 
565   // Returns a vector containing the port for every backend in the
566   // specified range.
567   std::vector<int> GetBackendPorts(size_t start_index = 0,
568                                    size_t stop_index = 0) const;
569 
570   //
571   // Client management
572   //
573 
574   // Initializes global state for the client, such as the bootstrap file
575   // and channel args for the XdsClient.  Then calls ResetStub().
576   // All tests must call this exactly once at the start of the test.
577   void InitClient(absl::optional<XdsBootstrapBuilder> builder = absl::nullopt,
578                   std::string lb_expected_authority = "",
579                   int xds_resource_does_not_exist_timeout_ms = 0);
580 
MakeBootstrapBuilder()581   XdsBootstrapBuilder MakeBootstrapBuilder() {
582     return XdsBootstrapBuilder().SetServers(
583         {absl::StrCat("localhost:", balancer_->port())});
584   }
585 
586   // Sets channel_, stub_, stub1_, and stub2_.
587   void ResetStub(int failover_timeout_ms = 0, ChannelArguments* args = nullptr);
588 
589   // Creates a new client channel.  Requires that InitClient() has
590   // already been called.
591   std::shared_ptr<Channel> CreateChannel(int failover_timeout_ms = 0,
592                                          const char* server_name = kServerName,
593                                          const char* xds_authority = "",
594                                          ChannelArguments* args = nullptr);
595 
596   //
597   // Sending RPCs
598   //
599 
600   // Options used for sending an RPC.
601   struct RpcOptions {
602     RpcService service = SERVICE_ECHO;
603     RpcMethod method = METHOD_ECHO;
604     // Will be multiplied by grpc_test_slowdown_factor().
605     int timeout_ms = 5000;
606     bool wait_for_ready = false;
607     std::vector<std::pair<std::string, std::string>> metadata;
608     // These options are used by the backend service impl.
609     bool server_fail = false;
610     int server_sleep_us = 0;
611     int client_cancel_after_us = 0;
612     bool skip_cancelled_check = false;
613     StatusCode server_expected_error = StatusCode::OK;
614     absl::optional<xds::data::orca::v3::OrcaLoadReport> backend_metrics;
615     bool server_notify_client_when_started = false;
616 
RpcOptionsRpcOptions617     RpcOptions() {}
618 
set_rpc_serviceRpcOptions619     RpcOptions& set_rpc_service(RpcService rpc_service) {
620       service = rpc_service;
621       return *this;
622     }
623 
set_rpc_methodRpcOptions624     RpcOptions& set_rpc_method(RpcMethod rpc_method) {
625       method = rpc_method;
626       return *this;
627     }
628 
set_timeout_msRpcOptions629     RpcOptions& set_timeout_ms(int rpc_timeout_ms) {
630       timeout_ms = rpc_timeout_ms;
631       return *this;
632     }
633 
set_timeoutRpcOptions634     RpcOptions& set_timeout(grpc_core::Duration timeout) {
635       timeout_ms = timeout.millis();
636       return *this;
637     }
638 
set_wait_for_readyRpcOptions639     RpcOptions& set_wait_for_ready(bool rpc_wait_for_ready) {
640       wait_for_ready = rpc_wait_for_ready;
641       return *this;
642     }
643 
set_metadataRpcOptions644     RpcOptions& set_metadata(
645         std::vector<std::pair<std::string, std::string>> rpc_metadata) {
646       metadata = std::move(rpc_metadata);
647       return *this;
648     }
649 
set_server_failRpcOptions650     RpcOptions& set_server_fail(bool rpc_server_fail) {
651       server_fail = rpc_server_fail;
652       return *this;
653     }
654 
set_server_sleep_usRpcOptions655     RpcOptions& set_server_sleep_us(int rpc_server_sleep_us) {
656       server_sleep_us = rpc_server_sleep_us;
657       return *this;
658     }
659 
set_client_cancel_after_usRpcOptions660     RpcOptions& set_client_cancel_after_us(int rpc_client_cancel_after_us) {
661       client_cancel_after_us = rpc_client_cancel_after_us;
662       return *this;
663     }
664 
set_skip_cancelled_checkRpcOptions665     RpcOptions& set_skip_cancelled_check(bool rpc_skip_cancelled_check) {
666       skip_cancelled_check = rpc_skip_cancelled_check;
667       return *this;
668     }
669 
set_server_expected_errorRpcOptions670     RpcOptions& set_server_expected_error(StatusCode code) {
671       server_expected_error = code;
672       return *this;
673     }
674 
set_backend_metricsRpcOptions675     RpcOptions& set_backend_metrics(
676         absl::optional<xds::data::orca::v3::OrcaLoadReport> metrics) {
677       backend_metrics = std::move(metrics);
678       return *this;
679     }
680 
set_server_notify_client_when_startedRpcOptions681     RpcOptions& set_server_notify_client_when_started(
682         bool rpc_server_notify_client_when_started) {
683       server_notify_client_when_started = rpc_server_notify_client_when_started;
684       return *this;
685     }
686 
687     // Populates context and request.
688     void SetupRpc(ClientContext* context, EchoRequest* request) const;
689   };
690 
691   // Sends an RPC with the specified options.
692   // If response is non-null, it will be populated with the response.
693   // Returns the status of the RPC.
694   Status SendRpc(const RpcOptions& rpc_options = RpcOptions(),
695                  EchoResponse* response = nullptr,
696                  std::multimap<std::string, std::string>*
697                      server_initial_metadata = nullptr);
698 
699   // Internal helper function for SendRpc().
700   template <typename Stub>
SendRpcMethod(Stub * stub,const RpcOptions & rpc_options,ClientContext * context,EchoRequest & request,EchoResponse * response)701   static Status SendRpcMethod(Stub* stub, const RpcOptions& rpc_options,
702                               ClientContext* context, EchoRequest& request,
703                               EchoResponse* response) {
704     switch (rpc_options.method) {
705       case METHOD_ECHO:
706         return stub->Echo(context, request, response);
707       case METHOD_ECHO1:
708         return stub->Echo1(context, request, response);
709       case METHOD_ECHO2:
710         return stub->Echo2(context, request, response);
711     }
712     GPR_UNREACHABLE_CODE(return grpc::Status::OK);
713   }
714 
715   // Send RPCs in a loop until either continue_predicate() returns false
716   // or timeout_ms elapses.
717   struct RpcResult {
718     Status status;
719     EchoResponse response;
720   };
721   void SendRpcsUntil(const grpc_core::DebugLocation& debug_location,
722                      std::function<bool(const RpcResult&)> continue_predicate,
723                      int timeout_ms = 15000,
724                      const RpcOptions& rpc_options = RpcOptions());
725 
726   // Sends the specified number of RPCs and fails if the RPC fails.
727   void CheckRpcSendOk(const grpc_core::DebugLocation& debug_location,
728                       const size_t times = 1,
729                       const RpcOptions& rpc_options = RpcOptions());
730 
731   // Sends one RPC, which must fail with the specified status code and
732   // a message matching the specified regex.
733   void CheckRpcSendFailure(const grpc_core::DebugLocation& debug_location,
734                            StatusCode expected_status,
735                            absl::string_view expected_message_regex,
736                            const RpcOptions& rpc_options = RpcOptions());
737 
738   // Sends num_rpcs RPCs, counting how many of them fail with a message
739   // matching the specified expected_message_prefix.
740   // Any failure with a non-matching status or message is a test failure.
741   size_t SendRpcsAndCountFailuresWithMessage(
742       const grpc_core::DebugLocation& debug_location, size_t num_rpcs,
743       StatusCode expected_status, absl::string_view expected_message_prefix,
744       const RpcOptions& rpc_options = RpcOptions());
745 
746   // A class for running a long-running RPC in its own thread.
747   // TODO(roth): Maybe consolidate this and SendConcurrentRpcs()
748   // somehow?  LongRunningRpc has a cleaner API, but SendConcurrentRpcs()
749   // uses the callback API, which is probably better.
750   class LongRunningRpc {
751    public:
752     // Starts the RPC.
753     void StartRpc(grpc::testing::EchoTestService::Stub* stub,
754                   const RpcOptions& rpc_options =
755                       RpcOptions().set_timeout_ms(0).set_client_cancel_after_us(
756                           1 * 1000 * 1000));
757 
758     // Cancels the RPC.
759     void CancelRpc();
760 
761     // Gets the RPC's status.  Blocks if the RPC is not yet complete.
762     Status GetStatus();
763 
764    private:
765     std::thread sender_thread_;
766     ClientContext context_;
767     Status status_;
768   };
769 
770   // Starts a set of concurrent RPCs.
771   struct ConcurrentRpc {
772     ClientContext context;
773     Status status;
774     grpc_core::Duration elapsed_time;
775     EchoResponse response;
776   };
777   std::vector<ConcurrentRpc> SendConcurrentRpcs(
778       const grpc_core::DebugLocation& debug_location,
779       grpc::testing::EchoTestService::Stub* stub, size_t num_rpcs,
780       const RpcOptions& rpc_options);
781 
782   //
783   // Waiting for individual backends to be seen by the client
784   //
785 
// Knobs for the WaitForBackend()/WaitForAllBackends() helpers.  The
// setters return *this so that calls can be chained.
struct WaitForBackendOptions {
  // Whether the backend counters are reset before returning.
  bool reset_counters = true;
  // Maximum time to wait for the backend(s) to see requests, in
  // milliseconds.  Will be multiplied by grpc_test_slowdown_factor().
  int timeout_ms = 15000;

  // NOTE(review): deliberately user-provided rather than `= default`;
  // presumably needed so the type can be default-constructed in default
  // arguments of the enclosing class -- confirm before changing.
  WaitForBackendOptions() {}

  // Sets reset_counters.
  WaitForBackendOptions& set_reset_counters(bool enable) {
    this->reset_counters = enable;
    return *this;
  }

  // Sets timeout_ms.
  WaitForBackendOptions& set_timeout_ms(int ms) {
    this->timeout_ms = ms;
    return *this;
  }
};
805 
806   // Sends RPCs until all of the backends in the specified range see requests.
807   // The check_status callback will be invoked to check the status of
808   // every RPC; if null, the default is to check that the RPC succeeded.
809   // Returns the total number of RPCs sent.
810   size_t WaitForAllBackends(
811       const grpc_core::DebugLocation& debug_location, size_t start_index = 0,
812       size_t stop_index = 0,
813       std::function<void(const RpcResult&)> check_status = nullptr,
814       const WaitForBackendOptions& wait_options = WaitForBackendOptions(),
815       const RpcOptions& rpc_options = RpcOptions());
816 
817   // Sends RPCs until the backend at index backend_idx sees requests.
818   void WaitForBackend(
819       const grpc_core::DebugLocation& debug_location, size_t backend_idx,
820       std::function<void(const RpcResult&)> check_status = nullptr,
821       const WaitForBackendOptions& wait_options = WaitForBackendOptions(),
822       const RpcOptions& rpc_options = RpcOptions()) {
823     WaitForAllBackends(debug_location, backend_idx, backend_idx + 1,
824                        check_status, wait_options, rpc_options);
825   }
826 
827   //
828   // Waiting for xDS NACKs
829   //
830   // These methods send RPCs in a loop until they see a NACK from the
831   // xDS server, or until a timeout expires.
832 
833   // Sends RPCs until get_state() returns a response.
834   absl::optional<AdsServiceImpl::ResponseState> WaitForNack(
835       const grpc_core::DebugLocation& debug_location,
836       std::function<absl::optional<AdsServiceImpl::ResponseState>()> get_state,
837       const RpcOptions& rpc_options = RpcOptions(),
838       StatusCode expected_status = StatusCode::UNAVAILABLE);
839 
840   // Sends RPCs until an LDS NACK is seen.
841   absl::optional<AdsServiceImpl::ResponseState> WaitForLdsNack(
842       const grpc_core::DebugLocation& debug_location,
843       const RpcOptions& rpc_options = RpcOptions(),
844       StatusCode expected_status = StatusCode::UNAVAILABLE) {
845     return WaitForNack(
846         debug_location,
847         [&]() { return balancer_->ads_service()->lds_response_state(); },
848         rpc_options, expected_status);
849   }
850 
851   // Sends RPCs until an RDS NACK is seen.
852   absl::optional<AdsServiceImpl::ResponseState> WaitForRdsNack(
853       const grpc_core::DebugLocation& debug_location,
854       const RpcOptions& rpc_options = RpcOptions(),
855       StatusCode expected_status = StatusCode::UNAVAILABLE) {
856     return WaitForNack(
857         debug_location,
858         [&]() { return RouteConfigurationResponseState(balancer_.get()); },
859         rpc_options, expected_status);
860   }
861 
862   // Sends RPCs until a CDS NACK is seen.
863   absl::optional<AdsServiceImpl::ResponseState> WaitForCdsNack(
864       const grpc_core::DebugLocation& debug_location,
865       const RpcOptions& rpc_options = RpcOptions(),
866       StatusCode expected_status = StatusCode::UNAVAILABLE) {
867     return WaitForNack(
868         debug_location,
869         [&]() { return balancer_->ads_service()->cds_response_state(); },
870         rpc_options, expected_status);
871   }
872 
873   // Sends RPCs until an EDS NACK is seen.
874   absl::optional<AdsServiceImpl::ResponseState> WaitForEdsNack(
875       const grpc_core::DebugLocation& debug_location,
876       const RpcOptions& rpc_options = RpcOptions()) {
877     return WaitForNack(
878         debug_location,
879         [&]() { return balancer_->ads_service()->eds_response_state(); },
880         rpc_options);
881   }
882 
883   // Convenient front-end to wait for RouteConfiguration to be NACKed,
884   // regardless of whether it's sent in LDS or RDS.
885   absl::optional<AdsServiceImpl::ResponseState> WaitForRouteConfigNack(
886       const grpc_core::DebugLocation& debug_location,
887       const RpcOptions& rpc_options = RpcOptions(),
888       StatusCode expected_status = StatusCode::UNAVAILABLE) {
889     if (GetParam().enable_rds_testing()) {
890       return WaitForRdsNack(debug_location, rpc_options, expected_status);
891     }
892     return WaitForLdsNack(debug_location, rpc_options, expected_status);
893   }
894 
895   // Convenient front-end for accessing xDS response state for a
896   // RouteConfiguration, regardless of whether it's sent in LDS or RDS.
RouteConfigurationResponseState(BalancerServerThread * balancer)897   absl::optional<AdsServiceImpl::ResponseState> RouteConfigurationResponseState(
898       BalancerServerThread* balancer) const {
899     AdsServiceImpl* ads_service = balancer->ads_service();
900     if (GetParam().enable_rds_testing()) {
901       return ads_service->rds_response_state();
902     }
903     return ads_service->lds_response_state();
904   }
905 
906   //
907   // Miscellaneous helper methods
908   //
909 
910   // There is slight difference between time fetched by GPR and by C++ system
911   // clock API. It's unclear if they are using the same syscall, but we do know
912   // GPR round the number at millisecond-level. This creates a 1ms difference,
913   // which could cause flake.
NowFromCycleCounter()914   static grpc_core::Timestamp NowFromCycleCounter() {
915     return grpc_core::Timestamp::FromTimespecRoundDown(
916         gpr_now(GPR_CLOCK_MONOTONIC));
917   }
918 
919   // Sets duration_proto to duration times grpc_test_slowdown_factor().
920   static void SetProtoDuration(grpc_core::Duration duration,
921                                google::protobuf::Duration* duration_proto);
922 
923   // Returns the number of RPCs needed to pass error_tolerance at 99.99994%
924   // chance. Rolling dices in drop/fault-injection generates a binomial
925   // distribution (if our code is not horribly wrong). Let's make "n" the number
926   // of samples, "p" the probability. If we have np>5 & n(1-p)>5, we can
927   // approximately treat the binomial distribution as a normal distribution.
928   //
929   // For normal distribution, we can easily look up how many standard deviation
930   // we need to reach 99.995%. Based on Wiki's table
931   // https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule, we need 5.00
932   // sigma (standard deviation) to cover the probability area of 99.99994%. In
933   // another word, for a sample with size "n" probability "p" error-tolerance
934   // "k", we want the error always land within 5.00 sigma. The sigma of
935   // binominal distribution and be computed as sqrt(np(1-p)). Hence, we have
936   // the equation:
937   //
938   //   kn <= 5.00 * sqrt(np(1-p))
939   // TODO(yashykt): The above explanation assumes a normal distribution, but we
940   // use a uniform distribution instead. We need a better estimate of how many
941   // RPCs are needed with what error tolerance.
ComputeIdealNumRpcs(double p,double error_tolerance)942   static size_t ComputeIdealNumRpcs(double p, double error_tolerance) {
943     GPR_ASSERT(p >= 0 && p <= 1);
944     size_t num_rpcs =
945         ceil(p * (1 - p) * 5.00 * 5.00 / error_tolerance / error_tolerance);
946     num_rpcs += 1000;  // Add 1K as a buffer to avoid flakiness.
947     gpr_log(GPR_INFO,
948             "Sending %" PRIuPTR
949             " RPCs for percentage=%.3f error_tolerance=%.3f",
950             num_rpcs, p, error_tolerance);
951     return num_rpcs;
952   }
953 
954   // Returns a regex that can be matched against an RPC failure status
955   // message for a connection failure.
956   static std::string MakeConnectionFailureRegex(absl::string_view prefix);
957 
958   // Returns a private key pair, read from local files.
959   static grpc_core::PemKeyCertPairList ReadTlsIdentityPair(
960       const char* key_path, const char* cert_path);
961 
962   // Returns client credentials suitable for using as fallback
963   // credentials for XdsCredentials.
964   static std::shared_ptr<ChannelCredentials> CreateTlsFallbackCredentials();
965 
966   std::unique_ptr<BalancerServerThread> balancer_;
967 
968   std::shared_ptr<Channel> channel_;
969   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
970   std::unique_ptr<grpc::testing::EchoTest1Service::Stub> stub1_;
971   std::unique_ptr<grpc::testing::EchoTest2Service::Stub> stub2_;
972 
973   std::vector<std::unique_ptr<BackendServerThread>> backends_;
974 
975   // Default xDS resources.
976   Listener default_listener_;
977   RouteConfiguration default_route_config_;
978   Listener default_server_listener_;
979   RouteConfiguration default_server_route_config_;
980   Cluster default_cluster_;
981 
982   int xds_drain_grace_time_ms_ = 10 * 60 * 1000;  // 10 mins
983 
984   bool bootstrap_contents_from_env_var_;
985   std::string bootstrap_;
986   char* bootstrap_file_ = nullptr;
987   absl::InlinedVector<grpc_arg, 3> xds_channel_args_to_add_;
988   grpc_channel_args xds_channel_args_;
989 };
990 
991 }  // namespace testing
992 }  // namespace grpc
993 
994 #endif  // GRPC_TEST_CPP_END2END_XDS_XDS_END2END_TEST_LIB_H
995