1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TIMER_H
20 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TIMER_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <stddef.h>
25 
26 #include <atomic>
27 #include <cstdint>
28 #include <memory>
29 #include <vector>
30 
31 #include "absl/base/thread_annotations.h"
32 #include "absl/types/optional.h"
33 
34 #include <grpc/event_engine/event_engine.h>
35 
36 #include "src/core/lib/event_engine/posix_engine/timer_heap.h"
37 #include "src/core/lib/gprpp/sync.h"
38 #include "src/core/lib/gprpp/time.h"
39 #include "src/core/lib/gprpp/time_averaged_stats.h"
40 
41 namespace grpc_event_engine {
42 namespace experimental {
43 
44 struct Timer {
45   int64_t deadline;
46   // kInvalidHeapIndex if not in heap.
47   size_t heap_index;
48   bool pending;
49   struct Timer* next;
50   struct Timer* prev;
51   experimental::EventEngine::Closure* closure;
52 #ifndef NDEBUG
53   struct Timer* hash_table_next;
54 #endif
55 
56   grpc_event_engine::experimental::EventEngine::TaskHandle task_handle;
57 };
58 
59 // Dependency injection: allow tests and/or TimerManager to inject
60 // their own implementations of Now, Kick.
61 class TimerListHost {
62  public:
63   // Return the current timestamp.
64   // Abstracted so that tests can be run deterministically.
65   virtual grpc_core::Timestamp Now() = 0;
66   // Wake up a thread to check for timers.
67   virtual void Kick() = 0;
68 
69  protected:
70   ~TimerListHost() = default;
71 };
72 
73 class TimerList {
74  public:
75   explicit TimerList(TimerListHost* host);
76 
77   TimerList(const TimerList&) = delete;
78   TimerList& operator=(const TimerList&) = delete;
79 
80   // Initialize *timer. When expired or canceled, closure will be called with
81   // error set to indicate if it expired (absl::OkStatus()) or was canceled
82   //(absl::CancelledError()). *closure is guaranteed to be called exactly once,
83   // and application code should check the error to determine how it was
84   // invoked. The application callback is also responsible for maintaining
85   // information about when to free up any user-level state. Behavior is
86   // undefined for a deadline of grpc_core::Timestamp::InfFuture().
87   void TimerInit(Timer* timer, grpc_core::Timestamp deadline,
88                  experimental::EventEngine::Closure* closure);
89 
90   // Note that there is no timer destroy function. This is because the
91   // timer is a one-time occurrence with a guarantee that the callback will
92   // be called exactly once, either at expiration or cancellation. Thus, all
93   // the internal timer event management state is destroyed just before
94   // that callback is invoked. If the user has additional state associated with
95   // the timer, the user is responsible for determining when it is safe to
96   // destroy that state.
97 
98   // Cancel an *timer.
99   // There are three cases:
100   // 1. We normally cancel the timer
101   // 2. The timer has already run
102   // 3. We can't cancel the timer because it is "in flight".
103 
104   // In all of these cases, the cancellation is still considered successful.
105   // They are essentially distinguished in that the timer_cb will be run
106   // exactly once from either the cancellation (with error
107   // absl::CancelledError()) or from the activation (with error
108   // absl::OkStatus()).
109 
110   // Note carefully that the callback function MAY occur in the same callstack
111   // as grpc_timer_cancel. It's expected that most timers will be cancelled
112   // (their primary use is to implement deadlines), and so this code is
113   // optimized such that cancellation costs as little as possible. Making
114   // callbacks run inline matches this aim.
115 
116   // Requires: cancel() must happen after init() on a given timer
117   bool TimerCancel(Timer* timer) GRPC_MUST_USE_RESULT;
118 
119   // iomgr internal api for dealing with timers
120 
121   // Check for timers to be run, and return them.
122   // Return nullopt if timers could not be checked due to contention with
123   // another thread checking.
124   // Return a vector of closures that *must* be run otherwise.
125   // If next is non-null, TRY to update *next with the next running timer
126   // IF that timer occurs before *next current value.
127   // *next is never guaranteed to be updated on any given execution; however,
128   // with high probability at least one thread in the system will see an update
129   // at any time slice.
130   absl::optional<std::vector<experimental::EventEngine::Closure*>> TimerCheck(
131       grpc_core::Timestamp* next);
132 
133  private:
134   // A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
135   // deadlines earlier than 'queue_deadline_cap' are maintained in the heap and
136   // others are maintained in the list (unordered). This helps to keep the
137   // number of elements in the heap low.
138   //
139   // The 'queue_deadline_cap' gets recomputed periodically based on the timer
140   // stats maintained in 'stats' and the relevant timers are then moved from the
141   // 'list' to 'heap'.
142   //
143   struct Shard {
144     Shard();
145 
146     grpc_core::Timestamp ComputeMinDeadline() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu);
147     bool RefillHeap(grpc_core::Timestamp now) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu);
148     Timer* PopOne(grpc_core::Timestamp now) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu);
149     void PopTimers(grpc_core::Timestamp now,
150                    grpc_core::Timestamp* new_min_deadline,
151                    std::vector<experimental::EventEngine::Closure*>* out)
152         ABSL_LOCKS_EXCLUDED(mu);
153 
154     grpc_core::Mutex mu;
155     grpc_core::TimeAveragedStats stats ABSL_GUARDED_BY(mu);
156     // All and only timers with deadlines < this will be in the heap.
157     grpc_core::Timestamp queue_deadline_cap ABSL_GUARDED_BY(mu);
158     // The deadline of the next timer due in this shard.
159     grpc_core::Timestamp min_deadline ABSL_GUARDED_BY(&TimerList::mu_);
160     // Index of this timer_shard in the g_shard_queue.
161     uint32_t shard_queue_index ABSL_GUARDED_BY(&TimerList::mu_);
162     // This holds all timers with deadlines < queue_deadline_cap. Timers in this
163     // list have the top bit of their deadline set to 0.
164     TimerHeap heap ABSL_GUARDED_BY(mu);
165     // This holds timers whose deadline is >= queue_deadline_cap.
166     Timer list ABSL_GUARDED_BY(mu);
167   };
168 
169   void SwapAdjacentShardsInQueue(uint32_t first_shard_queue_index)
170       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
171   void NoteDeadlineChange(Shard* shard) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
172   std::vector<experimental::EventEngine::Closure*> FindExpiredTimers(
173       grpc_core::Timestamp now, grpc_core::Timestamp* next);
174 
175   TimerListHost* const host_;
176   const size_t num_shards_;
177   grpc_core::Mutex mu_;
178   // The deadline of the next timer due across all timer shards
179   std::atomic<uint64_t> min_timer_;
180   // Allow only one FindExpiredTimers at once (used as a TryLock, protects no
181   // fields but ensures limits on concurrency)
182   grpc_core::Mutex checker_mu_;
183   // Array of timer shards. Whenever a timer (Timer *) is added, its address
184   // is hashed to select the timer shard to add the timer to
185   const std::unique_ptr<Shard[]> shards_;
186   // Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
187   // the deadline of the next timer in each shard).
188   const std::unique_ptr<Shard*[]> shard_queue_ ABSL_GUARDED_BY(mu_);
189 };
190 
191 }  // namespace experimental
192 }  // namespace grpc_event_engine
193 
194 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TIMER_H
195