1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/event_engine/posix_engine/timer_manager.h"
22 
23 #include <memory>
24 #include <utility>
25 
26 #include "absl/time/time.h"
27 #include "absl/types/optional.h"
28 
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/gprpp/thd.h"
34 
// Per-thread marker read by TimerManager::IsTimerManagerThread(). It is false
// on every thread that has not explicitly set it.
static thread_local bool g_timer_thread = false;
36 
37 namespace grpc_event_engine {
38 namespace experimental {
39 
// Trace flag registered under the name "timer". The TimerManager methods in
// this file consult enabled() on it before emitting their diagnostic logs.
// NOTE(review): the `false` argument is presumably the default-enabled state --
// confirm against the DebugOnlyTraceFlag declaration.
grpc_core::DebugOnlyTraceFlag grpc_event_engine_timer_trace(false, "timer");
41 
RunSomeTimers(std::vector<experimental::EventEngine::Closure * > timers)42 void TimerManager::RunSomeTimers(
43     std::vector<experimental::EventEngine::Closure*> timers) {
44   for (auto* timer : timers) {
45     thread_pool_->Run(timer);
46   }
47 }
48 
49 // wait until 'next' (or forever if there is already a timed waiter in the pool)
50 // returns true if the thread should continue executing (false if it should
51 // shutdown)
WaitUntil(grpc_core::Timestamp next)52 bool TimerManager::WaitUntil(grpc_core::Timestamp next) {
53   grpc_core::MutexLock lock(&mu_);
54   if (shutdown_) return false;
55   // If kicked_ is true at this point, it means there was a kick from the timer
56   // system that the timer-manager threads here missed. We cannot trust 'next'
57   // here any longer (since there might be an earlier deadline). So if kicked_
58   // is true at this point, we should quickly exit this and get the next
59   // deadline from the timer system
60   if (!kicked_) {
61     cv_wait_.WaitWithTimeout(&mu_,
62                              absl::Milliseconds((next - host_.Now()).millis()));
63     ++wakeups_;
64   }
65   kicked_ = false;
66   return true;
67 }
68 
MainLoop()69 void TimerManager::MainLoop() {
70   for (;;) {
71     grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture();
72     absl::optional<std::vector<experimental::EventEngine::Closure*>>
73         check_result = timer_list_->TimerCheck(&next);
74     GPR_ASSERT(check_result.has_value() &&
75                "ERROR: More than one MainLoop is running.");
76     if (!check_result->empty()) {
77       RunSomeTimers(std::move(*check_result));
78       continue;
79     }
80     if (!WaitUntil(next)) break;
81   }
82   main_loop_exit_signal_->Notify();
83 }
84 
IsTimerManagerThread()85 bool TimerManager::IsTimerManagerThread() { return g_timer_thread; }
86 
StartMainLoopThread()87 void TimerManager::StartMainLoopThread() {
88   main_thread_ = grpc_core::Thread(
89       "timer_manager",
90       [](void* arg) {
91         auto self = static_cast<TimerManager*>(arg);
92         self->MainLoop();
93       },
94       this, nullptr,
95       grpc_core::Thread::Options().set_tracked(false).set_joinable(false));
96   main_thread_.Start();
97 }
98 
TimerManager(std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool)99 TimerManager::TimerManager(
100     std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool)
101     : host_(this), thread_pool_(std::move(thread_pool)) {
102   timer_list_ = std::make_unique<TimerList>(&host_);
103   main_loop_exit_signal_.emplace();
104   StartMainLoopThread();
105 }
106 
Now()107 grpc_core::Timestamp TimerManager::Host::Now() {
108   return grpc_core::Timestamp::FromTimespecRoundDown(
109       gpr_now(GPR_CLOCK_MONOTONIC));
110 }
111 
TimerInit(Timer * timer,grpc_core::Timestamp deadline,experimental::EventEngine::Closure * closure)112 void TimerManager::TimerInit(Timer* timer, grpc_core::Timestamp deadline,
113                              experimental::EventEngine::Closure* closure) {
114   if (grpc_event_engine_timer_trace.enabled()) {
115     grpc_core::MutexLock lock(&mu_);
116     if (shutdown_) {
117       gpr_log(GPR_ERROR,
118               "WARNING: TimerManager::%p: scheduling Closure::%p after "
119               "TimerManager has been shut down.",
120               this, closure);
121     }
122   }
123   timer_list_->TimerInit(timer, deadline, closure);
124 }
125 
TimerCancel(Timer * timer)126 bool TimerManager::TimerCancel(Timer* timer) {
127   return timer_list_->TimerCancel(timer);
128 }
129 
Shutdown()130 void TimerManager::Shutdown() {
131   {
132     grpc_core::MutexLock lock(&mu_);
133     if (shutdown_) return;
134     if (grpc_event_engine_timer_trace.enabled()) {
135       gpr_log(GPR_DEBUG, "TimerManager::%p shutting down", this);
136     }
137     shutdown_ = true;
138     // Wait on the main loop to exit.
139     cv_wait_.Signal();
140   }
141   main_loop_exit_signal_->WaitForNotification();
142   if (grpc_event_engine_timer_trace.enabled()) {
143     gpr_log(GPR_DEBUG, "TimerManager::%p shutdown complete", this);
144   }
145 }
146 
~TimerManager()147 TimerManager::~TimerManager() { Shutdown(); }
148 
Kick()149 void TimerManager::Host::Kick() { timer_manager_->Kick(); }
150 
Kick()151 void TimerManager::Kick() {
152   grpc_core::MutexLock lock(&mu_);
153   kicked_ = true;
154   cv_wait_.Signal();
155 }
156 
RestartPostFork()157 void TimerManager::RestartPostFork() {
158   grpc_core::MutexLock lock(&mu_);
159   GPR_ASSERT(GPR_LIKELY(shutdown_));
160   if (grpc_event_engine_timer_trace.enabled()) {
161     gpr_log(GPR_DEBUG, "TimerManager::%p restarting after shutdown", this);
162   }
163   shutdown_ = false;
164   main_loop_exit_signal_.emplace();
165   StartMainLoopThread();
166 }
167 
PrepareFork()168 void TimerManager::PrepareFork() { Shutdown(); }
PostforkParent()169 void TimerManager::PostforkParent() { RestartPostFork(); }
PostforkChild()170 void TimerManager::PostforkChild() { RestartPostFork(); }
171 
172 }  // namespace experimental
173 }  // namespace grpc_event_engine
174