xref: /aosp_15_r20/external/grpc-grpc/src/cpp/common/completion_queue_cc.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 //
17 
18 #include <vector>
19 
20 #include "absl/base/thread_annotations.h"
21 
22 #include <grpc/grpc.h>
23 #include <grpc/support/cpu.h>
24 #include <grpc/support/log.h>
25 #include <grpc/support/sync.h>
26 #include <grpc/support/time.h>
27 #include <grpcpp/completion_queue.h>
28 #include <grpcpp/impl/completion_queue_tag.h>
29 #include <grpcpp/impl/grpc_library.h>
30 
31 #include "src/core/lib/gpr/useful.h"
32 #include "src/core/lib/gprpp/sync.h"
33 #include "src/core/lib/gprpp/thd.h"
34 
35 namespace grpc {
36 namespace {
37 
38 gpr_once g_once_init_callback_alternative = GPR_ONCE_INIT;
39 grpc_core::Mutex* g_callback_alternative_mu;
40 
41 // Implement a ref-counted callback CQ for global use in the alternative
42 // implementation so that its threads are only created once. Do this using
43 // explicit ref-counts and raw pointers rather than a shared-ptr since that
44 // has a non-trivial destructor and thus can't be used for global variables.
45 struct CallbackAlternativeCQ {
46   int refs ABSL_GUARDED_BY(g_callback_alternative_mu) = 0;
47   CompletionQueue* cq ABSL_GUARDED_BY(g_callback_alternative_mu);
48   std::vector<grpc_core::Thread>* nexting_threads
49       ABSL_GUARDED_BY(g_callback_alternative_mu);
50 
Refgrpc::__anon440836130111::CallbackAlternativeCQ51   CompletionQueue* Ref() {
52     grpc_core::MutexLock lock(&*g_callback_alternative_mu);
53     refs++;
54     if (refs == 1) {
55       cq = new CompletionQueue;
56       int num_nexting_threads =
57           grpc_core::Clamp(gpr_cpu_num_cores() / 2, 2u, 16u);
58       nexting_threads = new std::vector<grpc_core::Thread>;
59       for (int i = 0; i < num_nexting_threads; i++) {
60         nexting_threads->emplace_back(
61             "nexting_thread",
62             [](void* arg) {
63               grpc_completion_queue* cq =
64                   static_cast<CompletionQueue*>(arg)->cq();
65               while (true) {
66                 // Use the raw Core next function rather than the C++ Next since
67                 // Next incorporates FinalizeResult and we actually want that
68                 // called from the callback functor itself.
69                 // TODO(vjpai): Migrate below to next without a timeout or idle
70                 // phase. That's currently starving out some other polling,
71                 // though.
72                 auto ev = grpc_completion_queue_next(
73                     cq,
74                     gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
75                                  gpr_time_from_millis(1000, GPR_TIMESPAN)),
76                     nullptr);
77                 if (ev.type == GRPC_QUEUE_SHUTDOWN) {
78                   return;
79                 }
80                 if (ev.type == GRPC_QUEUE_TIMEOUT) {
81                   gpr_sleep_until(
82                       gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
83                                    gpr_time_from_millis(100, GPR_TIMESPAN)));
84                   continue;
85                 }
86                 GPR_DEBUG_ASSERT(ev.type == GRPC_OP_COMPLETE);
87                 // We can always execute the callback inline rather than
88                 // pushing it to another Executor thread because this
89                 // thread is definitely running on a background thread, does not
90                 // hold any application locks before executing the callback,
91                 // and cannot be entered recursively.
92                 auto* functor =
93                     static_cast<grpc_completion_queue_functor*>(ev.tag);
94                 functor->functor_run(functor, ev.success);
95               }
96             },
97             cq);
98       }
99       for (auto& th : *nexting_threads) {
100         th.Start();
101       }
102     }
103     return cq;
104   }
105 
Unrefgrpc::__anon440836130111::CallbackAlternativeCQ106   void Unref() {
107     grpc_core::MutexLock lock(g_callback_alternative_mu);
108     refs--;
109     if (refs == 0) {
110       cq->Shutdown();
111       for (auto& th : *nexting_threads) {
112         th.Join();
113       }
114       delete nexting_threads;
115       delete cq;
116     }
117   }
118 };
119 
120 CallbackAlternativeCQ g_callback_alternative_cq;
121 
122 }  // namespace
123 
124 // 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here
125 // i.e not have GrpcLibraryCodegen call grpc_init(). This is because, to create
126 // a 'grpc_completion_queue' instance (which is being passed as the input to
127 // this constructor), one must have already called grpc_init().
CompletionQueue(grpc_completion_queue * take)128 CompletionQueue::CompletionQueue(grpc_completion_queue* take)
129     : GrpcLibrary(false), cq_(take) {
130   InitialAvalanching();
131 }
132 
Shutdown()133 void CompletionQueue::Shutdown() {
134 #ifndef NDEBUG
135   if (!ServerListEmpty()) {
136     gpr_log(GPR_ERROR,
137             "CompletionQueue shutdown being shutdown before its server.");
138   }
139 #endif
140   CompleteAvalanching();
141 }
142 
AsyncNextInternal(void ** tag,bool * ok,gpr_timespec deadline)143 CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
144     void** tag, bool* ok, gpr_timespec deadline) {
145   for (;;) {
146     auto ev = grpc_completion_queue_next(cq_, deadline, nullptr);
147     switch (ev.type) {
148       case GRPC_QUEUE_TIMEOUT:
149         return TIMEOUT;
150       case GRPC_QUEUE_SHUTDOWN:
151         return SHUTDOWN;
152       case GRPC_OP_COMPLETE:
153         auto core_cq_tag =
154             static_cast<grpc::internal::CompletionQueueTag*>(ev.tag);
155         *ok = ev.success != 0;
156         *tag = core_cq_tag;
157         if (core_cq_tag->FinalizeResult(tag, ok)) {
158           return GOT_EVENT;
159         }
160         break;
161     }
162   }
163 }
164 
CompletionQueueTLSCache(CompletionQueue * cq)165 CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache(
166     CompletionQueue* cq)
167     : cq_(cq), flushed_(false) {
168   grpc_completion_queue_thread_local_cache_init(cq_->cq_);
169 }
170 
~CompletionQueueTLSCache()171 CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() {
172   GPR_ASSERT(flushed_);
173 }
174 
Flush(void ** tag,bool * ok)175 bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
176   int res = 0;
177   void* res_tag;
178   flushed_ = true;
179   if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
180                                                      &res)) {
181     auto core_cq_tag =
182         static_cast<grpc::internal::CompletionQueueTag*>(res_tag);
183     *ok = res == 1;
184     if (core_cq_tag->FinalizeResult(tag, ok)) {
185       return true;
186     }
187   }
188   return false;
189 }
190 
CallbackAlternativeCQ()191 CompletionQueue* CompletionQueue::CallbackAlternativeCQ() {
192   gpr_once_init(&g_once_init_callback_alternative,
193                 [] { g_callback_alternative_mu = new grpc_core::Mutex(); });
194   return g_callback_alternative_cq.Ref();
195 }
196 
ReleaseCallbackAlternativeCQ(CompletionQueue * cq)197 void CompletionQueue::ReleaseCallbackAlternativeCQ(CompletionQueue* cq)
198     ABSL_NO_THREAD_SAFETY_ANALYSIS {
199   (void)cq;
200   // This accesses g_callback_alternative_cq without acquiring the mutex
201   // but it's considered safe because it just reads the pointer address.
202   GPR_DEBUG_ASSERT(cq == g_callback_alternative_cq.cq);
203   g_callback_alternative_cq.Unref();
204 }
205 
206 }  // namespace grpc
207