1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <algorithm>
20 #include <forward_list>
21 #include <functional>
22 #include <memory>
23 #include <mutex>
24 #include <thread>
25
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpcpp/generic/async_generic_service.h>
30 #include <grpcpp/resource_quota.h>
31 #include <grpcpp/security/server_credentials.h>
32 #include <grpcpp/server.h>
33 #include <grpcpp/server_builder.h>
34 #include <grpcpp/server_context.h>
35 #include <grpcpp/support/config.h>
36
37 #include "src/core/lib/gprpp/crash.h"
38 #include "src/core/lib/gprpp/host_port.h"
39 #include "src/core/lib/surface/completion_queue.h"
40 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
41 #include "test/core/util/test_config.h"
42 #include "test/cpp/qps/qps_server_builder.h"
43 #include "test/cpp/qps/server.h"
44
45 namespace grpc {
46 namespace testing {
47
48 template <class RequestType, class ResponseType, class ServiceType,
49 class ServerContextType>
50 class AsyncQpsServerTest final : public grpc::testing::Server {
51 public:
AsyncQpsServerTest(const ServerConfig & config,std::function<void (ServerBuilder *,ServiceType *)> register_service,std::function<void (ServiceType *,ServerContextType *,RequestType *,ServerAsyncResponseWriter<ResponseType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_unary_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReaderWriter<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReader<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_from_client_function,std::function<void (ServiceType *,ServerContextType *,RequestType *,ServerAsyncWriter<ResponseType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_from_server_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReaderWriter<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_both_ways_function,std::function<grpc::Status (const PayloadConfig &,RequestType *,ResponseType *)> process_rpc)52 AsyncQpsServerTest(
53 const ServerConfig& config,
54 std::function<void(ServerBuilder*, ServiceType*)> register_service,
55 std::function<void(ServiceType*, ServerContextType*, RequestType*,
56 ServerAsyncResponseWriter<ResponseType>*,
57 CompletionQueue*, ServerCompletionQueue*, void*)>
58 request_unary_function,
59 std::function<void(ServiceType*, ServerContextType*,
60 ServerAsyncReaderWriter<ResponseType, RequestType>*,
61 CompletionQueue*, ServerCompletionQueue*, void*)>
62 request_streaming_function,
63 std::function<void(ServiceType*, ServerContextType*,
64 ServerAsyncReader<ResponseType, RequestType>*,
65 CompletionQueue*, ServerCompletionQueue*, void*)>
66 request_streaming_from_client_function,
67 std::function<void(ServiceType*, ServerContextType*, RequestType*,
68 ServerAsyncWriter<ResponseType>*, CompletionQueue*,
69 ServerCompletionQueue*, void*)>
70 request_streaming_from_server_function,
71 std::function<void(ServiceType*, ServerContextType*,
72 ServerAsyncReaderWriter<ResponseType, RequestType>*,
73 CompletionQueue*, ServerCompletionQueue*, void*)>
74 request_streaming_both_ways_function,
75 std::function<grpc::Status(const PayloadConfig&, RequestType*,
76 ResponseType*)>
77 process_rpc)
78 : Server(config) {
79 std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
80
81 auto port_num = port();
82 // Negative port number means inproc server, so no listen port needed
83 if (port_num >= 0) {
84 std::string server_address = grpc_core::JoinHostPort("::", port_num);
85 builder->AddListeningPort(
86 server_address, Server::CreateServerCredentials(config), &port_num);
87 }
88
89 register_service(builder.get(), &async_service_);
90
91 int num_threads = config.async_server_threads();
92 if (num_threads <= 0) { // dynamic sizing
93 num_threads = std::min(64, cores());
94 gpr_log(GPR_INFO,
95 "Sizing async server to %d threads. Defaults to number of cores "
96 "in machine or 64 threads if machine has more than 64 cores to "
97 "avoid OOMs.",
98 num_threads);
99 }
100
101 int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
102 int num_cqs = (num_threads + tpc - 1) / tpc; // ceiling operator
103 for (int i = 0; i < num_cqs; i++) {
104 srv_cqs_.emplace_back(builder->AddCompletionQueue());
105 }
106 for (int i = 0; i < num_threads; i++) {
107 cq_.emplace_back(i % srv_cqs_.size());
108 }
109
110 ApplyConfigToBuilder(config, builder.get());
111
112 server_ = builder->BuildAndStart();
113 if (server_ == nullptr) {
114 gpr_log(GPR_ERROR, "Server: Fail to BuildAndStart(port=%d)", port_num);
115 } else {
116 gpr_log(GPR_INFO, "Server: BuildAndStart(port=%d)", port_num);
117 }
118
119 auto process_rpc_bound =
120 std::bind(process_rpc, config.payload_config(), std::placeholders::_1,
121 std::placeholders::_2);
122
123 for (int i = 0; i < 5000; i++) {
124 for (int j = 0; j < num_cqs; j++) {
125 if (request_unary_function) {
126 auto request_unary = std::bind(
127 request_unary_function, &async_service_, std::placeholders::_1,
128 std::placeholders::_2, std::placeholders::_3, srv_cqs_[j].get(),
129 srv_cqs_[j].get(), std::placeholders::_4);
130 contexts_.emplace_back(
131 new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound));
132 }
133 if (request_streaming_function) {
134 auto request_streaming = std::bind(
135 request_streaming_function, &async_service_,
136 std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
137 srv_cqs_[j].get(), std::placeholders::_3);
138 contexts_.emplace_back(new ServerRpcContextStreamingImpl(
139 request_streaming, process_rpc_bound));
140 }
141 if (request_streaming_from_client_function) {
142 auto request_streaming_from_client = std::bind(
143 request_streaming_from_client_function, &async_service_,
144 std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
145 srv_cqs_[j].get(), std::placeholders::_3);
146 contexts_.emplace_back(new ServerRpcContextStreamingFromClientImpl(
147 request_streaming_from_client, process_rpc_bound));
148 }
149 if (request_streaming_from_server_function) {
150 auto request_streaming_from_server =
151 std::bind(request_streaming_from_server_function, &async_service_,
152 std::placeholders::_1, std::placeholders::_2,
153 std::placeholders::_3, srv_cqs_[j].get(),
154 srv_cqs_[j].get(), std::placeholders::_4);
155 contexts_.emplace_back(new ServerRpcContextStreamingFromServerImpl(
156 request_streaming_from_server, process_rpc_bound));
157 }
158 if (request_streaming_both_ways_function) {
159 // TODO(vjpai): Add this code
160 }
161 }
162 }
163
164 for (int i = 0; i < num_threads; i++) {
165 shutdown_state_.emplace_back(new PerThreadShutdownState());
166 threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
167 }
168 }
// Stops the worker threads and tears the server down: flags every worker for
// shutdown, shuts down the server and its completion queues, joins the
// workers and finally drains whatever events are left on the queues.
~AsyncQpsServerTest() override {
  // Setting the flag under each worker's mutex means a worker that currently
  // holds the mutex finishes the RPC step it is running first.
  for (const auto& state : shutdown_state_) {
    std::lock_guard<std::mutex> lock(state->mutex);
    state->shutdown = true;
  }
  // The constructor only logs when BuildAndStart() fails, so server_ may be
  // null here; skip the server shutdown in that case instead of
  // dereferencing a null pointer.
  if (server_ != nullptr) {
    // TODO(vjpai): Remove the following deadline and allow full proper
    // shutdown.
    server_->Shutdown(std::chrono::system_clock::now() +
                      std::chrono::seconds(3));
  }
  for (const auto& cq : srv_cqs_) {
    cq->Shutdown();
  }
  for (auto& worker : threads_) {
    worker.join();
  }
  // Drain the queues: keep pulling until Next() reports nothing is left.
  for (const auto& cq : srv_cqs_) {
    bool ok;
    void* got_tag;
    while (cq->Next(&got_tag, &ok)) {
    }
  }
}
191
GetPollCount()192 int GetPollCount() override {
193 int count = 0;
194 for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); cq++) {
195 count += grpc_get_cq_poll_num((*cq)->cq());
196 }
197 return count;
198 }
199
InProcessChannel(const ChannelArguments & args)200 std::shared_ptr<Channel> InProcessChannel(
201 const ChannelArguments& args) override {
202 return server_->InProcessChannel(args);
203 }
204
205 private:
ThreadFunc(int thread_idx)206 void ThreadFunc(int thread_idx) {
207 // Wait until work is available or we are shutting down
208 bool ok;
209 void* got_tag;
210 if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
211 return;
212 }
213 ServerRpcContext* ctx;
214 std::mutex* mu_ptr = &shutdown_state_[thread_idx]->mutex;
215 do {
216 ctx = detag(got_tag);
217 // The tag is a pointer to an RPC context to invoke
218 // Proceed while holding a lock to make sure that
219 // this thread isn't supposed to shut down
220 mu_ptr->lock();
221 if (shutdown_state_[thread_idx]->shutdown) {
222 mu_ptr->unlock();
223 return;
224 }
225 } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
226 [&, ctx, ok, mu_ptr]() {
227 ctx->lock();
228 if (!ctx->RunNextState(ok)) {
229 ctx->Reset();
230 }
231 ctx->unlock();
232 mu_ptr->unlock();
233 },
234 &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
235 }
236
// Abstract base for a per-RPC state machine. Each context owns a mutex that
// callers take via lock()/unlock() around state transitions.
class ServerRpcContext {
 public:
  ServerRpcContext() = default;
  virtual ~ServerRpcContext() = default;

  // Runs the next state; returns false if the RPC is done.
  virtual bool RunNextState(bool) = 0;
  // Starts this context back at a clean state.
  virtual void Reset() = 0;

  void lock() { mu_.lock(); }
  void unlock() { mu_.unlock(); }

 private:
  std::mutex mu_;
};
tag(ServerRpcContext * func)248 static void* tag(ServerRpcContext* func) { return static_cast<void*>(func); }
detag(void * tag)249 static ServerRpcContext* detag(void* tag) {
250 return static_cast<ServerRpcContext*>(tag);
251 }
252
253 class ServerRpcContextUnaryImpl final : public ServerRpcContext {
254 public:
ServerRpcContextUnaryImpl(std::function<void (ServerContextType *,RequestType *,grpc::ServerAsyncResponseWriter<ResponseType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)255 ServerRpcContextUnaryImpl(
256 std::function<void(ServerContextType*, RequestType*,
257 grpc::ServerAsyncResponseWriter<ResponseType>*,
258 void*)>
259 request_method,
260 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
261 : srv_ctx_(new ServerContextType),
262 next_state_(&ServerRpcContextUnaryImpl::invoker),
263 request_method_(request_method),
264 invoke_method_(invoke_method),
265 response_writer_(srv_ctx_.get()) {
266 request_method_(srv_ctx_.get(), &req_, &response_writer_,
267 AsyncQpsServerTest::tag(this));
268 }
~ServerRpcContextUnaryImpl()269 ~ServerRpcContextUnaryImpl() override {}
RunNextState(bool ok)270 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()271 void Reset() override {
272 srv_ctx_.reset(new ServerContextType);
273 req_ = RequestType();
274 response_writer_ =
275 grpc::ServerAsyncResponseWriter<ResponseType>(srv_ctx_.get());
276
277 // Then request the method
278 next_state_ = &ServerRpcContextUnaryImpl::invoker;
279 request_method_(srv_ctx_.get(), &req_, &response_writer_,
280 AsyncQpsServerTest::tag(this));
281 }
282
283 private:
finisher(bool)284 bool finisher(bool) { return false; }
invoker(bool ok)285 bool invoker(bool ok) {
286 if (!ok) {
287 return false;
288 }
289
290 // Call the RPC processing function
291 grpc::Status status = invoke_method_(&req_, &response_);
292
293 // Have the response writer work and invoke on_finish when done
294 next_state_ = &ServerRpcContextUnaryImpl::finisher;
295 response_writer_.Finish(response_, status, AsyncQpsServerTest::tag(this));
296 return true;
297 }
298 std::unique_ptr<ServerContextType> srv_ctx_;
299 RequestType req_;
300 ResponseType response_;
301 bool (ServerRpcContextUnaryImpl::*next_state_)(bool);
302 std::function<void(ServerContextType*, RequestType*,
303 grpc::ServerAsyncResponseWriter<ResponseType>*, void*)>
304 request_method_;
305 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
306 grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
307 };
308
309 class ServerRpcContextStreamingImpl final : public ServerRpcContext {
310 public:
ServerRpcContextStreamingImpl(std::function<void (ServerContextType *,grpc::ServerAsyncReaderWriter<ResponseType,RequestType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)311 ServerRpcContextStreamingImpl(
312 std::function<void(
313 ServerContextType*,
314 grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
315 request_method,
316 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
317 : srv_ctx_(new ServerContextType),
318 next_state_(&ServerRpcContextStreamingImpl::request_done),
319 request_method_(request_method),
320 invoke_method_(invoke_method),
321 stream_(srv_ctx_.get()) {
322 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
323 }
~ServerRpcContextStreamingImpl()324 ~ServerRpcContextStreamingImpl() override {}
RunNextState(bool ok)325 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()326 void Reset() override {
327 srv_ctx_.reset(new ServerContextType);
328 req_ = RequestType();
329 stream_ = grpc::ServerAsyncReaderWriter<ResponseType, RequestType>(
330 srv_ctx_.get());
331
332 // Then request the method
333 next_state_ = &ServerRpcContextStreamingImpl::request_done;
334 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
335 }
336
337 private:
request_done(bool ok)338 bool request_done(bool ok) {
339 if (!ok) {
340 return false;
341 }
342 next_state_ = &ServerRpcContextStreamingImpl::read_done;
343 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
344 return true;
345 }
346
read_done(bool ok)347 bool read_done(bool ok) {
348 if (ok) {
349 // invoke the method
350 // Call the RPC processing function
351 grpc::Status status = invoke_method_(&req_, &response_);
352 // initiate the write
353 next_state_ = &ServerRpcContextStreamingImpl::write_done;
354 stream_.Write(response_, AsyncQpsServerTest::tag(this));
355 } else { // client has sent writes done
356 // finish the stream
357 next_state_ = &ServerRpcContextStreamingImpl::finish_done;
358 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
359 }
360 return true;
361 }
write_done(bool ok)362 bool write_done(bool ok) {
363 // now go back and get another streaming read!
364 if (ok) {
365 next_state_ = &ServerRpcContextStreamingImpl::read_done;
366 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
367 } else {
368 next_state_ = &ServerRpcContextStreamingImpl::finish_done;
369 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
370 }
371 return true;
372 }
finish_done(bool)373 bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
374
375 std::unique_ptr<ServerContextType> srv_ctx_;
376 RequestType req_;
377 ResponseType response_;
378 bool (ServerRpcContextStreamingImpl::*next_state_)(bool);
379 std::function<void(
380 ServerContextType*,
381 grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
382 request_method_;
383 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
384 grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
385 };
386
387 class ServerRpcContextStreamingFromClientImpl final
388 : public ServerRpcContext {
389 public:
ServerRpcContextStreamingFromClientImpl(std::function<void (ServerContextType *,grpc::ServerAsyncReader<ResponseType,RequestType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)390 ServerRpcContextStreamingFromClientImpl(
391 std::function<void(ServerContextType*,
392 grpc::ServerAsyncReader<ResponseType, RequestType>*,
393 void*)>
394 request_method,
395 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
396 : srv_ctx_(new ServerContextType),
397 next_state_(&ServerRpcContextStreamingFromClientImpl::request_done),
398 request_method_(request_method),
399 invoke_method_(invoke_method),
400 stream_(srv_ctx_.get()) {
401 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
402 }
~ServerRpcContextStreamingFromClientImpl()403 ~ServerRpcContextStreamingFromClientImpl() override {}
RunNextState(bool ok)404 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()405 void Reset() override {
406 srv_ctx_.reset(new ServerContextType);
407 req_ = RequestType();
408 stream_ =
409 grpc::ServerAsyncReader<ResponseType, RequestType>(srv_ctx_.get());
410
411 // Then request the method
412 next_state_ = &ServerRpcContextStreamingFromClientImpl::request_done;
413 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
414 }
415
416 private:
request_done(bool ok)417 bool request_done(bool ok) {
418 if (!ok) {
419 return false;
420 }
421 next_state_ = &ServerRpcContextStreamingFromClientImpl::read_done;
422 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
423 return true;
424 }
425
read_done(bool ok)426 bool read_done(bool ok) {
427 if (ok) {
428 // In this case, just do another read
429 // next_state_ is unchanged
430 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
431 return true;
432 } else { // client has sent writes done
433 // invoke the method
434 // Call the RPC processing function
435 grpc::Status status = invoke_method_(&req_, &response_);
436 // finish the stream
437 next_state_ = &ServerRpcContextStreamingFromClientImpl::finish_done;
438 stream_.Finish(response_, Status::OK, AsyncQpsServerTest::tag(this));
439 }
440 return true;
441 }
finish_done(bool)442 bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
443
444 std::unique_ptr<ServerContextType> srv_ctx_;
445 RequestType req_;
446 ResponseType response_;
447 bool (ServerRpcContextStreamingFromClientImpl::*next_state_)(bool);
448 std::function<void(ServerContextType*,
449 grpc::ServerAsyncReader<ResponseType, RequestType>*,
450 void*)>
451 request_method_;
452 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
453 grpc::ServerAsyncReader<ResponseType, RequestType> stream_;
454 };
455
456 class ServerRpcContextStreamingFromServerImpl final
457 : public ServerRpcContext {
458 public:
ServerRpcContextStreamingFromServerImpl(std::function<void (ServerContextType *,RequestType *,grpc::ServerAsyncWriter<ResponseType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)459 ServerRpcContextStreamingFromServerImpl(
460 std::function<void(ServerContextType*, RequestType*,
461 grpc::ServerAsyncWriter<ResponseType>*, void*)>
462 request_method,
463 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
464 : srv_ctx_(new ServerContextType),
465 next_state_(&ServerRpcContextStreamingFromServerImpl::request_done),
466 request_method_(request_method),
467 invoke_method_(invoke_method),
468 stream_(srv_ctx_.get()) {
469 request_method_(srv_ctx_.get(), &req_, &stream_,
470 AsyncQpsServerTest::tag(this));
471 }
~ServerRpcContextStreamingFromServerImpl()472 ~ServerRpcContextStreamingFromServerImpl() override {}
RunNextState(bool ok)473 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()474 void Reset() override {
475 srv_ctx_.reset(new ServerContextType);
476 req_ = RequestType();
477 stream_ = grpc::ServerAsyncWriter<ResponseType>(srv_ctx_.get());
478
479 // Then request the method
480 next_state_ = &ServerRpcContextStreamingFromServerImpl::request_done;
481 request_method_(srv_ctx_.get(), &req_, &stream_,
482 AsyncQpsServerTest::tag(this));
483 }
484
485 private:
request_done(bool ok)486 bool request_done(bool ok) {
487 if (!ok) {
488 return false;
489 }
490 // invoke the method
491 // Call the RPC processing function
492 grpc::Status status = invoke_method_(&req_, &response_);
493
494 next_state_ = &ServerRpcContextStreamingFromServerImpl::write_done;
495 stream_.Write(response_, AsyncQpsServerTest::tag(this));
496 return true;
497 }
498
write_done(bool ok)499 bool write_done(bool ok) {
500 if (ok) {
501 // Do another write!
502 // next_state_ is unchanged
503 stream_.Write(response_, AsyncQpsServerTest::tag(this));
504 } else { // must be done so let's finish
505 next_state_ = &ServerRpcContextStreamingFromServerImpl::finish_done;
506 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
507 }
508 return true;
509 }
finish_done(bool)510 bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
511
512 std::unique_ptr<ServerContextType> srv_ctx_;
513 RequestType req_;
514 ResponseType response_;
515 bool (ServerRpcContextStreamingFromServerImpl::*next_state_)(bool);
516 std::function<void(ServerContextType*, RequestType*,
517 grpc::ServerAsyncWriter<ResponseType>*, void*)>
518 request_method_;
519 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
520 grpc::ServerAsyncWriter<ResponseType> stream_;
521 };
522
523 std::vector<std::thread> threads_;
524 std::unique_ptr<grpc::Server> server_;
525 std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
526 std::vector<int> cq_;
527 ServiceType async_service_;
528 std::vector<std::unique_ptr<ServerRpcContext>> contexts_;
529
// Shutdown flag for one worker thread together with the mutex that guards
// it. The flag starts out false.
struct PerThreadShutdownState {
  PerThreadShutdownState() = default;

  mutable std::mutex mutex;
  bool shutdown = false;
};
535
536 std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
537 };
538
RegisterBenchmarkService(ServerBuilder * builder,BenchmarkService::AsyncService * service)539 static void RegisterBenchmarkService(ServerBuilder* builder,
540 BenchmarkService::AsyncService* service) {
541 builder->RegisterService(service);
542 }
RegisterGenericService(ServerBuilder * builder,grpc::AsyncGenericService * service)543 static void RegisterGenericService(ServerBuilder* builder,
544 grpc::AsyncGenericService* service) {
545 builder->RegisterAsyncGenericService(service);
546 }
547
ProcessSimpleRPC(const PayloadConfig &,SimpleRequest * request,SimpleResponse * response)548 static Status ProcessSimpleRPC(const PayloadConfig&, SimpleRequest* request,
549 SimpleResponse* response) {
550 if (request->response_size() > 0) {
551 if (!Server::SetPayload(request->response_type(), request->response_size(),
552 response->mutable_payload())) {
553 return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
554 }
555 }
556 // We are done using the request. Clear it to reduce working memory.
557 // This proves to reduce cache misses in large message size cases.
558 request->Clear();
559 return Status::OK;
560 }
561
ProcessGenericRPC(const PayloadConfig & payload_config,ByteBuffer * request,ByteBuffer * response)562 static Status ProcessGenericRPC(const PayloadConfig& payload_config,
563 ByteBuffer* request, ByteBuffer* response) {
564 // We are done using the request. Clear it to reduce working memory.
565 // This proves to reduce cache misses in large message size cases.
566 request->Clear();
567 int resp_size = payload_config.bytebuf_params().resp_size();
568 std::unique_ptr<char[]> buf(new char[resp_size]);
569 memset(buf.get(), 0, static_cast<size_t>(resp_size));
570 Slice slice(buf.get(), resp_size);
571 *response = ByteBuffer(&slice, 1);
572 return Status::OK;
573 }
574
CreateAsyncServer(const ServerConfig & config)575 std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config) {
576 return std::unique_ptr<Server>(
577 new AsyncQpsServerTest<SimpleRequest, SimpleResponse,
578 BenchmarkService::AsyncService,
579 grpc::ServerContext>(
580 config, RegisterBenchmarkService,
581 &BenchmarkService::AsyncService::RequestUnaryCall,
582 &BenchmarkService::AsyncService::RequestStreamingCall,
583 &BenchmarkService::AsyncService::RequestStreamingFromClient,
584 &BenchmarkService::AsyncService::RequestStreamingFromServer,
585 &BenchmarkService::AsyncService::RequestStreamingBothWays,
586 ProcessSimpleRPC));
587 }
CreateAsyncGenericServer(const ServerConfig & config)588 std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig& config) {
589 return std::unique_ptr<Server>(
590 new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService,
591 grpc::GenericServerContext>(
592 config, RegisterGenericService, nullptr,
593 &grpc::AsyncGenericService::RequestCall, nullptr, nullptr, nullptr,
594 ProcessGenericRPC));
595 }
596
597 } // namespace testing
598 } // namespace grpc
599