1*cc02d7e2SAndroid Build Coastguard Worker //
2*cc02d7e2SAndroid Build Coastguard Worker //
3*cc02d7e2SAndroid Build Coastguard Worker // Copyright 2015 gRPC authors.
4*cc02d7e2SAndroid Build Coastguard Worker //
5*cc02d7e2SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
6*cc02d7e2SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
7*cc02d7e2SAndroid Build Coastguard Worker // You may obtain a copy of the License at
8*cc02d7e2SAndroid Build Coastguard Worker //
9*cc02d7e2SAndroid Build Coastguard Worker // http://www.apache.org/licenses/LICENSE-2.0
10*cc02d7e2SAndroid Build Coastguard Worker //
11*cc02d7e2SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
12*cc02d7e2SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
13*cc02d7e2SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14*cc02d7e2SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
15*cc02d7e2SAndroid Build Coastguard Worker // limitations under the License.
16*cc02d7e2SAndroid Build Coastguard Worker //
17*cc02d7e2SAndroid Build Coastguard Worker //
18*cc02d7e2SAndroid Build Coastguard Worker
19*cc02d7e2SAndroid Build Coastguard Worker #include <atomic>
20*cc02d7e2SAndroid Build Coastguard Worker #include <thread>
21*cc02d7e2SAndroid Build Coastguard Worker
22*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/grpc.h>
23*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/alloc.h>
24*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/security/server_credentials.h>
25*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/server.h>
26*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/server_context.h>
27*cc02d7e2SAndroid Build Coastguard Worker
28*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/host_port.h"
29*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
30*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/qps_server_builder.h"
31*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/server.h"
32*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/usage_timer.h"
33*cc02d7e2SAndroid Build Coastguard Worker
34*cc02d7e2SAndroid Build Coastguard Worker namespace grpc {
35*cc02d7e2SAndroid Build Coastguard Worker namespace testing {
36*cc02d7e2SAndroid Build Coastguard Worker
37*cc02d7e2SAndroid Build Coastguard Worker class BenchmarkServiceImpl final : public BenchmarkService::Service {
38*cc02d7e2SAndroid Build Coastguard Worker public:
UnaryCall(ServerContext *,const SimpleRequest * request,SimpleResponse * response)39*cc02d7e2SAndroid Build Coastguard Worker Status UnaryCall(ServerContext* /*context*/, const SimpleRequest* request,
40*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse* response) override {
41*cc02d7e2SAndroid Build Coastguard Worker auto s = SetResponse(request, response);
42*cc02d7e2SAndroid Build Coastguard Worker if (!s.ok()) {
43*cc02d7e2SAndroid Build Coastguard Worker return s;
44*cc02d7e2SAndroid Build Coastguard Worker }
45*cc02d7e2SAndroid Build Coastguard Worker return Status::OK;
46*cc02d7e2SAndroid Build Coastguard Worker }
StreamingCall(ServerContext *,ServerReaderWriter<SimpleResponse,SimpleRequest> * stream)47*cc02d7e2SAndroid Build Coastguard Worker Status StreamingCall(
48*cc02d7e2SAndroid Build Coastguard Worker ServerContext* /*context*/,
49*cc02d7e2SAndroid Build Coastguard Worker ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) override {
50*cc02d7e2SAndroid Build Coastguard Worker SimpleRequest request;
51*cc02d7e2SAndroid Build Coastguard Worker while (stream->Read(&request)) {
52*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse response;
53*cc02d7e2SAndroid Build Coastguard Worker auto s = SetResponse(&request, &response);
54*cc02d7e2SAndroid Build Coastguard Worker if (!s.ok()) {
55*cc02d7e2SAndroid Build Coastguard Worker return s;
56*cc02d7e2SAndroid Build Coastguard Worker }
57*cc02d7e2SAndroid Build Coastguard Worker if (!stream->Write(response)) {
58*cc02d7e2SAndroid Build Coastguard Worker return Status(StatusCode::INTERNAL, "Server couldn't respond");
59*cc02d7e2SAndroid Build Coastguard Worker }
60*cc02d7e2SAndroid Build Coastguard Worker }
61*cc02d7e2SAndroid Build Coastguard Worker return Status::OK;
62*cc02d7e2SAndroid Build Coastguard Worker }
StreamingFromClient(ServerContext * context,ServerReader<SimpleRequest> * stream,SimpleResponse * response)63*cc02d7e2SAndroid Build Coastguard Worker Status StreamingFromClient(ServerContext* context,
64*cc02d7e2SAndroid Build Coastguard Worker ServerReader<SimpleRequest>* stream,
65*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse* response) override {
66*cc02d7e2SAndroid Build Coastguard Worker auto s = ClientPull(context, stream, response);
67*cc02d7e2SAndroid Build Coastguard Worker if (!s.ok()) {
68*cc02d7e2SAndroid Build Coastguard Worker return s;
69*cc02d7e2SAndroid Build Coastguard Worker }
70*cc02d7e2SAndroid Build Coastguard Worker return Status::OK;
71*cc02d7e2SAndroid Build Coastguard Worker }
StreamingFromServer(ServerContext * context,const SimpleRequest * request,ServerWriter<SimpleResponse> * stream)72*cc02d7e2SAndroid Build Coastguard Worker Status StreamingFromServer(ServerContext* context,
73*cc02d7e2SAndroid Build Coastguard Worker const SimpleRequest* request,
74*cc02d7e2SAndroid Build Coastguard Worker ServerWriter<SimpleResponse>* stream) override {
75*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse response;
76*cc02d7e2SAndroid Build Coastguard Worker auto s = SetResponse(request, &response);
77*cc02d7e2SAndroid Build Coastguard Worker if (!s.ok()) {
78*cc02d7e2SAndroid Build Coastguard Worker return s;
79*cc02d7e2SAndroid Build Coastguard Worker }
80*cc02d7e2SAndroid Build Coastguard Worker return ServerPush(context, stream, response, nullptr);
81*cc02d7e2SAndroid Build Coastguard Worker }
StreamingBothWays(ServerContext * context,ServerReaderWriter<SimpleResponse,SimpleRequest> * stream)82*cc02d7e2SAndroid Build Coastguard Worker Status StreamingBothWays(
83*cc02d7e2SAndroid Build Coastguard Worker ServerContext* context,
84*cc02d7e2SAndroid Build Coastguard Worker ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) override {
85*cc02d7e2SAndroid Build Coastguard Worker // Read the first client message to setup server response
86*cc02d7e2SAndroid Build Coastguard Worker SimpleRequest request;
87*cc02d7e2SAndroid Build Coastguard Worker if (!stream->Read(&request)) {
88*cc02d7e2SAndroid Build Coastguard Worker return Status::OK;
89*cc02d7e2SAndroid Build Coastguard Worker }
90*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse response;
91*cc02d7e2SAndroid Build Coastguard Worker auto s = SetResponse(&request, &response);
92*cc02d7e2SAndroid Build Coastguard Worker if (!s.ok()) {
93*cc02d7e2SAndroid Build Coastguard Worker return s;
94*cc02d7e2SAndroid Build Coastguard Worker }
95*cc02d7e2SAndroid Build Coastguard Worker std::atomic_bool done;
96*cc02d7e2SAndroid Build Coastguard Worker Status sp;
97*cc02d7e2SAndroid Build Coastguard Worker std::thread t([context, stream, &response, &done, &sp]() {
98*cc02d7e2SAndroid Build Coastguard Worker sp = ServerPush(context, stream, response, [&done]() {
99*cc02d7e2SAndroid Build Coastguard Worker return done.load(std::memory_order_relaxed);
100*cc02d7e2SAndroid Build Coastguard Worker });
101*cc02d7e2SAndroid Build Coastguard Worker });
102*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse phony;
103*cc02d7e2SAndroid Build Coastguard Worker auto cp = ClientPull(context, stream, &phony);
104*cc02d7e2SAndroid Build Coastguard Worker done.store(true, std::memory_order_relaxed); // can be lazy
105*cc02d7e2SAndroid Build Coastguard Worker t.join();
106*cc02d7e2SAndroid Build Coastguard Worker if (!cp.ok()) {
107*cc02d7e2SAndroid Build Coastguard Worker return cp;
108*cc02d7e2SAndroid Build Coastguard Worker }
109*cc02d7e2SAndroid Build Coastguard Worker if (!sp.ok()) {
110*cc02d7e2SAndroid Build Coastguard Worker return sp;
111*cc02d7e2SAndroid Build Coastguard Worker }
112*cc02d7e2SAndroid Build Coastguard Worker return Status::OK;
113*cc02d7e2SAndroid Build Coastguard Worker }
114*cc02d7e2SAndroid Build Coastguard Worker
115*cc02d7e2SAndroid Build Coastguard Worker private:
116*cc02d7e2SAndroid Build Coastguard Worker template <class R>
ClientPull(ServerContext *,R * stream,SimpleResponse * response)117*cc02d7e2SAndroid Build Coastguard Worker static Status ClientPull(ServerContext* /*context*/, R* stream,
118*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse* response) {
119*cc02d7e2SAndroid Build Coastguard Worker SimpleRequest request;
120*cc02d7e2SAndroid Build Coastguard Worker while (stream->Read(&request)) {
121*cc02d7e2SAndroid Build Coastguard Worker }
122*cc02d7e2SAndroid Build Coastguard Worker if (request.response_size() > 0) {
123*cc02d7e2SAndroid Build Coastguard Worker if (!Server::SetPayload(request.response_type(), request.response_size(),
124*cc02d7e2SAndroid Build Coastguard Worker response->mutable_payload())) {
125*cc02d7e2SAndroid Build Coastguard Worker return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
126*cc02d7e2SAndroid Build Coastguard Worker }
127*cc02d7e2SAndroid Build Coastguard Worker }
128*cc02d7e2SAndroid Build Coastguard Worker return Status::OK;
129*cc02d7e2SAndroid Build Coastguard Worker }
130*cc02d7e2SAndroid Build Coastguard Worker template <class W>
ServerPush(ServerContext *,W * stream,const SimpleResponse & response,const std::function<bool ()> & done)131*cc02d7e2SAndroid Build Coastguard Worker static Status ServerPush(ServerContext* /*context*/, W* stream,
132*cc02d7e2SAndroid Build Coastguard Worker const SimpleResponse& response,
133*cc02d7e2SAndroid Build Coastguard Worker const std::function<bool()>& done) {
134*cc02d7e2SAndroid Build Coastguard Worker while ((done == nullptr) || !done()) {
135*cc02d7e2SAndroid Build Coastguard Worker // TODO(vjpai): Add potential for rate-pacing on this
136*cc02d7e2SAndroid Build Coastguard Worker if (!stream->Write(response)) {
137*cc02d7e2SAndroid Build Coastguard Worker return Status(StatusCode::INTERNAL, "Server couldn't push");
138*cc02d7e2SAndroid Build Coastguard Worker }
139*cc02d7e2SAndroid Build Coastguard Worker }
140*cc02d7e2SAndroid Build Coastguard Worker return Status::OK;
141*cc02d7e2SAndroid Build Coastguard Worker }
SetResponse(const SimpleRequest * request,SimpleResponse * response)142*cc02d7e2SAndroid Build Coastguard Worker static Status SetResponse(const SimpleRequest* request,
143*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse* response) {
144*cc02d7e2SAndroid Build Coastguard Worker if (request->response_size() > 0) {
145*cc02d7e2SAndroid Build Coastguard Worker if (!Server::SetPayload(request->response_type(),
146*cc02d7e2SAndroid Build Coastguard Worker request->response_size(),
147*cc02d7e2SAndroid Build Coastguard Worker response->mutable_payload())) {
148*cc02d7e2SAndroid Build Coastguard Worker return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
149*cc02d7e2SAndroid Build Coastguard Worker }
150*cc02d7e2SAndroid Build Coastguard Worker }
151*cc02d7e2SAndroid Build Coastguard Worker return Status::OK;
152*cc02d7e2SAndroid Build Coastguard Worker }
153*cc02d7e2SAndroid Build Coastguard Worker };
154*cc02d7e2SAndroid Build Coastguard Worker
155*cc02d7e2SAndroid Build Coastguard Worker class SynchronousServer final : public grpc::testing::Server {
156*cc02d7e2SAndroid Build Coastguard Worker public:
SynchronousServer(const ServerConfig & config)157*cc02d7e2SAndroid Build Coastguard Worker explicit SynchronousServer(const ServerConfig& config) : Server(config) {
158*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
159*cc02d7e2SAndroid Build Coastguard Worker
160*cc02d7e2SAndroid Build Coastguard Worker auto port_num = port();
161*cc02d7e2SAndroid Build Coastguard Worker // Negative port number means inproc server, so no listen port needed
162*cc02d7e2SAndroid Build Coastguard Worker if (port_num >= 0) {
163*cc02d7e2SAndroid Build Coastguard Worker std::string server_address = grpc_core::JoinHostPort("::", port_num);
164*cc02d7e2SAndroid Build Coastguard Worker builder->AddListeningPort(
165*cc02d7e2SAndroid Build Coastguard Worker server_address, Server::CreateServerCredentials(config), &port_num);
166*cc02d7e2SAndroid Build Coastguard Worker }
167*cc02d7e2SAndroid Build Coastguard Worker
168*cc02d7e2SAndroid Build Coastguard Worker ApplyConfigToBuilder(config, builder.get());
169*cc02d7e2SAndroid Build Coastguard Worker
170*cc02d7e2SAndroid Build Coastguard Worker builder->RegisterService(&service_);
171*cc02d7e2SAndroid Build Coastguard Worker
172*cc02d7e2SAndroid Build Coastguard Worker impl_ = builder->BuildAndStart();
173*cc02d7e2SAndroid Build Coastguard Worker if (impl_ == nullptr) {
174*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_ERROR, "Server: Fail to BuildAndStart(port=%d)", port_num);
175*cc02d7e2SAndroid Build Coastguard Worker } else {
176*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Server: BuildAndStart(port=%d)", port_num);
177*cc02d7e2SAndroid Build Coastguard Worker }
178*cc02d7e2SAndroid Build Coastguard Worker }
179*cc02d7e2SAndroid Build Coastguard Worker
InProcessChannel(const ChannelArguments & args)180*cc02d7e2SAndroid Build Coastguard Worker std::shared_ptr<Channel> InProcessChannel(
181*cc02d7e2SAndroid Build Coastguard Worker const ChannelArguments& args) override {
182*cc02d7e2SAndroid Build Coastguard Worker return impl_->InProcessChannel(args);
183*cc02d7e2SAndroid Build Coastguard Worker }
184*cc02d7e2SAndroid Build Coastguard Worker
185*cc02d7e2SAndroid Build Coastguard Worker private:
186*cc02d7e2SAndroid Build Coastguard Worker BenchmarkServiceImpl service_;
187*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<grpc::Server> impl_;
188*cc02d7e2SAndroid Build Coastguard Worker };
189*cc02d7e2SAndroid Build Coastguard Worker
CreateSynchronousServer(const ServerConfig & config)190*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(
191*cc02d7e2SAndroid Build Coastguard Worker const ServerConfig& config) {
192*cc02d7e2SAndroid Build Coastguard Worker return std::unique_ptr<Server>(new SynchronousServer(config));
193*cc02d7e2SAndroid Build Coastguard Worker }
194*cc02d7e2SAndroid Build Coastguard Worker
195*cc02d7e2SAndroid Build Coastguard Worker } // namespace testing
196*cc02d7e2SAndroid Build Coastguard Worker } // namespace grpc
197