//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include <inttypes.h>

#include <string>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/time_averaged_stats.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/timer_heap.h"

#define INVALID_HEAP_INDEX 0xffffffffu

#define ADD_DEADLINE_SCALE 0.33
#define MIN_QUEUE_WINDOW_DURATION 0.01
#define MAX_QUEUE_WINDOW_DURATION 1.0

grpc_core::TraceFlag grpc_timer_trace(false, "timer");
grpc_core::TraceFlag grpc_timer_check_trace(false, "timer_check");

// A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
// deadlines earlier than 'queue_deadline_cap' are maintained in the heap and
// others are maintained in the list (unordered). This helps to keep the number
// of elements in the heap low.
//
// The 'queue_deadline_cap' gets recomputed periodically based on the timer
// stats maintained in 'stats', and the relevant timers are then moved from the
// 'list' to the 'heap'.
//
struct timer_shard {
  gpr_mu mu;
  grpc_core::ManualConstructor<grpc_core::TimeAveragedStats> stats;
  // All and only timers with deadlines < this will be in the heap.
  grpc_core::Timestamp queue_deadline_cap;
  // The deadline of the next timer due in this shard.
  grpc_core::Timestamp min_deadline;
  // Index of this timer_shard in the g_shard_queue.
  uint32_t shard_queue_index;
  // This holds all timers with deadlines < queue_deadline_cap. Timers in this
  // list have the top bit of their deadline set to 0.
  grpc_timer_heap heap;
  // This holds timers whose deadline is >= queue_deadline_cap.
  grpc_timer list;
};
static size_t g_num_shards;

// Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
// is hashed to select the timer shard to add the timer to.
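// (timer_init() and timer_cancel() below both locate the shard via
// g_shards[grpc_core::HashPointer(timer, g_num_shards)], so a timer is always
// cancelled on the same shard it was added to.)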
static timer_shard* g_shards;

// Maintains a sorted list of timer shards (sorted by their min_deadline, i.e.
// the deadline of the next timer in each shard).
// Access to this is protected by g_shared_mutables.mu
static timer_shard** g_shard_queue;

#ifndef NDEBUG

// == DEBUG ONLY: hash table for duplicate timer detection ==

#define NUM_HASH_BUCKETS 1009  // Prime number close to 1000

static gpr_mu g_hash_mu[NUM_HASH_BUCKETS];  // One mutex per bucket
static grpc_timer* g_timer_ht[NUM_HASH_BUCKETS] = {nullptr};

static void init_timer_ht() {
  for (int i = 0; i < NUM_HASH_BUCKETS; i++) {
    gpr_mu_init(&g_hash_mu[i]);
  }
}

static void destroy_timer_ht() {
  for (int i = 0; i < NUM_HASH_BUCKETS; i++) {
    gpr_mu_destroy(&g_hash_mu[i]);
  }
}

static bool is_in_ht(grpc_timer* t) {
  size_t i = grpc_core::HashPointer(t, NUM_HASH_BUCKETS);

  gpr_mu_lock(&g_hash_mu[i]);
  grpc_timer* p = g_timer_ht[i];
  while (p != nullptr && p != t) {
    p = p->hash_table_next;
  }
  gpr_mu_unlock(&g_hash_mu[i]);

  return (p == t);
}

static void add_to_ht(grpc_timer* t) {
  GPR_ASSERT(!t->hash_table_next);
  size_t i = grpc_core::HashPointer(t, NUM_HASH_BUCKETS);

  gpr_mu_lock(&g_hash_mu[i]);
  grpc_timer* p = g_timer_ht[i];
  while (p != nullptr && p != t) {
    p = p->hash_table_next;
  }

  if (p == t) {
    grpc_closure* c = t->closure;
    grpc_core::Crash(absl::StrFormat(
        "** Duplicate timer (%p) being added. Closure: (%p), created at: "
        "(%s:%d), scheduled at: (%s:%d) **",
        t, c, c->file_created, c->line_created, c->file_initiated,
        c->line_initiated));
  }

  // Timer not present in the bucket. Insert at head of the list.
  t->hash_table_next = g_timer_ht[i];
  g_timer_ht[i] = t;
  gpr_mu_unlock(&g_hash_mu[i]);
}

static void remove_from_ht(grpc_timer* t) {
  size_t i = grpc_core::HashPointer(t, NUM_HASH_BUCKETS);
  bool removed = false;

  gpr_mu_lock(&g_hash_mu[i]);
  if (g_timer_ht[i] == t) {
    g_timer_ht[i] = g_timer_ht[i]->hash_table_next;
    removed = true;
  } else if (g_timer_ht[i] != nullptr) {
    grpc_timer* p = g_timer_ht[i];
    while (p->hash_table_next != nullptr && p->hash_table_next != t) {
      p = p->hash_table_next;
    }

    if (p->hash_table_next == t) {
      p->hash_table_next = t->hash_table_next;
      removed = true;
    }
  }
  gpr_mu_unlock(&g_hash_mu[i]);

  if (!removed) {
    grpc_closure* c = t->closure;
    grpc_core::Crash(absl::StrFormat(
        "** Removing timer (%p) that is not added to hash table. Closure "
        "(%p), created at: (%s:%d), scheduled at: (%s:%d) **",
        t, c, c->file_created, c->line_created, c->file_initiated,
        c->line_initiated));
  }

  t->hash_table_next = nullptr;
}

// If a timer is added to a timer shard (either its heap or its list), it must
// be pending. A timer is added to the hash table only if it is added to a
// timer shard. Therefore, if timer->pending is false, it cannot be in the
// hash table.
static void validate_non_pending_timer(grpc_timer* t) {
  if (!t->pending && is_in_ht(t)) {
    grpc_closure* c = t->closure;
    grpc_core::Crash(absl::StrFormat(
        "** grpc_timer_cancel() called on a non-pending timer (%p) which "
        "is in the hash table. Closure: (%p), created at: (%s:%d), "
        "scheduled at: (%s:%d) **",
        t, c, c->file_created, c->line_created, c->file_initiated,
        c->line_initiated));
  }
}

#define INIT_TIMER_HASH_TABLE() init_timer_ht()
#define DESTROY_TIMER_HASH_TABLE() destroy_timer_ht()
#define ADD_TO_HASH_TABLE(t) add_to_ht((t))
#define REMOVE_FROM_HASH_TABLE(t) remove_from_ht((t))
#define VALIDATE_NON_PENDING_TIMER(t) validate_non_pending_timer((t))

#else

#define INIT_TIMER_HASH_TABLE()
#define DESTROY_TIMER_HASH_TABLE()
#define ADD_TO_HASH_TABLE(t)
#define REMOVE_FROM_HASH_TABLE(t)
#define VALIDATE_NON_PENDING_TIMER(t)

#endif

// Thread-local variable that stores the deadline of the next timer the thread
// last saw. This is an optimization that prevents the thread from checking
// shared_mutables.min_timer (which requires acquiring the shared_mutables.mu
// lock, an expensive operation).
static thread_local int64_t g_last_seen_min_timer;
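// (g_last_seen_min_timer is refreshed each time run_some_expired_timers()
// reads the shared minimum, and is reset to 0 by timer_consume_kick(), which
// forces the next timer_check() on this thread to consult the shared state.)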

struct shared_mutables {
  // The deadline of the next timer due across all timer shards
  grpc_core::Timestamp min_timer;
  // Allow only one run_some_expired_timers at once
  gpr_spinlock checker_mu;
  bool initialized;
  // Protects g_shard_queue (and the shared_mutables struct itself)
  gpr_mu mu;
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);

static struct shared_mutables g_shared_mutables;

static grpc_timer_check_result run_some_expired_timers(
    grpc_core::Timestamp now, grpc_core::Timestamp* next,
    grpc_error_handle error);

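// Computes the shard's earliest deadline: the top of the heap when it is
// non-empty, otherwise a timestamp just past queue_deadline_cap so the shard
// is not re-examined until the cap advances (see refill_heap()).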
static grpc_core::Timestamp compute_min_deadline(timer_shard* shard) {
  return grpc_timer_heap_is_empty(&shard->heap)
             ? shard->queue_deadline_cap + grpc_core::Duration::Epsilon()
             : grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
                   grpc_timer_heap_top(&shard->heap)->deadline);
}

static void timer_list_init() {
  uint32_t i;

  g_num_shards = grpc_core::Clamp(2 * gpr_cpu_num_cores(), 1u, 32u);
  g_shards =
      static_cast<timer_shard*>(gpr_zalloc(g_num_shards * sizeof(*g_shards)));
  g_shard_queue = static_cast<timer_shard**>(
      gpr_zalloc(g_num_shards * sizeof(*g_shard_queue)));

  g_shared_mutables.initialized = true;
  g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
  gpr_mu_init(&g_shared_mutables.mu);
  g_shared_mutables.min_timer = grpc_core::Timestamp::Now();

  g_last_seen_min_timer = 0;

  for (i = 0; i < g_num_shards; i++) {
    timer_shard* shard = &g_shards[i];
    gpr_mu_init(&shard->mu);
    shard->stats.Init(1.0 / ADD_DEADLINE_SCALE, 0.1, 0.5);
    shard->queue_deadline_cap = g_shared_mutables.min_timer;
    shard->shard_queue_index = i;
    grpc_timer_heap_init(&shard->heap);
    shard->list.next = shard->list.prev = &shard->list;
    shard->min_deadline = compute_min_deadline(shard);
    g_shard_queue[i] = shard;
  }

  INIT_TIMER_HASH_TABLE();
}

static void timer_list_shutdown() {
  size_t i;
  run_some_expired_timers(grpc_core::Timestamp::InfFuture(), nullptr,
                          GRPC_ERROR_CREATE("Timer list shutdown"));
  for (i = 0; i < g_num_shards; i++) {
    timer_shard* shard = &g_shards[i];
    gpr_mu_destroy(&shard->mu);
    grpc_timer_heap_destroy(&shard->heap);
  }
  gpr_mu_destroy(&g_shared_mutables.mu);
  gpr_free(g_shards);
  gpr_free(g_shard_queue);
  g_shared_mutables.initialized = false;

  DESTROY_TIMER_HASH_TABLE();
}

// Inserts 'timer' immediately before 'head' in the circular doubly-linked
// list; with the shard's sentinel as 'head', this appends at the tail.
static void list_join(grpc_timer* head, grpc_timer* timer) {
  timer->next = head;
  timer->prev = head->prev;
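  // Link both neighbours to 'timer': head->prev and the former tail's next.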
  timer->next->prev = timer->prev->next = timer;
}

static void list_remove(grpc_timer* timer) {
  timer->next->prev = timer->prev;
  timer->prev->next = timer->next;
}

static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
  timer_shard* temp;
  temp = g_shard_queue[first_shard_queue_index];
  g_shard_queue[first_shard_queue_index] =
      g_shard_queue[first_shard_queue_index + 1];
  g_shard_queue[first_shard_queue_index + 1] = temp;
  g_shard_queue[first_shard_queue_index]->shard_queue_index =
      first_shard_queue_index;
  g_shard_queue[first_shard_queue_index + 1]->shard_queue_index =
      first_shard_queue_index + 1;
}

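// Restores g_shard_queue's sort order after a shard's min_deadline changed:
// bubble the shard towards the front while it is earlier than its
// predecessor, then towards the back while it is later than its successor.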
static void note_deadline_change(timer_shard* shard) {
  while (shard->shard_queue_index > 0 &&
         shard->min_deadline <
             g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
    swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
  }
  while (shard->shard_queue_index < g_num_shards - 1 &&
         shard->min_deadline >
             g_shard_queue[shard->shard_queue_index + 1]->min_deadline) {
    swap_adjacent_shards_in_queue(shard->shard_queue_index);
  }
}

void grpc_timer_init_unset(grpc_timer* timer) { timer->pending = false; }

static void timer_init(grpc_timer* timer, grpc_core::Timestamp deadline,
                       grpc_closure* closure) {
  int is_first_timer = 0;
  timer_shard* shard = &g_shards[grpc_core::HashPointer(timer, g_num_shards)];
  timer->closure = closure;
  timer->deadline = deadline.milliseconds_after_process_epoch();

#ifndef NDEBUG
  timer->hash_table_next = nullptr;
#endif

  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
    gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRId64 " now %" PRId64 " call %p[%p]",
            timer, deadline.milliseconds_after_process_epoch(),
            grpc_core::Timestamp::Now().milliseconds_after_process_epoch(),
            closure, closure->cb);
  }

  if (!g_shared_mutables.initialized) {
    timer->pending = false;
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, timer->closure,
        GRPC_ERROR_CREATE("Attempt to create timer before initialization"));
    return;
  }

  gpr_mu_lock(&shard->mu);
  timer->pending = true;
  grpc_core::Timestamp now = grpc_core::Timestamp::Now();
  if (deadline <= now) {
    // Early out: the deadline has already passed; run the closure right away.
    timer->pending = false;
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, absl::OkStatus());
    gpr_mu_unlock(&shard->mu);
    return;
  }

  shard->stats->AddSample((deadline - now).millis() / 1000.0);

  ADD_TO_HASH_TABLE(timer);

  if (deadline < shard->queue_deadline_cap) {
    is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
  } else {
    timer->heap_index = INVALID_HEAP_INDEX;
    list_join(&shard->list, timer);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
    gpr_log(GPR_DEBUG,
            "  .. add to shard %d with queue_deadline_cap=%" PRId64
            " => is_first_timer=%s",
            static_cast<int>(shard - g_shards),
            shard->queue_deadline_cap.milliseconds_after_process_epoch(),
            is_first_timer ? "true" : "false");
  }
  gpr_mu_unlock(&shard->mu);

  // The deadline may have decreased, so we need to adjust the main queue.
  // Note that there is a potential racy unlocked region here. There could be
  // a reordering of multiple grpc_timer_init calls at this point, but the <
  // test below should ensure that we err on the side of caution. There could
  // also be a race with grpc_timer_check, which might beat us to the lock. In
  // that case, it is possible that the timer we added will have already run
  // by the time we hold the lock, but that too is a safe error.
  // Finally, it's possible that the grpc_timer_check that intervened failed
  // to trigger the new timer because the min_deadline hadn't yet been
  // reduced. In that case, the timer will simply have to wait for the next
  // grpc_timer_check.
  if (is_first_timer) {
    gpr_mu_lock(&g_shared_mutables.mu);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
      gpr_log(GPR_DEBUG, "  .. old shard min_deadline=%" PRId64,
              shard->min_deadline.milliseconds_after_process_epoch());
    }
    if (deadline < shard->min_deadline) {
      grpc_core::Timestamp old_min_deadline = g_shard_queue[0]->min_deadline;
      shard->min_deadline = deadline;
      note_deadline_change(shard);
      if (shard->shard_queue_index == 0 && deadline < old_min_deadline) {
#if GPR_ARCH_64
        // TODO(sreek): Using a C-style cast here. static_cast<> gives an
        // error on Mac platforms, complaining that gpr_atm* is (long *) while
        // (&g_shared_mutables.min_timer) is a (long long *). The cast should
        // be safe since we know that both are pointer types and 64-bit wide.
        gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer),
                                 deadline.milliseconds_after_process_epoch());
#else
        // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
        // types (like grpc_core::Timestamp). So all reads and writes to the
        // g_shared_mutables.min_timer variable are done under
        // g_shared_mutables.mu.
        g_shared_mutables.min_timer = deadline;
#endif
        grpc_kick_poller();
      }
    }
    gpr_mu_unlock(&g_shared_mutables.mu);
  }
}

static void timer_consume_kick(void) {
  // Force re-evaluation of the last-seen min
  g_last_seen_min_timer = 0;
}

static void timer_cancel(grpc_timer* timer) {
  if (!g_shared_mutables.initialized) {
    // The timer system has been shut down, so the timer must have already
    // been cancelled or run; the shard mutexes are no longer valid either.
    return;
  }

  timer_shard* shard = &g_shards[grpc_core::HashPointer(timer, g_num_shards)];
  gpr_mu_lock(&shard->mu);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
    gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
            timer->pending ? "true" : "false");
  }

  if (timer->pending) {
    REMOVE_FROM_HASH_TABLE(timer);

    grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure,
                            absl::CancelledError());
    timer->pending = false;
    if (timer->heap_index == INVALID_HEAP_INDEX) {
      list_remove(timer);
    } else {
      grpc_timer_heap_remove(&shard->heap, timer);
    }
  } else {
    VALIDATE_NON_PENDING_TIMER(timer);
  }
  gpr_mu_unlock(&shard->mu);
}

// Rebalances the timer shard by computing a new 'queue_deadline_cap' and
// moving all relevant timers in shard->list (i.e. timers with deadlines
// earlier than 'queue_deadline_cap') into shard->heap.
// Returns 'true' if shard->heap has at least ONE element.
// REQUIRES: shard->mu locked
static bool refill_heap(timer_shard* shard, grpc_core::Timestamp now) {
  // Compute the new queue window width and bound it by the limits:
  double computed_deadline_delta =
      shard->stats->UpdateAverage() * ADD_DEADLINE_SCALE;
  double deadline_delta =
      grpc_core::Clamp(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION,
                       MAX_QUEUE_WINDOW_DURATION);
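  // e.g. if recently-added timers land on average 0.3s out, the window is
  // 0.3 * ADD_DEADLINE_SCALE ~= 0.1s, clamped to [0.01s, 1.0s].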
  grpc_timer *timer, *next;

  // Compute the new cap and put all timers under it into the queue:
  shard->queue_deadline_cap =
      std::max(now, shard->queue_deadline_cap) +
      grpc_core::Duration::FromSecondsAsDouble(deadline_delta);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    gpr_log(GPR_DEBUG, "  .. shard[%d]->queue_deadline_cap --> %" PRId64,
            static_cast<int>(shard - g_shards),
            shard->queue_deadline_cap.milliseconds_after_process_epoch());
  }
  for (timer = shard->list.next; timer != &shard->list; timer = next) {
    next = timer->next;
    auto timer_deadline =
        grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
            timer->deadline);

    if (timer_deadline < shard->queue_deadline_cap) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
        gpr_log(GPR_DEBUG, "  .. add timer with deadline %" PRId64 " to heap",
                timer_deadline.milliseconds_after_process_epoch());
      }
      list_remove(timer);
      grpc_timer_heap_add(&shard->heap, timer);
    }
  }
  return !grpc_timer_heap_is_empty(&shard->heap);
}

// This pops the next non-cancelled timer with deadline <= now from the
// queue, or returns NULL if there isn't one.
// REQUIRES: shard->mu locked
static grpc_timer* pop_one(timer_shard* shard, grpc_core::Timestamp now) {
  grpc_timer* timer;
  for (;;) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(GPR_DEBUG, "  .. shard[%d]: heap_empty=%s",
              static_cast<int>(shard - g_shards),
              grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false");
    }
    if (grpc_timer_heap_is_empty(&shard->heap)) {
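      // An empty heap with now < queue_deadline_cap means every timer still
      // in 'list' has a deadline >= the cap, so nothing can be due yet.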
      if (now < shard->queue_deadline_cap) return nullptr;
      if (!refill_heap(shard, now)) return nullptr;
    }
    timer = grpc_timer_heap_top(&shard->heap);
    auto timer_deadline =
        grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
            timer->deadline);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(GPR_DEBUG,
              "  .. check top timer deadline=%" PRId64 " now=%" PRId64,
              timer_deadline.milliseconds_after_process_epoch(),
              now.milliseconds_after_process_epoch());
    }
    if (timer_deadline > now) return nullptr;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
      gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRId64 "ms late", timer,
              (now - timer_deadline).millis());
    }
    timer->pending = false;
    grpc_timer_heap_pop(&shard->heap);
    return timer;
  }
}

// REQUIRES: shard->mu unlocked
static size_t pop_timers(timer_shard* shard, grpc_core::Timestamp now,
                         grpc_core::Timestamp* new_min_deadline,
                         grpc_error_handle error) {
  size_t n = 0;
  grpc_timer* timer;
  gpr_mu_lock(&shard->mu);
  while ((timer = pop_one(shard, now))) {
    REMOVE_FROM_HASH_TABLE(timer);
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, error);
    n++;
  }
  *new_min_deadline = compute_min_deadline(shard);
  gpr_mu_unlock(&shard->mu);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    gpr_log(GPR_DEBUG, "  .. shard[%d] popped %" PRIdPTR,
            static_cast<int>(shard - g_shards), n);
  }
  return n;
}

static grpc_timer_check_result run_some_expired_timers(
    grpc_core::Timestamp now, grpc_core::Timestamp* next,
    grpc_error_handle error) {
  grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED;

#if GPR_ARCH_64
  // TODO(sreek): Using a C-style cast here. static_cast<> gives an error on
  // Mac platforms, complaining that gpr_atm* is (long *) while
  // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
  // safe since we know that both are pointer types and 64-bit wide.
  grpc_core::Timestamp min_timer =
      grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
          gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer)));
#else
  // On 32-bit systems, gpr_atm_no_barrier_load does not work on 64-bit types
  // (like grpc_core::Timestamp). So all reads and writes to
  // g_shared_mutables.min_timer are done under g_shared_mutables.mu.
  gpr_mu_lock(&g_shared_mutables.mu);
  grpc_core::Timestamp min_timer = g_shared_mutables.min_timer;
  gpr_mu_unlock(&g_shared_mutables.mu);
#endif
  g_last_seen_min_timer = min_timer.milliseconds_after_process_epoch();

  if (now < min_timer) {
    if (next != nullptr) *next = std::min(*next, min_timer);
    return GRPC_TIMERS_CHECKED_AND_EMPTY;
  }

  if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) {
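    // Only one thread sweeps at a time; contenders that fail the trylock
    // fall through and return GRPC_TIMERS_NOT_CHECKED.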
    gpr_mu_lock(&g_shared_mutables.mu);
    result = GRPC_TIMERS_CHECKED_AND_EMPTY;

    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(
          GPR_DEBUG, "  .. shard[%d]->min_deadline = %" PRId64,
          static_cast<int>(g_shard_queue[0] - g_shards),
          g_shard_queue[0]->min_deadline.milliseconds_after_process_epoch());
    }

    while (g_shard_queue[0]->min_deadline < now ||
           (now != grpc_core::Timestamp::InfFuture() &&
            g_shard_queue[0]->min_deadline == now)) {
      grpc_core::Timestamp new_min_deadline;

      // For efficiency, we pop as many available timers as we can from the
      // shard.  This may violate perfect timer deadline ordering, but that
      // shouldn't be a big deal because we don't make ordering guarantees.
      if (pop_timers(g_shard_queue[0], now, &new_min_deadline, error) > 0) {
        result = GRPC_TIMERS_FIRED;
      }

      if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
        gpr_log(
            GPR_DEBUG,
            "  .. result --> %d"
            ", shard[%d]->min_deadline %" PRId64 " --> %" PRId64
            ", now=%" PRId64,
            result, static_cast<int>(g_shard_queue[0] - g_shards),
            g_shard_queue[0]->min_deadline.milliseconds_after_process_epoch(),
            new_min_deadline.milliseconds_after_process_epoch(),
            now.milliseconds_after_process_epoch());
      }

      // A grpc_timer_init() on the shard could intervene here, adding a new
      // timer that is earlier than new_min_deadline.  However,
      // grpc_timer_init() will block on the mutex before it can update the
      // shard's min_deadline, so this update will complete first, and the
      // timer add will then reduce the min_deadline (perhaps unnecessarily).
      g_shard_queue[0]->min_deadline = new_min_deadline;
      note_deadline_change(g_shard_queue[0]);
    }

    if (next) {
      *next = std::min(*next, g_shard_queue[0]->min_deadline);
    }

#if GPR_ARCH_64
    // TODO(sreek): Using a C-style cast here. static_cast<> gives an error
    // on Mac platforms, complaining that gpr_atm* is (long *) while
    // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
    // safe since we know that both are pointer types and 64-bit wide.
    gpr_atm_no_barrier_store(
        (gpr_atm*)(&g_shared_mutables.min_timer),
        g_shard_queue[0]->min_deadline.milliseconds_after_process_epoch());
#else
    // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
    // types (like grpc_core::Timestamp). So all reads and writes to
    // g_shared_mutables.min_timer are done under g_shared_mutables.mu.
    g_shared_mutables.min_timer = g_shard_queue[0]->min_deadline;
#endif
    gpr_mu_unlock(&g_shared_mutables.mu);
    gpr_spinlock_unlock(&g_shared_mutables.checker_mu);
  }

  return result;
}

static grpc_timer_check_result timer_check(grpc_core::Timestamp* next) {
  // prelude
  grpc_core::Timestamp now = grpc_core::Timestamp::Now();

  // Fetch from the thread-local first: this avoids contention on a globally
  // mutable cacheline in the common case.
  grpc_core::Timestamp min_timer =
      grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
          g_last_seen_min_timer);

  if (now < min_timer) {
    if (next != nullptr) {
      *next = std::min(*next, min_timer);
    }
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(GPR_DEBUG, "TIMER CHECK SKIP: now=%" PRId64 " min_timer=%" PRId64,
              now.milliseconds_after_process_epoch(),
              min_timer.milliseconds_after_process_epoch());
    }
    return GRPC_TIMERS_CHECKED_AND_EMPTY;
  }

  grpc_error_handle shutdown_error =
      now != grpc_core::Timestamp::InfFuture()
          ? absl::OkStatus()
          : GRPC_ERROR_CREATE("Shutting down timer system");

  // tracing
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    std::string next_str;
    if (next == nullptr) {
      next_str = "NULL";
    } else {
      next_str = absl::StrCat(next->milliseconds_after_process_epoch());
    }
#if GPR_ARCH_64
    gpr_log(
        GPR_DEBUG,
        "TIMER CHECK BEGIN: now=%" PRId64 " next=%s tls_min=%" PRId64
        " glob_min=%" PRId64,
        now.milliseconds_after_process_epoch(), next_str.c_str(),
        min_timer.milliseconds_after_process_epoch(),
        grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
            gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer)))
            .milliseconds_after_process_epoch());
#else
    gpr_log(GPR_DEBUG,
            "TIMER CHECK BEGIN: now=%" PRId64 " next=%s min=%" PRId64,
            now.milliseconds_after_process_epoch(), next_str.c_str(),
            min_timer.milliseconds_after_process_epoch());
#endif
  }
  // actual code
  grpc_timer_check_result r =
      run_some_expired_timers(now, next, shutdown_error);
  // tracing
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    std::string next_str;
    if (next == nullptr) {
      next_str = "NULL";
    } else {
      next_str = absl::StrCat(next->milliseconds_after_process_epoch());
    }
    gpr_log(GPR_DEBUG, "TIMER CHECK END: r=%d; next=%s", r, next_str.c_str());
  }
  return r;
}

grpc_timer_vtable grpc_generic_timer_vtable = {
    timer_init,      timer_cancel,        timer_check,
    timer_list_init, timer_list_shutdown, timer_consume_kick};
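
// Illustrative usage (a sketch, not part of this file's API surface): callers
// schedule and cancel timers through the grpc_timer_* entry points declared
// in "src/core/lib/iomgr/timer.h", which dispatch through the vtable above
// when the generic implementation is active.
//
//   static void on_timer(void* arg, grpc_error_handle error) {
//     // error is absl::OkStatus() on expiry and absl::CancelledError() on
//     // cancellation.
//   }
//   ...
//   grpc_core::ExecCtx exec_ctx;
//   grpc_timer t;
//   grpc_closure cb;
//   GRPC_CLOSURE_INIT(&cb, on_timer, /*cb_arg=*/nullptr,
//                     grpc_schedule_on_exec_ctx);
//   grpc_timer_init(&t,
//                   grpc_core::Timestamp::Now() +
//                       grpc_core::Duration::Seconds(1),
//                   &cb);
//   ...
//   grpc_timer_cancel(&t);  // runs 'cb' with absl::CancelledError()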