1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 //
17
18 #include <limits.h>
19 #include <string.h>
20
21 #include <algorithm>
22 #include <atomic>
23 #include <cstdlib>
24 #include <memory>
25 #include <new>
26 #include <sstream>
27 #include <string>
28 #include <type_traits>
29 #include <utility>
30 #include <vector>
31
32 #include "absl/status/status.h"
33
34 #include <grpc/byte_buffer.h>
35 #include <grpc/grpc.h>
36 #include <grpc/slice.h>
37 #include <grpc/support/log.h>
38 #include <grpc/support/sync.h>
39 #include <grpc/support/time.h>
40 #include <grpcpp/channel.h>
41 #include <grpcpp/completion_queue.h>
42 #include <grpcpp/generic/async_generic_service.h>
43 #include <grpcpp/health_check_service_interface.h>
44 #include <grpcpp/impl/call.h>
45 #include <grpcpp/impl/call_op_set.h>
46 #include <grpcpp/impl/call_op_set_interface.h>
47 #include <grpcpp/impl/completion_queue_tag.h>
48 #include <grpcpp/impl/interceptor_common.h>
49 #include <grpcpp/impl/metadata_map.h>
50 #include <grpcpp/impl/rpc_method.h>
51 #include <grpcpp/impl/rpc_service_method.h>
52 #include <grpcpp/impl/server_callback_handlers.h>
53 #include <grpcpp/impl/server_initializer.h>
54 #include <grpcpp/impl/service_type.h>
55 #include <grpcpp/impl/sync.h>
56 #include <grpcpp/security/server_credentials.h>
57 #include <grpcpp/server.h>
58 #include <grpcpp/server_context.h>
59 #include <grpcpp/server_interface.h>
60 #include <grpcpp/support/channel_arguments.h>
61 #include <grpcpp/support/client_interceptor.h>
62 #include <grpcpp/support/interceptor.h>
63 #include <grpcpp/support/method_handler.h>
64 #include <grpcpp/support/server_interceptor.h>
65 #include <grpcpp/support/slice.h>
66 #include <grpcpp/support/status.h>
67
68 #include "src/core/ext/transport/inproc/inproc_transport.h"
69 #include "src/core/lib/gprpp/manual_constructor.h"
70 #include "src/core/lib/iomgr/exec_ctx.h"
71 #include "src/core/lib/iomgr/iomgr.h"
72 #include "src/core/lib/resource_quota/api.h"
73 #include "src/core/lib/surface/completion_queue.h"
74 #include "src/core/lib/surface/server.h"
75 #include "src/cpp/client/create_channel_internal.h"
76 #include "src/cpp/server/external_connection_acceptor_impl.h"
77 #include "src/cpp/server/health/default_health_check_service.h"
78 #include "src/cpp/thread_manager/thread_manager.h"
79
80 namespace grpc {
81 namespace {
82
83 // The default value for maximum number of threads that can be created in the
84 // sync server. This value of INT_MAX is chosen to match the default behavior if
85 // no ResourceQuota is set. To modify the max number of threads in a sync
86 // server, pass a custom ResourceQuota object (with the desired number of
87 // max-threads set) to the server builder.
88 #define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
89
90 // Give a useful status error message if the resource is exhausted specifically
91 // because the server threadpool is full.
92 const char* kServerThreadpoolExhausted = "Server Threadpool Exhausted";
93
94 // Although we might like to give a useful status error message on unimplemented
95 // RPCs, it's not always possible since that also would need to be added across
96 // languages and isn't actually required by the spec.
97 const char* kUnknownRpcMethod = "";
98
99 class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
100 public:
~DefaultGlobalCallbacks()101 ~DefaultGlobalCallbacks() override {}
PreSynchronousRequest(ServerContext *)102 void PreSynchronousRequest(ServerContext* /*context*/) override {}
PostSynchronousRequest(ServerContext *)103 void PostSynchronousRequest(ServerContext* /*context*/) override {}
104 };
105
106 std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr;
107 gpr_once g_once_init_callbacks = GPR_ONCE_INIT;
108
InitGlobalCallbacks()109 void InitGlobalCallbacks() {
110 if (!g_callbacks) {
111 g_callbacks.reset(new DefaultGlobalCallbacks());
112 }
113 }
114
115 class ShutdownTag : public internal::CompletionQueueTag {
116 public:
FinalizeResult(void **,bool *)117 bool FinalizeResult(void** /*tag*/, bool* /*status*/) override {
118 return false;
119 }
120 };
121
122 class PhonyTag : public internal::CompletionQueueTag {
123 public:
FinalizeResult(void **,bool *)124 bool FinalizeResult(void** /*tag*/, bool* /*status*/) override {
125 return true;
126 }
127 };
128
129 class UnimplementedAsyncRequestContext {
130 protected:
UnimplementedAsyncRequestContext()131 UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
132
133 GenericServerContext server_context_;
134 GenericServerAsyncReaderWriter generic_stream_;
135 };
136
137 } // namespace
138
BaseAsyncRequest(ServerInterface * server,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag,bool delete_on_finalize)139 ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
140 ServerInterface* server, ServerContext* context,
141 internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
142 ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
143 : server_(server),
144 context_(context),
145 stream_(stream),
146 call_cq_(call_cq),
147 notification_cq_(notification_cq),
148 tag_(tag),
149 delete_on_finalize_(delete_on_finalize),
150 call_(nullptr),
151 done_intercepting_(false) {
152 // Set up interception state partially for the receive ops. call_wrapper_ is
153 // not filled at this point, but it will be filled before the interceptors are
154 // run.
155 interceptor_methods_.SetCall(&call_wrapper_);
156 interceptor_methods_.SetReverse();
157 call_cq_->RegisterAvalanching(); // This op will trigger more ops
158 call_metric_recording_enabled_ = server_->call_metric_recording_enabled();
159 server_metric_recorder_ = server_->server_metric_recorder();
160 }
161
~BaseAsyncRequest()162 ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() {
163 call_cq_->CompleteAvalanching();
164 }
165
FinalizeResult(void ** tag,bool * status)166 bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
167 bool* status) {
168 if (done_intercepting_) {
169 *tag = tag_;
170 if (delete_on_finalize_) {
171 delete this;
172 }
173 return true;
174 }
175 context_->set_call(call_, call_metric_recording_enabled_,
176 server_metric_recorder_);
177 context_->cq_ = call_cq_;
178 if (call_wrapper_.call() == nullptr) {
179 // Fill it since it is empty.
180 call_wrapper_ = internal::Call(
181 call_, server_, call_cq_, server_->max_receive_message_size(), nullptr);
182 }
183
184 // just the pointers inside call are copied here
185 stream_->BindCall(&call_wrapper_);
186
187 if (*status && call_ && call_wrapper_.server_rpc_info()) {
188 done_intercepting_ = true;
189 // Set interception point for RECV INITIAL METADATA
190 interceptor_methods_.AddInterceptionHookPoint(
191 experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
192 interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_);
193 if (interceptor_methods_.RunInterceptors(
194 [this]() { ContinueFinalizeResultAfterInterception(); })) {
195 // There are no interceptors to run. Continue
196 } else {
197 // There were interceptors to be run, so
198 // ContinueFinalizeResultAfterInterception will be run when interceptors
199 // are done.
200 return false;
201 }
202 }
203 if (*status && call_) {
204 context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr);
205 }
206 *tag = tag_;
207 if (delete_on_finalize_) {
208 delete this;
209 }
210 return true;
211 }
212
213 void ServerInterface::BaseAsyncRequest::
ContinueFinalizeResultAfterInterception()214 ContinueFinalizeResultAfterInterception() {
215 context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr);
216 // Queue a tag which will be returned immediately
217 grpc_core::ExecCtx exec_ctx;
218 grpc_cq_begin_op(notification_cq_->cq(), this);
219 grpc_cq_end_op(
220 notification_cq_->cq(), this, absl::OkStatus(),
221 [](void* /*arg*/, grpc_cq_completion* completion) { delete completion; },
222 nullptr, new grpc_cq_completion());
223 }
224
RegisteredAsyncRequest(ServerInterface * server,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag,const char * name,internal::RpcMethod::RpcType type)225 ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
226 ServerInterface* server, ServerContext* context,
227 internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
228 ServerCompletionQueue* notification_cq, void* tag, const char* name,
229 internal::RpcMethod::RpcType type)
230 : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag,
231 true),
232 name_(name),
233 type_(type) {}
234
IssueRequest(void * registered_method,grpc_byte_buffer ** payload,ServerCompletionQueue * notification_cq)235 void ServerInterface::RegisteredAsyncRequest::IssueRequest(
236 void* registered_method, grpc_byte_buffer** payload,
237 ServerCompletionQueue* notification_cq) {
238 // The following call_start_batch is internally-generated so no need for an
239 // explanatory log on failure.
240 GPR_ASSERT(grpc_server_request_registered_call(
241 server_->server(), registered_method, &call_,
242 &context_->deadline_, context_->client_metadata_.arr(),
243 payload, call_cq_->cq(), notification_cq->cq(),
244 this) == GRPC_CALL_OK);
245 }
246
GenericAsyncRequest(ServerInterface * server,GenericServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag,bool delete_on_finalize,bool issue_request)247 ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
248 ServerInterface* server, GenericServerContext* context,
249 internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
250 ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize,
251 bool issue_request)
252 : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag,
253 delete_on_finalize) {
254 grpc_call_details_init(&call_details_);
255 GPR_ASSERT(notification_cq);
256 GPR_ASSERT(call_cq);
257 if (issue_request) {
258 IssueRequest();
259 }
260 }
261
FinalizeResult(void ** tag,bool * status)262 bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
263 bool* status) {
264 // If we are done intercepting, there is nothing more for us to do
265 if (done_intercepting_) {
266 return BaseAsyncRequest::FinalizeResult(tag, status);
267 }
268 // TODO(yangg) remove the copy here.
269 if (*status) {
270 static_cast<GenericServerContext*>(context_)->method_ =
271 StringFromCopiedSlice(call_details_.method);
272 static_cast<GenericServerContext*>(context_)->host_ =
273 StringFromCopiedSlice(call_details_.host);
274 context_->deadline_ = call_details_.deadline;
275 }
276 grpc_slice_unref(call_details_.method);
277 grpc_slice_unref(call_details_.host);
278 call_wrapper_ = internal::Call(
279 call_, server_, call_cq_, server_->max_receive_message_size(),
280 context_->set_server_rpc_info(
281 static_cast<GenericServerContext*>(context_)->method_.c_str(),
282 internal::RpcMethod::BIDI_STREAMING,
283 *server_->interceptor_creators()));
284 return BaseAsyncRequest::FinalizeResult(tag, status);
285 }
286
IssueRequest()287 void ServerInterface::GenericAsyncRequest::IssueRequest() {
288 // The following call_start_batch is internally-generated so no need for an
289 // explanatory log on failure.
290 GPR_ASSERT(grpc_server_request_call(server_->server(), &call_, &call_details_,
291 context_->client_metadata_.arr(),
292 call_cq_->cq(), notification_cq_->cq(),
293 this) == GRPC_CALL_OK);
294 }
295
296 namespace {
297 class ShutdownCallback : public grpc_completion_queue_functor {
298 public:
ShutdownCallback()299 ShutdownCallback() {
300 functor_run = &ShutdownCallback::Run;
301 // Set inlineable to true since this callback is trivial and thus does not
302 // need to be run from the executor (triggering a thread hop). This should
303 // only be used by internal callbacks like this and not by user application
304 // code.
305 inlineable = true;
306 }
307 // TakeCQ takes ownership of the cq into the shutdown callback
308 // so that the shutdown callback will be responsible for destroying it
TakeCQ(CompletionQueue * cq)309 void TakeCQ(CompletionQueue* cq) { cq_ = cq; }
310
311 // The Run function will get invoked by the completion queue library
312 // when the shutdown is actually complete
Run(grpc_completion_queue_functor * cb,int)313 static void Run(grpc_completion_queue_functor* cb, int) {
314 auto* callback = static_cast<ShutdownCallback*>(cb);
315 delete callback->cq_;
316 delete callback;
317 }
318
319 private:
320 CompletionQueue* cq_ = nullptr;
321 };
322 } // namespace
323
324 /// Use private inheritance rather than composition only to establish order
325 /// of construction, since the public base class should be constructed after the
326 /// elements belonging to the private base class are constructed. This is not
327 /// possible using true composition.
328 class Server::UnimplementedAsyncRequest final
329 : private grpc::UnimplementedAsyncRequestContext,
330 public GenericAsyncRequest {
331 public:
UnimplementedAsyncRequest(ServerInterface * server,grpc::ServerCompletionQueue * cq)332 UnimplementedAsyncRequest(ServerInterface* server,
333 grpc::ServerCompletionQueue* cq)
334 : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
335 /*tag=*/nullptr, /*delete_on_finalize=*/false,
336 /*issue_request=*/false) {
337 // Issue request here instead of the base class to prevent race on vptr.
338 IssueRequest();
339 }
340
341 bool FinalizeResult(void** tag, bool* status) override;
342
context()343 grpc::ServerContext* context() { return &server_context_; }
stream()344 grpc::GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
345 };
346
347 /// UnimplementedAsyncResponse should not post user-visible completions to the
348 /// C++ completion queue, but is generated as a CQ event by the core
349 class Server::UnimplementedAsyncResponse final
350 : public grpc::internal::CallOpSet<
351 grpc::internal::CallOpSendInitialMetadata,
352 grpc::internal::CallOpServerSendStatus> {
353 public:
354 explicit UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
~UnimplementedAsyncResponse()355 ~UnimplementedAsyncResponse() override { delete request_; }
356
FinalizeResult(void ** tag,bool * status)357 bool FinalizeResult(void** tag, bool* status) override {
358 if (grpc::internal::CallOpSet<
359 grpc::internal::CallOpSendInitialMetadata,
360 grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag,
361 status)) {
362 delete this;
363 } else {
364 // The tag was swallowed due to interception. We will see it again.
365 }
366 return false;
367 }
368
369 private:
370 UnimplementedAsyncRequest* const request_;
371 };
372
373 class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
374 public:
SyncRequest(Server * server,grpc::internal::RpcServiceMethod * method,grpc_core::Server::RegisteredCallAllocation * data)375 SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method,
376 grpc_core::Server::RegisteredCallAllocation* data)
377 : SyncRequest(server, method) {
378 CommonSetup(data);
379 data->deadline = &deadline_;
380 data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr;
381 }
382
SyncRequest(Server * server,grpc::internal::RpcServiceMethod * method,grpc_core::Server::BatchCallAllocation * data)383 SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method,
384 grpc_core::Server::BatchCallAllocation* data)
385 : SyncRequest(server, method) {
386 CommonSetup(data);
387 call_details_ = new grpc_call_details;
388 grpc_call_details_init(call_details_);
389 data->details = call_details_;
390 }
391
~SyncRequest()392 ~SyncRequest() override {
393 // The destructor should only cleanup those objects created in the
394 // constructor, since some paths may or may not actually go through the
395 // Run stage where other objects are allocated.
396 if (has_request_payload_ && request_payload_) {
397 grpc_byte_buffer_destroy(request_payload_);
398 }
399 if (call_details_ != nullptr) {
400 grpc_call_details_destroy(call_details_);
401 delete call_details_;
402 }
403 grpc_metadata_array_destroy(&request_metadata_);
404 server_->UnrefWithPossibleNotify();
405 }
406
FinalizeResult(void **,bool * status)407 bool FinalizeResult(void** /*tag*/, bool* status) override {
408 if (!*status) {
409 delete this;
410 return false;
411 }
412 if (call_details_) {
413 deadline_ = call_details_->deadline;
414 }
415 return true;
416 }
417
Run(const std::shared_ptr<GlobalCallbacks> & global_callbacks,bool resources)418 void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
419 bool resources) {
420 ctx_.Init(deadline_, &request_metadata_);
421 wrapped_call_.Init(
422 call_, server_, &cq_, server_->max_receive_message_size(),
423 ctx_->ctx.set_server_rpc_info(method_->name(), method_->method_type(),
424 server_->interceptor_creators_));
425 ctx_->ctx.set_call(call_, server_->call_metric_recording_enabled(),
426 server_->server_metric_recorder());
427 ctx_->ctx.cq_ = &cq_;
428 request_metadata_.count = 0;
429
430 global_callbacks_ = global_callbacks;
431 resources_ = resources;
432
433 interceptor_methods_.SetCall(&*wrapped_call_);
434 interceptor_methods_.SetReverse();
435 // Set interception point for RECV INITIAL METADATA
436 interceptor_methods_.AddInterceptionHookPoint(
437 grpc::experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
438 interceptor_methods_.SetRecvInitialMetadata(&ctx_->ctx.client_metadata_);
439
440 if (has_request_payload_) {
441 // Set interception point for RECV MESSAGE
442 auto* handler = resources_ ? method_->handler()
443 : server_->resource_exhausted_handler_.get();
444 deserialized_request_ = handler->Deserialize(call_, request_payload_,
445 &request_status_, nullptr);
446 if (!request_status_.ok()) {
447 gpr_log(GPR_DEBUG, "Failed to deserialize message.");
448 }
449 request_payload_ = nullptr;
450 interceptor_methods_.AddInterceptionHookPoint(
451 grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
452 interceptor_methods_.SetRecvMessage(deserialized_request_, nullptr);
453 }
454
455 if (interceptor_methods_.RunInterceptors(
456 [this]() { ContinueRunAfterInterception(); })) {
457 ContinueRunAfterInterception();
458 } else {
459 // There were interceptors to be run, so ContinueRunAfterInterception
460 // will be run when interceptors are done.
461 }
462 }
463
ContinueRunAfterInterception()464 void ContinueRunAfterInterception() {
465 ctx_->ctx.BeginCompletionOp(&*wrapped_call_, nullptr, nullptr);
466 global_callbacks_->PreSynchronousRequest(&ctx_->ctx);
467 auto* handler = resources_ ? method_->handler()
468 : server_->resource_exhausted_handler_.get();
469 handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
470 &*wrapped_call_, &ctx_->ctx, deserialized_request_, request_status_,
471 nullptr, nullptr));
472 global_callbacks_->PostSynchronousRequest(&ctx_->ctx);
473
474 cq_.Shutdown();
475
476 grpc::internal::CompletionQueueTag* op_tag = ctx_->ctx.GetCompletionOpTag();
477 cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
478
479 // Ensure the cq_ is shutdown
480 grpc::PhonyTag ignored_tag;
481 GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
482
483 // Cleanup structures allocated during Run/ContinueRunAfterInterception
484 wrapped_call_.Destroy();
485 ctx_.Destroy();
486
487 delete this;
488 }
489
490 // For requests that must be only cleaned up but not actually Run
Cleanup()491 void Cleanup() {
492 cq_.Shutdown();
493 grpc_call_unref(call_);
494 delete this;
495 }
496
497 private:
SyncRequest(Server * server,grpc::internal::RpcServiceMethod * method)498 SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method)
499 : server_(server),
500 method_(method),
501 has_request_payload_(method->method_type() ==
502 grpc::internal::RpcMethod::NORMAL_RPC ||
503 method->method_type() ==
504 grpc::internal::RpcMethod::SERVER_STREAMING),
505 cq_(grpc_completion_queue_create_for_pluck(nullptr)) {}
506
507 template <class CallAllocation>
CommonSetup(CallAllocation * data)508 void CommonSetup(CallAllocation* data) {
509 server_->Ref();
510 grpc_metadata_array_init(&request_metadata_);
511 data->tag = static_cast<void*>(this);
512 data->call = &call_;
513 data->initial_metadata = &request_metadata_;
514 data->cq = cq_.cq();
515 }
516
517 Server* const server_;
518 grpc::internal::RpcServiceMethod* const method_;
519 const bool has_request_payload_;
520 grpc_call* call_;
521 grpc_call_details* call_details_ = nullptr;
522 gpr_timespec deadline_;
523 grpc_metadata_array request_metadata_;
524 grpc_byte_buffer* request_payload_ = nullptr;
525 grpc::CompletionQueue cq_;
526 grpc::Status request_status_;
527 std::shared_ptr<GlobalCallbacks> global_callbacks_;
528 bool resources_;
529 void* deserialized_request_ = nullptr;
530 grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
531
532 // ServerContextWrapper allows ManualConstructor while using a private
533 // contructor of ServerContext via this friend class.
534 struct ServerContextWrapper {
535 ServerContext ctx;
536
ServerContextWrappergrpc::Server::SyncRequest::ServerContextWrapper537 ServerContextWrapper(gpr_timespec deadline, grpc_metadata_array* arr)
538 : ctx(deadline, arr) {}
539 };
540
541 grpc_core::ManualConstructor<ServerContextWrapper> ctx_;
542 grpc_core::ManualConstructor<internal::Call> wrapped_call_;
543 };
544
545 template <class ServerContextType>
546 class Server::CallbackRequest final
547 : public grpc::internal::CompletionQueueTag {
548 public:
549 static_assert(
550 std::is_base_of<grpc::CallbackServerContext, ServerContextType>::value,
551 "ServerContextType must be derived from CallbackServerContext");
552
553 // For codegen services, the value of method represents the defined
554 // characteristics of the method being requested. For generic services, method
555 // is nullptr since these services don't have pre-defined methods.
CallbackRequest(Server * server,grpc::internal::RpcServiceMethod * method,grpc::CompletionQueue * cq,grpc_core::Server::RegisteredCallAllocation * data)556 CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method,
557 grpc::CompletionQueue* cq,
558 grpc_core::Server::RegisteredCallAllocation* data)
559 : server_(server),
560 method_(method),
561 has_request_payload_(method->method_type() ==
562 grpc::internal::RpcMethod::NORMAL_RPC ||
563 method->method_type() ==
564 grpc::internal::RpcMethod::SERVER_STREAMING),
565 cq_(cq),
566 tag_(this),
567 ctx_(server_->context_allocator() != nullptr
568 ? server_->context_allocator()->NewCallbackServerContext()
569 : nullptr) {
570 CommonSetup(server, data);
571 data->deadline = &deadline_;
572 data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr;
573 }
574
575 // For generic services, method is nullptr since these services don't have
576 // pre-defined methods.
CallbackRequest(Server * server,grpc::CompletionQueue * cq,grpc_core::Server::BatchCallAllocation * data)577 CallbackRequest(Server* server, grpc::CompletionQueue* cq,
578 grpc_core::Server::BatchCallAllocation* data)
579 : server_(server),
580 method_(nullptr),
581 has_request_payload_(false),
582 call_details_(new grpc_call_details),
583 cq_(cq),
584 tag_(this),
585 ctx_(server_->context_allocator() != nullptr
586 ? server_->context_allocator()
587 ->NewGenericCallbackServerContext()
588 : nullptr) {
589 CommonSetup(server, data);
590 grpc_call_details_init(call_details_);
591 data->details = call_details_;
592 }
593
~CallbackRequest()594 ~CallbackRequest() override {
595 delete call_details_;
596 grpc_metadata_array_destroy(&request_metadata_);
597 if (has_request_payload_ && request_payload_) {
598 grpc_byte_buffer_destroy(request_payload_);
599 }
600 if (ctx_alloc_by_default_ || server_->context_allocator() == nullptr) {
601 default_ctx_.Destroy();
602 }
603 server_->UnrefWithPossibleNotify();
604 }
605
606 // Needs specialization to account for different processing of metadata
607 // in generic API
608 bool FinalizeResult(void** tag, bool* status) override;
609
610 private:
611 // method_name needs to be specialized between named method and generic
612 const char* method_name() const;
613
614 class CallbackCallTag : public grpc_completion_queue_functor {
615 public:
CallbackCallTag(Server::CallbackRequest<ServerContextType> * req)616 explicit CallbackCallTag(Server::CallbackRequest<ServerContextType>* req)
617 : req_(req) {
618 functor_run = &CallbackCallTag::StaticRun;
619 // Set inlineable to true since this callback is internally-controlled
620 // without taking any locks, and thus does not need to be run from the
621 // executor (which triggers a thread hop). This should only be used by
622 // internal callbacks like this and not by user application code. The work
623 // here is actually non-trivial, but there is no chance of having user
624 // locks conflict with each other so it's ok to run inlined.
625 inlineable = true;
626 }
627
628 // force_run can not be performed on a tag if operations using this tag
629 // have been sent to PerformOpsOnCall. It is intended for error conditions
630 // that are detected before the operations are internally processed.
force_run(bool ok)631 void force_run(bool ok) { Run(ok); }
632
633 private:
634 Server::CallbackRequest<ServerContextType>* req_;
635 grpc::internal::Call* call_;
636
StaticRun(grpc_completion_queue_functor * cb,int ok)637 static void StaticRun(grpc_completion_queue_functor* cb, int ok) {
638 static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok));
639 }
Run(bool ok)640 void Run(bool ok) {
641 void* ignored = req_;
642 bool new_ok = ok;
643 GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
644 GPR_ASSERT(ignored == req_);
645
646 if (!ok) {
647 // The call has been shutdown.
648 // Delete its contents to free up the request.
649 delete req_;
650 return;
651 }
652
653 // Bind the call, deadline, and metadata from what we got
654 req_->ctx_->set_call(req_->call_,
655 req_->server_->call_metric_recording_enabled(),
656 req_->server_->server_metric_recorder());
657 req_->ctx_->cq_ = req_->cq_;
658 req_->ctx_->BindDeadlineAndMetadata(req_->deadline_,
659 &req_->request_metadata_);
660 req_->request_metadata_.count = 0;
661
662 // Create a C++ Call to control the underlying core call
663 call_ =
664 new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call)))
665 grpc::internal::Call(
666 req_->call_, req_->server_, req_->cq_,
667 req_->server_->max_receive_message_size(),
668 req_->ctx_->set_server_rpc_info(
669 req_->method_name(),
670 (req_->method_ != nullptr)
671 ? req_->method_->method_type()
672 : grpc::internal::RpcMethod::BIDI_STREAMING,
673 req_->server_->interceptor_creators_));
674
675 req_->interceptor_methods_.SetCall(call_);
676 req_->interceptor_methods_.SetReverse();
677 // Set interception point for RECV INITIAL METADATA
678 req_->interceptor_methods_.AddInterceptionHookPoint(
679 grpc::experimental::InterceptionHookPoints::
680 POST_RECV_INITIAL_METADATA);
681 req_->interceptor_methods_.SetRecvInitialMetadata(
682 &req_->ctx_->client_metadata_);
683
684 if (req_->has_request_payload_) {
685 // Set interception point for RECV MESSAGE
686 req_->request_ = req_->method_->handler()->Deserialize(
687 req_->call_, req_->request_payload_, &req_->request_status_,
688 &req_->handler_data_);
689 if (!(req_->request_status_.ok())) {
690 gpr_log(GPR_DEBUG, "Failed to deserialize message.");
691 }
692 req_->request_payload_ = nullptr;
693 req_->interceptor_methods_.AddInterceptionHookPoint(
694 grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
695 req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr);
696 }
697
698 if (req_->interceptor_methods_.RunInterceptors(
699 [this] { ContinueRunAfterInterception(); })) {
700 ContinueRunAfterInterception();
701 } else {
702 // There were interceptors to be run, so ContinueRunAfterInterception
703 // will be run when interceptors are done.
704 }
705 }
ContinueRunAfterInterception()706 void ContinueRunAfterInterception() {
707 auto* handler = (req_->method_ != nullptr)
708 ? req_->method_->handler()
709 : req_->server_->generic_handler_.get();
710 handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
711 call_, req_->ctx_, req_->request_, req_->request_status_,
712 req_->handler_data_, [this] { delete req_; }));
713 }
714 };
715
716 template <class CallAllocation>
CommonSetup(Server * server,CallAllocation * data)717 void CommonSetup(Server* server, CallAllocation* data) {
718 server->Ref();
719 grpc_metadata_array_init(&request_metadata_);
720 data->tag = static_cast<void*>(&tag_);
721 data->call = &call_;
722 data->initial_metadata = &request_metadata_;
723 if (ctx_ == nullptr) {
724 default_ctx_.Init();
725 ctx_ = &*default_ctx_;
726 ctx_alloc_by_default_ = true;
727 }
728 ctx_->set_context_allocator(server->context_allocator());
729 data->cq = cq_->cq();
730 }
731
732 Server* const server_;
733 grpc::internal::RpcServiceMethod* const method_;
734 const bool has_request_payload_;
735 grpc_byte_buffer* request_payload_ = nullptr;
736 void* request_ = nullptr;
737 void* handler_data_ = nullptr;
738 grpc::Status request_status_;
739 grpc_call_details* const call_details_ = nullptr;
740 grpc_call* call_;
741 gpr_timespec deadline_;
742 grpc_metadata_array request_metadata_;
743 grpc::CompletionQueue* const cq_;
744 bool ctx_alloc_by_default_ = false;
745 CallbackCallTag tag_;
746 ServerContextType* ctx_ = nullptr;
747 grpc_core::ManualConstructor<ServerContextType> default_ctx_;
748 grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
749 };
750
751 template <>
FinalizeResult(void **,bool *)752 bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult(
753 void** /*tag*/, bool* /*status*/) {
754 return false;
755 }
756
757 template <>
758 bool Server::CallbackRequest<
FinalizeResult(void **,bool * status)759 grpc::GenericCallbackServerContext>::FinalizeResult(void** /*tag*/,
760 bool* status) {
761 if (*status) {
762 deadline_ = call_details_->deadline;
763 // TODO(yangg) remove the copy here
764 ctx_->method_ = grpc::StringFromCopiedSlice(call_details_->method);
765 ctx_->host_ = grpc::StringFromCopiedSlice(call_details_->host);
766 }
767 grpc_slice_unref(call_details_->method);
768 grpc_slice_unref(call_details_->host);
769 return false;
770 }
771
772 template <>
method_name() const773 const char* Server::CallbackRequest<grpc::CallbackServerContext>::method_name()
774 const {
775 return method_->name();
776 }
777
778 template <>
779 const char* Server::CallbackRequest<
method_name() const780 grpc::GenericCallbackServerContext>::method_name() const {
781 return ctx_->method().c_str();
782 }
783
784 // Implementation of ThreadManager. Each instance of SyncRequestThreadManager
785 // manages a pool of threads that poll for incoming Sync RPCs and call the
786 // appropriate RPC handlers
787 class Server::SyncRequestThreadManager : public grpc::ThreadManager {
788 public:
SyncRequestThreadManager(Server * server,grpc::CompletionQueue * server_cq,std::shared_ptr<GlobalCallbacks> global_callbacks,grpc_resource_quota * rq,int min_pollers,int max_pollers,int cq_timeout_msec)789 SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq,
790 std::shared_ptr<GlobalCallbacks> global_callbacks,
791 grpc_resource_quota* rq, int min_pollers,
792 int max_pollers, int cq_timeout_msec)
793 : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
794 server_(server),
795 server_cq_(server_cq),
796 cq_timeout_msec_(cq_timeout_msec),
797 global_callbacks_(std::move(global_callbacks)) {}
798
PollForWork(void ** tag,bool * ok)799 WorkStatus PollForWork(void** tag, bool* ok) override {
800 *tag = nullptr;
801 // TODO(ctiller): workaround for GPR_TIMESPAN based deadlines not working
802 // right now
803 gpr_timespec deadline =
804 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
805 gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN));
806
807 switch (server_cq_->AsyncNext(tag, ok, deadline)) {
808 case grpc::CompletionQueue::TIMEOUT:
809 return TIMEOUT;
810 case grpc::CompletionQueue::SHUTDOWN:
811 return SHUTDOWN;
812 case grpc::CompletionQueue::GOT_EVENT:
813 return WORK_FOUND;
814 }
815
816 GPR_UNREACHABLE_CODE(return TIMEOUT);
817 }
818
DoWork(void * tag,bool ok,bool resources)819 void DoWork(void* tag, bool ok, bool resources) override {
820 (void)ok;
821 SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
822
823 // Under the AllocatingRequestMatcher model we will never see an invalid tag
824 // here.
825 GPR_DEBUG_ASSERT(sync_req != nullptr);
826 GPR_DEBUG_ASSERT(ok);
827
828 sync_req->Run(global_callbacks_, resources);
829 }
830
AddSyncMethod(grpc::internal::RpcServiceMethod * method,void * tag)831 void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) {
832 grpc_core::Server::FromC(server_->server())
833 ->SetRegisteredMethodAllocator(server_cq_->cq(), tag, [this, method] {
834 grpc_core::Server::RegisteredCallAllocation result;
835 new SyncRequest(server_, method, &result);
836 return result;
837 });
838 has_sync_method_ = true;
839 }
840
AddUnknownSyncMethod()841 void AddUnknownSyncMethod() {
842 if (has_sync_method_) {
843 unknown_method_ = std::make_unique<grpc::internal::RpcServiceMethod>(
844 "unknown", grpc::internal::RpcMethod::BIDI_STREAMING,
845 new grpc::internal::UnknownMethodHandler(kUnknownRpcMethod));
846 grpc_core::Server::FromC(server_->server())
847 ->SetBatchMethodAllocator(server_cq_->cq(), [this] {
848 grpc_core::Server::BatchCallAllocation result;
849 new SyncRequest(server_, unknown_method_.get(), &result);
850 return result;
851 });
852 }
853 }
854
Shutdown()855 void Shutdown() override {
856 ThreadManager::Shutdown();
857 server_cq_->Shutdown();
858 }
859
Wait()860 void Wait() override {
861 ThreadManager::Wait();
862 // Drain any pending items from the queue
863 void* tag;
864 bool ok;
865 while (server_cq_->Next(&tag, &ok)) {
866 // This problem can arise if the server CQ gets a request queued to it
867 // before it gets shutdown but then pulls it after shutdown.
868 static_cast<SyncRequest*>(tag)->Cleanup();
869 }
870 }
871
Start()872 void Start() {
873 if (has_sync_method_) {
874 Initialize(); // ThreadManager's Initialize()
875 }
876 }
877
878 private:
879 Server* server_;
880 grpc::CompletionQueue* server_cq_;
881 int cq_timeout_msec_;
882 bool has_sync_method_ = false;
883 std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_;
884 std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
885 };
886
Server(grpc::ChannelArguments * args,std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>> sync_server_cqs,int min_pollers,int max_pollers,int sync_cq_timeout_msec,std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> acceptors,grpc_server_config_fetcher * server_config_fetcher,grpc_resource_quota * server_rq,std::vector<std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> interceptor_creators,experimental::ServerMetricRecorder * server_metric_recorder)887 Server::Server(
888 grpc::ChannelArguments* args,
889 std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>
890 sync_server_cqs,
891 int min_pollers, int max_pollers, int sync_cq_timeout_msec,
892 std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
893 acceptors,
894 grpc_server_config_fetcher* server_config_fetcher,
895 grpc_resource_quota* server_rq,
896 std::vector<
897 std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
898 interceptor_creators,
899 experimental::ServerMetricRecorder* server_metric_recorder)
900 : acceptors_(std::move(acceptors)),
901 interceptor_creators_(std::move(interceptor_creators)),
902 max_receive_message_size_(INT_MIN),
903 sync_server_cqs_(std::move(sync_server_cqs)),
904 started_(false),
905 shutdown_(false),
906 shutdown_notified_(false),
907 server_(nullptr),
908 server_initializer_(new ServerInitializer(this)),
909 health_check_service_disabled_(false),
910 server_metric_recorder_(server_metric_recorder) {
911 gpr_once_init(&grpc::g_once_init_callbacks, grpc::InitGlobalCallbacks);
912 global_callbacks_ = grpc::g_callbacks;
913 global_callbacks_->UpdateArguments(args);
914
915 if (sync_server_cqs_ != nullptr) {
916 bool default_rq_created = false;
917 if (server_rq == nullptr) {
918 server_rq = grpc_resource_quota_create("SyncServer-default-rq");
919 grpc_resource_quota_set_max_threads(server_rq,
920 DEFAULT_MAX_SYNC_SERVER_THREADS);
921 default_rq_created = true;
922 }
923
924 for (const auto& it : *sync_server_cqs_) {
925 sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
926 this, it.get(), global_callbacks_, server_rq, min_pollers,
927 max_pollers, sync_cq_timeout_msec));
928 }
929
930 if (default_rq_created) {
931 grpc_resource_quota_unref(server_rq);
932 }
933 }
934
935 for (auto& acceptor : acceptors_) {
936 acceptor->SetToChannelArgs(args);
937 }
938
939 grpc_channel_args channel_args;
940 args->SetChannelArgs(&channel_args);
941
942 for (size_t i = 0; i < channel_args.num_args; i++) {
943 if (0 == strcmp(channel_args.args[i].key,
944 grpc::kHealthCheckServiceInterfaceArg)) {
945 if (channel_args.args[i].value.pointer.p == nullptr) {
946 health_check_service_disabled_ = true;
947 } else {
948 health_check_service_.reset(
949 static_cast<grpc::HealthCheckServiceInterface*>(
950 channel_args.args[i].value.pointer.p));
951 }
952 }
953 if (0 ==
954 strcmp(channel_args.args[i].key, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH)) {
955 max_receive_message_size_ = channel_args.args[i].value.integer;
956 }
957 if (0 == strcmp(channel_args.args[i].key,
958 GRPC_ARG_SERVER_CALL_METRIC_RECORDING)) {
959 call_metric_recording_enabled_ = channel_args.args[i].value.integer;
960 }
961 }
962 server_ = grpc_server_create(&channel_args, nullptr);
963 grpc_server_set_config_fetcher(server_, server_config_fetcher);
964 }
965
~Server()966 Server::~Server() {
967 {
968 grpc::internal::ReleasableMutexLock lock(&mu_);
969 if (started_ && !shutdown_) {
970 lock.Release();
971 Shutdown();
972 } else if (!started_) {
973 // Shutdown the completion queues
974 for (const auto& value : sync_req_mgrs_) {
975 value->Shutdown();
976 }
977 CompletionQueue* callback_cq =
978 callback_cq_.load(std::memory_order_relaxed);
979 if (callback_cq != nullptr) {
980 if (grpc_iomgr_run_in_background()) {
981 // gRPC-core provides the backing needed for the preferred CQ type
982 callback_cq->Shutdown();
983 } else {
984 CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq);
985 }
986 callback_cq_.store(nullptr, std::memory_order_release);
987 }
988 }
989 }
990 // Destroy health check service before we destroy the C server so that
991 // it does not call grpc_server_request_registered_call() after the C
992 // server has been destroyed.
993 health_check_service_.reset();
994 grpc_server_destroy(server_);
995 }
996
SetGlobalCallbacks(GlobalCallbacks * callbacks)997 void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
998 GPR_ASSERT(!grpc::g_callbacks);
999 GPR_ASSERT(callbacks);
1000 grpc::g_callbacks.reset(callbacks);
1001 }
1002
c_server()1003 grpc_server* Server::c_server() { return server_; }
1004
InProcessChannel(const grpc::ChannelArguments & args)1005 std::shared_ptr<grpc::Channel> Server::InProcessChannel(
1006 const grpc::ChannelArguments& args) {
1007 grpc_channel_args channel_args = args.c_channel_args();
1008 return grpc::CreateChannelInternal(
1009 "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr),
1010 std::vector<std::unique_ptr<
1011 grpc::experimental::ClientInterceptorFactoryInterface>>());
1012 }
1013
1014 std::shared_ptr<grpc::Channel>
InProcessChannelWithInterceptors(const grpc::ChannelArguments & args,std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_creators)1015 Server::experimental_type::InProcessChannelWithInterceptors(
1016 const grpc::ChannelArguments& args,
1017 std::vector<
1018 std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
1019 interceptor_creators) {
1020 grpc_channel_args channel_args = args.c_channel_args();
1021 return grpc::CreateChannelInternal(
1022 "inproc",
1023 grpc_inproc_channel_create(server_->server_, &channel_args, nullptr),
1024 std::move(interceptor_creators));
1025 }
1026
PayloadHandlingForMethod(grpc::internal::RpcServiceMethod * method)1027 static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
1028 grpc::internal::RpcServiceMethod* method) {
1029 switch (method->method_type()) {
1030 case grpc::internal::RpcMethod::NORMAL_RPC:
1031 case grpc::internal::RpcMethod::SERVER_STREAMING:
1032 return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER;
1033 case grpc::internal::RpcMethod::CLIENT_STREAMING:
1034 case grpc::internal::RpcMethod::BIDI_STREAMING:
1035 return GRPC_SRM_PAYLOAD_NONE;
1036 }
1037 GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
1038 }
1039
RegisterService(const std::string * addr,grpc::Service * service)1040 bool Server::RegisterService(const std::string* addr, grpc::Service* service) {
1041 bool has_async_methods = service->has_async_methods();
1042 if (has_async_methods) {
1043 GPR_ASSERT(service->server_ == nullptr &&
1044 "Can only register an asynchronous service against one server.");
1045 service->server_ = this;
1046 }
1047
1048 const char* method_name = nullptr;
1049
1050 for (const auto& method : service->methods_) {
1051 if (method == nullptr) { // Handled by generic service if any.
1052 continue;
1053 }
1054
1055 void* method_registration_tag = grpc_server_register_method(
1056 server_, method->name(), addr ? addr->c_str() : nullptr,
1057 PayloadHandlingForMethod(method.get()), 0);
1058 if (method_registration_tag == nullptr) {
1059 gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
1060 method->name());
1061 return false;
1062 }
1063
1064 if (method->handler() == nullptr) { // Async method without handler
1065 method->set_server_tag(method_registration_tag);
1066 } else if (method->api_type() ==
1067 grpc::internal::RpcServiceMethod::ApiType::SYNC) {
1068 for (const auto& value : sync_req_mgrs_) {
1069 value->AddSyncMethod(method.get(), method_registration_tag);
1070 }
1071 } else {
1072 has_callback_methods_ = true;
1073 grpc::internal::RpcServiceMethod* method_value = method.get();
1074 grpc::CompletionQueue* cq = CallbackCQ();
1075 grpc_server_register_completion_queue(server_, cq->cq(), nullptr);
1076 grpc_core::Server::FromC(server_)->SetRegisteredMethodAllocator(
1077 cq->cq(), method_registration_tag, [this, cq, method_value] {
1078 grpc_core::Server::RegisteredCallAllocation result;
1079 new CallbackRequest<grpc::CallbackServerContext>(this, method_value,
1080 cq, &result);
1081 return result;
1082 });
1083 }
1084
1085 method_name = method->name();
1086 }
1087
1088 // Parse service name.
1089 if (method_name != nullptr) {
1090 std::stringstream ss(method_name);
1091 std::string service_name;
1092 if (std::getline(ss, service_name, '/') &&
1093 std::getline(ss, service_name, '/')) {
1094 services_.push_back(service_name);
1095 }
1096 }
1097 return true;
1098 }
1099
RegisterAsyncGenericService(grpc::AsyncGenericService * service)1100 void Server::RegisterAsyncGenericService(grpc::AsyncGenericService* service) {
1101 GPR_ASSERT(service->server_ == nullptr &&
1102 "Can only register an async generic service against one server.");
1103 service->server_ = this;
1104 has_async_generic_service_ = true;
1105 }
1106
RegisterCallbackGenericService(grpc::CallbackGenericService * service)1107 void Server::RegisterCallbackGenericService(
1108 grpc::CallbackGenericService* service) {
1109 GPR_ASSERT(
1110 service->server_ == nullptr &&
1111 "Can only register a callback generic service against one server.");
1112 service->server_ = this;
1113 has_callback_generic_service_ = true;
1114 generic_handler_.reset(service->Handler());
1115
1116 grpc::CompletionQueue* cq = CallbackCQ();
1117 grpc_core::Server::FromC(server_)->SetBatchMethodAllocator(cq->cq(), [this,
1118 cq] {
1119 grpc_core::Server::BatchCallAllocation result;
1120 new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result);
1121 return result;
1122 });
1123 }
1124
AddListeningPort(const std::string & addr,grpc::ServerCredentials * creds)1125 int Server::AddListeningPort(const std::string& addr,
1126 grpc::ServerCredentials* creds) {
1127 GPR_ASSERT(!started_);
1128 int port = creds->AddPortToServer(addr, server_);
1129 global_callbacks_->AddPort(this, addr, creds, port);
1130 return port;
1131 }
1132
Ref()1133 void Server::Ref() {
1134 shutdown_refs_outstanding_.fetch_add(1, std::memory_order_relaxed);
1135 }
1136
UnrefWithPossibleNotify()1137 void Server::UnrefWithPossibleNotify() {
1138 if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub(
1139 1, std::memory_order_acq_rel) == 1)) {
1140 // No refs outstanding means that shutdown has been initiated and no more
1141 // callback requests are outstanding.
1142 grpc::internal::MutexLock lock(&mu_);
1143 GPR_ASSERT(shutdown_);
1144 shutdown_done_ = true;
1145 shutdown_done_cv_.Signal();
1146 }
1147 }
1148
UnrefAndWaitLocked()1149 void Server::UnrefAndWaitLocked() {
1150 if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub(
1151 1, std::memory_order_acq_rel) == 1)) {
1152 shutdown_done_ = true;
1153 return; // no need to wait on CV since done condition already set
1154 }
1155 while (!shutdown_done_) {
1156 shutdown_done_cv_.Wait(&mu_);
1157 }
1158 }
1159
Start(grpc::ServerCompletionQueue ** cqs,size_t num_cqs)1160 void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
1161 GPR_ASSERT(!started_);
1162 global_callbacks_->PreServerStart(this);
1163 started_ = true;
1164
1165 // Only create default health check service when user did not provide an
1166 // explicit one.
1167 if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
1168 grpc::DefaultHealthCheckServiceEnabled()) {
1169 auto default_hc_service = std::make_unique<DefaultHealthCheckService>();
1170 auto* hc_service_impl = default_hc_service->GetHealthCheckService();
1171 health_check_service_ = std::move(default_hc_service);
1172 RegisterService(nullptr, hc_service_impl);
1173 }
1174
1175 for (auto& acceptor : acceptors_) {
1176 acceptor->GetCredentials()->AddPortToServer(acceptor->name(), server_);
1177 }
1178
1179 #ifndef NDEBUG
1180 for (size_t i = 0; i < num_cqs; i++) {
1181 cq_list_.push_back(cqs[i]);
1182 }
1183 #endif
1184
1185 // We must have exactly one generic service to handle requests for
1186 // unmatched method names (i.e., to return UNIMPLEMENTED for any RPC
1187 // method for which we don't have a registered implementation). This
1188 // service comes from one of the following places (first match wins):
1189 // - If the application supplied a generic service via either the async
1190 // or callback APIs, we use that.
1191 // - If there are callback methods, register a callback generic service.
1192 // - If there are sync methods, register a sync generic service.
1193 // (This must be done before server start to initialize an
1194 // AllocatingRequestMatcher.)
1195 // - Otherwise (we have only async methods), we wait until the server
1196 // is started and then start an UnimplementedAsyncRequest on each
1197 // async CQ, so that the requests will be moved along by polling
1198 // done in application threads.
1199 bool unknown_rpc_needed =
1200 !has_async_generic_service_ && !has_callback_generic_service_;
1201 if (unknown_rpc_needed && has_callback_methods_) {
1202 unimplemented_service_ = std::make_unique<grpc::CallbackGenericService>();
1203 RegisterCallbackGenericService(unimplemented_service_.get());
1204 unknown_rpc_needed = false;
1205 }
1206 if (unknown_rpc_needed && !sync_req_mgrs_.empty()) {
1207 sync_req_mgrs_[0]->AddUnknownSyncMethod();
1208 unknown_rpc_needed = false;
1209 }
1210
1211 grpc_server_start(server_);
1212
1213 if (unknown_rpc_needed) {
1214 for (size_t i = 0; i < num_cqs; i++) {
1215 if (cqs[i]->IsFrequentlyPolled()) {
1216 new UnimplementedAsyncRequest(this, cqs[i]);
1217 }
1218 }
1219 unknown_rpc_needed = false;
1220 }
1221
1222 // If this server has any support for synchronous methods (has any sync
1223 // server CQs), make sure that we have a ResourceExhausted handler
1224 // to deal with the case of thread exhaustion
1225 if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
1226 resource_exhausted_handler_ =
1227 std::make_unique<grpc::internal::ResourceExhaustedHandler>(
1228 kServerThreadpoolExhausted);
1229 }
1230
1231 for (const auto& value : sync_req_mgrs_) {
1232 value->Start();
1233 }
1234
1235 for (auto& acceptor : acceptors_) {
1236 acceptor->Start();
1237 }
1238 }
1239
ShutdownInternal(gpr_timespec deadline)1240 void Server::ShutdownInternal(gpr_timespec deadline) {
1241 grpc::internal::MutexLock lock(&mu_);
1242 if (shutdown_) {
1243 return;
1244 }
1245
1246 shutdown_ = true;
1247
1248 for (auto& acceptor : acceptors_) {
1249 acceptor->Shutdown();
1250 }
1251
1252 /// The completion queue to use for server shutdown completion notification
1253 grpc::CompletionQueue shutdown_cq;
1254 grpc::ShutdownTag shutdown_tag; // Phony shutdown tag
1255 grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
1256
1257 shutdown_cq.Shutdown();
1258
1259 void* tag;
1260 bool ok;
1261 grpc::CompletionQueue::NextStatus status =
1262 shutdown_cq.AsyncNext(&tag, &ok, deadline);
1263
1264 // If this timed out, it means we are done with the grace period for a clean
1265 // shutdown. We should force a shutdown now by cancelling all inflight calls
1266 if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) {
1267 grpc_server_cancel_all_calls(server_);
1268 }
1269 // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
1270 // successfully shutdown
1271
1272 // Drop the shutdown ref and wait for all other refs to drop as well.
1273 UnrefAndWaitLocked();
1274
1275 // Shutdown all ThreadManagers. This will try to gracefully stop all the
1276 // threads in the ThreadManagers (once they process any inflight requests)
1277 for (const auto& value : sync_req_mgrs_) {
1278 value->Shutdown(); // ThreadManager's Shutdown()
1279 }
1280
1281 // Wait for threads in all ThreadManagers to terminate
1282 for (const auto& value : sync_req_mgrs_) {
1283 value->Wait();
1284 }
1285
1286 // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it
1287 // will delete itself at true shutdown.
1288 CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed);
1289 if (callback_cq != nullptr) {
1290 if (grpc_iomgr_run_in_background()) {
1291 // gRPC-core provides the backing needed for the preferred CQ type
1292 callback_cq->Shutdown();
1293 } else {
1294 CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq);
1295 }
1296 callback_cq_.store(nullptr, std::memory_order_release);
1297 }
1298
1299 // Drain the shutdown queue (if the previous call to AsyncNext() timed out
1300 // and we didn't remove the tag from the queue yet)
1301 while (shutdown_cq.Next(&tag, &ok)) {
1302 // Nothing to be done here. Just ignore ok and tag values
1303 }
1304
1305 shutdown_notified_ = true;
1306 shutdown_cv_.SignalAll();
1307
1308 #ifndef NDEBUG
1309 // Unregister this server with the CQs passed into it by the user so that
1310 // those can be checked for properly-ordered shutdown.
1311 for (auto* cq : cq_list_) {
1312 cq->UnregisterServer(this);
1313 }
1314 cq_list_.clear();
1315 #endif
1316 }
1317
Wait()1318 void Server::Wait() {
1319 grpc::internal::MutexLock lock(&mu_);
1320 while (started_ && !shutdown_notified_) {
1321 shutdown_cv_.Wait(&mu_);
1322 }
1323 }
1324
PerformOpsOnCall(grpc::internal::CallOpSetInterface * ops,grpc::internal::Call * call)1325 void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops,
1326 grpc::internal::Call* call) {
1327 ops->FillOps(call);
1328 }
1329
FinalizeResult(void ** tag,bool * status)1330 bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
1331 bool* status) {
1332 if (GenericAsyncRequest::FinalizeResult(tag, status)) {
1333 // We either had no interceptors run or we are done intercepting
1334 if (*status) {
1335 // Create a new request/response pair using the server and CQ values
1336 // stored in this object's base class.
1337 new UnimplementedAsyncRequest(server_, notification_cq_);
1338 new UnimplementedAsyncResponse(this);
1339 } else {
1340 delete this;
1341 }
1342 } else {
1343 // The tag was swallowed due to interception. We will see it again.
1344 }
1345 return false;
1346 }
1347
UnimplementedAsyncResponse(UnimplementedAsyncRequest * request)1348 Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
1349 UnimplementedAsyncRequest* request)
1350 : request_(request) {
1351 grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, kUnknownRpcMethod);
1352 grpc::internal::UnknownMethodHandler::FillOps(request_->context(),
1353 kUnknownRpcMethod, this);
1354 request_->stream()->call_.PerformOps(this);
1355 }
1356
initializer()1357 grpc::ServerInitializer* Server::initializer() {
1358 return server_initializer_.get();
1359 }
1360
CallbackCQ()1361 grpc::CompletionQueue* Server::CallbackCQ() {
1362 // TODO(vjpai): Consider using a single global CQ for the default CQ
1363 // if there is no explicit per-server CQ registered
1364 CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire);
1365 if (callback_cq != nullptr) {
1366 return callback_cq;
1367 }
1368 // The callback_cq_ wasn't already set, so grab a lock and set it up exactly
1369 // once for this server.
1370 grpc::internal::MutexLock l(&mu_);
1371 callback_cq = callback_cq_.load(std::memory_order_relaxed);
1372 if (callback_cq != nullptr) {
1373 return callback_cq;
1374 }
1375 if (grpc_iomgr_run_in_background()) {
1376 // gRPC-core provides the backing needed for the preferred CQ type
1377 auto* shutdown_callback = new grpc::ShutdownCallback;
1378 callback_cq = new grpc::CompletionQueue(grpc_completion_queue_attributes{
1379 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
1380 shutdown_callback});
1381
1382 // Transfer ownership of the new cq to its own shutdown callback
1383 shutdown_callback->TakeCQ(callback_cq);
1384 } else {
1385 // Otherwise we need to use the alternative CQ variant
1386 callback_cq = CompletionQueue::CallbackAlternativeCQ();
1387 }
1388
1389 callback_cq_.store(callback_cq, std::memory_order_release);
1390 return callback_cq;
1391 }
1392
1393 } // namespace grpc
1394