/*
 * Copyright 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fcp/base/scheduler.h"

#include <atomic>
#include <cstdint>
#include <cstdlib>  // for std::rand
#include <vector>

#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "absl/synchronization/blocking_counter.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "fcp/base/monitoring.h"
#include "fcp/testing/testing.h"

namespace fcp {
namespace base {
namespace {

// NOTE: many of the tests below use reference captures in lambdas for locals.
// This is sound because the test methods do not return before the thread
// pool has become idle (pool->WaitUntilIdle()).

// Tests whether scheduled tasks are successfully executed.
TEST(ThreadPool, TasksAreExecuted) {
  auto pool = CreateThreadPoolScheduler(2);

  bool b1 = false;
  bool b2 = false;
  pool->Schedule([&b1]() { b1 = true; });
  pool->Schedule([&b2]() { b2 = true; });

  pool->WaitUntilIdle();

  EXPECT_TRUE(b1);
  EXPECT_TRUE(b2);
}

// Tests whether the pool actually uses multiple threads to execute tasks.
// The test goal is achieved by blocking in one task until another task
// unblocks, which can only work if multiple threads are used.
TEST(ThreadPool, ThreadsAreUtilized) {
  auto pool = CreateThreadPoolScheduler(2);

  absl::BlockingCounter counter(1);
  bool b1 = false;
  bool b2 = false;

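  // The counter starts at 1, so Wait() in the first task blocks until the
  // second task calls DecrementCount(); as noted above, this can only
  // complete if the pool runs the two tasks on different threads.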
  pool->Schedule([&b1, &counter] {
    counter.Wait();
    b1 = true;
  });
  pool->Schedule([&b2, &counter] {
    counter.DecrementCount();
    b2 = true;
  });

  pool->WaitUntilIdle();

  EXPECT_TRUE(b1);
  EXPECT_TRUE(b2);
}

TEST(ThreadPool, StressTest) {
  // A simple stress test where we spawn many threads and let each of them
  // increment a counter after a random wait time.
  static constexpr int kThreads = 32;
  static constexpr int kIterations = 16;
  auto pool = CreateThreadPoolScheduler(kThreads);
  std::atomic<int64_t> atomic_counter{0};

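  // One task per thread; each task increments the counter kIterations times,
  // so the expected final value is kThreads * kIterations.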
  for (auto i = 0; i < kThreads; ++i) {
    auto task = [&atomic_counter] {
      for (auto j = 0; j < kIterations; ++j) {
        absl::SleepFor(absl::Microseconds(std::rand() % 500));
        atomic_counter.fetch_add(1);
      }
    };
    pool->Schedule(task);
  }

  pool->WaitUntilIdle();
  ASSERT_EQ(atomic_counter, kThreads * kIterations);
}

TEST(Worker, TasksAreExecutedSequentially) {
  auto pool = CreateThreadPoolScheduler(3);
  auto worker = pool->CreateWorker();
  absl::Mutex mutex{};
  std::vector<int> recorded{};
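  // Schedule 128 tasks on a single worker. Each task uses TryLock() to check
  // that no other task runs concurrently and records its index so that FIFO
  // execution order can be verified after the pool drains.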
  for (int i = 0; i < 128; i++) {
    worker->Schedule([&mutex, &recorded, i] {
      // Expect that no one is holding the mutex (tests for non-overlap).
      if (mutex.TryLock()) {
        // Add i to the recorded values (tests for execution in order).
        recorded.push_back(i);
        // Idle wait to be sure we don't execute faster than we schedule.
        absl::SleepFor(absl::Milliseconds(50));
        mutex.Unlock();
      } else {
        FAIL() << "mutex was unexpectedly held";
      }
    });
  }
  pool->WaitUntilIdle();

  // Verify recorded values.
  for (int i = 0; i < 128; i++) {
    ASSERT_EQ(recorded[i], i);
  }
}
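
// Additional minimal sketch (not part of the original suite): a task scheduled
// on a Worker runs before pool->WaitUntilIdle() returns. This only relies on
// the CreateThreadPoolScheduler / CreateWorker / Schedule / WaitUntilIdle
// calls already exercised by the tests above.
TEST(Worker, SingleTaskIsExecuted) {
  auto pool = CreateThreadPoolScheduler(2);
  auto worker = pool->CreateWorker();

  bool ran = false;
  worker->Schedule([&ran] { ran = true; });

  pool->WaitUntilIdle();
  EXPECT_TRUE(ran);
}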

}  // namespace

}  // namespace base
}  // namespace fcp