//
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include "src/core/lib/gprpp/fork.h"

#include <grpc/support/atm.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>

#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/event_engine/thread_local.h"
#include "src/core/lib/gprpp/no_destruct.h"

//
// NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
//       AROUND VERY SPECIFIC USE CASES.
//
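// A rough sketch of how this state is expected to be driven around a
// fork(), assuming pthread_atfork-style handlers registered elsewhere in
// gRPC (the handler bodies below are illustrative, not the real ones):
//
//   // prefork: stop new ExecCtxs, then wait for gRPC-owned threads.  If
//   // BlockExecCtx() fails (another thread is calling into gRPC), the
//   // handlers are expected to skip this machinery entirely.
//   if (Fork::BlockExecCtx()) {
//     Fork::AwaitThreads();
//   }
//   // ... fork() ...
//   // postfork (parent): re-allow ExecCtx creation.
//   Fork::AllowExecCtx();
//   // postfork (child): additionally reset the polling engine via the
//   // callbacks registered with SetResetChildPollingEngineFunc().
//   Fork::AllowExecCtx();
//   for (auto reset : Fork::GetResetChildPollingEngineFunc()) {
//     if (reset != nullptr) reset();
//   }
//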

namespace grpc_core {
namespace {
// The exec_ctx_count has 2 modes, blocked and unblocked.
// When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates
// 0 active ExecCtxs, exec_ctx_count=3 indicates 1 active ExecCtx, and so on.

// When blocked, the exec_ctx_count is 0-indexed.  Note that ExecCtx
// creation can only be blocked if there is exactly 1 outstanding ExecCtx,
// meaning that BLOCKED and UNBLOCKED counts partition the integers.
#define UNBLOCKED(n) ((n) + 2)
#define BLOCKED(n) (n)
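// For example, with no fork in progress and one active ExecCtx,
// count_ == UNBLOCKED(1) == 3.  If BlockExecCtx() then succeeds, count_
// becomes BLOCKED(1) == 1, and any thread entering IncExecCtxCount() waits
// on cv_ until AllowExecCtx() resets count_ to UNBLOCKED(0) == 2.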

class ExecCtxState {
 public:
  ExecCtxState() : fork_complete_(true) {
    gpr_mu_init(&mu_);
    gpr_cv_init(&cv_);
    gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
  }

  void IncExecCtxCount() {
    // EventEngine is expected to terminate all threads before fork, and so
    // this extra work is unnecessary
    if (grpc_event_engine::experimental::ThreadLocal::IsEventEngineThread()) {
      return;
    }
    gpr_atm count = gpr_atm_no_barrier_load(&count_);
    while (true) {
      if (count <= BLOCKED(1)) {
        // This only occurs if we are trying to fork.  Wait until the fork()
        // operation completes before allowing new ExecCtxs.
        gpr_mu_lock(&mu_);
        if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) {
          while (!fork_complete_) {
            gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
          }
        }
        gpr_mu_unlock(&mu_);
      } else if (gpr_atm_no_barrier_cas(&count_, count, count + 1)) {
        break;
      }
      count = gpr_atm_no_barrier_load(&count_);
    }
  }

  void DecExecCtxCount() {
    if (grpc_event_engine::experimental::ThreadLocal::IsEventEngineThread()) {
      return;
    }
    gpr_atm_no_barrier_fetch_add(&count_, -1);
  }

  bool BlockExecCtx() {
    // Assumes there is an active ExecCtx when this function is called
    if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1), BLOCKED(1))) {
      gpr_mu_lock(&mu_);
      fork_complete_ = false;
      gpr_mu_unlock(&mu_);
      return true;
    }
    return false;
  }

  void AllowExecCtx() {
    gpr_mu_lock(&mu_);
    gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
    fork_complete_ = true;
    gpr_cv_broadcast(&cv_);
    gpr_mu_unlock(&mu_);
  }

  ~ExecCtxState() {
    gpr_mu_destroy(&mu_);
    gpr_cv_destroy(&cv_);
  }

 private:
  bool fork_complete_;
  gpr_mu mu_;
  gpr_cv cv_;
  gpr_atm count_;
};

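// Tracks the number of gRPC-owned threads (those that report themselves via
// Fork::IncThreadCount/DecThreadCount) so that AwaitThreads() can block
// until they have all finished before a fork() proceeds.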
class ThreadState {
 public:
  ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) {
    gpr_mu_init(&mu_);
    gpr_cv_init(&cv_);
  }

  void IncThreadCount() {
    gpr_mu_lock(&mu_);
    count_++;
    gpr_mu_unlock(&mu_);
  }

  void DecThreadCount() {
    gpr_mu_lock(&mu_);
    count_--;
    if (awaiting_threads_ && count_ == 0) {
      threads_done_ = true;
      gpr_cv_signal(&cv_);
    }
    gpr_mu_unlock(&mu_);
  }

  void AwaitThreads() {
    gpr_mu_lock(&mu_);
    awaiting_threads_ = true;
    threads_done_ = (count_ == 0);
    while (!threads_done_) {
      gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
    }
    awaiting_threads_ = false;
    gpr_mu_unlock(&mu_);
  }

  ~ThreadState() {
    gpr_mu_destroy(&mu_);
    gpr_cv_destroy(&cv_);
  }

 private:
  bool awaiting_threads_;
  bool threads_done_;
  gpr_mu mu_;
  gpr_cv cv_;
  int count_;
};

}  // namespace

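// Reads the fork-support setting from gRPC configuration (typically the
// GRPC_ENABLE_FORK_SUPPORT environment variable), unless a test has already
// forced a value via Fork::Enable().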
void Fork::GlobalInit() {
  if (!override_enabled_) {
    support_enabled_.store(ConfigVars::Get().EnableForkSupport(),
                           std::memory_order_relaxed);
  }
}

bool Fork::Enabled() {
  return support_enabled_.load(std::memory_order_relaxed);
}

// Testing Only
void Fork::Enable(bool enable) {
  override_enabled_ = true;
  support_enabled_.store(enable, std::memory_order_relaxed);
}

void Fork::DoIncExecCtxCount() {
  NoDestructSingleton<ExecCtxState>::Get()->IncExecCtxCount();
}

void Fork::DoDecExecCtxCount() {
  NoDestructSingleton<ExecCtxState>::Get()->DecExecCtxCount();
}

void Fork::SetResetChildPollingEngineFunc(
    Fork::child_postfork_func reset_child_polling_engine) {
  if (reset_child_polling_engine_ == nullptr) {
    reset_child_polling_engine_ = new std::vector<Fork::child_postfork_func>();
  }
  if (reset_child_polling_engine == nullptr) {
    reset_child_polling_engine_->clear();
  } else {
    reset_child_polling_engine_->emplace_back(reset_child_polling_engine);
  }
}

const std::vector<Fork::child_postfork_func>&
Fork::GetResetChildPollingEngineFunc() {
  return *reset_child_polling_engine_;
}

bool Fork::BlockExecCtx() {
  if (support_enabled_.load(std::memory_order_relaxed)) {
    return NoDestructSingleton<ExecCtxState>::Get()->BlockExecCtx();
  }
  return false;
}

void Fork::AllowExecCtx() {
  if (support_enabled_.load(std::memory_order_relaxed)) {
    NoDestructSingleton<ExecCtxState>::Get()->AllowExecCtx();
  }
}

void Fork::IncThreadCount() {
  if (support_enabled_.load(std::memory_order_relaxed)) {
    NoDestructSingleton<ThreadState>::Get()->IncThreadCount();
  }
}

void Fork::DecThreadCount() {
  if (support_enabled_.load(std::memory_order_relaxed)) {
    NoDestructSingleton<ThreadState>::Get()->DecThreadCount();
  }
}

void Fork::AwaitThreads() {
  if (support_enabled_.load(std::memory_order_relaxed)) {
    NoDestructSingleton<ThreadState>::Get()->AwaitThreads();
  }
}

std::atomic<bool> Fork::support_enabled_(false);
bool Fork::override_enabled_ = false;
std::vector<Fork::child_postfork_func>* Fork::reset_child_polling_engine_ =
    nullptr;
}  // namespace grpc_core