1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/gprpp/fork.h"
22
23 #include <grpc/support/atm.h>
24 #include <grpc/support/sync.h>
25 #include <grpc/support/time.h>
26
27 #include "src/core/lib/config/config_vars.h"
28 #include "src/core/lib/event_engine/thread_local.h"
29 #include "src/core/lib/gprpp/no_destruct.h"
30
31 //
32 // NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
33 // AROUND VERY SPECIFIC USE CASES.
34 //
35
36 namespace grpc_core {
37 namespace {
38 // The exec_ctx_count has 2 modes, blocked and unblocked.
39 // When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates
40 // 0 active ExecCtxs, exex_ctx_count=3 indicates 1 active ExecCtxs...
41
42 // When blocked, the exec_ctx_count is 0-indexed. Note that ExecCtx
43 // creation can only be blocked if there is exactly 1 outstanding ExecCtx,
44 // meaning that BLOCKED and UNBLOCKED counts partition the integers
45 #define UNBLOCKED(n) ((n) + 2)
46 #define BLOCKED(n) (n)
47
48 class ExecCtxState {
49 public:
ExecCtxState()50 ExecCtxState() : fork_complete_(true) {
51 gpr_mu_init(&mu_);
52 gpr_cv_init(&cv_);
53 gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
54 }
55
IncExecCtxCount()56 void IncExecCtxCount() {
57 // EventEngine is expected to terminate all threads before fork, and so this
58 // extra work is unnecessary
59 if (grpc_event_engine::experimental::ThreadLocal::IsEventEngineThread()) {
60 return;
61 }
62 gpr_atm count = gpr_atm_no_barrier_load(&count_);
63 while (true) {
64 if (count <= BLOCKED(1)) {
65 // This only occurs if we are trying to fork. Wait until the fork()
66 // operation completes before allowing new ExecCtxs.
67 gpr_mu_lock(&mu_);
68 if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) {
69 while (!fork_complete_) {
70 gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
71 }
72 }
73 gpr_mu_unlock(&mu_);
74 } else if (gpr_atm_no_barrier_cas(&count_, count, count + 1)) {
75 break;
76 }
77 count = gpr_atm_no_barrier_load(&count_);
78 }
79 }
80
DecExecCtxCount()81 void DecExecCtxCount() {
82 if (grpc_event_engine::experimental::ThreadLocal::IsEventEngineThread()) {
83 return;
84 }
85 gpr_atm_no_barrier_fetch_add(&count_, -1);
86 }
87
BlockExecCtx()88 bool BlockExecCtx() {
89 // Assumes there is an active ExecCtx when this function is called
90 if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1), BLOCKED(1))) {
91 gpr_mu_lock(&mu_);
92 fork_complete_ = false;
93 gpr_mu_unlock(&mu_);
94 return true;
95 }
96 return false;
97 }
98
AllowExecCtx()99 void AllowExecCtx() {
100 gpr_mu_lock(&mu_);
101 gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
102 fork_complete_ = true;
103 gpr_cv_broadcast(&cv_);
104 gpr_mu_unlock(&mu_);
105 }
106
~ExecCtxState()107 ~ExecCtxState() {
108 gpr_mu_destroy(&mu_);
109 gpr_cv_destroy(&cv_);
110 }
111
112 private:
113 bool fork_complete_;
114 gpr_mu mu_;
115 gpr_cv cv_;
116 gpr_atm count_;
117 };
118
119 class ThreadState {
120 public:
ThreadState()121 ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) {
122 gpr_mu_init(&mu_);
123 gpr_cv_init(&cv_);
124 }
125
IncThreadCount()126 void IncThreadCount() {
127 gpr_mu_lock(&mu_);
128 count_++;
129 gpr_mu_unlock(&mu_);
130 }
131
DecThreadCount()132 void DecThreadCount() {
133 gpr_mu_lock(&mu_);
134 count_--;
135 if (awaiting_threads_ && count_ == 0) {
136 threads_done_ = true;
137 gpr_cv_signal(&cv_);
138 }
139 gpr_mu_unlock(&mu_);
140 }
AwaitThreads()141 void AwaitThreads() {
142 gpr_mu_lock(&mu_);
143 awaiting_threads_ = true;
144 threads_done_ = (count_ == 0);
145 while (!threads_done_) {
146 gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
147 }
148 awaiting_threads_ = true;
149 gpr_mu_unlock(&mu_);
150 }
151
~ThreadState()152 ~ThreadState() {
153 gpr_mu_destroy(&mu_);
154 gpr_cv_destroy(&cv_);
155 }
156
157 private:
158 bool awaiting_threads_;
159 bool threads_done_;
160 gpr_mu mu_;
161 gpr_cv cv_;
162 int count_;
163 };
164
165 } // namespace
166
GlobalInit()167 void Fork::GlobalInit() {
168 if (!override_enabled_) {
169 support_enabled_.store(ConfigVars::Get().EnableForkSupport(),
170 std::memory_order_relaxed);
171 }
172 }
173
Enabled()174 bool Fork::Enabled() {
175 return support_enabled_.load(std::memory_order_relaxed);
176 }
177
178 // Testing Only
Enable(bool enable)179 void Fork::Enable(bool enable) {
180 override_enabled_ = true;
181 support_enabled_.store(enable, std::memory_order_relaxed);
182 }
183
DoIncExecCtxCount()184 void Fork::DoIncExecCtxCount() {
185 NoDestructSingleton<ExecCtxState>::Get()->IncExecCtxCount();
186 }
187
DoDecExecCtxCount()188 void Fork::DoDecExecCtxCount() {
189 NoDestructSingleton<ExecCtxState>::Get()->DecExecCtxCount();
190 }
191
SetResetChildPollingEngineFunc(Fork::child_postfork_func reset_child_polling_engine)192 void Fork::SetResetChildPollingEngineFunc(
193 Fork::child_postfork_func reset_child_polling_engine) {
194 if (reset_child_polling_engine_ == nullptr) {
195 reset_child_polling_engine_ = new std::vector<Fork::child_postfork_func>();
196 }
197 if (reset_child_polling_engine == nullptr) {
198 reset_child_polling_engine_->clear();
199 } else {
200 reset_child_polling_engine_->emplace_back(reset_child_polling_engine);
201 }
202 }
203
204 const std::vector<Fork::child_postfork_func>&
GetResetChildPollingEngineFunc()205 Fork::GetResetChildPollingEngineFunc() {
206 return *reset_child_polling_engine_;
207 }
208
BlockExecCtx()209 bool Fork::BlockExecCtx() {
210 if (support_enabled_.load(std::memory_order_relaxed)) {
211 return NoDestructSingleton<ExecCtxState>::Get()->BlockExecCtx();
212 }
213 return false;
214 }
215
AllowExecCtx()216 void Fork::AllowExecCtx() {
217 if (support_enabled_.load(std::memory_order_relaxed)) {
218 NoDestructSingleton<ExecCtxState>::Get()->AllowExecCtx();
219 }
220 }
221
IncThreadCount()222 void Fork::IncThreadCount() {
223 if (support_enabled_.load(std::memory_order_relaxed)) {
224 NoDestructSingleton<ThreadState>::Get()->IncThreadCount();
225 }
226 }
227
DecThreadCount()228 void Fork::DecThreadCount() {
229 if (support_enabled_.load(std::memory_order_relaxed)) {
230 NoDestructSingleton<ThreadState>::Get()->DecThreadCount();
231 }
232 }
AwaitThreads()233 void Fork::AwaitThreads() {
234 if (support_enabled_.load(std::memory_order_relaxed)) {
235 NoDestructSingleton<ThreadState>::Get()->AwaitThreads();
236 }
237 }
238
239 std::atomic<bool> Fork::support_enabled_(false);
240 bool Fork::override_enabled_ = false;
241 std::vector<Fork::child_postfork_func>* Fork::reset_child_polling_engine_ =
242 nullptr;
243 } // namespace grpc_core
244