xref: /aosp_15_r20/external/grpc-grpc/test/core/event_engine/thread_pool_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
15 
16 #include <atomic>
17 #include <chrono>
18 #include <cmath>
19 #include <cstddef>
20 #include <functional>
21 #include <memory>
22 #include <thread>
23 #include <tuple>
24 #include <vector>
25 
26 #include "absl/time/clock.h"
27 #include "absl/time/time.h"
28 #include "gtest/gtest.h"
29 
30 #include <grpc/grpc.h>
31 #include <grpc/support/thd_id.h>
32 
33 #include "src/core/lib/event_engine/thread_pool/thread_count.h"
34 #include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
35 #include "src/core/lib/gprpp/notification.h"
36 #include "src/core/lib/gprpp/thd.h"
37 #include "src/core/lib/gprpp/time.h"
38 #include "test/core/util/test_config.h"
39 
40 namespace grpc_event_engine {
41 namespace experimental {
42 
43 template <typename T>
44 class ThreadPoolTest : public testing::Test {};
45 
46 using ThreadPoolTypes = ::testing::Types<WorkStealingThreadPool>;
47 TYPED_TEST_SUITE(ThreadPoolTest, ThreadPoolTypes);
48 
// A single closure passed to Run() must execute. The test waits for it to
// signal before quiescing the pool.
TYPED_TEST(ThreadPoolTest, CanRunAnyInvocable) {
  TypeParam pool(8);
  grpc_core::Notification ran;
  pool.Run([&ran]() { ran.Notify(); });
  ran.WaitForNotification();
  pool.Quiesce();
}
56 
// The pool must tolerate being quiesced and deleted from inside one of its own
// closures without deadlocking.
TYPED_TEST(ThreadPoolTest, CanDestroyInsideClosure) {
  // Raw allocation is deliberate: the closure takes over ownership and deletes
  // the pool while running on one of the pool's threads.
  TypeParam* pool = new TypeParam(8);
  grpc_core::Notification destroyed;
  pool->Run([pool, &destroyed]() mutable {
    pool->Quiesce();
    delete pool;
    destroyed.Notify();
  });
  destroyed.WaitForNotification();
}
68 
// Drives the fork hooks (PrepareFork + PostforkChild) while a closure is in
// flight and is about to schedule a nested closure. Both that nested closure
// and a closure scheduled after the simulated fork must still run.
TYPED_TEST(ThreadPoolTest, CanSurviveFork) {
  TypeParam pool(8);
  grpc_core::Notification nested_ran;
  pool.Run([&pool, &nested_ran] {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    pool.Run([&nested_ran] {
      std::this_thread::sleep_for(std::chrono::seconds(1));
      nested_ran.Notify();
    });
  });
  // Invoke the fork hooks by hand, as though this process were the child.
  pool.PrepareFork();
  pool.PostforkChild();
  nested_ran.WaitForNotification();

  grpc_core::Notification after_fork_ran;
  pool.Run([&after_fork_ran] { after_fork_ran.Notify(); });
  after_fork_ran.WaitForNotification();
  pool.Quiesce();
}
88 
TYPED_TEST(ThreadPoolTest, ForkStressTest) {
  // Runs a large number of closures and multiple simulated fork events,
  // ensuring that only some fixed number of closures are executed between fork
  // events.
  //
  // Why: Python relies on fork support, and fork behaves poorly in the presence
  // of threads, but non-deterministically. gRPC has had problems in this space.
  // This test exercises a subset of the fork logic, the pieces we can control
  // without an actual OS fork.
  constexpr int expected_runcount = 1000;
  constexpr absl::Duration fork_frequency{absl::Milliseconds(50)};
  constexpr int num_closures_between_forks{100};
  TypeParam pool(8);
  std::atomic<int> runcount{0};
  std::atomic<int> fork_count{0};
  std::function<void()> inner_fn;
  inner_fn = [&]() {
    auto curr_runcount = runcount.load(std::memory_order_relaxed);
    // exit when the right number of closures have run, with some flex for
    // relaxed atomics.
    if (curr_runcount >= expected_runcount) return;
    // Only num_closures_between_forks closures may count themselves per
    // simulated fork; once that budget is used, keep re-queueing until the
    // next fork raises fork_count.
    if (fork_count.load(std::memory_order_relaxed) *
            num_closures_between_forks <=
        curr_runcount) {
      // skip incrementing, and schedule again.
      pool.Run(inner_fn);
      return;
    }
    runcount.fetch_add(1, std::memory_order_relaxed);
  };
  for (auto i = 0; i < expected_runcount; i++) {
    pool.Run(inner_fn);
  }
  // simulate multiple forks at a fixed frequency
  int curr_runcount = 0;
  while (curr_runcount < expected_runcount) {
    absl::SleepFor(fork_frequency);
    curr_runcount = runcount.load(std::memory_order_relaxed);
    int curr_forkcount = fork_count.load(std::memory_order_relaxed);
    // Don't fork again until the closures allowed by the previous fork ran.
    if (curr_forkcount * num_closures_between_forks > curr_runcount) {
      continue;
    }
    pool.PrepareFork();
    pool.PostforkChild();
    fork_count.fetch_add(1);
  }
  ASSERT_GE(fork_count.load(), expected_runcount / num_closures_between_forks);
  // Closures still queued return without rescheduling once they observe
  // runcount >= expected_runcount, so quiescing here drains them.
  pool.Quiesce();
}
139 
TYPED_TEST(ThreadPoolTest, StartQuiesceRaceStressTest) {
  // Repeatedly races Quiesce against PostforkParent (the "start" side, after a
  // PrepareFork) to ensure thread safety. On even iterations t1 quiesces and
  // t2 calls PostforkParent; on odd iterations the roles are swapped.
  constexpr int iter_count = 500;
  struct ThdState {
    std::unique_ptr<TypeParam> pool;
    int i;
  };
  for (int iter = 0; iter < iter_count; ++iter) {
    ThdState state{std::make_unique<TypeParam>(8), iter};
    state.pool->PrepareFork();
    const auto options =
        grpc_core::Thread::Options().set_tracked(false).set_joinable(true);
    grpc_core::Thread t1(
        "t1",
        [](void* arg) {
          auto* s = static_cast<ThdState*>(arg);
          if (s->i % 2 == 0) {
            s->pool->Quiesce();
          } else {
            s->pool->PostforkParent();
          }
        },
        &state, nullptr, options);
    grpc_core::Thread t2(
        "t2",
        [](void* arg) {
          auto* s = static_cast<ThdState*>(arg);
          if (s->i % 2 == 1) {
            s->pool->Quiesce();
          } else {
            s->pool->PostforkParent();
          }
        },
        &state, nullptr, options);
    t1.Start();
    t2.Start();
    t1.Join();
    t2.Join();
  }
}
175 
ScheduleSelf(ThreadPool * p)176 void ScheduleSelf(ThreadPool* p) {
177   p->Run([p] { ScheduleSelf(p); });
178 }
179 
ScheduleTwiceUntilZero(ThreadPool * p,std::atomic<int> & runcount,int n)180 void ScheduleTwiceUntilZero(ThreadPool* p, std::atomic<int>& runcount, int n) {
181   runcount.fetch_add(1);
182   if (n == 0) return;
183   p->Run([p, &runcount, n] {
184     ScheduleTwiceUntilZero(p, runcount, n - 1);
185     ScheduleTwiceUntilZero(p, runcount, n - 1);
186   });
187 }
188 
// Fans out a binary tree of closures of depth `branch_factor` via
// ScheduleTwiceUntilZero, then checks after Quiesce() that every closure ran.
TYPED_TEST(ThreadPoolTest, CanStartLotsOfClosures) {
  TypeParam p(8);
  std::atomic<int> runcount{0};
  constexpr int branch_factor = 20;
  ScheduleTwiceUntilZero(&p, runcount, branch_factor);
  p.Quiesce();
  // A full binary tree of depth n has 2^(n+1) - 1 nodes. Compute this exactly
  // with an integer shift instead of comparing an int to floating-point pow().
  constexpr int expected_runcount = (1 << (branch_factor + 1)) - 1;
  ASSERT_EQ(runcount.load(), expected_runcount);
}
197 
// Occupies pool_thread_count pool threads with closures that block until
// signaled, then schedules the signaling closure from the test thread rather
// than from a pool thread. Running the signaler needs at least
// pool_thread_count + 1 threads at once. Assuming the pool starts with
// pool_thread_count threads, the test finishes only if the pool scales up
// while backlogged.
TYPED_TEST(ThreadPoolTest, ScalesWhenBackloggedFromGlobalQueue) {
  constexpr int pool_thread_count = 8;
  TypeParam p(pool_thread_count);
  grpc_core::Notification signal;
  // Ensures the pool is saturated before signaling closures to continue.
  std::atomic<int> waiters{0};
  std::atomic<bool> signaled{false};
  for (int i = 0; i < pool_thread_count; i++) {
    p.Run([&]() {
      waiters.fetch_add(1);
      while (!signaled.load()) {
        signal.WaitForNotification();
      }
    });
  }
  // Wait until every blocking closure has started running.
  while (waiters.load() != pool_thread_count) {
    absl::SleepFor(absl::Milliseconds(50));
  }
  p.Run([&]() {
    signaled.store(true);
    signal.Notify();
  });
  p.Quiesce();
}
222 
// One pool closure schedules pool_thread_count blocking closures from inside
// the pool. It waits until all of them are running, then schedules the closure
// that releases them. The blockers plus their scheduler need at least
// pool_thread_count + 1 concurrent threads, so the pool must scale when the
// backlog comes from a single worker.
TYPED_TEST(ThreadPoolTest, ScalesWhenBackloggedFromSingleThreadLocalQueue) {
  constexpr int pool_thread_count = 8;
  TypeParam pool(pool_thread_count);
  grpc_core::Notification release;
  // Ensures the pool is saturated before signaling closures to continue.
  std::atomic<int> blocked{0};
  std::atomic<bool> released{false};
  auto block_until_released = [&]() {
    blocked.fetch_add(1);
    while (!released.load()) {
      release.WaitForNotification();
    }
  };
  pool.Run([&]() {
    for (int n = 0; n < pool_thread_count; ++n) {
      pool.Run(block_until_released);
    }
    while (blocked.load() != pool_thread_count) {
      absl::SleepFor(absl::Milliseconds(50));
    }
    pool.Run([&]() {
      released.store(true);
      release.Notify();
    });
  });
  pool.Quiesce();
}
249 
// Repeatedly creates a pool, gives it twice as many no-op closures as it has
// threads, and quiesces it immediately. Quiesce() may therefore be called
// while closures are still pending.
TYPED_TEST(ThreadPoolTest, QuiesceRaceStressTest) {
  constexpr int cycle_count = 333;
  constexpr int thread_count = 8;
  constexpr int run_count = thread_count * 2;
  for (int cycle = 0; cycle < cycle_count; ++cycle) {
    TypeParam pool(thread_count);
    for (int run = 0; run < run_count; ++run) pool.Run([]() {});
    pool.Quiesce();
  }
}
262 
TYPED_TEST(ThreadPoolTest, WorkerThreadLocalRunWorksWithOtherPools) {
  // WorkStealingThreadPools may put work on a thread-local queue, and other
  // threads may steal it from there. This test checks that work scheduled on
  // pool B from a pool-A worker thread never ends up on a pool-A queue.
  constexpr size_t p1_run_iterations = 32;
  constexpr size_t p2_run_iterations = 1000;
  constexpr size_t total_iterations = p1_run_iterations * p2_run_iterations;
  TypeParam p1(8);
  TypeParam p2(8);
  // Thread id of the p1 worker that ran each p1 closure, indexed by closure.
  std::vector<gpr_thd_id> p1_thread_ids(p1_run_iterations);
  std::atomic<size_t> completed{0};
  grpc_core::Notification all_done;
  for (size_t outer = 0; outer < p1_run_iterations; ++outer) {
    p1.Run([&, outer] {
      p1_thread_ids[outer] = gpr_thd_currentid();
      for (size_t inner = 0; inner < p2_run_iterations; ++inner) {
        p2.Run([&, outer] {
          // Work handed to p2 must not run on the p1 thread that queued it.
          EXPECT_NE(p1_thread_ids[outer], gpr_thd_currentid());
          if (completed.fetch_add(1) + 1 == total_iterations) {
            all_done.Notify();
          }
        });
      }
    });
  }
  all_done.WaitForNotification();
  p2.Quiesce();
  p1.Quiesce();
}
292 
// Disabled by default. Fills all eight threads with 90-second sleeps, then calls
// Quiesce() about two seconds later. Presumably meant to be run by hand to
// observe the pool's diagnostics while Quiesce() is stuck — TODO confirm.
TYPED_TEST(ThreadPoolTest, DISABLED_TestDumpStack) {
  TypeParam pool(8);
  for (size_t n = 0; n < 8; ++n) {
    pool.Run([]() { absl::SleepFor(absl::Seconds(90)); });
  }
  absl::SleepFor(absl::Seconds(2));
  pool.Quiesce();
}
301 
302 class BusyThreadCountTest : public testing::Test {};
303 
TEST_F(BusyThreadCountTest, StressTest) {
  // Many threads concurrently increment and decrement the counters while a
  // separate thread keeps requesting the total. Magic numbers were tuned so
  // the test runs in a reasonable amount of time.
  constexpr size_t thread_count = 300;
  constexpr int run_count = 1000;
  constexpr int increment_by = 50;
  BusyThreadCount busy_thread_count;
  grpc_core::Notification stop_counting;
  std::thread counter_thread([&]() {
    while (!stop_counting.HasBeenNotified()) busy_thread_count.count();
  });
  auto hammer = [&]() {
    for (int run = 0; run < run_count; ++run) {
      // Taking a fresh index every iteration is not the intended usage, but it
      // also stresses NextIndex().
      const auto idx = busy_thread_count.NextIndex();
      for (int k = 0; k < increment_by; ++k) busy_thread_count.Increment(idx);
      for (int k = 0; k < increment_by; ++k) busy_thread_count.Decrement(idx);
    }
  };
  std::vector<std::thread> workers;
  workers.reserve(thread_count);
  for (size_t t = 0; t < thread_count; ++t) workers.emplace_back(hammer);
  for (auto& worker : workers) worker.join();
  stop_counting.Notify();
  counter_thread.join();
  ASSERT_EQ(busy_thread_count.count(), 0);
}
341 
TEST_F(BusyThreadCountTest, AutoCountStressTest) {
  // Many threads repeatedly create and destroy batches of AutoThreadCounters
  // while a separate thread keeps requesting the total. Magic numbers were
  // tuned so the test runs in a reasonable amount of time.
  constexpr size_t thread_count = 150;
  constexpr int run_count = 1000;
  constexpr int increment_by = 30;
  BusyThreadCount busy_thread_count;
  grpc_core::Notification stop_counting;
  std::thread counter_thread([&]() {
    while (!stop_counting.HasBeenNotified()) busy_thread_count.count();
  });
  auto worker_body = [&]() {
    for (int run = 0; run < run_count; ++run) {
      // The batch is destroyed at the end of each iteration. The final ASSERT
      // expects the count to be back at zero once every batch is gone.
      std::vector<BusyThreadCount::AutoThreadCounter> batch;
      batch.reserve(increment_by);
      for (int k = 0; k < increment_by; ++k) {
        batch.push_back(busy_thread_count.MakeAutoThreadCounter(
            busy_thread_count.NextIndex()));
      }
    }
  };
  std::vector<std::thread> workers;
  workers.reserve(thread_count);
  for (size_t t = 0; t < thread_count; ++t) workers.emplace_back(worker_body);
  for (auto& worker : workers) worker.join();
  stop_counting.Notify();
  counter_thread.join();
  ASSERT_EQ(busy_thread_count.count(), 0);
}
375 
376 class LivingThreadCountTest : public testing::Test {};
377 
TEST_F(LivingThreadCountTest, StressTest) {
  // Spawns a large number of threads that concurrently increment and decrement
  // the counter while another thread repeatedly requests the total. Magic
  // numbers were tuned for tests to run in a reasonable amount of time.
  constexpr size_t thread_count = 50;
  constexpr int run_count = 1000;
  constexpr int increment_by = 10;
  LivingThreadCount living_thread_count;
  grpc_core::Notification stop_counting;
  // Reads the total concurrently with the writer threads below.
  std::thread counter_thread([&]() {
    while (!stop_counting.HasBeenNotified()) {
      living_thread_count.count();
    }
  });
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  for (size_t i = 0; i < thread_count; i++) {
    threads.emplace_back([&]() {
      for (int j = 0; j < run_count; j++) {
        // Each round raises the count by increment_by and then lowers it by
        // the same amount, so each thread's net contribution is zero.
        for (int inc = 0; inc < increment_by; inc++) {
          living_thread_count.Increment();
        }
        for (int inc = 0; inc < increment_by; inc++) {
          living_thread_count.Decrement();
        }
      }
    });
  }
  for (auto& thd : threads) thd.join();
  stop_counting.Notify();
  counter_thread.join();
  ASSERT_EQ(living_thread_count.count(), 0);
}
414 
TEST_F(LivingThreadCountTest, AutoCountStressTest) {
  // Many threads repeatedly create and destroy batches of AutoThreadCounters
  // while a separate thread keeps requesting the total. Magic numbers were
  // tuned so the test runs in a reasonable amount of time.
  constexpr size_t thread_count = 50;
  constexpr int run_count = 1000;
  constexpr int increment_by = 10;
  LivingThreadCount living_thread_count;
  grpc_core::Notification stop_counting;
  std::thread counter_thread([&]() {
    while (!stop_counting.HasBeenNotified()) living_thread_count.count();
  });
  auto worker_body = [&]() {
    for (int run = 0; run < run_count; ++run) {
      // The batch is destroyed at the end of each iteration. The final ASSERT
      // expects the count to be back at zero once every batch is gone.
      std::vector<LivingThreadCount::AutoThreadCounter> batch;
      batch.reserve(increment_by);
      for (int k = 0; k < increment_by; ++k) {
        batch.push_back(living_thread_count.MakeAutoThreadCounter());
      }
    }
  };
  std::vector<std::thread> workers;
  workers.reserve(thread_count);
  for (size_t t = 0; t < thread_count; ++t) workers.emplace_back(worker_body);
  for (auto& worker : workers) worker.join();
  stop_counting.Notify();
  counter_thread.join();
  ASSERT_EQ(living_thread_count.count(), 0);
}
447 
// Checks that BlockUntilThreadCount() waits for living threads to exit: first
// down to just the test thread's own counter, then down to zero.
TEST_F(LivingThreadCountTest, BlockUntilThreadCountTest) {
  constexpr size_t thread_count = 100;
  grpc_core::Notification waiting;
  LivingThreadCount living_thread_count;
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  // Start N living threads. Each counter is created here on the test thread and
  // moved into its thread's closure, so all N are counted before the test
  // starts blocking. If the counter were created inside the thread body, a
  // worker might not have registered yet when BlockUntilThreadCount(1) runs,
  // and that check would pass trivially. The counter is released when
  // std::thread destroys the closure after the body returns.
  for (size_t i = 0; i < thread_count; i++) {
    threads.emplace_back(
        [&waiting, alive = living_thread_count.MakeAutoThreadCounter()]() {
          waiting.WaitForNotification();
        });
  }
  // Release and join the workers from a separate thread while this thread
  // blocks on the count.
  std::thread joiner([&]() {
    waiting.Notify();
    for (auto& thd : threads) thd.join();
  });
  {
    // The test thread counts as one living thread while it waits for the N
    // workers to go away.
    auto alive = living_thread_count.MakeAutoThreadCounter();
    std::ignore = living_thread_count.BlockUntilThreadCount(
        1, "block until 1 thread remains", grpc_core::Duration::Infinity());
  }
  std::ignore = living_thread_count.BlockUntilThreadCount(
      0, "block until all threads are gone", grpc_core::Duration::Infinity());
  joiner.join();
  ASSERT_EQ(living_thread_count.count(), 0);
}
476 
477 }  // namespace experimental
478 }  // namespace grpc_event_engine
479 
main(int argc,char ** argv)480 int main(int argc, char** argv) {
481   ::testing::InitGoogleTest(&argc, argv);
482   grpc::testing::TestEnvironment env(&argc, argv);
483   grpc_init();
484   auto result = RUN_ALL_TESTS();
485   grpc_shutdown();
486   return result;
487 }
488