1 // 2 // 3 // Copyright 2017 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_SRC_CORE_LIB_IOMGR_CALL_COMBINER_H 20 #define GRPC_SRC_CORE_LIB_IOMGR_CALL_COMBINER_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <stddef.h> 25 26 #include "absl/container/inlined_vector.h" 27 28 #include <grpc/support/atm.h> 29 30 #include "src/core/lib/gprpp/mpscq.h" 31 #include "src/core/lib/gprpp/ref_counted.h" 32 #include "src/core/lib/gprpp/ref_counted_ptr.h" 33 #include "src/core/lib/iomgr/closure.h" 34 #include "src/core/lib/iomgr/dynamic_annotations.h" 35 #include "src/core/lib/iomgr/exec_ctx.h" 36 37 // A simple, lock-free mechanism for serializing activity related to a 38 // single call. This is similar to a combiner but is more lightweight. 39 // 40 // It requires the callback (or, in the common case where the callback 41 // actually kicks off a chain of callbacks, the last callback in that 42 // chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP()) 43 // when it is done with the action that was kicked off by the original 44 // callback. 45 46 namespace grpc_core { 47 48 extern DebugOnlyTraceFlag grpc_call_combiner_trace; 49 50 class CallCombiner { 51 public: 52 CallCombiner(); 53 ~CallCombiner(); 54 55 #ifndef NDEBUG 56 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \ 57 (call_combiner)->Start((closure), (error), __FILE__, __LINE__, (reason)) 58 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \ 59 (call_combiner)->Stop(__FILE__, __LINE__, (reason)) 60 /// Starts processing \a closure. 61 void Start(grpc_closure* closure, grpc_error_handle error, const char* file, 62 int line, const char* reason); 63 /// Yields the call combiner to the next closure in the queue, if any. 64 void Stop(const char* file, int line, const char* reason); 65 #else 66 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \ 67 (call_combiner)->Start((closure), (error), (reason)) 68 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \ 69 (call_combiner)->Stop((reason)) 70 /// Starts processing \a closure. 71 void Start(grpc_closure* closure, grpc_error_handle error, 72 const char* reason); 73 /// Yields the call combiner to the next closure in the queue, if any. 74 void Stop(const char* reason); 75 #endif 76 77 /// Registers \a closure to be invoked when Cancel() is called. 78 /// 79 /// Once a closure is registered, it will always be scheduled exactly 80 /// once; this allows the closure to hold references that will be freed 81 /// regardless of whether or not the call was cancelled. If a cancellation 82 /// does occur, the closure will be scheduled with the cancellation error; 83 /// otherwise, it will be scheduled with absl::OkStatus(). 84 /// 85 /// The closure will be scheduled in the following cases: 86 /// - If Cancel() was called prior to registering the closure, it will be 87 /// scheduled immediately with the cancelation error. 88 /// - If Cancel() is called after registering the closure, the closure will 89 /// be scheduled with the cancellation error. 90 /// - If SetNotifyOnCancel() is called again to register a new cancellation 91 /// closure, the previous cancellation closure will be scheduled with 92 /// absl::OkStatus(). 93 /// 94 /// If \a closure is NULL, then no closure will be invoked on 95 /// cancellation; this effectively unregisters the previously set closure. 96 /// However, most filters will not need to explicitly unregister their 97 /// callbacks, as this is done automatically when the call is destroyed. 98 void SetNotifyOnCancel(grpc_closure* closure); 99 100 /// Indicates that the call has been cancelled. 101 void Cancel(grpc_error_handle error); 102 103 private: 104 void ScheduleClosure(grpc_closure* closure, grpc_error_handle error); 105 #ifdef GRPC_TSAN_ENABLED 106 static void TsanClosure(void* arg, grpc_error_handle error); 107 #endif 108 109 gpr_atm size_ = 0; // size_t, num closures in queue or currently executing 110 MultiProducerSingleConsumerQueue queue_; 111 // Either 0 (if not cancelled and no cancellation closure set), 112 // a grpc_closure* (if the lowest bit is 0), 113 // or a grpc_error_handle (if the lowest bit is 1). 114 gpr_atm cancel_state_ = 0; 115 #ifdef GRPC_TSAN_ENABLED 116 // A fake ref-counted lock that is kept alive after the destruction of 117 // grpc_call_combiner, when we are running the original closure. 118 // 119 // Ideally we want to lock and unlock the call combiner as a pointer, when the 120 // callback is called. However, original_closure is free to trigger 121 // anything on the call combiner (including destruction of grpc_call). 122 // Thus, we need a ref-counted structure that can outlive the call combiner. 123 struct TsanLock : public RefCounted<TsanLock, NonPolymorphicRefCount> { TsanLockTsanLock124 TsanLock() { TSAN_ANNOTATE_RWLOCK_CREATE(&taken); } ~TsanLockTsanLock125 ~TsanLock() { TSAN_ANNOTATE_RWLOCK_DESTROY(&taken); } 126 // To avoid double-locking by the same thread, we should acquire/release 127 // the lock only when taken is false. On each acquire taken must be set to 128 // true. 129 std::atomic<bool> taken{false}; 130 }; 131 RefCountedPtr<TsanLock> tsan_lock_ = MakeRefCounted<TsanLock>(); 132 grpc_closure tsan_closure_; 133 grpc_closure* original_closure_; 134 #endif 135 }; 136 137 // Helper for running a list of closures in a call combiner. 138 // 139 // Each callback running in the call combiner will eventually be 140 // returned to the surface, at which point the surface will yield the 141 // call combiner. So when we are running in the call combiner and have 142 // more than one callback to return to the surface, we need to re-enter 143 // the call combiner for all but one of those callbacks. 144 class CallCombinerClosureList { 145 public: CallCombinerClosureList()146 CallCombinerClosureList() {} 147 148 // Adds a closure to the list. The closure must eventually result in 149 // the call combiner being yielded. Add(grpc_closure * closure,grpc_error_handle error,const char * reason)150 void Add(grpc_closure* closure, grpc_error_handle error, const char* reason) { 151 closures_.emplace_back(closure, error, reason); 152 } 153 154 // Runs all closures in the call combiner and yields the call combiner. 155 // 156 // All but one of the closures in the list will be scheduled via 157 // GRPC_CALL_COMBINER_START(), and the remaining closure will be 158 // scheduled via ExecCtx::Run(), which will eventually result 159 // in yielding the call combiner. If the list is empty, then the call 160 // combiner will be yielded immediately. RunClosures(CallCombiner * call_combiner)161 void RunClosures(CallCombiner* call_combiner) { 162 if (closures_.empty()) { 163 GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule"); 164 return; 165 } 166 for (size_t i = 1; i < closures_.size(); ++i) { 167 auto& closure = closures_[i]; 168 GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, 169 closure.reason); 170 } 171 if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) { 172 gpr_log(GPR_INFO, 173 "CallCombinerClosureList executing closure while already " 174 "holding call_combiner %p: closure=%s error=%s reason=%s", 175 call_combiner, closures_[0].closure->DebugString().c_str(), 176 StatusToString(closures_[0].error).c_str(), closures_[0].reason); 177 } 178 // This will release the call combiner. 179 ExecCtx::Run(DEBUG_LOCATION, closures_[0].closure, closures_[0].error); 180 closures_.clear(); 181 } 182 183 // Runs all closures in the call combiner, but does NOT yield the call 184 // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START(). RunClosuresWithoutYielding(CallCombiner * call_combiner)185 void RunClosuresWithoutYielding(CallCombiner* call_combiner) { 186 for (size_t i = 0; i < closures_.size(); ++i) { 187 auto& closure = closures_[i]; 188 GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, 189 closure.reason); 190 } 191 closures_.clear(); 192 } 193 size()194 size_t size() const { return closures_.size(); } 195 196 private: 197 struct CallCombinerClosure { 198 grpc_closure* closure; 199 grpc_error_handle error; 200 const char* reason; 201 CallCombinerClosureCallCombinerClosure202 CallCombinerClosure(grpc_closure* closure, grpc_error_handle error, 203 const char* reason) 204 : closure(closure), error(error), reason(reason) {} 205 }; 206 207 // There are generally a maximum of 6 closures to run in the call 208 // combiner, one for each pending op. 209 absl::InlinedVector<CallCombinerClosure, 6> closures_; 210 }; 211 212 } // namespace grpc_core 213 214 #endif // GRPC_SRC_CORE_LIB_IOMGR_CALL_COMBINER_H 215