1 // 2 // 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H 20 #define GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <limits> 25 26 #include <grpc/impl/grpc_types.h> 27 #include <grpc/support/atm.h> 28 #include <grpc/support/cpu.h> 29 #include <grpc/support/log.h> 30 #include <grpc/support/time.h> 31 32 #include "src/core/lib/gpr/time_precise.h" 33 #include "src/core/lib/gprpp/crash.h" 34 #include "src/core/lib/gprpp/debug_location.h" 35 #include "src/core/lib/gprpp/fork.h" 36 #include "src/core/lib/gprpp/time.h" 37 #include "src/core/lib/iomgr/closure.h" 38 39 /// A combiner represents a list of work to be executed later. 40 /// Forward declared here to avoid a circular dependency with combiner.h. 41 typedef struct grpc_combiner grpc_combiner; 42 43 // This exec_ctx is ready to return: either pre-populated, or cached as soon as 44 // the finish_check returns true 45 #define GRPC_EXEC_CTX_FLAG_IS_FINISHED 1 46 // The exec_ctx's thread is (potentially) owned by a call or channel: care 47 // should be given to not delete said call/channel from this exec_ctx 48 #define GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP 2 49 // This exec ctx was initialized by an internal thread, and should not 50 // be counted by fork handlers 51 #define GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 4 52 53 // This application callback exec ctx was initialized by an internal thread, and 54 // should not be counted by fork handlers 55 #define GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 1 56 57 namespace grpc_core { 58 class Combiner; 59 /// Execution context. 60 /// A bag of data that collects information along a callstack. 61 /// It is created on the stack at core entry points (public API or iomgr), and 62 /// stored internally as a thread-local variable. 63 /// 64 /// Generally, to create an exec_ctx instance, add the following line at the top 65 /// of the public API entry point or at the start of a thread's work function : 66 /// 67 /// ExecCtx exec_ctx; 68 /// 69 /// Access the created ExecCtx instance using : 70 /// ExecCtx::Get() 71 /// 72 /// Specific responsibilities (this may grow in the future): 73 /// - track a list of core work that needs to be delayed until the base of the 74 /// call stack (this provides a convenient mechanism to run callbacks 75 /// without worrying about locking issues) 76 /// - provide a decision maker (via IsReadyToFinish) that provides a 77 /// signal as to whether a borrowed thread should continue to do work or 78 /// should actively try to finish up and get this thread back to its owner 79 /// 80 /// CONVENTIONS: 81 /// - Instance of this must ALWAYS be constructed on the stack, never 82 /// heap allocated. 83 /// - Do not pass exec_ctx as a parameter to a function. Always access it using 84 /// ExecCtx::Get(). 85 /// - NOTE: In the future, the convention is likely to change to allow only one 86 /// ExecCtx on a thread's stack at the same time. The TODO below 87 /// discusses this plan in more detail. 88 /// 89 /// TODO(yashykt): Only allow one "active" ExecCtx on a thread at the same time. 90 /// Stage 1: If a new one is created on the stack, it should just 91 /// pass-through to the underlying ExecCtx deeper in the thread's 92 /// stack. 93 /// Stage 2: Assert if a 2nd one is ever created on the stack 94 /// since that implies a core re-entry outside of application 95 /// callbacks. 96 /// 97 class ExecCtx { 98 public: 99 /// Default Constructor 100 ExecCtx()101 ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) { 102 Fork::IncExecCtxCount(); 103 Set(this); 104 } 105 106 /// Parameterised Constructor ExecCtx(uintptr_t fl)107 explicit ExecCtx(uintptr_t fl) : flags_(fl) { 108 if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) { 109 Fork::IncExecCtxCount(); 110 } 111 Set(this); 112 } 113 114 /// Destructor ~ExecCtx()115 virtual ~ExecCtx() { 116 flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; 117 Flush(); 118 Set(last_exec_ctx_); 119 if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) { 120 Fork::DecExecCtxCount(); 121 } 122 } 123 124 /// Disallow copy and assignment operators 125 ExecCtx(const ExecCtx&) = delete; 126 ExecCtx& operator=(const ExecCtx&) = delete; 127 starting_cpu()128 unsigned starting_cpu() { 129 if (starting_cpu_ == std::numeric_limits<unsigned>::max()) { 130 starting_cpu_ = gpr_cpu_current_cpu(); 131 } 132 return starting_cpu_; 133 } 134 135 struct CombinerData { 136 // currently active combiner: updated only via combiner.c 137 Combiner* active_combiner; 138 // last active combiner in the active combiner list 139 Combiner* last_combiner; 140 }; 141 142 /// Only to be used by grpc-combiner code combiner_data()143 CombinerData* combiner_data() { return &combiner_data_; } 144 145 /// Return pointer to grpc_closure_list closure_list()146 grpc_closure_list* closure_list() { return &closure_list_; } 147 148 /// Return flags flags()149 uintptr_t flags() { return flags_; } 150 151 /// Checks if there is work to be done HasWork()152 bool HasWork() { 153 return combiner_data_.active_combiner != nullptr || 154 !grpc_closure_list_empty(closure_list_); 155 } 156 157 /// Flush any work that has been enqueued onto this grpc_exec_ctx. 158 /// Caller must guarantee that no interfering locks are held. 159 /// Returns true if work was performed, false otherwise. 160 /// 161 bool Flush(); 162 163 /// Returns true if we'd like to leave this execution context as soon as 164 /// possible: useful for deciding whether to do something more or not 165 /// depending on outside context. 166 /// IsReadyToFinish()167 bool IsReadyToFinish() { 168 if ((flags_ & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) { 169 if (CheckReadyToFinish()) { 170 flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; 171 return true; 172 } 173 return false; 174 } else { 175 return true; 176 } 177 } 178 Now()179 Timestamp Now() { return Timestamp::Now(); } InvalidateNow()180 void InvalidateNow() { time_cache_.InvalidateCache(); } SetNowIomgrShutdown()181 void SetNowIomgrShutdown() { 182 // We get to do a test only set now on this path just because iomgr 183 // is getting removed and no point adding more interfaces for it. 184 time_cache_.TestOnlySetNow(Timestamp::InfFuture()); 185 } TestOnlySetNow(Timestamp now)186 void TestOnlySetNow(Timestamp now) { time_cache_.TestOnlySetNow(now); } 187 188 /// Gets pointer to current exec_ctx. Get()189 static ExecCtx* Get() { return exec_ctx_; } 190 191 static void Run(const DebugLocation& location, grpc_closure* closure, 192 grpc_error_handle error); 193 194 static void RunList(const DebugLocation& location, grpc_closure_list* list); 195 196 protected: 197 /// Check if ready to finish. CheckReadyToFinish()198 virtual bool CheckReadyToFinish() { return false; } 199 200 /// Disallow delete on ExecCtx. delete(void *)201 static void operator delete(void* /* p */) { abort(); } 202 203 private: 204 /// Set exec_ctx_ to exec_ctx. Set(ExecCtx * exec_ctx)205 static void Set(ExecCtx* exec_ctx) { exec_ctx_ = exec_ctx; } 206 207 grpc_closure_list closure_list_ = GRPC_CLOSURE_LIST_INIT; 208 CombinerData combiner_data_ = {nullptr, nullptr}; 209 uintptr_t flags_; 210 211 unsigned starting_cpu_ = std::numeric_limits<unsigned>::max(); 212 213 ScopedTimeCache time_cache_; 214 static thread_local ExecCtx* exec_ctx_; 215 ExecCtx* last_exec_ctx_ = Get(); 216 }; 217 218 /// Application-callback execution context. 219 /// A bag of data that collects information along a callstack. 220 /// It is created on the stack at core entry points, and stored internally 221 /// as a thread-local variable. 222 /// 223 /// There are three key differences between this structure and ExecCtx: 224 /// 1. ApplicationCallbackExecCtx builds a list of application-level 225 /// callbacks, but ExecCtx builds a list of internal callbacks to invoke. 226 /// 2. ApplicationCallbackExecCtx invokes its callbacks only at destruction; 227 /// there is no explicit Flush method. 228 /// 3. If more than one ApplicationCallbackExecCtx is created on the thread's 229 /// stack, only the one closest to the base of the stack is actually 230 /// active and this is the only one that enqueues application callbacks. 231 /// (Unlike ExecCtx, it is not feasible to prevent multiple of these on the 232 /// stack since the executing application callback may itself enter core. 233 /// However, the new one created will just pass callbacks through to the 234 /// base one and those will not be executed until the return to the 235 /// destructor of the base one, preventing unlimited stack growth.) 236 /// 237 /// This structure exists because application callbacks may themselves cause a 238 /// core re-entry (e.g., through a public API call) and if that call in turn 239 /// causes another application-callback, there could be arbitrarily growing 240 /// stacks of core re-entries. Instead, any application callbacks instead should 241 /// not be invoked until other core work is done and other application callbacks 242 /// have completed. To accomplish this, any application callback should be 243 /// enqueued using ApplicationCallbackExecCtx::Enqueue . 244 /// 245 /// CONVENTIONS: 246 /// - Instances of this must ALWAYS be constructed on the stack, never 247 /// heap allocated. 248 /// - Instances of this are generally constructed before ExecCtx when needed. 249 /// The only exception is for ExecCtx's that are explicitly flushed and 250 /// that survive beyond the scope of the function that can cause application 251 /// callbacks to be invoked (e.g., in the timer thread). 252 /// 253 /// Generally, core entry points that may trigger application-level callbacks 254 /// will have the following declarations: 255 /// 256 /// ApplicationCallbackExecCtx callback_exec_ctx; 257 /// ExecCtx exec_ctx; 258 /// 259 /// This ordering is important to make sure that the ApplicationCallbackExecCtx 260 /// is destroyed after the ExecCtx (to prevent the re-entry problem described 261 /// above, as well as making sure that ExecCtx core callbacks are invoked first) 262 /// 263 /// 264 265 class ApplicationCallbackExecCtx { 266 public: 267 /// Default Constructor ApplicationCallbackExecCtx()268 ApplicationCallbackExecCtx() { Set(this, flags_); } 269 270 /// Parameterised Constructor ApplicationCallbackExecCtx(uintptr_t fl)271 explicit ApplicationCallbackExecCtx(uintptr_t fl) : flags_(fl) { 272 Set(this, flags_); 273 } 274 ~ApplicationCallbackExecCtx()275 ~ApplicationCallbackExecCtx() { 276 if (Get() == this) { 277 while (head_ != nullptr) { 278 auto* f = head_; 279 head_ = f->internal_next; 280 if (f->internal_next == nullptr) { 281 tail_ = nullptr; 282 } 283 (*f->functor_run)(f, f->internal_success); 284 } 285 callback_exec_ctx_ = nullptr; 286 if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) { 287 Fork::DecExecCtxCount(); 288 } 289 } else { 290 GPR_DEBUG_ASSERT(head_ == nullptr); 291 GPR_DEBUG_ASSERT(tail_ == nullptr); 292 } 293 } 294 Flags()295 uintptr_t Flags() { return flags_; } 296 Get()297 static ApplicationCallbackExecCtx* Get() { return callback_exec_ctx_; } 298 Set(ApplicationCallbackExecCtx * exec_ctx,uintptr_t flags)299 static void Set(ApplicationCallbackExecCtx* exec_ctx, uintptr_t flags) { 300 if (Get() == nullptr) { 301 if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags)) { 302 Fork::IncExecCtxCount(); 303 } 304 callback_exec_ctx_ = exec_ctx; 305 } 306 } 307 Enqueue(grpc_completion_queue_functor * functor,int is_success)308 static void Enqueue(grpc_completion_queue_functor* functor, int is_success) { 309 functor->internal_success = is_success; 310 functor->internal_next = nullptr; 311 312 ApplicationCallbackExecCtx* ctx = Get(); 313 314 if (ctx->head_ == nullptr) { 315 ctx->head_ = functor; 316 } 317 if (ctx->tail_ != nullptr) { 318 ctx->tail_->internal_next = functor; 319 } 320 ctx->tail_ = functor; 321 } 322 Available()323 static bool Available() { return Get() != nullptr; } 324 325 private: 326 uintptr_t flags_{0u}; 327 grpc_completion_queue_functor* head_{nullptr}; 328 grpc_completion_queue_functor* tail_{nullptr}; 329 static thread_local ApplicationCallbackExecCtx* callback_exec_ctx_; 330 }; 331 332 } // namespace grpc_core 333 334 #endif // GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H 335