1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/event_engine/posix_engine/timer_manager.h"
22
23 #include <memory>
24 #include <utility>
25
26 #include "absl/time/time.h"
27 #include "absl/types/optional.h"
28
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/gprpp/thd.h"
34
35 static thread_local bool g_timer_thread;
36
37 namespace grpc_event_engine {
38 namespace experimental {
39
40 grpc_core::DebugOnlyTraceFlag grpc_event_engine_timer_trace(false, "timer");
41
RunSomeTimers(std::vector<experimental::EventEngine::Closure * > timers)42 void TimerManager::RunSomeTimers(
43 std::vector<experimental::EventEngine::Closure*> timers) {
44 for (auto* timer : timers) {
45 thread_pool_->Run(timer);
46 }
47 }
48
49 // wait until 'next' (or forever if there is already a timed waiter in the pool)
50 // returns true if the thread should continue executing (false if it should
51 // shutdown)
WaitUntil(grpc_core::Timestamp next)52 bool TimerManager::WaitUntil(grpc_core::Timestamp next) {
53 grpc_core::MutexLock lock(&mu_);
54 if (shutdown_) return false;
55 // If kicked_ is true at this point, it means there was a kick from the timer
56 // system that the timer-manager threads here missed. We cannot trust 'next'
57 // here any longer (since there might be an earlier deadline). So if kicked_
58 // is true at this point, we should quickly exit this and get the next
59 // deadline from the timer system
60 if (!kicked_) {
61 cv_wait_.WaitWithTimeout(&mu_,
62 absl::Milliseconds((next - host_.Now()).millis()));
63 ++wakeups_;
64 }
65 kicked_ = false;
66 return true;
67 }
68
MainLoop()69 void TimerManager::MainLoop() {
70 for (;;) {
71 grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture();
72 absl::optional<std::vector<experimental::EventEngine::Closure*>>
73 check_result = timer_list_->TimerCheck(&next);
74 GPR_ASSERT(check_result.has_value() &&
75 "ERROR: More than one MainLoop is running.");
76 if (!check_result->empty()) {
77 RunSomeTimers(std::move(*check_result));
78 continue;
79 }
80 if (!WaitUntil(next)) break;
81 }
82 main_loop_exit_signal_->Notify();
83 }
84
IsTimerManagerThread()85 bool TimerManager::IsTimerManagerThread() { return g_timer_thread; }
86
StartMainLoopThread()87 void TimerManager::StartMainLoopThread() {
88 main_thread_ = grpc_core::Thread(
89 "timer_manager",
90 [](void* arg) {
91 auto self = static_cast<TimerManager*>(arg);
92 self->MainLoop();
93 },
94 this, nullptr,
95 grpc_core::Thread::Options().set_tracked(false).set_joinable(false));
96 main_thread_.Start();
97 }
98
TimerManager(std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool)99 TimerManager::TimerManager(
100 std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool)
101 : host_(this), thread_pool_(std::move(thread_pool)) {
102 timer_list_ = std::make_unique<TimerList>(&host_);
103 main_loop_exit_signal_.emplace();
104 StartMainLoopThread();
105 }
106
Now()107 grpc_core::Timestamp TimerManager::Host::Now() {
108 return grpc_core::Timestamp::FromTimespecRoundDown(
109 gpr_now(GPR_CLOCK_MONOTONIC));
110 }
111
TimerInit(Timer * timer,grpc_core::Timestamp deadline,experimental::EventEngine::Closure * closure)112 void TimerManager::TimerInit(Timer* timer, grpc_core::Timestamp deadline,
113 experimental::EventEngine::Closure* closure) {
114 if (grpc_event_engine_timer_trace.enabled()) {
115 grpc_core::MutexLock lock(&mu_);
116 if (shutdown_) {
117 gpr_log(GPR_ERROR,
118 "WARNING: TimerManager::%p: scheduling Closure::%p after "
119 "TimerManager has been shut down.",
120 this, closure);
121 }
122 }
123 timer_list_->TimerInit(timer, deadline, closure);
124 }
125
TimerCancel(Timer * timer)126 bool TimerManager::TimerCancel(Timer* timer) {
127 return timer_list_->TimerCancel(timer);
128 }
129
Shutdown()130 void TimerManager::Shutdown() {
131 {
132 grpc_core::MutexLock lock(&mu_);
133 if (shutdown_) return;
134 if (grpc_event_engine_timer_trace.enabled()) {
135 gpr_log(GPR_DEBUG, "TimerManager::%p shutting down", this);
136 }
137 shutdown_ = true;
138 // Wait on the main loop to exit.
139 cv_wait_.Signal();
140 }
141 main_loop_exit_signal_->WaitForNotification();
142 if (grpc_event_engine_timer_trace.enabled()) {
143 gpr_log(GPR_DEBUG, "TimerManager::%p shutdown complete", this);
144 }
145 }
146
~TimerManager()147 TimerManager::~TimerManager() { Shutdown(); }
148
Kick()149 void TimerManager::Host::Kick() { timer_manager_->Kick(); }
150
Kick()151 void TimerManager::Kick() {
152 grpc_core::MutexLock lock(&mu_);
153 kicked_ = true;
154 cv_wait_.Signal();
155 }
156
RestartPostFork()157 void TimerManager::RestartPostFork() {
158 grpc_core::MutexLock lock(&mu_);
159 GPR_ASSERT(GPR_LIKELY(shutdown_));
160 if (grpc_event_engine_timer_trace.enabled()) {
161 gpr_log(GPR_DEBUG, "TimerManager::%p restarting after shutdown", this);
162 }
163 shutdown_ = false;
164 main_loop_exit_signal_.emplace();
165 StartMainLoopThread();
166 }
167
PrepareFork()168 void TimerManager::PrepareFork() { Shutdown(); }
PostforkParent()169 void TimerManager::PostforkParent() { RestartPostFork(); }
PostforkChild()170 void TimerManager::PostforkChild() { RestartPostFork(); }
171
172 } // namespace experimental
173 } // namespace grpc_event_engine
174