xref: /aosp_15_r20/external/grpc-grpc/test/cpp/qps/server_callback.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 #include <grpc/grpc.h>
19 #include <grpc/support/alloc.h>
20 #include <grpcpp/security/server_credentials.h>
21 #include <grpcpp/server.h>
22 #include <grpcpp/server_context.h>
23 
24 #include "src/core/lib/gprpp/host_port.h"
25 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
26 #include "test/cpp/qps/qps_server_builder.h"
27 #include "test/cpp/qps/server.h"
28 #include "test/cpp/qps/usage_timer.h"
29 
30 namespace grpc {
31 namespace testing {
32 
33 class BenchmarkCallbackServiceImpl final
34     : public BenchmarkService::CallbackService {
35  public:
UnaryCall(grpc::CallbackServerContext * context,const SimpleRequest * request,SimpleResponse * response)36   grpc::ServerUnaryReactor* UnaryCall(grpc::CallbackServerContext* context,
37                                       const SimpleRequest* request,
38                                       SimpleResponse* response) override {
39     auto* reactor = context->DefaultReactor();
40     reactor->Finish(SetResponse(request, response));
41     return reactor;
42   }
43 
44   grpc::ServerBidiReactor<grpc::testing::SimpleRequest,
45                           grpc::testing::SimpleResponse>*
StreamingCall(grpc::CallbackServerContext *)46   StreamingCall(grpc::CallbackServerContext*) override {
47     class Reactor
48         : public grpc::ServerBidiReactor<grpc::testing::SimpleRequest,
49                                          grpc::testing::SimpleResponse> {
50      public:
51       Reactor() { StartRead(&request_); }
52 
53       void OnReadDone(bool ok) override {
54         if (!ok) {
55           Finish(grpc::Status::OK);
56           return;
57         }
58         auto s = SetResponse(&request_, &response_);
59         if (!s.ok()) {
60           Finish(s);
61           return;
62         }
63         StartWrite(&response_);
64       }
65 
66       void OnWriteDone(bool ok) override {
67         if (!ok) {
68           Finish(grpc::Status::OK);
69           return;
70         }
71         StartRead(&request_);
72       }
73 
74       void OnDone() override { delete (this); }
75 
76      private:
77       SimpleRequest request_;
78       SimpleResponse response_;
79     };
80     return new Reactor;
81   }
82 
83  private:
SetResponse(const SimpleRequest * request,SimpleResponse * response)84   static Status SetResponse(const SimpleRequest* request,
85                             SimpleResponse* response) {
86     if (request->response_size() > 0) {
87       if (!Server::SetPayload(request->response_type(),
88                               request->response_size(),
89                               response->mutable_payload())) {
90         return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
91       }
92     }
93     return Status::OK;
94   }
95 };
96 
97 class CallbackServer final : public grpc::testing::Server {
98  public:
CallbackServer(const ServerConfig & config)99   explicit CallbackServer(const ServerConfig& config) : Server(config) {
100     std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
101 
102     auto port_num = port();
103     // Negative port number means inproc server, so no listen port needed
104     if (port_num >= 0) {
105       std::string server_address = grpc_core::JoinHostPort("::", port_num);
106       builder->AddListeningPort(
107           server_address, Server::CreateServerCredentials(config), &port_num);
108     }
109 
110     ApplyConfigToBuilder(config, builder.get());
111 
112     builder->RegisterService(&service_);
113 
114     impl_ = builder->BuildAndStart();
115     if (impl_ == nullptr) {
116       gpr_log(GPR_ERROR, "Server: Fail to BuildAndStart(port=%d)", port_num);
117     } else {
118       gpr_log(GPR_INFO, "Server: BuildAndStart(port=%d)", port_num);
119     }
120   }
121 
InProcessChannel(const ChannelArguments & args)122   std::shared_ptr<Channel> InProcessChannel(
123       const ChannelArguments& args) override {
124     return impl_->InProcessChannel(args);
125   }
126 
127  private:
128   BenchmarkCallbackServiceImpl service_;
129   std::unique_ptr<grpc::Server> impl_;
130 };
131 
CreateCallbackServer(const ServerConfig & config)132 std::unique_ptr<grpc::testing::Server> CreateCallbackServer(
133     const ServerConfig& config) {
134   return std::unique_ptr<Server>(new CallbackServer(config));
135 }
136 
137 }  // namespace testing
138 }  // namespace grpc
139