xref: /aosp_15_r20/external/grpc-grpc/test/cpp/qps/driver.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/cpp/qps/driver.h"
20 
21 #include <cinttypes>
22 #include <deque>
23 #include <list>
24 #include <thread>
25 #include <unordered_map>
26 #include <vector>
27 
28 #include "google/protobuf/timestamp.pb.h"
29 
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/string_util.h>
33 #include <grpcpp/channel.h>
34 #include <grpcpp/client_context.h>
35 #include <grpcpp/create_channel.h>
36 
37 #include "src/core/lib/gprpp/crash.h"
38 #include "src/core/lib/gprpp/env.h"
39 #include "src/core/lib/gprpp/host_port.h"
40 #include "src/proto/grpc/testing/worker_service.grpc.pb.h"
41 #include "test/core/util/port.h"
42 #include "test/core/util/test_config.h"
43 #include "test/cpp/qps/client.h"
44 #include "test/cpp/qps/histogram.h"
45 #include "test/cpp/qps/qps_worker.h"
46 #include "test/cpp/qps/stats.h"
47 #include "test/cpp/util/test_credentials_provider.h"
48 
49 using std::deque;
50 using std::list;
51 using std::unique_ptr;
52 using std::vector;
53 
54 namespace grpc {
55 namespace testing {
get_host(const std::string & worker)56 static std::string get_host(const std::string& worker) {
57   absl::string_view host;
58   absl::string_view port;
59   grpc_core::SplitHostPort(worker.c_str(), &host, &port);
60   return std::string(host.data(), host.size());
61 }
62 
get_workers(const string & env_name)63 static deque<string> get_workers(const string& env_name) {
64   deque<string> out;
65   auto env = grpc_core::GetEnv(env_name.c_str()).value_or("");
66   const char* p = env.c_str();
67   if (!env.empty()) {
68     for (;;) {
69       const char* comma = strchr(p, ',');
70       if (comma) {
71         out.emplace_back(p, comma);
72         p = comma + 1;
73       } else {
74         out.emplace_back(p);
75         break;
76       }
77     }
78   }
79   if (out.empty()) {
80     gpr_log(GPR_ERROR,
81             "Environment variable \"%s\" does not contain a list of QPS "
82             "workers to use. Set it to a comma-separated list of "
83             "hostname:port pairs, starting with hosts that should act as "
84             "servers. E.g. export "
85             "%s=\"serverhost1:1234,clienthost1:1234,clienthost2:1234\"",
86             env_name.c_str(), env_name.c_str());
87   }
88   return out;
89 }
90 
// Resolves the credential type for `worker_addr`: a per-worker override in
// `per_worker_credential_types` wins; otherwise the scenario-wide
// `credential_type` is used.
std::string GetCredType(
    const std::string& worker_addr,
    const std::map<std::string, std::string>& per_worker_credential_types,
    const std::string& credential_type) {
  const auto override_entry = per_worker_credential_types.find(worker_addr);
  return override_entry == per_worker_credential_types.end()
             ? credential_type
             : override_entry->second;
}
101 
// helpers for postprocess_scenario_result
//
// Projection functions passed to the average()/sum() templates (see
// test/cpp/qps/stats.h) when folding per-worker stats protos into
// scenario-level summary numbers.
static double WallTime(const ClientStats& s) { return s.time_elapsed(); }
static double SystemTime(const ClientStats& s) { return s.time_system(); }
static double UserTime(const ClientStats& s) { return s.time_user(); }
static double CliPollCount(const ClientStats& s) { return s.cq_poll_count(); }
static double SvrPollCount(const ServerStats& s) { return s.cq_poll_count(); }
static double ServerSystemTime(const ServerStats& s) { return s.time_system(); }
static double ServerUserTime(const ServerStats& s) { return s.time_user(); }
static double ServerTotalCpuTime(const ServerStats& s) {
  return s.total_cpu_time();
}
static double ServerIdleCpuTime(const ServerStats& s) {
  return s.idle_cpu_time();
}
// Identity projection: server core counts are summed as-is.
static int Cores(int n) { return n; }
117 
IsSuccess(const Status & s)118 static bool IsSuccess(const Status& s) {
119   if (s.ok()) return true;
120   // Since we shutdown servers and clients at the same time, they both can
121   // observe cancellation.  Thus, we consider CANCELLED as good status.
122   if (static_cast<StatusCode>(s.error_code()) == StatusCode::CANCELLED) {
123     return true;
124   }
125   // Since we shutdown servers and clients at the same time, server can close
126   // the socket before the client attempts to do that, and vice versa.  Thus
127   // receiving a "Socket closed" error is fine.
128   if (s.error_message() == "Socket closed") return true;
129   return false;
130 }
131 
132 // Postprocess ScenarioResult and populate result summary.
postprocess_scenario_result(ScenarioResult * result)133 static void postprocess_scenario_result(ScenarioResult* result) {
134   // Get latencies from ScenarioResult latencies histogram and populate to
135   // result summary.
136   Histogram histogram;
137   histogram.MergeProto(result->latencies());
138   result->mutable_summary()->set_latency_50(histogram.Percentile(50));
139   result->mutable_summary()->set_latency_90(histogram.Percentile(90));
140   result->mutable_summary()->set_latency_95(histogram.Percentile(95));
141   result->mutable_summary()->set_latency_99(histogram.Percentile(99));
142   result->mutable_summary()->set_latency_999(histogram.Percentile(99.9));
143 
144   // Calculate qps and cpu load for each client and then aggregate results for
145   // all clients
146   double qps = 0;
147   double client_system_cpu_load = 0, client_user_cpu_load = 0;
148   for (int i = 0; i < result->client_stats_size(); i++) {
149     auto client_stat = result->client_stats(i);
150     qps += client_stat.latencies().count() / client_stat.time_elapsed();
151     client_system_cpu_load +=
152         client_stat.time_system() / client_stat.time_elapsed();
153     client_user_cpu_load +=
154         client_stat.time_user() / client_stat.time_elapsed();
155   }
156   // Calculate cpu load for each server and then aggregate results for all
157   // servers
158   double server_system_cpu_load = 0, server_user_cpu_load = 0;
159   for (int i = 0; i < result->server_stats_size(); i++) {
160     auto server_stat = result->server_stats(i);
161     server_system_cpu_load +=
162         server_stat.time_system() / server_stat.time_elapsed();
163     server_user_cpu_load +=
164         server_stat.time_user() / server_stat.time_elapsed();
165   }
166   result->mutable_summary()->set_qps(qps);
167   // Populate the percentage of cpu load to result summary.
168   result->mutable_summary()->set_server_system_time(100 *
169                                                     server_system_cpu_load);
170   result->mutable_summary()->set_server_user_time(100 * server_user_cpu_load);
171   result->mutable_summary()->set_client_system_time(100 *
172                                                     client_system_cpu_load);
173   result->mutable_summary()->set_client_user_time(100 * client_user_cpu_load);
174 
175   // For Non-linux platform, get_cpu_usage() is not implemented. Thus,
176   // ServerTotalCpuTime and ServerIdleCpuTime are both 0.
177   if (average(result->server_stats(), ServerTotalCpuTime) == 0) {
178     result->mutable_summary()->set_server_cpu_usage(0);
179   } else {
180     auto server_cpu_usage =
181         100 - 100 * average(result->server_stats(), ServerIdleCpuTime) /
182                   average(result->server_stats(), ServerTotalCpuTime);
183     result->mutable_summary()->set_server_cpu_usage(server_cpu_usage);
184   }
185 
186   // Calculate and populate successful request per second and failed requests
187   // per seconds to result summary.
188   auto time_estimate = average(result->client_stats(), WallTime);
189   if (result->request_results_size() > 0) {
190     int64_t successes = 0;
191     int64_t failures = 0;
192     for (int i = 0; i < result->request_results_size(); i++) {
193       const RequestResultCount& rrc = result->request_results(i);
194       if (rrc.status_code() == 0) {
195         successes += rrc.count();
196       } else {
197         failures += rrc.count();
198       }
199     }
200     result->mutable_summary()->set_successful_requests_per_second(
201         successes / time_estimate);
202     result->mutable_summary()->set_failed_requests_per_second(failures /
203                                                               time_estimate);
204   }
205 
206   // Fill in data for other metrics required in result summary
207   auto qps_per_server_core = qps / sum(result->server_cores(), Cores);
208   result->mutable_summary()->set_qps_per_server_core(qps_per_server_core);
209   result->mutable_summary()->set_client_polls_per_request(
210       sum(result->client_stats(), CliPollCount) / histogram.Count());
211   result->mutable_summary()->set_server_polls_per_request(
212       sum(result->server_stats(), SvrPollCount) / histogram.Count());
213 
214   auto server_queries_per_cpu_sec =
215       histogram.Count() / (sum(result->server_stats(), ServerSystemTime) +
216                            sum(result->server_stats(), ServerUserTime));
217   auto client_queries_per_cpu_sec =
218       histogram.Count() / (sum(result->client_stats(), SystemTime) +
219                            sum(result->client_stats(), UserTime));
220 
221   result->mutable_summary()->set_server_queries_per_cpu_sec(
222       server_queries_per_cpu_sec);
223   result->mutable_summary()->set_client_queries_per_cpu_sec(
224       client_queries_per_cpu_sec);
225 }
226 
// Per-client-worker connection state: the control-plane stub plus the
// long-lived bidirectional stream used to drive that benchmark client.
struct ClientData {
  unique_ptr<WorkerService::Stub> stub;
  unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
};

// Per-server-worker connection state, analogous to ClientData.
struct ServerData {
  unique_ptr<WorkerService::Stub> stub;
  unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
};
236 
FinishClients(const std::vector<ClientData> & clients,const ClientArgs & client_mark)237 static void FinishClients(const std::vector<ClientData>& clients,
238                           const ClientArgs& client_mark) {
239   gpr_log(GPR_INFO, "Finishing clients");
240   for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
241     auto client = &clients[i];
242     if (!client->stream->Write(client_mark)) {
243       grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
244     }
245     if (!client->stream->WritesDone()) {
246       grpc_core::Crash(absl::StrFormat("Failed WritesDone for client %zu", i));
247     }
248   }
249 }
250 
ReceiveFinalStatusFromClients(const std::vector<ClientData> & clients,Histogram & merged_latencies,std::unordered_map<int,int64_t> & merged_statuses,ScenarioResult & result)251 static void ReceiveFinalStatusFromClients(
252     const std::vector<ClientData>& clients, Histogram& merged_latencies,
253     std::unordered_map<int, int64_t>& merged_statuses, ScenarioResult& result) {
254   gpr_log(GPR_INFO, "Receiving final status from clients");
255   ClientStatus client_status;
256   for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
257     auto client = &clients[i];
258     // Read the client final status
259     if (client->stream->Read(&client_status)) {
260       gpr_log(GPR_INFO, "Received final status from client %zu", i);
261       const auto& stats = client_status.stats();
262       merged_latencies.MergeProto(stats.latencies());
263       for (int i = 0; i < stats.request_results_size(); i++) {
264         merged_statuses[stats.request_results(i).status_code()] +=
265             stats.request_results(i).count();
266       }
267       result.add_client_stats()->CopyFrom(stats);
268       // Check that final status was should be the last message on the client
269       // stream.
270       // TODO(jtattermusch): note that that waiting for Read to return can take
271       // long on some scenarios (e.g. unconstrained streaming_from_server). See
272       // https://github.com/grpc/grpc/blob/3bd0cd208ea549760a2daf595f79b91b247fe240/test/cpp/qps/server_async.cc#L176
273       // where the shutdown delay pretty much determines the wait here.
274       GPR_ASSERT(!client->stream->Read(&client_status));
275     } else {
276       grpc_core::Crash(
277           absl::StrFormat("Couldn't get final status from client %zu", i));
278     }
279   }
280 }
281 
ShutdownClients(const std::vector<ClientData> & clients,ScenarioResult & result)282 static void ShutdownClients(const std::vector<ClientData>& clients,
283                             ScenarioResult& result) {
284   gpr_log(GPR_INFO, "Shutdown clients");
285   for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
286     auto client = &clients[i];
287     Status s = client->stream->Finish();
288     // Since we shutdown servers and clients at the same time, clients can
289     // observe cancellation.  Thus, we consider both OK and CANCELLED as good
290     // status.
291     const bool success = IsSuccess(s);
292     result.add_client_success(success);
293     if (!success) {
294       grpc_core::Crash(absl::StrFormat("Client %zu had an error %s", i,
295                                        s.error_message().c_str()));
296     }
297   }
298 }
299 
FinishServers(const std::vector<ServerData> & servers,const ServerArgs & server_mark)300 static void FinishServers(const std::vector<ServerData>& servers,
301                           const ServerArgs& server_mark) {
302   gpr_log(GPR_INFO, "Finishing servers");
303   for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
304     auto server = &servers[i];
305     if (!server->stream->Write(server_mark)) {
306       grpc_core::Crash(absl::StrFormat("Couldn't write mark to server %zu", i));
307     }
308     if (!server->stream->WritesDone()) {
309       grpc_core::Crash(absl::StrFormat("Failed WritesDone for server %zu", i));
310     }
311   }
312 }
313 
ReceiveFinalStatusFromServer(const std::vector<ServerData> & servers,ScenarioResult & result)314 static void ReceiveFinalStatusFromServer(const std::vector<ServerData>& servers,
315                                          ScenarioResult& result) {
316   gpr_log(GPR_INFO, "Receiving final status from servers");
317   ServerStatus server_status;
318   for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
319     auto server = &servers[i];
320     // Read the server final status
321     if (server->stream->Read(&server_status)) {
322       gpr_log(GPR_INFO, "Received final status from server %zu", i);
323       result.add_server_stats()->CopyFrom(server_status.stats());
324       result.add_server_cores(server_status.cores());
325       // That final status should be the last message on the server stream
326       GPR_ASSERT(!server->stream->Read(&server_status));
327     } else {
328       grpc_core::Crash(
329           absl::StrFormat("Couldn't get final status from server %zu", i));
330     }
331   }
332 }
333 
ShutdownServers(const std::vector<ServerData> & servers,ScenarioResult & result)334 static void ShutdownServers(const std::vector<ServerData>& servers,
335                             ScenarioResult& result) {
336   gpr_log(GPR_INFO, "Shutdown servers");
337   for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
338     auto server = &servers[i];
339     Status s = server->stream->Finish();
340     // Since we shutdown servers and clients at the same time, servers can
341     // observe cancellation.  Thus, we consider both OK and CANCELLED as good
342     // status.
343     const bool success = IsSuccess(s);
344     result.add_server_success(success);
345     if (!success) {
346       grpc_core::Crash(absl::StrFormat("Server %zu had an error %s", i,
347                                        s.error_message().c_str()));
348     }
349   }
350 }
351 
352 std::vector<grpc::testing::Server*>* g_inproc_servers = nullptr;
353 
// Drives one complete benchmark scenario end to end:
//   1. optionally spawns local worker processes (or in-process workers),
//   2. starts the requested number of servers, then clients,
//   3. sends "mark" messages to reset stats, warms up, runs the benchmark
//      window,
//   4. collects final stats from all workers and merges them into the
//      returned ScenarioResult.
// The stream Write/Read interleaving below is order-sensitive: every worker
// must see its mark before the driver blocks reading the next status.
std::unique_ptr<ScenarioResult> RunScenario(
    const ClientConfig& initial_client_config, size_t num_clients,
    const ServerConfig& initial_server_config, size_t num_servers,
    int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
    const std::string& qps_server_target_override,
    const std::string& credential_type,
    const std::map<std::string, std::string>& per_worker_credential_types,
    bool run_inproc, int32_t median_latency_collection_interval_millis) {
  if (run_inproc) {
    g_inproc_servers = new std::vector<grpc::testing::Server*>;
  }
  // Log everything from the driver
  gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);

  // ClientContext allocations (all are destroyed at scope exit)
  list<ClientContext> contexts;
  auto alloc_context = [](list<ClientContext>* contexts) {
    // std::list guarantees stable addresses, so pointers into `contexts`
    // remain valid as more contexts are appended.
    contexts->emplace_back();
    auto context = &contexts->back();
    context->set_wait_for_ready(true);
    return context;
  };

  // To be added to the result, containing the final configuration used for
  // client and config (including host, etc.)
  ClientConfig result_client_config;

  // Get client, server lists; ignore if inproc test
  auto workers = (!run_inproc) ? get_workers("QPS_WORKERS") : deque<string>();
  ClientConfig client_config = initial_client_config;

  // Spawn some local workers if desired
  // (a negative spawn_local_worker_count prepends workers, forcing the
  // spawned ones to act as servers; positive appends them as clients).
  vector<unique_ptr<QpsWorker>> local_workers;
  for (int i = 0; i < abs(spawn_local_worker_count); i++) {
    // act as if we're a new test -- gets a good rng seed
    static bool called_init = false;
    if (!called_init) {
      char args_buf[100];
      strcpy(args_buf, "some-benchmark");
      char* args[] = {args_buf};
      int argc = 1;
      grpc_test_init(&argc, args);
      called_init = true;
    }

    char addr[256];
    // we use port # of -1 to indicate inproc
    int driver_port = (!run_inproc) ? grpc_pick_unused_port_or_die() : -1;
    local_workers.emplace_back(new QpsWorker(driver_port, 0, credential_type));
    // NOTE(review): sprintf into a fixed 256-byte buffer; "localhost:%d"
    // cannot overflow it for any int, but snprintf would be safer style.
    sprintf(addr, "localhost:%d", driver_port);
    if (spawn_local_worker_count < 0) {
      workers.push_front(addr);
    } else {
      workers.push_back(addr);
    }
  }
  GPR_ASSERT(!workers.empty());

  // if num_clients is set to <=0, do dynamic sizing: all workers
  // except for servers are clients
  // (num_clients is size_t, so this test is effectively num_clients == 0).
  if (num_clients <= 0) {
    num_clients = workers.size() - num_servers;
  }

  // TODO(ctiller): support running multiple configurations, and binpack
  // client/server pairs
  // to available workers
  GPR_ASSERT(workers.size() >= num_clients + num_servers);

  // Trim to just what we need
  workers.resize(num_clients + num_servers);

  // Start servers
  std::vector<ServerData> servers(num_servers);
  // NOTE(review): hosts_cores appears unused in this function — candidate
  // for removal; confirm nothing else relies on it.
  std::unordered_map<string, std::deque<int>> hosts_cores;
  ChannelArguments channel_args;

  for (size_t i = 0; i < num_servers; i++) {
    gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")",
            workers[i].c_str(), i);
    if (!run_inproc) {
      servers[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
          workers[i],
          GetCredType(workers[i], per_worker_credential_types, credential_type),
          nullptr /* call creds */, {} /* interceptor creators */));
    } else {
      servers[i].stub = WorkerService::NewStub(
          local_workers[i]->InProcessChannel(channel_args));
    }

    const ServerConfig& server_config = initial_server_config;
    if (server_config.core_limit() != 0) {
      grpc_core::Crash("server config core limit is set but ignored by driver");
    }

    ServerArgs args;
    *args.mutable_setup() = server_config;
    servers[i].stream = servers[i].stub->RunServer(alloc_context(&contexts));
    if (!servers[i].stream->Write(args)) {
      grpc_core::Crash(
          absl::StrFormat("Could not write args to server %zu", i));
    }
    // The server replies with its actual listening port once it is up.
    ServerStatus init_status;
    if (!servers[i].stream->Read(&init_status)) {
      grpc_core::Crash(
          absl::StrFormat("Server %zu did not yield initial status", i));
    }
    if (run_inproc) {
      std::string cli_target(INPROC_NAME_PREFIX);
      cli_target += std::to_string(i);
      client_config.add_server_targets(cli_target);
    } else {
      std::string host = get_host(workers[i]);
      std::string cli_target =
          grpc_core::JoinHostPort(host.c_str(), init_status.port());
      client_config.add_server_targets(cli_target.c_str());
    }
  }
  if (qps_server_target_override.length() > 0) {
    // overriding the qps server target only makes since if there is <= 1
    // servers
    GPR_ASSERT(num_servers <= 1);
    client_config.clear_server_targets();
    client_config.add_server_targets(qps_server_target_override);
  }
  client_config.set_median_latency_collection_interval_millis(
      median_latency_collection_interval_millis);

  // Targets are all set by now
  result_client_config = client_config;
  // Start clients
  std::vector<ClientData> clients(num_clients);
  size_t channels_allocated = 0;
  for (size_t i = 0; i < num_clients; i++) {
    const auto& worker = workers[i + num_servers];
    gpr_log(GPR_INFO, "Starting client on %s (worker #%" PRIuPTR ")",
            worker.c_str(), i + num_servers);
    if (!run_inproc) {
      clients[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
          worker,
          GetCredType(worker, per_worker_credential_types, credential_type),
          nullptr /* call creds */, {} /* interceptor creators */));
    } else {
      clients[i].stub = WorkerService::NewStub(
          local_workers[i + num_servers]->InProcessChannel(channel_args));
    }
    ClientConfig per_client_config = client_config;

    if (initial_client_config.core_limit() != 0) {
      grpc_core::Crash("client config core limit set but ignored");
    }

    // Reduce channel count so that total channels specified is held regardless
    // of the number of clients available
    // (integer division spreads the remainder over the later clients).
    size_t num_channels =
        (client_config.client_channels() - channels_allocated) /
        (num_clients - i);
    channels_allocated += num_channels;
    gpr_log(GPR_DEBUG, "Client %" PRIdPTR " gets %" PRIdPTR " channels", i,
            num_channels);
    per_client_config.set_client_channels(num_channels);

    ClientArgs args;
    *args.mutable_setup() = per_client_config;
    clients[i].stream = clients[i].stub->RunClient(alloc_context(&contexts));
    if (!clients[i].stream->Write(args)) {
      grpc_core::Crash(
          absl::StrFormat("Could not write args to client %zu", i));
    }
  }

  // Wait for every client to acknowledge its setup before proceeding.
  for (size_t i = 0; i < num_clients; i++) {
    ClientStatus init_status;
    if (!clients[i].stream->Read(&init_status)) {
      grpc_core::Crash(
          absl::StrFormat("Client %zu did not yield initial status", i));
    }
  }

  // Send an initial mark: clients can use this to know that everything is ready
  // to start
  gpr_log(GPR_INFO, "Initiating");
  ServerArgs server_mark;
  server_mark.mutable_mark()->set_reset(true);
  ClientArgs client_mark;
  client_mark.mutable_mark()->set_reset(true);
  ServerStatus server_status;
  ClientStatus client_status;
  for (size_t i = 0; i < num_clients; i++) {
    auto client = &clients[i];
    if (!client->stream->Write(client_mark)) {
      grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
    }
  }
  for (size_t i = 0; i < num_clients; i++) {
    auto client = &clients[i];
    if (!client->stream->Read(&client_status)) {
      grpc_core::Crash(
          absl::StrFormat("Couldn't get status from client %zu", i));
    }
  }

  // Let everything warmup
  gpr_log(GPR_INFO, "Warming up");
  gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
  gpr_sleep_until(
      gpr_time_add(start, gpr_time_from_seconds(warmup_seconds, GPR_TIMESPAN)));

  // Start a run
  // (marks reset the stats, so warmup traffic is excluded from the result)
  gpr_log(GPR_INFO, "Starting");

  auto start_time = time(nullptr);

  for (size_t i = 0; i < num_servers; i++) {
    auto server = &servers[i];
    if (!server->stream->Write(server_mark)) {
      grpc_core::Crash(absl::StrFormat("Couldn't write mark to server %zu", i));
    }
  }
  for (size_t i = 0; i < num_clients; i++) {
    auto client = &clients[i];
    if (!client->stream->Write(client_mark)) {
      grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
    }
  }
  for (size_t i = 0; i < num_servers; i++) {
    auto server = &servers[i];
    if (!server->stream->Read(&server_status)) {
      grpc_core::Crash(
          absl::StrFormat("Couldn't get status from server %zu", i));
    }
  }
  for (size_t i = 0; i < num_clients; i++) {
    auto client = &clients[i];
    if (!client->stream->Read(&client_status)) {
      grpc_core::Crash(
          absl::StrFormat("Couldn't get status from client %zu", i));
    }
  }

  // Wait some time
  gpr_log(GPR_INFO, "Running");
  // Use gpr_sleep_until rather than this_thread::sleep_until to support
  // compilers that don't work with this_thread
  gpr_sleep_until(gpr_time_add(
      start,
      gpr_time_from_seconds(warmup_seconds + benchmark_seconds, GPR_TIMESPAN)));

  // Finish a run
  std::unique_ptr<ScenarioResult> result(new ScenarioResult);
  Histogram merged_latencies;
  std::unordered_map<int, int64_t> merged_statuses;

  // For the case where clients lead the test such as UNARY and
  // STREAMING_FROM_CLIENT, clients need to finish completely while a server
  // is running to prevent the clients from being stuck while waiting for
  // the result.
  bool client_finish_first =
      (client_config.rpc_type() != STREAMING_FROM_SERVER);

  auto end_time = time(nullptr);

  FinishClients(clients, client_mark);

  if (!client_finish_first) {
    FinishServers(servers, server_mark);
  }

  ReceiveFinalStatusFromClients(clients, merged_latencies, merged_statuses,
                                *result);
  ShutdownClients(clients, *result);

  if (client_finish_first) {
    FinishServers(servers, server_mark);
  }

  ReceiveFinalStatusFromServer(servers, *result);
  ShutdownServers(servers, *result);

  // NOTE(review): g_inproc_servers is deleted but not reset to nullptr;
  // a second in-process RunScenario call reallocates it, but any dangling
  // reader in between would see a freed pointer — confirm single-use.
  delete g_inproc_servers;

  // Fold the merged per-client data into the result protos.
  merged_latencies.FillProto(result->mutable_latencies());
  for (std::unordered_map<int, int64_t>::iterator it = merged_statuses.begin();
       it != merged_statuses.end(); ++it) {
    RequestResultCount* rrc = result->add_request_results();
    rrc->set_status_code(it->first);
    rrc->set_count(it->second);
  }

  // Fill in start and end time for the test scenario
  result->mutable_summary()->mutable_start_time()->set_seconds(start_time);
  result->mutable_summary()->mutable_end_time()->set_seconds(end_time);

  postprocess_scenario_result(result.get());
  return result;
}
650 
RunQuit(const std::string & credential_type,const std::map<std::string,std::string> & per_worker_credential_types)651 bool RunQuit(
652     const std::string& credential_type,
653     const std::map<std::string, std::string>& per_worker_credential_types) {
654   // Get client, server lists
655   bool result = true;
656   auto workers = get_workers("QPS_WORKERS");
657   if (workers.empty()) {
658     return false;
659   }
660 
661   for (size_t i = 0; i < workers.size(); i++) {
662     auto stub = WorkerService::NewStub(grpc::CreateTestChannel(
663         workers[i],
664         GetCredType(workers[i], per_worker_credential_types, credential_type),
665         nullptr /* call creds */, {} /* interceptor creators */));
666     Void phony;
667     grpc::ClientContext ctx;
668     ctx.set_wait_for_ready(true);
669     Status s = stub->QuitWorker(&ctx, phony, &phony);
670     if (!s.ok()) {
671       gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", i,
672               s.error_message().c_str());
673       result = false;
674     }
675   }
676   return result;
677 }
678 
679 }  // namespace testing
680 }  // namespace grpc
681