//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <vector>

#include "absl/base/thread_annotations.h"

#include <grpc/grpc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/impl/completion_queue_tag.h>
#include <grpcpp/impl/grpc_library.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"

namespace grpc {
namespace {

gpr_once g_once_init_callback_alternative = GPR_ONCE_INIT;
grpc_core::Mutex* g_callback_alternative_mu;

// Implement a ref-counted callback CQ for global use in the alternative
// implementation so that its threads are only created once. Do this using
// explicit ref-counts and raw pointers rather than a shared-ptr since that
// has a non-trivial destructor and thus can't be used for global variables.
struct CallbackAlternativeCQ {
  int refs ABSL_GUARDED_BY(g_callback_alternative_mu) = 0;
  CompletionQueue* cq ABSL_GUARDED_BY(g_callback_alternative_mu);
  std::vector<grpc_core::Thread>* nexting_threads
      ABSL_GUARDED_BY(g_callback_alternative_mu);

  CompletionQueue* Ref() {
    grpc_core::MutexLock lock(&*g_callback_alternative_mu);
    refs++;
    if (refs == 1) {
      cq = new CompletionQueue;
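      // Size the polling pool at roughly half the cores, but always keep it
      // between 2 and 16 threads.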
      int num_nexting_threads =
          grpc_core::Clamp(gpr_cpu_num_cores() / 2, 2u, 16u);
      nexting_threads = new std::vector<grpc_core::Thread>;
      for (int i = 0; i < num_nexting_threads; i++) {
        nexting_threads->emplace_back(
            "nexting_thread",
            [](void* arg) {
              grpc_completion_queue* cq =
                  static_cast<CompletionQueue*>(arg)->cq();
              while (true) {
                // Use the raw Core next function rather than the C++ Next
                // since Next incorporates FinalizeResult and we actually want
                // that called from the callback functor itself.
                // TODO(vjpai): Migrate below to next without a timeout or idle
                // phase. That's currently starving out some other polling,
                // though.
                auto ev = grpc_completion_queue_next(
                    cq,
                    gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                 gpr_time_from_millis(1000, GPR_TIMESPAN)),
                    nullptr);
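                // A shutdown event means the last user released the global
                // callback-alternative CQ (see Unref below); exit this
                // polling thread.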
                if (ev.type == GRPC_QUEUE_SHUTDOWN) {
                  return;
                }
                if (ev.type == GRPC_QUEUE_TIMEOUT) {
                  gpr_sleep_until(
                      gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                   gpr_time_from_millis(100, GPR_TIMESPAN)));
                  continue;
                }
                GPR_DEBUG_ASSERT(ev.type == GRPC_OP_COMPLETE);
                // We can always execute the callback inline rather than
                // pushing it to another Executor thread because this
                // thread is definitely running on a background thread, does
                // not hold any application locks before executing the
                // callback, and cannot be entered recursively.
                auto* functor =
                    static_cast<grpc_completion_queue_functor*>(ev.tag);
                functor->functor_run(functor, ev.success);
              }
            },
            cq);
      }
      for (auto& th : *nexting_threads) {
        th.Start();
      }
    }
    return cq;
  }

  void Unref() {
    grpc_core::MutexLock lock(g_callback_alternative_mu);
    refs--;
    if (refs == 0) {
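      // Last reference: shut down the queue so the nexting threads see
      // GRPC_QUEUE_SHUTDOWN and return, then join them and free everything.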
      cq->Shutdown();
      for (auto& th : *nexting_threads) {
        th.Join();
      }
      delete nexting_threads;
      delete cq;
    }
  }
};

CallbackAlternativeCQ g_callback_alternative_cq;

}  // namespace

// The 'CompletionQueue' constructor can safely pass 'false' to its
// GrpcLibrary base, i.e. not have GrpcLibrary call grpc_init(). This is
// because, to create a 'grpc_completion_queue' instance (which is being
// passed as the input to this constructor), one must have already called
// grpc_init().
CompletionQueue::CompletionQueue(grpc_completion_queue* take)
    : GrpcLibrary(false), cq_(take) {
  InitialAvalanching();
}

void CompletionQueue::Shutdown() {
#ifndef NDEBUG
  if (!ServerListEmpty()) {
    gpr_log(GPR_ERROR,
            "CompletionQueue is being shutdown before its server.");
  }
#endif
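  // CompleteAvalanching balances the InitialAvalanching done at construction;
  // once all outstanding avalanching operations have drained, the underlying
  // core completion queue is shut down.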
  CompleteAvalanching();
}

CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
    void** tag, bool* ok, gpr_timespec deadline) {
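  // Keep polling until an event can be delivered to the caller: FinalizeResult
  // may indicate that a completed event should not be returned, in which case
  // the loop polls again.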
  for (;;) {
    auto ev = grpc_completion_queue_next(cq_, deadline, nullptr);
    switch (ev.type) {
      case GRPC_QUEUE_TIMEOUT:
        return TIMEOUT;
      case GRPC_QUEUE_SHUTDOWN:
        return SHUTDOWN;
      case GRPC_OP_COMPLETE:
        auto core_cq_tag =
            static_cast<grpc::internal::CompletionQueueTag*>(ev.tag);
        *ok = ev.success != 0;
        *tag = core_cq_tag;
        if (core_cq_tag->FinalizeResult(tag, ok)) {
          return GOT_EVENT;
        }
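        // This event should not be surfaced to the caller; poll again.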
        break;
    }
  }
}

CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache(
    CompletionQueue* cq)
    : cq_(cq), flushed_(false) {
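  // Enable the core completion queue's thread-local cache on this thread so
  // that a completion finishing on this same thread can be handed back
  // directly by Flush() instead of going through the queue.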
  grpc_completion_queue_thread_local_cache_init(cq_->cq_);
}

CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() {
  GPR_ASSERT(flushed_);
}

bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
  int res = 0;
  void* res_tag;
  flushed_ = true;
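  // The core flush returns nonzero if a completion was cached on this thread;
  // 'res' then carries that completion's success status.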
  if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
                                                     &res)) {
    auto core_cq_tag =
        static_cast<grpc::internal::CompletionQueueTag*>(res_tag);
    *ok = res == 1;
    if (core_cq_tag->FinalizeResult(tag, ok)) {
      return true;
    }
  }
  return false;
}

CompletionQueue* CompletionQueue::CallbackAlternativeCQ() {
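  // Create the global mutex on first use. It is heap-allocated and never
  // deleted, so it has no destructor to run at static shutdown (see the note
  // on CallbackAlternativeCQ above).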
  gpr_once_init(&g_once_init_callback_alternative,
                [] { g_callback_alternative_mu = new grpc_core::Mutex(); });
  return g_callback_alternative_cq.Ref();
}

void CompletionQueue::ReleaseCallbackAlternativeCQ(CompletionQueue* cq)
    ABSL_NO_THREAD_SAFETY_ANALYSIS {
  (void)cq;
  // This accesses g_callback_alternative_cq without acquiring the mutex
  // but it's considered safe because it just reads the pointer address.
  GPR_DEBUG_ASSERT(cq == g_callback_alternative_cq.cq);
  g_callback_alternative_cq.Unref();
}

}  // namespace grpc