xref: /aosp_15_r20/external/grpc-grpc/include/grpcpp/support/method_handler.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_METHOD_HANDLER_H
20 #define GRPCPP_SUPPORT_METHOD_HANDLER_H
21 
22 #include <grpc/byte_buffer.h>
23 #include <grpc/support/log.h>
24 #include <grpcpp/impl/rpc_service_method.h>
25 #include <grpcpp/support/byte_buffer.h>
26 #include <grpcpp/support/sync_stream.h>
27 
28 namespace grpc {
29 
30 namespace internal {
31 
32 // Invoke the method handler, fill in the status, and
33 // return whether or not we finished safely (without an exception).
34 // Note that exception handling is 0-cost in most compiler/library
35 // implementations (except when an exception is actually thrown),
36 // so this process doesn't require additional overhead in the common case.
37 // Additionally, we don't need to return if we caught an exception or not;
38 // the handling is the same in either case.
39 template <class Callable>
CatchingFunctionHandler(Callable && handler)40 ::grpc::Status CatchingFunctionHandler(Callable&& handler) {
41 #if GRPC_ALLOW_EXCEPTIONS
42   try {
43     return handler();
44   } catch (...) {
45     return grpc::Status(grpc::StatusCode::UNKNOWN,
46                         "Unexpected error in RPC handling");
47   }
48 #else   // GRPC_ALLOW_EXCEPTIONS
49   return handler();
50 #endif  // GRPC_ALLOW_EXCEPTIONS
51 }
52 
53 /// A helper function with reduced templating to do the common work needed to
54 /// actually send the server response. Uses non-const parameter for Status since
55 /// this should only ever be called from the end of the RunHandler method.
56 
57 template <class ResponseType>
UnaryRunHandlerHelper(const MethodHandler::HandlerParameter & param,ResponseType * rsp,grpc::Status & status)58 void UnaryRunHandlerHelper(const MethodHandler::HandlerParameter& param,
59                            ResponseType* rsp, grpc::Status& status) {
60   GPR_ASSERT(!param.server_context->sent_initial_metadata_);
61   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
62                             grpc::internal::CallOpSendMessage,
63                             grpc::internal::CallOpServerSendStatus>
64       ops;
65   ops.SendInitialMetadata(&param.server_context->initial_metadata_,
66                           param.server_context->initial_metadata_flags());
67   if (param.server_context->compression_level_set()) {
68     ops.set_compression_level(param.server_context->compression_level());
69   }
70   if (status.ok()) {
71     status = ops.SendMessagePtr(rsp);
72   }
73   ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
74   param.call->PerformOps(&ops);
75   param.call->cq()->Pluck(&ops);
76 }
77 
78 /// A helper function with reduced templating to do deserializing.
79 
80 template <class RequestType>
UnaryDeserializeHelper(grpc_byte_buffer * req,grpc::Status * status,RequestType * request)81 void* UnaryDeserializeHelper(grpc_byte_buffer* req, grpc::Status* status,
82                              RequestType* request) {
83   grpc::ByteBuffer buf;
84   buf.set_buffer(req);
85   *status = grpc::SerializationTraits<RequestType>::Deserialize(
86       &buf, static_cast<RequestType*>(request));
87   buf.Release();
88   if (status->ok()) {
89     return request;
90   }
91   request->~RequestType();
92   return nullptr;
93 }
94 
95 /// A wrapper class of an application provided rpc method handler.
96 template <class ServiceType, class RequestType, class ResponseType,
97           class BaseRequestType = RequestType,
98           class BaseResponseType = ResponseType>
99 class RpcMethodHandler : public grpc::internal::MethodHandler {
100  public:
RpcMethodHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,const RequestType *,ResponseType *)> func,ServiceType * service)101   RpcMethodHandler(
102       std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
103                                  const RequestType*, ResponseType*)>
104           func,
105       ServiceType* service)
106       : func_(func), service_(service) {}
107 
RunHandler(const HandlerParameter & param)108   void RunHandler(const HandlerParameter& param) final {
109     ResponseType rsp;
110     grpc::Status status = param.status;
111     if (status.ok()) {
112       status = CatchingFunctionHandler([this, &param, &rsp] {
113         return func_(service_,
114                      static_cast<grpc::ServerContext*>(param.server_context),
115                      static_cast<RequestType*>(param.request), &rsp);
116       });
117       static_cast<RequestType*>(param.request)->~RequestType();
118     }
119     UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status);
120   }
121 
Deserialize(grpc_call * call,grpc_byte_buffer * req,grpc::Status * status,void **)122   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
123                     grpc::Status* status, void** /*handler_data*/) final {
124     auto* request =
125         new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType;
126     return UnaryDeserializeHelper(req, status,
127                                   static_cast<BaseRequestType*>(request));
128   }
129 
130  private:
131   /// Application provided rpc handler function.
132   std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
133                              const RequestType*, ResponseType*)>
134       func_;
135   // The class the above handler function lives in.
136   ServiceType* service_;
137 };
138 
139 /// A wrapper class of an application provided client streaming handler.
140 template <class ServiceType, class RequestType, class ResponseType>
141 class ClientStreamingHandler : public grpc::internal::MethodHandler {
142  public:
ClientStreamingHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,ServerReader<RequestType> *,ResponseType *)> func,ServiceType * service)143   ClientStreamingHandler(
144       std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
145                                  ServerReader<RequestType>*, ResponseType*)>
146           func,
147       ServiceType* service)
148       : func_(func), service_(service) {}
149 
RunHandler(const HandlerParameter & param)150   void RunHandler(const HandlerParameter& param) final {
151     ServerReader<RequestType> reader(
152         param.call, static_cast<grpc::ServerContext*>(param.server_context));
153     ResponseType rsp;
154     grpc::Status status =
155         CatchingFunctionHandler([this, &param, &reader, &rsp] {
156           return func_(service_,
157                        static_cast<grpc::ServerContext*>(param.server_context),
158                        &reader, &rsp);
159         });
160 
161     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
162                               grpc::internal::CallOpSendMessage,
163                               grpc::internal::CallOpServerSendStatus>
164         ops;
165     if (!param.server_context->sent_initial_metadata_) {
166       ops.SendInitialMetadata(&param.server_context->initial_metadata_,
167                               param.server_context->initial_metadata_flags());
168       if (param.server_context->compression_level_set()) {
169         ops.set_compression_level(param.server_context->compression_level());
170       }
171     }
172     if (status.ok()) {
173       status = ops.SendMessagePtr(&rsp);
174     }
175     ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
176     param.call->PerformOps(&ops);
177     param.call->cq()->Pluck(&ops);
178   }
179 
180  private:
181   std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
182                              ServerReader<RequestType>*, ResponseType*)>
183       func_;
184   ServiceType* service_;
185 };
186 
187 /// A wrapper class of an application provided server streaming handler.
188 template <class ServiceType, class RequestType, class ResponseType>
189 class ServerStreamingHandler : public grpc::internal::MethodHandler {
190  public:
ServerStreamingHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,const RequestType *,ServerWriter<ResponseType> *)> func,ServiceType * service)191   ServerStreamingHandler(std::function<grpc::Status(
192                              ServiceType*, grpc::ServerContext*,
193                              const RequestType*, ServerWriter<ResponseType>*)>
194                              func,
195                          ServiceType* service)
196       : func_(func), service_(service) {}
197 
RunHandler(const HandlerParameter & param)198   void RunHandler(const HandlerParameter& param) final {
199     grpc::Status status = param.status;
200     if (status.ok()) {
201       ServerWriter<ResponseType> writer(
202           param.call, static_cast<grpc::ServerContext*>(param.server_context));
203       status = CatchingFunctionHandler([this, &param, &writer] {
204         return func_(service_,
205                      static_cast<grpc::ServerContext*>(param.server_context),
206                      static_cast<RequestType*>(param.request), &writer);
207       });
208       static_cast<RequestType*>(param.request)->~RequestType();
209     }
210 
211     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
212                               grpc::internal::CallOpServerSendStatus>
213         ops;
214     if (!param.server_context->sent_initial_metadata_) {
215       ops.SendInitialMetadata(&param.server_context->initial_metadata_,
216                               param.server_context->initial_metadata_flags());
217       if (param.server_context->compression_level_set()) {
218         ops.set_compression_level(param.server_context->compression_level());
219       }
220     }
221     ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
222     param.call->PerformOps(&ops);
223     if (param.server_context->has_pending_ops_) {
224       param.call->cq()->Pluck(&param.server_context->pending_ops_);
225     }
226     param.call->cq()->Pluck(&ops);
227   }
228 
Deserialize(grpc_call * call,grpc_byte_buffer * req,grpc::Status * status,void **)229   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
230                     grpc::Status* status, void** /*handler_data*/) final {
231     grpc::ByteBuffer buf;
232     buf.set_buffer(req);
233     auto* request =
234         new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
235     *status =
236         grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
237     buf.Release();
238     if (status->ok()) {
239       return request;
240     }
241     request->~RequestType();
242     return nullptr;
243   }
244 
245  private:
246   std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
247                              const RequestType*, ServerWriter<ResponseType>*)>
248       func_;
249   ServiceType* service_;
250 };
251 
252 /// A wrapper class of an application provided bidi-streaming handler.
253 /// This also applies to server-streamed implementation of a unary method
254 /// with the additional requirement that such methods must have done a
255 /// write for status to be ok
256 /// Since this is used by more than 1 class, the service is not passed in.
257 /// Instead, it is expected to be an implicitly-captured argument of func
258 /// (through bind or something along those lines)
259 template <class Streamer, bool WriteNeeded>
260 class TemplatedBidiStreamingHandler : public grpc::internal::MethodHandler {
261  public:
TemplatedBidiStreamingHandler(std::function<grpc::Status (grpc::ServerContext *,Streamer *)> func)262   explicit TemplatedBidiStreamingHandler(
263       std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func)
264       : func_(func), write_needed_(WriteNeeded) {}
265 
RunHandler(const HandlerParameter & param)266   void RunHandler(const HandlerParameter& param) final {
267     Streamer stream(param.call,
268                     static_cast<grpc::ServerContext*>(param.server_context));
269     grpc::Status status = CatchingFunctionHandler([this, &param, &stream] {
270       return func_(static_cast<grpc::ServerContext*>(param.server_context),
271                    &stream);
272     });
273 
274     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
275                               grpc::internal::CallOpServerSendStatus>
276         ops;
277     if (!param.server_context->sent_initial_metadata_) {
278       ops.SendInitialMetadata(&param.server_context->initial_metadata_,
279                               param.server_context->initial_metadata_flags());
280       if (param.server_context->compression_level_set()) {
281         ops.set_compression_level(param.server_context->compression_level());
282       }
283       if (write_needed_ && status.ok()) {
284         // If we needed a write but never did one, we need to mark the
285         // status as a fail
286         status = grpc::Status(grpc::StatusCode::INTERNAL,
287                               "Service did not provide response message");
288       }
289     }
290     ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
291     param.call->PerformOps(&ops);
292     if (param.server_context->has_pending_ops_) {
293       param.call->cq()->Pluck(&param.server_context->pending_ops_);
294     }
295     param.call->cq()->Pluck(&ops);
296   }
297 
298  private:
299   std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func_;
300   const bool write_needed_;
301 };
302 
303 template <class ServiceType, class RequestType, class ResponseType>
304 class BidiStreamingHandler
305     : public TemplatedBidiStreamingHandler<
306           ServerReaderWriter<ResponseType, RequestType>, false> {
307  public:
BidiStreamingHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,ServerReaderWriter<ResponseType,RequestType> *)> func,ServiceType * service)308   BidiStreamingHandler(std::function<grpc::Status(
309                            ServiceType*, grpc::ServerContext*,
310                            ServerReaderWriter<ResponseType, RequestType>*)>
311                            func,
312                        ServiceType* service)
313       // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
314       : TemplatedBidiStreamingHandler<
315             ServerReaderWriter<ResponseType, RequestType>, false>(
316             [func, service](
317                 grpc::ServerContext* ctx,
318                 ServerReaderWriter<ResponseType, RequestType>* streamer) {
319               return func(service, ctx, streamer);
320             }) {}
321 };
322 
323 template <class RequestType, class ResponseType>
324 class StreamedUnaryHandler
325     : public TemplatedBidiStreamingHandler<
326           ServerUnaryStreamer<RequestType, ResponseType>, true> {
327  public:
StreamedUnaryHandler(std::function<grpc::Status (grpc::ServerContext *,ServerUnaryStreamer<RequestType,ResponseType> *)> func)328   explicit StreamedUnaryHandler(
329       std::function<
330           grpc::Status(grpc::ServerContext*,
331                        ServerUnaryStreamer<RequestType, ResponseType>*)>
332           func)
333       : TemplatedBidiStreamingHandler<
334             ServerUnaryStreamer<RequestType, ResponseType>, true>(
335             std::move(func)) {}
336 };
337 
338 template <class RequestType, class ResponseType>
339 class SplitServerStreamingHandler
340     : public TemplatedBidiStreamingHandler<
341           ServerSplitStreamer<RequestType, ResponseType>, false> {
342  public:
SplitServerStreamingHandler(std::function<grpc::Status (grpc::ServerContext *,ServerSplitStreamer<RequestType,ResponseType> *)> func)343   explicit SplitServerStreamingHandler(
344       std::function<
345           grpc::Status(grpc::ServerContext*,
346                        ServerSplitStreamer<RequestType, ResponseType>*)>
347           func)
348       : TemplatedBidiStreamingHandler<
349             ServerSplitStreamer<RequestType, ResponseType>, false>(
350             std::move(func)) {}
351 };
352 
353 /// General method handler class for errors that prevent real method use
354 /// e.g., handle unknown method by returning UNIMPLEMENTED error.
355 template <grpc::StatusCode code>
356 class ErrorMethodHandler : public grpc::internal::MethodHandler {
357  public:
ErrorMethodHandler(const std::string & message)358   explicit ErrorMethodHandler(const std::string& message) : message_(message) {}
359 
360   template <class T>
FillOps(grpc::ServerContextBase * context,const std::string & message,T * ops)361   static void FillOps(grpc::ServerContextBase* context,
362                       const std::string& message, T* ops) {
363     grpc::Status status(code, message);
364     if (!context->sent_initial_metadata_) {
365       ops->SendInitialMetadata(&context->initial_metadata_,
366                                context->initial_metadata_flags());
367       if (context->compression_level_set()) {
368         ops->set_compression_level(context->compression_level());
369       }
370       context->sent_initial_metadata_ = true;
371     }
372     ops->ServerSendStatus(&context->trailing_metadata_, status);
373   }
374 
RunHandler(const HandlerParameter & param)375   void RunHandler(const HandlerParameter& param) final {
376     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
377                               grpc::internal::CallOpServerSendStatus>
378         ops;
379     FillOps(param.server_context, message_, &ops);
380     param.call->PerformOps(&ops);
381     param.call->cq()->Pluck(&ops);
382   }
383 
Deserialize(grpc_call *,grpc_byte_buffer * req,grpc::Status *,void **)384   void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
385                     grpc::Status* /*status*/, void** /*handler_data*/) final {
386     // We have to destroy any request payload
387     if (req != nullptr) {
388       grpc_byte_buffer_destroy(req);
389     }
390     return nullptr;
391   }
392 
393  private:
394   const std::string message_;
395 };
396 
397 typedef ErrorMethodHandler<grpc::StatusCode::UNIMPLEMENTED>
398     UnknownMethodHandler;
399 typedef ErrorMethodHandler<grpc::StatusCode::RESOURCE_EXHAUSTED>
400     ResourceExhaustedHandler;
401 
402 }  // namespace internal
403 }  // namespace grpc
404 
405 #endif  // GRPCPP_SUPPORT_METHOD_HANDLER_H
406