1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_ORIGINAL_THREAD_POOL_H
19 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_ORIGINAL_THREAD_POOL_H
20 
21 #include <grpc/support/port_platform.h>
22 
23 #include <stddef.h>
24 #include <stdint.h>
25 
26 #include <atomic>
27 #include <memory>
28 #include <queue>
29 
30 #include "absl/base/thread_annotations.h"
31 #include "absl/functional/any_invocable.h"
32 
33 #include <grpc/event_engine/event_engine.h>
34 
35 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
36 #include "src/core/lib/gprpp/sync.h"
37 
38 namespace grpc_event_engine {
39 namespace experimental {
40 
41 class OriginalThreadPool final : public ThreadPool {
42  public:
43   explicit OriginalThreadPool(size_t reserve_threads);
44   // Asserts Quiesce was called.
45   ~OriginalThreadPool() override;
46 
47   void Quiesce() override;
48 
49   // Run must not be called after Quiesce completes
50   void Run(absl::AnyInvocable<void()> callback) override;
51   void Run(EventEngine::Closure* closure) override;
52 
53   // Forkable
54   // Ensures that the thread pool is empty before forking.
55   void PrepareFork() override;
56   void PostforkParent() override;
57   void PostforkChild() override;
58 
59   // Returns true if the current thread is a thread pool thread.
60   static bool IsThreadPoolThread();
61 
62  private:
63   class Queue {
64    public:
Queue(unsigned reserve_threads)65     explicit Queue(unsigned reserve_threads)
66         : reserve_threads_(reserve_threads) {}
67     bool Step();
68     // Add a callback to the queue.
69     // Return true if we should also spin up a new thread.
70     bool Add(absl::AnyInvocable<void()> callback);
71     void SetShutdown(bool is_shutdown);
72     void SetForking(bool is_forking);
73     bool IsBacklogged();
74     void SleepIfRunning();
75 
76    private:
77     const unsigned reserve_threads_;
78     grpc_core::Mutex queue_mu_;
79     grpc_core::CondVar cv_;
80     std::queue<absl::AnyInvocable<void()>> callbacks_
81         ABSL_GUARDED_BY(queue_mu_);
82     unsigned threads_waiting_ ABSL_GUARDED_BY(queue_mu_) = 0;
83     // Track shutdown and fork bits separately.
84     // It's possible for a ThreadPool to initiate shut down while fork handlers
85     // are running, and similarly possible for a fork event to occur during
86     // shutdown.
87     bool shutdown_ ABSL_GUARDED_BY(queue_mu_) = false;
88     bool forking_ ABSL_GUARDED_BY(queue_mu_) = false;
89   };
90 
91   class ThreadCount {
92    public:
93     void Add();
94     void Remove();
95     void BlockUntilThreadCount(int threads, const char* why);
96 
97    private:
98     grpc_core::Mutex thread_count_mu_;
99     grpc_core::CondVar cv_;
100     int threads_ ABSL_GUARDED_BY(thread_count_mu_) = 0;
101   };
102 
103   struct State {
StateState104     explicit State(int reserve_threads) : queue(reserve_threads) {}
105     Queue queue;
106     ThreadCount thread_count;
107     // After pool creation we use this to rate limit creation of threads to one
108     // at a time.
109     std::atomic<bool> currently_starting_one_thread{false};
110     std::atomic<uint64_t> last_started_thread{0};
111   };
112 
113   using StatePtr = std::shared_ptr<State>;
114 
115   enum class StartThreadReason {
116     kInitialPool,
117     kNoWaitersWhenScheduling,
118     kNoWaitersWhenFinishedStarting,
119   };
120 
121   static void ThreadFunc(StatePtr state);
122   // Start a new thread; throttled indicates whether the State::starting_thread
123   // variable is being used to throttle this threads creation against others or
124   // not: at thread pool startup we start several threads concurrently, but
125   // after that we only start one at a time.
126   static void StartThread(StatePtr state, StartThreadReason reason);
127   void Postfork();
128 
129   const size_t reserve_threads_;
130   const StatePtr state_;
131   std::atomic<bool> quiesced_{false};
132 };
133 
134 }  // namespace experimental
135 }  // namespace grpc_event_engine
136 
137 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_ORIGINAL_THREAD_POOL_H
138