//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#ifndef GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H
#define GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H

#include <grpc/support/port_platform.h>

#include <limits>

#include <grpc/impl/grpc_types.h>
#include <grpc/support/atm.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>

#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"

/// A combiner represents a list of work to be executed later.
/// Forward declared here to avoid a circular dependency with combiner.h.
typedef struct grpc_combiner grpc_combiner;

// This exec_ctx is ready to return: either pre-populated, or cached as soon as
// the finish_check returns true
#define GRPC_EXEC_CTX_FLAG_IS_FINISHED 1
// The exec_ctx's thread is (potentially) owned by a call or channel: care
// should be given to not delete said call/channel from this exec_ctx
#define GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP 2
// This exec ctx was initialized by an internal thread, and should not
// be counted by fork handlers
#define GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 4

// This application callback exec ctx was initialized by an internal thread,
// and should not be counted by fork handlers
#define GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 1
namespace grpc_core {
class Combiner;
/// Execution context.
/// A bag of data that collects information along a callstack.
/// It is created on the stack at core entry points (public API or iomgr), and
/// stored internally as a thread-local variable.
///
/// Generally, to create an exec_ctx instance, add the following line at the
/// top of the public API entry point or at the start of a thread's work
/// function:
///
/// ExecCtx exec_ctx;
///
/// Access the created ExecCtx instance using:
/// ExecCtx::Get()
///
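/// As an illustration only (the entry-point name below is hypothetical, not
/// part of this header), a typical public API entry point looks like:
///
///   void grpc_do_something(void* arg) {
///     grpc_core::ExecCtx exec_ctx;  // establishes the thread-local context
///     // ... core work; closures queued here run when exec_ctx is flushed
///   }  // ~ExecCtx flushes queued closures before returning to the caller
///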
/// Specific responsibilities (this may grow in the future):
/// - track a list of core work that needs to be delayed until the base of the
///   call stack (this provides a convenient mechanism to run callbacks
///   without worrying about locking issues)
/// - provide a decision maker (via IsReadyToFinish) that provides a
///   signal as to whether a borrowed thread should continue to do work or
///   should actively try to finish up and get this thread back to its owner
///
/// CONVENTIONS:
/// - Instances of this must ALWAYS be constructed on the stack, never
///   heap allocated.
/// - Do not pass exec_ctx as a parameter to a function. Always access it using
///   ExecCtx::Get().
/// - NOTE: In the future, the convention is likely to change to allow only one
///         ExecCtx on a thread's stack at the same time. The TODO below
///         discusses this plan in more detail.
///
/// TODO(yashykt): Only allow one "active" ExecCtx on a thread at the same time.
///               Stage 1: If a new one is created on the stack, it should just
///               pass-through to the underlying ExecCtx deeper in the thread's
///               stack.
///               Stage 2: Assert if a 2nd one is ever created on the stack
///               since that implies a core re-entry outside of application
///               callbacks.
///
class ExecCtx {
 public:
  /// Default Constructor

  ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) {
    Fork::IncExecCtxCount();
    Set(this);
  }

  /// Parameterised Constructor
  explicit ExecCtx(uintptr_t fl) : flags_(fl) {
    if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
      Fork::IncExecCtxCount();
    }
    Set(this);
  }

  /// Destructor
  virtual ~ExecCtx() {
    flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
    Flush();
    Set(last_exec_ctx_);
    if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
      Fork::DecExecCtxCount();
    }
  }

  /// Disallow copy and assignment operators
  ExecCtx(const ExecCtx&) = delete;
  ExecCtx& operator=(const ExecCtx&) = delete;

  unsigned starting_cpu() {
    if (starting_cpu_ == std::numeric_limits<unsigned>::max()) {
      starting_cpu_ = gpr_cpu_current_cpu();
    }
    return starting_cpu_;
  }

  struct CombinerData {
    // currently active combiner: updated only via combiner.c
    Combiner* active_combiner;
    // last active combiner in the active combiner list
    Combiner* last_combiner;
  };

  /// Only to be used by grpc-combiner code
  CombinerData* combiner_data() { return &combiner_data_; }

  /// Return pointer to grpc_closure_list
  grpc_closure_list* closure_list() { return &closure_list_; }

  /// Return flags
  uintptr_t flags() { return flags_; }

  /// Checks if there is work to be done
  bool HasWork() {
    return combiner_data_.active_combiner != nullptr ||
           !grpc_closure_list_empty(closure_list_);
  }

  /// Flush any work that has been enqueued onto this grpc_exec_ctx.
  /// Caller must guarantee that no interfering locks are held.
  /// Returns true if work was performed, false otherwise.
  ///
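  /// An illustrative sketch only (assumes a grpc_closure `cb` initialized
  /// elsewhere, e.g. with GRPC_CLOSURE_INIT from closure.h, and that
  /// grpc_error_handle can be constructed from absl::OkStatus()):
  ///
  ///   ExecCtx::Run(DEBUG_LOCATION, cb, absl::OkStatus());
  ///   ExecCtx::Get()->Flush();  // run cb now rather than at destruction
  ///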
  bool Flush();

  /// Returns true if we'd like to leave this execution context as soon as
  /// possible: useful for deciding whether to do something more or not
  /// depending on outside context.
  ///
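  /// A hypothetical sketch of use on a borrowed thread (the helper below is
  /// illustrative, not part of gRPC):
  ///
  ///   while (!ExecCtx::Get()->IsReadyToFinish()) {
  ///     DoOneUnitOfBorrowedWork();  // hypothetical unit of work
  ///   }
  ///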
  bool IsReadyToFinish() {
    if ((flags_ & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) {
      if (CheckReadyToFinish()) {
        flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
        return true;
      }
      return false;
    } else {
      return true;
    }
  }

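  /// Timestamp helpers. Now() may return a value cached by this ExecCtx's
  /// ScopedTimeCache; call InvalidateNow() after blocking if a fresh reading
  /// is required. Illustrative sketch only (Duration is from gprpp/time.h):
  ///
  ///   Timestamp deadline = ExecCtx::Get()->Now() + Duration::Seconds(1);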
  Timestamp Now() { return Timestamp::Now(); }
  void InvalidateNow() { time_cache_.InvalidateCache(); }
  void SetNowIomgrShutdown() {
    // We get to use a test-only SetNow on this path only because iomgr is
    // being removed; there is no point adding more interfaces for it.
    time_cache_.TestOnlySetNow(Timestamp::InfFuture());
  }
  void TestOnlySetNow(Timestamp now) { time_cache_.TestOnlySetNow(now); }

  /// Gets pointer to current exec_ctx.
  static ExecCtx* Get() { return exec_ctx_; }

  static void Run(const DebugLocation& location, grpc_closure* closure,
                  grpc_error_handle error);

  static void RunList(const DebugLocation& location, grpc_closure_list* list);

 protected:
  /// Check if ready to finish.
  virtual bool CheckReadyToFinish() { return false; }

  /// Disallow delete on ExecCtx.
  static void operator delete(void* /* p */) { abort(); }

 private:
  /// Set exec_ctx_ to exec_ctx.
  static void Set(ExecCtx* exec_ctx) { exec_ctx_ = exec_ctx; }

  grpc_closure_list closure_list_ = GRPC_CLOSURE_LIST_INIT;
  CombinerData combiner_data_ = {nullptr, nullptr};
  uintptr_t flags_;

  unsigned starting_cpu_ = std::numeric_limits<unsigned>::max();

  ScopedTimeCache time_cache_;
  static thread_local ExecCtx* exec_ctx_;
  ExecCtx* last_exec_ctx_ = Get();
};

/// Application-callback execution context.
/// A bag of data that collects information along a callstack.
/// It is created on the stack at core entry points, and stored internally
/// as a thread-local variable.
///
/// There are three key differences between this structure and ExecCtx:
///   1. ApplicationCallbackExecCtx builds a list of application-level
///      callbacks, but ExecCtx builds a list of internal callbacks to invoke.
///   2. ApplicationCallbackExecCtx invokes its callbacks only at destruction;
///      there is no explicit Flush method.
///   3. If more than one ApplicationCallbackExecCtx is created on the thread's
///      stack, only the one closest to the base of the stack is actually
///      active and this is the only one that enqueues application callbacks.
///      (Unlike ExecCtx, it is not feasible to prevent multiple of these on the
///      stack since the executing application callback may itself enter core.
///      However, the new one created will just pass callbacks through to the
///      base one and those will not be executed until the return to the
///      destructor of the base one, preventing unlimited stack growth.)
///
/// This structure exists because application callbacks may themselves cause a
/// core re-entry (e.g., through a public API call) and if that call in turn
/// causes another application callback, there could be arbitrarily growing
/// stacks of core re-entries. Instead, application callbacks should not be
/// invoked until other core work is done and other application callbacks have
/// completed. To accomplish this, any application callback should be enqueued
/// using ApplicationCallbackExecCtx::Enqueue.
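///
/// For example (an illustrative sketch; `my_functor` is hypothetical and must
/// already have its functor_run member populated):
///
///   if (ApplicationCallbackExecCtx::Available()) {
///     ApplicationCallbackExecCtx::Enqueue(my_functor, /*is_success=*/1);
///   }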
///
/// CONVENTIONS:
/// - Instances of this must ALWAYS be constructed on the stack, never
///   heap allocated.
/// - Instances of this are generally constructed before ExecCtx when needed.
///   The only exception is for ExecCtx's that are explicitly flushed and
///   that survive beyond the scope of the function that can cause application
///   callbacks to be invoked (e.g., in the timer thread).
///
/// Generally, core entry points that may trigger application-level callbacks
/// will have the following declarations:
///
/// ApplicationCallbackExecCtx callback_exec_ctx;
/// ExecCtx exec_ctx;
///
/// This ordering is important to make sure that the ApplicationCallbackExecCtx
/// is destroyed after the ExecCtx (to prevent the re-entry problem described
/// above, as well as making sure that ExecCtx core callbacks are invoked
/// first).
///
///

class ApplicationCallbackExecCtx {
 public:
  /// Default Constructor
  ApplicationCallbackExecCtx() { Set(this, flags_); }

  /// Parameterised Constructor
  explicit ApplicationCallbackExecCtx(uintptr_t fl) : flags_(fl) {
    Set(this, flags_);
  }

  ~ApplicationCallbackExecCtx() {
    if (Get() == this) {
      while (head_ != nullptr) {
        auto* f = head_;
        head_ = f->internal_next;
        if (f->internal_next == nullptr) {
          tail_ = nullptr;
        }
        (*f->functor_run)(f, f->internal_success);
      }
      callback_exec_ctx_ = nullptr;
      if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
        Fork::DecExecCtxCount();
      }
    } else {
      GPR_DEBUG_ASSERT(head_ == nullptr);
      GPR_DEBUG_ASSERT(tail_ == nullptr);
    }
  }

  uintptr_t Flags() { return flags_; }

  static ApplicationCallbackExecCtx* Get() { return callback_exec_ctx_; }

  static void Set(ApplicationCallbackExecCtx* exec_ctx, uintptr_t flags) {
    if (Get() == nullptr) {
      if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags)) {
        Fork::IncExecCtxCount();
      }
      callback_exec_ctx_ = exec_ctx;
    }
  }

  static void Enqueue(grpc_completion_queue_functor* functor, int is_success) {
    functor->internal_success = is_success;
    functor->internal_next = nullptr;

    ApplicationCallbackExecCtx* ctx = Get();

    if (ctx->head_ == nullptr) {
      ctx->head_ = functor;
    }
    if (ctx->tail_ != nullptr) {
      ctx->tail_->internal_next = functor;
    }
    ctx->tail_ = functor;
  }

  static bool Available() { return Get() != nullptr; }

 private:
  uintptr_t flags_{0u};
  grpc_completion_queue_functor* head_{nullptr};
  grpc_completion_queue_functor* tail_{nullptr};
  static thread_local ApplicationCallbackExecCtx* callback_exec_ctx_;
};

}  // namespace grpc_core

#endif  // GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H