1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/call_combiner.h"
22 
23 #include <inttypes.h>
24 
25 #include <grpc/support/log.h>
26 
27 #include "src/core/lib/debug/stats.h"
28 #include "src/core/lib/debug/stats_data.h"
29 #include "src/core/lib/gprpp/crash.h"
30 
31 namespace grpc_core {
32 
33 DebugOnlyTraceFlag grpc_call_combiner_trace(false, "call_combiner");
34 
35 namespace {
36 
37 // grpc_error LSB can be used
38 constexpr intptr_t kErrorBit = 1;
39 
DecodeCancelStateError(gpr_atm cancel_state)40 grpc_error_handle DecodeCancelStateError(gpr_atm cancel_state) {
41   if (cancel_state & kErrorBit) {
42     return internal::StatusGetFromHeapPtr(cancel_state & ~kErrorBit);
43   }
44   return absl::OkStatus();
45 }
46 
47 }  // namespace
48 
CallCombiner()49 CallCombiner::CallCombiner() {
50   gpr_atm_no_barrier_store(&cancel_state_, 0);
51   gpr_atm_no_barrier_store(&size_, 0);
52 #ifdef GRPC_TSAN_ENABLED
53   GRPC_CLOSURE_INIT(&tsan_closure_, TsanClosure, this,
54                     grpc_schedule_on_exec_ctx);
55 #endif
56 }
57 
~CallCombiner()58 CallCombiner::~CallCombiner() {
59   if (cancel_state_ & kErrorBit) {
60     internal::StatusFreeHeapPtr(cancel_state_ & ~kErrorBit);
61   }
62 }
63 
64 #ifdef GRPC_TSAN_ENABLED
TsanClosure(void * arg,grpc_error_handle error)65 void CallCombiner::TsanClosure(void* arg, grpc_error_handle error) {
66   CallCombiner* self = static_cast<CallCombiner*>(arg);
67   // We ref-count the lock, and check if it's already taken.
68   // If it was taken, we should do nothing. Otherwise, we will mark it as
69   // locked. Note that if two different threads try to do this, only one of
70   // them will be able to mark the lock as acquired, while they both run their
71   // callbacks. In such cases (which should never happen for call_combiner),
72   // TSAN will correctly produce an error.
73   //
74   // TODO(soheil): This only covers the callbacks scheduled by
75   //               CallCombiner::Start() and CallCombiner::Stop().
76   //               If in the future, a callback gets scheduled using other
77   //               mechanisms, we will need to add APIs to externally lock
78   //               call combiners.
79   RefCountedPtr<TsanLock> lock = self->tsan_lock_;
80   bool prev = false;
81   if (lock->taken.compare_exchange_strong(prev, true)) {
82     TSAN_ANNOTATE_RWLOCK_ACQUIRED(&lock->taken, true);
83   } else {
84     lock.reset();
85   }
86   Closure::Run(DEBUG_LOCATION, self->original_closure_, error);
87   if (lock != nullptr) {
88     TSAN_ANNOTATE_RWLOCK_RELEASED(&lock->taken, true);
89     bool prev = true;
90     GPR_ASSERT(lock->taken.compare_exchange_strong(prev, false));
91   }
92 }
93 #endif
94 
ScheduleClosure(grpc_closure * closure,grpc_error_handle error)95 void CallCombiner::ScheduleClosure(grpc_closure* closure,
96                                    grpc_error_handle error) {
97 #ifdef GRPC_TSAN_ENABLED
98   original_closure_ = closure;
99   ExecCtx::Run(DEBUG_LOCATION, &tsan_closure_, error);
100 #else
101   ExecCtx::Run(DEBUG_LOCATION, closure, error);
102 #endif
103 }
104 
105 #ifndef NDEBUG
106 #define DEBUG_ARGS const char *file, int line,
107 #define DEBUG_FMT_STR "%s:%d: "
108 #define DEBUG_FMT_ARGS , file, line
109 #else
110 #define DEBUG_ARGS
111 #define DEBUG_FMT_STR
112 #define DEBUG_FMT_ARGS
113 #endif
114 
Start(grpc_closure * closure,grpc_error_handle error,DEBUG_ARGS const char * reason)115 void CallCombiner::Start(grpc_closure* closure, grpc_error_handle error,
116                          DEBUG_ARGS const char* reason) {
117   if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
118     gpr_log(GPR_INFO,
119             "==> CallCombiner::Start() [%p] closure=%s [" DEBUG_FMT_STR
120             "%s] error=%s",
121             this, closure->DebugString().c_str() DEBUG_FMT_ARGS, reason,
122             StatusToString(error).c_str());
123   }
124   size_t prev_size =
125       static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)1));
126   if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
127     gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
128             prev_size + 1);
129   }
130   if (prev_size == 0) {
131     if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
132       gpr_log(GPR_INFO, "  EXECUTING IMMEDIATELY");
133     }
134     // Queue was empty, so execute this closure immediately.
135     ScheduleClosure(closure, error);
136   } else {
137     if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
138       gpr_log(GPR_INFO, "  QUEUING");
139     }
140     // Queue was not empty, so add closure to queue.
141     closure->error_data.error = internal::StatusAllocHeapPtr(error);
142     queue_.Push(
143         reinterpret_cast<MultiProducerSingleConsumerQueue::Node*>(closure));
144   }
145 }
146 
Stop(DEBUG_ARGS const char * reason)147 void CallCombiner::Stop(DEBUG_ARGS const char* reason) {
148   if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
149     gpr_log(GPR_INFO, "==> CallCombiner::Stop() [%p] [" DEBUG_FMT_STR "%s]",
150             this DEBUG_FMT_ARGS, reason);
151   }
152   size_t prev_size =
153       static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)-1));
154   if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
155     gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
156             prev_size - 1);
157   }
158   GPR_ASSERT(prev_size >= 1);
159   if (prev_size > 1) {
160     while (true) {
161       if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
162         gpr_log(GPR_INFO, "  checking queue");
163       }
164       bool empty;
165       grpc_closure* closure =
166           reinterpret_cast<grpc_closure*>(queue_.PopAndCheckEnd(&empty));
167       if (closure == nullptr) {
168         // This can happen either due to a race condition within the mpscq
169         // code or because of a race with Start().
170         if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
171           gpr_log(GPR_INFO, "  queue returned no result; checking again");
172         }
173         continue;
174       }
175       grpc_error_handle error =
176           internal::StatusMoveFromHeapPtr(closure->error_data.error);
177       closure->error_data.error = 0;
178       if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
179         gpr_log(GPR_INFO, "  EXECUTING FROM QUEUE: closure=%s error=%s",
180                 closure->DebugString().c_str(), StatusToString(error).c_str());
181       }
182       ScheduleClosure(closure, error);
183       break;
184     }
185   } else if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
186     gpr_log(GPR_INFO, "  queue empty");
187   }
188 }
189 
SetNotifyOnCancel(grpc_closure * closure)190 void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) {
191   while (true) {
192     // Decode original state.
193     gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
194     grpc_error_handle original_error = DecodeCancelStateError(original_state);
195     // If error is set, invoke the cancellation closure immediately.
196     // Otherwise, store the new closure.
197     if (!original_error.ok()) {
198       if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
199         gpr_log(GPR_INFO,
200                 "call_combiner=%p: scheduling notify_on_cancel callback=%p "
201                 "for pre-existing cancellation",
202                 this, closure);
203       }
204       ExecCtx::Run(DEBUG_LOCATION, closure, original_error);
205       break;
206     } else {
207       if (gpr_atm_full_cas(&cancel_state_, original_state,
208                            reinterpret_cast<gpr_atm>(closure))) {
209         if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
210           gpr_log(GPR_INFO, "call_combiner=%p: setting notify_on_cancel=%p",
211                   this, closure);
212         }
213         // If we replaced an earlier closure, invoke the original
214         // closure with absl::OkStatus().  This allows callers to clean
215         // up any resources they may be holding for the callback.
216         if (original_state != 0) {
217           closure = reinterpret_cast<grpc_closure*>(original_state);
218           if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
219             gpr_log(GPR_INFO,
220                     "call_combiner=%p: scheduling old cancel callback=%p", this,
221                     closure);
222           }
223           ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
224         }
225         break;
226       }
227     }
228     // cas failed, try again.
229   }
230 }
231 
Cancel(grpc_error_handle error)232 void CallCombiner::Cancel(grpc_error_handle error) {
233   intptr_t status_ptr = internal::StatusAllocHeapPtr(error);
234   gpr_atm new_state = kErrorBit | status_ptr;
235   while (true) {
236     gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
237     grpc_error_handle original_error = DecodeCancelStateError(original_state);
238     if (!original_error.ok()) {
239       internal::StatusFreeHeapPtr(status_ptr);
240       break;
241     }
242     if (gpr_atm_full_cas(&cancel_state_, original_state, new_state)) {
243       if (original_state != 0) {
244         grpc_closure* notify_on_cancel =
245             reinterpret_cast<grpc_closure*>(original_state);
246         if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
247           gpr_log(GPR_INFO,
248                   "call_combiner=%p: scheduling notify_on_cancel callback=%p",
249                   this, notify_on_cancel);
250         }
251         ExecCtx::Run(DEBUG_LOCATION, notify_on_cancel, error);
252       }
253       break;
254     }
255     // cas failed, try again.
256   }
257 }
258 
259 }  // namespace grpc_core
260