1 // Copyright 2017 The Abseil Authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_ 16 #define ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_ 17 18 #include <cassert> 19 #include <cstddef> 20 #include <functional> 21 #include <queue> 22 #include <thread> // NOLINT(build/c++11) 23 #include <utility> 24 #include <vector> 25 26 #include "absl/base/thread_annotations.h" 27 #include "absl/functional/any_invocable.h" 28 #include "absl/synchronization/mutex.h" 29 30 namespace absl { 31 ABSL_NAMESPACE_BEGIN 32 namespace synchronization_internal { 33 34 // A simple ThreadPool implementation for tests. 35 class ThreadPool { 36 public: ThreadPool(int num_threads)37 explicit ThreadPool(int num_threads) { 38 threads_.reserve(num_threads); 39 for (int i = 0; i < num_threads; ++i) { 40 threads_.push_back(std::thread(&ThreadPool::WorkLoop, this)); 41 } 42 } 43 44 ThreadPool(const ThreadPool &) = delete; 45 ThreadPool &operator=(const ThreadPool &) = delete; 46 ~ThreadPool()47 ~ThreadPool() { 48 { 49 absl::MutexLock l(&mu_); 50 for (size_t i = 0; i < threads_.size(); i++) { 51 queue_.push(nullptr); // Shutdown signal. 52 } 53 } 54 for (auto &t : threads_) { 55 t.join(); 56 } 57 } 58 59 // Schedule a function to be run on a ThreadPool thread immediately. Schedule(absl::AnyInvocable<void ()> func)60 void Schedule(absl::AnyInvocable<void()> func) { 61 assert(func != nullptr); 62 absl::MutexLock l(&mu_); 63 queue_.push(std::move(func)); 64 } 65 66 private: WorkAvailable()67 bool WorkAvailable() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 68 return !queue_.empty(); 69 } 70 WorkLoop()71 void WorkLoop() { 72 while (true) { 73 absl::AnyInvocable<void()> func; 74 { 75 absl::MutexLock l(&mu_); 76 mu_.Await(absl::Condition(this, &ThreadPool::WorkAvailable)); 77 func = std::move(queue_.front()); 78 queue_.pop(); 79 } 80 if (func == nullptr) { // Shutdown signal. 81 break; 82 } 83 func(); 84 } 85 } 86 87 absl::Mutex mu_; 88 std::queue<absl::AnyInvocable<void()>> queue_ ABSL_GUARDED_BY(mu_); 89 std::vector<std::thread> threads_; 90 }; 91 92 } // namespace synchronization_internal 93 ABSL_NAMESPACE_END 94 } // namespace absl 95 96 #endif // ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_ 97