1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 /// Event engine based on Apple's CFRunLoop API family. If the CFRunLoop engine
20 /// is enabled (see iomgr_posix_cfstream.cc), a global thread is started to
21 /// handle and trigger all the CFStream events. The CFStream streams register
22 /// themselves with the run loop with functions grpc_apple_register_read_stream
23 /// and grpc_apple_register_read_stream. Pollsets are phony and block on a
24 /// condition variable in pollset_work().
25 
26 #include <grpc/support/port_platform.h>
27 
28 #include "src/core/lib/iomgr/port.h"
29 
30 #ifdef GRPC_APPLE_EV
31 
32 #include <CoreFoundation/CoreFoundation.h>
33 
34 #include <list>
35 
36 #include "absl/time/time.h"
37 
38 #include "src/core/lib/gprpp/thd.h"
39 #include "src/core/lib/gprpp/time_util.h"
40 #include "src/core/lib/iomgr/ev_apple.h"
41 
42 grpc_core::DebugOnlyTraceFlag grpc_apple_polling_trace(false, "apple_polling");
43 
44 #ifndef NDEBUG
45 #define GRPC_POLLING_TRACE(format, ...)                    \
46   if (GRPC_TRACE_FLAG_ENABLED(grpc_apple_polling_trace)) { \
47     gpr_log(GPR_DEBUG, "(polling) " format, __VA_ARGS__);  \
48   }
49 #else
50 #define GRPC_POLLING_TRACE(...)
51 #endif  // NDEBUG
52 
53 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
54 
55 struct GlobalRunLoopContext {
56   grpc_core::CondVar init_cv;
57   grpc_core::CondVar input_source_cv;
58 
59   grpc_core::Mutex mu;
60 
61   // Whether an input source registration is pending. Protected by mu.
62   bool input_source_registered = false;
63 
64   // The reference to the global run loop object. Protected by mu.
65   CFRunLoopRef run_loop;
66 
67   // Whether the pollset has been globally shut down. Protected by mu.
68   bool is_shutdown = false;
69 };
70 
71 struct GrpcAppleWorker {
72   // The condition varible to kick the worker. Works with the pollset's lock
73   // (GrpcApplePollset.mu).
74   grpc_core::CondVar cv;
75 
76   // Whether the worker is kicked. Protected by the pollset's lock
77   // (GrpcApplePollset.mu).
78   bool kicked = false;
79 };
80 
81 struct GrpcApplePollset {
82   grpc_core::Mutex mu;
83 
84   // Tracks the current workers in the pollset. Protected by mu.
85   std::list<GrpcAppleWorker*> workers;
86 
87   // Whether the pollset is shut down. Protected by mu.
88   bool is_shutdown = false;
89 
90   // Closure to call when shutdown is done. Protected by mu.
91   grpc_closure* shutdown_closure;
92 
93   // Whether there's an outstanding kick that was not processed. Protected by
94   // mu.
95   bool kicked_without_poller = false;
96 };
97 
98 static GlobalRunLoopContext* gGlobalRunLoopContext = nullptr;
99 static grpc_core::Thread* gGlobalRunLoopThread = nullptr;
100 
101 /// Register the stream with the dispatch queue. Callbacks of the stream will be
102 /// issued to the dispatch queue when a network event happens and will be
103 /// managed by Grand Central Dispatch.
grpc_apple_register_read_stream_queue(CFReadStreamRef read_stream,dispatch_queue_t dispatch_queue)104 static void grpc_apple_register_read_stream_queue(
105     CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) {
106   CFReadStreamSetDispatchQueue(read_stream, dispatch_queue);
107 }
108 
109 /// Register the stream with the dispatch queue. Callbacks of the stream will be
110 /// issued to the dispatch queue when a network event happens and will be
111 /// managed by Grand Central Dispatch.
grpc_apple_register_write_stream_queue(CFWriteStreamRef write_stream,dispatch_queue_t dispatch_queue)112 static void grpc_apple_register_write_stream_queue(
113     CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) {
114   CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue);
115 }
116 
117 /// Register the stream with the global run loop. Callbacks of the stream will
118 /// be issued to the run loop when a network event happens and will be driven by
119 /// the global run loop thread gGlobalRunLoopThread.
grpc_apple_register_read_stream_run_loop(CFReadStreamRef read_stream,dispatch_queue_t)120 static void grpc_apple_register_read_stream_run_loop(
121     CFReadStreamRef read_stream, dispatch_queue_t /*dispatch_queue*/) {
122   GRPC_POLLING_TRACE("Register read stream: %p", read_stream);
123   grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
124   CFReadStreamScheduleWithRunLoop(read_stream, gGlobalRunLoopContext->run_loop,
125                                   kCFRunLoopDefaultMode);
126   gGlobalRunLoopContext->input_source_registered = true;
127   gGlobalRunLoopContext->input_source_cv.Signal();
128 }
129 
130 /// Register the stream with the global run loop. Callbacks of the stream will
131 /// be issued to the run loop when a network event happens, and will be driven
132 /// by the global run loop thread gGlobalRunLoopThread.
grpc_apple_register_write_stream_run_loop(CFWriteStreamRef write_stream,dispatch_queue_t)133 static void grpc_apple_register_write_stream_run_loop(
134     CFWriteStreamRef write_stream, dispatch_queue_t /*dispatch_queue*/) {
135   GRPC_POLLING_TRACE("Register write stream: %p", write_stream);
136   grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
137   CFWriteStreamScheduleWithRunLoop(
138       write_stream, gGlobalRunLoopContext->run_loop, kCFRunLoopDefaultMode);
139   gGlobalRunLoopContext->input_source_registered = true;
140   gGlobalRunLoopContext->input_source_cv.Signal();
141 }
142 
143 /// The default implementation of stream registration is to register the stream
144 /// to a dispatch queue. However, if the CFRunLoop based pollset is enabled (by
145 /// macro and environment variable, see docs in iomgr_posix_cfstream.cc), the
146 /// CFStream streams are registered with the global run loop instead (see
147 /// pollset_global_init below).
148 static void (*grpc_apple_register_read_stream_impl)(
149     CFReadStreamRef, dispatch_queue_t) = grpc_apple_register_read_stream_queue;
150 static void (*grpc_apple_register_write_stream_impl)(CFWriteStreamRef,
151                                                      dispatch_queue_t) =
152     grpc_apple_register_write_stream_queue;
153 
grpc_apple_register_read_stream(CFReadStreamRef read_stream,dispatch_queue_t dispatch_queue)154 void grpc_apple_register_read_stream(CFReadStreamRef read_stream,
155                                      dispatch_queue_t dispatch_queue) {
156   grpc_apple_register_read_stream_impl(read_stream, dispatch_queue);
157 }
158 
grpc_apple_register_write_stream(CFWriteStreamRef write_stream,dispatch_queue_t dispatch_queue)159 void grpc_apple_register_write_stream(CFWriteStreamRef write_stream,
160                                       dispatch_queue_t dispatch_queue) {
161   grpc_apple_register_write_stream_impl(write_stream, dispatch_queue);
162 }
163 
164 /// Drive the run loop in a global singleton thread until the global run loop is
165 /// shutdown.
GlobalRunLoopFunc(void *)166 static void GlobalRunLoopFunc(void* /*arg*/) {
167   grpc_core::LockableAndReleasableMutexLock lock(&gGlobalRunLoopContext->mu);
168   gGlobalRunLoopContext->run_loop = CFRunLoopGetCurrent();
169   gGlobalRunLoopContext->init_cv.Signal();
170 
171   while (!gGlobalRunLoopContext->is_shutdown) {
172     // CFRunLoopRun() will return immediately if no stream is registered on it.
173     // So we wait on a conditional variable until a stream is registered;
174     // otherwise we'll be running a spinning loop.
175     while (!gGlobalRunLoopContext->input_source_registered) {
176       gGlobalRunLoopContext->input_source_cv.Wait(&gGlobalRunLoopContext->mu);
177     }
178     gGlobalRunLoopContext->input_source_registered = false;
179     lock.Release();
180     CFRunLoopRun();
181     lock.Lock();
182   }
183   lock.Release();
184 }
185 
186 // pollset implementation
187 
pollset_global_init(void)188 static void pollset_global_init(void) {
189   gGlobalRunLoopContext = new GlobalRunLoopContext;
190 
191   grpc_apple_register_read_stream_impl =
192       grpc_apple_register_read_stream_run_loop;
193   grpc_apple_register_write_stream_impl =
194       grpc_apple_register_write_stream_run_loop;
195 
196   grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
197   gGlobalRunLoopThread =
198       new grpc_core::Thread("apple_ev", GlobalRunLoopFunc, nullptr);
199   gGlobalRunLoopThread->Start();
200   while (gGlobalRunLoopContext->run_loop == NULL)
201     gGlobalRunLoopContext->init_cv.Wait(&gGlobalRunLoopContext->mu);
202 }
203 
pollset_global_shutdown(void)204 static void pollset_global_shutdown(void) {
205   {
206     grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
207     gGlobalRunLoopContext->is_shutdown = true;
208     CFRunLoopStop(gGlobalRunLoopContext->run_loop);
209   }
210   gGlobalRunLoopThread->Join();
211   delete gGlobalRunLoopThread;
212   delete gGlobalRunLoopContext;
213 }
214 
215 /// The caller must acquire the lock GrpcApplePollset.mu before calling this
216 /// function. The lock may be temporarily released when waiting on the condition
217 /// variable but will be re-acquired before the function returns.
218 ///
219 /// The Apple pollset simply waits on a condition variable until it is kicked.
220 /// The network events are handled in the global run loop thread. Processing of
221 /// these events will eventually trigger the kick.
pollset_work(grpc_pollset * pollset,grpc_pollset_worker ** worker,grpc_core::Timestamp deadline)222 static grpc_error_handle pollset_work(grpc_pollset* pollset,
223                                       grpc_pollset_worker** worker,
224                                       grpc_core::Timestamp deadline) {
225   GRPC_POLLING_TRACE("pollset work: %p, worker: %p, deadline: %" PRIu64,
226                      pollset, worker,
227                      deadline.milliseconds_after_process_epoch());
228   GrpcApplePollset* apple_pollset =
229       reinterpret_cast<GrpcApplePollset*>(pollset);
230   GrpcAppleWorker actual_worker;
231   if (worker) {
232     *worker = reinterpret_cast<grpc_pollset_worker*>(&actual_worker);
233   }
234 
235   if (apple_pollset->kicked_without_poller) {
236     // Process the outstanding kick and reset the flag. Do not block.
237     apple_pollset->kicked_without_poller = false;
238   } else {
239     // Block until kicked, timed out, or the pollset shuts down.
240     apple_pollset->workers.push_front(&actual_worker);
241     auto it = apple_pollset->workers.begin();
242 
243     while (!actual_worker.kicked && !apple_pollset->is_shutdown) {
244       if (actual_worker.cv.WaitWithDeadline(
245               &apple_pollset->mu, grpc_core::ToAbslTime(deadline.as_timespec(
246                                       GPR_CLOCK_REALTIME)))) {
247         // timed out
248         break;
249       }
250     }
251 
252     apple_pollset->workers.erase(it);
253 
254     // If the pollset is shut down asynchronously and this is the last pending
255     // worker, the shutdown process is complete at this moment and the shutdown
256     // callback will be called.
257     if (apple_pollset->is_shutdown && apple_pollset->workers.empty()) {
258       grpc_core::ExecCtx::Run(DEBUG_LOCATION, apple_pollset->shutdown_closure,
259                               absl::OkStatus());
260     }
261   }
262 
263   return absl::OkStatus();
264 }
265 
266 /// Kick a specific worker. The caller must acquire the lock GrpcApplePollset.mu
267 /// before calling this function.
kick_worker(GrpcAppleWorker * worker)268 static void kick_worker(GrpcAppleWorker* worker) {
269   worker->kicked = true;
270   worker->cv.Signal();
271 }
272 
273 /// The caller must acquire the lock GrpcApplePollset.mu before calling this
274 /// function. The kick action simply signals the condition variable of the
275 /// worker.
pollset_kick(grpc_pollset * pollset,grpc_pollset_worker * specific_worker)276 static grpc_error_handle pollset_kick(grpc_pollset* pollset,
277                                       grpc_pollset_worker* specific_worker) {
278   GrpcApplePollset* apple_pollset =
279       reinterpret_cast<GrpcApplePollset*>(pollset);
280 
281   GRPC_POLLING_TRACE("pollset kick: %p, worker:%p", pollset, specific_worker);
282 
283   if (specific_worker == nullptr) {
284     if (apple_pollset->workers.empty()) {
285       apple_pollset->kicked_without_poller = true;
286     } else {
287       GrpcAppleWorker* actual_worker = apple_pollset->workers.front();
288       kick_worker(actual_worker);
289     }
290   } else if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
291     for (auto& actual_worker : apple_pollset->workers) {
292       kick_worker(actual_worker);
293     }
294   } else {
295     GrpcAppleWorker* actual_worker =
296         reinterpret_cast<GrpcAppleWorker*>(specific_worker);
297     kick_worker(actual_worker);
298   }
299 
300   return absl::OkStatus();
301 }
302 
pollset_init(grpc_pollset * pollset,gpr_mu ** mu)303 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
304   GRPC_POLLING_TRACE("pollset init: %p", pollset);
305   GrpcApplePollset* apple_pollset = new (pollset) GrpcApplePollset();
306   *mu = grpc_core::GetUnderlyingGprMu(&apple_pollset->mu);
307 }
308 
309 /// The caller must acquire the lock GrpcApplePollset.mu before calling this
310 /// function.
pollset_shutdown(grpc_pollset * pollset,grpc_closure * closure)311 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
312   GRPC_POLLING_TRACE("pollset shutdown: %p", pollset);
313 
314   GrpcApplePollset* apple_pollset =
315       reinterpret_cast<GrpcApplePollset*>(pollset);
316   apple_pollset->is_shutdown = true;
317   (void)pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
318 
319   // If there is any worker blocked, shutdown will be done asynchronously.
320   if (apple_pollset->workers.empty()) {
321     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
322   } else {
323     apple_pollset->shutdown_closure = closure;
324   }
325 }
326 
pollset_destroy(grpc_pollset * pollset)327 static void pollset_destroy(grpc_pollset* pollset) {
328   GRPC_POLLING_TRACE("pollset destroy: %p", pollset);
329   GrpcApplePollset* apple_pollset =
330       reinterpret_cast<GrpcApplePollset*>(pollset);
331   apple_pollset->~GrpcApplePollset();
332 }
333 
pollset_size(void)334 size_t pollset_size(void) { return sizeof(GrpcApplePollset); }
335 
336 grpc_pollset_vtable grpc_apple_pollset_vtable = {
337     pollset_global_init, pollset_global_shutdown,
338     pollset_init,        pollset_shutdown,
339     pollset_destroy,     pollset_work,
340     pollset_kick,        pollset_size};
341 
342 // pollset_set implementation
343 
pollset_set_create(void)344 grpc_pollset_set* pollset_set_create(void) { return nullptr; }
pollset_set_destroy(grpc_pollset_set *)345 void pollset_set_destroy(grpc_pollset_set* /*pollset_set*/) {}
pollset_set_add_pollset(grpc_pollset_set *,grpc_pollset *)346 void pollset_set_add_pollset(grpc_pollset_set* /*pollset_set*/,
347                              grpc_pollset* /*pollset*/) {}
pollset_set_del_pollset(grpc_pollset_set *,grpc_pollset *)348 void pollset_set_del_pollset(grpc_pollset_set* /*pollset_set*/,
349                              grpc_pollset* /*pollset*/) {}
pollset_set_add_pollset_set(grpc_pollset_set *,grpc_pollset_set *)350 void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/,
351                                  grpc_pollset_set* /*item*/) {}
pollset_set_del_pollset_set(grpc_pollset_set *,grpc_pollset_set *)352 void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/,
353                                  grpc_pollset_set* /*item*/) {}
354 
355 grpc_pollset_set_vtable grpc_apple_pollset_set_vtable = {
356     pollset_set_create,          pollset_set_destroy,
357     pollset_set_add_pollset,     pollset_set_del_pollset,
358     pollset_set_add_pollset_set, pollset_set_del_pollset_set};
359 
360 #endif
361