1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/event_engine/thread_pool/original_thread_pool.h"
22
23 #include <atomic>
24 #include <memory>
25 #include <utility>
26
27 #include "absl/base/attributes.h"
28 #include "absl/time/clock.h"
29 #include "absl/time/time.h"
30
31 #include <grpc/support/log.h>
32
33 #include "src/core/lib/event_engine/thread_local.h"
34 #include "src/core/lib/gprpp/thd.h"
35 #include "src/core/lib/gprpp/time.h"
36
37 namespace grpc_event_engine {
38 namespace experimental {
39
// Registers and launches one new worker thread, subject to rate limiting.
// `reason` controls throttling:
//  - kNoWaitersWhenScheduling: reactive start from Run(); skipped entirely if
//    another thread was started less than one second ago, and also subject to
//    the one-at-a-time throttle below.
//  - kNoWaitersWhenFinishedStarting: start requested by a freshly started
//    thread that found a backlog; only the one-at-a-time throttle applies.
//  - kInitialPool: unconditional start (constructor / Postfork).
void OriginalThreadPool::StartThread(StatePtr state, StartThreadReason reason) {
  // Optimistically count the thread; undone below if throttled.
  state->thread_count.Add();
  const auto now = grpc_core::Timestamp::Now();
  switch (reason) {
    case StartThreadReason::kNoWaitersWhenScheduling: {
      // Throttle reactive starts to at most one per second.
      auto time_since_last_start =
          now - grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
                    state->last_started_thread.load(std::memory_order_relaxed));
      if (time_since_last_start < grpc_core::Duration::Seconds(1)) {
        state->thread_count.Remove();
        return;
      }
    }
      ABSL_FALLTHROUGH_INTENDED;
    case StartThreadReason::kNoWaitersWhenFinishedStarting:
      // Only one thread may be "starting" at a time; the flag is cleared by
      // the new thread's body once it is up and running.
      if (state->currently_starting_one_thread.exchange(
              true, std::memory_order_relaxed)) {
        state->thread_count.Remove();
        return;
      }
      state->last_started_thread.store(now.milliseconds_after_process_epoch(),
                                       std::memory_order_relaxed);
      break;
    case StartThreadReason::kInitialPool:
      // Unthrottled: initial reserve threads are always created.
      break;
  }
  // Heap-allocated argument bundle; ownership passes to the thread body via
  // the unique_ptr below.
  struct ThreadArg {
    StatePtr state;
    StartThreadReason reason;
  };
  grpc_core::Thread(
      "event_engine",
      [](void* arg) {
        std::unique_ptr<ThreadArg> a(static_cast<ThreadArg*>(arg));
        ThreadLocal::SetIsEventEngineThread(true);
        switch (a->reason) {
          case StartThreadReason::kInitialPool:
            break;
          case StartThreadReason::kNoWaitersWhenFinishedStarting:
            // Back off briefly before probing the backlog again.
            a->state->queue.SleepIfRunning();
            ABSL_FALLTHROUGH_INTENDED;
          case StartThreadReason::kNoWaitersWhenScheduling:
            // Release throttling variable; it must have been set by the
            // StartThread call that spawned this thread.
            GPR_ASSERT(a->state->currently_starting_one_thread.exchange(
                false, std::memory_order_relaxed));
            if (a->state->queue.IsBacklogged()) {
              // Still backlogged: cascade another thread start.
              StartThread(a->state,
                          StartThreadReason::kNoWaitersWhenFinishedStarting);
            }
            break;
        }
        // Enter the worker loop; returns only when this thread should exit.
        ThreadFunc(a->state);
      },
      new ThreadArg{state, reason}, nullptr,
      // Detached: lifetime is tracked via state->thread_count instead of join.
      grpc_core::Thread::Options().set_tracked(false).set_joinable(false))
      .Start();
}
97
ThreadFunc(StatePtr state)98 void OriginalThreadPool::ThreadFunc(StatePtr state) {
99 while (state->queue.Step()) {
100 }
101 state->thread_count.Remove();
102 }
103
// Blocks until a callback is available (or shutdown/fork), then runs exactly
// one callback outside the lock. Returns false when the calling thread should
// exit: on fork, on shutdown with an empty queue, or when an excess idle
// thread times out.
bool OriginalThreadPool::Queue::Step() {
  grpc_core::ReleasableMutexLock lock(&queue_mu_);
  // Wait until work is available or we are shutting down.
  while (!shutdown_ && !forking_ && callbacks_.empty()) {
    // If there are too many threads waiting, then quit this thread.
    // TODO(ctiller): wait some time in this case to be sure.
    if (threads_waiting_ >= reserve_threads_) {
      // Excess thread: wait up to 30s; if it times out and there is still a
      // surplus of idle threads, let this one die.
      threads_waiting_++;
      bool timeout = cv_.WaitWithTimeout(&queue_mu_, absl::Seconds(30));
      threads_waiting_--;
      if (timeout && threads_waiting_ >= reserve_threads_) {
        return false;
      }
    } else {
      // Reserve thread: wait indefinitely for work.
      threads_waiting_++;
      cv_.Wait(&queue_mu_);
      threads_waiting_--;
    }
  }
  // Fork takes priority: drop any pending work and exit immediately.
  if (forking_) return false;
  // On shutdown, drain remaining callbacks before exiting.
  if (shutdown_ && callbacks_.empty()) return false;
  GPR_ASSERT(!callbacks_.empty());
  auto callback = std::move(callbacks_.front());
  callbacks_.pop();
  // Run the callback without holding the queue lock so it may re-enter the
  // pool (e.g. call Run()).
  lock.Release();
  callback();
  return true;
}
132
OriginalThreadPool(size_t reserve_threads)133 OriginalThreadPool::OriginalThreadPool(size_t reserve_threads)
134 : reserve_threads_(reserve_threads),
135 state_(std::make_shared<State>(reserve_threads)) {
136 for (unsigned i = 0; i < reserve_threads; i++) {
137 StartThread(state_, StartThreadReason::kInitialPool);
138 }
139 }
140
// Returns true iff the calling thread is an event-engine worker thread
// (the flag is set via ThreadLocal::SetIsEventEngineThread in the thread
// body created by StartThread).
bool OriginalThreadPool::IsThreadPoolThread() {
  return ThreadLocal::IsEventEngineThread();
}
144
// Shuts the pool down: signals the queue to stop, then blocks until all
// worker threads have exited. Must be called before destruction.
void OriginalThreadPool::Quiesce() {
  state_->queue.SetShutdown(true);
  // Wait until all threads are exited.
  // Note that if this is a threadpool thread then we won't exit this thread
  // until the callstack unwinds a little, so we need to wait for just one
  // thread running instead of zero.
  state_->thread_count.BlockUntilThreadCount(
      ThreadLocal::IsEventEngineThread() ? 1 : 0, "shutting down");
  // Mark quiesced so the destructor's assertion passes.
  quiesced_.store(true, std::memory_order_relaxed);
}
155
~OriginalThreadPool()156 OriginalThreadPool::~OriginalThreadPool() {
157 GPR_ASSERT(quiesced_.load(std::memory_order_relaxed));
158 }
159
Run(absl::AnyInvocable<void ()> callback)160 void OriginalThreadPool::Run(absl::AnyInvocable<void()> callback) {
161 GPR_DEBUG_ASSERT(quiesced_.load(std::memory_order_relaxed) == false);
162 if (state_->queue.Add(std::move(callback))) {
163 StartThread(state_, StartThreadReason::kNoWaitersWhenScheduling);
164 }
165 }
166
Run(EventEngine::Closure * closure)167 void OriginalThreadPool::Run(EventEngine::Closure* closure) {
168 Run([closure]() { closure->Run(); });
169 }
170
// Enqueues a callback and wakes one sleeping worker. Returns true when the
// caller should consider starting an additional thread (more pending
// callbacks than idle workers); always false while forking.
bool OriginalThreadPool::Queue::Add(absl::AnyInvocable<void()> callback) {
  grpc_core::MutexLock lock(&queue_mu_);
  // Add works to the callbacks list
  callbacks_.push(std::move(callback));
  // Signal even when forking so a waiting worker observes forking_ and exits.
  cv_.Signal();
  if (forking_) return false;
  return callbacks_.size() > threads_waiting_;
}
179
IsBacklogged()180 bool OriginalThreadPool::Queue::IsBacklogged() {
181 grpc_core::MutexLock lock(&queue_mu_);
182 if (forking_) return false;
183 return callbacks_.size() > 1;
184 }
185
SleepIfRunning()186 void OriginalThreadPool::Queue::SleepIfRunning() {
187 grpc_core::MutexLock lock(&queue_mu_);
188 auto end = grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now();
189 while (true) {
190 grpc_core::Timestamp now = grpc_core::Timestamp::Now();
191 if (now >= end || forking_) return;
192 cv_.WaitWithTimeout(&queue_mu_, absl::Milliseconds((end - now).millis()));
193 }
194 }
195
SetShutdown(bool is_shutdown)196 void OriginalThreadPool::Queue::SetShutdown(bool is_shutdown) {
197 grpc_core::MutexLock lock(&queue_mu_);
198 auto was_shutdown = std::exchange(shutdown_, is_shutdown);
199 GPR_ASSERT(is_shutdown != was_shutdown);
200 cv_.SignalAll();
201 }
202
SetForking(bool is_forking)203 void OriginalThreadPool::Queue::SetForking(bool is_forking) {
204 grpc_core::MutexLock lock(&queue_mu_);
205 auto was_forking = std::exchange(forking_, is_forking);
206 GPR_ASSERT(is_forking != was_forking);
207 cv_.SignalAll();
208 }
209
Add()210 void OriginalThreadPool::ThreadCount::Add() {
211 grpc_core::MutexLock lock(&thread_count_mu_);
212 ++threads_;
213 }
214
Remove()215 void OriginalThreadPool::ThreadCount::Remove() {
216 grpc_core::MutexLock lock(&thread_count_mu_);
217 --threads_;
218 cv_.Signal();
219 }
220
// Blocks the caller until the live thread count drops to `threads` or
// fewer, periodically logging progress. `why` names the operation being
// waited on (e.g. "forking", "shutting down") for the log message.
void OriginalThreadPool::ThreadCount::BlockUntilThreadCount(int threads,
                                                            const char* why) {
  grpc_core::MutexLock lock(&thread_count_mu_);
  auto last_log = absl::Now();
  while (threads_ > threads) {
    // Wait for all threads to exit.
    // At least once every three seconds (but no faster than once per second in
    // the event of spurious wakeups) log a message indicating we're waiting to
    // fork.
    cv_.WaitWithTimeout(&thread_count_mu_, absl::Seconds(3));
    if (threads_ > threads && absl::Now() - last_log > absl::Seconds(1)) {
      gpr_log(GPR_ERROR, "Waiting for thread pool to idle before %s (%d to %d)",
              why, threads_, threads);
      last_log = absl::Now();
    }
  }
}
238
// pthread_atfork-style pre-fork hook: signals all workers to exit (dropping
// pending work — see Queue::Step) and blocks until every thread is gone so
// the child process does not inherit running threads.
void OriginalThreadPool::PrepareFork() {
  state_->queue.SetForking(true);
  state_->thread_count.BlockUntilThreadCount(0, "forking");
}
243
// Post-fork hook for the parent process: restarts the worker pool.
void OriginalThreadPool::PostforkParent() { Postfork(); }
245
// Post-fork hook for the child process: restarts the worker pool.
void OriginalThreadPool::PostforkChild() { Postfork(); }
247
Postfork()248 void OriginalThreadPool::Postfork() {
249 state_->queue.SetForking(false);
250 for (unsigned i = 0; i < reserve_threads_; i++) {
251 StartThread(state_, StartThreadReason::kInitialPool);
252 }
253 }
254
255 } // namespace experimental
256 } // namespace grpc_event_engine
257