// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"

#include <atomic>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <functional>
#include <memory>
#include <thread>
#include <tuple>
#include <vector>

#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "gtest/gtest.h"

#include <grpc/grpc.h>
#include <grpc/support/thd_id.h>

#include "src/core/lib/event_engine/thread_pool/thread_count.h"
#include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/time.h"
#include "test/core/util/test_config.h"

namespace grpc_event_engine {
namespace experimental {

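// Each TYPED_TEST below is instantiated once per pool implementation listed
// in ThreadPoolTypes. TypeParam is the pool type under test, constructed with
// a thread count of 8 in these tests.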
template <typename T>
class ThreadPoolTest : public testing::Test {};

using ThreadPoolTypes = ::testing::Types<WorkStealingThreadPool>;
TYPED_TEST_SUITE(ThreadPoolTest, ThreadPoolTypes);

TYPED_TEST(ThreadPoolTest, CanRunAnyInvocable) {
  TypeParam p(8);
  grpc_core::Notification n;
  p.Run([&n] { n.Notify(); });
  n.WaitForNotification();
  p.Quiesce();
}

TYPED_TEST(ThreadPoolTest, CanDestroyInsideClosure) {
  auto* p = new TypeParam(8);
  grpc_core::Notification n;
  p->Run([p, &n]() mutable {
    // This should delete the thread pool and not deadlock.
    p->Quiesce();
    delete p;
    n.Notify();
  });
  n.WaitForNotification();
}

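// PrepareFork() and PostforkChild() exercise the pool's atfork hooks without
// an actual OS fork: workers are expected to shut down before the (simulated)
// fork and restart in the child, after which previously queued and newly
// scheduled closures must still run.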
TYPED_TEST(ThreadPoolTest, CanSurviveFork) {
  TypeParam p(8);
  grpc_core::Notification inner_closure_ran;
  p.Run([&inner_closure_ran, &p] {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    p.Run([&inner_closure_ran] {
      std::this_thread::sleep_for(std::chrono::seconds(1));
      inner_closure_ran.Notify();
    });
  });
  // Simulate a fork and take the child-process path.
  p.PrepareFork();
  p.PostforkChild();
  inner_closure_ran.WaitForNotification();
  grpc_core::Notification n2;
  p.Run([&n2] { n2.Notify(); });
  n2.WaitForNotification();
  p.Quiesce();
}
TYPED_TEST(ThreadPoolTest, ForkStressTest) {
  // Runs a large number of closures and multiple simulated fork events,
  // ensuring that only some fixed number of closures are executed between
  // fork events.
  //
  // Why: Python relies on fork support, and fork behaves poorly (and
  // non-deterministically) in the presence of threads. gRPC has had problems
  // in this space. This test exercises the subset of the fork logic we can
  // control without an actual OS fork.
  constexpr int expected_runcount = 1000;
  constexpr absl::Duration fork_frequency{absl::Milliseconds(50)};
  constexpr int num_closures_between_forks{100};
  TypeParam pool(8);
  std::atomic<int> runcount{0};
  std::atomic<int> fork_count{0};
  std::function<void()> inner_fn;
  inner_fn = [&]() {
    auto curr_runcount = runcount.load(std::memory_order_relaxed);
    // Exit when the expected number of closures have run, with some slack for
    // relaxed atomics.
    if (curr_runcount >= expected_runcount) return;
    if (fork_count.load(std::memory_order_relaxed) *
            num_closures_between_forks <=
        curr_runcount) {
      // Skip incrementing, and schedule again.
      pool.Run(inner_fn);
      return;
    }
    runcount.fetch_add(1, std::memory_order_relaxed);
  };
  for (auto i = 0; i < expected_runcount; i++) {
    pool.Run(inner_fn);
  }
  // Simulate multiple forks at a fixed frequency.
  int curr_runcount = 0;
  while (curr_runcount < expected_runcount) {
    absl::SleepFor(fork_frequency);
    curr_runcount = runcount.load(std::memory_order_relaxed);
    int curr_forkcount = fork_count.load(std::memory_order_relaxed);
    if (curr_forkcount * num_closures_between_forks > curr_runcount) {
      continue;
    }
    pool.PrepareFork();
    pool.PostforkChild();
    fork_count.fetch_add(1);
  }
  ASSERT_GE(fork_count.load(), expected_runcount / num_closures_between_forks);
  // Owners are the local pool, and the copies of `inner_fn` still queued.
  pool.Quiesce();
}

TYPED_TEST(ThreadPoolTest, StartQuiesceRaceStressTest) {
  // Repeatedly race shutdown (Quiesce) against restart (PostforkParent, after
  // PrepareFork) to ensure thread safety.
  constexpr int iter_count = 500;
  struct ThdState {
    std::unique_ptr<TypeParam> pool;
    int i;
  };
  for (auto i = 0; i < iter_count; i++) {
    ThdState state{std::make_unique<TypeParam>(8), i};
    state.pool->PrepareFork();
    grpc_core::Thread t1(
        "t1",
        [](void* arg) {
          ThdState* state = static_cast<ThdState*>(arg);
          state->i % 2 == 0 ? state->pool->Quiesce()
                            : state->pool->PostforkParent();
        },
        &state, nullptr,
        grpc_core::Thread::Options().set_tracked(false).set_joinable(true));
    grpc_core::Thread t2(
        "t2",
        [](void* arg) {
          ThdState* state = static_cast<ThdState*>(arg);
          state->i % 2 == 1 ? state->pool->Quiesce()
                            : state->pool->PostforkParent();
        },
        &state, nullptr,
        grpc_core::Thread::Options().set_tracked(false).set_joinable(true));
    t1.Start();
    t2.Start();
    t1.Join();
    t2.Join();
  }
}

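// Reschedules itself on the pool indefinitely. Not referenced by any test in
// this file; presumably kept as a helper for manual experiments with a pool
// that never drains.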
void ScheduleSelf(ThreadPool* p) {
  p->Run([p] { ScheduleSelf(p); });
}

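// Fans out in a binary tree of depth n: each call increments runcount once
// and, unless n == 0, schedules two copies with n - 1, so runcount ends at
// 2^(n+1) - 1.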
void ScheduleTwiceUntilZero(ThreadPool* p, std::atomic<int>& runcount, int n) {
  runcount.fetch_add(1);
  if (n == 0) return;
  p->Run([p, &runcount, n] {
    ScheduleTwiceUntilZero(p, runcount, n - 1);
    ScheduleTwiceUntilZero(p, runcount, n - 1);
  });
}

TYPED_TEST(ThreadPoolTest, CanStartLotsOfClosures) {
  TypeParam p(8);
  std::atomic<int> runcount{0};
  int branch_factor = 20;
  ScheduleTwiceUntilZero(&p, runcount, branch_factor);
  p.Quiesce();
  ASSERT_EQ(runcount.load(), pow(2, branch_factor + 1) - 1);
}

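// In the two backlog tests below, all eight workers park on the Notification,
// so the final Run() can only execute (and Quiesce() can only return) if the
// pool scales beyond its original thread count.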
TYPED_TEST(ThreadPoolTest, ScalesWhenBackloggedFromGlobalQueue) {
  int pool_thread_count = 8;
  TypeParam p(pool_thread_count);
  grpc_core::Notification signal;
  // Ensures the pool is saturated before signaling closures to continue.
  std::atomic<int> waiters{0};
  std::atomic<bool> signaled{false};
  for (auto i = 0; i < pool_thread_count; i++) {
    p.Run([&]() {
      waiters.fetch_add(1);
      while (!signaled.load()) {
        signal.WaitForNotification();
      }
    });
  }
  while (waiters.load() != pool_thread_count) {
    absl::SleepFor(absl::Milliseconds(50));
  }
  p.Run([&]() {
    signaled.store(true);
    signal.Notify();
  });
  p.Quiesce();
}

TYPED_TEST(ThreadPoolTest, ScalesWhenBackloggedFromSingleThreadLocalQueue) {
  constexpr int pool_thread_count = 8;
  TypeParam p(pool_thread_count);
  grpc_core::Notification signal;
  // Ensures the pool is saturated before signaling closures to continue.
  std::atomic<int> waiters{0};
  std::atomic<bool> signaled{false};
  p.Run([&]() {
    for (int i = 0; i < pool_thread_count; i++) {
      p.Run([&]() {
        waiters.fetch_add(1);
        while (!signaled.load()) {
          signal.WaitForNotification();
        }
      });
    }
    while (waiters.load() != pool_thread_count) {
      absl::SleepFor(absl::Milliseconds(50));
    }
    p.Run([&]() {
      signaled.store(true);
      signal.Notify();
    });
  });
  p.Quiesce();
}

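// Stress-tests the shutdown path: each cycle schedules a small burst of
// trivial closures and quiesces immediately, so Quiesce() races with worker
// startup and closure execution.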
TYPED_TEST(ThreadPoolTest, QuiesceRaceStressTest) {
  constexpr int cycle_count = 333;
  constexpr int thread_count = 8;
  constexpr int run_count = thread_count * 2;
  for (auto i = 0; i < cycle_count; i++) {
    TypeParam p(thread_count);
    for (auto j = 0; j < run_count; j++) {
      p.Run([]() {});
    }
    p.Quiesce();
  }
}

TYPED_TEST(ThreadPoolTest, WorkerThreadLocalRunWorksWithOtherPools) {
  // WorkStealingThreadPools may queue work onto a thread-local queue, and
  // that work may be stolen by other threads. This test tries to ensure that
  // work queued from a pool-A worker thread onto pool-B does not end up on a
  // pool-A queue.
  constexpr size_t p1_run_iterations = 32;
  constexpr size_t p2_run_iterations = 1000;
  TypeParam p1(8);
  TypeParam p2(8);
  std::vector<gpr_thd_id> tid(p1_run_iterations);
  std::atomic<size_t> iter_count{0};
  grpc_core::Notification finished_all_iterations;
  for (size_t p1_i = 0; p1_i < p1_run_iterations; p1_i++) {
    p1.Run([&, p1_i, total_iterations = p1_run_iterations * p2_run_iterations] {
      tid[p1_i] = gpr_thd_currentid();
      for (size_t p2_i = 0; p2_i < p2_run_iterations; p2_i++) {
        p2.Run([&, p1_i, total_iterations] {
          EXPECT_NE(tid[p1_i], gpr_thd_currentid());
          if (total_iterations == iter_count.fetch_add(1) + 1) {
            finished_all_iterations.Notify();
          }
        });
      }
    });
  }
  finished_all_iterations.WaitForNotification();
  p2.Quiesce();
  p1.Quiesce();
}

TYPED_TEST(ThreadPoolTest, DISABLED_TestDumpStack) {
  TypeParam p1(8);
  for (size_t i = 0; i < 8; i++) {
    p1.Run([]() { absl::SleepFor(absl::Seconds(90)); });
  }
  absl::SleepFor(absl::Seconds(2));
  p1.Quiesce();
}

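// BusyThreadCount tracks how many pool threads are currently busy. Increments
// and decrements are sharded by index (NextIndex), presumably to reduce
// contention, and count() aggregates across the shards.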
class BusyThreadCountTest : public testing::Test {};

TEST_F(BusyThreadCountTest, StressTest) {
  // Spawns a large number of threads that concurrently increment/decrement
  // the counters and request count totals. Magic numbers were tuned for the
  // test to run in a reasonable amount of time.
  constexpr size_t thread_count = 300;
  constexpr int run_count = 1000;
  constexpr int increment_by = 50;
  BusyThreadCount busy_thread_count;
  grpc_core::Notification stop_counting;
  std::thread counter_thread([&]() {
    while (!stop_counting.HasBeenNotified()) {
      busy_thread_count.count();
    }
  });
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  for (size_t i = 0; i < thread_count; i++) {
    threads.emplace_back([&]() {
      for (int j = 0; j < run_count; j++) {
        // Get a new index for every iteration. This is not the intended use,
        // but it further stress-tests the NextIndex function.
        auto thread_idx = busy_thread_count.NextIndex();
        for (int inc = 0; inc < increment_by; inc++) {
          busy_thread_count.Increment(thread_idx);
        }
        for (int inc = 0; inc < increment_by; inc++) {
          busy_thread_count.Decrement(thread_idx);
        }
      }
    });
  }
  for (auto& thd : threads) thd.join();
  stop_counting.Notify();
  counter_thread.join();
  ASSERT_EQ(busy_thread_count.count(), 0);
}

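// MakeAutoThreadCounter returns an RAII counter that increments on
// construction and decrements on destruction; when `auto_counters` goes out
// of scope each iteration, the net count should return to zero.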
TEST_F(BusyThreadCountTest, AutoCountStressTest) {
  // Spawns a large number of threads that concurrently increment/decrement
  // the counters and request count totals. Magic numbers were tuned for the
  // test to run in a reasonable amount of time.
  constexpr size_t thread_count = 150;
  constexpr int run_count = 1000;
  constexpr int increment_by = 30;
  BusyThreadCount busy_thread_count;
  grpc_core::Notification stop_counting;
  std::thread counter_thread([&]() {
    while (!stop_counting.HasBeenNotified()) {
      busy_thread_count.count();
    }
  });
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  for (size_t i = 0; i < thread_count; i++) {
    threads.emplace_back([&]() {
      for (int j = 0; j < run_count; j++) {
        std::vector<BusyThreadCount::AutoThreadCounter> auto_counters;
        auto_counters.reserve(increment_by);
        for (int ctr_count = 0; ctr_count < increment_by; ctr_count++) {
          auto_counters.push_back(busy_thread_count.MakeAutoThreadCounter(
              busy_thread_count.NextIndex()));
        }
      }
    });
  }
  for (auto& thd : threads) thd.join();
  stop_counting.Notify();
  counter_thread.join();
  ASSERT_EQ(busy_thread_count.count(), 0);
}

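// LivingThreadCount tracks the total number of live pool threads. Unlike
// BusyThreadCount, its Increment/Decrement take no shard index.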
class LivingThreadCountTest : public testing::Test {};

TEST_F(LivingThreadCountTest, StressTest) {
  // Spawns a large number of threads that concurrently increment/decrement
  // the counter and request count totals. Magic numbers were tuned for the
  // test to run in a reasonable amount of time.
  constexpr size_t thread_count = 50;
  constexpr int run_count = 1000;
  constexpr int increment_by = 10;
  LivingThreadCount living_thread_count;
  grpc_core::Notification stop_counting;
  std::thread counter_thread([&]() {
    while (!stop_counting.HasBeenNotified()) {
      living_thread_count.count();
    }
  });
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  for (size_t i = 0; i < thread_count; i++) {
    threads.emplace_back([&]() {
      for (int j = 0; j < run_count; j++) {
        for (int inc = 0; inc < increment_by; inc++) {
          living_thread_count.Increment();
        }
        for (int inc = 0; inc < increment_by; inc++) {
          living_thread_count.Decrement();
        }
      }
    });
  }
  for (auto& thd : threads) thd.join();
  stop_counting.Notify();
  counter_thread.join();
  ASSERT_EQ(living_thread_count.count(), 0);
}

TEST_F(LivingThreadCountTest, AutoCountStressTest) {
  // Spawns a large number of threads that concurrently increment/decrement
  // the counter and request count totals. Magic numbers were tuned for the
  // test to run in a reasonable amount of time.
  constexpr size_t thread_count = 50;
  constexpr int run_count = 1000;
  constexpr int increment_by = 10;
  LivingThreadCount living_thread_count;
  grpc_core::Notification stop_counting;
  std::thread counter_thread([&]() {
    while (!stop_counting.HasBeenNotified()) {
      living_thread_count.count();
    }
  });
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  for (size_t i = 0; i < thread_count; i++) {
    threads.emplace_back([&]() {
      for (int j = 0; j < run_count; j++) {
        std::vector<LivingThreadCount::AutoThreadCounter> auto_counters;
        auto_counters.reserve(increment_by);
        for (int ctr_count = 0; ctr_count < increment_by; ctr_count++) {
          auto_counters.push_back(living_thread_count.MakeAutoThreadCounter());
        }
      }
    });
  }
  for (auto& thd : threads) thd.join();
  stop_counting.Notify();
  counter_thread.join();
  ASSERT_EQ(living_thread_count.count(), 0);
}

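// BlockUntilThreadCount(count, why, timeout) blocks until the living-thread
// count drops to `count` or the timeout expires; with an infinite timeout, as
// here, the returned status can be ignored.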
TEST_F(LivingThreadCountTest, BlockUntilThreadCountTest) {
  constexpr size_t thread_count = 100;
  grpc_core::Notification waiting;
  LivingThreadCount living_thread_count;
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  // Start N living threads.
  for (size_t i = 0; i < thread_count; i++) {
    threads.emplace_back([&]() {
      auto alive = living_thread_count.MakeAutoThreadCounter();
      waiting.WaitForNotification();
    });
  }
  // Join in a separate thread.
  std::thread joiner([&]() {
    waiting.Notify();
    for (auto& thd : threads) thd.join();
  });
  {
    auto alive = living_thread_count.MakeAutoThreadCounter();
    std::ignore = living_thread_count.BlockUntilThreadCount(
        1, "block until 1 thread remains", grpc_core::Duration::Infinity());
  }
  std::ignore = living_thread_count.BlockUntilThreadCount(
      0, "block until all threads are gone", grpc_core::Duration::Infinity());
  joiner.join();
  ASSERT_EQ(living_thread_count.count(), 0);
}

}  // namespace experimental
}  // namespace grpc_event_engine

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  grpc::testing::TestEnvironment env(&argc, argv);
  grpc_init();
  auto result = RUN_ALL_TESTS();
  grpc_shutdown();
  return result;
}