1*cc02d7e2SAndroid Build Coastguard Worker //
2*cc02d7e2SAndroid Build Coastguard Worker //
3*cc02d7e2SAndroid Build Coastguard Worker // Copyright 2015 gRPC authors.
4*cc02d7e2SAndroid Build Coastguard Worker //
5*cc02d7e2SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
6*cc02d7e2SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
7*cc02d7e2SAndroid Build Coastguard Worker // You may obtain a copy of the License at
8*cc02d7e2SAndroid Build Coastguard Worker //
9*cc02d7e2SAndroid Build Coastguard Worker // http://www.apache.org/licenses/LICENSE-2.0
10*cc02d7e2SAndroid Build Coastguard Worker //
11*cc02d7e2SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
12*cc02d7e2SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
13*cc02d7e2SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14*cc02d7e2SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
15*cc02d7e2SAndroid Build Coastguard Worker // limitations under the License.
16*cc02d7e2SAndroid Build Coastguard Worker //
17*cc02d7e2SAndroid Build Coastguard Worker //
18*cc02d7e2SAndroid Build Coastguard Worker
19*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/driver.h"
20*cc02d7e2SAndroid Build Coastguard Worker
21*cc02d7e2SAndroid Build Coastguard Worker #include <cinttypes>
22*cc02d7e2SAndroid Build Coastguard Worker #include <deque>
23*cc02d7e2SAndroid Build Coastguard Worker #include <list>
24*cc02d7e2SAndroid Build Coastguard Worker #include <thread>
25*cc02d7e2SAndroid Build Coastguard Worker #include <unordered_map>
26*cc02d7e2SAndroid Build Coastguard Worker #include <vector>
27*cc02d7e2SAndroid Build Coastguard Worker
28*cc02d7e2SAndroid Build Coastguard Worker #include "google/protobuf/timestamp.pb.h"
29*cc02d7e2SAndroid Build Coastguard Worker
30*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/alloc.h>
31*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/log.h>
32*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/string_util.h>
33*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/channel.h>
34*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/client_context.h>
35*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/create_channel.h>
36*cc02d7e2SAndroid Build Coastguard Worker
37*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/crash.h"
38*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/env.h"
39*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/host_port.h"
40*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/worker_service.grpc.pb.h"
41*cc02d7e2SAndroid Build Coastguard Worker #include "test/core/util/port.h"
42*cc02d7e2SAndroid Build Coastguard Worker #include "test/core/util/test_config.h"
43*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/client.h"
44*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/histogram.h"
45*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/qps_worker.h"
46*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/stats.h"
47*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/util/test_credentials_provider.h"
48*cc02d7e2SAndroid Build Coastguard Worker
49*cc02d7e2SAndroid Build Coastguard Worker using std::deque;
50*cc02d7e2SAndroid Build Coastguard Worker using std::list;
51*cc02d7e2SAndroid Build Coastguard Worker using std::unique_ptr;
52*cc02d7e2SAndroid Build Coastguard Worker using std::vector;
53*cc02d7e2SAndroid Build Coastguard Worker
54*cc02d7e2SAndroid Build Coastguard Worker namespace grpc {
55*cc02d7e2SAndroid Build Coastguard Worker namespace testing {
get_host(const std::string & worker)56*cc02d7e2SAndroid Build Coastguard Worker static std::string get_host(const std::string& worker) {
57*cc02d7e2SAndroid Build Coastguard Worker absl::string_view host;
58*cc02d7e2SAndroid Build Coastguard Worker absl::string_view port;
59*cc02d7e2SAndroid Build Coastguard Worker grpc_core::SplitHostPort(worker.c_str(), &host, &port);
60*cc02d7e2SAndroid Build Coastguard Worker return std::string(host.data(), host.size());
61*cc02d7e2SAndroid Build Coastguard Worker }
62*cc02d7e2SAndroid Build Coastguard Worker
get_workers(const string & env_name)63*cc02d7e2SAndroid Build Coastguard Worker static deque<string> get_workers(const string& env_name) {
64*cc02d7e2SAndroid Build Coastguard Worker deque<string> out;
65*cc02d7e2SAndroid Build Coastguard Worker auto env = grpc_core::GetEnv(env_name.c_str()).value_or("");
66*cc02d7e2SAndroid Build Coastguard Worker const char* p = env.c_str();
67*cc02d7e2SAndroid Build Coastguard Worker if (!env.empty()) {
68*cc02d7e2SAndroid Build Coastguard Worker for (;;) {
69*cc02d7e2SAndroid Build Coastguard Worker const char* comma = strchr(p, ',');
70*cc02d7e2SAndroid Build Coastguard Worker if (comma) {
71*cc02d7e2SAndroid Build Coastguard Worker out.emplace_back(p, comma);
72*cc02d7e2SAndroid Build Coastguard Worker p = comma + 1;
73*cc02d7e2SAndroid Build Coastguard Worker } else {
74*cc02d7e2SAndroid Build Coastguard Worker out.emplace_back(p);
75*cc02d7e2SAndroid Build Coastguard Worker break;
76*cc02d7e2SAndroid Build Coastguard Worker }
77*cc02d7e2SAndroid Build Coastguard Worker }
78*cc02d7e2SAndroid Build Coastguard Worker }
79*cc02d7e2SAndroid Build Coastguard Worker if (out.empty()) {
80*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_ERROR,
81*cc02d7e2SAndroid Build Coastguard Worker "Environment variable \"%s\" does not contain a list of QPS "
82*cc02d7e2SAndroid Build Coastguard Worker "workers to use. Set it to a comma-separated list of "
83*cc02d7e2SAndroid Build Coastguard Worker "hostname:port pairs, starting with hosts that should act as "
84*cc02d7e2SAndroid Build Coastguard Worker "servers. E.g. export "
85*cc02d7e2SAndroid Build Coastguard Worker "%s=\"serverhost1:1234,clienthost1:1234,clienthost2:1234\"",
86*cc02d7e2SAndroid Build Coastguard Worker env_name.c_str(), env_name.c_str());
87*cc02d7e2SAndroid Build Coastguard Worker }
88*cc02d7e2SAndroid Build Coastguard Worker return out;
89*cc02d7e2SAndroid Build Coastguard Worker }
90*cc02d7e2SAndroid Build Coastguard Worker
GetCredType(const std::string & worker_addr,const std::map<std::string,std::string> & per_worker_credential_types,const std::string & credential_type)91*cc02d7e2SAndroid Build Coastguard Worker std::string GetCredType(
92*cc02d7e2SAndroid Build Coastguard Worker const std::string& worker_addr,
93*cc02d7e2SAndroid Build Coastguard Worker const std::map<std::string, std::string>& per_worker_credential_types,
94*cc02d7e2SAndroid Build Coastguard Worker const std::string& credential_type) {
95*cc02d7e2SAndroid Build Coastguard Worker auto it = per_worker_credential_types.find(worker_addr);
96*cc02d7e2SAndroid Build Coastguard Worker if (it != per_worker_credential_types.end()) {
97*cc02d7e2SAndroid Build Coastguard Worker return it->second;
98*cc02d7e2SAndroid Build Coastguard Worker }
99*cc02d7e2SAndroid Build Coastguard Worker return credential_type;
100*cc02d7e2SAndroid Build Coastguard Worker }
101*cc02d7e2SAndroid Build Coastguard Worker
102*cc02d7e2SAndroid Build Coastguard Worker // helpers for postprocess_scenario_result
WallTime(const ClientStats & s)103*cc02d7e2SAndroid Build Coastguard Worker static double WallTime(const ClientStats& s) { return s.time_elapsed(); }
SystemTime(const ClientStats & s)104*cc02d7e2SAndroid Build Coastguard Worker static double SystemTime(const ClientStats& s) { return s.time_system(); }
UserTime(const ClientStats & s)105*cc02d7e2SAndroid Build Coastguard Worker static double UserTime(const ClientStats& s) { return s.time_user(); }
CliPollCount(const ClientStats & s)106*cc02d7e2SAndroid Build Coastguard Worker static double CliPollCount(const ClientStats& s) { return s.cq_poll_count(); }
SvrPollCount(const ServerStats & s)107*cc02d7e2SAndroid Build Coastguard Worker static double SvrPollCount(const ServerStats& s) { return s.cq_poll_count(); }
ServerSystemTime(const ServerStats & s)108*cc02d7e2SAndroid Build Coastguard Worker static double ServerSystemTime(const ServerStats& s) { return s.time_system(); }
ServerUserTime(const ServerStats & s)109*cc02d7e2SAndroid Build Coastguard Worker static double ServerUserTime(const ServerStats& s) { return s.time_user(); }
ServerTotalCpuTime(const ServerStats & s)110*cc02d7e2SAndroid Build Coastguard Worker static double ServerTotalCpuTime(const ServerStats& s) {
111*cc02d7e2SAndroid Build Coastguard Worker return s.total_cpu_time();
112*cc02d7e2SAndroid Build Coastguard Worker }
ServerIdleCpuTime(const ServerStats & s)113*cc02d7e2SAndroid Build Coastguard Worker static double ServerIdleCpuTime(const ServerStats& s) {
114*cc02d7e2SAndroid Build Coastguard Worker return s.idle_cpu_time();
115*cc02d7e2SAndroid Build Coastguard Worker }
Cores(int n)116*cc02d7e2SAndroid Build Coastguard Worker static int Cores(int n) { return n; }
117*cc02d7e2SAndroid Build Coastguard Worker
IsSuccess(const Status & s)118*cc02d7e2SAndroid Build Coastguard Worker static bool IsSuccess(const Status& s) {
119*cc02d7e2SAndroid Build Coastguard Worker if (s.ok()) return true;
120*cc02d7e2SAndroid Build Coastguard Worker // Since we shutdown servers and clients at the same time, they both can
121*cc02d7e2SAndroid Build Coastguard Worker // observe cancellation. Thus, we consider CANCELLED as good status.
122*cc02d7e2SAndroid Build Coastguard Worker if (static_cast<StatusCode>(s.error_code()) == StatusCode::CANCELLED) {
123*cc02d7e2SAndroid Build Coastguard Worker return true;
124*cc02d7e2SAndroid Build Coastguard Worker }
125*cc02d7e2SAndroid Build Coastguard Worker // Since we shutdown servers and clients at the same time, server can close
126*cc02d7e2SAndroid Build Coastguard Worker // the socket before the client attempts to do that, and vice versa. Thus
127*cc02d7e2SAndroid Build Coastguard Worker // receiving a "Socket closed" error is fine.
128*cc02d7e2SAndroid Build Coastguard Worker if (s.error_message() == "Socket closed") return true;
129*cc02d7e2SAndroid Build Coastguard Worker return false;
130*cc02d7e2SAndroid Build Coastguard Worker }
131*cc02d7e2SAndroid Build Coastguard Worker
132*cc02d7e2SAndroid Build Coastguard Worker // Postprocess ScenarioResult and populate result summary.
postprocess_scenario_result(ScenarioResult * result)133*cc02d7e2SAndroid Build Coastguard Worker static void postprocess_scenario_result(ScenarioResult* result) {
134*cc02d7e2SAndroid Build Coastguard Worker // Get latencies from ScenarioResult latencies histogram and populate to
135*cc02d7e2SAndroid Build Coastguard Worker // result summary.
136*cc02d7e2SAndroid Build Coastguard Worker Histogram histogram;
137*cc02d7e2SAndroid Build Coastguard Worker histogram.MergeProto(result->latencies());
138*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_latency_50(histogram.Percentile(50));
139*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_latency_90(histogram.Percentile(90));
140*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_latency_95(histogram.Percentile(95));
141*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_latency_99(histogram.Percentile(99));
142*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_latency_999(histogram.Percentile(99.9));
143*cc02d7e2SAndroid Build Coastguard Worker
144*cc02d7e2SAndroid Build Coastguard Worker // Calculate qps and cpu load for each client and then aggregate results for
145*cc02d7e2SAndroid Build Coastguard Worker // all clients
146*cc02d7e2SAndroid Build Coastguard Worker double qps = 0;
147*cc02d7e2SAndroid Build Coastguard Worker double client_system_cpu_load = 0, client_user_cpu_load = 0;
148*cc02d7e2SAndroid Build Coastguard Worker for (int i = 0; i < result->client_stats_size(); i++) {
149*cc02d7e2SAndroid Build Coastguard Worker auto client_stat = result->client_stats(i);
150*cc02d7e2SAndroid Build Coastguard Worker qps += client_stat.latencies().count() / client_stat.time_elapsed();
151*cc02d7e2SAndroid Build Coastguard Worker client_system_cpu_load +=
152*cc02d7e2SAndroid Build Coastguard Worker client_stat.time_system() / client_stat.time_elapsed();
153*cc02d7e2SAndroid Build Coastguard Worker client_user_cpu_load +=
154*cc02d7e2SAndroid Build Coastguard Worker client_stat.time_user() / client_stat.time_elapsed();
155*cc02d7e2SAndroid Build Coastguard Worker }
156*cc02d7e2SAndroid Build Coastguard Worker // Calculate cpu load for each server and then aggregate results for all
157*cc02d7e2SAndroid Build Coastguard Worker // servers
158*cc02d7e2SAndroid Build Coastguard Worker double server_system_cpu_load = 0, server_user_cpu_load = 0;
159*cc02d7e2SAndroid Build Coastguard Worker for (int i = 0; i < result->server_stats_size(); i++) {
160*cc02d7e2SAndroid Build Coastguard Worker auto server_stat = result->server_stats(i);
161*cc02d7e2SAndroid Build Coastguard Worker server_system_cpu_load +=
162*cc02d7e2SAndroid Build Coastguard Worker server_stat.time_system() / server_stat.time_elapsed();
163*cc02d7e2SAndroid Build Coastguard Worker server_user_cpu_load +=
164*cc02d7e2SAndroid Build Coastguard Worker server_stat.time_user() / server_stat.time_elapsed();
165*cc02d7e2SAndroid Build Coastguard Worker }
166*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_qps(qps);
167*cc02d7e2SAndroid Build Coastguard Worker // Populate the percentage of cpu load to result summary.
168*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_server_system_time(100 *
169*cc02d7e2SAndroid Build Coastguard Worker server_system_cpu_load);
170*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_server_user_time(100 * server_user_cpu_load);
171*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_client_system_time(100 *
172*cc02d7e2SAndroid Build Coastguard Worker client_system_cpu_load);
173*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_client_user_time(100 * client_user_cpu_load);
174*cc02d7e2SAndroid Build Coastguard Worker
175*cc02d7e2SAndroid Build Coastguard Worker // For Non-linux platform, get_cpu_usage() is not implemented. Thus,
176*cc02d7e2SAndroid Build Coastguard Worker // ServerTotalCpuTime and ServerIdleCpuTime are both 0.
177*cc02d7e2SAndroid Build Coastguard Worker if (average(result->server_stats(), ServerTotalCpuTime) == 0) {
178*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_server_cpu_usage(0);
179*cc02d7e2SAndroid Build Coastguard Worker } else {
180*cc02d7e2SAndroid Build Coastguard Worker auto server_cpu_usage =
181*cc02d7e2SAndroid Build Coastguard Worker 100 - 100 * average(result->server_stats(), ServerIdleCpuTime) /
182*cc02d7e2SAndroid Build Coastguard Worker average(result->server_stats(), ServerTotalCpuTime);
183*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_server_cpu_usage(server_cpu_usage);
184*cc02d7e2SAndroid Build Coastguard Worker }
185*cc02d7e2SAndroid Build Coastguard Worker
186*cc02d7e2SAndroid Build Coastguard Worker // Calculate and populate successful request per second and failed requests
187*cc02d7e2SAndroid Build Coastguard Worker // per seconds to result summary.
188*cc02d7e2SAndroid Build Coastguard Worker auto time_estimate = average(result->client_stats(), WallTime);
189*cc02d7e2SAndroid Build Coastguard Worker if (result->request_results_size() > 0) {
190*cc02d7e2SAndroid Build Coastguard Worker int64_t successes = 0;
191*cc02d7e2SAndroid Build Coastguard Worker int64_t failures = 0;
192*cc02d7e2SAndroid Build Coastguard Worker for (int i = 0; i < result->request_results_size(); i++) {
193*cc02d7e2SAndroid Build Coastguard Worker const RequestResultCount& rrc = result->request_results(i);
194*cc02d7e2SAndroid Build Coastguard Worker if (rrc.status_code() == 0) {
195*cc02d7e2SAndroid Build Coastguard Worker successes += rrc.count();
196*cc02d7e2SAndroid Build Coastguard Worker } else {
197*cc02d7e2SAndroid Build Coastguard Worker failures += rrc.count();
198*cc02d7e2SAndroid Build Coastguard Worker }
199*cc02d7e2SAndroid Build Coastguard Worker }
200*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_successful_requests_per_second(
201*cc02d7e2SAndroid Build Coastguard Worker successes / time_estimate);
202*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_failed_requests_per_second(failures /
203*cc02d7e2SAndroid Build Coastguard Worker time_estimate);
204*cc02d7e2SAndroid Build Coastguard Worker }
205*cc02d7e2SAndroid Build Coastguard Worker
206*cc02d7e2SAndroid Build Coastguard Worker // Fill in data for other metrics required in result summary
207*cc02d7e2SAndroid Build Coastguard Worker auto qps_per_server_core = qps / sum(result->server_cores(), Cores);
208*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_qps_per_server_core(qps_per_server_core);
209*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_client_polls_per_request(
210*cc02d7e2SAndroid Build Coastguard Worker sum(result->client_stats(), CliPollCount) / histogram.Count());
211*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_server_polls_per_request(
212*cc02d7e2SAndroid Build Coastguard Worker sum(result->server_stats(), SvrPollCount) / histogram.Count());
213*cc02d7e2SAndroid Build Coastguard Worker
214*cc02d7e2SAndroid Build Coastguard Worker auto server_queries_per_cpu_sec =
215*cc02d7e2SAndroid Build Coastguard Worker histogram.Count() / (sum(result->server_stats(), ServerSystemTime) +
216*cc02d7e2SAndroid Build Coastguard Worker sum(result->server_stats(), ServerUserTime));
217*cc02d7e2SAndroid Build Coastguard Worker auto client_queries_per_cpu_sec =
218*cc02d7e2SAndroid Build Coastguard Worker histogram.Count() / (sum(result->client_stats(), SystemTime) +
219*cc02d7e2SAndroid Build Coastguard Worker sum(result->client_stats(), UserTime));
220*cc02d7e2SAndroid Build Coastguard Worker
221*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_server_queries_per_cpu_sec(
222*cc02d7e2SAndroid Build Coastguard Worker server_queries_per_cpu_sec);
223*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->set_client_queries_per_cpu_sec(
224*cc02d7e2SAndroid Build Coastguard Worker client_queries_per_cpu_sec);
225*cc02d7e2SAndroid Build Coastguard Worker }
226*cc02d7e2SAndroid Build Coastguard Worker
227*cc02d7e2SAndroid Build Coastguard Worker struct ClientData {
228*cc02d7e2SAndroid Build Coastguard Worker unique_ptr<WorkerService::Stub> stub;
229*cc02d7e2SAndroid Build Coastguard Worker unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
230*cc02d7e2SAndroid Build Coastguard Worker };
231*cc02d7e2SAndroid Build Coastguard Worker
232*cc02d7e2SAndroid Build Coastguard Worker struct ServerData {
233*cc02d7e2SAndroid Build Coastguard Worker unique_ptr<WorkerService::Stub> stub;
234*cc02d7e2SAndroid Build Coastguard Worker unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
235*cc02d7e2SAndroid Build Coastguard Worker };
236*cc02d7e2SAndroid Build Coastguard Worker
FinishClients(const std::vector<ClientData> & clients,const ClientArgs & client_mark)237*cc02d7e2SAndroid Build Coastguard Worker static void FinishClients(const std::vector<ClientData>& clients,
238*cc02d7e2SAndroid Build Coastguard Worker const ClientArgs& client_mark) {
239*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Finishing clients");
240*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
241*cc02d7e2SAndroid Build Coastguard Worker auto client = &clients[i];
242*cc02d7e2SAndroid Build Coastguard Worker if (!client->stream->Write(client_mark)) {
243*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
244*cc02d7e2SAndroid Build Coastguard Worker }
245*cc02d7e2SAndroid Build Coastguard Worker if (!client->stream->WritesDone()) {
246*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(absl::StrFormat("Failed WritesDone for client %zu", i));
247*cc02d7e2SAndroid Build Coastguard Worker }
248*cc02d7e2SAndroid Build Coastguard Worker }
249*cc02d7e2SAndroid Build Coastguard Worker }
250*cc02d7e2SAndroid Build Coastguard Worker
ReceiveFinalStatusFromClients(const std::vector<ClientData> & clients,Histogram & merged_latencies,std::unordered_map<int,int64_t> & merged_statuses,ScenarioResult & result)251*cc02d7e2SAndroid Build Coastguard Worker static void ReceiveFinalStatusFromClients(
252*cc02d7e2SAndroid Build Coastguard Worker const std::vector<ClientData>& clients, Histogram& merged_latencies,
253*cc02d7e2SAndroid Build Coastguard Worker std::unordered_map<int, int64_t>& merged_statuses, ScenarioResult& result) {
254*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Receiving final status from clients");
255*cc02d7e2SAndroid Build Coastguard Worker ClientStatus client_status;
256*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
257*cc02d7e2SAndroid Build Coastguard Worker auto client = &clients[i];
258*cc02d7e2SAndroid Build Coastguard Worker // Read the client final status
259*cc02d7e2SAndroid Build Coastguard Worker if (client->stream->Read(&client_status)) {
260*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Received final status from client %zu", i);
261*cc02d7e2SAndroid Build Coastguard Worker const auto& stats = client_status.stats();
262*cc02d7e2SAndroid Build Coastguard Worker merged_latencies.MergeProto(stats.latencies());
263*cc02d7e2SAndroid Build Coastguard Worker for (int i = 0; i < stats.request_results_size(); i++) {
264*cc02d7e2SAndroid Build Coastguard Worker merged_statuses[stats.request_results(i).status_code()] +=
265*cc02d7e2SAndroid Build Coastguard Worker stats.request_results(i).count();
266*cc02d7e2SAndroid Build Coastguard Worker }
267*cc02d7e2SAndroid Build Coastguard Worker result.add_client_stats()->CopyFrom(stats);
268*cc02d7e2SAndroid Build Coastguard Worker // Check that final status was should be the last message on the client
269*cc02d7e2SAndroid Build Coastguard Worker // stream.
270*cc02d7e2SAndroid Build Coastguard Worker // TODO(jtattermusch): note that that waiting for Read to return can take
271*cc02d7e2SAndroid Build Coastguard Worker // long on some scenarios (e.g. unconstrained streaming_from_server). See
272*cc02d7e2SAndroid Build Coastguard Worker // https://github.com/grpc/grpc/blob/3bd0cd208ea549760a2daf595f79b91b247fe240/test/cpp/qps/server_async.cc#L176
273*cc02d7e2SAndroid Build Coastguard Worker // where the shutdown delay pretty much determines the wait here.
274*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(!client->stream->Read(&client_status));
275*cc02d7e2SAndroid Build Coastguard Worker } else {
276*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(
277*cc02d7e2SAndroid Build Coastguard Worker absl::StrFormat("Couldn't get final status from client %zu", i));
278*cc02d7e2SAndroid Build Coastguard Worker }
279*cc02d7e2SAndroid Build Coastguard Worker }
280*cc02d7e2SAndroid Build Coastguard Worker }
281*cc02d7e2SAndroid Build Coastguard Worker
ShutdownClients(const std::vector<ClientData> & clients,ScenarioResult & result)282*cc02d7e2SAndroid Build Coastguard Worker static void ShutdownClients(const std::vector<ClientData>& clients,
283*cc02d7e2SAndroid Build Coastguard Worker ScenarioResult& result) {
284*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Shutdown clients");
285*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
286*cc02d7e2SAndroid Build Coastguard Worker auto client = &clients[i];
287*cc02d7e2SAndroid Build Coastguard Worker Status s = client->stream->Finish();
288*cc02d7e2SAndroid Build Coastguard Worker // Since we shutdown servers and clients at the same time, clients can
289*cc02d7e2SAndroid Build Coastguard Worker // observe cancellation. Thus, we consider both OK and CANCELLED as good
290*cc02d7e2SAndroid Build Coastguard Worker // status.
291*cc02d7e2SAndroid Build Coastguard Worker const bool success = IsSuccess(s);
292*cc02d7e2SAndroid Build Coastguard Worker result.add_client_success(success);
293*cc02d7e2SAndroid Build Coastguard Worker if (!success) {
294*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(absl::StrFormat("Client %zu had an error %s", i,
295*cc02d7e2SAndroid Build Coastguard Worker s.error_message().c_str()));
296*cc02d7e2SAndroid Build Coastguard Worker }
297*cc02d7e2SAndroid Build Coastguard Worker }
298*cc02d7e2SAndroid Build Coastguard Worker }
299*cc02d7e2SAndroid Build Coastguard Worker
FinishServers(const std::vector<ServerData> & servers,const ServerArgs & server_mark)300*cc02d7e2SAndroid Build Coastguard Worker static void FinishServers(const std::vector<ServerData>& servers,
301*cc02d7e2SAndroid Build Coastguard Worker const ServerArgs& server_mark) {
302*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Finishing servers");
303*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
304*cc02d7e2SAndroid Build Coastguard Worker auto server = &servers[i];
305*cc02d7e2SAndroid Build Coastguard Worker if (!server->stream->Write(server_mark)) {
306*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(absl::StrFormat("Couldn't write mark to server %zu", i));
307*cc02d7e2SAndroid Build Coastguard Worker }
308*cc02d7e2SAndroid Build Coastguard Worker if (!server->stream->WritesDone()) {
309*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(absl::StrFormat("Failed WritesDone for server %zu", i));
310*cc02d7e2SAndroid Build Coastguard Worker }
311*cc02d7e2SAndroid Build Coastguard Worker }
312*cc02d7e2SAndroid Build Coastguard Worker }
313*cc02d7e2SAndroid Build Coastguard Worker
ReceiveFinalStatusFromServer(const std::vector<ServerData> & servers,ScenarioResult & result)314*cc02d7e2SAndroid Build Coastguard Worker static void ReceiveFinalStatusFromServer(const std::vector<ServerData>& servers,
315*cc02d7e2SAndroid Build Coastguard Worker ScenarioResult& result) {
316*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Receiving final status from servers");
317*cc02d7e2SAndroid Build Coastguard Worker ServerStatus server_status;
318*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
319*cc02d7e2SAndroid Build Coastguard Worker auto server = &servers[i];
320*cc02d7e2SAndroid Build Coastguard Worker // Read the server final status
321*cc02d7e2SAndroid Build Coastguard Worker if (server->stream->Read(&server_status)) {
322*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Received final status from server %zu", i);
323*cc02d7e2SAndroid Build Coastguard Worker result.add_server_stats()->CopyFrom(server_status.stats());
324*cc02d7e2SAndroid Build Coastguard Worker result.add_server_cores(server_status.cores());
325*cc02d7e2SAndroid Build Coastguard Worker // That final status should be the last message on the server stream
326*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(!server->stream->Read(&server_status));
327*cc02d7e2SAndroid Build Coastguard Worker } else {
328*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(
329*cc02d7e2SAndroid Build Coastguard Worker absl::StrFormat("Couldn't get final status from server %zu", i));
330*cc02d7e2SAndroid Build Coastguard Worker }
331*cc02d7e2SAndroid Build Coastguard Worker }
332*cc02d7e2SAndroid Build Coastguard Worker }
333*cc02d7e2SAndroid Build Coastguard Worker
ShutdownServers(const std::vector<ServerData> & servers,ScenarioResult & result)334*cc02d7e2SAndroid Build Coastguard Worker static void ShutdownServers(const std::vector<ServerData>& servers,
335*cc02d7e2SAndroid Build Coastguard Worker ScenarioResult& result) {
336*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Shutdown servers");
337*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
338*cc02d7e2SAndroid Build Coastguard Worker auto server = &servers[i];
339*cc02d7e2SAndroid Build Coastguard Worker Status s = server->stream->Finish();
340*cc02d7e2SAndroid Build Coastguard Worker // Since we shutdown servers and clients at the same time, servers can
341*cc02d7e2SAndroid Build Coastguard Worker // observe cancellation. Thus, we consider both OK and CANCELLED as good
342*cc02d7e2SAndroid Build Coastguard Worker // status.
343*cc02d7e2SAndroid Build Coastguard Worker const bool success = IsSuccess(s);
344*cc02d7e2SAndroid Build Coastguard Worker result.add_server_success(success);
345*cc02d7e2SAndroid Build Coastguard Worker if (!success) {
346*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(absl::StrFormat("Server %zu had an error %s", i,
347*cc02d7e2SAndroid Build Coastguard Worker s.error_message().c_str()));
348*cc02d7e2SAndroid Build Coastguard Worker }
349*cc02d7e2SAndroid Build Coastguard Worker }
350*cc02d7e2SAndroid Build Coastguard Worker }
351*cc02d7e2SAndroid Build Coastguard Worker
352*cc02d7e2SAndroid Build Coastguard Worker std::vector<grpc::testing::Server*>* g_inproc_servers = nullptr;
353*cc02d7e2SAndroid Build Coastguard Worker
RunScenario(const ClientConfig & initial_client_config,size_t num_clients,const ServerConfig & initial_server_config,size_t num_servers,int warmup_seconds,int benchmark_seconds,int spawn_local_worker_count,const std::string & qps_server_target_override,const std::string & credential_type,const std::map<std::string,std::string> & per_worker_credential_types,bool run_inproc,int32_t median_latency_collection_interval_millis)354*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<ScenarioResult> RunScenario(
355*cc02d7e2SAndroid Build Coastguard Worker const ClientConfig& initial_client_config, size_t num_clients,
356*cc02d7e2SAndroid Build Coastguard Worker const ServerConfig& initial_server_config, size_t num_servers,
357*cc02d7e2SAndroid Build Coastguard Worker int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
358*cc02d7e2SAndroid Build Coastguard Worker const std::string& qps_server_target_override,
359*cc02d7e2SAndroid Build Coastguard Worker const std::string& credential_type,
360*cc02d7e2SAndroid Build Coastguard Worker const std::map<std::string, std::string>& per_worker_credential_types,
361*cc02d7e2SAndroid Build Coastguard Worker bool run_inproc, int32_t median_latency_collection_interval_millis) {
362*cc02d7e2SAndroid Build Coastguard Worker if (run_inproc) {
363*cc02d7e2SAndroid Build Coastguard Worker g_inproc_servers = new std::vector<grpc::testing::Server*>;
364*cc02d7e2SAndroid Build Coastguard Worker }
365*cc02d7e2SAndroid Build Coastguard Worker // Log everything from the driver
366*cc02d7e2SAndroid Build Coastguard Worker gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
367*cc02d7e2SAndroid Build Coastguard Worker
368*cc02d7e2SAndroid Build Coastguard Worker // ClientContext allocations (all are destroyed at scope exit)
369*cc02d7e2SAndroid Build Coastguard Worker list<ClientContext> contexts;
370*cc02d7e2SAndroid Build Coastguard Worker auto alloc_context = [](list<ClientContext>* contexts) {
371*cc02d7e2SAndroid Build Coastguard Worker contexts->emplace_back();
372*cc02d7e2SAndroid Build Coastguard Worker auto context = &contexts->back();
373*cc02d7e2SAndroid Build Coastguard Worker context->set_wait_for_ready(true);
374*cc02d7e2SAndroid Build Coastguard Worker return context;
375*cc02d7e2SAndroid Build Coastguard Worker };
376*cc02d7e2SAndroid Build Coastguard Worker
377*cc02d7e2SAndroid Build Coastguard Worker // To be added to the result, containing the final configuration used for
378*cc02d7e2SAndroid Build Coastguard Worker // client and config (including host, etc.)
379*cc02d7e2SAndroid Build Coastguard Worker ClientConfig result_client_config;
380*cc02d7e2SAndroid Build Coastguard Worker
381*cc02d7e2SAndroid Build Coastguard Worker // Get client, server lists; ignore if inproc test
382*cc02d7e2SAndroid Build Coastguard Worker auto workers = (!run_inproc) ? get_workers("QPS_WORKERS") : deque<string>();
383*cc02d7e2SAndroid Build Coastguard Worker ClientConfig client_config = initial_client_config;
384*cc02d7e2SAndroid Build Coastguard Worker
385*cc02d7e2SAndroid Build Coastguard Worker // Spawn some local workers if desired
386*cc02d7e2SAndroid Build Coastguard Worker vector<unique_ptr<QpsWorker>> local_workers;
387*cc02d7e2SAndroid Build Coastguard Worker for (int i = 0; i < abs(spawn_local_worker_count); i++) {
388*cc02d7e2SAndroid Build Coastguard Worker // act as if we're a new test -- gets a good rng seed
389*cc02d7e2SAndroid Build Coastguard Worker static bool called_init = false;
390*cc02d7e2SAndroid Build Coastguard Worker if (!called_init) {
391*cc02d7e2SAndroid Build Coastguard Worker char args_buf[100];
392*cc02d7e2SAndroid Build Coastguard Worker strcpy(args_buf, "some-benchmark");
393*cc02d7e2SAndroid Build Coastguard Worker char* args[] = {args_buf};
394*cc02d7e2SAndroid Build Coastguard Worker int argc = 1;
395*cc02d7e2SAndroid Build Coastguard Worker grpc_test_init(&argc, args);
396*cc02d7e2SAndroid Build Coastguard Worker called_init = true;
397*cc02d7e2SAndroid Build Coastguard Worker }
398*cc02d7e2SAndroid Build Coastguard Worker
399*cc02d7e2SAndroid Build Coastguard Worker char addr[256];
400*cc02d7e2SAndroid Build Coastguard Worker // we use port # of -1 to indicate inproc
401*cc02d7e2SAndroid Build Coastguard Worker int driver_port = (!run_inproc) ? grpc_pick_unused_port_or_die() : -1;
402*cc02d7e2SAndroid Build Coastguard Worker local_workers.emplace_back(new QpsWorker(driver_port, 0, credential_type));
403*cc02d7e2SAndroid Build Coastguard Worker sprintf(addr, "localhost:%d", driver_port);
404*cc02d7e2SAndroid Build Coastguard Worker if (spawn_local_worker_count < 0) {
405*cc02d7e2SAndroid Build Coastguard Worker workers.push_front(addr);
406*cc02d7e2SAndroid Build Coastguard Worker } else {
407*cc02d7e2SAndroid Build Coastguard Worker workers.push_back(addr);
408*cc02d7e2SAndroid Build Coastguard Worker }
409*cc02d7e2SAndroid Build Coastguard Worker }
410*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(!workers.empty());
411*cc02d7e2SAndroid Build Coastguard Worker
412*cc02d7e2SAndroid Build Coastguard Worker // if num_clients is set to <=0, do dynamic sizing: all workers
413*cc02d7e2SAndroid Build Coastguard Worker // except for servers are clients
414*cc02d7e2SAndroid Build Coastguard Worker if (num_clients <= 0) {
415*cc02d7e2SAndroid Build Coastguard Worker num_clients = workers.size() - num_servers;
416*cc02d7e2SAndroid Build Coastguard Worker }
417*cc02d7e2SAndroid Build Coastguard Worker
418*cc02d7e2SAndroid Build Coastguard Worker // TODO(ctiller): support running multiple configurations, and binpack
419*cc02d7e2SAndroid Build Coastguard Worker // client/server pairs
420*cc02d7e2SAndroid Build Coastguard Worker // to available workers
421*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(workers.size() >= num_clients + num_servers);
422*cc02d7e2SAndroid Build Coastguard Worker
423*cc02d7e2SAndroid Build Coastguard Worker // Trim to just what we need
424*cc02d7e2SAndroid Build Coastguard Worker workers.resize(num_clients + num_servers);
425*cc02d7e2SAndroid Build Coastguard Worker
426*cc02d7e2SAndroid Build Coastguard Worker // Start servers
427*cc02d7e2SAndroid Build Coastguard Worker std::vector<ServerData> servers(num_servers);
428*cc02d7e2SAndroid Build Coastguard Worker std::unordered_map<string, std::deque<int>> hosts_cores;
429*cc02d7e2SAndroid Build Coastguard Worker ChannelArguments channel_args;
430*cc02d7e2SAndroid Build Coastguard Worker
431*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < num_servers; i++) {
432*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")",
433*cc02d7e2SAndroid Build Coastguard Worker workers[i].c_str(), i);
434*cc02d7e2SAndroid Build Coastguard Worker if (!run_inproc) {
435*cc02d7e2SAndroid Build Coastguard Worker servers[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
436*cc02d7e2SAndroid Build Coastguard Worker workers[i],
437*cc02d7e2SAndroid Build Coastguard Worker GetCredType(workers[i], per_worker_credential_types, credential_type),
438*cc02d7e2SAndroid Build Coastguard Worker nullptr /* call creds */, {} /* interceptor creators */));
439*cc02d7e2SAndroid Build Coastguard Worker } else {
440*cc02d7e2SAndroid Build Coastguard Worker servers[i].stub = WorkerService::NewStub(
441*cc02d7e2SAndroid Build Coastguard Worker local_workers[i]->InProcessChannel(channel_args));
442*cc02d7e2SAndroid Build Coastguard Worker }
443*cc02d7e2SAndroid Build Coastguard Worker
444*cc02d7e2SAndroid Build Coastguard Worker const ServerConfig& server_config = initial_server_config;
445*cc02d7e2SAndroid Build Coastguard Worker if (server_config.core_limit() != 0) {
446*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash("server config core limit is set but ignored by driver");
447*cc02d7e2SAndroid Build Coastguard Worker }
448*cc02d7e2SAndroid Build Coastguard Worker
449*cc02d7e2SAndroid Build Coastguard Worker ServerArgs args;
450*cc02d7e2SAndroid Build Coastguard Worker *args.mutable_setup() = server_config;
451*cc02d7e2SAndroid Build Coastguard Worker servers[i].stream = servers[i].stub->RunServer(alloc_context(&contexts));
452*cc02d7e2SAndroid Build Coastguard Worker if (!servers[i].stream->Write(args)) {
453*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(
454*cc02d7e2SAndroid Build Coastguard Worker absl::StrFormat("Could not write args to server %zu", i));
455*cc02d7e2SAndroid Build Coastguard Worker }
456*cc02d7e2SAndroid Build Coastguard Worker ServerStatus init_status;
457*cc02d7e2SAndroid Build Coastguard Worker if (!servers[i].stream->Read(&init_status)) {
458*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(
459*cc02d7e2SAndroid Build Coastguard Worker absl::StrFormat("Server %zu did not yield initial status", i));
460*cc02d7e2SAndroid Build Coastguard Worker }
461*cc02d7e2SAndroid Build Coastguard Worker if (run_inproc) {
462*cc02d7e2SAndroid Build Coastguard Worker std::string cli_target(INPROC_NAME_PREFIX);
463*cc02d7e2SAndroid Build Coastguard Worker cli_target += std::to_string(i);
464*cc02d7e2SAndroid Build Coastguard Worker client_config.add_server_targets(cli_target);
465*cc02d7e2SAndroid Build Coastguard Worker } else {
466*cc02d7e2SAndroid Build Coastguard Worker std::string host = get_host(workers[i]);
467*cc02d7e2SAndroid Build Coastguard Worker std::string cli_target =
468*cc02d7e2SAndroid Build Coastguard Worker grpc_core::JoinHostPort(host.c_str(), init_status.port());
469*cc02d7e2SAndroid Build Coastguard Worker client_config.add_server_targets(cli_target.c_str());
470*cc02d7e2SAndroid Build Coastguard Worker }
471*cc02d7e2SAndroid Build Coastguard Worker }
472*cc02d7e2SAndroid Build Coastguard Worker if (qps_server_target_override.length() > 0) {
473*cc02d7e2SAndroid Build Coastguard Worker // overriding the qps server target only makes since if there is <= 1
474*cc02d7e2SAndroid Build Coastguard Worker // servers
475*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(num_servers <= 1);
476*cc02d7e2SAndroid Build Coastguard Worker client_config.clear_server_targets();
477*cc02d7e2SAndroid Build Coastguard Worker client_config.add_server_targets(qps_server_target_override);
478*cc02d7e2SAndroid Build Coastguard Worker }
479*cc02d7e2SAndroid Build Coastguard Worker client_config.set_median_latency_collection_interval_millis(
480*cc02d7e2SAndroid Build Coastguard Worker median_latency_collection_interval_millis);
481*cc02d7e2SAndroid Build Coastguard Worker
482*cc02d7e2SAndroid Build Coastguard Worker // Targets are all set by now
483*cc02d7e2SAndroid Build Coastguard Worker result_client_config = client_config;
484*cc02d7e2SAndroid Build Coastguard Worker // Start clients
485*cc02d7e2SAndroid Build Coastguard Worker std::vector<ClientData> clients(num_clients);
486*cc02d7e2SAndroid Build Coastguard Worker size_t channels_allocated = 0;
487*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < num_clients; i++) {
488*cc02d7e2SAndroid Build Coastguard Worker const auto& worker = workers[i + num_servers];
489*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Starting client on %s (worker #%" PRIuPTR ")",
490*cc02d7e2SAndroid Build Coastguard Worker worker.c_str(), i + num_servers);
491*cc02d7e2SAndroid Build Coastguard Worker if (!run_inproc) {
492*cc02d7e2SAndroid Build Coastguard Worker clients[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
493*cc02d7e2SAndroid Build Coastguard Worker worker,
494*cc02d7e2SAndroid Build Coastguard Worker GetCredType(worker, per_worker_credential_types, credential_type),
495*cc02d7e2SAndroid Build Coastguard Worker nullptr /* call creds */, {} /* interceptor creators */));
496*cc02d7e2SAndroid Build Coastguard Worker } else {
497*cc02d7e2SAndroid Build Coastguard Worker clients[i].stub = WorkerService::NewStub(
498*cc02d7e2SAndroid Build Coastguard Worker local_workers[i + num_servers]->InProcessChannel(channel_args));
499*cc02d7e2SAndroid Build Coastguard Worker }
500*cc02d7e2SAndroid Build Coastguard Worker ClientConfig per_client_config = client_config;
501*cc02d7e2SAndroid Build Coastguard Worker
502*cc02d7e2SAndroid Build Coastguard Worker if (initial_client_config.core_limit() != 0) {
503*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash("client config core limit set but ignored");
504*cc02d7e2SAndroid Build Coastguard Worker }
505*cc02d7e2SAndroid Build Coastguard Worker
506*cc02d7e2SAndroid Build Coastguard Worker // Reduce channel count so that total channels specified is held regardless
507*cc02d7e2SAndroid Build Coastguard Worker // of the number of clients available
508*cc02d7e2SAndroid Build Coastguard Worker size_t num_channels =
509*cc02d7e2SAndroid Build Coastguard Worker (client_config.client_channels() - channels_allocated) /
510*cc02d7e2SAndroid Build Coastguard Worker (num_clients - i);
511*cc02d7e2SAndroid Build Coastguard Worker channels_allocated += num_channels;
512*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_DEBUG, "Client %" PRIdPTR " gets %" PRIdPTR " channels", i,
513*cc02d7e2SAndroid Build Coastguard Worker num_channels);
514*cc02d7e2SAndroid Build Coastguard Worker per_client_config.set_client_channels(num_channels);
515*cc02d7e2SAndroid Build Coastguard Worker
516*cc02d7e2SAndroid Build Coastguard Worker ClientArgs args;
517*cc02d7e2SAndroid Build Coastguard Worker *args.mutable_setup() = per_client_config;
518*cc02d7e2SAndroid Build Coastguard Worker clients[i].stream = clients[i].stub->RunClient(alloc_context(&contexts));
519*cc02d7e2SAndroid Build Coastguard Worker if (!clients[i].stream->Write(args)) {
520*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(
521*cc02d7e2SAndroid Build Coastguard Worker absl::StrFormat("Could not write args to client %zu", i));
522*cc02d7e2SAndroid Build Coastguard Worker }
523*cc02d7e2SAndroid Build Coastguard Worker }
524*cc02d7e2SAndroid Build Coastguard Worker
525*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < num_clients; i++) {
526*cc02d7e2SAndroid Build Coastguard Worker ClientStatus init_status;
527*cc02d7e2SAndroid Build Coastguard Worker if (!clients[i].stream->Read(&init_status)) {
528*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(
529*cc02d7e2SAndroid Build Coastguard Worker absl::StrFormat("Client %zu did not yield initial status", i));
530*cc02d7e2SAndroid Build Coastguard Worker }
531*cc02d7e2SAndroid Build Coastguard Worker }
532*cc02d7e2SAndroid Build Coastguard Worker
533*cc02d7e2SAndroid Build Coastguard Worker // Send an initial mark: clients can use this to know that everything is ready
534*cc02d7e2SAndroid Build Coastguard Worker // to start
535*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Initiating");
536*cc02d7e2SAndroid Build Coastguard Worker ServerArgs server_mark;
537*cc02d7e2SAndroid Build Coastguard Worker server_mark.mutable_mark()->set_reset(true);
538*cc02d7e2SAndroid Build Coastguard Worker ClientArgs client_mark;
539*cc02d7e2SAndroid Build Coastguard Worker client_mark.mutable_mark()->set_reset(true);
540*cc02d7e2SAndroid Build Coastguard Worker ServerStatus server_status;
541*cc02d7e2SAndroid Build Coastguard Worker ClientStatus client_status;
542*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < num_clients; i++) {
543*cc02d7e2SAndroid Build Coastguard Worker auto client = &clients[i];
544*cc02d7e2SAndroid Build Coastguard Worker if (!client->stream->Write(client_mark)) {
545*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
546*cc02d7e2SAndroid Build Coastguard Worker }
547*cc02d7e2SAndroid Build Coastguard Worker }
548*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < num_clients; i++) {
549*cc02d7e2SAndroid Build Coastguard Worker auto client = &clients[i];
550*cc02d7e2SAndroid Build Coastguard Worker if (!client->stream->Read(&client_status)) {
551*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(
552*cc02d7e2SAndroid Build Coastguard Worker absl::StrFormat("Couldn't get status from client %zu", i));
553*cc02d7e2SAndroid Build Coastguard Worker }
554*cc02d7e2SAndroid Build Coastguard Worker }
555*cc02d7e2SAndroid Build Coastguard Worker
556*cc02d7e2SAndroid Build Coastguard Worker // Let everything warmup
557*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Warming up");
558*cc02d7e2SAndroid Build Coastguard Worker gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
559*cc02d7e2SAndroid Build Coastguard Worker gpr_sleep_until(
560*cc02d7e2SAndroid Build Coastguard Worker gpr_time_add(start, gpr_time_from_seconds(warmup_seconds, GPR_TIMESPAN)));
561*cc02d7e2SAndroid Build Coastguard Worker
562*cc02d7e2SAndroid Build Coastguard Worker // Start a run
563*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Starting");
564*cc02d7e2SAndroid Build Coastguard Worker
565*cc02d7e2SAndroid Build Coastguard Worker auto start_time = time(nullptr);
566*cc02d7e2SAndroid Build Coastguard Worker
567*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < num_servers; i++) {
568*cc02d7e2SAndroid Build Coastguard Worker auto server = &servers[i];
569*cc02d7e2SAndroid Build Coastguard Worker if (!server->stream->Write(server_mark)) {
570*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(absl::StrFormat("Couldn't write mark to server %zu", i));
571*cc02d7e2SAndroid Build Coastguard Worker }
572*cc02d7e2SAndroid Build Coastguard Worker }
573*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < num_clients; i++) {
574*cc02d7e2SAndroid Build Coastguard Worker auto client = &clients[i];
575*cc02d7e2SAndroid Build Coastguard Worker if (!client->stream->Write(client_mark)) {
576*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
577*cc02d7e2SAndroid Build Coastguard Worker }
578*cc02d7e2SAndroid Build Coastguard Worker }
579*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < num_servers; i++) {
580*cc02d7e2SAndroid Build Coastguard Worker auto server = &servers[i];
581*cc02d7e2SAndroid Build Coastguard Worker if (!server->stream->Read(&server_status)) {
582*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(
583*cc02d7e2SAndroid Build Coastguard Worker absl::StrFormat("Couldn't get status from server %zu", i));
584*cc02d7e2SAndroid Build Coastguard Worker }
585*cc02d7e2SAndroid Build Coastguard Worker }
586*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < num_clients; i++) {
587*cc02d7e2SAndroid Build Coastguard Worker auto client = &clients[i];
588*cc02d7e2SAndroid Build Coastguard Worker if (!client->stream->Read(&client_status)) {
589*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(
590*cc02d7e2SAndroid Build Coastguard Worker absl::StrFormat("Couldn't get status from client %zu", i));
591*cc02d7e2SAndroid Build Coastguard Worker }
592*cc02d7e2SAndroid Build Coastguard Worker }
593*cc02d7e2SAndroid Build Coastguard Worker
594*cc02d7e2SAndroid Build Coastguard Worker // Wait some time
595*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Running");
596*cc02d7e2SAndroid Build Coastguard Worker // Use gpr_sleep_until rather than this_thread::sleep_until to support
597*cc02d7e2SAndroid Build Coastguard Worker // compilers that don't work with this_thread
598*cc02d7e2SAndroid Build Coastguard Worker gpr_sleep_until(gpr_time_add(
599*cc02d7e2SAndroid Build Coastguard Worker start,
600*cc02d7e2SAndroid Build Coastguard Worker gpr_time_from_seconds(warmup_seconds + benchmark_seconds, GPR_TIMESPAN)));
601*cc02d7e2SAndroid Build Coastguard Worker
602*cc02d7e2SAndroid Build Coastguard Worker // Finish a run
603*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<ScenarioResult> result(new ScenarioResult);
604*cc02d7e2SAndroid Build Coastguard Worker Histogram merged_latencies;
605*cc02d7e2SAndroid Build Coastguard Worker std::unordered_map<int, int64_t> merged_statuses;
606*cc02d7e2SAndroid Build Coastguard Worker
607*cc02d7e2SAndroid Build Coastguard Worker // For the case where clients lead the test such as UNARY and
608*cc02d7e2SAndroid Build Coastguard Worker // STREAMING_FROM_CLIENT, clients need to finish completely while a server
609*cc02d7e2SAndroid Build Coastguard Worker // is running to prevent the clients from being stuck while waiting for
610*cc02d7e2SAndroid Build Coastguard Worker // the result.
611*cc02d7e2SAndroid Build Coastguard Worker bool client_finish_first =
612*cc02d7e2SAndroid Build Coastguard Worker (client_config.rpc_type() != STREAMING_FROM_SERVER);
613*cc02d7e2SAndroid Build Coastguard Worker
614*cc02d7e2SAndroid Build Coastguard Worker auto end_time = time(nullptr);
615*cc02d7e2SAndroid Build Coastguard Worker
616*cc02d7e2SAndroid Build Coastguard Worker FinishClients(clients, client_mark);
617*cc02d7e2SAndroid Build Coastguard Worker
618*cc02d7e2SAndroid Build Coastguard Worker if (!client_finish_first) {
619*cc02d7e2SAndroid Build Coastguard Worker FinishServers(servers, server_mark);
620*cc02d7e2SAndroid Build Coastguard Worker }
621*cc02d7e2SAndroid Build Coastguard Worker
622*cc02d7e2SAndroid Build Coastguard Worker ReceiveFinalStatusFromClients(clients, merged_latencies, merged_statuses,
623*cc02d7e2SAndroid Build Coastguard Worker *result);
624*cc02d7e2SAndroid Build Coastguard Worker ShutdownClients(clients, *result);
625*cc02d7e2SAndroid Build Coastguard Worker
626*cc02d7e2SAndroid Build Coastguard Worker if (client_finish_first) {
627*cc02d7e2SAndroid Build Coastguard Worker FinishServers(servers, server_mark);
628*cc02d7e2SAndroid Build Coastguard Worker }
629*cc02d7e2SAndroid Build Coastguard Worker
630*cc02d7e2SAndroid Build Coastguard Worker ReceiveFinalStatusFromServer(servers, *result);
631*cc02d7e2SAndroid Build Coastguard Worker ShutdownServers(servers, *result);
632*cc02d7e2SAndroid Build Coastguard Worker
633*cc02d7e2SAndroid Build Coastguard Worker delete g_inproc_servers;
634*cc02d7e2SAndroid Build Coastguard Worker
635*cc02d7e2SAndroid Build Coastguard Worker merged_latencies.FillProto(result->mutable_latencies());
636*cc02d7e2SAndroid Build Coastguard Worker for (std::unordered_map<int, int64_t>::iterator it = merged_statuses.begin();
637*cc02d7e2SAndroid Build Coastguard Worker it != merged_statuses.end(); ++it) {
638*cc02d7e2SAndroid Build Coastguard Worker RequestResultCount* rrc = result->add_request_results();
639*cc02d7e2SAndroid Build Coastguard Worker rrc->set_status_code(it->first);
640*cc02d7e2SAndroid Build Coastguard Worker rrc->set_count(it->second);
641*cc02d7e2SAndroid Build Coastguard Worker }
642*cc02d7e2SAndroid Build Coastguard Worker
643*cc02d7e2SAndroid Build Coastguard Worker // Fill in start and end time for the test scenario
644*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->mutable_start_time()->set_seconds(start_time);
645*cc02d7e2SAndroid Build Coastguard Worker result->mutable_summary()->mutable_end_time()->set_seconds(end_time);
646*cc02d7e2SAndroid Build Coastguard Worker
647*cc02d7e2SAndroid Build Coastguard Worker postprocess_scenario_result(result.get());
648*cc02d7e2SAndroid Build Coastguard Worker return result;
649*cc02d7e2SAndroid Build Coastguard Worker }
650*cc02d7e2SAndroid Build Coastguard Worker
RunQuit(const std::string & credential_type,const std::map<std::string,std::string> & per_worker_credential_types)651*cc02d7e2SAndroid Build Coastguard Worker bool RunQuit(
652*cc02d7e2SAndroid Build Coastguard Worker const std::string& credential_type,
653*cc02d7e2SAndroid Build Coastguard Worker const std::map<std::string, std::string>& per_worker_credential_types) {
654*cc02d7e2SAndroid Build Coastguard Worker // Get client, server lists
655*cc02d7e2SAndroid Build Coastguard Worker bool result = true;
656*cc02d7e2SAndroid Build Coastguard Worker auto workers = get_workers("QPS_WORKERS");
657*cc02d7e2SAndroid Build Coastguard Worker if (workers.empty()) {
658*cc02d7e2SAndroid Build Coastguard Worker return false;
659*cc02d7e2SAndroid Build Coastguard Worker }
660*cc02d7e2SAndroid Build Coastguard Worker
661*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < workers.size(); i++) {
662*cc02d7e2SAndroid Build Coastguard Worker auto stub = WorkerService::NewStub(grpc::CreateTestChannel(
663*cc02d7e2SAndroid Build Coastguard Worker workers[i],
664*cc02d7e2SAndroid Build Coastguard Worker GetCredType(workers[i], per_worker_credential_types, credential_type),
665*cc02d7e2SAndroid Build Coastguard Worker nullptr /* call creds */, {} /* interceptor creators */));
666*cc02d7e2SAndroid Build Coastguard Worker Void phony;
667*cc02d7e2SAndroid Build Coastguard Worker grpc::ClientContext ctx;
668*cc02d7e2SAndroid Build Coastguard Worker ctx.set_wait_for_ready(true);
669*cc02d7e2SAndroid Build Coastguard Worker Status s = stub->QuitWorker(&ctx, phony, &phony);
670*cc02d7e2SAndroid Build Coastguard Worker if (!s.ok()) {
671*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", i,
672*cc02d7e2SAndroid Build Coastguard Worker s.error_message().c_str());
673*cc02d7e2SAndroid Build Coastguard Worker result = false;
674*cc02d7e2SAndroid Build Coastguard Worker }
675*cc02d7e2SAndroid Build Coastguard Worker }
676*cc02d7e2SAndroid Build Coastguard Worker return result;
677*cc02d7e2SAndroid Build Coastguard Worker }
678*cc02d7e2SAndroid Build Coastguard Worker
679*cc02d7e2SAndroid Build Coastguard Worker } // namespace testing
680*cc02d7e2SAndroid Build Coastguard Worker } // namespace grpc
681