xref: /aosp_15_r20/external/grpc-grpc/test/cpp/qps/driver.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1*cc02d7e2SAndroid Build Coastguard Worker //
2*cc02d7e2SAndroid Build Coastguard Worker //
3*cc02d7e2SAndroid Build Coastguard Worker // Copyright 2015 gRPC authors.
4*cc02d7e2SAndroid Build Coastguard Worker //
5*cc02d7e2SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
6*cc02d7e2SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
7*cc02d7e2SAndroid Build Coastguard Worker // You may obtain a copy of the License at
8*cc02d7e2SAndroid Build Coastguard Worker //
9*cc02d7e2SAndroid Build Coastguard Worker //     http://www.apache.org/licenses/LICENSE-2.0
10*cc02d7e2SAndroid Build Coastguard Worker //
11*cc02d7e2SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
12*cc02d7e2SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
13*cc02d7e2SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14*cc02d7e2SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
15*cc02d7e2SAndroid Build Coastguard Worker // limitations under the License.
16*cc02d7e2SAndroid Build Coastguard Worker //
17*cc02d7e2SAndroid Build Coastguard Worker //
18*cc02d7e2SAndroid Build Coastguard Worker 
19*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/driver.h"
20*cc02d7e2SAndroid Build Coastguard Worker 
21*cc02d7e2SAndroid Build Coastguard Worker #include <cinttypes>
22*cc02d7e2SAndroid Build Coastguard Worker #include <deque>
23*cc02d7e2SAndroid Build Coastguard Worker #include <list>
24*cc02d7e2SAndroid Build Coastguard Worker #include <thread>
25*cc02d7e2SAndroid Build Coastguard Worker #include <unordered_map>
26*cc02d7e2SAndroid Build Coastguard Worker #include <vector>
27*cc02d7e2SAndroid Build Coastguard Worker 
28*cc02d7e2SAndroid Build Coastguard Worker #include "google/protobuf/timestamp.pb.h"
29*cc02d7e2SAndroid Build Coastguard Worker 
30*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/alloc.h>
31*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/log.h>
32*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/string_util.h>
33*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/channel.h>
34*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/client_context.h>
35*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/create_channel.h>
36*cc02d7e2SAndroid Build Coastguard Worker 
37*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/crash.h"
38*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/env.h"
39*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/host_port.h"
40*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/worker_service.grpc.pb.h"
41*cc02d7e2SAndroid Build Coastguard Worker #include "test/core/util/port.h"
42*cc02d7e2SAndroid Build Coastguard Worker #include "test/core/util/test_config.h"
43*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/client.h"
44*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/histogram.h"
45*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/qps_worker.h"
46*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/stats.h"
47*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/util/test_credentials_provider.h"
48*cc02d7e2SAndroid Build Coastguard Worker 
49*cc02d7e2SAndroid Build Coastguard Worker using std::deque;
50*cc02d7e2SAndroid Build Coastguard Worker using std::list;
51*cc02d7e2SAndroid Build Coastguard Worker using std::unique_ptr;
52*cc02d7e2SAndroid Build Coastguard Worker using std::vector;
53*cc02d7e2SAndroid Build Coastguard Worker 
54*cc02d7e2SAndroid Build Coastguard Worker namespace grpc {
55*cc02d7e2SAndroid Build Coastguard Worker namespace testing {
get_host(const std::string & worker)56*cc02d7e2SAndroid Build Coastguard Worker static std::string get_host(const std::string& worker) {
57*cc02d7e2SAndroid Build Coastguard Worker   absl::string_view host;
58*cc02d7e2SAndroid Build Coastguard Worker   absl::string_view port;
59*cc02d7e2SAndroid Build Coastguard Worker   grpc_core::SplitHostPort(worker.c_str(), &host, &port);
60*cc02d7e2SAndroid Build Coastguard Worker   return std::string(host.data(), host.size());
61*cc02d7e2SAndroid Build Coastguard Worker }
62*cc02d7e2SAndroid Build Coastguard Worker 
get_workers(const string & env_name)63*cc02d7e2SAndroid Build Coastguard Worker static deque<string> get_workers(const string& env_name) {
64*cc02d7e2SAndroid Build Coastguard Worker   deque<string> out;
65*cc02d7e2SAndroid Build Coastguard Worker   auto env = grpc_core::GetEnv(env_name.c_str()).value_or("");
66*cc02d7e2SAndroid Build Coastguard Worker   const char* p = env.c_str();
67*cc02d7e2SAndroid Build Coastguard Worker   if (!env.empty()) {
68*cc02d7e2SAndroid Build Coastguard Worker     for (;;) {
69*cc02d7e2SAndroid Build Coastguard Worker       const char* comma = strchr(p, ',');
70*cc02d7e2SAndroid Build Coastguard Worker       if (comma) {
71*cc02d7e2SAndroid Build Coastguard Worker         out.emplace_back(p, comma);
72*cc02d7e2SAndroid Build Coastguard Worker         p = comma + 1;
73*cc02d7e2SAndroid Build Coastguard Worker       } else {
74*cc02d7e2SAndroid Build Coastguard Worker         out.emplace_back(p);
75*cc02d7e2SAndroid Build Coastguard Worker         break;
76*cc02d7e2SAndroid Build Coastguard Worker       }
77*cc02d7e2SAndroid Build Coastguard Worker     }
78*cc02d7e2SAndroid Build Coastguard Worker   }
79*cc02d7e2SAndroid Build Coastguard Worker   if (out.empty()) {
80*cc02d7e2SAndroid Build Coastguard Worker     gpr_log(GPR_ERROR,
81*cc02d7e2SAndroid Build Coastguard Worker             "Environment variable \"%s\" does not contain a list of QPS "
82*cc02d7e2SAndroid Build Coastguard Worker             "workers to use. Set it to a comma-separated list of "
83*cc02d7e2SAndroid Build Coastguard Worker             "hostname:port pairs, starting with hosts that should act as "
84*cc02d7e2SAndroid Build Coastguard Worker             "servers. E.g. export "
85*cc02d7e2SAndroid Build Coastguard Worker             "%s=\"serverhost1:1234,clienthost1:1234,clienthost2:1234\"",
86*cc02d7e2SAndroid Build Coastguard Worker             env_name.c_str(), env_name.c_str());
87*cc02d7e2SAndroid Build Coastguard Worker   }
88*cc02d7e2SAndroid Build Coastguard Worker   return out;
89*cc02d7e2SAndroid Build Coastguard Worker }
90*cc02d7e2SAndroid Build Coastguard Worker 
GetCredType(const std::string & worker_addr,const std::map<std::string,std::string> & per_worker_credential_types,const std::string & credential_type)91*cc02d7e2SAndroid Build Coastguard Worker std::string GetCredType(
92*cc02d7e2SAndroid Build Coastguard Worker     const std::string& worker_addr,
93*cc02d7e2SAndroid Build Coastguard Worker     const std::map<std::string, std::string>& per_worker_credential_types,
94*cc02d7e2SAndroid Build Coastguard Worker     const std::string& credential_type) {
95*cc02d7e2SAndroid Build Coastguard Worker   auto it = per_worker_credential_types.find(worker_addr);
96*cc02d7e2SAndroid Build Coastguard Worker   if (it != per_worker_credential_types.end()) {
97*cc02d7e2SAndroid Build Coastguard Worker     return it->second;
98*cc02d7e2SAndroid Build Coastguard Worker   }
99*cc02d7e2SAndroid Build Coastguard Worker   return credential_type;
100*cc02d7e2SAndroid Build Coastguard Worker }
101*cc02d7e2SAndroid Build Coastguard Worker 
102*cc02d7e2SAndroid Build Coastguard Worker // helpers for postprocess_scenario_result
WallTime(const ClientStats & s)103*cc02d7e2SAndroid Build Coastguard Worker static double WallTime(const ClientStats& s) { return s.time_elapsed(); }
SystemTime(const ClientStats & s)104*cc02d7e2SAndroid Build Coastguard Worker static double SystemTime(const ClientStats& s) { return s.time_system(); }
UserTime(const ClientStats & s)105*cc02d7e2SAndroid Build Coastguard Worker static double UserTime(const ClientStats& s) { return s.time_user(); }
CliPollCount(const ClientStats & s)106*cc02d7e2SAndroid Build Coastguard Worker static double CliPollCount(const ClientStats& s) { return s.cq_poll_count(); }
SvrPollCount(const ServerStats & s)107*cc02d7e2SAndroid Build Coastguard Worker static double SvrPollCount(const ServerStats& s) { return s.cq_poll_count(); }
ServerSystemTime(const ServerStats & s)108*cc02d7e2SAndroid Build Coastguard Worker static double ServerSystemTime(const ServerStats& s) { return s.time_system(); }
ServerUserTime(const ServerStats & s)109*cc02d7e2SAndroid Build Coastguard Worker static double ServerUserTime(const ServerStats& s) { return s.time_user(); }
ServerTotalCpuTime(const ServerStats & s)110*cc02d7e2SAndroid Build Coastguard Worker static double ServerTotalCpuTime(const ServerStats& s) {
111*cc02d7e2SAndroid Build Coastguard Worker   return s.total_cpu_time();
112*cc02d7e2SAndroid Build Coastguard Worker }
ServerIdleCpuTime(const ServerStats & s)113*cc02d7e2SAndroid Build Coastguard Worker static double ServerIdleCpuTime(const ServerStats& s) {
114*cc02d7e2SAndroid Build Coastguard Worker   return s.idle_cpu_time();
115*cc02d7e2SAndroid Build Coastguard Worker }
Cores(int n)116*cc02d7e2SAndroid Build Coastguard Worker static int Cores(int n) { return n; }
117*cc02d7e2SAndroid Build Coastguard Worker 
IsSuccess(const Status & s)118*cc02d7e2SAndroid Build Coastguard Worker static bool IsSuccess(const Status& s) {
119*cc02d7e2SAndroid Build Coastguard Worker   if (s.ok()) return true;
120*cc02d7e2SAndroid Build Coastguard Worker   // Since we shutdown servers and clients at the same time, they both can
121*cc02d7e2SAndroid Build Coastguard Worker   // observe cancellation.  Thus, we consider CANCELLED as good status.
122*cc02d7e2SAndroid Build Coastguard Worker   if (static_cast<StatusCode>(s.error_code()) == StatusCode::CANCELLED) {
123*cc02d7e2SAndroid Build Coastguard Worker     return true;
124*cc02d7e2SAndroid Build Coastguard Worker   }
125*cc02d7e2SAndroid Build Coastguard Worker   // Since we shutdown servers and clients at the same time, server can close
126*cc02d7e2SAndroid Build Coastguard Worker   // the socket before the client attempts to do that, and vice versa.  Thus
127*cc02d7e2SAndroid Build Coastguard Worker   // receiving a "Socket closed" error is fine.
128*cc02d7e2SAndroid Build Coastguard Worker   if (s.error_message() == "Socket closed") return true;
129*cc02d7e2SAndroid Build Coastguard Worker   return false;
130*cc02d7e2SAndroid Build Coastguard Worker }
131*cc02d7e2SAndroid Build Coastguard Worker 
132*cc02d7e2SAndroid Build Coastguard Worker // Postprocess ScenarioResult and populate result summary.
postprocess_scenario_result(ScenarioResult * result)133*cc02d7e2SAndroid Build Coastguard Worker static void postprocess_scenario_result(ScenarioResult* result) {
134*cc02d7e2SAndroid Build Coastguard Worker   // Get latencies from ScenarioResult latencies histogram and populate to
135*cc02d7e2SAndroid Build Coastguard Worker   // result summary.
136*cc02d7e2SAndroid Build Coastguard Worker   Histogram histogram;
137*cc02d7e2SAndroid Build Coastguard Worker   histogram.MergeProto(result->latencies());
138*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->set_latency_50(histogram.Percentile(50));
139*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->set_latency_90(histogram.Percentile(90));
140*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->set_latency_95(histogram.Percentile(95));
141*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->set_latency_99(histogram.Percentile(99));
142*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->set_latency_999(histogram.Percentile(99.9));
143*cc02d7e2SAndroid Build Coastguard Worker 
144*cc02d7e2SAndroid Build Coastguard Worker   // Calculate qps and cpu load for each client and then aggregate results for
145*cc02d7e2SAndroid Build Coastguard Worker   // all clients
146*cc02d7e2SAndroid Build Coastguard Worker   double qps = 0;
147*cc02d7e2SAndroid Build Coastguard Worker   double client_system_cpu_load = 0, client_user_cpu_load = 0;
148*cc02d7e2SAndroid Build Coastguard Worker   for (int i = 0; i < result->client_stats_size(); i++) {
149*cc02d7e2SAndroid Build Coastguard Worker     auto client_stat = result->client_stats(i);
150*cc02d7e2SAndroid Build Coastguard Worker     qps += client_stat.latencies().count() / client_stat.time_elapsed();
151*cc02d7e2SAndroid Build Coastguard Worker     client_system_cpu_load +=
152*cc02d7e2SAndroid Build Coastguard Worker         client_stat.time_system() / client_stat.time_elapsed();
153*cc02d7e2SAndroid Build Coastguard Worker     client_user_cpu_load +=
154*cc02d7e2SAndroid Build Coastguard Worker         client_stat.time_user() / client_stat.time_elapsed();
155*cc02d7e2SAndroid Build Coastguard Worker   }
156*cc02d7e2SAndroid Build Coastguard Worker   // Calculate cpu load for each server and then aggregate results for all
157*cc02d7e2SAndroid Build Coastguard Worker   // servers
158*cc02d7e2SAndroid Build Coastguard Worker   double server_system_cpu_load = 0, server_user_cpu_load = 0;
159*cc02d7e2SAndroid Build Coastguard Worker   for (int i = 0; i < result->server_stats_size(); i++) {
160*cc02d7e2SAndroid Build Coastguard Worker     auto server_stat = result->server_stats(i);
161*cc02d7e2SAndroid Build Coastguard Worker     server_system_cpu_load +=
162*cc02d7e2SAndroid Build Coastguard Worker         server_stat.time_system() / server_stat.time_elapsed();
163*cc02d7e2SAndroid Build Coastguard Worker     server_user_cpu_load +=
164*cc02d7e2SAndroid Build Coastguard Worker         server_stat.time_user() / server_stat.time_elapsed();
165*cc02d7e2SAndroid Build Coastguard Worker   }
166*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->set_qps(qps);
167*cc02d7e2SAndroid Build Coastguard Worker   // Populate the percentage of cpu load to result summary.
168*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->set_server_system_time(100 *
169*cc02d7e2SAndroid Build Coastguard Worker                                                     server_system_cpu_load);
170*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->set_server_user_time(100 * server_user_cpu_load);
171*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->set_client_system_time(100 *
172*cc02d7e2SAndroid Build Coastguard Worker                                                     client_system_cpu_load);
173*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->set_client_user_time(100 * client_user_cpu_load);
174*cc02d7e2SAndroid Build Coastguard Worker 
175*cc02d7e2SAndroid Build Coastguard Worker   // For Non-linux platform, get_cpu_usage() is not implemented. Thus,
176*cc02d7e2SAndroid Build Coastguard Worker   // ServerTotalCpuTime and ServerIdleCpuTime are both 0.
177*cc02d7e2SAndroid Build Coastguard Worker   if (average(result->server_stats(), ServerTotalCpuTime) == 0) {
178*cc02d7e2SAndroid Build Coastguard Worker     result->mutable_summary()->set_server_cpu_usage(0);
179*cc02d7e2SAndroid Build Coastguard Worker   } else {
180*cc02d7e2SAndroid Build Coastguard Worker     auto server_cpu_usage =
181*cc02d7e2SAndroid Build Coastguard Worker         100 - 100 * average(result->server_stats(), ServerIdleCpuTime) /
182*cc02d7e2SAndroid Build Coastguard Worker                   average(result->server_stats(), ServerTotalCpuTime);
183*cc02d7e2SAndroid Build Coastguard Worker     result->mutable_summary()->set_server_cpu_usage(server_cpu_usage);
184*cc02d7e2SAndroid Build Coastguard Worker   }
185*cc02d7e2SAndroid Build Coastguard Worker 
186*cc02d7e2SAndroid Build Coastguard Worker   // Calculate and populate successful request per second and failed requests
187*cc02d7e2SAndroid Build Coastguard Worker   // per seconds to result summary.
188*cc02d7e2SAndroid Build Coastguard Worker   auto time_estimate = average(result->client_stats(), WallTime);
189*cc02d7e2SAndroid Build Coastguard Worker   if (result->request_results_size() > 0) {
190*cc02d7e2SAndroid Build Coastguard Worker     int64_t successes = 0;
191*cc02d7e2SAndroid Build Coastguard Worker     int64_t failures = 0;
192*cc02d7e2SAndroid Build Coastguard Worker     for (int i = 0; i < result->request_results_size(); i++) {
193*cc02d7e2SAndroid Build Coastguard Worker       const RequestResultCount& rrc = result->request_results(i);
194*cc02d7e2SAndroid Build Coastguard Worker       if (rrc.status_code() == 0) {
195*cc02d7e2SAndroid Build Coastguard Worker         successes += rrc.count();
196*cc02d7e2SAndroid Build Coastguard Worker       } else {
197*cc02d7e2SAndroid Build Coastguard Worker         failures += rrc.count();
198*cc02d7e2SAndroid Build Coastguard Worker       }
199*cc02d7e2SAndroid Build Coastguard Worker     }
200*cc02d7e2SAndroid Build Coastguard Worker     result->mutable_summary()->set_successful_requests_per_second(
201*cc02d7e2SAndroid Build Coastguard Worker         successes / time_estimate);
202*cc02d7e2SAndroid Build Coastguard Worker     result->mutable_summary()->set_failed_requests_per_second(failures /
203*cc02d7e2SAndroid Build Coastguard Worker                                                               time_estimate);
204*cc02d7e2SAndroid Build Coastguard Worker   }
205*cc02d7e2SAndroid Build Coastguard Worker 
206*cc02d7e2SAndroid Build Coastguard Worker   // Fill in data for other metrics required in result summary
207*cc02d7e2SAndroid Build Coastguard Worker   auto qps_per_server_core = qps / sum(result->server_cores(), Cores);
208*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->set_qps_per_server_core(qps_per_server_core);
209*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->set_client_polls_per_request(
210*cc02d7e2SAndroid Build Coastguard Worker       sum(result->client_stats(), CliPollCount) / histogram.Count());
211*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->set_server_polls_per_request(
212*cc02d7e2SAndroid Build Coastguard Worker       sum(result->server_stats(), SvrPollCount) / histogram.Count());
213*cc02d7e2SAndroid Build Coastguard Worker 
214*cc02d7e2SAndroid Build Coastguard Worker   auto server_queries_per_cpu_sec =
215*cc02d7e2SAndroid Build Coastguard Worker       histogram.Count() / (sum(result->server_stats(), ServerSystemTime) +
216*cc02d7e2SAndroid Build Coastguard Worker                            sum(result->server_stats(), ServerUserTime));
217*cc02d7e2SAndroid Build Coastguard Worker   auto client_queries_per_cpu_sec =
218*cc02d7e2SAndroid Build Coastguard Worker       histogram.Count() / (sum(result->client_stats(), SystemTime) +
219*cc02d7e2SAndroid Build Coastguard Worker                            sum(result->client_stats(), UserTime));
220*cc02d7e2SAndroid Build Coastguard Worker 
221*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->set_server_queries_per_cpu_sec(
222*cc02d7e2SAndroid Build Coastguard Worker       server_queries_per_cpu_sec);
223*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->set_client_queries_per_cpu_sec(
224*cc02d7e2SAndroid Build Coastguard Worker       client_queries_per_cpu_sec);
225*cc02d7e2SAndroid Build Coastguard Worker }
226*cc02d7e2SAndroid Build Coastguard Worker 
227*cc02d7e2SAndroid Build Coastguard Worker struct ClientData {
228*cc02d7e2SAndroid Build Coastguard Worker   unique_ptr<WorkerService::Stub> stub;
229*cc02d7e2SAndroid Build Coastguard Worker   unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
230*cc02d7e2SAndroid Build Coastguard Worker };
231*cc02d7e2SAndroid Build Coastguard Worker 
232*cc02d7e2SAndroid Build Coastguard Worker struct ServerData {
233*cc02d7e2SAndroid Build Coastguard Worker   unique_ptr<WorkerService::Stub> stub;
234*cc02d7e2SAndroid Build Coastguard Worker   unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
235*cc02d7e2SAndroid Build Coastguard Worker };
236*cc02d7e2SAndroid Build Coastguard Worker 
FinishClients(const std::vector<ClientData> & clients,const ClientArgs & client_mark)237*cc02d7e2SAndroid Build Coastguard Worker static void FinishClients(const std::vector<ClientData>& clients,
238*cc02d7e2SAndroid Build Coastguard Worker                           const ClientArgs& client_mark) {
239*cc02d7e2SAndroid Build Coastguard Worker   gpr_log(GPR_INFO, "Finishing clients");
240*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
241*cc02d7e2SAndroid Build Coastguard Worker     auto client = &clients[i];
242*cc02d7e2SAndroid Build Coastguard Worker     if (!client->stream->Write(client_mark)) {
243*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
244*cc02d7e2SAndroid Build Coastguard Worker     }
245*cc02d7e2SAndroid Build Coastguard Worker     if (!client->stream->WritesDone()) {
246*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(absl::StrFormat("Failed WritesDone for client %zu", i));
247*cc02d7e2SAndroid Build Coastguard Worker     }
248*cc02d7e2SAndroid Build Coastguard Worker   }
249*cc02d7e2SAndroid Build Coastguard Worker }
250*cc02d7e2SAndroid Build Coastguard Worker 
ReceiveFinalStatusFromClients(const std::vector<ClientData> & clients,Histogram & merged_latencies,std::unordered_map<int,int64_t> & merged_statuses,ScenarioResult & result)251*cc02d7e2SAndroid Build Coastguard Worker static void ReceiveFinalStatusFromClients(
252*cc02d7e2SAndroid Build Coastguard Worker     const std::vector<ClientData>& clients, Histogram& merged_latencies,
253*cc02d7e2SAndroid Build Coastguard Worker     std::unordered_map<int, int64_t>& merged_statuses, ScenarioResult& result) {
254*cc02d7e2SAndroid Build Coastguard Worker   gpr_log(GPR_INFO, "Receiving final status from clients");
255*cc02d7e2SAndroid Build Coastguard Worker   ClientStatus client_status;
256*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
257*cc02d7e2SAndroid Build Coastguard Worker     auto client = &clients[i];
258*cc02d7e2SAndroid Build Coastguard Worker     // Read the client final status
259*cc02d7e2SAndroid Build Coastguard Worker     if (client->stream->Read(&client_status)) {
260*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_INFO, "Received final status from client %zu", i);
261*cc02d7e2SAndroid Build Coastguard Worker       const auto& stats = client_status.stats();
262*cc02d7e2SAndroid Build Coastguard Worker       merged_latencies.MergeProto(stats.latencies());
263*cc02d7e2SAndroid Build Coastguard Worker       for (int i = 0; i < stats.request_results_size(); i++) {
264*cc02d7e2SAndroid Build Coastguard Worker         merged_statuses[stats.request_results(i).status_code()] +=
265*cc02d7e2SAndroid Build Coastguard Worker             stats.request_results(i).count();
266*cc02d7e2SAndroid Build Coastguard Worker       }
267*cc02d7e2SAndroid Build Coastguard Worker       result.add_client_stats()->CopyFrom(stats);
268*cc02d7e2SAndroid Build Coastguard Worker       // Check that final status was should be the last message on the client
269*cc02d7e2SAndroid Build Coastguard Worker       // stream.
270*cc02d7e2SAndroid Build Coastguard Worker       // TODO(jtattermusch): note that that waiting for Read to return can take
271*cc02d7e2SAndroid Build Coastguard Worker       // long on some scenarios (e.g. unconstrained streaming_from_server). See
272*cc02d7e2SAndroid Build Coastguard Worker       // https://github.com/grpc/grpc/blob/3bd0cd208ea549760a2daf595f79b91b247fe240/test/cpp/qps/server_async.cc#L176
273*cc02d7e2SAndroid Build Coastguard Worker       // where the shutdown delay pretty much determines the wait here.
274*cc02d7e2SAndroid Build Coastguard Worker       GPR_ASSERT(!client->stream->Read(&client_status));
275*cc02d7e2SAndroid Build Coastguard Worker     } else {
276*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(
277*cc02d7e2SAndroid Build Coastguard Worker           absl::StrFormat("Couldn't get final status from client %zu", i));
278*cc02d7e2SAndroid Build Coastguard Worker     }
279*cc02d7e2SAndroid Build Coastguard Worker   }
280*cc02d7e2SAndroid Build Coastguard Worker }
281*cc02d7e2SAndroid Build Coastguard Worker 
ShutdownClients(const std::vector<ClientData> & clients,ScenarioResult & result)282*cc02d7e2SAndroid Build Coastguard Worker static void ShutdownClients(const std::vector<ClientData>& clients,
283*cc02d7e2SAndroid Build Coastguard Worker                             ScenarioResult& result) {
284*cc02d7e2SAndroid Build Coastguard Worker   gpr_log(GPR_INFO, "Shutdown clients");
285*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
286*cc02d7e2SAndroid Build Coastguard Worker     auto client = &clients[i];
287*cc02d7e2SAndroid Build Coastguard Worker     Status s = client->stream->Finish();
288*cc02d7e2SAndroid Build Coastguard Worker     // Since we shutdown servers and clients at the same time, clients can
289*cc02d7e2SAndroid Build Coastguard Worker     // observe cancellation.  Thus, we consider both OK and CANCELLED as good
290*cc02d7e2SAndroid Build Coastguard Worker     // status.
291*cc02d7e2SAndroid Build Coastguard Worker     const bool success = IsSuccess(s);
292*cc02d7e2SAndroid Build Coastguard Worker     result.add_client_success(success);
293*cc02d7e2SAndroid Build Coastguard Worker     if (!success) {
294*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(absl::StrFormat("Client %zu had an error %s", i,
295*cc02d7e2SAndroid Build Coastguard Worker                                        s.error_message().c_str()));
296*cc02d7e2SAndroid Build Coastguard Worker     }
297*cc02d7e2SAndroid Build Coastguard Worker   }
298*cc02d7e2SAndroid Build Coastguard Worker }
299*cc02d7e2SAndroid Build Coastguard Worker 
FinishServers(const std::vector<ServerData> & servers,const ServerArgs & server_mark)300*cc02d7e2SAndroid Build Coastguard Worker static void FinishServers(const std::vector<ServerData>& servers,
301*cc02d7e2SAndroid Build Coastguard Worker                           const ServerArgs& server_mark) {
302*cc02d7e2SAndroid Build Coastguard Worker   gpr_log(GPR_INFO, "Finishing servers");
303*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
304*cc02d7e2SAndroid Build Coastguard Worker     auto server = &servers[i];
305*cc02d7e2SAndroid Build Coastguard Worker     if (!server->stream->Write(server_mark)) {
306*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(absl::StrFormat("Couldn't write mark to server %zu", i));
307*cc02d7e2SAndroid Build Coastguard Worker     }
308*cc02d7e2SAndroid Build Coastguard Worker     if (!server->stream->WritesDone()) {
309*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(absl::StrFormat("Failed WritesDone for server %zu", i));
310*cc02d7e2SAndroid Build Coastguard Worker     }
311*cc02d7e2SAndroid Build Coastguard Worker   }
312*cc02d7e2SAndroid Build Coastguard Worker }
313*cc02d7e2SAndroid Build Coastguard Worker 
ReceiveFinalStatusFromServer(const std::vector<ServerData> & servers,ScenarioResult & result)314*cc02d7e2SAndroid Build Coastguard Worker static void ReceiveFinalStatusFromServer(const std::vector<ServerData>& servers,
315*cc02d7e2SAndroid Build Coastguard Worker                                          ScenarioResult& result) {
316*cc02d7e2SAndroid Build Coastguard Worker   gpr_log(GPR_INFO, "Receiving final status from servers");
317*cc02d7e2SAndroid Build Coastguard Worker   ServerStatus server_status;
318*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
319*cc02d7e2SAndroid Build Coastguard Worker     auto server = &servers[i];
320*cc02d7e2SAndroid Build Coastguard Worker     // Read the server final status
321*cc02d7e2SAndroid Build Coastguard Worker     if (server->stream->Read(&server_status)) {
322*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_INFO, "Received final status from server %zu", i);
323*cc02d7e2SAndroid Build Coastguard Worker       result.add_server_stats()->CopyFrom(server_status.stats());
324*cc02d7e2SAndroid Build Coastguard Worker       result.add_server_cores(server_status.cores());
325*cc02d7e2SAndroid Build Coastguard Worker       // That final status should be the last message on the server stream
326*cc02d7e2SAndroid Build Coastguard Worker       GPR_ASSERT(!server->stream->Read(&server_status));
327*cc02d7e2SAndroid Build Coastguard Worker     } else {
328*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(
329*cc02d7e2SAndroid Build Coastguard Worker           absl::StrFormat("Couldn't get final status from server %zu", i));
330*cc02d7e2SAndroid Build Coastguard Worker     }
331*cc02d7e2SAndroid Build Coastguard Worker   }
332*cc02d7e2SAndroid Build Coastguard Worker }
333*cc02d7e2SAndroid Build Coastguard Worker 
ShutdownServers(const std::vector<ServerData> & servers,ScenarioResult & result)334*cc02d7e2SAndroid Build Coastguard Worker static void ShutdownServers(const std::vector<ServerData>& servers,
335*cc02d7e2SAndroid Build Coastguard Worker                             ScenarioResult& result) {
336*cc02d7e2SAndroid Build Coastguard Worker   gpr_log(GPR_INFO, "Shutdown servers");
337*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
338*cc02d7e2SAndroid Build Coastguard Worker     auto server = &servers[i];
339*cc02d7e2SAndroid Build Coastguard Worker     Status s = server->stream->Finish();
340*cc02d7e2SAndroid Build Coastguard Worker     // Since we shutdown servers and clients at the same time, servers can
341*cc02d7e2SAndroid Build Coastguard Worker     // observe cancellation.  Thus, we consider both OK and CANCELLED as good
342*cc02d7e2SAndroid Build Coastguard Worker     // status.
343*cc02d7e2SAndroid Build Coastguard Worker     const bool success = IsSuccess(s);
344*cc02d7e2SAndroid Build Coastguard Worker     result.add_server_success(success);
345*cc02d7e2SAndroid Build Coastguard Worker     if (!success) {
346*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(absl::StrFormat("Server %zu had an error %s", i,
347*cc02d7e2SAndroid Build Coastguard Worker                                        s.error_message().c_str()));
348*cc02d7e2SAndroid Build Coastguard Worker     }
349*cc02d7e2SAndroid Build Coastguard Worker   }
350*cc02d7e2SAndroid Build Coastguard Worker }
351*cc02d7e2SAndroid Build Coastguard Worker 
352*cc02d7e2SAndroid Build Coastguard Worker std::vector<grpc::testing::Server*>* g_inproc_servers = nullptr;
353*cc02d7e2SAndroid Build Coastguard Worker 
RunScenario(const ClientConfig & initial_client_config,size_t num_clients,const ServerConfig & initial_server_config,size_t num_servers,int warmup_seconds,int benchmark_seconds,int spawn_local_worker_count,const std::string & qps_server_target_override,const std::string & credential_type,const std::map<std::string,std::string> & per_worker_credential_types,bool run_inproc,int32_t median_latency_collection_interval_millis)354*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<ScenarioResult> RunScenario(
355*cc02d7e2SAndroid Build Coastguard Worker     const ClientConfig& initial_client_config, size_t num_clients,
356*cc02d7e2SAndroid Build Coastguard Worker     const ServerConfig& initial_server_config, size_t num_servers,
357*cc02d7e2SAndroid Build Coastguard Worker     int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
358*cc02d7e2SAndroid Build Coastguard Worker     const std::string& qps_server_target_override,
359*cc02d7e2SAndroid Build Coastguard Worker     const std::string& credential_type,
360*cc02d7e2SAndroid Build Coastguard Worker     const std::map<std::string, std::string>& per_worker_credential_types,
361*cc02d7e2SAndroid Build Coastguard Worker     bool run_inproc, int32_t median_latency_collection_interval_millis) {
362*cc02d7e2SAndroid Build Coastguard Worker   if (run_inproc) {
363*cc02d7e2SAndroid Build Coastguard Worker     g_inproc_servers = new std::vector<grpc::testing::Server*>;
364*cc02d7e2SAndroid Build Coastguard Worker   }
365*cc02d7e2SAndroid Build Coastguard Worker   // Log everything from the driver
366*cc02d7e2SAndroid Build Coastguard Worker   gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
367*cc02d7e2SAndroid Build Coastguard Worker 
368*cc02d7e2SAndroid Build Coastguard Worker   // ClientContext allocations (all are destroyed at scope exit)
369*cc02d7e2SAndroid Build Coastguard Worker   list<ClientContext> contexts;
370*cc02d7e2SAndroid Build Coastguard Worker   auto alloc_context = [](list<ClientContext>* contexts) {
371*cc02d7e2SAndroid Build Coastguard Worker     contexts->emplace_back();
372*cc02d7e2SAndroid Build Coastguard Worker     auto context = &contexts->back();
373*cc02d7e2SAndroid Build Coastguard Worker     context->set_wait_for_ready(true);
374*cc02d7e2SAndroid Build Coastguard Worker     return context;
375*cc02d7e2SAndroid Build Coastguard Worker   };
376*cc02d7e2SAndroid Build Coastguard Worker 
377*cc02d7e2SAndroid Build Coastguard Worker   // To be added to the result, containing the final configuration used for
378*cc02d7e2SAndroid Build Coastguard Worker   // client and config (including host, etc.)
379*cc02d7e2SAndroid Build Coastguard Worker   ClientConfig result_client_config;
380*cc02d7e2SAndroid Build Coastguard Worker 
381*cc02d7e2SAndroid Build Coastguard Worker   // Get client, server lists; ignore if inproc test
382*cc02d7e2SAndroid Build Coastguard Worker   auto workers = (!run_inproc) ? get_workers("QPS_WORKERS") : deque<string>();
383*cc02d7e2SAndroid Build Coastguard Worker   ClientConfig client_config = initial_client_config;
384*cc02d7e2SAndroid Build Coastguard Worker 
385*cc02d7e2SAndroid Build Coastguard Worker   // Spawn some local workers if desired
386*cc02d7e2SAndroid Build Coastguard Worker   vector<unique_ptr<QpsWorker>> local_workers;
387*cc02d7e2SAndroid Build Coastguard Worker   for (int i = 0; i < abs(spawn_local_worker_count); i++) {
388*cc02d7e2SAndroid Build Coastguard Worker     // act as if we're a new test -- gets a good rng seed
389*cc02d7e2SAndroid Build Coastguard Worker     static bool called_init = false;
390*cc02d7e2SAndroid Build Coastguard Worker     if (!called_init) {
391*cc02d7e2SAndroid Build Coastguard Worker       char args_buf[100];
392*cc02d7e2SAndroid Build Coastguard Worker       strcpy(args_buf, "some-benchmark");
393*cc02d7e2SAndroid Build Coastguard Worker       char* args[] = {args_buf};
394*cc02d7e2SAndroid Build Coastguard Worker       int argc = 1;
395*cc02d7e2SAndroid Build Coastguard Worker       grpc_test_init(&argc, args);
396*cc02d7e2SAndroid Build Coastguard Worker       called_init = true;
397*cc02d7e2SAndroid Build Coastguard Worker     }
398*cc02d7e2SAndroid Build Coastguard Worker 
399*cc02d7e2SAndroid Build Coastguard Worker     char addr[256];
400*cc02d7e2SAndroid Build Coastguard Worker     // we use port # of -1 to indicate inproc
401*cc02d7e2SAndroid Build Coastguard Worker     int driver_port = (!run_inproc) ? grpc_pick_unused_port_or_die() : -1;
402*cc02d7e2SAndroid Build Coastguard Worker     local_workers.emplace_back(new QpsWorker(driver_port, 0, credential_type));
403*cc02d7e2SAndroid Build Coastguard Worker     sprintf(addr, "localhost:%d", driver_port);
404*cc02d7e2SAndroid Build Coastguard Worker     if (spawn_local_worker_count < 0) {
405*cc02d7e2SAndroid Build Coastguard Worker       workers.push_front(addr);
406*cc02d7e2SAndroid Build Coastguard Worker     } else {
407*cc02d7e2SAndroid Build Coastguard Worker       workers.push_back(addr);
408*cc02d7e2SAndroid Build Coastguard Worker     }
409*cc02d7e2SAndroid Build Coastguard Worker   }
410*cc02d7e2SAndroid Build Coastguard Worker   GPR_ASSERT(!workers.empty());
411*cc02d7e2SAndroid Build Coastguard Worker 
412*cc02d7e2SAndroid Build Coastguard Worker   // if num_clients is set to <=0, do dynamic sizing: all workers
413*cc02d7e2SAndroid Build Coastguard Worker   // except for servers are clients
414*cc02d7e2SAndroid Build Coastguard Worker   if (num_clients <= 0) {
415*cc02d7e2SAndroid Build Coastguard Worker     num_clients = workers.size() - num_servers;
416*cc02d7e2SAndroid Build Coastguard Worker   }
417*cc02d7e2SAndroid Build Coastguard Worker 
418*cc02d7e2SAndroid Build Coastguard Worker   // TODO(ctiller): support running multiple configurations, and binpack
419*cc02d7e2SAndroid Build Coastguard Worker   // client/server pairs
420*cc02d7e2SAndroid Build Coastguard Worker   // to available workers
421*cc02d7e2SAndroid Build Coastguard Worker   GPR_ASSERT(workers.size() >= num_clients + num_servers);
422*cc02d7e2SAndroid Build Coastguard Worker 
423*cc02d7e2SAndroid Build Coastguard Worker   // Trim to just what we need
424*cc02d7e2SAndroid Build Coastguard Worker   workers.resize(num_clients + num_servers);
425*cc02d7e2SAndroid Build Coastguard Worker 
426*cc02d7e2SAndroid Build Coastguard Worker   // Start servers
427*cc02d7e2SAndroid Build Coastguard Worker   std::vector<ServerData> servers(num_servers);
428*cc02d7e2SAndroid Build Coastguard Worker   std::unordered_map<string, std::deque<int>> hosts_cores;
429*cc02d7e2SAndroid Build Coastguard Worker   ChannelArguments channel_args;
430*cc02d7e2SAndroid Build Coastguard Worker 
431*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0; i < num_servers; i++) {
432*cc02d7e2SAndroid Build Coastguard Worker     gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")",
433*cc02d7e2SAndroid Build Coastguard Worker             workers[i].c_str(), i);
434*cc02d7e2SAndroid Build Coastguard Worker     if (!run_inproc) {
435*cc02d7e2SAndroid Build Coastguard Worker       servers[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
436*cc02d7e2SAndroid Build Coastguard Worker           workers[i],
437*cc02d7e2SAndroid Build Coastguard Worker           GetCredType(workers[i], per_worker_credential_types, credential_type),
438*cc02d7e2SAndroid Build Coastguard Worker           nullptr /* call creds */, {} /* interceptor creators */));
439*cc02d7e2SAndroid Build Coastguard Worker     } else {
440*cc02d7e2SAndroid Build Coastguard Worker       servers[i].stub = WorkerService::NewStub(
441*cc02d7e2SAndroid Build Coastguard Worker           local_workers[i]->InProcessChannel(channel_args));
442*cc02d7e2SAndroid Build Coastguard Worker     }
443*cc02d7e2SAndroid Build Coastguard Worker 
444*cc02d7e2SAndroid Build Coastguard Worker     const ServerConfig& server_config = initial_server_config;
445*cc02d7e2SAndroid Build Coastguard Worker     if (server_config.core_limit() != 0) {
446*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash("server config core limit is set but ignored by driver");
447*cc02d7e2SAndroid Build Coastguard Worker     }
448*cc02d7e2SAndroid Build Coastguard Worker 
449*cc02d7e2SAndroid Build Coastguard Worker     ServerArgs args;
450*cc02d7e2SAndroid Build Coastguard Worker     *args.mutable_setup() = server_config;
451*cc02d7e2SAndroid Build Coastguard Worker     servers[i].stream = servers[i].stub->RunServer(alloc_context(&contexts));
452*cc02d7e2SAndroid Build Coastguard Worker     if (!servers[i].stream->Write(args)) {
453*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(
454*cc02d7e2SAndroid Build Coastguard Worker           absl::StrFormat("Could not write args to server %zu", i));
455*cc02d7e2SAndroid Build Coastguard Worker     }
456*cc02d7e2SAndroid Build Coastguard Worker     ServerStatus init_status;
457*cc02d7e2SAndroid Build Coastguard Worker     if (!servers[i].stream->Read(&init_status)) {
458*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(
459*cc02d7e2SAndroid Build Coastguard Worker           absl::StrFormat("Server %zu did not yield initial status", i));
460*cc02d7e2SAndroid Build Coastguard Worker     }
461*cc02d7e2SAndroid Build Coastguard Worker     if (run_inproc) {
462*cc02d7e2SAndroid Build Coastguard Worker       std::string cli_target(INPROC_NAME_PREFIX);
463*cc02d7e2SAndroid Build Coastguard Worker       cli_target += std::to_string(i);
464*cc02d7e2SAndroid Build Coastguard Worker       client_config.add_server_targets(cli_target);
465*cc02d7e2SAndroid Build Coastguard Worker     } else {
466*cc02d7e2SAndroid Build Coastguard Worker       std::string host = get_host(workers[i]);
467*cc02d7e2SAndroid Build Coastguard Worker       std::string cli_target =
468*cc02d7e2SAndroid Build Coastguard Worker           grpc_core::JoinHostPort(host.c_str(), init_status.port());
469*cc02d7e2SAndroid Build Coastguard Worker       client_config.add_server_targets(cli_target.c_str());
470*cc02d7e2SAndroid Build Coastguard Worker     }
471*cc02d7e2SAndroid Build Coastguard Worker   }
472*cc02d7e2SAndroid Build Coastguard Worker   if (qps_server_target_override.length() > 0) {
473*cc02d7e2SAndroid Build Coastguard Worker     // overriding the qps server target only makes since if there is <= 1
474*cc02d7e2SAndroid Build Coastguard Worker     // servers
475*cc02d7e2SAndroid Build Coastguard Worker     GPR_ASSERT(num_servers <= 1);
476*cc02d7e2SAndroid Build Coastguard Worker     client_config.clear_server_targets();
477*cc02d7e2SAndroid Build Coastguard Worker     client_config.add_server_targets(qps_server_target_override);
478*cc02d7e2SAndroid Build Coastguard Worker   }
479*cc02d7e2SAndroid Build Coastguard Worker   client_config.set_median_latency_collection_interval_millis(
480*cc02d7e2SAndroid Build Coastguard Worker       median_latency_collection_interval_millis);
481*cc02d7e2SAndroid Build Coastguard Worker 
482*cc02d7e2SAndroid Build Coastguard Worker   // Targets are all set by now
483*cc02d7e2SAndroid Build Coastguard Worker   result_client_config = client_config;
484*cc02d7e2SAndroid Build Coastguard Worker   // Start clients
485*cc02d7e2SAndroid Build Coastguard Worker   std::vector<ClientData> clients(num_clients);
486*cc02d7e2SAndroid Build Coastguard Worker   size_t channels_allocated = 0;
487*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0; i < num_clients; i++) {
488*cc02d7e2SAndroid Build Coastguard Worker     const auto& worker = workers[i + num_servers];
489*cc02d7e2SAndroid Build Coastguard Worker     gpr_log(GPR_INFO, "Starting client on %s (worker #%" PRIuPTR ")",
490*cc02d7e2SAndroid Build Coastguard Worker             worker.c_str(), i + num_servers);
491*cc02d7e2SAndroid Build Coastguard Worker     if (!run_inproc) {
492*cc02d7e2SAndroid Build Coastguard Worker       clients[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
493*cc02d7e2SAndroid Build Coastguard Worker           worker,
494*cc02d7e2SAndroid Build Coastguard Worker           GetCredType(worker, per_worker_credential_types, credential_type),
495*cc02d7e2SAndroid Build Coastguard Worker           nullptr /* call creds */, {} /* interceptor creators */));
496*cc02d7e2SAndroid Build Coastguard Worker     } else {
497*cc02d7e2SAndroid Build Coastguard Worker       clients[i].stub = WorkerService::NewStub(
498*cc02d7e2SAndroid Build Coastguard Worker           local_workers[i + num_servers]->InProcessChannel(channel_args));
499*cc02d7e2SAndroid Build Coastguard Worker     }
500*cc02d7e2SAndroid Build Coastguard Worker     ClientConfig per_client_config = client_config;
501*cc02d7e2SAndroid Build Coastguard Worker 
502*cc02d7e2SAndroid Build Coastguard Worker     if (initial_client_config.core_limit() != 0) {
503*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash("client config core limit set but ignored");
504*cc02d7e2SAndroid Build Coastguard Worker     }
505*cc02d7e2SAndroid Build Coastguard Worker 
506*cc02d7e2SAndroid Build Coastguard Worker     // Reduce channel count so that total channels specified is held regardless
507*cc02d7e2SAndroid Build Coastguard Worker     // of the number of clients available
508*cc02d7e2SAndroid Build Coastguard Worker     size_t num_channels =
509*cc02d7e2SAndroid Build Coastguard Worker         (client_config.client_channels() - channels_allocated) /
510*cc02d7e2SAndroid Build Coastguard Worker         (num_clients - i);
511*cc02d7e2SAndroid Build Coastguard Worker     channels_allocated += num_channels;
512*cc02d7e2SAndroid Build Coastguard Worker     gpr_log(GPR_DEBUG, "Client %" PRIdPTR " gets %" PRIdPTR " channels", i,
513*cc02d7e2SAndroid Build Coastguard Worker             num_channels);
514*cc02d7e2SAndroid Build Coastguard Worker     per_client_config.set_client_channels(num_channels);
515*cc02d7e2SAndroid Build Coastguard Worker 
516*cc02d7e2SAndroid Build Coastguard Worker     ClientArgs args;
517*cc02d7e2SAndroid Build Coastguard Worker     *args.mutable_setup() = per_client_config;
518*cc02d7e2SAndroid Build Coastguard Worker     clients[i].stream = clients[i].stub->RunClient(alloc_context(&contexts));
519*cc02d7e2SAndroid Build Coastguard Worker     if (!clients[i].stream->Write(args)) {
520*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(
521*cc02d7e2SAndroid Build Coastguard Worker           absl::StrFormat("Could not write args to client %zu", i));
522*cc02d7e2SAndroid Build Coastguard Worker     }
523*cc02d7e2SAndroid Build Coastguard Worker   }
524*cc02d7e2SAndroid Build Coastguard Worker 
525*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0; i < num_clients; i++) {
526*cc02d7e2SAndroid Build Coastguard Worker     ClientStatus init_status;
527*cc02d7e2SAndroid Build Coastguard Worker     if (!clients[i].stream->Read(&init_status)) {
528*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(
529*cc02d7e2SAndroid Build Coastguard Worker           absl::StrFormat("Client %zu did not yield initial status", i));
530*cc02d7e2SAndroid Build Coastguard Worker     }
531*cc02d7e2SAndroid Build Coastguard Worker   }
532*cc02d7e2SAndroid Build Coastguard Worker 
533*cc02d7e2SAndroid Build Coastguard Worker   // Send an initial mark: clients can use this to know that everything is ready
534*cc02d7e2SAndroid Build Coastguard Worker   // to start
535*cc02d7e2SAndroid Build Coastguard Worker   gpr_log(GPR_INFO, "Initiating");
536*cc02d7e2SAndroid Build Coastguard Worker   ServerArgs server_mark;
537*cc02d7e2SAndroid Build Coastguard Worker   server_mark.mutable_mark()->set_reset(true);
538*cc02d7e2SAndroid Build Coastguard Worker   ClientArgs client_mark;
539*cc02d7e2SAndroid Build Coastguard Worker   client_mark.mutable_mark()->set_reset(true);
540*cc02d7e2SAndroid Build Coastguard Worker   ServerStatus server_status;
541*cc02d7e2SAndroid Build Coastguard Worker   ClientStatus client_status;
542*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0; i < num_clients; i++) {
543*cc02d7e2SAndroid Build Coastguard Worker     auto client = &clients[i];
544*cc02d7e2SAndroid Build Coastguard Worker     if (!client->stream->Write(client_mark)) {
545*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
546*cc02d7e2SAndroid Build Coastguard Worker     }
547*cc02d7e2SAndroid Build Coastguard Worker   }
548*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0; i < num_clients; i++) {
549*cc02d7e2SAndroid Build Coastguard Worker     auto client = &clients[i];
550*cc02d7e2SAndroid Build Coastguard Worker     if (!client->stream->Read(&client_status)) {
551*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(
552*cc02d7e2SAndroid Build Coastguard Worker           absl::StrFormat("Couldn't get status from client %zu", i));
553*cc02d7e2SAndroid Build Coastguard Worker     }
554*cc02d7e2SAndroid Build Coastguard Worker   }
555*cc02d7e2SAndroid Build Coastguard Worker 
556*cc02d7e2SAndroid Build Coastguard Worker   // Let everything warmup
557*cc02d7e2SAndroid Build Coastguard Worker   gpr_log(GPR_INFO, "Warming up");
558*cc02d7e2SAndroid Build Coastguard Worker   gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
559*cc02d7e2SAndroid Build Coastguard Worker   gpr_sleep_until(
560*cc02d7e2SAndroid Build Coastguard Worker       gpr_time_add(start, gpr_time_from_seconds(warmup_seconds, GPR_TIMESPAN)));
561*cc02d7e2SAndroid Build Coastguard Worker 
562*cc02d7e2SAndroid Build Coastguard Worker   // Start a run
563*cc02d7e2SAndroid Build Coastguard Worker   gpr_log(GPR_INFO, "Starting");
564*cc02d7e2SAndroid Build Coastguard Worker 
565*cc02d7e2SAndroid Build Coastguard Worker   auto start_time = time(nullptr);
566*cc02d7e2SAndroid Build Coastguard Worker 
567*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0; i < num_servers; i++) {
568*cc02d7e2SAndroid Build Coastguard Worker     auto server = &servers[i];
569*cc02d7e2SAndroid Build Coastguard Worker     if (!server->stream->Write(server_mark)) {
570*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(absl::StrFormat("Couldn't write mark to server %zu", i));
571*cc02d7e2SAndroid Build Coastguard Worker     }
572*cc02d7e2SAndroid Build Coastguard Worker   }
573*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0; i < num_clients; i++) {
574*cc02d7e2SAndroid Build Coastguard Worker     auto client = &clients[i];
575*cc02d7e2SAndroid Build Coastguard Worker     if (!client->stream->Write(client_mark)) {
576*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
577*cc02d7e2SAndroid Build Coastguard Worker     }
578*cc02d7e2SAndroid Build Coastguard Worker   }
579*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0; i < num_servers; i++) {
580*cc02d7e2SAndroid Build Coastguard Worker     auto server = &servers[i];
581*cc02d7e2SAndroid Build Coastguard Worker     if (!server->stream->Read(&server_status)) {
582*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(
583*cc02d7e2SAndroid Build Coastguard Worker           absl::StrFormat("Couldn't get status from server %zu", i));
584*cc02d7e2SAndroid Build Coastguard Worker     }
585*cc02d7e2SAndroid Build Coastguard Worker   }
586*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0; i < num_clients; i++) {
587*cc02d7e2SAndroid Build Coastguard Worker     auto client = &clients[i];
588*cc02d7e2SAndroid Build Coastguard Worker     if (!client->stream->Read(&client_status)) {
589*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(
590*cc02d7e2SAndroid Build Coastguard Worker           absl::StrFormat("Couldn't get status from client %zu", i));
591*cc02d7e2SAndroid Build Coastguard Worker     }
592*cc02d7e2SAndroid Build Coastguard Worker   }
593*cc02d7e2SAndroid Build Coastguard Worker 
594*cc02d7e2SAndroid Build Coastguard Worker   // Wait some time
595*cc02d7e2SAndroid Build Coastguard Worker   gpr_log(GPR_INFO, "Running");
596*cc02d7e2SAndroid Build Coastguard Worker   // Use gpr_sleep_until rather than this_thread::sleep_until to support
597*cc02d7e2SAndroid Build Coastguard Worker   // compilers that don't work with this_thread
598*cc02d7e2SAndroid Build Coastguard Worker   gpr_sleep_until(gpr_time_add(
599*cc02d7e2SAndroid Build Coastguard Worker       start,
600*cc02d7e2SAndroid Build Coastguard Worker       gpr_time_from_seconds(warmup_seconds + benchmark_seconds, GPR_TIMESPAN)));
601*cc02d7e2SAndroid Build Coastguard Worker 
602*cc02d7e2SAndroid Build Coastguard Worker   // Finish a run
603*cc02d7e2SAndroid Build Coastguard Worker   std::unique_ptr<ScenarioResult> result(new ScenarioResult);
604*cc02d7e2SAndroid Build Coastguard Worker   Histogram merged_latencies;
605*cc02d7e2SAndroid Build Coastguard Worker   std::unordered_map<int, int64_t> merged_statuses;
606*cc02d7e2SAndroid Build Coastguard Worker 
607*cc02d7e2SAndroid Build Coastguard Worker   // For the case where clients lead the test such as UNARY and
608*cc02d7e2SAndroid Build Coastguard Worker   // STREAMING_FROM_CLIENT, clients need to finish completely while a server
609*cc02d7e2SAndroid Build Coastguard Worker   // is running to prevent the clients from being stuck while waiting for
610*cc02d7e2SAndroid Build Coastguard Worker   // the result.
611*cc02d7e2SAndroid Build Coastguard Worker   bool client_finish_first =
612*cc02d7e2SAndroid Build Coastguard Worker       (client_config.rpc_type() != STREAMING_FROM_SERVER);
613*cc02d7e2SAndroid Build Coastguard Worker 
614*cc02d7e2SAndroid Build Coastguard Worker   auto end_time = time(nullptr);
615*cc02d7e2SAndroid Build Coastguard Worker 
616*cc02d7e2SAndroid Build Coastguard Worker   FinishClients(clients, client_mark);
617*cc02d7e2SAndroid Build Coastguard Worker 
618*cc02d7e2SAndroid Build Coastguard Worker   if (!client_finish_first) {
619*cc02d7e2SAndroid Build Coastguard Worker     FinishServers(servers, server_mark);
620*cc02d7e2SAndroid Build Coastguard Worker   }
621*cc02d7e2SAndroid Build Coastguard Worker 
622*cc02d7e2SAndroid Build Coastguard Worker   ReceiveFinalStatusFromClients(clients, merged_latencies, merged_statuses,
623*cc02d7e2SAndroid Build Coastguard Worker                                 *result);
624*cc02d7e2SAndroid Build Coastguard Worker   ShutdownClients(clients, *result);
625*cc02d7e2SAndroid Build Coastguard Worker 
626*cc02d7e2SAndroid Build Coastguard Worker   if (client_finish_first) {
627*cc02d7e2SAndroid Build Coastguard Worker     FinishServers(servers, server_mark);
628*cc02d7e2SAndroid Build Coastguard Worker   }
629*cc02d7e2SAndroid Build Coastguard Worker 
630*cc02d7e2SAndroid Build Coastguard Worker   ReceiveFinalStatusFromServer(servers, *result);
631*cc02d7e2SAndroid Build Coastguard Worker   ShutdownServers(servers, *result);
632*cc02d7e2SAndroid Build Coastguard Worker 
633*cc02d7e2SAndroid Build Coastguard Worker   delete g_inproc_servers;
634*cc02d7e2SAndroid Build Coastguard Worker 
635*cc02d7e2SAndroid Build Coastguard Worker   merged_latencies.FillProto(result->mutable_latencies());
636*cc02d7e2SAndroid Build Coastguard Worker   for (std::unordered_map<int, int64_t>::iterator it = merged_statuses.begin();
637*cc02d7e2SAndroid Build Coastguard Worker        it != merged_statuses.end(); ++it) {
638*cc02d7e2SAndroid Build Coastguard Worker     RequestResultCount* rrc = result->add_request_results();
639*cc02d7e2SAndroid Build Coastguard Worker     rrc->set_status_code(it->first);
640*cc02d7e2SAndroid Build Coastguard Worker     rrc->set_count(it->second);
641*cc02d7e2SAndroid Build Coastguard Worker   }
642*cc02d7e2SAndroid Build Coastguard Worker 
643*cc02d7e2SAndroid Build Coastguard Worker   // Fill in start and end time for the test scenario
644*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->mutable_start_time()->set_seconds(start_time);
645*cc02d7e2SAndroid Build Coastguard Worker   result->mutable_summary()->mutable_end_time()->set_seconds(end_time);
646*cc02d7e2SAndroid Build Coastguard Worker 
647*cc02d7e2SAndroid Build Coastguard Worker   postprocess_scenario_result(result.get());
648*cc02d7e2SAndroid Build Coastguard Worker   return result;
649*cc02d7e2SAndroid Build Coastguard Worker }
650*cc02d7e2SAndroid Build Coastguard Worker 
RunQuit(const std::string & credential_type,const std::map<std::string,std::string> & per_worker_credential_types)651*cc02d7e2SAndroid Build Coastguard Worker bool RunQuit(
652*cc02d7e2SAndroid Build Coastguard Worker     const std::string& credential_type,
653*cc02d7e2SAndroid Build Coastguard Worker     const std::map<std::string, std::string>& per_worker_credential_types) {
654*cc02d7e2SAndroid Build Coastguard Worker   // Get client, server lists
655*cc02d7e2SAndroid Build Coastguard Worker   bool result = true;
656*cc02d7e2SAndroid Build Coastguard Worker   auto workers = get_workers("QPS_WORKERS");
657*cc02d7e2SAndroid Build Coastguard Worker   if (workers.empty()) {
658*cc02d7e2SAndroid Build Coastguard Worker     return false;
659*cc02d7e2SAndroid Build Coastguard Worker   }
660*cc02d7e2SAndroid Build Coastguard Worker 
661*cc02d7e2SAndroid Build Coastguard Worker   for (size_t i = 0; i < workers.size(); i++) {
662*cc02d7e2SAndroid Build Coastguard Worker     auto stub = WorkerService::NewStub(grpc::CreateTestChannel(
663*cc02d7e2SAndroid Build Coastguard Worker         workers[i],
664*cc02d7e2SAndroid Build Coastguard Worker         GetCredType(workers[i], per_worker_credential_types, credential_type),
665*cc02d7e2SAndroid Build Coastguard Worker         nullptr /* call creds */, {} /* interceptor creators */));
666*cc02d7e2SAndroid Build Coastguard Worker     Void phony;
667*cc02d7e2SAndroid Build Coastguard Worker     grpc::ClientContext ctx;
668*cc02d7e2SAndroid Build Coastguard Worker     ctx.set_wait_for_ready(true);
669*cc02d7e2SAndroid Build Coastguard Worker     Status s = stub->QuitWorker(&ctx, phony, &phony);
670*cc02d7e2SAndroid Build Coastguard Worker     if (!s.ok()) {
671*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", i,
672*cc02d7e2SAndroid Build Coastguard Worker               s.error_message().c_str());
673*cc02d7e2SAndroid Build Coastguard Worker       result = false;
674*cc02d7e2SAndroid Build Coastguard Worker     }
675*cc02d7e2SAndroid Build Coastguard Worker   }
676*cc02d7e2SAndroid Build Coastguard Worker   return result;
677*cc02d7e2SAndroid Build Coastguard Worker }
678*cc02d7e2SAndroid Build Coastguard Worker 
679*cc02d7e2SAndroid Build Coastguard Worker }  // namespace testing
680*cc02d7e2SAndroid Build Coastguard Worker }  // namespace grpc
681