xref: /aosp_15_r20/external/abseil-cpp/absl/synchronization/internal/thread_pool.h (revision 9356374a3709195abf420251b3e825997ff56c0f)
1 // Copyright 2017 The Abseil Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
16 #define ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
17 
18 #include <cassert>
19 #include <cstddef>
20 #include <functional>
21 #include <queue>
22 #include <thread>  // NOLINT(build/c++11)
23 #include <utility>
24 #include <vector>
25 
26 #include "absl/base/thread_annotations.h"
27 #include "absl/functional/any_invocable.h"
28 #include "absl/synchronization/mutex.h"
29 
30 namespace absl {
31 ABSL_NAMESPACE_BEGIN
32 namespace synchronization_internal {
33 
34 // A simple ThreadPool implementation for tests.
35 class ThreadPool {
36  public:
ThreadPool(int num_threads)37   explicit ThreadPool(int num_threads) {
38     threads_.reserve(num_threads);
39     for (int i = 0; i < num_threads; ++i) {
40       threads_.push_back(std::thread(&ThreadPool::WorkLoop, this));
41     }
42   }
43 
44   ThreadPool(const ThreadPool &) = delete;
45   ThreadPool &operator=(const ThreadPool &) = delete;
46 
~ThreadPool()47   ~ThreadPool() {
48     {
49       absl::MutexLock l(&mu_);
50       for (size_t i = 0; i < threads_.size(); i++) {
51         queue_.push(nullptr);  // Shutdown signal.
52       }
53     }
54     for (auto &t : threads_) {
55       t.join();
56     }
57   }
58 
59   // Schedule a function to be run on a ThreadPool thread immediately.
Schedule(absl::AnyInvocable<void ()> func)60   void Schedule(absl::AnyInvocable<void()> func) {
61     assert(func != nullptr);
62     absl::MutexLock l(&mu_);
63     queue_.push(std::move(func));
64   }
65 
66  private:
WorkAvailable()67   bool WorkAvailable() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
68     return !queue_.empty();
69   }
70 
WorkLoop()71   void WorkLoop() {
72     while (true) {
73       absl::AnyInvocable<void()> func;
74       {
75         absl::MutexLock l(&mu_);
76         mu_.Await(absl::Condition(this, &ThreadPool::WorkAvailable));
77         func = std::move(queue_.front());
78         queue_.pop();
79       }
80       if (func == nullptr) {  // Shutdown signal.
81         break;
82       }
83       func();
84     }
85   }
86 
87   absl::Mutex mu_;
88   std::queue<absl::AnyInvocable<void()>> queue_ ABSL_GUARDED_BY(mu_);
89   std::vector<std::thread> threads_;
90 };
91 
92 }  // namespace synchronization_internal
93 ABSL_NAMESPACE_END
94 }  // namespace absl
95 
96 #endif  // ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
97