1 /*
2 * Copyright 2018 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "fcp/base/scheduler.h"
18
19 #include <atomic>
20 #include <cstdlib> // for std::rand
21
22 #include "gmock/gmock.h"
23 #include "gtest/gtest.h"
24 #include "absl/synchronization/blocking_counter.h"
25 #include "fcp/base/monitoring.h"
26 #include "fcp/testing/testing.h"
27
28 namespace fcp {
29 namespace base {
30 namespace {
31
// NOTE: many of the tests below capture locals by reference in lambdas.
// This is sound because the test methods do not return before the thread
// pool has become idle (pool->WaitUntilIdle()).
35
36 // Tests whether scheduled tasks are successfully executed.
TEST(ThreadPool, TasksAreExecuted) {
  auto pool = CreateThreadPoolScheduler(2);

  // One flag per task; each task flips only its own flag.
  bool first_ran = false;
  bool second_ran = false;

  pool->Schedule([&first_ran] { first_ran = true; });
  pool->Schedule([&second_ran] { second_ran = true; });

  // The flags are captured by reference, so they must not be inspected (or go
  // out of scope) before the pool has finished all scheduled work.
  pool->WaitUntilIdle();

  EXPECT_TRUE(first_ran);
  EXPECT_TRUE(second_ran);
}
50
// Tests whether the pool actually uses multiple threads to execute tasks.
// The test goal is achieved by blocking in one task until another task
// unblocks it, which can only work if multiple threads are used.
TEST(ThreadPool, ThreadsAreUtilized) {
  auto pool = CreateThreadPoolScheduler(2);

  // Rendezvous point between the two tasks: the waiting task can only proceed
  // once the other task has decremented the counter.
  absl::BlockingCounter gate(1);
  bool waiter_done = false;
  bool releaser_done = false;

  // First task: blocks on the counter before recording completion.
  auto waiter = [&waiter_done, &gate] {
    gate.Wait();
    waiter_done = true;
  };
  // Second task: releases the first task, then records its own completion.
  auto releaser = [&releaser_done, &gate] {
    gate.DecrementCount();
    releaser_done = true;
  };

  pool->Schedule(waiter);
  pool->Schedule(releaser);

  pool->WaitUntilIdle();

  EXPECT_TRUE(waiter_done);
  EXPECT_TRUE(releaser_done);
}
75
TEST(ThreadPool, StressTest) {
  // A simple stress test: schedule one task per pool thread and let each task
  // repeatedly sleep for a random time and then increment a shared counter.
  static constexpr int kThreads = 32;
  static constexpr int kIterations = 16;
  auto pool = CreateThreadPoolScheduler(kThreads);
  std::atomic<int64_t> atomic_counter{0};

  for (int i = 0; i < kThreads; ++i) {
    // Draw the random delays here, on the scheduling thread. Whether
    // std::rand() is thread-safe is implementation-defined, so it must not be
    // called concurrently from the pool threads.
    int delays_us[kIterations];
    for (int& delay_us : delays_us) {
      delay_us = std::rand() % 500;
    }

    // The delay array is captured by value, so every task owns its own copy
    // and nothing but the atomic counter is shared between tasks.
    pool->Schedule([&atomic_counter, delays_us] {
      for (const int delay_us : delays_us) {
        absl::SleepFor(absl::Microseconds(delay_us));
        atomic_counter.fetch_add(1);
      }
    });
  }

  pool->WaitUntilIdle();

  // Compare the loaded value rather than the std::atomic object itself so that
  // a failure message shows the actual count.
  ASSERT_EQ(atomic_counter.load(), kThreads * kIterations);
}
97
// Tests that tasks scheduled on a single worker never overlap and run in the
// order in which they were scheduled, even though the underlying pool has
// several threads.
TEST(Worker, TasksAreExecutedSequentially) {
  // Number of tasks scheduled on the worker.
  constexpr int kNumTasks = 128;

  auto pool = CreateThreadPoolScheduler(3);
  auto worker = pool->CreateWorker();
  absl::Mutex mutex{};
  std::vector<int> recorded{};
  for (int i = 0; i < kNumTasks; i++) {
    worker->Schedule([&mutex, &recorded, i] {
      // Expect that no one is holding the mutex (tests for non-overlap).
      if (mutex.TryLock()) {
        // Add i to the recorded values (tests for execution in order).
        recorded.push_back(i);
        // Idle wait to be sure we don't execute faster than we schedule.
        absl::SleepFor(absl::Milliseconds(50));
        mutex.Unlock();
      } else {
        FAIL() << "mutex was unexpectedly hold";
      }
    });
  }
  pool->WaitUntilIdle();

  // A task that took the FAIL() branch records nothing, so make sure every
  // task contributed an entry before indexing; otherwise the loop below would
  // read past the end of the vector.
  ASSERT_EQ(static_cast<int>(recorded.size()), kNumTasks);

  // Verify recorded values: entry i must have been written by task i.
  for (int i = 0; i < kNumTasks; i++) {
    ASSERT_EQ(recorded[i], i);
  }
}
124
125 } // namespace
126
127 } // namespace base
128 } // namespace fcp
129