1 // Copyright 2021 The libgav1 Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/utils/threadpool.h"
16
17 #include <cassert>
18 #include <cstdint>
19 #include <memory>
20
21 #include "absl/synchronization/mutex.h"
22 #include "absl/time/clock.h"
23 #include "absl/time/time.h"
24 #include "gtest/gtest.h"
25 #include "src/utils/compiler_attributes.h"
26 #include "src/utils/executor.h"
27
28 namespace libgav1 {
29 namespace {
30
31 class SimpleGuardedInteger {
32 public:
SimpleGuardedInteger(int initial_value)33 explicit SimpleGuardedInteger(int initial_value) : value_(initial_value) {}
34 SimpleGuardedInteger(const SimpleGuardedInteger&) = delete;
35 SimpleGuardedInteger& operator=(const SimpleGuardedInteger&) = delete;
36
Decrement()37 void Decrement() {
38 absl::MutexLock l(&mutex_);
39 assert(value_ >= 1);
40 --value_;
41 changed_.SignalAll();
42 }
43
Increment()44 void Increment() {
45 absl::MutexLock l(&mutex_);
46 ++value_;
47 changed_.SignalAll();
48 }
49
Value()50 int Value() {
51 absl::MutexLock l(&mutex_);
52 return value_;
53 }
54
WaitForZero()55 void WaitForZero() {
56 absl::MutexLock l(&mutex_);
57 while (value_ != 0) {
58 changed_.Wait(&mutex_);
59 }
60 }
61
62 private:
63 absl::Mutex mutex_;
64 absl::CondVar changed_;
65 int value_ LIBGAV1_GUARDED_BY(mutex_);
66 };
67
68 // Loops for |milliseconds| of wall-clock time.
LoopForMs(int64_t milliseconds)69 void LoopForMs(int64_t milliseconds) {
70 const absl::Time deadline = absl::Now() + absl::Milliseconds(milliseconds);
71 while (absl::Now() < deadline) {
72 }
73 }
74
75 // A function that increments the given integer.
IncrementIntegerJob(SimpleGuardedInteger * value)76 void IncrementIntegerJob(SimpleGuardedInteger* value) {
77 LoopForMs(100);
78 value->Increment();
79 }
80
// Schedules 1000 increment jobs on a 100-thread pool, destroys the pool, and
// then expects the shared counter to equal the number of scheduled jobs.
TEST(ThreadPoolTest, ThreadedIntegerIncrement) {
  constexpr int kNumThreads = 100;
  constexpr int kNumJobs = 1000;

  std::unique_ptr<ThreadPool> pool = ThreadPool::Create(kNumThreads);
  ASSERT_NE(pool, nullptr);
  EXPECT_EQ(pool->num_threads(), kNumThreads);

  SimpleGuardedInteger counter(0);
  int remaining = kNumJobs;
  while (remaining-- > 0) {
    pool->Schedule([&counter]() { IncrementIntegerJob(&counter); });
  }

  // Destroy the pool before inspecting the counter; the expectation below
  // relies on every scheduled job having run by the time this returns.
  pool.reset();
  EXPECT_EQ(counter.Value(), kNumJobs);
}
92
93 // Test a ThreadPool via the Executor interface.
TEST(ThreadPoolTest,ExecutorInterface)94 TEST(ThreadPoolTest, ExecutorInterface) {
95 std::unique_ptr<ThreadPool> thread_pool = ThreadPool::Create(100);
96 ASSERT_NE(thread_pool, nullptr);
97 std::unique_ptr<Executor> executor(thread_pool.release());
98 SimpleGuardedInteger count(0);
99 for (int i = 0; i < 1000; ++i) {
100 executor->Schedule([&count]() { IncrementIntegerJob(&count); });
101 }
102 executor.reset(nullptr);
103 EXPECT_EQ(count.Value(), 1000);
104 }
105
// Creates a 100-thread pool and destroys it immediately without scheduling
// any work.
TEST(ThreadPoolTest, DestroyWithoutUse) {
  constexpr int kNumThreads = 100;
  std::unique_ptr<ThreadPool> pool = ThreadPool::Create(kNumThreads);
  EXPECT_NE(pool, nullptr);
  pool.reset();
}
111
112 // If num_threads is 0, ThreadPool::Create() should return a null pointer.
TEST(ThreadPoolTest,NumThreadsZero)113 TEST(ThreadPoolTest, NumThreadsZero) {
114 std::unique_ptr<ThreadPool> thread_pool = ThreadPool::Create(0);
115 EXPECT_EQ(thread_pool, nullptr);
116 }
117
118 // If num_threads is 1, the closures are run in FIFO order.
TEST(ThreadPoolTest,OneThreadRunsClosuresFIFO)119 TEST(ThreadPoolTest, OneThreadRunsClosuresFIFO) {
120 int count = 0; // Declare first so that it outlives the thread pool.
121 std::unique_ptr<ThreadPool> pool = ThreadPool::Create(1);
122 ASSERT_NE(pool, nullptr);
123 EXPECT_EQ(pool->num_threads(), 1);
124 for (int i = 0; i < 1000; ++i) {
125 pool->Schedule([&count, i]() {
126 EXPECT_EQ(count, i);
127 count++;
128 });
129 }
130 }
131
132 } // namespace
133 } // namespace libgav1
134