//
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include <grpc/support/log.h>

#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/iomgr/port.h"

// This polling engine is only relevant on Linux kernels supporting epoll,
// i.e. those providing epoll_create() or epoll_create1()
#ifdef GRPC_LINUX_EPOLL
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <string>
#include <vector>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>

#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/strerror.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"

static grpc_wakeup_fd global_wakeup_fd;
static bool g_is_shutdown = true;

//******************************************************************************
// Singleton epoll set related fields
//

#define MAX_EPOLL_EVENTS 100
#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
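
// Keeping MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION small means each pass of the
// designated poller queues only a little work before handing off poller duty;
// see process_epoll_events() and the work-distribution notes in pollset_work().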

// NOTE ON SYNCHRONIZATION:
// - Fields in this struct are only modified by the designated poller. Hence
//   there is no need for any locks to protect the struct.
// - The num_events and cursor fields have to be of atomic type to provide
//   memory visibility guarantees only, i.e. in case of multiple pollers, the
//   designated polling thread keeps changing; the thread that wrote these
//   values may be different from the thread reading them.
//
typedef struct epoll_set {
  int epfd;

  // The epoll_events after the last call to epoll_wait()
  struct epoll_event events[MAX_EPOLL_EVENTS];

  // The number of epoll_events after the last call to epoll_wait()
  gpr_atm num_events;

  // Index of the first event in epoll_events that has to be processed. This
  // field is only valid if num_events > 0
  gpr_atm cursor;
} epoll_set;
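
// (The events array itself needs no atomics: the release stores to num_events
// and cursor in do_epoll_wait()/process_epoll_events() publish its contents to
// the acquire loads performed by the next designated poller.)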

// The global singleton epoll set
static epoll_set g_epoll_set;

static int epoll_create_and_cloexec() {
#ifdef GRPC_LINUX_EPOLL_CREATE1
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1 unavailable");
  }
#else
  int fd = epoll_create(MAX_EPOLL_EVENTS);
  if (fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create unavailable");
  } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
    gpr_log(GPR_ERROR, "fcntl following epoll_create failed");
    close(fd);  // don't leak the descriptor on failure
    return -1;
  }
#endif
  return fd;
}
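
// (Since Linux 2.6.8 the size argument to epoll_create() is only a hint and
// merely has to be positive; MAX_EPOLL_EVENTS is passed above to satisfy that
// requirement.)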

// Must be called *only* once
static bool epoll_set_init() {
  g_epoll_set.epfd = epoll_create_and_cloexec();
  if (g_epoll_set.epfd < 0) {
    return false;
  }

  gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
  gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
  gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
  return true;
}

// epoll_set_init() MUST be called before calling this.
static void epoll_set_shutdown() {
  if (g_epoll_set.epfd >= 0) {
    close(g_epoll_set.epfd);
    g_epoll_set.epfd = -1;
  }
}

//******************************************************************************
// Fd Declarations
//

// Only used when GRPC_ENABLE_FORK_SUPPORT=1
struct grpc_fork_fd_list {
  grpc_fd* fd;
  grpc_fd* next;
  grpc_fd* prev;
};

struct grpc_fd {
  int fd;

  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;

  struct grpc_fd* freelist_next;

  grpc_iomgr_object iomgr_object;

  // Only used when GRPC_ENABLE_FORK_SUPPORT=1
  grpc_fork_fd_list* fork_fd_list;

  bool is_pre_allocated;
};

static void fd_global_init(void);
static void fd_global_shutdown(void);

//******************************************************************************
// Pollset Declarations
//

typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;

static const char* kick_state_string(kick_state st) {
  switch (st) {
    case UNKICKED:
      return "UNKICKED";
    case KICKED:
      return "KICKED";
    case DESIGNATED_POLLER:
      return "DESIGNATED_POLLER";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

struct grpc_pollset_worker {
  kick_state state;
  int kick_state_mutator;  // which line of code last changed kick state
  bool initialized_cv;
  grpc_pollset_worker* next;
  grpc_pollset_worker* prev;
  gpr_cv cv;
  grpc_closure_list schedule_on_end_work;
};

#define SET_KICK_STATE(worker, kick_state)   \
  do {                                       \
    (worker)->state = (kick_state);          \
    (worker)->kick_state_mutator = __LINE__; \
  } while (false)
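
// Example: after SET_KICK_STATE(worker, KICKED), worker->kick_state_mutator
// holds the __LINE__ of that call site, so a debugger or crash dump can show
// which transition last touched the worker.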

#define MAX_NEIGHBORHOODS 1024u

typedef struct pollset_neighborhood {
  union {
    char pad[GPR_CACHELINE_SIZE];
    struct {
      gpr_mu mu;
      grpc_pollset* active_root;
    };
  };
} pollset_neighborhood;
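
// The union above pads each neighborhood out to a full cache line so that
// neighboring mutexes never share a line and contend through false sharing.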

struct grpc_pollset {
  gpr_mu mu;
  pollset_neighborhood* neighborhood;
  bool reassigning_neighborhood;
  grpc_pollset_worker* root_worker;
  bool kicked_without_poller;

  // Set to true if the pollset is observed to have no workers available to
  // poll
  bool seen_inactive;
  bool shutting_down;              // Is the pollset shutting down?
  grpc_closure* shutdown_closure;  // Called after shutdown is complete

  // Number of workers who are *about-to* attach themselves to the pollset
  // worker list
  int begin_refs;

  grpc_pollset* next;
  grpc_pollset* prev;
};

//******************************************************************************
// Pollset-set Declarations
//

struct grpc_pollset_set {
  char unused;
};

//******************************************************************************
// Common helpers
//

static bool append_error(grpc_error_handle* composite, grpc_error_handle error,
                         const char* desc) {
  if (error.ok()) return true;
  if (composite->ok()) {
    *composite = GRPC_ERROR_CREATE(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}
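
// Typical usage (see pollset_kick_all() below): fold each operation's status
// into one composite error and keep going; the return value reports whether
// this particular operation succeeded.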

//******************************************************************************
// Fd Definitions
//

// We need to keep a freelist not because of any concerns of malloc performance
// but instead so that implementations with multiple threads in (for example)
// epoll_wait deal with the race between pollset removal and incoming poll
// notifications.
//
// The problem is that the poller ultimately holds a reference to this
// object, so it is very difficult to know when it is safe to free it, at least
// without some expensive synchronization.
//
// If we keep the object freelisted, in the worst case losing this race just
// becomes a spurious read notification on a reused fd.
//

// The alarm system needs to be able to wake up 'some poller' sometimes
// (specifically when a new alarm needs to be triggered earlier than the next
// alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
// case occurs.

static grpc_fd* fd_freelist = nullptr;
static gpr_mu fd_freelist_mu;

// Only used when GRPC_ENABLE_FORK_SUPPORT=1
static grpc_fd* fork_fd_list_head = nullptr;
static gpr_mu fork_fd_list_mu;

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  // TODO(guantaol): We don't have a reasonable explanation about this
  // lock()/unlock() pattern. It can be a valid barrier if there is at most one
  // pending lock() at this point. Otherwise, there is still a possibility of
  // use-after-free race. Need to reason about the code and/or clean it up.
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != nullptr) {
    grpc_fd* fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    fd->fork_fd_list =
        static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
    fd->fork_fd_list->next = fork_fd_list_head;
    fd->fork_fd_list->prev = nullptr;
    if (fork_fd_list_head != nullptr) {
      fork_fd_list_head->fork_fd_list->prev = fd;
    }
    fork_fd_list_head = fd;
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    if (fork_fd_list_head == fd) {
      fork_fd_list_head = fd->fork_fd_list->next;
    }
    if (fd->fork_fd_list->prev != nullptr) {
      fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
    }
    if (fd->fork_fd_list->next != nullptr) {
      fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
    }
    gpr_free(fd->fork_fd_list);
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
  grpc_fd* new_fd = nullptr;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != nullptr) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == nullptr) {
    new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
    new_fd->read_closure.Init();
    new_fd->write_closure.Init();
    new_fd->error_closure.Init();
  }
  new_fd->fd = fd;
  new_fd->read_closure->InitEvent();
  new_fd->write_closure->InitEvent();
  new_fd->error_closure->InitEvent();

  new_fd->freelist_next = nullptr;
  new_fd->is_pre_allocated = false;

  std::string fd_name = absl::StrCat(name, " fd=", fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name.c_str());
  fork_fd_list_add_grpc_fd(new_fd);
#ifndef NDEBUG
  if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name.c_str());
  }
#endif

  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
  // Use the least significant bit of ev.data.ptr to store track_err. We expect
  // the addresses to be word aligned. We need to store track_err to avoid
  // synchronization issues when accessing it after receiving an event.
  // Accessing fd would be a data race there because the fd might have been
  // returned to the free list at that point.
  ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
                                        (track_err ? 1 : 0));
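  // (process_epoll_events() strips this tag again with `& ~intptr_t{1}` and
  // reads the low bit back out as track_err.)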
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
    gpr_log(GPR_ERROR, "epoll_ctl failed: %s",
            grpc_core::StrError(errno).c_str());
  }

  return new_fd;
}

static int fd_wrapped_fd(grpc_fd* fd) { return fd->fd; }

// if 'releasing_fd' is true, it means that we are going to detach the internal
// fd from the grpc_fd structure (i.e. we should not be calling the shutdown()
// syscall on that fd)
static void fd_shutdown_internal(grpc_fd* fd, grpc_error_handle why,
                                 bool releasing_fd) {
  if (fd->read_closure->SetShutdown(why)) {
    if (!releasing_fd) {
      if (!fd->is_pre_allocated) {
        shutdown(fd->fd, SHUT_RDWR);
      }
    } else {
      // we need a phony event for earlier Linux versions.
      epoll_event phony_event;
      if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_DEL, fd->fd, &phony_event) !=
          0) {
        gpr_log(GPR_ERROR, "epoll_ctl failed: %s",
                grpc_core::StrError(errno).c_str());
      }
    }
    fd->write_closure->SetShutdown(why);
    fd->error_closure->SetShutdown(why);
  }
}

// Might be called multiple times
static void fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
  fd_shutdown_internal(fd, why, false);
}

static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
                      const char* reason) {
  grpc_error_handle error;
  bool is_release_fd = (release_fd != nullptr);

  if (!fd->read_closure->IsShutdown()) {
    fd_shutdown_internal(fd, GRPC_ERROR_CREATE(reason), is_release_fd);
  }

  // If release_fd is not NULL, we should be relinquishing control of the file
  // descriptor fd->fd (but we still own the grpc_fd structure).
  if (is_release_fd) {
    *release_fd = fd->fd;
  } else {
    if (!fd->is_pre_allocated) {
      close(fd->fd);
    }
  }

  grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);

  grpc_iomgr_unregister_object(&fd->iomgr_object);
  fork_fd_list_remove_grpc_fd(fd);
  fd->read_closure->DestroyEvent();
  fd->write_closure->DestroyEvent();
  fd->error_closure->DestroyEvent();

  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}

static bool fd_is_shutdown(grpc_fd* fd) {
  return fd->read_closure->IsShutdown();
}

static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
  fd->read_closure->NotifyOn(closure);
}

static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
  fd->write_closure->NotifyOn(closure);
}

static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
  fd->error_closure->NotifyOn(closure);
}

static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }

static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }

static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }

static void fd_set_pre_allocated(grpc_fd* fd) { fd->is_pre_allocated = true; }

//******************************************************************************
// Pollset Definitions
//

static thread_local grpc_pollset* g_current_thread_pollset;
static thread_local grpc_pollset_worker* g_current_thread_worker;

// The designated poller
static gpr_atm g_active_poller;

static pollset_neighborhood* g_neighborhoods;
static size_t g_num_neighborhoods;

// Return true if this is the first worker in the list
static bool worker_insert(grpc_pollset* pollset, grpc_pollset_worker* worker) {
  if (pollset->root_worker == nullptr) {
    pollset->root_worker = worker;
    worker->next = worker->prev = worker;
    return true;
  } else {
    worker->next = pollset->root_worker;
    worker->prev = worker->next->prev;
    worker->next->prev = worker;
    worker->prev->next = worker;
    return false;
  }
}

// Result of removing a worker from the pollset's worker list
typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;

static worker_remove_result worker_remove(grpc_pollset* pollset,
                                          grpc_pollset_worker* worker) {
  if (worker == pollset->root_worker) {
    if (worker == worker->next) {
      pollset->root_worker = nullptr;
      return EMPTIED;
    } else {
      pollset->root_worker = worker->next;
      worker->prev->next = worker->next;
      worker->next->prev = worker->prev;
      return NEW_ROOT;
    }
  } else {
    worker->prev->next = worker->next;
    worker->next->prev = worker->prev;
    return REMOVED;
  }
}
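
// Workers form a circular doubly-linked list rooted at pollset->root_worker,
// so insertion and removal above are O(1) and a lone worker points at itself.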

static size_t choose_neighborhood(void) {
  return static_cast<size_t>(gpr_cpu_current_cpu()) % g_num_neighborhoods;
}

static grpc_error_handle pollset_global_init(void) {
  gpr_atm_no_barrier_store(&g_active_poller, 0);
  global_wakeup_fd.read_fd = -1;
  grpc_error_handle err = grpc_wakeup_fd_init(&global_wakeup_fd);
  if (!err.ok()) return err;
  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
  ev.data.ptr = &global_wakeup_fd;
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
                &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  g_num_neighborhoods =
      grpc_core::Clamp(gpr_cpu_num_cores(), 1u, MAX_NEIGHBORHOODS);
  g_neighborhoods = static_cast<pollset_neighborhood*>(
      gpr_zalloc(sizeof(*g_neighborhoods) * g_num_neighborhoods));
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_init(&g_neighborhoods[i].mu);
  }
  return absl::OkStatus();
}
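
// global_wakeup_fd is registered (edge-triggered) in the same epoll set as the
// application fds, so grpc_wakeup_fd_wakeup() is what actually interrupts the
// designated poller's epoll_wait() in pollset_kick()/pollset_kick_all().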

static void pollset_global_shutdown(void) {
  if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_destroy(&g_neighborhoods[i].mu);
  }
  gpr_free(g_neighborhoods);
}

static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
  pollset->reassigning_neighborhood = false;
  pollset->root_worker = nullptr;
  pollset->kicked_without_poller = false;
  pollset->seen_inactive = true;
  pollset->shutting_down = false;
  pollset->shutdown_closure = nullptr;
  pollset->begin_refs = 0;
  pollset->next = pollset->prev = nullptr;
}

static void pollset_destroy(grpc_pollset* pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (!pollset->seen_inactive) {
      if (pollset->neighborhood != neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighborhood->active_root) {
        pollset->neighborhood->active_root =
            pollset->next == pollset ? nullptr : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighborhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}

static grpc_error_handle pollset_kick_all(grpc_pollset* pollset) {
  grpc_error_handle error;
  if (pollset->root_worker != nullptr) {
    grpc_pollset_worker* worker = pollset->root_worker;
    do {
      switch (worker->state) {
        case KICKED:
          break;
        case UNKICKED:
          SET_KICK_STATE(worker, KICKED);
          if (worker->initialized_cv) {
            gpr_cv_signal(&worker->cv);
          }
          break;
        case DESIGNATED_POLLER:
          SET_KICK_STATE(worker, KICKED);
          append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                       "pollset_kick_all");
          break;
      }

      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  // TODO(sreek): Check if we need to set 'kicked_without_poller' to true here
  // in the else case
  return error;
}

static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
  if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
      pollset->begin_refs == 0) {
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_closure,
                            absl::OkStatus());
    pollset->shutdown_closure = nullptr;
  }
}

static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
  GPR_ASSERT(pollset->shutdown_closure == nullptr);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutdown_closure = closure;
  pollset->shutting_down = true;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(pollset);
}

static int poll_deadline_to_millis_timeout(grpc_core::Timestamp millis) {
  if (millis == grpc_core::Timestamp::InfFuture()) return -1;
  int64_t delta = (millis - grpc_core::Timestamp::Now()).millis();
  if (delta > INT_MAX) {
    return INT_MAX;
  } else if (delta < 0) {
    return 0;
  } else {
    return static_cast<int>(delta);
  }
}
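
// (An epoll_wait() timeout of -1 blocks indefinitely and 0 returns without
// blocking, which is why an infinite deadline maps to -1 and a deadline in
// the past clamps to 0.)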

// Process the epoll events found by the do_epoll_wait() function.
// - g_epoll_set.cursor points to the index of the first event to be processed
// - This function then processes up to MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION
//   events and updates the g_epoll_set.cursor

// NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
// called by the g_active_poller thread. So there is no need for
// synchronization when accessing fields in g_epoll_set
static grpc_error_handle process_epoll_events(grpc_pollset* /*pollset*/) {
  static const char* err_desc = "process_events";
  grpc_error_handle error;
  long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
  long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
  for (int idx = 0;
       (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
       idx++) {
    long c = cursor++;
    struct epoll_event* ev = &g_epoll_set.events[c];
    void* data_ptr = ev->data.ptr;

    if (data_ptr == &global_wakeup_fd) {
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd* fd = reinterpret_cast<grpc_fd*>(
          reinterpret_cast<intptr_t>(data_ptr) & ~intptr_t{1});
      bool track_err = reinterpret_cast<intptr_t>(data_ptr) & intptr_t{1};
      bool cancel = (ev->events & EPOLLHUP) != 0;
      bool error = (ev->events & EPOLLERR) != 0;
      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (ev->events & EPOLLOUT) != 0;
      bool err_fallback = error && !track_err;

      if (error && !err_fallback) {
        fd_has_errors(fd);
      }

      if (read_ev || cancel || err_fallback) {
        fd_become_readable(fd);
      }

      if (write_ev || cancel || err_fallback) {
        fd_become_writable(fd);
      }
    }
  }
  gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
  return error;
}

// Do epoll_wait and store the events in the g_epoll_set.events field. This
// does not "process" any of the events yet; that is done in
// process_epoll_events(). See process_epoll_events() for more details.

// NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
// (i.e. the designated poller thread) will be calling this function. So there
// is no need for any synchronization when accessing fields in g_epoll_set
static grpc_error_handle do_epoll_wait(grpc_pollset* ps,
                                       grpc_core::Timestamp deadline) {
  int r;
  int timeout = poll_deadline_to_millis_timeout(deadline);
  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  do {
    r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
                   timeout);
  } while (r < 0 && errno == EINTR);
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "ps: %p poll got %d events", ps, r);
  }

  gpr_atm_rel_store(&g_epoll_set.num_events, r);
  gpr_atm_rel_store(&g_epoll_set.cursor, 0);

  return absl::OkStatus();
}

static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                         grpc_pollset_worker** worker_hdl,
                         grpc_core::Timestamp deadline) {
  if (worker_hdl != nullptr) *worker_hdl = worker;
  worker->initialized_cv = false;
  SET_KICK_STATE(worker, UNKICKED);
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++;

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "PS:%p BEGIN_STARTS:%p", pollset, worker);
  }

  if (pollset->seen_inactive) {
    // pollset has been observed to be inactive, we need to move back to the
    // active list
    bool is_reassigning = false;
    if (!pollset->reassigning_neighborhood) {
      is_reassigning = true;
      pollset->reassigning_neighborhood = true;
      pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
    }
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  // pollset unlocked: state may change (even worker->kick_state)
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
              pollset, worker, kick_state_string(worker->state),
              is_reassigning);
    }
    if (pollset->seen_inactive) {
      if (neighborhood != pollset->neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }

      // In the brief time we released the pollset locks above, the worker MAY
      // have been kicked. In this case, the worker should get out of this
      // pollset ASAP and hence this should neither add the pollset to the
      // neighborhood nor mark the pollset as active.

      // On a side note, the only way a worker's kick state could have changed
      // at this point is if it were "kicked specifically". Since the worker
      // has not added itself to the pollset yet (by calling worker_insert()),
      // it is not visible in the "kick any" path yet
      if (worker->state == UNKICKED) {
        pollset->seen_inactive = false;
        if (neighborhood->active_root == nullptr) {
          neighborhood->active_root = pollset->next = pollset->prev = pollset;
          // Make this the designated poller if there isn't one already
          if (worker->state == UNKICKED &&
              gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                     reinterpret_cast<gpr_atm>(worker))) {
            SET_KICK_STATE(worker, DESIGNATED_POLLER);
          }
        } else {
          pollset->next = neighborhood->active_root;
          pollset->prev = pollset->next->prev;
          pollset->next->prev = pollset->prev->next = pollset;
        }
      }
    }
    if (is_reassigning) {
      GPR_ASSERT(pollset->reassigning_neighborhood);
      pollset->reassigning_neighborhood = false;
    }
    gpr_mu_unlock(&neighborhood->mu);
  }

  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    while (worker->state == UNKICKED && !pollset->shutting_down) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
                pollset, worker, kick_state_string(worker->state),
                pollset->shutting_down);
      }

      if (gpr_cv_wait(&worker->cv, &pollset->mu,
                      deadline.as_timespec(GPR_CLOCK_MONOTONIC)) &&
          worker->state == UNKICKED) {
        // If gpr_cv_wait returns true (i.e. a timeout), pretend that the
        // worker received a kick
        SET_KICK_STATE(worker, KICKED);
      }
    }
    grpc_core::ExecCtx::Get()->InvalidateNow();
  }

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO,
            "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
            "kicked_without_poller: %d",
            pollset, worker, kick_state_string(worker->state),
            pollset->shutting_down, pollset->kicked_without_poller);
  }

  // We release the pollset lock in this function at a couple of places:
  // 1. Briefly when assigning the pollset to a neighborhood
  // 2. When doing gpr_cv_wait()
  // It is possible that 'kicked_without_poller' was set to true during (1) and
  // 'shutting_down' is set to true during (1) or (2). If either of them is
  // true, this worker cannot do polling
  // TODO(sreek): Perhaps there is a better way to handle the
  // kicked_without_poller case; especially when the worker is the
  // DESIGNATED_POLLER

  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    return false;
  }

  return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
}

static bool check_neighborhood_for_available_poller(
    pollset_neighborhood* neighborhood) {
  bool found_worker = false;
  do {
    grpc_pollset* inspect = neighborhood->active_root;
    if (inspect == nullptr) {
      break;
    }
    gpr_mu_lock(&inspect->mu);
    GPR_ASSERT(!inspect->seen_inactive);
    grpc_pollset_worker* inspect_worker = inspect->root_worker;
    if (inspect_worker != nullptr) {
      do {
        switch (inspect_worker->state) {
          case UNKICKED:
            if (gpr_atm_no_barrier_cas(
                    &g_active_poller, 0,
                    reinterpret_cast<gpr_atm>(inspect_worker))) {
              if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
                gpr_log(GPR_INFO, " .. choose next poller to be %p",
                        inspect_worker);
              }
              SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
              if (inspect_worker->initialized_cv) {
                gpr_cv_signal(&inspect_worker->cv);
              }
            } else {
              if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
                gpr_log(GPR_INFO, " .. beaten to choose next poller");
              }
            }
            // even if we didn't win the cas, there's a worker, we can stop
            found_worker = true;
            break;
          case KICKED:
            break;
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (!found_worker && inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. mark pollset %p inactive", inspect);
      }
      inspect->seen_inactive = true;
      if (inspect == neighborhood->active_root) {
        neighborhood->active_root =
            inspect->next == inspect ? nullptr : inspect->next;
      }
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = nullptr;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  return found_worker;
}

static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                       grpc_pollset_worker** worker_hdl) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "PS:%p END_WORKER:%p", pollset, worker);
  }
  if (worker_hdl != nullptr) *worker_hdl = nullptr;
  // Make sure we appear kicked
  SET_KICK_STATE(worker, KICKED);
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         grpc_core::ExecCtx::Get()->closure_list());
  if (gpr_atm_no_barrier_load(&g_active_poller) ==
      reinterpret_cast<gpr_atm>(worker)) {
    if (worker->next != worker && worker->next->state == UNKICKED) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. choose next poller to be peer %p", worker);
      }
      GPR_ASSERT(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
      gpr_cv_signal(&worker->next->cv);
      if (grpc_core::ExecCtx::Get()->HasWork()) {
        gpr_mu_unlock(&pollset->mu);
        grpc_core::ExecCtx::Get()->Flush();
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      size_t poller_neighborhood_idx =
          static_cast<size_t>(pollset->neighborhood - g_neighborhoods);
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      bool scan_state[MAX_NEIGHBORHOODS];
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        if (gpr_mu_trylock(&neighborhood->mu)) {
          found_worker = check_neighborhood_for_available_poller(neighborhood);
          gpr_mu_unlock(&neighborhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        gpr_mu_lock(&neighborhood->mu);
        found_worker = check_neighborhood_for_available_poller(neighborhood);
        gpr_mu_unlock(&neighborhood->mu);
      }
      grpc_core::ExecCtx::Get()->Flush();
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_core::ExecCtx::Get()->HasWork()) {
    gpr_mu_unlock(&pollset->mu);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, " .. remove worker");
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(pollset);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
}

// pollset->mu lock must be held by the caller before calling this.
// The function pollset_work() may temporarily release the lock (pollset->mu)
// during the course of its execution but it will always re-acquire the lock
// and ensure that it is held by the time the function returns
static grpc_error_handle pollset_work(grpc_pollset* ps,
                                      grpc_pollset_worker** worker_hdl,
                                      grpc_core::Timestamp deadline) {
  grpc_pollset_worker worker;
  grpc_error_handle error;
  static const char* err_desc = "pollset_work";
  if (ps->kicked_without_poller) {
    ps->kicked_without_poller = false;
    return absl::OkStatus();
  }

  if (begin_worker(ps, &worker, worker_hdl, deadline)) {
    g_current_thread_pollset = ps;
    g_current_thread_worker = &worker;
    GPR_ASSERT(!ps->shutting_down);
    GPR_ASSERT(!ps->seen_inactive);

    gpr_mu_unlock(&ps->mu);  // unlock
    // This is the designated polling thread at this point and should ideally
    // do polling. However, if there are unprocessed events left from a
    // previous call to do_epoll_wait(), skip calling epoll_wait() in this
    // iteration and process the pending epoll events.

    // The reason for decoupling do_epoll_wait and process_epoll_events is to
    // better distribute the work (i.e. handling epoll events) across multiple
    // threads

    // process_epoll_events() returns very quickly: It just queues the work on
    // exec_ctx but does not execute it (the actual execution or, more
    // accurately, grpc_core::ExecCtx::Get()->Flush() happens in end_worker()
    // AFTER selecting a designated poller). So we are not waiting long periods
    // without a designated poller
    if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
        gpr_atm_acq_load(&g_epoll_set.num_events)) {
      append_error(&error, do_epoll_wait(ps, deadline), err_desc);
    }
    append_error(&error, process_epoll_events(ps), err_desc);

    gpr_mu_lock(&ps->mu);  // lock

    g_current_thread_worker = nullptr;
  } else {
    g_current_thread_pollset = ps;
  }
  end_worker(ps, &worker, worker_hdl);

  g_current_thread_pollset = nullptr;
  return error;
}

static grpc_error_handle pollset_kick(grpc_pollset* pollset,
                                      grpc_pollset_worker* specific_worker) {
  grpc_error_handle ret_err;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    std::vector<std::string> log;
    log.push_back(absl::StrFormat(
        "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset, specific_worker,
        static_cast<void*>(g_current_thread_pollset),
        static_cast<void*>(g_current_thread_worker), pollset->root_worker));
    if (pollset->root_worker != nullptr) {
      log.push_back(absl::StrFormat(
          " {kick_state=%s next=%p {kick_state=%s}}",
          kick_state_string(pollset->root_worker->state),
          pollset->root_worker->next,
          kick_state_string(pollset->root_worker->next->state)));
    }
    if (specific_worker != nullptr) {
      log.push_back(absl::StrFormat(" worker_kick_state=%s",
                                    kick_state_string(specific_worker->state)));
    }
    gpr_log(GPR_DEBUG, "%s", absl::StrJoin(log, "").c_str());
  }

  if (specific_worker == nullptr) {
    if (g_current_thread_pollset != pollset) {
      grpc_pollset_worker* root_worker = pollset->root_worker;
      if (root_worker == nullptr) {
        pollset->kicked_without_poller = true;
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked_without_poller");
        }
        goto done;
      }
      grpc_pollset_worker* next_worker = root_worker->next;
      if (root_worker->state == KICKED) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. already kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        goto done;
      } else if (next_worker->state == KICKED) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. already kicked %p", next_worker);
        }
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      } else if (root_worker == next_worker &&  // only try and wake up a poller
                                                // if there is no next worker
                 root_worker ==
                     reinterpret_cast<grpc_pollset_worker*>(
                         gpr_atm_no_barrier_load(&g_active_poller))) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
        goto done;
      } else if (next_worker->state == UNKICKED) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked %p", next_worker);
        }
        GPR_ASSERT(next_worker->initialized_cv);
        SET_KICK_STATE(next_worker, KICKED);
        gpr_cv_signal(&next_worker->cv);
        goto done;
      } else if (next_worker->state == DESIGNATED_POLLER) {
        if (root_worker->state != DESIGNATED_POLLER) {
          if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
            gpr_log(
                GPR_INFO,
                " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
                root_worker, root_worker->initialized_cv, next_worker);
          }
          SET_KICK_STATE(root_worker, KICKED);
          if (root_worker->initialized_cv) {
            gpr_cv_signal(&root_worker->cv);
          }
          goto done;
        } else {
          if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
            gpr_log(GPR_INFO, " .. non-root poller %p (root=%p)", next_worker,
                    root_worker);
          }
          SET_KICK_STATE(next_worker, KICKED);
          ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
          goto done;
        }
      } else {
        GPR_ASSERT(next_worker->state == KICKED);
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      }
    } else {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. kicked while waking up");
      }
      goto done;
    }

    GPR_UNREACHABLE_CODE(goto done);
  }

  if (specific_worker->state == KICKED) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. specific worker already kicked");
    }
    goto done;
  } else if (g_current_thread_worker == specific_worker) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. mark %p kicked", specific_worker);
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  } else if (specific_worker ==
             reinterpret_cast<grpc_pollset_worker*>(
                 gpr_atm_no_barrier_load(&g_active_poller))) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick active poller");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
    goto done;
  } else if (specific_worker->initialized_cv) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    gpr_cv_signal(&specific_worker->cv);
    goto done;
  } else {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick non-waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  }
done:
  return ret_err;
}

static void pollset_add_fd(grpc_pollset* /*pollset*/, grpc_fd* /*fd*/) {}

//******************************************************************************
// Pollset-set Definitions
//

static grpc_pollset_set* pollset_set_create(void) {
  return reinterpret_cast<grpc_pollset_set*>(static_cast<intptr_t>(0xdeafbeef));
}
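
// epoll1 keeps no per-pollset_set state (there is a single global epoll set),
// so pollset_set_create() hands out a non-null sentinel pointer and every
// pollset_set operation below is a no-op.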

static void pollset_set_destroy(grpc_pollset_set* /*pss*/) {}

static void pollset_set_add_fd(grpc_pollset_set* /*pss*/, grpc_fd* /*fd*/) {}

static void pollset_set_del_fd(grpc_pollset_set* /*pss*/, grpc_fd* /*fd*/) {}

static void pollset_set_add_pollset(grpc_pollset_set* /*pss*/,
                                    grpc_pollset* /*ps*/) {}

static void pollset_set_del_pollset(grpc_pollset_set* /*pss*/,
                                    grpc_pollset* /*ps*/) {}

static void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/,
                                        grpc_pollset_set* /*item*/) {}

static void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/,
                                        grpc_pollset_set* /*item*/) {}

//******************************************************************************
// Event engine binding
//

static bool is_any_background_poller_thread(void) { return false; }

static void shutdown_background_closure(void) {}

static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
                                             grpc_error_handle /*error*/) {
  return false;
}

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  epoll_set_shutdown();
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_destroy(&fork_fd_list_mu);
    grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
  }
  g_is_shutdown = true;
}

static bool init_epoll1_linux();

const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
    sizeof(grpc_pollset),
    true,
    false,

    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_notify_on_error,
    fd_become_readable,
    fd_become_writable,
    fd_has_errors,
    fd_is_shutdown,

    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    is_any_background_poller_thread,
    /* name = */ "epoll1",
    /* check_engine_available = */
    [](bool) { return init_epoll1_linux(); },
    /* init_engine = */
    []() { GPR_ASSERT(init_epoll1_linux()); },
    shutdown_background_closure,
    /* shutdown_engine = */
    []() { shutdown_engine(); },
    add_closure_to_background_poller,

    fd_set_pre_allocated,
};

// Called by the child process's post-fork handler to close open fds,
// including the global epoll fd. This allows gRPC to shut down in the child
// process without interfering with connections or RPCs ongoing in the parent.
static void reset_event_manager_on_fork() {
  gpr_mu_lock(&fork_fd_list_mu);
  while (fork_fd_list_head != nullptr) {
    close(fork_fd_list_head->fd);
    fork_fd_list_head->fd = -1;
    fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
  }
  gpr_mu_unlock(&fork_fd_list_mu);
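  // (Each fd is set to -1 above so that later shutdown()/close() calls on the
  // already-closed descriptor fail harmlessly instead of touching an fd
  // number the child may have since reused.)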
  shutdown_engine();
  init_epoll1_linux();
}

// It is possible that GLIBC has epoll but the underlying kernel doesn't.
// Create the epoll fd (epoll_set_init() takes care of that) to make sure
// epoll support is available
static bool init_epoll1_linux() {
  if (!g_is_shutdown) return true;
  if (!grpc_has_wakeup_fd()) {
    gpr_log(GPR_ERROR, "Skipping epoll1 because of no wakeup fd.");
    return false;
  }

  if (!epoll_set_init()) {
    return false;
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    fd_global_shutdown();
    epoll_set_shutdown();
    return false;
  }

  if (grpc_core::Fork::Enabled()) {
    gpr_mu_init(&fork_fd_list_mu);
    grpc_core::Fork::SetResetChildPollingEngineFunc(
        reset_event_manager_on_fork);
  }
  g_is_shutdown = false;
  return true;
}

#else  // defined(GRPC_LINUX_EPOLL)
#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
    1,
    true,
    false,

    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,

    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,

    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,

    nullptr,
    /* name = */ "epoll1",
    /* check_engine_available = */ [](bool) { return false; },
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
};
#endif  // defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
#endif  // !defined(GRPC_LINUX_EPOLL)