1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/event_engine/thread_pool/original_thread_pool.h"
22 
23 #include <atomic>
24 #include <memory>
25 #include <utility>
26 
27 #include "absl/base/attributes.h"
28 #include "absl/time/clock.h"
29 #include "absl/time/time.h"
30 
31 #include <grpc/support/log.h>
32 
33 #include "src/core/lib/event_engine/thread_local.h"
34 #include "src/core/lib/gprpp/thd.h"
35 #include "src/core/lib/gprpp/time.h"
36 
37 namespace grpc_event_engine {
38 namespace experimental {
39 
StartThread(StatePtr state,StartThreadReason reason)40 void OriginalThreadPool::StartThread(StatePtr state, StartThreadReason reason) {
41   state->thread_count.Add();
42   const auto now = grpc_core::Timestamp::Now();
43   switch (reason) {
44     case StartThreadReason::kNoWaitersWhenScheduling: {
45       auto time_since_last_start =
46           now - grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
47                     state->last_started_thread.load(std::memory_order_relaxed));
48       if (time_since_last_start < grpc_core::Duration::Seconds(1)) {
49         state->thread_count.Remove();
50         return;
51       }
52     }
53       ABSL_FALLTHROUGH_INTENDED;
54     case StartThreadReason::kNoWaitersWhenFinishedStarting:
55       if (state->currently_starting_one_thread.exchange(
56               true, std::memory_order_relaxed)) {
57         state->thread_count.Remove();
58         return;
59       }
60       state->last_started_thread.store(now.milliseconds_after_process_epoch(),
61                                        std::memory_order_relaxed);
62       break;
63     case StartThreadReason::kInitialPool:
64       break;
65   }
66   struct ThreadArg {
67     StatePtr state;
68     StartThreadReason reason;
69   };
70   grpc_core::Thread(
71       "event_engine",
72       [](void* arg) {
73         std::unique_ptr<ThreadArg> a(static_cast<ThreadArg*>(arg));
74         ThreadLocal::SetIsEventEngineThread(true);
75         switch (a->reason) {
76           case StartThreadReason::kInitialPool:
77             break;
78           case StartThreadReason::kNoWaitersWhenFinishedStarting:
79             a->state->queue.SleepIfRunning();
80             ABSL_FALLTHROUGH_INTENDED;
81           case StartThreadReason::kNoWaitersWhenScheduling:
82             // Release throttling variable
83             GPR_ASSERT(a->state->currently_starting_one_thread.exchange(
84                 false, std::memory_order_relaxed));
85             if (a->state->queue.IsBacklogged()) {
86               StartThread(a->state,
87                           StartThreadReason::kNoWaitersWhenFinishedStarting);
88             }
89             break;
90         }
91         ThreadFunc(a->state);
92       },
93       new ThreadArg{state, reason}, nullptr,
94       grpc_core::Thread::Options().set_tracked(false).set_joinable(false))
95       .Start();
96 }
97 
ThreadFunc(StatePtr state)98 void OriginalThreadPool::ThreadFunc(StatePtr state) {
99   while (state->queue.Step()) {
100   }
101   state->thread_count.Remove();
102 }
103 
Step()104 bool OriginalThreadPool::Queue::Step() {
105   grpc_core::ReleasableMutexLock lock(&queue_mu_);
106   // Wait until work is available or we are shutting down.
107   while (!shutdown_ && !forking_ && callbacks_.empty()) {
108     // If there are too many threads waiting, then quit this thread.
109     // TODO(ctiller): wait some time in this case to be sure.
110     if (threads_waiting_ >= reserve_threads_) {
111       threads_waiting_++;
112       bool timeout = cv_.WaitWithTimeout(&queue_mu_, absl::Seconds(30));
113       threads_waiting_--;
114       if (timeout && threads_waiting_ >= reserve_threads_) {
115         return false;
116       }
117     } else {
118       threads_waiting_++;
119       cv_.Wait(&queue_mu_);
120       threads_waiting_--;
121     }
122   }
123   if (forking_) return false;
124   if (shutdown_ && callbacks_.empty()) return false;
125   GPR_ASSERT(!callbacks_.empty());
126   auto callback = std::move(callbacks_.front());
127   callbacks_.pop();
128   lock.Release();
129   callback();
130   return true;
131 }
132 
OriginalThreadPool(size_t reserve_threads)133 OriginalThreadPool::OriginalThreadPool(size_t reserve_threads)
134     : reserve_threads_(reserve_threads),
135       state_(std::make_shared<State>(reserve_threads)) {
136   for (unsigned i = 0; i < reserve_threads; i++) {
137     StartThread(state_, StartThreadReason::kInitialPool);
138   }
139 }
140 
IsThreadPoolThread()141 bool OriginalThreadPool::IsThreadPoolThread() {
142   return ThreadLocal::IsEventEngineThread();
143 }
144 
Quiesce()145 void OriginalThreadPool::Quiesce() {
146   state_->queue.SetShutdown(true);
147   // Wait until all threads are exited.
148   // Note that if this is a threadpool thread then we won't exit this thread
149   // until the callstack unwinds a little, so we need to wait for just one
150   // thread running instead of zero.
151   state_->thread_count.BlockUntilThreadCount(
152       ThreadLocal::IsEventEngineThread() ? 1 : 0, "shutting down");
153   quiesced_.store(true, std::memory_order_relaxed);
154 }
155 
// Destroying the pool requires that Quiesce() has already completed: the
// assertion fails if `quiesced_` has not been set.
OriginalThreadPool::~OriginalThreadPool() {
  GPR_ASSERT(quiesced_.load(std::memory_order_relaxed));
}
159 
Run(absl::AnyInvocable<void ()> callback)160 void OriginalThreadPool::Run(absl::AnyInvocable<void()> callback) {
161   GPR_DEBUG_ASSERT(quiesced_.load(std::memory_order_relaxed) == false);
162   if (state_->queue.Add(std::move(callback))) {
163     StartThread(state_, StartThreadReason::kNoWaitersWhenScheduling);
164   }
165 }
166 
Run(EventEngine::Closure * closure)167 void OriginalThreadPool::Run(EventEngine::Closure* closure) {
168   Run([closure]() { closure->Run(); });
169 }
170 
Add(absl::AnyInvocable<void ()> callback)171 bool OriginalThreadPool::Queue::Add(absl::AnyInvocable<void()> callback) {
172   grpc_core::MutexLock lock(&queue_mu_);
173   // Add works to the callbacks list
174   callbacks_.push(std::move(callback));
175   cv_.Signal();
176   if (forking_) return false;
177   return callbacks_.size() > threads_waiting_;
178 }
179 
IsBacklogged()180 bool OriginalThreadPool::Queue::IsBacklogged() {
181   grpc_core::MutexLock lock(&queue_mu_);
182   if (forking_) return false;
183   return callbacks_.size() > 1;
184 }
185 
SleepIfRunning()186 void OriginalThreadPool::Queue::SleepIfRunning() {
187   grpc_core::MutexLock lock(&queue_mu_);
188   auto end = grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now();
189   while (true) {
190     grpc_core::Timestamp now = grpc_core::Timestamp::Now();
191     if (now >= end || forking_) return;
192     cv_.WaitWithTimeout(&queue_mu_, absl::Milliseconds((end - now).millis()));
193   }
194 }
195 
SetShutdown(bool is_shutdown)196 void OriginalThreadPool::Queue::SetShutdown(bool is_shutdown) {
197   grpc_core::MutexLock lock(&queue_mu_);
198   auto was_shutdown = std::exchange(shutdown_, is_shutdown);
199   GPR_ASSERT(is_shutdown != was_shutdown);
200   cv_.SignalAll();
201 }
202 
SetForking(bool is_forking)203 void OriginalThreadPool::Queue::SetForking(bool is_forking) {
204   grpc_core::MutexLock lock(&queue_mu_);
205   auto was_forking = std::exchange(forking_, is_forking);
206   GPR_ASSERT(is_forking != was_forking);
207   cv_.SignalAll();
208 }
209 
Add()210 void OriginalThreadPool::ThreadCount::Add() {
211   grpc_core::MutexLock lock(&thread_count_mu_);
212   ++threads_;
213 }
214 
Remove()215 void OriginalThreadPool::ThreadCount::Remove() {
216   grpc_core::MutexLock lock(&thread_count_mu_);
217   --threads_;
218   cv_.Signal();
219 }
220 
BlockUntilThreadCount(int threads,const char * why)221 void OriginalThreadPool::ThreadCount::BlockUntilThreadCount(int threads,
222                                                             const char* why) {
223   grpc_core::MutexLock lock(&thread_count_mu_);
224   auto last_log = absl::Now();
225   while (threads_ > threads) {
226     // Wait for all threads to exit.
227     // At least once every three seconds (but no faster than once per second in
228     // the event of spurious wakeups) log a message indicating we're waiting to
229     // fork.
230     cv_.WaitWithTimeout(&thread_count_mu_, absl::Seconds(3));
231     if (threads_ > threads && absl::Now() - last_log > absl::Seconds(1)) {
232       gpr_log(GPR_ERROR, "Waiting for thread pool to idle before %s (%d to %d)",
233               why, threads_, threads);
234       last_log = absl::Now();
235     }
236   }
237 }
238 
PrepareFork()239 void OriginalThreadPool::PrepareFork() {
240   state_->queue.SetForking(true);
241   state_->thread_count.BlockUntilThreadCount(0, "forking");
242 }
243 
PostforkParent()244 void OriginalThreadPool::PostforkParent() { Postfork(); }
245 
PostforkChild()246 void OriginalThreadPool::PostforkChild() { Postfork(); }
247 
Postfork()248 void OriginalThreadPool::Postfork() {
249   state_->queue.SetForking(false);
250   for (unsigned i = 0; i < reserve_threads_; i++) {
251     StartThread(state_, StartThreadReason::kInitialPool);
252   }
253 }
254 
255 }  // namespace experimental
256 }  // namespace grpc_event_engine
257