xref: /aosp_15_r20/external/grpc-grpc/test/cpp/interop/xds_interop_client.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <atomic>
20 #include <chrono>
21 #include <condition_variable>
22 #include <deque>
23 #include <map>
24 #include <memory>
25 #include <mutex>
26 #include <set>
27 #include <sstream>
28 #include <string>
29 #include <thread>
30 #include <utility>
31 #include <vector>
32 
33 #include "absl/algorithm/container.h"
34 #include "absl/flags/flag.h"
35 #include "absl/strings/str_split.h"
36 #include "opentelemetry/exporters/prometheus/exporter_factory.h"
37 #include "opentelemetry/exporters/prometheus/exporter_options.h"
38 #include "opentelemetry/sdk/metrics/meter_provider.h"
39 
40 #include <grpcpp/ext/admin_services.h>
41 #include <grpcpp/ext/csm_observability.h>
42 #include <grpcpp/ext/proto_server_reflection_plugin.h>
43 #include <grpcpp/grpcpp.h>
44 #include <grpcpp/server.h>
45 #include <grpcpp/server_builder.h>
46 #include <grpcpp/server_context.h>
47 
48 #include "src/core/lib/channel/status_util.h"
49 #include "src/core/lib/gprpp/env.h"
50 #include "src/proto/grpc/testing/empty.pb.h"
51 #include "src/proto/grpc/testing/messages.pb.h"
52 #include "src/proto/grpc/testing/test.grpc.pb.h"
53 #include "test/core/util/test_config.h"
54 #include "test/cpp/interop/rpc_behavior_lb_policy.h"
55 #include "test/cpp/interop/xds_stats_watcher.h"
56 #include "test/cpp/util/test_config.h"
57 
58 ABSL_FLAG(bool, fail_on_failed_rpc, false,
59           "Fail client if any RPCs fail after first successful RPC.");
60 ABSL_FLAG(int32_t, num_channels, 1, "Number of channels.");
61 ABSL_FLAG(bool, print_response, false, "Write RPC response to stdout.");
62 ABSL_FLAG(int32_t, qps, 1, "Qps per channel.");
63 // TODO(Capstan): Consider using absl::Duration
64 ABSL_FLAG(int32_t, rpc_timeout_sec, 30, "Per RPC timeout seconds.");
65 ABSL_FLAG(std::string, server, "localhost:50051", "Address of server.");
66 ABSL_FLAG(int32_t, stats_port, 50052,
67           "Port to expose peer distribution stats service.");
68 ABSL_FLAG(std::string, rpc, "UnaryCall",
69           "a comma separated list of rpc methods.");
70 ABSL_FLAG(int32_t, request_payload_size, 0,
71           "Set the SimpleRequest.payload.body to a string of repeated 0 (zero) "
72           "ASCII characters of the given size in bytes.");
73 ABSL_FLAG(int32_t, response_payload_size, 0,
74           "Ask the server to respond with SimpleResponse.payload.body of the "
75           "given length (may not be implemented on the server).");
76 ABSL_FLAG(std::string, metadata, "", "metadata to send with the RPC.");
77 ABSL_FLAG(std::string, expect_status, "OK",
78           "RPC status for the test RPC to be considered successful");
79 ABSL_FLAG(
80     bool, secure_mode, false,
81     "If true, XdsCredentials are used, InsecureChannelCredentials otherwise");
82 ABSL_FLAG(bool, enable_csm_observability, false,
83           "Whether to enable CSM Observability");
84 
85 using grpc::Channel;
86 using grpc::ClientAsyncResponseReader;
87 using grpc::ClientContext;
88 using grpc::CompletionQueue;
89 using grpc::Server;
90 using grpc::ServerBuilder;
91 using grpc::ServerContext;
92 using grpc::Status;
93 using grpc::testing::AsyncClientCallResult;
94 using grpc::testing::ClientConfigureRequest;
95 using grpc::testing::ClientConfigureResponse;
96 using grpc::testing::Empty;
97 using grpc::testing::LoadBalancerAccumulatedStatsRequest;
98 using grpc::testing::LoadBalancerAccumulatedStatsResponse;
99 using grpc::testing::LoadBalancerStatsRequest;
100 using grpc::testing::LoadBalancerStatsResponse;
101 using grpc::testing::LoadBalancerStatsService;
102 using grpc::testing::SimpleRequest;
103 using grpc::testing::SimpleResponse;
104 using grpc::testing::StatsWatchers;
105 using grpc::testing::TestService;
106 using grpc::testing::XdsStatsWatcher;
107 using grpc::testing::XdsUpdateClientConfigureService;
108 
109 struct AsyncClientCall {
110   ClientContext context;
111   std::unique_ptr<ClientAsyncResponseReader<Empty>> empty_response_reader;
112   std::unique_ptr<ClientAsyncResponseReader<SimpleResponse>>
113       simple_response_reader;
114 
115   AsyncClientCallResult result;
116 };
117 
118 // Whether at least one RPC has succeeded, indicating xDS resolution
119 // completed.
120 std::atomic<bool> one_rpc_succeeded(false);
121 // RPC configuration detailing how RPC should be sent.
122 struct RpcConfig {
123   ClientConfigureRequest::RpcType type;
124   std::vector<std::pair<std::string, std::string>> metadata;
125   int timeout_sec = 0;
126   std::string request_payload;
127   int request_payload_size = 0;
128   int response_payload_size = 0;
129 };
130 struct RpcConfigurationsQueue {
131   // A queue of RPC configurations detailing how RPCs should be sent.
132   std::deque<std::vector<RpcConfig>> rpc_configs_queue;
133   // Mutex for rpc_configs_queue
134   std::mutex mu_rpc_configs_queue;
135 };
136 
137 class TestClient {
138  public:
TestClient(const std::shared_ptr<Channel> & channel,StatsWatchers * stats_watchers)139   TestClient(const std::shared_ptr<Channel>& channel,
140              StatsWatchers* stats_watchers)
141       : stub_(TestService::NewStub(channel)), stats_watchers_(stats_watchers) {}
142 
AsyncUnaryCall(const RpcConfig & config)143   void AsyncUnaryCall(const RpcConfig& config) {
144     SimpleResponse response;
145     int saved_request_id;
146     {
147       std::lock_guard<std::mutex> lock(stats_watchers_->mu);
148       saved_request_id = ++stats_watchers_->global_request_id;
149       ++stats_watchers_
150             ->global_request_id_by_type[ClientConfigureRequest::UNARY_CALL];
151     }
152     std::chrono::system_clock::time_point deadline =
153         std::chrono::system_clock::now() +
154         std::chrono::seconds(config.timeout_sec != 0
155                                  ? config.timeout_sec
156                                  : absl::GetFlag(FLAGS_rpc_timeout_sec));
157     AsyncClientCall* call = new AsyncClientCall;
158     for (const auto& data : config.metadata) {
159       call->context.AddMetadata(data.first, data.second);
160       // TODO(@donnadionne): move deadline to separate proto.
161       if (data.first == "rpc-behavior" && data.second == "keep-open") {
162         deadline =
163             std::chrono::system_clock::now() + std::chrono::seconds(INT_MAX);
164       }
165     }
166     SimpleRequest request;
167     request.set_response_size(config.response_payload_size);
168     if (config.request_payload_size > 0) {
169       request.mutable_payload()->set_body(config.request_payload.c_str(),
170                                           config.request_payload_size);
171     }
172     call->context.set_deadline(deadline);
173     call->result.saved_request_id = saved_request_id;
174     call->result.rpc_type = ClientConfigureRequest::UNARY_CALL;
175     call->simple_response_reader =
176         stub_->PrepareAsyncUnaryCall(&call->context, request, &cq_);
177     call->simple_response_reader->StartCall();
178     call->simple_response_reader->Finish(&call->result.simple_response,
179                                          &call->result.status, call);
180   }
181 
AsyncEmptyCall(const RpcConfig & config)182   void AsyncEmptyCall(const RpcConfig& config) {
183     Empty response;
184     int saved_request_id;
185     {
186       std::lock_guard<std::mutex> lock(stats_watchers_->mu);
187       saved_request_id = ++stats_watchers_->global_request_id;
188       ++stats_watchers_
189             ->global_request_id_by_type[ClientConfigureRequest::EMPTY_CALL];
190     }
191     std::chrono::system_clock::time_point deadline =
192         std::chrono::system_clock::now() +
193         std::chrono::seconds(config.timeout_sec != 0
194                                  ? config.timeout_sec
195                                  : absl::GetFlag(FLAGS_rpc_timeout_sec));
196     AsyncClientCall* call = new AsyncClientCall;
197     for (const auto& data : config.metadata) {
198       call->context.AddMetadata(data.first, data.second);
199       // TODO(@donnadionne): move deadline to separate proto.
200       if (data.first == "rpc-behavior" && data.second == "keep-open") {
201         deadline =
202             std::chrono::system_clock::now() + std::chrono::seconds(INT_MAX);
203       }
204     }
205     call->context.set_deadline(deadline);
206     call->result.saved_request_id = saved_request_id;
207     call->result.rpc_type = ClientConfigureRequest::EMPTY_CALL;
208     call->empty_response_reader = stub_->PrepareAsyncEmptyCall(
209         &call->context, Empty::default_instance(), &cq_);
210     call->empty_response_reader->StartCall();
211     call->empty_response_reader->Finish(&call->result.empty_response,
212                                         &call->result.status, call);
213   }
214 
AsyncCompleteRpc()215   void AsyncCompleteRpc() {
216     void* got_tag;
217     bool ok = false;
218     while (cq_.Next(&got_tag, &ok)) {
219       AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
220       GPR_ASSERT(ok);
221       {
222         std::lock_guard<std::mutex> lock(stats_watchers_->mu);
223         auto server_initial_metadata = call->context.GetServerInitialMetadata();
224         auto metadata_hostname =
225             call->context.GetServerInitialMetadata().find("hostname");
226         std::string hostname =
227             metadata_hostname != call->context.GetServerInitialMetadata().end()
228                 ? std::string(metadata_hostname->second.data(),
229                               metadata_hostname->second.length())
230                 : call->result.simple_response.hostname();
231         for (auto watcher : stats_watchers_->watchers) {
232           watcher->RpcCompleted(call->result, hostname,
233                                 call->context.GetServerInitialMetadata(),
234                                 call->context.GetServerTrailingMetadata());
235         }
236       }
237 
238       if (!RpcStatusCheckSuccess(call)) {
239         if (absl::GetFlag(FLAGS_print_response) ||
240             absl::GetFlag(FLAGS_fail_on_failed_rpc)) {
241           std::cout << "RPC failed: " << call->result.status.error_code()
242                     << ": " << call->result.status.error_message() << std::endl;
243         }
244         if (absl::GetFlag(FLAGS_fail_on_failed_rpc) &&
245             one_rpc_succeeded.load()) {
246           abort();
247         }
248       } else {
249         if (absl::GetFlag(FLAGS_print_response)) {
250           auto metadata_hostname =
251               call->context.GetServerInitialMetadata().find("hostname");
252           std::string hostname =
253               metadata_hostname !=
254                       call->context.GetServerInitialMetadata().end()
255                   ? std::string(metadata_hostname->second.data(),
256                                 metadata_hostname->second.length())
257                   : call->result.simple_response.hostname();
258           std::cout << "Greeting: Hello world, this is " << hostname
259                     << ", from " << call->context.peer() << std::endl;
260         }
261         one_rpc_succeeded = true;
262       }
263 
264       delete call;
265     }
266   }
267 
268  private:
RpcStatusCheckSuccess(AsyncClientCall * call)269   static bool RpcStatusCheckSuccess(AsyncClientCall* call) {
270     // Determine RPC success based on expected status.
271     grpc_status_code code;
272     GPR_ASSERT(grpc_status_code_from_string(
273         absl::GetFlag(FLAGS_expect_status).c_str(), &code));
274     return code ==
275            static_cast<grpc_status_code>(call->result.status.error_code());
276   }
277 
278   std::unique_ptr<TestService::Stub> stub_;
279   StatsWatchers* stats_watchers_;
280   CompletionQueue cq_;
281 };
282 
283 class LoadBalancerStatsServiceImpl : public LoadBalancerStatsService::Service {
284  public:
LoadBalancerStatsServiceImpl(StatsWatchers * stats_watchers)285   explicit LoadBalancerStatsServiceImpl(StatsWatchers* stats_watchers)
286       : stats_watchers_(stats_watchers) {}
287 
GetClientStats(ServerContext *,const LoadBalancerStatsRequest * request,LoadBalancerStatsResponse * response)288   Status GetClientStats(ServerContext* /*context*/,
289                         const LoadBalancerStatsRequest* request,
290                         LoadBalancerStatsResponse* response) override {
291     int start_id;
292     int end_id;
293     std::unique_ptr<XdsStatsWatcher> watcher;
294     {
295       std::lock_guard<std::mutex> lock(stats_watchers_->mu);
296       start_id = stats_watchers_->global_request_id + 1;
297       end_id = start_id + request->num_rpcs();
298       watcher = std::make_unique<XdsStatsWatcher>(
299           start_id, end_id,
300           std::vector<std::string>(request->metadata_keys().begin(),
301                                    request->metadata_keys().end()));
302       stats_watchers_->watchers.insert(watcher.get());
303     }
304     *response = watcher->WaitForRpcStatsResponse(request->timeout_sec());
305     {
306       std::lock_guard<std::mutex> lock(stats_watchers_->mu);
307       stats_watchers_->watchers.erase(watcher.get());
308     }
309     return Status::OK;
310   }
311 
GetClientAccumulatedStats(ServerContext *,const LoadBalancerAccumulatedStatsRequest *,LoadBalancerAccumulatedStatsResponse * response)312   Status GetClientAccumulatedStats(
313       ServerContext* /*context*/,
314       const LoadBalancerAccumulatedStatsRequest* /*request*/,
315       LoadBalancerAccumulatedStatsResponse* response) override {
316     std::lock_guard<std::mutex> lock(stats_watchers_->mu);
317     stats_watchers_->global_watcher->GetCurrentRpcStats(response,
318                                                         stats_watchers_);
319     return Status::OK;
320   }
321 
322  private:
323   StatsWatchers* stats_watchers_;
324 };
325 
326 class XdsUpdateClientConfigureServiceImpl
327     : public XdsUpdateClientConfigureService::Service {
328  public:
XdsUpdateClientConfigureServiceImpl(RpcConfigurationsQueue * rpc_configs_queue)329   explicit XdsUpdateClientConfigureServiceImpl(
330       RpcConfigurationsQueue* rpc_configs_queue)
331       : rpc_configs_queue_(rpc_configs_queue) {}
332 
Configure(ServerContext *,const ClientConfigureRequest * request,ClientConfigureResponse *)333   Status Configure(ServerContext* /*context*/,
334                    const ClientConfigureRequest* request,
335                    ClientConfigureResponse* /*response*/) override {
336     std::map<int, std::vector<std::pair<std::string, std::string>>>
337         metadata_map;
338     for (const auto& data : request->metadata()) {
339       metadata_map[data.type()].push_back({data.key(), data.value()});
340     }
341     std::vector<RpcConfig> configs;
342     int request_payload_size = absl::GetFlag(FLAGS_request_payload_size);
343     int response_payload_size = absl::GetFlag(FLAGS_response_payload_size);
344     GPR_ASSERT(request_payload_size >= 0);
345     GPR_ASSERT(response_payload_size >= 0);
346     for (const auto& rpc : request->types()) {
347       RpcConfig config;
348       config.timeout_sec = request->timeout_sec();
349       config.type = static_cast<ClientConfigureRequest::RpcType>(rpc);
350       auto metadata_iter = metadata_map.find(rpc);
351       if (metadata_iter != metadata_map.end()) {
352         config.metadata = metadata_iter->second;
353       }
354       if (request_payload_size > 0 &&
355           config.type == ClientConfigureRequest::EMPTY_CALL) {
356         gpr_log(GPR_ERROR,
357                 "request_payload_size should not be set "
358                 "for EMPTY_CALL");
359       }
360       if (response_payload_size > 0 &&
361           config.type == ClientConfigureRequest::EMPTY_CALL) {
362         gpr_log(GPR_ERROR,
363                 "response_payload_size should not be set "
364                 "for EMPTY_CALL");
365       }
366       config.request_payload_size = request_payload_size;
367       std::string payload(config.request_payload_size, '0');
368       config.request_payload = payload;
369       config.response_payload_size = response_payload_size;
370       configs.push_back(std::move(config));
371     }
372     {
373       std::lock_guard<std::mutex> lock(
374           rpc_configs_queue_->mu_rpc_configs_queue);
375       rpc_configs_queue_->rpc_configs_queue.emplace_back(std::move(configs));
376     }
377     return Status::OK;
378   }
379 
380  private:
381   RpcConfigurationsQueue* rpc_configs_queue_;
382 };
383 
RunTestLoop(std::chrono::duration<double> duration_per_query,StatsWatchers * stats_watchers,RpcConfigurationsQueue * rpc_configs_queue)384 void RunTestLoop(std::chrono::duration<double> duration_per_query,
385                  StatsWatchers* stats_watchers,
386                  RpcConfigurationsQueue* rpc_configs_queue) {
387   grpc::ChannelArguments channel_args;
388   channel_args.SetInt(GRPC_ARG_ENABLE_RETRIES, 1);
389   TestClient client(
390       grpc::CreateCustomChannel(
391           absl::GetFlag(FLAGS_server),
392           absl::GetFlag(FLAGS_secure_mode)
393               ? grpc::XdsCredentials(grpc::InsecureChannelCredentials())
394               : grpc::InsecureChannelCredentials(),
395           channel_args),
396       stats_watchers);
397   std::chrono::time_point<std::chrono::system_clock> start =
398       std::chrono::system_clock::now();
399   std::chrono::duration<double> elapsed;
400 
401   std::thread thread = std::thread(&TestClient::AsyncCompleteRpc, &client);
402 
403   std::vector<RpcConfig> configs;
404   while (true) {
405     {
406       std::lock_guard<std::mutex> lock(rpc_configs_queue->mu_rpc_configs_queue);
407       if (!rpc_configs_queue->rpc_configs_queue.empty()) {
408         configs = std::move(rpc_configs_queue->rpc_configs_queue.front());
409         rpc_configs_queue->rpc_configs_queue.pop_front();
410       }
411     }
412 
413     elapsed = std::chrono::system_clock::now() - start;
414     if (elapsed > duration_per_query) {
415       start = std::chrono::system_clock::now();
416       for (const auto& config : configs) {
417         if (config.type == ClientConfigureRequest::EMPTY_CALL) {
418           client.AsyncEmptyCall(config);
419         } else if (config.type == ClientConfigureRequest::UNARY_CALL) {
420           client.AsyncUnaryCall(config);
421         } else {
422           GPR_ASSERT(0);
423         }
424       }
425     }
426   }
427   GPR_UNREACHABLE_CODE(thread.join());
428 }
429 
EnableCsmObservability()430 grpc::CsmObservability EnableCsmObservability() {
431   gpr_log(GPR_DEBUG, "Registering Prometheus exporter");
432   opentelemetry::exporter::metrics::PrometheusExporterOptions opts;
433   // default was "localhost:9464" which causes connection issue across GKE
434   // pods
435   opts.url = "0.0.0.0:9464";
436   auto prometheus_exporter =
437       opentelemetry::exporter::metrics::PrometheusExporterFactory::Create(opts);
438   auto meter_provider =
439       std::make_shared<opentelemetry::sdk::metrics::MeterProvider>();
440   meter_provider->AddMetricReader(std::move(prometheus_exporter));
441   auto observability = grpc::CsmObservabilityBuilder()
442                            .SetMeterProvider(std::move(meter_provider))
443                            .BuildAndRegister();
444   assert(observability.ok());
445   return *std::move(observability);
446 }
447 
RunServer(const int port,StatsWatchers * stats_watchers,RpcConfigurationsQueue * rpc_configs_queue)448 void RunServer(const int port, StatsWatchers* stats_watchers,
449                RpcConfigurationsQueue* rpc_configs_queue) {
450   GPR_ASSERT(port != 0);
451   std::ostringstream server_address;
452   server_address << "0.0.0.0:" << port;
453 
454   LoadBalancerStatsServiceImpl stats_service(stats_watchers);
455   XdsUpdateClientConfigureServiceImpl client_config_service(rpc_configs_queue);
456 
457   grpc::reflection::InitProtoReflectionServerBuilderPlugin();
458   ServerBuilder builder;
459   builder.RegisterService(&stats_service);
460   builder.RegisterService(&client_config_service);
461   grpc::AddAdminServices(&builder);
462   builder.AddListeningPort(server_address.str(),
463                            grpc::InsecureServerCredentials());
464   std::unique_ptr<Server> server(builder.BuildAndStart());
465   gpr_log(GPR_DEBUG, "Server listening on %s", server_address.str().c_str());
466 
467   server->Wait();
468 }
469 
BuildRpcConfigsFromFlags(RpcConfigurationsQueue * rpc_configs_queue)470 void BuildRpcConfigsFromFlags(RpcConfigurationsQueue* rpc_configs_queue) {
471   // Store Metadata like
472   // "EmptyCall:key1:value1,UnaryCall:key1:value1,UnaryCall:key2:value2" into a
473   // map where the key is the RPC method and value is a vector of key:value
474   // pairs. {EmptyCall, [{key1,value1}],
475   //  UnaryCall, [{key1,value1}, {key2,value2}]}
476   std::vector<std::string> rpc_metadata =
477       absl::StrSplit(absl::GetFlag(FLAGS_metadata), ',', absl::SkipEmpty());
478   std::map<int, std::vector<std::pair<std::string, std::string>>> metadata_map;
479   for (auto& data : rpc_metadata) {
480     std::vector<std::string> metadata =
481         absl::StrSplit(data, ':', absl::SkipEmpty());
482     GPR_ASSERT(metadata.size() == 3);
483     if (metadata[0] == "EmptyCall") {
484       metadata_map[ClientConfigureRequest::EMPTY_CALL].push_back(
485           {metadata[1], metadata[2]});
486     } else if (metadata[0] == "UnaryCall") {
487       metadata_map[ClientConfigureRequest::UNARY_CALL].push_back(
488           {metadata[1], metadata[2]});
489     } else {
490       GPR_ASSERT(0);
491     }
492   }
493   std::vector<RpcConfig> configs;
494   std::vector<std::string> rpc_methods =
495       absl::StrSplit(absl::GetFlag(FLAGS_rpc), ',', absl::SkipEmpty());
496   int request_payload_size = absl::GetFlag(FLAGS_request_payload_size);
497   int response_payload_size = absl::GetFlag(FLAGS_response_payload_size);
498   GPR_ASSERT(request_payload_size >= 0);
499   GPR_ASSERT(response_payload_size >= 0);
500   for (const std::string& rpc_method : rpc_methods) {
501     RpcConfig config;
502     if (rpc_method == "EmptyCall") {
503       config.type = ClientConfigureRequest::EMPTY_CALL;
504     } else if (rpc_method == "UnaryCall") {
505       config.type = ClientConfigureRequest::UNARY_CALL;
506     } else {
507       GPR_ASSERT(0);
508     }
509     auto metadata_iter = metadata_map.find(config.type);
510     if (metadata_iter != metadata_map.end()) {
511       config.metadata = metadata_iter->second;
512     }
513     if (request_payload_size > 0 &&
514         config.type == ClientConfigureRequest::EMPTY_CALL) {
515       gpr_log(GPR_ERROR,
516               "request_payload_size should not be set "
517               "for EMPTY_CALL");
518     }
519     if (response_payload_size > 0 &&
520         config.type == ClientConfigureRequest::EMPTY_CALL) {
521       gpr_log(GPR_ERROR,
522               "response_payload_size should not be set "
523               "for EMPTY_CALL");
524     }
525     config.request_payload_size = request_payload_size;
526     std::string payload(config.request_payload_size, '0');
527     config.request_payload = payload;
528     config.response_payload_size = response_payload_size;
529     configs.push_back(std::move(config));
530   }
531   {
532     std::lock_guard<std::mutex> lock(rpc_configs_queue->mu_rpc_configs_queue);
533     rpc_configs_queue->rpc_configs_queue.emplace_back(std::move(configs));
534   }
535 }
536 
main(int argc,char ** argv)537 int main(int argc, char** argv) {
538   grpc_core::CoreConfiguration::RegisterBuilder(
539       grpc::testing::RegisterRpcBehaviorLbPolicy);
540   grpc::testing::TestEnvironment env(&argc, argv);
541   grpc::testing::InitTest(&argc, &argv, true);
542   // Validate the expect_status flag.
543   grpc_status_code code;
544   GPR_ASSERT(grpc_status_code_from_string(
545       absl::GetFlag(FLAGS_expect_status).c_str(), &code));
546   StatsWatchers stats_watchers;
547   RpcConfigurationsQueue rpc_config_queue;
548 
549   {
550     std::lock_guard<std::mutex> lock(stats_watchers.mu);
551     stats_watchers.global_watcher = new XdsStatsWatcher(0, 0, {});
552     stats_watchers.watchers.insert(stats_watchers.global_watcher);
553   }
554 
555   BuildRpcConfigsFromFlags(&rpc_config_queue);
556   grpc::CsmObservability observability;
557   if (absl::GetFlag(FLAGS_enable_csm_observability)) {
558     observability = EnableCsmObservability();
559   }
560 
561   std::chrono::duration<double> duration_per_query =
562       std::chrono::nanoseconds(std::chrono::seconds(1)) /
563       absl::GetFlag(FLAGS_qps);
564 
565   std::vector<std::thread> test_threads;
566   test_threads.reserve(absl::GetFlag(FLAGS_num_channels));
567   for (int i = 0; i < absl::GetFlag(FLAGS_num_channels); i++) {
568     test_threads.emplace_back(std::thread(&RunTestLoop, duration_per_query,
569                                           &stats_watchers, &rpc_config_queue));
570   }
571 
572   RunServer(absl::GetFlag(FLAGS_stats_port), &stats_watchers,
573             &rpc_config_queue);
574 
575   for (auto it = test_threads.begin(); it != test_threads.end(); it++) {
576     it->join();
577   }
578 
579   return 0;
580 }
581