1 // 2 // 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_ORIGINAL_THREAD_POOL_H 19 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_ORIGINAL_THREAD_POOL_H 20 21 #include <grpc/support/port_platform.h> 22 23 #include <stddef.h> 24 #include <stdint.h> 25 26 #include <atomic> 27 #include <memory> 28 #include <queue> 29 30 #include "absl/base/thread_annotations.h" 31 #include "absl/functional/any_invocable.h" 32 33 #include <grpc/event_engine/event_engine.h> 34 35 #include "src/core/lib/event_engine/thread_pool/thread_pool.h" 36 #include "src/core/lib/gprpp/sync.h" 37 38 namespace grpc_event_engine { 39 namespace experimental { 40 41 class OriginalThreadPool final : public ThreadPool { 42 public: 43 explicit OriginalThreadPool(size_t reserve_threads); 44 // Asserts Quiesce was called. 45 ~OriginalThreadPool() override; 46 47 void Quiesce() override; 48 49 // Run must not be called after Quiesce completes 50 void Run(absl::AnyInvocable<void()> callback) override; 51 void Run(EventEngine::Closure* closure) override; 52 53 // Forkable 54 // Ensures that the thread pool is empty before forking. 55 void PrepareFork() override; 56 void PostforkParent() override; 57 void PostforkChild() override; 58 59 // Returns true if the current thread is a thread pool thread. 60 static bool IsThreadPoolThread(); 61 62 private: 63 class Queue { 64 public: Queue(unsigned reserve_threads)65 explicit Queue(unsigned reserve_threads) 66 : reserve_threads_(reserve_threads) {} 67 bool Step(); 68 // Add a callback to the queue. 69 // Return true if we should also spin up a new thread. 70 bool Add(absl::AnyInvocable<void()> callback); 71 void SetShutdown(bool is_shutdown); 72 void SetForking(bool is_forking); 73 bool IsBacklogged(); 74 void SleepIfRunning(); 75 76 private: 77 const unsigned reserve_threads_; 78 grpc_core::Mutex queue_mu_; 79 grpc_core::CondVar cv_; 80 std::queue<absl::AnyInvocable<void()>> callbacks_ 81 ABSL_GUARDED_BY(queue_mu_); 82 unsigned threads_waiting_ ABSL_GUARDED_BY(queue_mu_) = 0; 83 // Track shutdown and fork bits separately. 84 // It's possible for a ThreadPool to initiate shut down while fork handlers 85 // are running, and similarly possible for a fork event to occur during 86 // shutdown. 87 bool shutdown_ ABSL_GUARDED_BY(queue_mu_) = false; 88 bool forking_ ABSL_GUARDED_BY(queue_mu_) = false; 89 }; 90 91 class ThreadCount { 92 public: 93 void Add(); 94 void Remove(); 95 void BlockUntilThreadCount(int threads, const char* why); 96 97 private: 98 grpc_core::Mutex thread_count_mu_; 99 grpc_core::CondVar cv_; 100 int threads_ ABSL_GUARDED_BY(thread_count_mu_) = 0; 101 }; 102 103 struct State { StateState104 explicit State(int reserve_threads) : queue(reserve_threads) {} 105 Queue queue; 106 ThreadCount thread_count; 107 // After pool creation we use this to rate limit creation of threads to one 108 // at a time. 109 std::atomic<bool> currently_starting_one_thread{false}; 110 std::atomic<uint64_t> last_started_thread{0}; 111 }; 112 113 using StatePtr = std::shared_ptr<State>; 114 115 enum class StartThreadReason { 116 kInitialPool, 117 kNoWaitersWhenScheduling, 118 kNoWaitersWhenFinishedStarting, 119 }; 120 121 static void ThreadFunc(StatePtr state); 122 // Start a new thread; throttled indicates whether the State::starting_thread 123 // variable is being used to throttle this threads creation against others or 124 // not: at thread pool startup we start several threads concurrently, but 125 // after that we only start one at a time. 126 static void StartThread(StatePtr state, StartThreadReason reason); 127 void Postfork(); 128 129 const size_t reserve_threads_; 130 const StatePtr state_; 131 std::atomic<bool> quiesced_{false}; 132 }; 133 134 } // namespace experimental 135 } // namespace grpc_event_engine 136 137 #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_ORIGINAL_THREAD_POOL_H 138