1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #ifndef GRPCPP_SUPPORT_METHOD_HANDLER_H
20 #define GRPCPP_SUPPORT_METHOD_HANDLER_H
21
#include <functional>
#include <new>
#include <string>
#include <utility>

#include <grpc/byte_buffer.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/support/byte_buffer.h>
#include <grpcpp/support/sync_stream.h>
27
28 namespace grpc {
29
30 namespace internal {
31
32 // Invoke the method handler, fill in the status, and
33 // return whether or not we finished safely (without an exception).
34 // Note that exception handling is 0-cost in most compiler/library
35 // implementations (except when an exception is actually thrown),
36 // so this process doesn't require additional overhead in the common case.
37 // Additionally, we don't need to return if we caught an exception or not;
38 // the handling is the same in either case.
39 template <class Callable>
CatchingFunctionHandler(Callable && handler)40 ::grpc::Status CatchingFunctionHandler(Callable&& handler) {
41 #if GRPC_ALLOW_EXCEPTIONS
42 try {
43 return handler();
44 } catch (...) {
45 return grpc::Status(grpc::StatusCode::UNKNOWN,
46 "Unexpected error in RPC handling");
47 }
48 #else // GRPC_ALLOW_EXCEPTIONS
49 return handler();
50 #endif // GRPC_ALLOW_EXCEPTIONS
51 }
52
53 /// A helper function with reduced templating to do the common work needed to
54 /// actually send the server response. Uses non-const parameter for Status since
55 /// this should only ever be called from the end of the RunHandler method.
56
57 template <class ResponseType>
UnaryRunHandlerHelper(const MethodHandler::HandlerParameter & param,ResponseType * rsp,grpc::Status & status)58 void UnaryRunHandlerHelper(const MethodHandler::HandlerParameter& param,
59 ResponseType* rsp, grpc::Status& status) {
60 GPR_ASSERT(!param.server_context->sent_initial_metadata_);
61 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
62 grpc::internal::CallOpSendMessage,
63 grpc::internal::CallOpServerSendStatus>
64 ops;
65 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
66 param.server_context->initial_metadata_flags());
67 if (param.server_context->compression_level_set()) {
68 ops.set_compression_level(param.server_context->compression_level());
69 }
70 if (status.ok()) {
71 status = ops.SendMessagePtr(rsp);
72 }
73 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
74 param.call->PerformOps(&ops);
75 param.call->cq()->Pluck(&ops);
76 }
77
78 /// A helper function with reduced templating to do deserializing.
79
80 template <class RequestType>
UnaryDeserializeHelper(grpc_byte_buffer * req,grpc::Status * status,RequestType * request)81 void* UnaryDeserializeHelper(grpc_byte_buffer* req, grpc::Status* status,
82 RequestType* request) {
83 grpc::ByteBuffer buf;
84 buf.set_buffer(req);
85 *status = grpc::SerializationTraits<RequestType>::Deserialize(
86 &buf, static_cast<RequestType*>(request));
87 buf.Release();
88 if (status->ok()) {
89 return request;
90 }
91 request->~RequestType();
92 return nullptr;
93 }
94
95 /// A wrapper class of an application provided rpc method handler.
96 template <class ServiceType, class RequestType, class ResponseType,
97 class BaseRequestType = RequestType,
98 class BaseResponseType = ResponseType>
99 class RpcMethodHandler : public grpc::internal::MethodHandler {
100 public:
RpcMethodHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,const RequestType *,ResponseType *)> func,ServiceType * service)101 RpcMethodHandler(
102 std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
103 const RequestType*, ResponseType*)>
104 func,
105 ServiceType* service)
106 : func_(func), service_(service) {}
107
RunHandler(const HandlerParameter & param)108 void RunHandler(const HandlerParameter& param) final {
109 ResponseType rsp;
110 grpc::Status status = param.status;
111 if (status.ok()) {
112 status = CatchingFunctionHandler([this, ¶m, &rsp] {
113 return func_(service_,
114 static_cast<grpc::ServerContext*>(param.server_context),
115 static_cast<RequestType*>(param.request), &rsp);
116 });
117 static_cast<RequestType*>(param.request)->~RequestType();
118 }
119 UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status);
120 }
121
Deserialize(grpc_call * call,grpc_byte_buffer * req,grpc::Status * status,void **)122 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
123 grpc::Status* status, void** /*handler_data*/) final {
124 auto* request =
125 new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType;
126 return UnaryDeserializeHelper(req, status,
127 static_cast<BaseRequestType*>(request));
128 }
129
130 private:
131 /// Application provided rpc handler function.
132 std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
133 const RequestType*, ResponseType*)>
134 func_;
135 // The class the above handler function lives in.
136 ServiceType* service_;
137 };
138
139 /// A wrapper class of an application provided client streaming handler.
140 template <class ServiceType, class RequestType, class ResponseType>
141 class ClientStreamingHandler : public grpc::internal::MethodHandler {
142 public:
ClientStreamingHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,ServerReader<RequestType> *,ResponseType *)> func,ServiceType * service)143 ClientStreamingHandler(
144 std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
145 ServerReader<RequestType>*, ResponseType*)>
146 func,
147 ServiceType* service)
148 : func_(func), service_(service) {}
149
RunHandler(const HandlerParameter & param)150 void RunHandler(const HandlerParameter& param) final {
151 ServerReader<RequestType> reader(
152 param.call, static_cast<grpc::ServerContext*>(param.server_context));
153 ResponseType rsp;
154 grpc::Status status =
155 CatchingFunctionHandler([this, ¶m, &reader, &rsp] {
156 return func_(service_,
157 static_cast<grpc::ServerContext*>(param.server_context),
158 &reader, &rsp);
159 });
160
161 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
162 grpc::internal::CallOpSendMessage,
163 grpc::internal::CallOpServerSendStatus>
164 ops;
165 if (!param.server_context->sent_initial_metadata_) {
166 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
167 param.server_context->initial_metadata_flags());
168 if (param.server_context->compression_level_set()) {
169 ops.set_compression_level(param.server_context->compression_level());
170 }
171 }
172 if (status.ok()) {
173 status = ops.SendMessagePtr(&rsp);
174 }
175 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
176 param.call->PerformOps(&ops);
177 param.call->cq()->Pluck(&ops);
178 }
179
180 private:
181 std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
182 ServerReader<RequestType>*, ResponseType*)>
183 func_;
184 ServiceType* service_;
185 };
186
187 /// A wrapper class of an application provided server streaming handler.
188 template <class ServiceType, class RequestType, class ResponseType>
189 class ServerStreamingHandler : public grpc::internal::MethodHandler {
190 public:
ServerStreamingHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,const RequestType *,ServerWriter<ResponseType> *)> func,ServiceType * service)191 ServerStreamingHandler(std::function<grpc::Status(
192 ServiceType*, grpc::ServerContext*,
193 const RequestType*, ServerWriter<ResponseType>*)>
194 func,
195 ServiceType* service)
196 : func_(func), service_(service) {}
197
RunHandler(const HandlerParameter & param)198 void RunHandler(const HandlerParameter& param) final {
199 grpc::Status status = param.status;
200 if (status.ok()) {
201 ServerWriter<ResponseType> writer(
202 param.call, static_cast<grpc::ServerContext*>(param.server_context));
203 status = CatchingFunctionHandler([this, ¶m, &writer] {
204 return func_(service_,
205 static_cast<grpc::ServerContext*>(param.server_context),
206 static_cast<RequestType*>(param.request), &writer);
207 });
208 static_cast<RequestType*>(param.request)->~RequestType();
209 }
210
211 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
212 grpc::internal::CallOpServerSendStatus>
213 ops;
214 if (!param.server_context->sent_initial_metadata_) {
215 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
216 param.server_context->initial_metadata_flags());
217 if (param.server_context->compression_level_set()) {
218 ops.set_compression_level(param.server_context->compression_level());
219 }
220 }
221 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
222 param.call->PerformOps(&ops);
223 if (param.server_context->has_pending_ops_) {
224 param.call->cq()->Pluck(¶m.server_context->pending_ops_);
225 }
226 param.call->cq()->Pluck(&ops);
227 }
228
Deserialize(grpc_call * call,grpc_byte_buffer * req,grpc::Status * status,void **)229 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
230 grpc::Status* status, void** /*handler_data*/) final {
231 grpc::ByteBuffer buf;
232 buf.set_buffer(req);
233 auto* request =
234 new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
235 *status =
236 grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
237 buf.Release();
238 if (status->ok()) {
239 return request;
240 }
241 request->~RequestType();
242 return nullptr;
243 }
244
245 private:
246 std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
247 const RequestType*, ServerWriter<ResponseType>*)>
248 func_;
249 ServiceType* service_;
250 };
251
252 /// A wrapper class of an application provided bidi-streaming handler.
253 /// This also applies to server-streamed implementation of a unary method
254 /// with the additional requirement that such methods must have done a
255 /// write for status to be ok
256 /// Since this is used by more than 1 class, the service is not passed in.
257 /// Instead, it is expected to be an implicitly-captured argument of func
258 /// (through bind or something along those lines)
259 template <class Streamer, bool WriteNeeded>
260 class TemplatedBidiStreamingHandler : public grpc::internal::MethodHandler {
261 public:
TemplatedBidiStreamingHandler(std::function<grpc::Status (grpc::ServerContext *,Streamer *)> func)262 explicit TemplatedBidiStreamingHandler(
263 std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func)
264 : func_(func), write_needed_(WriteNeeded) {}
265
RunHandler(const HandlerParameter & param)266 void RunHandler(const HandlerParameter& param) final {
267 Streamer stream(param.call,
268 static_cast<grpc::ServerContext*>(param.server_context));
269 grpc::Status status = CatchingFunctionHandler([this, ¶m, &stream] {
270 return func_(static_cast<grpc::ServerContext*>(param.server_context),
271 &stream);
272 });
273
274 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
275 grpc::internal::CallOpServerSendStatus>
276 ops;
277 if (!param.server_context->sent_initial_metadata_) {
278 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
279 param.server_context->initial_metadata_flags());
280 if (param.server_context->compression_level_set()) {
281 ops.set_compression_level(param.server_context->compression_level());
282 }
283 if (write_needed_ && status.ok()) {
284 // If we needed a write but never did one, we need to mark the
285 // status as a fail
286 status = grpc::Status(grpc::StatusCode::INTERNAL,
287 "Service did not provide response message");
288 }
289 }
290 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
291 param.call->PerformOps(&ops);
292 if (param.server_context->has_pending_ops_) {
293 param.call->cq()->Pluck(¶m.server_context->pending_ops_);
294 }
295 param.call->cq()->Pluck(&ops);
296 }
297
298 private:
299 std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func_;
300 const bool write_needed_;
301 };
302
303 template <class ServiceType, class RequestType, class ResponseType>
304 class BidiStreamingHandler
305 : public TemplatedBidiStreamingHandler<
306 ServerReaderWriter<ResponseType, RequestType>, false> {
307 public:
BidiStreamingHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,ServerReaderWriter<ResponseType,RequestType> *)> func,ServiceType * service)308 BidiStreamingHandler(std::function<grpc::Status(
309 ServiceType*, grpc::ServerContext*,
310 ServerReaderWriter<ResponseType, RequestType>*)>
311 func,
312 ServiceType* service)
313 // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
314 : TemplatedBidiStreamingHandler<
315 ServerReaderWriter<ResponseType, RequestType>, false>(
316 [func, service](
317 grpc::ServerContext* ctx,
318 ServerReaderWriter<ResponseType, RequestType>* streamer) {
319 return func(service, ctx, streamer);
320 }) {}
321 };
322
323 template <class RequestType, class ResponseType>
324 class StreamedUnaryHandler
325 : public TemplatedBidiStreamingHandler<
326 ServerUnaryStreamer<RequestType, ResponseType>, true> {
327 public:
StreamedUnaryHandler(std::function<grpc::Status (grpc::ServerContext *,ServerUnaryStreamer<RequestType,ResponseType> *)> func)328 explicit StreamedUnaryHandler(
329 std::function<
330 grpc::Status(grpc::ServerContext*,
331 ServerUnaryStreamer<RequestType, ResponseType>*)>
332 func)
333 : TemplatedBidiStreamingHandler<
334 ServerUnaryStreamer<RequestType, ResponseType>, true>(
335 std::move(func)) {}
336 };
337
338 template <class RequestType, class ResponseType>
339 class SplitServerStreamingHandler
340 : public TemplatedBidiStreamingHandler<
341 ServerSplitStreamer<RequestType, ResponseType>, false> {
342 public:
SplitServerStreamingHandler(std::function<grpc::Status (grpc::ServerContext *,ServerSplitStreamer<RequestType,ResponseType> *)> func)343 explicit SplitServerStreamingHandler(
344 std::function<
345 grpc::Status(grpc::ServerContext*,
346 ServerSplitStreamer<RequestType, ResponseType>*)>
347 func)
348 : TemplatedBidiStreamingHandler<
349 ServerSplitStreamer<RequestType, ResponseType>, false>(
350 std::move(func)) {}
351 };
352
353 /// General method handler class for errors that prevent real method use
354 /// e.g., handle unknown method by returning UNIMPLEMENTED error.
355 template <grpc::StatusCode code>
356 class ErrorMethodHandler : public grpc::internal::MethodHandler {
357 public:
ErrorMethodHandler(const std::string & message)358 explicit ErrorMethodHandler(const std::string& message) : message_(message) {}
359
360 template <class T>
FillOps(grpc::ServerContextBase * context,const std::string & message,T * ops)361 static void FillOps(grpc::ServerContextBase* context,
362 const std::string& message, T* ops) {
363 grpc::Status status(code, message);
364 if (!context->sent_initial_metadata_) {
365 ops->SendInitialMetadata(&context->initial_metadata_,
366 context->initial_metadata_flags());
367 if (context->compression_level_set()) {
368 ops->set_compression_level(context->compression_level());
369 }
370 context->sent_initial_metadata_ = true;
371 }
372 ops->ServerSendStatus(&context->trailing_metadata_, status);
373 }
374
RunHandler(const HandlerParameter & param)375 void RunHandler(const HandlerParameter& param) final {
376 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
377 grpc::internal::CallOpServerSendStatus>
378 ops;
379 FillOps(param.server_context, message_, &ops);
380 param.call->PerformOps(&ops);
381 param.call->cq()->Pluck(&ops);
382 }
383
Deserialize(grpc_call *,grpc_byte_buffer * req,grpc::Status *,void **)384 void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
385 grpc::Status* /*status*/, void** /*handler_data*/) final {
386 // We have to destroy any request payload
387 if (req != nullptr) {
388 grpc_byte_buffer_destroy(req);
389 }
390 return nullptr;
391 }
392
393 private:
394 const std::string message_;
395 };
396
397 typedef ErrorMethodHandler<grpc::StatusCode::UNIMPLEMENTED>
398 UnknownMethodHandler;
399 typedef ErrorMethodHandler<grpc::StatusCode::RESOURCE_EXHAUSTED>
400 ResourceExhaustedHandler;
401
402 } // namespace internal
403 } // namespace grpc
404
405 #endif // GRPCPP_SUPPORT_METHOD_HANDLER_H
406