1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 #include <grpc/grpc.h>
19 #include <grpc/support/alloc.h>
20 #include <grpcpp/security/server_credentials.h>
21 #include <grpcpp/server.h>
22 #include <grpcpp/server_context.h>
23
24 #include "src/core/lib/gprpp/host_port.h"
25 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
26 #include "test/cpp/qps/qps_server_builder.h"
27 #include "test/cpp/qps/server.h"
28 #include "test/cpp/qps/usage_timer.h"
29
30 namespace grpc {
31 namespace testing {
32
33 class BenchmarkCallbackServiceImpl final
34 : public BenchmarkService::CallbackService {
35 public:
UnaryCall(grpc::CallbackServerContext * context,const SimpleRequest * request,SimpleResponse * response)36 grpc::ServerUnaryReactor* UnaryCall(grpc::CallbackServerContext* context,
37 const SimpleRequest* request,
38 SimpleResponse* response) override {
39 auto* reactor = context->DefaultReactor();
40 reactor->Finish(SetResponse(request, response));
41 return reactor;
42 }
43
44 grpc::ServerBidiReactor<grpc::testing::SimpleRequest,
45 grpc::testing::SimpleResponse>*
StreamingCall(grpc::CallbackServerContext *)46 StreamingCall(grpc::CallbackServerContext*) override {
47 class Reactor
48 : public grpc::ServerBidiReactor<grpc::testing::SimpleRequest,
49 grpc::testing::SimpleResponse> {
50 public:
51 Reactor() { StartRead(&request_); }
52
53 void OnReadDone(bool ok) override {
54 if (!ok) {
55 Finish(grpc::Status::OK);
56 return;
57 }
58 auto s = SetResponse(&request_, &response_);
59 if (!s.ok()) {
60 Finish(s);
61 return;
62 }
63 StartWrite(&response_);
64 }
65
66 void OnWriteDone(bool ok) override {
67 if (!ok) {
68 Finish(grpc::Status::OK);
69 return;
70 }
71 StartRead(&request_);
72 }
73
74 void OnDone() override { delete (this); }
75
76 private:
77 SimpleRequest request_;
78 SimpleResponse response_;
79 };
80 return new Reactor;
81 }
82
83 private:
SetResponse(const SimpleRequest * request,SimpleResponse * response)84 static Status SetResponse(const SimpleRequest* request,
85 SimpleResponse* response) {
86 if (request->response_size() > 0) {
87 if (!Server::SetPayload(request->response_type(),
88 request->response_size(),
89 response->mutable_payload())) {
90 return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
91 }
92 }
93 return Status::OK;
94 }
95 };
96
97 class CallbackServer final : public grpc::testing::Server {
98 public:
CallbackServer(const ServerConfig & config)99 explicit CallbackServer(const ServerConfig& config) : Server(config) {
100 std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
101
102 auto port_num = port();
103 // Negative port number means inproc server, so no listen port needed
104 if (port_num >= 0) {
105 std::string server_address = grpc_core::JoinHostPort("::", port_num);
106 builder->AddListeningPort(
107 server_address, Server::CreateServerCredentials(config), &port_num);
108 }
109
110 ApplyConfigToBuilder(config, builder.get());
111
112 builder->RegisterService(&service_);
113
114 impl_ = builder->BuildAndStart();
115 if (impl_ == nullptr) {
116 gpr_log(GPR_ERROR, "Server: Fail to BuildAndStart(port=%d)", port_num);
117 } else {
118 gpr_log(GPR_INFO, "Server: BuildAndStart(port=%d)", port_num);
119 }
120 }
121
InProcessChannel(const ChannelArguments & args)122 std::shared_ptr<Channel> InProcessChannel(
123 const ChannelArguments& args) override {
124 return impl_->InProcessChannel(args);
125 }
126
127 private:
128 BenchmarkCallbackServiceImpl service_;
129 std::unique_ptr<grpc::Server> impl_;
130 };
131
CreateCallbackServer(const ServerConfig & config)132 std::unique_ptr<grpc::testing::Server> CreateCallbackServer(
133 const ServerConfig& config) {
134 return std::unique_ptr<Server>(new CallbackServer(config));
135 }
136
137 } // namespace testing
138 } // namespace grpc
139