//
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include <grpc/support/log.h>

#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/iomgr/port.h"

// This polling engine is only relevant on Linux kernels that support epoll
// via epoll_create() or epoll_create1().
#ifdef GRPC_LINUX_EPOLL
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <string>
#include <vector>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>

#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/strerror.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"

static grpc_wakeup_fd global_wakeup_fd;
static bool g_is_shutdown = true;

//******************************************************************************
// Singleton epoll set related fields
//

#define MAX_EPOLL_EVENTS 100
#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
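
// A small compile-time guard that the two constants above must satisfy: the
// per-iteration processing batch cannot exceed the buffer that epoll_wait()
// fills.
static_assert(MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION <= MAX_EPOLL_EVENTS,
              "per-iteration batch must fit in the epoll_wait() buffer");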

// NOTE ON SYNCHRONIZATION:
// - Fields in this struct are only modified by the designated poller. Hence
//   there is no need for any locks to protect the struct.
// - The num_events and cursor fields have to be of atomic type to provide
//   memory visibility guarantees only, i.e., with multiple pollers, the
//   designated polling thread keeps changing, so the thread that wrote these
//   values may be different from the thread reading them.
//
typedef struct epoll_set {
  int epfd;

  // The epoll_events after the last call to epoll_wait()
  struct epoll_event events[MAX_EPOLL_EVENTS];

  // The number of epoll_events after the last call to epoll_wait()
  gpr_atm num_events;

  // Index of the first event in epoll_events that has to be processed. This
  // field is only valid if num_events > 0
  gpr_atm cursor;
} epoll_set;
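
// A sketch of the visibility handoff implied by the note above (compare
// do_epoll_wait() and process_epoll_events() below):
//
//   gpr_atm_rel_store(&g_epoll_set.num_events, r);       // publishing poller
//   ...
//   long n = gpr_atm_acq_load(&g_epoll_set.num_events);  // next poller
//
// The release/acquire pairing is what lets the designated-poller role migrate
// between threads without a lock.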

// The global singleton epoll set
static epoll_set g_epoll_set;

static int epoll_create_and_cloexec() {
#ifdef GRPC_LINUX_EPOLL_CREATE1
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1 unavailable");
  }
#else
  int fd = epoll_create(MAX_EPOLL_EVENTS);
  if (fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create unavailable");
  } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
    gpr_log(GPR_ERROR, "fcntl following epoll_create failed");
    close(fd);  // don't leak the epoll fd when FD_CLOEXEC can't be set
    return -1;
  }
#endif
  return fd;
}
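
// (EPOLL_CLOEXEC / FD_CLOEXEC keeps the epoll fd from leaking into child
// processes across exec().)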

// Must be called *only* once
static bool epoll_set_init() {
  g_epoll_set.epfd = epoll_create_and_cloexec();
  if (g_epoll_set.epfd < 0) {
    return false;
  }

  gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
  gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
  gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
  return true;
}

// epoll_set_init() MUST be called before calling this.
static void epoll_set_shutdown() {
  if (g_epoll_set.epfd >= 0) {
    close(g_epoll_set.epfd);
    g_epoll_set.epfd = -1;
  }
}

//******************************************************************************
// Fd Declarations
//

// Only used when GRPC_ENABLE_FORK_SUPPORT=1
struct grpc_fork_fd_list {
  grpc_fd* fd;
  grpc_fd* next;
  grpc_fd* prev;
};

struct grpc_fd {
  int fd;

  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;

  struct grpc_fd* freelist_next;

  grpc_iomgr_object iomgr_object;

  // Only used when GRPC_ENABLE_FORK_SUPPORT=1
  grpc_fork_fd_list* fork_fd_list;

  bool is_pre_allocated;
};

static void fd_global_init(void);
static void fd_global_shutdown(void);

//******************************************************************************
// Pollset Declarations
//

typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;

static const char* kick_state_string(kick_state st) {
  switch (st) {
    case UNKICKED:
      return "UNKICKED";
    case KICKED:
      return "KICKED";
    case DESIGNATED_POLLER:
      return "DESIGNATED_POLLER";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

struct grpc_pollset_worker {
  kick_state state;
  int kick_state_mutator;  // which line of code last changed kick state
  bool initialized_cv;
  grpc_pollset_worker* next;
  grpc_pollset_worker* prev;
  gpr_cv cv;
  grpc_closure_list schedule_on_end_work;
};

#define SET_KICK_STATE(worker, kick_state)   \
  do {                                       \
    (worker)->state = (kick_state);          \
    (worker)->kick_state_mutator = __LINE__; \
  } while (false)
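
// The do { ... } while (false) wrapper makes SET_KICK_STATE usable as a
// single statement (e.g. in an unbraced if/else), and __LINE__ records the
// call site that last mutated the kick state, which helps when debugging
// kick races.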

#define MAX_NEIGHBORHOODS 1024u

typedef struct pollset_neighborhood {
  union {
    char pad[GPR_CACHELINE_SIZE];
    struct {
      gpr_mu mu;
      grpc_pollset* active_root;
    };
  };
} pollset_neighborhood;
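
// The anonymous union above pads each neighborhood out to a full cache line
// (GPR_CACHELINE_SIZE), so that neighborhood mutexes never share a cache line
// and threads locking different neighborhoods avoid false sharing.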

struct grpc_pollset {
  gpr_mu mu;
  pollset_neighborhood* neighborhood;
  bool reassigning_neighborhood;
  grpc_pollset_worker* root_worker;
  bool kicked_without_poller;

  // Set to true if the pollset is observed to have no workers available to
  // poll.
  bool seen_inactive;
  bool shutting_down;              // Is the pollset shutting down?
  grpc_closure* shutdown_closure;  // Called after shutdown is complete

  // Number of workers that are *about to* attach themselves to the pollset's
  // worker list.
  int begin_refs;

  grpc_pollset* next;
  grpc_pollset* prev;
};

//******************************************************************************
// Pollset-set Declarations
//

struct grpc_pollset_set {
  char unused;
};

//******************************************************************************
// Common helpers
//

static bool append_error(grpc_error_handle* composite, grpc_error_handle error,
                         const char* desc) {
  if (error.ok()) return true;
  if (composite->ok()) {
    *composite = GRPC_ERROR_CREATE(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}

//******************************************************************************
// Fd Definitions
//

// We need to keep a freelist not because of any concern about malloc
// performance, but so that implementations with multiple threads in (for
// example) epoll_wait can deal with the race between pollset removal and
// incoming poll notifications.
//
// The problem is that the poller ultimately holds a reference to this
// object, so it is very difficult to know when it is safe to free it, at
// least without some expensive synchronization.
//
// If we keep the object freelisted, in the worst case losing this race just
// becomes a spurious read notification on a reused fd.
//

// The alarm system needs to be able to wake up 'some poller' sometimes
// (specifically when a new alarm needs to be triggered earlier than the next
// alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
// case occurs.

static grpc_fd* fd_freelist = nullptr;
static gpr_mu fd_freelist_mu;

// Only used when GRPC_ENABLE_FORK_SUPPORT=1
static grpc_fd* fork_fd_list_head = nullptr;
static gpr_mu fork_fd_list_mu;

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  // TODO(guantaol): We don't have a reasonable explanation about this
  // lock()/unlock() pattern. It can be a valid barrier if there is at most one
  // pending lock() at this point. Otherwise, there is still a possibility of
  // use-after-free race. Need to reason about the code and/or clean it up.
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != nullptr) {
    grpc_fd* fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    fd->fork_fd_list =
        static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
    fd->fork_fd_list->next = fork_fd_list_head;
    fd->fork_fd_list->prev = nullptr;
    if (fork_fd_list_head != nullptr) {
      fork_fd_list_head->fork_fd_list->prev = fd;
    }
    fork_fd_list_head = fd;
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    if (fork_fd_list_head == fd) {
      fork_fd_list_head = fd->fork_fd_list->next;
    }
    if (fd->fork_fd_list->prev != nullptr) {
      fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
    }
    if (fd->fork_fd_list->next != nullptr) {
      fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
    }
    gpr_free(fd->fork_fd_list);
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
  grpc_fd* new_fd = nullptr;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != nullptr) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == nullptr) {
    new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
    new_fd->read_closure.Init();
    new_fd->write_closure.Init();
    new_fd->error_closure.Init();
  }
  new_fd->fd = fd;
  new_fd->read_closure->InitEvent();
  new_fd->write_closure->InitEvent();
  new_fd->error_closure->InitEvent();

  new_fd->freelist_next = nullptr;
  new_fd->is_pre_allocated = false;

  std::string fd_name = absl::StrCat(name, " fd=", fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name.c_str());
  fork_fd_list_add_grpc_fd(new_fd);
#ifndef NDEBUG
  if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name.c_str());
  }
#endif

  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
  // Use the least significant bit of ev.data.ptr to store track_err. We expect
  // the addresses to be word aligned. We need to store track_err to avoid
  // synchronization issues when accessing it after receiving an event.
  // Accessing fd would be a data race there because the fd might have been
  // returned to the free list at that point.
  ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
                                        (track_err ? 1 : 0));
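  // (process_epoll_events() reverses this encoding: `ptr & ~1` recovers the
  // grpc_fd*, and `ptr & 1` recovers track_err.)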
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
    gpr_log(GPR_ERROR, "epoll_ctl failed: %s",
            grpc_core::StrError(errno).c_str());
  }

  return new_fd;
}

static int fd_wrapped_fd(grpc_fd* fd) { return fd->fd; }

// If 'releasing_fd' is true, we are going to detach the internal fd from the
// grpc_fd structure (which means we should not be calling the shutdown()
// syscall on that fd).
static void fd_shutdown_internal(grpc_fd* fd, grpc_error_handle why,
                                 bool releasing_fd) {
  if (fd->read_closure->SetShutdown(why)) {
    if (!releasing_fd) {
      if (!fd->is_pre_allocated) {
        shutdown(fd->fd, SHUT_RDWR);
      }
    } else {
      // We need a phony event for earlier Linux versions, where
      // epoll_ctl(EPOLL_CTL_DEL) rejects a null event pointer.
      epoll_event phony_event;
      if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_DEL, fd->fd, &phony_event) !=
          0) {
        gpr_log(GPR_ERROR, "epoll_ctl failed: %s",
                grpc_core::StrError(errno).c_str());
      }
    }
    fd->write_closure->SetShutdown(why);
    fd->error_closure->SetShutdown(why);
  }
}

// Might be called multiple times
static void fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
  fd_shutdown_internal(fd, why, false);
}

static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
                      const char* reason) {
  grpc_error_handle error;
  bool is_release_fd = (release_fd != nullptr);

  if (!fd->read_closure->IsShutdown()) {
    fd_shutdown_internal(fd, GRPC_ERROR_CREATE(reason), is_release_fd);
  }

  // If release_fd is not NULL, we should be relinquishing control of the file
  // descriptor fd->fd (but we still own the grpc_fd structure).
  if (is_release_fd) {
    *release_fd = fd->fd;
  } else {
    if (!fd->is_pre_allocated) {
      close(fd->fd);
    }
  }

  grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);

  grpc_iomgr_unregister_object(&fd->iomgr_object);
  fork_fd_list_remove_grpc_fd(fd);
  fd->read_closure->DestroyEvent();
  fd->write_closure->DestroyEvent();
  fd->error_closure->DestroyEvent();

  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}

static bool fd_is_shutdown(grpc_fd* fd) {
  return fd->read_closure->IsShutdown();
}

static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
  fd->read_closure->NotifyOn(closure);
}

static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
  fd->write_closure->NotifyOn(closure);
}

static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
  fd->error_closure->NotifyOn(closure);
}

static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }

static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }

static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }

static void fd_set_pre_allocated(grpc_fd* fd) { fd->is_pre_allocated = true; }

//******************************************************************************
// Pollset Definitions
//

static thread_local grpc_pollset* g_current_thread_pollset;
static thread_local grpc_pollset_worker* g_current_thread_worker;

// The designated poller
static gpr_atm g_active_poller;

static pollset_neighborhood* g_neighborhoods;
static size_t g_num_neighborhoods;

// Return true if first in list
static bool worker_insert(grpc_pollset* pollset, grpc_pollset_worker* worker) {
  if (pollset->root_worker == nullptr) {
    pollset->root_worker = worker;
    worker->next = worker->prev = worker;
    return true;
  } else {
    worker->next = pollset->root_worker;
    worker->prev = worker->next->prev;
    worker->next->prev = worker;
    worker->prev->next = worker;
    return false;
  }
}

// Result of removing a worker from its pollset's list: the list may be
// emptied, gain a new root, or simply lose an interior element.
typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;

static worker_remove_result worker_remove(grpc_pollset* pollset,
                                          grpc_pollset_worker* worker) {
  if (worker == pollset->root_worker) {
    if (worker == worker->next) {
      pollset->root_worker = nullptr;
      return EMPTIED;
    } else {
      pollset->root_worker = worker->next;
      worker->prev->next = worker->next;
      worker->next->prev = worker->prev;
      return NEW_ROOT;
    }
  } else {
    worker->prev->next = worker->next;
    worker->next->prev = worker->prev;
    return REMOVED;
  }
}

static size_t choose_neighborhood(void) {
  return static_cast<size_t>(gpr_cpu_current_cpu()) % g_num_neighborhoods;
}
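
// Pollsets are sharded across neighborhoods keyed by the CPU the calling
// thread is currently running on, spreading neighborhood-mutex contention
// across up to MAX_NEIGHBORHOODS cache-line-padded locks.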

static grpc_error_handle pollset_global_init(void) {
  gpr_atm_no_barrier_store(&g_active_poller, 0);
  global_wakeup_fd.read_fd = -1;
  grpc_error_handle err = grpc_wakeup_fd_init(&global_wakeup_fd);
  if (!err.ok()) return err;
  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
  ev.data.ptr = &global_wakeup_fd;
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
                &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  g_num_neighborhoods =
      grpc_core::Clamp(gpr_cpu_num_cores(), 1u, MAX_NEIGHBORHOODS);
  g_neighborhoods = static_cast<pollset_neighborhood*>(
      gpr_zalloc(sizeof(*g_neighborhoods) * g_num_neighborhoods));
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_init(&g_neighborhoods[i].mu);
  }
  return absl::OkStatus();
}

static void pollset_global_shutdown(void) {
  if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_destroy(&g_neighborhoods[i].mu);
  }
  gpr_free(g_neighborhoods);
}

static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
  pollset->reassigning_neighborhood = false;
  pollset->root_worker = nullptr;
  pollset->kicked_without_poller = false;
  pollset->seen_inactive = true;
  pollset->shutting_down = false;
  pollset->shutdown_closure = nullptr;
  pollset->begin_refs = 0;
  pollset->next = pollset->prev = nullptr;
}

static void pollset_destroy(grpc_pollset* pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (!pollset->seen_inactive) {
      if (pollset->neighborhood != neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighborhood->active_root) {
        pollset->neighborhood->active_root =
            pollset->next == pollset ? nullptr : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighborhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}

static grpc_error_handle pollset_kick_all(grpc_pollset* pollset) {
  grpc_error_handle error;
  if (pollset->root_worker != nullptr) {
    grpc_pollset_worker* worker = pollset->root_worker;
    do {
      switch (worker->state) {
        case KICKED:
          break;
        case UNKICKED:
          SET_KICK_STATE(worker, KICKED);
          if (worker->initialized_cv) {
            gpr_cv_signal(&worker->cv);
          }
          break;
        case DESIGNATED_POLLER:
          SET_KICK_STATE(worker, KICKED);
          append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                       "pollset_kick_all");
          break;
      }

      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  // TODO(sreek): Check if we need to set 'kicked_without_poller' to true here
  // in the else case
  return error;
}

static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
  if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
      pollset->begin_refs == 0) {
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_closure,
                            absl::OkStatus());
    pollset->shutdown_closure = nullptr;
  }
}

static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
  GPR_ASSERT(pollset->shutdown_closure == nullptr);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutdown_closure = closure;
  pollset->shutting_down = true;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(pollset);
}

static int poll_deadline_to_millis_timeout(grpc_core::Timestamp millis) {
  if (millis == grpc_core::Timestamp::InfFuture()) return -1;
  int64_t delta = (millis - grpc_core::Timestamp::Now()).millis();
  if (delta > INT_MAX) {
    return INT_MAX;
  } else if (delta < 0) {
    return 0;
  } else {
    return static_cast<int>(delta);
  }
}
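
// The value computed above is passed straight to epoll_wait(), where a
// timeout of -1 blocks indefinitely and 0 returns immediately.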

// Process the epoll events found by do_epoll_wait().
// - g_epoll_set.cursor points to the index of the first event to be processed
// - This function then processes up to
//   MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION events and updates
//   g_epoll_set.cursor

// NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
// called by the g_active_poller thread. So there is no need for
// synchronization when accessing fields in g_epoll_set.
static grpc_error_handle process_epoll_events(grpc_pollset* /*pollset*/) {
  static const char* err_desc = "process_events";
  grpc_error_handle error;
  long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
  long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
  for (int idx = 0;
       (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
       idx++) {
    long c = cursor++;
    struct epoll_event* ev = &g_epoll_set.events[c];
    void* data_ptr = ev->data.ptr;

    if (data_ptr == &global_wakeup_fd) {
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd* fd = reinterpret_cast<grpc_fd*>(
          reinterpret_cast<intptr_t>(data_ptr) & ~intptr_t{1});
      bool track_err = reinterpret_cast<intptr_t>(data_ptr) & intptr_t{1};
      bool cancel = (ev->events & EPOLLHUP) != 0;
      bool err_ev = (ev->events & EPOLLERR) != 0;
      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (ev->events & EPOLLOUT) != 0;
      bool err_fallback = err_ev && !track_err;

      if (err_ev && !err_fallback) {
        fd_has_errors(fd);
      }

      if (read_ev || cancel || err_fallback) {
        fd_become_readable(fd);
      }

      if (write_ev || cancel || err_fallback) {
        fd_become_writable(fd);
      }
    }
  }
  gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
  return error;
}

// Do epoll_wait and store the events in the g_epoll_set.events field. This
// does not "process" any of the events yet; that is done in
// process_epoll_events(). See process_epoll_events() for more details.

// NOTE ON SYNCHRONIZATION: At any point in time, only the g_active_poller
// (i.e., the designated poller thread) will be calling this function. So there
// is no need for any synchronization when accessing fields in g_epoll_set.
static grpc_error_handle do_epoll_wait(grpc_pollset* ps,
                                       grpc_core::Timestamp deadline) {
  int r;
  int timeout = poll_deadline_to_millis_timeout(deadline);
  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  do {
    r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
                   timeout);
  } while (r < 0 && errno == EINTR);
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "ps: %p poll got %d events", ps, r);
  }

  gpr_atm_rel_store(&g_epoll_set.num_events, r);
  gpr_atm_rel_store(&g_epoll_set.cursor, 0);

  return absl::OkStatus();
}
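
// (The EINTR retry loop above keeps signal delivery from surfacing as a
// spurious polling error.)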

static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                         grpc_pollset_worker** worker_hdl,
                         grpc_core::Timestamp deadline) {
  if (worker_hdl != nullptr) *worker_hdl = worker;
  worker->initialized_cv = false;
  SET_KICK_STATE(worker, UNKICKED);
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++;

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "PS:%p BEGIN_STARTS:%p", pollset, worker);
  }

  if (pollset->seen_inactive) {
    // The pollset has been observed to be inactive; we need to move it back to
    // the active list.
    bool is_reassigning = false;
    if (!pollset->reassigning_neighborhood) {
      is_reassigning = true;
      pollset->reassigning_neighborhood = true;
      pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
    }
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  // pollset unlocked: state may change (even worker->state)
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
              pollset, worker, kick_state_string(worker->state),
              is_reassigning);
    }
    if (pollset->seen_inactive) {
      if (neighborhood != pollset->neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }

      // In the brief time we released the pollset locks above, the worker MAY
      // have been kicked. In this case, the worker should get out of this
      // pollset ASAP, and hence this should neither add the pollset to the
      // neighborhood nor mark the pollset as active.

      // On a side note, the only way a worker's kick state could have changed
      // at this point is if it were "kicked specifically". Since the worker has
      // not added itself to the pollset yet (by calling worker_insert()), it is
      // not visible in the "kick any" path yet.
      if (worker->state == UNKICKED) {
        pollset->seen_inactive = false;
        if (neighborhood->active_root == nullptr) {
          neighborhood->active_root = pollset->next = pollset->prev = pollset;
          // Make this the designated poller if there isn't one already
          if (worker->state == UNKICKED &&
              gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                     reinterpret_cast<gpr_atm>(worker))) {
            SET_KICK_STATE(worker, DESIGNATED_POLLER);
          }
        } else {
          pollset->next = neighborhood->active_root;
          pollset->prev = pollset->next->prev;
          pollset->next->prev = pollset->prev->next = pollset;
        }
      }
    }
    if (is_reassigning) {
      GPR_ASSERT(pollset->reassigning_neighborhood);
      pollset->reassigning_neighborhood = false;
    }
    gpr_mu_unlock(&neighborhood->mu);
  }

  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    while (worker->state == UNKICKED && !pollset->shutting_down) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
                pollset, worker, kick_state_string(worker->state),
                pollset->shutting_down);
      }

      if (gpr_cv_wait(&worker->cv, &pollset->mu,
                      deadline.as_timespec(GPR_CLOCK_MONOTONIC)) &&
          worker->state == UNKICKED) {
        // If gpr_cv_wait returns true (i.e., a timeout), pretend that the
        // worker received a kick.
        SET_KICK_STATE(worker, KICKED);
      }
    }
    grpc_core::ExecCtx::Get()->InvalidateNow();
  }

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO,
            "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
            "kicked_without_poller: %d",
            pollset, worker, kick_state_string(worker->state),
            pollset->shutting_down, pollset->kicked_without_poller);
  }

  // We release the pollset lock in this function at a couple of places:
  //   1. Briefly when assigning the pollset to a neighborhood
  //   2. When doing gpr_cv_wait()
  // It is possible that 'kicked_without_poller' was set to true during (1) and
  // 'shutting_down' is set to true during (1) or (2). If either of them is
  // true, this worker cannot do polling.
  // TODO(sreek): Perhaps there is a better way to handle the
  // kicked_without_poller case, especially when the worker is the
  // DESIGNATED_POLLER.

  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    return false;
  }

  return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
}

static bool check_neighborhood_for_available_poller(
    pollset_neighborhood* neighborhood) {
  bool found_worker = false;
  do {
    grpc_pollset* inspect = neighborhood->active_root;
    if (inspect == nullptr) {
      break;
    }
    gpr_mu_lock(&inspect->mu);
    GPR_ASSERT(!inspect->seen_inactive);
    grpc_pollset_worker* inspect_worker = inspect->root_worker;
    if (inspect_worker != nullptr) {
      do {
        switch (inspect_worker->state) {
          case UNKICKED:
            if (gpr_atm_no_barrier_cas(
                    &g_active_poller, 0,
                    reinterpret_cast<gpr_atm>(inspect_worker))) {
              if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
                gpr_log(GPR_INFO, " .. choose next poller to be %p",
                        inspect_worker);
              }
              SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
              if (inspect_worker->initialized_cv) {
                gpr_cv_signal(&inspect_worker->cv);
              }
            } else {
              if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
                gpr_log(GPR_INFO, " .. beaten to choose next poller");
              }
            }
            // Even if we didn't win the cas, there's a worker; we can stop.
            found_worker = true;
            break;
          case KICKED:
            break;
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (!found_worker && inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. mark pollset %p inactive", inspect);
      }
      inspect->seen_inactive = true;
      if (inspect == neighborhood->active_root) {
        neighborhood->active_root =
            inspect->next == inspect ? nullptr : inspect->next;
      }
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = nullptr;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  return found_worker;
}

static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                       grpc_pollset_worker** worker_hdl) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "PS:%p END_WORKER:%p", pollset, worker);
  }
  if (worker_hdl != nullptr) *worker_hdl = nullptr;
  // Make sure we appear kicked
  SET_KICK_STATE(worker, KICKED);
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         grpc_core::ExecCtx::Get()->closure_list());
  if (gpr_atm_no_barrier_load(&g_active_poller) ==
      reinterpret_cast<gpr_atm>(worker)) {
    if (worker->next != worker && worker->next->state == UNKICKED) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. choose next poller to be peer %p",
                worker->next);
      }
      GPR_ASSERT(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
      gpr_cv_signal(&worker->next->cv);
      if (grpc_core::ExecCtx::Get()->HasWork()) {
        gpr_mu_unlock(&pollset->mu);
        grpc_core::ExecCtx::Get()->Flush();
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      size_t poller_neighborhood_idx =
          static_cast<size_t>(pollset->neighborhood - g_neighborhoods);
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      bool scan_state[MAX_NEIGHBORHOODS];
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        if (gpr_mu_trylock(&neighborhood->mu)) {
          found_worker = check_neighborhood_for_available_poller(neighborhood);
          gpr_mu_unlock(&neighborhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        gpr_mu_lock(&neighborhood->mu);
        found_worker = check_neighborhood_for_available_poller(neighborhood);
        gpr_mu_unlock(&neighborhood->mu);
      }
      grpc_core::ExecCtx::Get()->Flush();
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_core::ExecCtx::Get()->HasWork()) {
    gpr_mu_unlock(&pollset->mu);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, " .. remove worker");
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(pollset);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
}

// pollset->mu lock must be held by the caller before calling this.
// pollset_work() may temporarily release the lock (pollset->mu) during the
// course of its execution, but it will always re-acquire the lock and ensure
// that it is held by the time the function returns.
static grpc_error_handle pollset_work(grpc_pollset* ps,
                                      grpc_pollset_worker** worker_hdl,
                                      grpc_core::Timestamp deadline) {
  grpc_pollset_worker worker;
  grpc_error_handle error;
  static const char* err_desc = "pollset_work";
  if (ps->kicked_without_poller) {
    ps->kicked_without_poller = false;
    return absl::OkStatus();
  }

  if (begin_worker(ps, &worker, worker_hdl, deadline)) {
    g_current_thread_pollset = ps;
    g_current_thread_worker = &worker;
    GPR_ASSERT(!ps->shutting_down);
    GPR_ASSERT(!ps->seen_inactive);

    gpr_mu_unlock(&ps->mu);  // unlock
    // This is the designated polling thread at this point and should ideally
    // do polling. However, if there are unprocessed events left from a
    // previous call to do_epoll_wait(), skip calling epoll_wait() in this
    // iteration and process the pending epoll events instead.

    // The reason for decoupling do_epoll_wait and process_epoll_events is to
    // better distribute the work (i.e., handling epoll events) across multiple
    // threads.

    // process_epoll_events() returns very quickly: it just queues the work on
    // the exec_ctx but does not execute it (the actual execution, or more
    // accurately grpc_core::ExecCtx::Get()->Flush(), happens in end_worker()
    // AFTER selecting a designated poller). So we are not waiting long periods
    // without a designated poller.
    if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
        gpr_atm_acq_load(&g_epoll_set.num_events)) {
      append_error(&error, do_epoll_wait(ps, deadline), err_desc);
    }
    append_error(&error, process_epoll_events(ps), err_desc);

    gpr_mu_lock(&ps->mu);  // lock

    g_current_thread_worker = nullptr;
  } else {
    g_current_thread_pollset = ps;
  }
  end_worker(ps, &worker, worker_hdl);

  g_current_thread_pollset = nullptr;
  return error;
}

static grpc_error_handle pollset_kick(grpc_pollset* pollset,
                                      grpc_pollset_worker* specific_worker) {
  grpc_error_handle ret_err;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    std::vector<std::string> log;
    log.push_back(absl::StrFormat(
        "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset, specific_worker,
        static_cast<void*>(g_current_thread_pollset),
        static_cast<void*>(g_current_thread_worker), pollset->root_worker));
    if (pollset->root_worker != nullptr) {
      log.push_back(absl::StrFormat(
          " {kick_state=%s next=%p {kick_state=%s}}",
          kick_state_string(pollset->root_worker->state),
          pollset->root_worker->next,
          kick_state_string(pollset->root_worker->next->state)));
    }
    if (specific_worker != nullptr) {
      log.push_back(absl::StrFormat(" worker_kick_state=%s",
                                    kick_state_string(specific_worker->state)));
    }
    gpr_log(GPR_DEBUG, "%s", absl::StrJoin(log, "").c_str());
  }

  if (specific_worker == nullptr) {
    if (g_current_thread_pollset != pollset) {
      grpc_pollset_worker* root_worker = pollset->root_worker;
      if (root_worker == nullptr) {
        pollset->kicked_without_poller = true;
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked_without_poller");
        }
        goto done;
      }
      grpc_pollset_worker* next_worker = root_worker->next;
      if (root_worker->state == KICKED) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. already kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        goto done;
      } else if (next_worker->state == KICKED) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. already kicked %p", next_worker);
        }
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      } else if (root_worker == next_worker &&  // only try and wake up a poller
                                                // if there is no next worker
                 root_worker ==
                     reinterpret_cast<grpc_pollset_worker*>(
                         gpr_atm_no_barrier_load(&g_active_poller))) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
        goto done;
      } else if (next_worker->state == UNKICKED) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked %p", next_worker);
        }
        GPR_ASSERT(next_worker->initialized_cv);
        SET_KICK_STATE(next_worker, KICKED);
        gpr_cv_signal(&next_worker->cv);
        goto done;
      } else if (next_worker->state == DESIGNATED_POLLER) {
        if (root_worker->state != DESIGNATED_POLLER) {
          if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
            gpr_log(
                GPR_INFO,
                " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
                root_worker, root_worker->initialized_cv, next_worker);
          }
          SET_KICK_STATE(root_worker, KICKED);
          if (root_worker->initialized_cv) {
            gpr_cv_signal(&root_worker->cv);
          }
          goto done;
        } else {
          if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
            gpr_log(GPR_INFO, " .. non-root poller %p (root=%p)", next_worker,
                    root_worker);
          }
          SET_KICK_STATE(next_worker, KICKED);
          ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
          goto done;
        }
      } else {
        GPR_ASSERT(next_worker->state == KICKED);
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      }
    } else {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. kicked while waking up");
      }
      goto done;
    }

    GPR_UNREACHABLE_CODE(goto done);
  }

  if (specific_worker->state == KICKED) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. specific worker already kicked");
    }
    goto done;
  } else if (g_current_thread_worker == specific_worker) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. mark %p kicked", specific_worker);
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  } else if (specific_worker ==
             reinterpret_cast<grpc_pollset_worker*>(
                 gpr_atm_no_barrier_load(&g_active_poller))) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick active poller");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
    goto done;
  } else if (specific_worker->initialized_cv) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    gpr_cv_signal(&specific_worker->cv);
    goto done;
  } else {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick non-waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  }
done:
  return ret_err;
}

static void pollset_add_fd(grpc_pollset* /*pollset*/, grpc_fd* /*fd*/) {}

//******************************************************************************
// Pollset-set Definitions
//

static grpc_pollset_set* pollset_set_create(void) {
  return reinterpret_cast<grpc_pollset_set*>(static_cast<intptr_t>(0xdeafbeef));
}
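
// (pollset_sets are no-ops in this engine; 0xdeafbeef is just a recognizable
// non-null sentinel so callers get a handle they can pass around.)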

static void pollset_set_destroy(grpc_pollset_set* /*pss*/) {}

static void pollset_set_add_fd(grpc_pollset_set* /*pss*/, grpc_fd* /*fd*/) {}

static void pollset_set_del_fd(grpc_pollset_set* /*pss*/, grpc_fd* /*fd*/) {}

static void pollset_set_add_pollset(grpc_pollset_set* /*pss*/,
                                    grpc_pollset* /*ps*/) {}

static void pollset_set_del_pollset(grpc_pollset_set* /*pss*/,
                                    grpc_pollset* /*ps*/) {}

static void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/,
                                        grpc_pollset_set* /*item*/) {}

static void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/,
                                        grpc_pollset_set* /*item*/) {}

//******************************************************************************
// Event engine binding
//

static bool is_any_background_poller_thread(void) { return false; }

static void shutdown_background_closure(void) {}

static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
                                             grpc_error_handle /*error*/) {
  return false;
}

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  epoll_set_shutdown();
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_destroy(&fork_fd_list_mu);
    grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
  }
  g_is_shutdown = true;
}

static bool init_epoll1_linux();

const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
    sizeof(grpc_pollset),
    true,
    false,

    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_notify_on_error,
    fd_become_readable,
    fd_become_writable,
    fd_has_errors,
    fd_is_shutdown,

    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    is_any_background_poller_thread,
    /* name = */ "epoll1",
    /* check_engine_available = */
    [](bool) { return init_epoll1_linux(); },
    /* init_engine = */
    []() { GPR_ASSERT(init_epoll1_linux()); },
    shutdown_background_closure,
    /* shutdown_engine = */
    []() { shutdown_engine(); },
    add_closure_to_background_poller,

    fd_set_pre_allocated,
};

// Called by the child process's post-fork handler to close open fds,
// including the global epoll fd. This allows gRPC to shut down in the child
// process without interfering with connections or RPCs ongoing in the parent.
static void reset_event_manager_on_fork() {
  gpr_mu_lock(&fork_fd_list_mu);
  while (fork_fd_list_head != nullptr) {
    close(fork_fd_list_head->fd);
    fork_fd_list_head->fd = -1;
    fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
  }
  gpr_mu_unlock(&fork_fd_list_mu);
  shutdown_engine();
  init_epoll1_linux();
}

// It is possible that glibc has epoll but the underlying kernel doesn't.
// Create the epoll fd (epoll_set_init() takes care of that) to make sure
// epoll support is actually available.
static bool init_epoll1_linux() {
  if (!g_is_shutdown) return true;
  if (!grpc_has_wakeup_fd()) {
    gpr_log(GPR_ERROR, "Skipping epoll1 because of no wakeup fd.");
    return false;
  }

  if (!epoll_set_init()) {
    return false;
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    fd_global_shutdown();
    epoll_set_shutdown();
    return false;
  }

  if (grpc_core::Fork::Enabled()) {
    gpr_mu_init(&fork_fd_list_mu);
    grpc_core::Fork::SetResetChildPollingEngineFunc(
        reset_event_manager_on_fork);
  }
  g_is_shutdown = false;
  return true;
}

#else  // defined(GRPC_LINUX_EPOLL)
#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
    1,
    true,
    false,

    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,

    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,

    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,

    nullptr,
    /* name = */ "epoll1",
    /* check_engine_available = */ [](bool) { return false; },
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
};
#endif  // defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
#endif  // !defined(GRPC_LINUX_EPOLL)