//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include "test/cpp/qps/driver.h"

#include <cinttypes>
#include <deque>
#include <list>
#include <thread>
#include <unordered_map>
#include <vector>

#include "google/protobuf/timestamp.pb.h"

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>

#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/proto/grpc/testing/worker_service.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qps_worker.h"
#include "test/cpp/qps/stats.h"
#include "test/cpp/util/test_credentials_provider.h"

using std::deque;
using std::list;
using std::unique_ptr;
using std::vector;

namespace grpc {
namespace testing {
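
// Extracts the host portion of a "host:port" worker address.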
static std::string get_host(const std::string& worker) {
  absl::string_view host;
  absl::string_view port;
  grpc_core::SplitHostPort(worker.c_str(), &host, &port);
  return std::string(host.data(), host.size());
}

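// Reads a comma-separated list of worker addresses ("host:port" pairs) from
// the named environment variable, e.g.
// QPS_WORKERS="serverhost1:1234,clienthost1:1234". Returns an empty deque
// (and logs an error) if the variable is unset or empty.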
static deque<string> get_workers(const string& env_name) {
  deque<string> out;
  auto env = grpc_core::GetEnv(env_name.c_str()).value_or("");
  const char* p = env.c_str();
  if (!env.empty()) {
    for (;;) {
      const char* comma = strchr(p, ',');
      if (comma) {
        out.emplace_back(p, comma);
        p = comma + 1;
      } else {
        out.emplace_back(p);
        break;
      }
    }
  }
  if (out.empty()) {
    gpr_log(GPR_ERROR,
            "Environment variable \"%s\" does not contain a list of QPS "
            "workers to use. Set it to a comma-separated list of "
            "hostname:port pairs, starting with hosts that should act as "
            "servers. E.g. export "
            "%s=\"serverhost1:1234,clienthost1:1234,clienthost2:1234\"",
            env_name.c_str(), env_name.c_str());
  }
  return out;
}

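// Looks up the credential type to use for a given worker: a per-worker
// override if one is configured, otherwise the default credential_type.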
std::string GetCredType(
    const std::string& worker_addr,
    const std::map<std::string, std::string>& per_worker_credential_types,
    const std::string& credential_type) {
  auto it = per_worker_credential_types.find(worker_addr);
  if (it != per_worker_credential_types.end()) {
    return it->second;
  }
  return credential_type;
}

// helpers for postprocess_scenario_result
static double WallTime(const ClientStats& s) { return s.time_elapsed(); }
static double SystemTime(const ClientStats& s) { return s.time_system(); }
static double UserTime(const ClientStats& s) { return s.time_user(); }
static double CliPollCount(const ClientStats& s) { return s.cq_poll_count(); }
static double SvrPollCount(const ServerStats& s) { return s.cq_poll_count(); }
static double ServerSystemTime(const ServerStats& s) { return s.time_system(); }
static double ServerUserTime(const ServerStats& s) { return s.time_user(); }
static double ServerTotalCpuTime(const ServerStats& s) {
  return s.total_cpu_time();
}
static double ServerIdleCpuTime(const ServerStats& s) {
  return s.idle_cpu_time();
}
static int Cores(int n) { return n; }

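// Returns true if a stream's final status should be treated as success by
// the driver (OK, plus a few statuses expected during simultaneous shutdown).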
static bool IsSuccess(const Status& s) {
  if (s.ok()) return true;
  // Since we shut down servers and clients at the same time, both can
  // observe cancellation. Thus, we consider CANCELLED a good status.
  if (static_cast<StatusCode>(s.error_code()) == StatusCode::CANCELLED) {
    return true;
  }
  // Since we shut down servers and clients at the same time, the server can
  // close the socket before the client attempts to do so, and vice versa.
  // Thus receiving a "Socket closed" error is fine.
  if (s.error_message() == "Socket closed") return true;
  return false;
}

// Postprocess ScenarioResult and populate the result summary.
static void postprocess_scenario_result(ScenarioResult* result) {
  // Get latencies from the ScenarioResult latencies histogram and populate
  // the result summary.
  Histogram histogram;
  histogram.MergeProto(result->latencies());
  result->mutable_summary()->set_latency_50(histogram.Percentile(50));
  result->mutable_summary()->set_latency_90(histogram.Percentile(90));
  result->mutable_summary()->set_latency_95(histogram.Percentile(95));
  result->mutable_summary()->set_latency_99(histogram.Percentile(99));
  result->mutable_summary()->set_latency_999(histogram.Percentile(99.9));

  // Calculate QPS and CPU load for each client, then aggregate the results
  // across all clients.
  double qps = 0;
  double client_system_cpu_load = 0, client_user_cpu_load = 0;
  for (int i = 0; i < result->client_stats_size(); i++) {
    auto client_stat = result->client_stats(i);
    qps += client_stat.latencies().count() / client_stat.time_elapsed();
    client_system_cpu_load +=
        client_stat.time_system() / client_stat.time_elapsed();
    client_user_cpu_load +=
        client_stat.time_user() / client_stat.time_elapsed();
  }
  // Calculate CPU load for each server, then aggregate the results across
  // all servers.
  double server_system_cpu_load = 0, server_user_cpu_load = 0;
  for (int i = 0; i < result->server_stats_size(); i++) {
    auto server_stat = result->server_stats(i);
    server_system_cpu_load +=
        server_stat.time_system() / server_stat.time_elapsed();
    server_user_cpu_load +=
        server_stat.time_user() / server_stat.time_elapsed();
  }
  result->mutable_summary()->set_qps(qps);
  // Populate the CPU load percentages in the result summary.
  result->mutable_summary()->set_server_system_time(100 *
                                                    server_system_cpu_load);
  result->mutable_summary()->set_server_user_time(100 * server_user_cpu_load);
  result->mutable_summary()->set_client_system_time(100 *
                                                    client_system_cpu_load);
  result->mutable_summary()->set_client_user_time(100 * client_user_cpu_load);

  // On non-Linux platforms, get_cpu_usage() is not implemented, so
  // ServerTotalCpuTime and ServerIdleCpuTime are both 0.
  if (average(result->server_stats(), ServerTotalCpuTime) == 0) {
    result->mutable_summary()->set_server_cpu_usage(0);
  } else {
    auto server_cpu_usage =
        100 - 100 * average(result->server_stats(), ServerIdleCpuTime) /
                        average(result->server_stats(), ServerTotalCpuTime);
    result->mutable_summary()->set_server_cpu_usage(server_cpu_usage);
  }

  // Calculate successful and failed requests per second and populate them in
  // the result summary.
  auto time_estimate = average(result->client_stats(), WallTime);
  if (result->request_results_size() > 0) {
    int64_t successes = 0;
    int64_t failures = 0;
    for (int i = 0; i < result->request_results_size(); i++) {
      const RequestResultCount& rrc = result->request_results(i);
      if (rrc.status_code() == 0) {
        successes += rrc.count();
      } else {
        failures += rrc.count();
      }
    }
    result->mutable_summary()->set_successful_requests_per_second(
        successes / time_estimate);
    result->mutable_summary()->set_failed_requests_per_second(failures /
                                                              time_estimate);
  }

  // Fill in the remaining metrics required in the result summary.
  auto qps_per_server_core = qps / sum(result->server_cores(), Cores);
  result->mutable_summary()->set_qps_per_server_core(qps_per_server_core);
  result->mutable_summary()->set_client_polls_per_request(
      sum(result->client_stats(), CliPollCount) / histogram.Count());
  result->mutable_summary()->set_server_polls_per_request(
      sum(result->server_stats(), SvrPollCount) / histogram.Count());

  auto server_queries_per_cpu_sec =
      histogram.Count() / (sum(result->server_stats(), ServerSystemTime) +
                           sum(result->server_stats(), ServerUserTime));
  auto client_queries_per_cpu_sec =
      histogram.Count() / (sum(result->client_stats(), SystemTime) +
                           sum(result->client_stats(), UserTime));

  result->mutable_summary()->set_server_queries_per_cpu_sec(
      server_queries_per_cpu_sec);
  result->mutable_summary()->set_client_queries_per_cpu_sec(
      client_queries_per_cpu_sec);
}

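// Per-worker connection state: a stub to the worker process and the
// long-lived bidi stream used to configure the benchmark and collect stats.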
struct ClientData {
  unique_ptr<WorkerService::Stub> stub;
  unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
};

struct ServerData {
  unique_ptr<WorkerService::Stub> stub;
  unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
};

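// Sends a final mark to every client and half-closes each client stream.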
static void FinishClients(const std::vector<ClientData>& clients,
                          const ClientArgs& client_mark) {
  gpr_log(GPR_INFO, "Finishing clients");
  for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
    auto client = &clients[i];
    if (!client->stream->Write(client_mark)) {
      grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
    }
    if (!client->stream->WritesDone()) {
      grpc_core::Crash(absl::StrFormat("Failed WritesDone for client %zu", i));
    }
  }
}

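// Reads the final status message from every client, merging the latency
// histograms and per-status request counts into the scenario result.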
static void ReceiveFinalStatusFromClients(
    const std::vector<ClientData>& clients, Histogram& merged_latencies,
    std::unordered_map<int, int64_t>& merged_statuses, ScenarioResult& result) {
  gpr_log(GPR_INFO, "Receiving final status from clients");
  ClientStatus client_status;
  for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
    auto client = &clients[i];
    // Read the client's final status.
    if (client->stream->Read(&client_status)) {
      gpr_log(GPR_INFO, "Received final status from client %zu", i);
      const auto& stats = client_status.stats();
      merged_latencies.MergeProto(stats.latencies());
      for (int i = 0; i < stats.request_results_size(); i++) {
        merged_statuses[stats.request_results(i).status_code()] +=
            stats.request_results(i).count();
      }
      result.add_client_stats()->CopyFrom(stats);
      // Check that the final status was the last message on the client
      // stream.
      // TODO(jtattermusch): note that waiting for Read to return can take
      // long in some scenarios (e.g. unconstrained streaming_from_server). See
      // https://github.com/grpc/grpc/blob/3bd0cd208ea549760a2daf595f79b91b247fe240/test/cpp/qps/server_async.cc#L176
      // where the shutdown delay pretty much determines the wait here.
      GPR_ASSERT(!client->stream->Read(&client_status));
    } else {
      grpc_core::Crash(
          absl::StrFormat("Couldn't get final status from client %zu", i));
    }
  }
}

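// Finishes every client stream and records whether each client shut down
// cleanly; crashes on any status that IsSuccess() rejects.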
static void ShutdownClients(const std::vector<ClientData>& clients,
                            ScenarioResult& result) {
  gpr_log(GPR_INFO, "Shutdown clients");
  for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
    auto client = &clients[i];
    Status s = client->stream->Finish();
    // Since we shut down servers and clients at the same time, clients can
    // observe cancellation. Thus, we consider both OK and CANCELLED good
    // statuses.
    const bool success = IsSuccess(s);
    result.add_client_success(success);
    if (!success) {
      grpc_core::Crash(absl::StrFormat("Client %zu had an error %s", i,
                                       s.error_message().c_str()));
    }
  }
}

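// Sends a final mark to every server and half-closes each server stream.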
static void FinishServers(const std::vector<ServerData>& servers,
                          const ServerArgs& server_mark) {
  gpr_log(GPR_INFO, "Finishing servers");
  for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
    auto server = &servers[i];
    if (!server->stream->Write(server_mark)) {
      grpc_core::Crash(absl::StrFormat("Couldn't write mark to server %zu", i));
    }
    if (!server->stream->WritesDone()) {
      grpc_core::Crash(absl::StrFormat("Failed WritesDone for server %zu", i));
    }
  }
}

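// Reads the final status message from every server, recording its stats and
// core count in the scenario result.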
static void ReceiveFinalStatusFromServer(const std::vector<ServerData>& servers,
                                         ScenarioResult& result) {
  gpr_log(GPR_INFO, "Receiving final status from servers");
  ServerStatus server_status;
  for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
    auto server = &servers[i];
    // Read the server's final status.
    if (server->stream->Read(&server_status)) {
      gpr_log(GPR_INFO, "Received final status from server %zu", i);
      result.add_server_stats()->CopyFrom(server_status.stats());
      result.add_server_cores(server_status.cores());
      // The final status should be the last message on the server stream.
      GPR_ASSERT(!server->stream->Read(&server_status));
    } else {
      grpc_core::Crash(
          absl::StrFormat("Couldn't get final status from server %zu", i));
    }
  }
}

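// Finishes every server stream and records whether each server shut down
// cleanly; crashes on any status that IsSuccess() rejects.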
static void ShutdownServers(const std::vector<ServerData>& servers,
                            ScenarioResult& result) {
  gpr_log(GPR_INFO, "Shutdown servers");
  for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
    auto server = &servers[i];
    Status s = server->stream->Finish();
    // Since we shut down servers and clients at the same time, servers can
    // observe cancellation. Thus, we consider both OK and CANCELLED good
    // statuses.
    const bool success = IsSuccess(s);
    result.add_server_success(success);
    if (!success) {
      grpc_core::Crash(absl::StrFormat("Server %zu had an error %s", i,
                                       s.error_message().c_str()));
    }
  }
}

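// Registry of in-process servers, allocated when run_inproc is set; the
// benchmark servers spawned in this process register themselves here so that
// in-process clients can reach them directly.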
std::vector<grpc::testing::Server*>* g_inproc_servers = nullptr;

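// Runs a single benchmark scenario end to end: connects to (or locally
// spawns) the workers, starts the servers, then the clients, waits through
// the warmup and benchmark periods, and finally collects the stats from all
// workers and merges them into a ScenarioResult.
//
// A sketch of a typical invocation (illustrative values only):
//
//   auto result = RunScenario(
//       client_config, /*num_clients=*/2, server_config, /*num_servers=*/1,
//       /*warmup_seconds=*/5, /*benchmark_seconds=*/30,
//       /*spawn_local_worker_count=*/0, /*qps_server_target_override=*/"",
//       kInsecureCredentialsType, /*per_worker_credential_types=*/{},
//       /*run_inproc=*/false,
//       /*median_latency_collection_interval_millis=*/0);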
std::unique_ptr<ScenarioResult> RunScenario(
    const ClientConfig& initial_client_config, size_t num_clients,
    const ServerConfig& initial_server_config, size_t num_servers,
    int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
    const std::string& qps_server_target_override,
    const std::string& credential_type,
    const std::map<std::string, std::string>& per_worker_credential_types,
    bool run_inproc, int32_t median_latency_collection_interval_millis) {
  if (run_inproc) {
    g_inproc_servers = new std::vector<grpc::testing::Server*>;
  }
  // Log everything from the driver.
  gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);

  // ClientContext allocations (all are destroyed at scope exit).
  list<ClientContext> contexts;
  auto alloc_context = [](list<ClientContext>* contexts) {
    contexts->emplace_back();
    auto context = &contexts->back();
    context->set_wait_for_ready(true);
    return context;
  };

  // To be added to the result, containing the final configuration used for
  // the clients (including the resolved server targets).
  ClientConfig result_client_config;

  // Get the client and server lists; not used for in-process tests.
  auto workers = (!run_inproc) ? get_workers("QPS_WORKERS") : deque<string>();
  ClientConfig client_config = initial_client_config;

  // Spawn some local workers if desired.
  vector<unique_ptr<QpsWorker>> local_workers;
  for (int i = 0; i < abs(spawn_local_worker_count); i++) {
    // Act as if we're a new test -- this gets a good rng seed.
    static bool called_init = false;
    if (!called_init) {
      char args_buf[100];
      strcpy(args_buf, "some-benchmark");
      char* args[] = {args_buf};
      int argc = 1;
      grpc_test_init(&argc, args);
      called_init = true;
    }

    char addr[256];
    // A port number of -1 indicates an in-process worker.
    int driver_port = (!run_inproc) ? grpc_pick_unused_port_or_die() : -1;
    local_workers.emplace_back(new QpsWorker(driver_port, 0, credential_type));
    sprintf(addr, "localhost:%d", driver_port);
    if (spawn_local_worker_count < 0) {
      workers.push_front(addr);
    } else {
      workers.push_back(addr);
    }
  }
  GPR_ASSERT(!workers.empty());

  // If num_clients is <= 0, do dynamic sizing: all workers except the
  // servers act as clients.
  if (num_clients <= 0) {
    num_clients = workers.size() - num_servers;
  }

  // TODO(ctiller): support running multiple configurations, and binpack
  // client/server pairs to the available workers.
  GPR_ASSERT(workers.size() >= num_clients + num_servers);

  // Trim to just what we need.
  workers.resize(num_clients + num_servers);

  // Start servers.
  std::vector<ServerData> servers(num_servers);
  std::unordered_map<string, std::deque<int>> hosts_cores;
  ChannelArguments channel_args;

  for (size_t i = 0; i < num_servers; i++) {
    gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")",
            workers[i].c_str(), i);
    if (!run_inproc) {
      servers[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
          workers[i],
          GetCredType(workers[i], per_worker_credential_types, credential_type),
          nullptr /* call creds */, {} /* interceptor creators */));
    } else {
      servers[i].stub = WorkerService::NewStub(
          local_workers[i]->InProcessChannel(channel_args));
    }

    const ServerConfig& server_config = initial_server_config;
    if (server_config.core_limit() != 0) {
      grpc_core::Crash("server config core limit is set but ignored by driver");
    }

    ServerArgs args;
    *args.mutable_setup() = server_config;
    servers[i].stream = servers[i].stub->RunServer(alloc_context(&contexts));
    if (!servers[i].stream->Write(args)) {
      grpc_core::Crash(
          absl::StrFormat("Could not write args to server %zu", i));
    }
    ServerStatus init_status;
    if (!servers[i].stream->Read(&init_status)) {
      grpc_core::Crash(
          absl::StrFormat("Server %zu did not yield initial status", i));
    }
    if (run_inproc) {
      std::string cli_target(INPROC_NAME_PREFIX);
      cli_target += std::to_string(i);
      client_config.add_server_targets(cli_target);
    } else {
      std::string host = get_host(workers[i]);
      std::string cli_target =
          grpc_core::JoinHostPort(host.c_str(), init_status.port());
      client_config.add_server_targets(cli_target.c_str());
    }
  }
  if (qps_server_target_override.length() > 0) {
    // Overriding the QPS server target only makes sense if there is at most
    // one server.
    GPR_ASSERT(num_servers <= 1);
    client_config.clear_server_targets();
    client_config.add_server_targets(qps_server_target_override);
  }
  client_config.set_median_latency_collection_interval_millis(
      median_latency_collection_interval_millis);

  // Targets are all set by now.
  result_client_config = client_config;
  // Start clients.
  std::vector<ClientData> clients(num_clients);
  size_t channels_allocated = 0;
  for (size_t i = 0; i < num_clients; i++) {
    const auto& worker = workers[i + num_servers];
    gpr_log(GPR_INFO, "Starting client on %s (worker #%" PRIuPTR ")",
            worker.c_str(), i + num_servers);
    if (!run_inproc) {
      clients[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
          worker,
          GetCredType(worker, per_worker_credential_types, credential_type),
          nullptr /* call creds */, {} /* interceptor creators */));
    } else {
      clients[i].stub = WorkerService::NewStub(
          local_workers[i + num_servers]->InProcessChannel(channel_args));
    }
    ClientConfig per_client_config = client_config;

    if (initial_client_config.core_limit() != 0) {
      grpc_core::Crash("client config core limit set but ignored");
    }

    // Divide the remaining channels evenly among the remaining clients so
    // that the total number of channels specified is preserved regardless of
    // how many clients are available.
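    // E.g. 10 total channels across 4 clients yields 2, 2, 3, and 3 channels
    // per client: integer division rounds down early and the remainder is
    // absorbed by the later clients.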
    size_t num_channels =
        (client_config.client_channels() - channels_allocated) /
        (num_clients - i);
    channels_allocated += num_channels;
    gpr_log(GPR_DEBUG, "Client %" PRIdPTR " gets %" PRIdPTR " channels", i,
            num_channels);
    per_client_config.set_client_channels(num_channels);

    ClientArgs args;
    *args.mutable_setup() = per_client_config;
    clients[i].stream = clients[i].stub->RunClient(alloc_context(&contexts));
    if (!clients[i].stream->Write(args)) {
      grpc_core::Crash(
          absl::StrFormat("Could not write args to client %zu", i));
    }
  }

  for (size_t i = 0; i < num_clients; i++) {
    ClientStatus init_status;
    if (!clients[i].stream->Read(&init_status)) {
      grpc_core::Crash(
          absl::StrFormat("Client %zu did not yield initial status", i));
    }
  }

  // Send an initial mark: clients can use this to know that everything is
  // ready to start.
  gpr_log(GPR_INFO, "Initiating");
  ServerArgs server_mark;
  server_mark.mutable_mark()->set_reset(true);
  ClientArgs client_mark;
  client_mark.mutable_mark()->set_reset(true);
  ServerStatus server_status;
  ClientStatus client_status;
  for (size_t i = 0; i < num_clients; i++) {
    auto client = &clients[i];
    if (!client->stream->Write(client_mark)) {
      grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
    }
  }
  for (size_t i = 0; i < num_clients; i++) {
    auto client = &clients[i];
    if (!client->stream->Read(&client_status)) {
      grpc_core::Crash(
          absl::StrFormat("Couldn't get status from client %zu", i));
    }
  }

  // Let everything warm up.
  gpr_log(GPR_INFO, "Warming up");
  gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
  gpr_sleep_until(
      gpr_time_add(start, gpr_time_from_seconds(warmup_seconds, GPR_TIMESPAN)));

  // Start a run.
  gpr_log(GPR_INFO, "Starting");

  auto start_time = time(nullptr);

  for (size_t i = 0; i < num_servers; i++) {
    auto server = &servers[i];
    if (!server->stream->Write(server_mark)) {
      grpc_core::Crash(absl::StrFormat("Couldn't write mark to server %zu", i));
    }
  }
  for (size_t i = 0; i < num_clients; i++) {
    auto client = &clients[i];
    if (!client->stream->Write(client_mark)) {
      grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
    }
  }
  for (size_t i = 0; i < num_servers; i++) {
    auto server = &servers[i];
    if (!server->stream->Read(&server_status)) {
      grpc_core::Crash(
          absl::StrFormat("Couldn't get status from server %zu", i));
    }
  }
  for (size_t i = 0; i < num_clients; i++) {
    auto client = &clients[i];
    if (!client->stream->Read(&client_status)) {
      grpc_core::Crash(
          absl::StrFormat("Couldn't get status from client %zu", i));
    }
  }

  // Wait for the benchmark duration to elapse.
  gpr_log(GPR_INFO, "Running");
  // Use gpr_sleep_until rather than this_thread::sleep_until to support
  // compilers that don't work with this_thread.
  gpr_sleep_until(gpr_time_add(
      start,
      gpr_time_from_seconds(warmup_seconds + benchmark_seconds, GPR_TIMESPAN)));

  // Finish the run.
  std::unique_ptr<ScenarioResult> result(new ScenarioResult);
  Histogram merged_latencies;
  std::unordered_map<int, int64_t> merged_statuses;

  // For scenarios driven by the clients, such as UNARY and
  // STREAMING_FROM_CLIENT, the clients need to finish completely while a
  // server is still running, to prevent the clients from getting stuck
  // waiting for the result.
  bool client_finish_first =
      (client_config.rpc_type() != STREAMING_FROM_SERVER);

  auto end_time = time(nullptr);

  FinishClients(clients, client_mark);

  if (!client_finish_first) {
    FinishServers(servers, server_mark);
  }

  ReceiveFinalStatusFromClients(clients, merged_latencies, merged_statuses,
                                *result);
  ShutdownClients(clients, *result);

  if (client_finish_first) {
    FinishServers(servers, server_mark);
  }

  ReceiveFinalStatusFromServer(servers, *result);
  ShutdownServers(servers, *result);

  delete g_inproc_servers;

  merged_latencies.FillProto(result->mutable_latencies());
  for (const auto& status_and_count : merged_statuses) {
    RequestResultCount* rrc = result->add_request_results();
    rrc->set_status_code(status_and_count.first);
    rrc->set_count(status_and_count.second);
  }

  // Fill in the start and end time for the test scenario.
  result->mutable_summary()->mutable_start_time()->set_seconds(start_time);
  result->mutable_summary()->mutable_end_time()->set_seconds(end_time);

  postprocess_scenario_result(result.get());
  return result;
}

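// Asks every worker listed in QPS_WORKERS to quit; returns true only if all
// workers acknowledged the QuitWorker RPC successfully.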
bool RunQuit(
    const std::string& credential_type,
    const std::map<std::string, std::string>& per_worker_credential_types) {
  // Get the worker list.
  bool result = true;
  auto workers = get_workers("QPS_WORKERS");
  if (workers.empty()) {
    return false;
  }

  for (size_t i = 0; i < workers.size(); i++) {
    auto stub = WorkerService::NewStub(grpc::CreateTestChannel(
        workers[i],
        GetCredType(workers[i], per_worker_credential_types, credential_type),
        nullptr /* call creds */, {} /* interceptor creators */));
    Void phony;
    grpc::ClientContext ctx;
    ctx.set_wait_for_ready(true);
    Status s = stub->QuitWorker(&ctx, phony, &phony);
    if (!s.ok()) {
      gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", i,
              s.error_message().c_str());
      result = false;
    }
  }
  return result;
}

}  // namespace testing
}  // namespace grpc