1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/filters/client_channel/client_channel.h"
20
21 #include <inttypes.h>
22 #include <limits.h>
23
24 #include <algorithm>
25 #include <functional>
26 #include <new>
27 #include <type_traits>
28 #include <utility>
29 #include <vector>
30
31 #include "absl/cleanup/cleanup.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/cord.h"
35 #include "absl/strings/numbers.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/strings/str_join.h"
38 #include "absl/strings/string_view.h"
39 #include "absl/types/optional.h"
40 #include "absl/types/variant.h"
41
42 #include <grpc/event_engine/event_engine.h>
43 #include <grpc/slice.h>
44 #include <grpc/status.h>
45 #include <grpc/support/json.h>
46 #include <grpc/support/log.h>
47 #include <grpc/support/string_util.h>
48 #include <grpc/support/time.h>
49
50 #include "src/core/ext/filters/client_channel/backend_metric.h"
51 #include "src/core/ext/filters/client_channel/backup_poller.h"
52 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
53 #include "src/core/ext/filters/client_channel/client_channel_internal.h"
54 #include "src/core/ext/filters/client_channel/client_channel_service_config.h"
55 #include "src/core/ext/filters/client_channel/config_selector.h"
56 #include "src/core/ext/filters/client_channel/dynamic_filters.h"
57 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
58 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
59 #include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
60 #include "src/core/ext/filters/client_channel/retry_filter.h"
61 #include "src/core/ext/filters/client_channel/subchannel.h"
62 #include "src/core/ext/filters/client_channel/subchannel_interface_internal.h"
63 #include "src/core/ext/filters/deadline/deadline_filter.h"
64 #include "src/core/lib/channel/channel_args.h"
65 #include "src/core/lib/channel/channel_stack.h"
66 #include "src/core/lib/channel/channel_trace.h"
67 #include "src/core/lib/channel/status_util.h"
68 #include "src/core/lib/config/core_configuration.h"
69 #include "src/core/lib/debug/trace.h"
70 #include "src/core/lib/gpr/useful.h"
71 #include "src/core/lib/gprpp/debug_location.h"
72 #include "src/core/lib/gprpp/status_helper.h"
73 #include "src/core/lib/gprpp/sync.h"
74 #include "src/core/lib/gprpp/unique_type_name.h"
75 #include "src/core/lib/gprpp/work_serializer.h"
76 #include "src/core/lib/handshaker/proxy_mapper_registry.h"
77 #include "src/core/lib/iomgr/exec_ctx.h"
78 #include "src/core/lib/iomgr/polling_entity.h"
79 #include "src/core/lib/iomgr/pollset_set.h"
80 #include "src/core/lib/json/json.h"
81 #include "src/core/lib/load_balancing/lb_policy_registry.h"
82 #include "src/core/lib/load_balancing/subchannel_interface.h"
83 #include "src/core/lib/resolver/resolver_registry.h"
84 #include "src/core/lib/resolver/server_address.h"
85 #include "src/core/lib/service_config/service_config_call_data.h"
86 #include "src/core/lib/service_config/service_config_impl.h"
87 #include "src/core/lib/slice/slice.h"
88 #include "src/core/lib/slice/slice_internal.h"
89 #include "src/core/lib/surface/channel.h"
90 #include "src/core/lib/transport/connectivity_state.h"
91 #include "src/core/lib/transport/error_utils.h"
92 #include "src/core/lib/transport/metadata_batch.h"
93
94 //
95 // Client channel filter
96 //
97
98 namespace grpc_core {
99
using internal::ClientChannelMethodParsedConfig;

// Trace flags for the client channel (all constructed disabled):
// - client_channel: channel-level events (resolver results, wrapper
//   creation/destruction, keepalive throttling, ...)
// - client_channel_call: per-call events in the client channel filter
// - client_channel_lb_call: per-LB-call events
TraceFlag grpc_client_channel_trace(false, "client_channel");
TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
TraceFlag grpc_client_channel_lb_call_trace(false, "client_channel_lb_call");
105
106 //
107 // ClientChannel::CallData definition
108 //
109
// Base class for per-call state in the client channel.  Responsible for
// queuing the call until a resolver result is available and for applying
// the service config to the call once one is.  Concrete storage lives in
// the subclass (see the pure-virtual accessors below).
class ClientChannel::CallData {
 public:
  // Removes the call from the channel's list of calls queued
  // for name resolution.
  void RemoveCallFromResolverQueuedCallsLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);

  // Called by the channel for each queued call when a new resolution
  // result becomes available.
  virtual void RetryCheckResolutionLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) = 0;

  // The dynamic filter stack to be used for this call.
  RefCountedPtr<DynamicFilters> dynamic_filters() const {
    return dynamic_filters_;
  }

 protected:
  CallData() = default;
  virtual ~CallData() = default;

  // Checks whether a resolver result is available. The following
  // outcomes are possible:
  // - No resolver result is available yet. The call will be queued and
  //   absl::nullopt will be returned. Later, when a resolver result
  //   becomes available, RetryCheckResolutionLocked() will be called.
  // - The resolver has returned a transient failure. If the call is
  //   not wait_for_ready, a non-OK status will be returned. (If the
  //   call *is* wait_for_ready, it will be queued instead.)
  // - There is a valid resolver result. The service config will be
  //   stored in the call context and an OK status will be returned.
  absl::optional<absl::Status> CheckResolution(bool was_queued);

 private:
  // Accessors for data stored in the subclass.
  virtual ClientChannel* chand() const = 0;
  virtual Arena* arena() const = 0;
  virtual grpc_polling_entity* pollent() const = 0;
  virtual grpc_metadata_batch* send_initial_metadata() = 0;
  virtual grpc_call_context_element* call_context() const = 0;

  // Helper function for CheckResolution().  Returns true if the call
  // can continue (i.e., there is a valid resolution result, or there is
  // an invalid resolution result but the call is not wait_for_ready).
  bool CheckResolutionLocked(
      absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);

  // Adds the call to the channel's list of calls queued for name resolution.
  void AddCallToResolverQueuedCallsLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);

  // Called when adding the call to the resolver queue.
  // No-op by default; subclasses may override (e.g., to register a
  // cancellation callback).
  virtual void OnAddToQueueLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) {}

  // Applies service config to the call.  Must be invoked once we know
  // that the resolver has returned results to the channel.
  // If an error is returned, the error indicates the status with which
  // the call should be failed.
  grpc_error_handle ApplyServiceConfigToCallLocked(
      const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector);

  // Called to reset the deadline based on the service config obtained
  // from the resolver.
  virtual void ResetDeadline(Duration timeout) = 0;

  RefCountedPtr<DynamicFilters> dynamic_filters_;
};
178
// CallData implementation used when the client channel runs as a
// channel-stack filter (see ClientChannel::kFilterVtable).  Batches from
// the surrounding call stack are buffered in pending_batches_ until the
// dynamic call is created, then replayed onto dynamic_call_.
class ClientChannel::FilterBasedCallData : public ClientChannel::CallData {
 public:
  static grpc_error_handle Init(grpc_call_element* elem,
                                const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* final_info,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

 private:
  class ResolverQueuedCallCanceller;

  FilterBasedCallData(grpc_call_element* elem,
                      const grpc_call_element_args& args);
  ~FilterBasedCallData() override;

  // Convenience accessors delegating to deadline_state_, which stores
  // the call element, call stack, and call combiner.
  grpc_call_element* elem() const { return deadline_state_.elem; }
  grpc_call_stack* owning_call() const { return deadline_state_.call_stack; }
  CallCombiner* call_combiner() const { return deadline_state_.call_combiner; }

  ClientChannel* chand() const override {
    return static_cast<ClientChannel*>(elem()->channel_data);
  }
  Arena* arena() const override { return deadline_state_.arena; }
  grpc_polling_entity* pollent() const override { return pollent_; }
  grpc_metadata_batch* send_initial_metadata() override {
    return pending_batches_[0]
        ->payload->send_initial_metadata.send_initial_metadata;
  }
  grpc_call_context_element* call_context() const override {
    return call_context_;
  }

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  static void FailPendingBatchInCallCombiner(void* arg,
                                             grpc_error_handle error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility for
  // yielding the call combiner.
  void PendingBatchesFail(
      grpc_error_handle error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg,
                                               grpc_error_handle ignored);
  // Resumes all pending batches on dynamic_call_.
  void PendingBatchesResume();

  // Called to check for a resolution result, both when the call is
  // initially started and when it is queued and the channel gets a new
  // resolution result.
  void TryCheckResolution(bool was_queued);

  void OnAddToQueueLocked() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);

  void RetryCheckResolutionLocked() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);

  // Shrinks the call's deadline if the per-method timeout from the
  // service config yields an earlier deadline than the current one.
  // Never extends an existing deadline.
  void ResetDeadline(Duration timeout) override {
    const Timestamp per_method_deadline =
        Timestamp::FromCycleCounterRoundUp(call_start_time_) + timeout;
    if (per_method_deadline < deadline_) {
      deadline_ = per_method_deadline;
      grpc_deadline_state_reset(&deadline_state_, deadline_);
    }
  }

  void CreateDynamicCall();

  static void RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
      void* arg, grpc_error_handle error);

  grpc_slice path_;  // Request path.
  grpc_call_context_element* call_context_;
  gpr_cycle_counter call_start_time_;
  Timestamp deadline_;

  // State for handling deadlines.
  grpc_deadline_state deadline_state_;

  grpc_polling_entity* pollent_ = nullptr;

  // Accessed while holding ClientChannel::resolution_mu_.
  ResolverQueuedCallCanceller* resolver_call_canceller_
      ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = nullptr;

  grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
  grpc_closure recv_trailing_metadata_ready_;

  RefCountedPtr<DynamicFilters::Call> dynamic_call_;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the LB call and are not intercepting any of
  // its callbacks).
  grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};

  // Set when we get a cancel_stream op.
  grpc_error_handle cancel_error_;
};
297
298 //
299 // Filter vtable
300 //
301
// Channel-stack vtable registering the client channel filter and its
// filter-based call data.  Entries follow the grpc_channel_filter
// field order.
const grpc_channel_filter ClientChannel::kFilterVtable = {
    ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch,
    nullptr,
    ClientChannel::StartTransportOp,
    sizeof(ClientChannel::FilterBasedCallData),  // per-call data size
    ClientChannel::FilterBasedCallData::Init,
    ClientChannel::FilterBasedCallData::SetPollent,
    ClientChannel::FilterBasedCallData::Destroy,
    sizeof(ClientChannel),  // per-channel data size
    ClientChannel::Init,
    grpc_channel_stack_no_post_init,
    ClientChannel::Destroy,
    ClientChannel::GetChannelInfo,
    "client-channel",  // filter name
};
317
318 //
319 // dynamic termination filter
320 //
321
322 namespace {
323
// Terminal filter of the dynamic filter stack.  Its CallData (below)
// forwards each call into the ClientChannel's load-balanced call
// machinery via CreateLoadBalancedCall().
class DynamicTerminationFilter {
 public:
  class CallData;

  static const grpc_channel_filter kFilterVtable;

  static grpc_error_handle Init(grpc_channel_element* elem,
                                grpc_channel_element_args* args) {
    // This filter must terminate the stack.
    GPR_ASSERT(args->is_last);
    GPR_ASSERT(elem->filter == &kFilterVtable);
    new (elem->channel_data) DynamicTerminationFilter(args->channel_args);
    return absl::OkStatus();
  }

  static void Destroy(grpc_channel_element* elem) {
    auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
    chand->~DynamicTerminationFilter();
  }

  // Will never be called.
  static void StartTransportOp(grpc_channel_element* /*elem*/,
                               grpc_transport_op* /*op*/) {}
  static void GetChannelInfo(grpc_channel_element* /*elem*/,
                             const grpc_channel_info* /*info*/) {}

 private:
  explicit DynamicTerminationFilter(const ChannelArgs& args)
      : chand_(args.GetObject<ClientChannel>()) {}

  // The owning client channel, obtained from the channel args.
  ClientChannel* chand_;
};
355
356 class DynamicTerminationFilter::CallData {
357 public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)358 static grpc_error_handle Init(grpc_call_element* elem,
359 const grpc_call_element_args* args) {
360 new (elem->call_data) CallData(*args);
361 return absl::OkStatus();
362 }
363
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure * then_schedule_closure)364 static void Destroy(grpc_call_element* elem,
365 const grpc_call_final_info* /*final_info*/,
366 grpc_closure* then_schedule_closure) {
367 auto* calld = static_cast<CallData*>(elem->call_data);
368 RefCountedPtr<SubchannelCall> subchannel_call;
369 if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
370 subchannel_call = calld->lb_call_->subchannel_call();
371 }
372 calld->~CallData();
373 if (GPR_LIKELY(subchannel_call != nullptr)) {
374 subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
375 } else {
376 // TODO(yashkt) : This can potentially be a Closure::Run
377 ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, absl::OkStatus());
378 }
379 }
380
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)381 static void StartTransportStreamOpBatch(
382 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
383 auto* calld = static_cast<CallData*>(elem->call_data);
384 calld->lb_call_->StartTransportStreamOpBatch(batch);
385 }
386
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)387 static void SetPollent(grpc_call_element* elem,
388 grpc_polling_entity* pollent) {
389 auto* calld = static_cast<CallData*>(elem->call_data);
390 auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
391 ClientChannel* client_channel = chand->chand_;
392 grpc_call_element_args args = {calld->owning_call_, nullptr,
393 calld->call_context_, calld->path_,
394 /*start_time=*/0, calld->deadline_,
395 calld->arena_, calld->call_combiner_};
396 auto* service_config_call_data =
397 static_cast<ClientChannelServiceConfigCallData*>(
398 calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
399 calld->lb_call_ = client_channel->CreateLoadBalancedCall(
400 args, pollent, nullptr,
401 [service_config_call_data]() { service_config_call_data->Commit(); },
402 /*is_transparent_retry=*/false);
403 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
404 gpr_log(GPR_INFO,
405 "chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand,
406 client_channel, calld->lb_call_.get());
407 }
408 }
409
410 private:
CallData(const grpc_call_element_args & args)411 explicit CallData(const grpc_call_element_args& args)
412 : path_(CSliceRef(args.path)),
413 deadline_(args.deadline),
414 arena_(args.arena),
415 owning_call_(args.call_stack),
416 call_combiner_(args.call_combiner),
417 call_context_(args.context) {}
418
~CallData()419 ~CallData() { CSliceUnref(path_); }
420
421 grpc_slice path_; // Request path.
422 Timestamp deadline_;
423 Arena* arena_;
424 grpc_call_stack* owning_call_;
425 CallCombiner* call_combiner_;
426 grpc_call_context_element* call_context_;
427
428 OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall> lb_call_;
429 };
430
// Channel-stack vtable for the dynamic termination filter.  Entries
// follow the grpc_channel_filter field order.
const grpc_channel_filter DynamicTerminationFilter::kFilterVtable = {
    DynamicTerminationFilter::CallData::StartTransportStreamOpBatch,
    nullptr,
    DynamicTerminationFilter::StartTransportOp,
    sizeof(DynamicTerminationFilter::CallData),  // per-call data size
    DynamicTerminationFilter::CallData::Init,
    DynamicTerminationFilter::CallData::SetPollent,
    DynamicTerminationFilter::CallData::Destroy,
    sizeof(DynamicTerminationFilter),  // per-channel data size
    DynamicTerminationFilter::Init,
    grpc_channel_stack_no_post_init,
    DynamicTerminationFilter::Destroy,
    DynamicTerminationFilter::GetChannelInfo,
    "dynamic_filter_termination",  // filter name
};
446
447 } // namespace
448
449 //
450 // ClientChannel::ResolverResultHandler
451 //
452
// Receives results from the resolver and forwards them to the channel.
// Holds a ref on the channel's owning channel stack, so destruction of
// this handler (i.e., resolver shutdown) releases that ref.
class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler {
 public:
  explicit ResolverResultHandler(ClientChannel* chand) : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler");
  }

  // The resolver destroys its result handler when it shuts down.
  ~ResolverResultHandler() override {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: resolver shutdown complete", chand_);
    }
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
  }

  void ReportResult(Resolver::Result result) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->OnResolverResultChangedLocked(std::move(result));
  }

 private:
  ClientChannel* chand_;
};
474
475 //
476 // ClientChannel::SubchannelWrapper
477 //
478
// This class is a wrapper for Subchannel that hides details of the
// channel's implementation (such as the connected subchannel) from the
// LB policy API.
//
// Note that no synchronization is needed here, because even if the
// underlying subchannel is shared between channels, this wrapper will only
// be used within one channel, so it will always be synchronized by the
// control plane work_serializer.
class ClientChannel::SubchannelWrapper : public SubchannelInterface {
 public:
  SubchannelWrapper(ClientChannel* chand, RefCountedPtr<Subchannel> subchannel)
      : SubchannelInterface(GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)
                                ? "SubchannelWrapper"
                                : nullptr),
        chand_(chand),
        subchannel_(std::move(subchannel)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: creating subchannel wrapper %p for subchannel %p",
              chand, this, subchannel_.get());
    }
    // The wrapper keeps the channel stack alive for its lifetime
    // (unref'd in the dtor).
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
    // If channelz is enabled, register the subchannel as a child of the
    // channel.  subchannel_refcount_map_ counts how many wrappers in this
    // channel reference the same underlying subchannel, so the channelz
    // child link is added only for the first one.
    if (chand_->channelz_node_ != nullptr) {
      auto* subchannel_node = subchannel_->channelz_node();
      if (subchannel_node != nullptr) {
        auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
        if (it == chand_->subchannel_refcount_map_.end()) {
          chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid());
          it = chand_->subchannel_refcount_map_.emplace(subchannel_.get(), 0)
                   .first;
        }
        ++it->second;
      }
    }
    chand_->subchannel_wrappers_.insert(this);
  }

  ~SubchannelWrapper() override {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: destroying subchannel wrapper %p for subchannel %p",
              chand_, this, subchannel_.get());
    }
    chand_->subchannel_wrappers_.erase(this);
    // Mirror the channelz registration done in the ctor: drop the child
    // link when the last wrapper for this subchannel goes away.
    if (chand_->channelz_node_ != nullptr) {
      auto* subchannel_node = subchannel_->channelz_node();
      if (subchannel_node != nullptr) {
        auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
        GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
        --it->second;
        if (it->second == 0) {
          chand_->channelz_node_->RemoveChildSubchannel(
              subchannel_node->uuid());
          chand_->subchannel_refcount_map_.erase(it);
        }
      }
    }
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
  }

  // Starts a connectivity watch.  The LB policy's watcher is wrapped in
  // a WatcherWrapper (see below) before being passed to the underlying
  // subchannel, and the mapping is recorded in watcher_map_ so the watch
  // can later be cancelled.
  void WatchConnectivityState(
      std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    auto& watcher_wrapper = watcher_map_[watcher.get()];
    GPR_ASSERT(watcher_wrapper == nullptr);
    watcher_wrapper = new WatcherWrapper(std::move(watcher),
                                         Ref(DEBUG_LOCATION, "WatcherWrapper"));
    subchannel_->WatchConnectivityState(
        RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
            watcher_wrapper));
  }

  // Cancels a watch started via WatchConnectivityState(), translating
  // the LB policy's watcher to the WatcherWrapper via watcher_map_.
  void CancelConnectivityStateWatch(ConnectivityStateWatcherInterface* watcher)
      override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    auto it = watcher_map_.find(watcher);
    GPR_ASSERT(it != watcher_map_.end());
    subchannel_->CancelConnectivityStateWatch(it->second);
    watcher_map_.erase(it);
  }

  RefCountedPtr<ConnectedSubchannel> connected_subchannel() const {
    return subchannel_->connected_subchannel();
  }

  void RequestConnection() override { subchannel_->RequestConnection(); }

  void ResetBackoff() override { subchannel_->ResetBackoff(); }

  void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    // All data watchers passed in here are internal watchers, so the
    // downcast is safe.
    std::unique_ptr<InternalSubchannelDataWatcherInterface> internal_watcher(
        static_cast<InternalSubchannelDataWatcherInterface*>(
            watcher.release()));
    internal_watcher->SetSubchannel(subchannel_.get());
    data_watchers_.push_back(std::move(internal_watcher));
  }

  // Forwards a new (throttled) keepalive time to the underlying
  // subchannel.
  void ThrottleKeepaliveTime(int new_keepalive_time) {
    subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
  }

 private:
  // This wrapper provides a bridge between the internal Subchannel API
  // and the SubchannelInterface API that we expose to LB policies.
  // It implements Subchannel::ConnectivityStateWatcherInterface and wraps
  // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
  // that was passed in by the LB policy. We pass an instance of this
  // class to the underlying Subchannel, and when we get updates from
  // the subchannel, we pass those on to the wrapped watcher to return
  // the update to the LB policy.
  //
  // This class handles things like hopping into the WorkSerializer
  // before passing notifications to the LB policy and propagating
  // keepalive information between subchannels.
  class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface {
   public:
    WatcherWrapper(
        std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
            watcher,
        RefCountedPtr<SubchannelWrapper> parent)
        : watcher_(std::move(watcher)), parent_(std::move(parent)) {}

    ~WatcherWrapper() override {
      // Hop into the work_serializer to release the ref to the parent
      // SubchannelWrapper there.
      auto* parent = parent_.release();  // ref owned by lambda
      parent->chand_->work_serializer_->Run(
          [parent]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
                         *parent_->chand_->work_serializer_) {
            parent->Unref(DEBUG_LOCATION, "WatcherWrapper");
          },
          DEBUG_LOCATION);
    }

    // Called by the underlying subchannel, possibly from any thread;
    // hops into the work_serializer before touching channel state.
    void OnConnectivityStateChange(grpc_connectivity_state state,
                                   const absl::Status& status) override {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: connectivity change for subchannel wrapper %p "
                "subchannel %p; hopping into work_serializer",
                parent_->chand_, parent_.get(), parent_->subchannel_.get());
      }
      Ref().release();  // ref owned by lambda
      parent_->chand_->work_serializer_->Run(
          [this, state, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
              *parent_->chand_->work_serializer_) {
            ApplyUpdateInControlPlaneWorkSerializer(state, status);
            Unref();
          },
          DEBUG_LOCATION);
    }

    grpc_pollset_set* interested_parties() override {
      return watcher_->interested_parties();
    }

   private:
    // Processes a connectivity update: applies any keepalive-throttling
    // payload attached to the status, then forwards the state to the
    // LB policy's watcher.
    void ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,
                                                 const absl::Status& status)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(*parent_->chand_->work_serializer_) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: processing connectivity change in work serializer "
                "for subchannel wrapper %p subchannel %p watcher=%p "
                "state=%s status=%s",
                parent_->chand_, parent_.get(), parent_->subchannel_.get(),
                watcher_.get(), ConnectivityStateName(state),
                status.ToString().c_str());
      }
      absl::optional<absl::Cord> keepalive_throttling =
          status.GetPayload(kKeepaliveThrottlingKey);
      if (keepalive_throttling.has_value()) {
        int new_keepalive_time = -1;
        if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
                             &new_keepalive_time)) {
          // Only ever increase the keepalive time (throttle, never
          // un-throttle).
          if (new_keepalive_time > parent_->chand_->keepalive_time_) {
            parent_->chand_->keepalive_time_ = new_keepalive_time;
            if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
              gpr_log(GPR_INFO, "chand=%p: throttling keepalive time to %d",
                      parent_->chand_, parent_->chand_->keepalive_time_);
            }
            // Propagate the new keepalive time to all subchannels. This is so
            // that new transports created by any subchannel (and not just the
            // subchannel that received the GOAWAY), use the new keepalive time.
            for (auto* subchannel_wrapper :
                 parent_->chand_->subchannel_wrappers_) {
              subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
            }
          }
        } else {
          gpr_log(GPR_ERROR, "chand=%p: Illegal keepalive throttling value %s",
                  parent_->chand_,
                  std::string(keepalive_throttling.value()).c_str());
        }
      }
      // Propagate status only in state TF.
      // We specifically want to avoid propagating the status for
      // state IDLE that the real subchannel gave us only for the
      // purpose of keepalive propagation.
      watcher_->OnConnectivityStateChange(
          state,
          state == GRPC_CHANNEL_TRANSIENT_FAILURE ? status : absl::OkStatus());
    }

    std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
        watcher_;
    RefCountedPtr<SubchannelWrapper> parent_;
  };

  ClientChannel* chand_;
  RefCountedPtr<Subchannel> subchannel_;
  // Maps from the address of the watcher passed to us by the LB policy
  // to the address of the WrapperWatcher that we passed to the underlying
  // subchannel. This is needed so that when the LB policy calls
  // CancelConnectivityStateWatch() with its watcher, we know the
  // corresponding WrapperWatcher to cancel on the underlying subchannel.
  std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_
      ABSL_GUARDED_BY(*chand_->work_serializer_);
  std::vector<std::unique_ptr<InternalSubchannelDataWatcherInterface>>
      data_watchers_ ABSL_GUARDED_BY(*chand_->work_serializer_);
};
698
699 //
700 // ClientChannel::ExternalConnectivityWatcher
701 //
702
// Constructs an external connectivity watcher, registers it in the
// channel's external_watchers_ map (keyed by on_complete), and schedules
// AddWatcherLocked() on the work_serializer to start the watch.
ClientChannel::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
    ClientChannel* chand, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* on_complete,
    grpc_closure* watcher_timer_init)
    : chand_(chand),
      pollent_(pollent),
      initial_state_(*state),
      state_(state),
      on_complete_(on_complete),
      watcher_timer_init_(watcher_timer_init) {
  // Make sure the watcher's pollent is polled while the watch is pending.
  grpc_polling_entity_add_to_pollset_set(&pollent_,
                                         chand_->interested_parties_);
  // Keep the channel stack alive for the duration of the watch
  // (unref'd in the dtor).
  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
  {
    MutexLock lock(&chand_->external_watchers_mu_);
    // Will be deleted when the watch is complete.
    GPR_ASSERT(chand->external_watchers_[on_complete] == nullptr);
    // Store a ref to the watcher in the external_watchers_ map.
    chand->external_watchers_[on_complete] =
        Ref(DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked");
  }
  // Pass the ref from creating the object to Start().
  chand_->work_serializer_->Run(
      [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
        // The ref is passed to AddWatcherLocked().
        AddWatcherLocked();
      },
      DEBUG_LOCATION);
}
732
ClientChannel::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
  // Stop polling the watcher's pollent and release the channel stack
  // ref taken in the constructor.
  grpc_polling_entity_del_from_pollset_set(&pollent_,
                                           chand_->interested_parties_);
  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                           "ExternalConnectivityWatcher");
}
739
740 void ClientChannel::ExternalConnectivityWatcher::
RemoveWatcherFromExternalWatchersMap(ClientChannel * chand,grpc_closure * on_complete,bool cancel)741 RemoveWatcherFromExternalWatchersMap(ClientChannel* chand,
742 grpc_closure* on_complete,
743 bool cancel) {
744 RefCountedPtr<ExternalConnectivityWatcher> watcher;
745 {
746 MutexLock lock(&chand->external_watchers_mu_);
747 auto it = chand->external_watchers_.find(on_complete);
748 if (it != chand->external_watchers_.end()) {
749 watcher = std::move(it->second);
750 chand->external_watchers_.erase(it);
751 }
752 }
753 // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock
754 // the mutex before calling it.
755 if (watcher != nullptr && cancel) watcher->Cancel();
756 }
757
// Delivers a connectivity-state update to the external watcher's owner.
// Only the first of Notify()/Cancel() to flip done_ takes effect; the
// other is a no-op.
void ClientChannel::ExternalConnectivityWatcher::Notify(
    grpc_connectivity_state state, const absl::Status& /* status */) {
  bool done = false;
  if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
                                     std::memory_order_relaxed)) {
    return;  // Already done.
  }
  // Remove external watcher.
  ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
      chand_, on_complete_, /*cancel=*/false);
  // Report new state to the user.
  *state_ = state;
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, absl::OkStatus());
  // Hop back into the work_serializer to clean up.
  // Not needed in state SHUTDOWN, because the tracker will
  // automatically remove all watchers in that case.
  // Note: The callback takes a ref in case the ref inside the state tracker
  // gets removed before the callback runs via a SHUTDOWN notification.
  if (state != GRPC_CHANNEL_SHUTDOWN) {
    Ref(DEBUG_LOCATION, "RemoveWatcherLocked()").release();
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          RemoveWatcherLocked();
          Unref(DEBUG_LOCATION, "RemoveWatcherLocked()");
        },
        DEBUG_LOCATION);
  }
}
786
// Cancels the watch: completes the user's closure with CANCELLED and
// removes the watcher from the state tracker.  Races with Notify() via
// the done_ flag; only the first caller to flip done_ takes effect.
void ClientChannel::ExternalConnectivityWatcher::Cancel() {
  bool done = false;
  if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
                                     std::memory_order_relaxed)) {
    return;  // Already done.
  }
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, absl::CancelledError());
  // Hop back into the work_serializer to clean up.
  // Note: The callback takes a ref in case the ref inside the state tracker
  // gets removed before the callback runs via a SHUTDOWN notification.
  Ref(DEBUG_LOCATION, "RemoveWatcherLocked()").release();
  chand_->work_serializer_->Run(
      [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
        RemoveWatcherLocked();
        Unref(DEBUG_LOCATION, "RemoveWatcherLocked()");
      },
      DEBUG_LOCATION);
}
805
// Runs in the WorkSerializer: signals timer initialization and registers
// this object with the channel's connectivity state tracker.
void ClientChannel::ExternalConnectivityWatcher::AddWatcherLocked() {
  Closure::Run(DEBUG_LOCATION, watcher_timer_init_, absl::OkStatus());
  // Add new watcher. Pass the ref of the object from creation to OrphanablePtr.
  chand_->state_tracker_.AddWatcher(
      initial_state_, OrphanablePtr<ConnectivityStateWatcherInterface>(this));
}
812
// Runs in the WorkSerializer: deregisters this watcher from the state
// tracker, which drops the ref the tracker holds (taken in
// AddWatcherLocked() via the OrphanablePtr).
void ClientChannel::ExternalConnectivityWatcher::RemoveWatcherLocked() {
  chand_->state_tracker_.RemoveWatcher(this);
}
816
817 //
818 // ClientChannel::ConnectivityWatcherAdder
819 //
820
// One-shot helper that adds an internal connectivity watcher to the
// channel's state tracker from inside the WorkSerializer.  Keeps the
// channel stack alive until the watcher is handed off, then deletes
// itself.
class ClientChannel::ConnectivityWatcherAdder {
 public:
  ConnectivityWatcherAdder(
      ClientChannel* chand, grpc_connectivity_state initial_state,
      OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)
      : chand_(chand),
        initial_state_(initial_state),
        watcher_(std::move(watcher)) {
    // Hold a channel-stack ref so chand_ stays valid until
    // AddWatcherLocked() runs.
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          AddWatcherLocked();
        },
        DEBUG_LOCATION);
  }

 private:
  // Runs in the WorkSerializer: registers the watcher, releases the
  // stack ref taken in the constructor, and self-destructs.
  void AddWatcherLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_));
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    delete this;
  }

  ClientChannel* chand_;
  grpc_connectivity_state initial_state_;
  // Ownership is transferred to the state tracker in AddWatcherLocked().
  OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher_;
};
849
850 //
851 // ClientChannel::ConnectivityWatcherRemover
852 //
853
// One-shot helper that removes an internal connectivity watcher from the
// channel's state tracker from inside the WorkSerializer.  Mirrors
// ConnectivityWatcherAdder: holds a channel-stack ref for the duration,
// then deletes itself.
class ClientChannel::ConnectivityWatcherRemover {
 public:
  ConnectivityWatcherRemover(ClientChannel* chand,
                             AsyncConnectivityStateWatcherInterface* watcher)
      : chand_(chand), watcher_(watcher) {
    // Hold a channel-stack ref so chand_ stays valid until
    // RemoveWatcherLocked() runs.
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          RemoveWatcherLocked();
        },
        DEBUG_LOCATION);
  }

 private:
  // Runs in the WorkSerializer: deregisters the watcher, releases the
  // stack ref taken in the constructor, and self-destructs.
  void RemoveWatcherLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->state_tracker_.RemoveWatcher(watcher_);
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ConnectivityWatcherRemover");
    delete this;
  }

  ClientChannel* chand_;
  // Non-owning; identifies the watcher to remove from the tracker.
  AsyncConnectivityStateWatcherInterface* watcher_;
};
879
880 //
881 // ClientChannel::ClientChannelControlHelper
882 //
883
// Bridge between the LB policy and the client channel: the LB policy uses
// this helper to create subchannels, push picker/state updates, request
// re-resolution, and emit channelz trace events.  All state-mutating
// methods run under the channel's WorkSerializer.
class ClientChannel::ClientChannelControlHelper
    : public LoadBalancingPolicy::ChannelControlHelper {
 public:
  explicit ClientChannelControlHelper(ClientChannel* chand) : chand_(chand) {
    // Keep the channel stack alive as long as the LB policy holds this
    // helper.
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
  }

  ~ClientChannelControlHelper() override {
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ClientChannelControlHelper");
  }

  // Creates a subchannel for |address| and wraps it for the LB policy.
  // Returns null if the channel is shutting down or subchannel creation
  // fails.
  RefCountedPtr<SubchannelInterface> CreateSubchannel(
      ServerAddress address, const ChannelArgs& args) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return nullptr;  // Shutting down.
    ChannelArgs subchannel_args = ClientChannel::MakeSubchannelArgs(
        args, address.args(), chand_->subchannel_pool_,
        chand_->default_authority_);
    // Create subchannel.
    RefCountedPtr<Subchannel> subchannel =
        chand_->client_channel_factory_->CreateSubchannel(address.address(),
                                                          subchannel_args);
    if (subchannel == nullptr) return nullptr;
    // Make sure the subchannel has updated keepalive time.
    subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_);
    // Create and return wrapper for the subchannel.
    return MakeRefCounted<SubchannelWrapper>(chand_, std::move(subchannel));
  }

  // Applies a connectivity-state/picker update from the LB policy, unless
  // the channel is shutting down.
  void UpdateState(grpc_connectivity_state state, const absl::Status& status,
                   RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)
      override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      const char* extra = chand_->disconnect_error_.ok()
                              ? ""
                              : " (ignoring -- channel shutting down)";
      gpr_log(GPR_INFO, "chand=%p: update: state=%s status=(%s) picker=%p%s",
              chand_, ConnectivityStateName(state), status.ToString().c_str(),
              picker.get(), extra);
    }
    // Do update only if not shutting down.
    if (chand_->disconnect_error_.ok()) {
      chand_->UpdateStateAndPickerLocked(state, status, "helper",
                                         std::move(picker));
    }
  }

  // Asks the resolver to re-resolve (e.g. after a subchannel failure).
  void RequestReresolution() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand_);
    }
    chand_->resolver_->RequestReresolutionLocked();
  }

  absl::string_view GetAuthority() override {
    return chand_->default_authority_;
  }

  grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
    return chand_->owning_stack_->EventEngine();
  }

  // Records a channelz trace event, if channelz is enabled for this
  // channel and it is not shutting down.
  void AddTraceEvent(TraceSeverity severity, absl::string_view message) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (chand_->channelz_node_ != nullptr) {
      chand_->channelz_node_->AddTraceEvent(
          ConvertSeverityEnum(severity),
          grpc_slice_from_copied_buffer(message.data(), message.size()));
    }
  }

 private:
  // Maps the LB-policy trace severity enum onto the channelz one.
  static channelz::ChannelTrace::Severity ConvertSeverityEnum(
      TraceSeverity severity) {
    if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
    if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
    return channelz::ChannelTrace::Error;
  }

  ClientChannel* chand_;
};
970
971 //
972 // ClientChannel implementation
973 //
974
GetFromChannel(Channel * channel)975 ClientChannel* ClientChannel::GetFromChannel(Channel* channel) {
976 grpc_channel_element* elem =
977 grpc_channel_stack_last_element(channel->channel_stack());
978 if (elem->filter != &kFilterVtable) return nullptr;
979 return static_cast<ClientChannel*>(elem->channel_data);
980 }
981
// Channel-filter init vtable entry: placement-constructs a ClientChannel
// in the storage the channel stack pre-allocated for this element.
// Returns any error produced by the constructor via |error|.
grpc_error_handle ClientChannel::Init(grpc_channel_element* elem,
                                      grpc_channel_element_args* args) {
  // The client channel must be the last filter in the stack.
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &kFilterVtable);
  grpc_error_handle error;
  new (elem->channel_data) ClientChannel(args, &error);
  return error;
}
990
// Channel-filter destroy vtable entry: explicitly destroys the
// placement-constructed ClientChannel (the stack owns the storage).
void ClientChannel::Destroy(grpc_channel_element* elem) {
  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
  chand->~ClientChannel();
}
995
996 namespace {
997
GetSubchannelPool(const ChannelArgs & args)998 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
999 const ChannelArgs& args) {
1000 if (args.GetBool(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL).value_or(false)) {
1001 return MakeRefCounted<LocalSubchannelPool>();
1002 }
1003 return GlobalSubchannelPool::instance();
1004 }
1005
1006 } // namespace
1007
// Constructs the client channel filter.  On failure, sets |*error| and
// returns early; the channel stack will then tear the element down.
// Initialization steps: start backup polling, validate the factory,
// parse the default service config, resolve the target URI (applying
// the proxy mapper), validate it, and compute keepalive/authority
// defaults.
ClientChannel::ClientChannel(grpc_channel_element_args* args,
                             grpc_error_handle* error)
    : channel_args_(args->channel_args),
      deadline_checking_enabled_(grpc_deadline_checking_enabled(channel_args_)),
      owning_stack_(args->channel_stack),
      client_channel_factory_(channel_args_.GetObject<ClientChannelFactory>()),
      channelz_node_(channel_args_.GetObject<channelz::ChannelNode>()),
      interested_parties_(grpc_pollset_set_create()),
      service_config_parser_index_(
          internal::ClientChannelServiceConfigParser::ParserIndex()),
      work_serializer_(std::make_shared<WorkSerializer>()),
      state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
      subchannel_pool_(GetSubchannelPool(channel_args_)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
            this, owning_stack_);
  }
  // Start backup polling.
  grpc_client_channel_start_backup_polling(interested_parties_);
  // Check client channel factory.
  if (client_channel_factory_ == nullptr) {
    *error = GRPC_ERROR_CREATE(
        "Missing client channel factory in args for client channel filter");
    return;
  }
  // Get default service config.  If none is specified via the client API,
  // we use an empty config.
  absl::optional<absl::string_view> service_config_json =
      channel_args_.GetString(GRPC_ARG_SERVICE_CONFIG);
  if (!service_config_json.has_value()) service_config_json = "{}";
  *error = absl::OkStatus();
  auto service_config =
      ServiceConfigImpl::Create(channel_args_, *service_config_json);
  if (!service_config.ok()) {
    *error = absl_status_to_grpc_error(service_config.status());
    return;
  }
  default_service_config_ = std::move(*service_config);
  // Get URI to resolve, using proxy mapper if needed.
  absl::optional<std::string> server_uri =
      channel_args_.GetOwnedString(GRPC_ARG_SERVER_URI);
  if (!server_uri.has_value()) {
    *error = GRPC_ERROR_CREATE(
        "target URI channel arg missing or wrong type in client channel "
        "filter");
    return;
  }
  // The proxy mapper may rewrite the target; if it declines, keep the
  // original URI.
  uri_to_resolve_ = CoreConfiguration::Get()
                        .proxy_mapper_registry()
                        .MapName(*server_uri, &channel_args_)
                        .value_or(*server_uri);
  // Make sure the URI to resolve is valid, so that we know that
  // resolver creation will succeed later.
  if (!CoreConfiguration::Get().resolver_registry().IsValidTarget(
          uri_to_resolve_)) {
    *error = GRPC_ERROR_CREATE(
        absl::StrCat("the target uri is not valid: ", uri_to_resolve_));
    return;
  }
  // Strip out service config channel arg, so that it doesn't affect
  // subchannel uniqueness when the args flow down to that layer.
  channel_args_ = channel_args_.Remove(GRPC_ARG_SERVICE_CONFIG);
  // Set initial keepalive time.
  auto keepalive_arg = channel_args_.GetInt(GRPC_ARG_KEEPALIVE_TIME_MS);
  if (keepalive_arg.has_value()) {
    keepalive_time_ = Clamp(*keepalive_arg, 1, INT_MAX);
  } else {
    keepalive_time_ = -1;  // unset
  }
  // Set default authority: use the explicit channel arg if present,
  // otherwise derive it from the (pre-proxy-mapping) server URI.
  absl::optional<std::string> default_authority =
      channel_args_.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY);
  if (!default_authority.has_value()) {
    default_authority_ =
        CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
            *server_uri);
  } else {
    default_authority_ = std::move(*default_authority);
  }
  // Success.
  *error = absl::OkStatus();
}
1090
// Tears down the channel: shuts down the resolver and LB policy, stops
// backup polling, and destroys the interested_parties pollset set.
ClientChannel::~ClientChannel() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
  }
  DestroyResolverAndLbPolicyLocked();
  // Stop backup polling.
  grpc_client_channel_stop_backup_polling(interested_parties_);
  grpc_pollset_set_destroy(interested_parties_);
}
1100
// Creates an LB call object on the call's arena.  The returned
// OrphanablePtr controls lifetime, but the storage itself is owned by
// the arena, so destruction does not free memory.
OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall>
ClientChannel::CreateLoadBalancedCall(
    const grpc_call_element_args& args, grpc_polling_entity* pollent,
    grpc_closure* on_call_destruction_complete,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry) {
  return OrphanablePtr<FilterBasedLoadBalancedCall>(
      args.arena->New<FilterBasedLoadBalancedCall>(
          this, args, pollent, on_call_destruction_complete,
          std::move(on_commit), is_transparent_retry));
}
1111
// Builds the ChannelArgs used to create a subchannel, merging
// channel-level and per-address args and normalizing out anything that
// should not affect subchannel uniqueness.
ChannelArgs ClientChannel::MakeSubchannelArgs(
    const ChannelArgs& channel_args, const ChannelArgs& address_args,
    const RefCountedPtr<SubchannelPoolInterface>& subchannel_pool,
    const std::string& channel_default_authority) {
  // Start with the channel-level args and overlay the per-address args,
  // so that a value present in both resolves to the channel-level one.
  // This matters in particular for GRPC_ARG_DEFAULT_AUTHORITY, which
  // resolvers may set on a per-address basis only when the application
  // did not set it explicitly at the channel level.
  ChannelArgs args = channel_args.UnionWith(address_args);
  args = args.SetObject(subchannel_pool);
  // If no default authority was supplied by the application or the
  // resolver, fall back to the channel's default.
  args = args.SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, channel_default_authority);
  // Drop channel args that must not affect subchannel uniqueness.
  return args.Remove(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME)
      .Remove(GRPC_ARG_INHIBIT_HEALTH_CHECKING)
      .Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE);
}
1134
// Re-runs resolution checks for every call queued waiting on a resolver
// result, then empties the queue.  Caller must hold resolution_mu_.
void ClientChannel::ReprocessQueuedResolverCalls() {
  for (CallData* calld : resolver_queued_calls_) {
    calld->RemoveCallFromResolverQueuedCallsLocked();
    calld->RetryCheckResolutionLocked();
  }
  resolver_queued_calls_.clear();
}
1142
1143 namespace {
1144
// Determines the LB policy config for the channel, in priority order:
// (1) the parsed loadBalancingConfig from the service config, (2) the
// deprecated loadBalancingPolicy service-config field, (3) the
// GRPC_ARG_LB_POLICY_NAME channel arg, (4) pick_first as the fallback.
// For (2)-(4) an empty config is synthesized for the chosen policy.
RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
    const Resolver::Result& resolver_result,
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
  // Prefer the LB policy config found in the service config.
  if (parsed_service_config->parsed_lb_config() != nullptr) {
    return parsed_service_config->parsed_lb_config();
  }
  // Try the deprecated LB policy name from the service config.
  // If not, try the setting from channel args.
  absl::optional<absl::string_view> policy_name;
  if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
    policy_name = parsed_service_config->parsed_deprecated_lb_policy();
  } else {
    policy_name = resolver_result.args.GetString(GRPC_ARG_LB_POLICY_NAME);
    // A policy named via channel arg must exist and must not require a
    // config (we can only give it an empty one); otherwise fall back to
    // pick_first.
    bool requires_config = false;
    if (policy_name.has_value() &&
        (!CoreConfiguration::Get()
              .lb_policy_registry()
              .LoadBalancingPolicyExists(*policy_name, &requires_config) ||
         requires_config)) {
      if (requires_config) {
        gpr_log(GPR_ERROR,
                "LB policy: %s passed through channel_args must not "
                "require a config. Using pick_first instead.",
                std::string(*policy_name).c_str());
      } else {
        gpr_log(GPR_ERROR,
                "LB policy: %s passed through channel_args does not exist. "
                "Using pick_first instead.",
                std::string(*policy_name).c_str());
      }
      policy_name = "pick_first";
    }
  }
  // Use pick_first if nothing was specified by any of the sources above.
  if (!policy_name.has_value()) policy_name = "pick_first";
  // Now that we have the policy name, construct an empty config for it.
  Json config_json = Json::FromArray({Json::FromObject({
      {std::string(*policy_name), Json::FromObject({})},
  })});
  auto lb_policy_config =
      CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
          config_json);
  // The policy name came from one of three places:
  // - The deprecated loadBalancingPolicy field in the service config,
  //   in which case the code in ClientChannelServiceConfigParser
  //   already verified that the policy does not require a config.
  // - One of the hard-coded values here, all of which are known to not
  //   require a config.
  // - A channel arg, in which case we check that the specified policy exists
  //   and accepts an empty config. If not, we revert to using pick_first
  //   lb_policy
  // So parsing an empty config for it cannot fail.
  GPR_ASSERT(lb_policy_config.ok());
  return std::move(*lb_policy_config);
}
1201
1202 } // namespace
1203
// Handles a new result from the resolver: chooses the service config
// (resolver-supplied, previously saved, or channel default), selects the
// LB policy config, pushes updates into the control plane and data
// plane, and records channelz trace events for notable transitions.
// Runs in the WorkSerializer.
void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
  // Handle race conditions.
  if (resolver_ == nullptr) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
  }
  // Grab resolver result health callback.
  auto resolver_callback = std::move(result.result_health_callback);
  absl::Status resolver_result_status;
  // We only want to trace the address resolution in the follow cases:
  // (a) Address resolution resulted in service config change.
  // (b) Address resolution that causes number of backends to go from
  //     zero to non-zero.
  // (c) Address resolution that causes number of backends to go from
  //     non-zero to zero.
  // (d) Address resolution that causes a new LB policy to be created.
  //
  // We track a list of strings to eventually be concatenated and traced.
  std::vector<const char*> trace_strings;
  const bool resolution_contains_addresses =
      result.addresses.ok() && !result.addresses->empty();
  if (!resolution_contains_addresses &&
      previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became empty");
  } else if (resolution_contains_addresses &&
             !previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became non-empty");
  }
  previous_resolution_contained_addresses_ = resolution_contains_addresses;
  // Keeps the error string alive until trace_strings is consumed below.
  std::string service_config_error_string_storage;
  if (!result.service_config.ok()) {
    service_config_error_string_storage =
        result.service_config.status().ToString();
    trace_strings.push_back(service_config_error_string_storage.c_str());
  }
  // Choose the service config.
  RefCountedPtr<ServiceConfig> service_config;
  RefCountedPtr<ConfigSelector> config_selector;
  if (!result.service_config.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
              this, result.service_config.status().ToString().c_str());
    }
    // If the service config was invalid, then fallback to the
    // previously returned service config.
    if (saved_service_config_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: resolver returned invalid service config. "
                "Continuing to use previous service config.",
                this);
      }
      service_config = saved_service_config_;
      config_selector = saved_config_selector_;
    } else {
      // We received a service config error and we don't have a
      // previous service config to fall back to.  Put the channel into
      // TRANSIENT_FAILURE.
      OnResolverErrorLocked(result.service_config.status());
      trace_strings.push_back("no valid service config");
      resolver_result_status =
          absl::UnavailableError("no valid service config");
    }
  } else if (*result.service_config == nullptr) {
    // Resolver did not return any service config.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: resolver returned no service config. Using default "
              "service config for channel.",
              this);
    }
    service_config = default_service_config_;
  } else {
    // Use ServiceConfig and ConfigSelector returned by resolver.
    service_config = std::move(*result.service_config);
    config_selector = result.args.GetObjectRef<ConfigSelector>();
  }
  // Note: The only case in which service_config is null here is if the resolver
  // returned a service config error and we don't have a previous service
  // config to fall back to.
  if (service_config != nullptr) {
    // Extract global config for client channel.
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
        static_cast<const internal::ClientChannelGlobalParsedConfig*>(
            service_config->GetGlobalParsedConfig(
                service_config_parser_index_));
    // Choose LB policy config.
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config =
        ChooseLbPolicy(result, parsed_service_config);
    // Check if the ServiceConfig has changed.
    const bool service_config_changed =
        saved_service_config_ == nullptr ||
        service_config->json_string() != saved_service_config_->json_string();
    // Check if the ConfigSelector has changed.
    const bool config_selector_changed = !ConfigSelector::Equals(
        saved_config_selector_.get(), config_selector.get());
    // If either has changed, apply the global parameters now.
    if (service_config_changed || config_selector_changed) {
      // Update service config in control plane.
      UpdateServiceConfigInControlPlaneLocked(
          std::move(service_config), std::move(config_selector),
          std::string(lb_policy_config->name()));
    } else if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
    }
    // Create or update LB policy, as needed.
    resolver_result_status = CreateOrUpdateLbPolicyLocked(
        std::move(lb_policy_config),
        parsed_service_config->health_check_service_name(), std::move(result));
    if (service_config_changed || config_selector_changed) {
      // Start using new service config for calls.
      // This needs to happen after the LB policy has been updated, since
      // the ConfigSelector may need the LB policy to know about new
      // destinations before it can send RPCs to those destinations.
      UpdateServiceConfigInDataPlaneLocked();
      // TODO(ncteisen): might be worth somehow including a snippet of the
      // config in the trace, at the risk of bloating the trace logs.
      trace_strings.push_back("Service config changed");
    }
  }
  // Invoke resolver callback if needed.
  if (resolver_callback != nullptr) {
    resolver_callback(std::move(resolver_result_status));
  }
  // Add channel trace event.
  if (!trace_strings.empty()) {
    std::string message =
        absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
    if (channelz_node_ != nullptr) {
      channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
                                    grpc_slice_from_cpp_string(message));
    }
  }
}
1338
// Handles a resolver failure.  If an LB policy already exists it keeps
// owning the channel's connectivity state; otherwise the channel goes
// into TRANSIENT_FAILURE and queued calls are failed with the resolver
// error.  Runs in the WorkSerializer.
void ClientChannel::OnResolverErrorLocked(absl::Status status) {
  if (resolver_ == nullptr) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this,
            status.ToString().c_str());
  }
  // If we already have an LB policy from a previous resolution
  // result, then we continue to let it set the connectivity state.
  // Otherwise, we go into TRANSIENT_FAILURE.
  if (lb_policy_ == nullptr) {
    // Update connectivity state.
    UpdateStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
                      "resolver failure");
    {
      MutexLock lock(&resolution_mu_);
      // Update resolver transient failure.
      resolver_transient_failure_error_ =
          MaybeRewriteIllegalStatusCode(status, "resolver");
      // Let queued calls see the error.
      ReprocessQueuedResolverCalls();
    }
  }
}
1361
// Builds an LB update from the resolver result and applies it to the LB
// policy, creating the policy first if this is the initial resolution.
// Returns the status from the policy's UpdateLocked().  Runs in the
// WorkSerializer.
absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked(
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
    const absl::optional<std::string>& health_check_service_name,
    Resolver::Result result) {
  // Construct update.
  LoadBalancingPolicy::UpdateArgs update_args;
  update_args.addresses = std::move(result.addresses);
  update_args.config = std::move(lb_policy_config);
  update_args.resolution_note = std::move(result.resolution_note);
  // Remove the config selector from channel args so that we're not holding
  // unnecessary refs that cause it to be destroyed somewhere other than in the
  // WorkSerializer.
  update_args.args = result.args.Remove(GRPC_ARG_CONFIG_SELECTOR);
  // Add health check service name to channel args.
  if (health_check_service_name.has_value()) {
    update_args.args = update_args.args.Set(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME,
                                            *health_check_service_name);
  }
  // Create policy if needed.
  if (lb_policy_ == nullptr) {
    lb_policy_ = CreateLbPolicyLocked(update_args.args);
  }
  // Update the policy.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
            lb_policy_.get());
  }
  return lb_policy_->UpdateLocked(std::move(update_args));
}
1391
1392 // Creates a new LB policy.
// Creates a new LB policy (wrapped in a ChildPolicyHandler) and wires it
// to the channel's WorkSerializer, control helper, and pollsets.  Also
// resets the channel to CONNECTING with a queueing picker, since the new
// policy may not report state synchronously.
OrphanablePtr<LoadBalancingPolicy> ClientChannel::CreateLbPolicyLocked(
    const ChannelArgs& args) {
  // The LB policy will start in state CONNECTING but will not
  // necessarily send us an update synchronously, so set state to
  // CONNECTING (in case the resolver had previously failed and put the
  // channel into TRANSIENT_FAILURE) and make sure we have a queueing picker.
  UpdateStateAndPickerLocked(
      GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
      MakeRefCounted<LoadBalancingPolicy::QueuePicker>(nullptr));
  // Now create the LB policy.
  LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.work_serializer = work_serializer_;
  lb_policy_args.channel_control_helper =
      std::make_unique<ClientChannelControlHelper>(this);
  lb_policy_args.args = args;
  OrphanablePtr<LoadBalancingPolicy> lb_policy =
      MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
                                         &grpc_client_channel_trace);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created new LB policy %p", this,
            lb_policy.get());
  }
  // Let the policy's I/O be driven by the channel's pollers.
  grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
                                   interested_parties_);
  return lb_policy;
}
1419
// Saves the new service config and config selector for control-plane use
// and refreshes the data reported by GetChannelInfo().  Runs in the
// WorkSerializer; only the GetChannelInfo() fields need the mutex.
void ClientChannel::UpdateServiceConfigInControlPlaneLocked(
    RefCountedPtr<ServiceConfig> service_config,
    RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) {
  std::string service_config_json(service_config->json_string());
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: using service config: \"%s\"", this,
            service_config_json.c_str());
  }
  // Save service config.
  saved_service_config_ = std::move(service_config);
  // Swap out the data used by GetChannelInfo().
  {
    MutexLock lock(&info_mu_);
    info_lb_policy_name_ = std::move(lb_policy_name);
    info_service_config_json_ = std::move(service_config_json);
  }
  // Save config selector.
  saved_config_selector_ = std::move(config_selector);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: using ConfigSelector %p", this,
            saved_config_selector_.get());
  }
}
1443
// Pushes the saved service config/config selector into the data plane:
// rebuilds the dynamic filter stack (retry or termination filter) and
// swaps the new objects in under resolution_mu_, then re-processes calls
// queued waiting on resolution.  Old objects are unreffed outside the
// lock, when the local RefCountedPtrs go out of scope.
void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
  // Grab ref to service config.
  RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
  // Grab ref to config selector.  Use default if resolver didn't supply one.
  RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
            saved_config_selector_.get());
  }
  if (config_selector == nullptr) {
    config_selector =
        MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
  }
  ChannelArgs new_args =
      channel_args_.SetObject(this).SetObject(service_config);
  bool enable_retries =
      !new_args.WantMinimalStack() &&
      new_args.GetBool(GRPC_ARG_ENABLE_RETRIES).value_or(true);
  // Construct dynamic filter stack: config-selector filters first, then
  // either the retry filter or the plain termination filter.
  std::vector<const grpc_channel_filter*> filters =
      config_selector->GetFilters();
  if (enable_retries) {
    filters.push_back(&kRetryFilterVtable);
  } else {
    filters.push_back(&DynamicTerminationFilter::kFilterVtable);
  }
  RefCountedPtr<DynamicFilters> dynamic_filters =
      DynamicFilters::Create(new_args, std::move(filters));
  GPR_ASSERT(dynamic_filters != nullptr);
  // Grab data plane lock to update service config.
  //
  // We defer unreffing the old values (and deallocating memory) until
  // after releasing the lock to keep the critical section small.
  {
    MutexLock lock(&resolution_mu_);
    resolver_transient_failure_error_ = absl::OkStatus();
    // Update service config.
    received_service_config_data_ = true;
    // Old values will be unreffed after lock is released.
    service_config_.swap(service_config);
    config_selector_.swap(config_selector);
    dynamic_filters_.swap(dynamic_filters);
    // Re-process queued calls asynchronously.
    ReprocessQueuedResolverCalls();
  }
  // Old values will be unreffed after lock is released when they go out
  // of scope.
}
1492
// Creates and starts the resolver for uri_to_resolve_, moving the channel
// into CONNECTING state.  Runs in the control-plane work serializer (see
// the callers under work_serializer_->Run()).
void ClientChannel::CreateResolverLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: starting name resolution for %s", this,
            uri_to_resolve_.c_str());
  }
  resolver_ = CoreConfiguration::Get().resolver_registry().CreateResolver(
      uri_to_resolve_, channel_args_, interested_parties_, work_serializer_,
      std::make_unique<ResolverResultHandler>(this));
  // Since the validity of the args was checked when the channel was created,
  // CreateResolver() must return a non-null result.
  GPR_ASSERT(resolver_ != nullptr);
  UpdateStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status(),
                    "started resolving");
  resolver_->StartLocked();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
  }
}
1511
// Tears down the resolver, LB policy, and all resolution-derived state.
// No-op if the resolver was never created.  The *_to_unref locals are
// deliberately declared outside the resolution_mu_ critical section so
// the old objects are released only after the lock is dropped.
void ClientChannel::DestroyResolverAndLbPolicyLocked() {
  if (resolver_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: shutting down resolver=%p", this,
              resolver_.get());
    }
    resolver_.reset();
    // Clear resolution state.
    saved_service_config_.reset();
    saved_config_selector_.reset();
    // Acquire resolution lock to update config selector and associated state.
    // To minimize lock contention, we wait to unref these objects until
    // after we release the lock.
    RefCountedPtr<ServiceConfig> service_config_to_unref;
    RefCountedPtr<ConfigSelector> config_selector_to_unref;
    RefCountedPtr<DynamicFilters> dynamic_filters_to_unref;
    {
      MutexLock lock(&resolution_mu_);
      received_service_config_data_ = false;
      service_config_to_unref = std::move(service_config_);
      config_selector_to_unref = std::move(config_selector_);
      dynamic_filters_to_unref = std::move(dynamic_filters_);
    }
    // Clear LB policy if set.
    if (lb_policy_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
        gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
                lb_policy_.get());
      }
      // Stop polling on behalf of the LB policy before destroying it.
      grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                       interested_parties_);
      lb_policy_.reset();
    }
  }
}
1547
UpdateStateLocked(grpc_connectivity_state state,const absl::Status & status,const char * reason)1548 void ClientChannel::UpdateStateLocked(grpc_connectivity_state state,
1549 const absl::Status& status,
1550 const char* reason) {
1551 state_tracker_.SetState(state, status, reason);
1552 if (channelz_node_ != nullptr) {
1553 channelz_node_->SetConnectivityState(state);
1554 channelz_node_->AddTraceEvent(
1555 channelz::ChannelTrace::Severity::Info,
1556 grpc_slice_from_static_string(
1557 channelz::ChannelNode::GetChannelConnectivityStateChangeString(
1558 state)));
1559 }
1560 }
1561
// Updates connectivity state and swaps in a new LB picker, then retries
// every call that was queued waiting for a usable picker.
void ClientChannel::UpdateStateAndPickerLocked(
    grpc_connectivity_state state, const absl::Status& status,
    const char* reason,
    RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
  UpdateStateLocked(state, status, reason);
  // Grab the LB lock to update the picker and trigger reprocessing of the
  // queued picks.
  // Old picker will be unreffed after releasing the lock.
  MutexLock lock(&lb_mu_);
  picker_.swap(picker);
  // Reprocess queued picks.
  for (LoadBalancedCall* call : lb_queued_calls_) {
    call->RemoveCallFromLbQueuedCallsLocked();
    call->RetryPickLocked();
  }
  lb_queued_calls_.clear();
}
1579
1580 namespace {
1581
1582 // TODO(roth): Remove this in favor of the gprpp Match() function once
1583 // we can do that without breaking lock annotations.
1584 template <typename T>
HandlePickResult(LoadBalancingPolicy::PickResult * result,std::function<T (LoadBalancingPolicy::PickResult::Complete *)> complete_func,std::function<T (LoadBalancingPolicy::PickResult::Queue *)> queue_func,std::function<T (LoadBalancingPolicy::PickResult::Fail *)> fail_func,std::function<T (LoadBalancingPolicy::PickResult::Drop *)> drop_func)1585 T HandlePickResult(
1586 LoadBalancingPolicy::PickResult* result,
1587 std::function<T(LoadBalancingPolicy::PickResult::Complete*)> complete_func,
1588 std::function<T(LoadBalancingPolicy::PickResult::Queue*)> queue_func,
1589 std::function<T(LoadBalancingPolicy::PickResult::Fail*)> fail_func,
1590 std::function<T(LoadBalancingPolicy::PickResult::Drop*)> drop_func) {
1591 auto* complete_pick =
1592 absl::get_if<LoadBalancingPolicy::PickResult::Complete>(&result->result);
1593 if (complete_pick != nullptr) {
1594 return complete_func(complete_pick);
1595 }
1596 auto* queue_pick =
1597 absl::get_if<LoadBalancingPolicy::PickResult::Queue>(&result->result);
1598 if (queue_pick != nullptr) {
1599 return queue_func(queue_pick);
1600 }
1601 auto* fail_pick =
1602 absl::get_if<LoadBalancingPolicy::PickResult::Fail>(&result->result);
1603 if (fail_pick != nullptr) {
1604 return fail_func(fail_pick);
1605 }
1606 auto* drop_pick =
1607 absl::get_if<LoadBalancingPolicy::PickResult::Drop>(&result->result);
1608 GPR_ASSERT(drop_pick != nullptr);
1609 return drop_func(drop_pick);
1610 }
1611
1612 } // namespace
1613
// Sends a ping on the connected subchannel chosen by the current picker.
// Fails if the channel is not READY, if the pick is queued or fails, or
// if the picked subchannel is not connected.
grpc_error_handle ClientChannel::DoPingLocked(grpc_transport_op* op) {
  if (state_tracker_.state() != GRPC_CHANNEL_READY) {
    return GRPC_ERROR_CREATE("channel not connected");
  }
  LoadBalancingPolicy::PickResult result;
  {
    MutexLock lock(&lb_mu_);
    result = picker_->Pick(LoadBalancingPolicy::PickArgs());
  }
  return HandlePickResult<grpc_error_handle>(
      &result,
      // Complete pick.
      [op](LoadBalancingPolicy::PickResult::Complete* complete_pick)
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(*ClientChannel::work_serializer_) {
            SubchannelWrapper* subchannel = static_cast<SubchannelWrapper*>(
                complete_pick->subchannel.get());
            RefCountedPtr<ConnectedSubchannel> connected_subchannel =
                subchannel->connected_subchannel();
            // The subchannel may have disconnected since the pick was made.
            if (connected_subchannel == nullptr) {
              return GRPC_ERROR_CREATE("LB pick for ping not connected");
            }
            connected_subchannel->Ping(op->send_ping.on_initiate,
                                       op->send_ping.on_ack);
            return absl::OkStatus();
          },
      // Queue pick.
      [](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
        return GRPC_ERROR_CREATE("LB picker queued call");
      },
      // Fail pick.
      [](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
        return absl_status_to_grpc_error(fail_pick->status);
      },
      // Drop pick.
      [](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
        return absl_status_to_grpc_error(drop_pick->status);
      });
}
1652
// Applies a transport op inside the control-plane work serializer:
// connectivity watches, pings, backoff reset, and disconnect/IDLE
// transitions.  Releases the stack ref taken in StartTransportOp() and
// invokes op->on_consumed when done.
void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) {
  // Connectivity watch.
  if (op->start_connectivity_watch != nullptr) {
    state_tracker_.AddWatcher(op->start_connectivity_watch_state,
                              std::move(op->start_connectivity_watch));
  }
  if (op->stop_connectivity_watch != nullptr) {
    state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
  }
  // Ping.
  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    grpc_error_handle error = DoPingLocked(op);
    // On failure, both ping closures are run immediately with the error.
    if (!error.ok()) {
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate, error);
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error);
    }
    op->bind_pollset = nullptr;
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }
  // Reset backoff.
  if (op->reset_connect_backoff) {
    if (lb_policy_ != nullptr) {
      lb_policy_->ResetBackoffLocked();
    }
  }
  // Disconnect or enter IDLE.
  if (!op->disconnect_with_error.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", this,
              StatusToString(op->disconnect_with_error).c_str());
    }
    DestroyResolverAndLbPolicyLocked();
    intptr_t value;
    // The error carries a ChannelConnectivityState int when the intent is
    // to enter IDLE rather than to shut down.
    if (grpc_error_get_int(op->disconnect_with_error,
                           StatusIntProperty::ChannelConnectivityState,
                           &value) &&
        static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
      if (disconnect_error_.ok()) {  // Ignore if we're shutting down.
        // Enter IDLE state.
        UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, absl::Status(),
                                   "channel entering IDLE", nullptr);
        // TODO(roth): Do we need to check for any queued picks here, in
        // case there's a race condition in the client_idle filter?
        // And maybe also check for calls in the resolver queue?
      }
    } else {
      // Disconnect.
      GPR_ASSERT(disconnect_error_.ok());
      disconnect_error_ = op->disconnect_with_error;
      UpdateStateAndPickerLocked(
          GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
          MakeRefCounted<LoadBalancingPolicy::TransientFailurePicker>(
              grpc_error_to_absl_status(op->disconnect_with_error)));
      // TODO(roth): If this happens when we're still waiting for a
      // resolver result, we need to trigger failures for all calls in
      // the resolver queue here.
    }
  }
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op");
  ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
}
1715
// Channel-stack vtable entry point for transport ops.  Handles pollset
// binding inline, then hops into the work serializer for everything else
// (see StartTransportOpLocked()).
void ClientChannel::StartTransportOp(grpc_channel_element* elem,
                                     grpc_transport_op* op) {
  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
  GPR_ASSERT(op->set_accept_stream == false);
  // Handle bind_pollset.
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
  }
  // Pop into control plane work_serializer for remaining ops.
  // Ref is released at the end of StartTransportOpLocked().
  GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
  chand->work_serializer_->Run(
      [chand, op]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
        chand->StartTransportOpLocked(op);
      },
      DEBUG_LOCATION);
}
1732
GetChannelInfo(grpc_channel_element * elem,const grpc_channel_info * info)1733 void ClientChannel::GetChannelInfo(grpc_channel_element* elem,
1734 const grpc_channel_info* info) {
1735 ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
1736 MutexLock lock(&chand->info_mu_);
1737 if (info->lb_policy_name != nullptr) {
1738 *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.c_str());
1739 }
1740 if (info->service_config_json != nullptr) {
1741 *info->service_config_json =
1742 gpr_strdup(chand->info_service_config_json_.c_str());
1743 }
1744 }
1745
// Kicks the channel out of IDLE: if an LB policy exists, asks it to exit
// idle; otherwise starts name resolution (unless a resolver already
// exists).  Releases the "TryToConnect" stack ref taken by
// CheckConnectivityState().
void ClientChannel::TryToConnectLocked() {
  if (lb_policy_ != nullptr) {
    lb_policy_->ExitIdleLocked();
  } else if (resolver_ == nullptr) {
    CreateResolverLocked();
  }
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
}
1754
// Returns the channel's current connectivity state.  If the state is IDLE
// and try_to_connect is set, asynchronously kicks off a connection attempt
// via TryToConnectLocked() in the work serializer.
grpc_connectivity_state ClientChannel::CheckConnectivityState(
    bool try_to_connect) {
  // state_tracker_ is guarded by work_serializer_, which we're not
  // holding here. But the one method of state_tracker_ that *is*
  // thread-safe to call without external synchronization is the state()
  // method, so we can disable thread-safety analysis for this one read.
  grpc_connectivity_state out = ABSL_TS_UNCHECKED_READ(state_tracker_).state();
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    // Ref is released in TryToConnectLocked().
    GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
    work_serializer_->Run([this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
                              *work_serializer_) { TryToConnectLocked(); },
                          DEBUG_LOCATION);
  }
  return out;
}
1770
// Registers an external connectivity watcher.  The heap-allocated
// ConnectivityWatcherAdder manages its own lifetime (presumably deleting
// itself once the watcher has been added in the work serializer — see its
// definition), so the bare `new` is intentional.
void ClientChannel::AddConnectivityWatcher(
    grpc_connectivity_state initial_state,
    OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
  new ConnectivityWatcherAdder(this, initial_state, std::move(watcher));
}
1776
// Unregisters an external connectivity watcher.  Like the adder above, the
// ConnectivityWatcherRemover is self-managing, so the bare `new` is
// intentional.
void ClientChannel::RemoveConnectivityWatcher(
    AsyncConnectivityStateWatcherInterface* watcher) {
  new ConnectivityWatcherRemover(this, watcher);
}
1781
1782 //
1783 // CallData implementation
1784 //
1785
// Undoes the polling setup done by AddCallToResolverQueuedCallsLocked()
// when a queued call leaves the resolver queue.
void ClientChannel::CallData::RemoveCallFromResolverQueuedCallsLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: removing from resolver queued picks list",
            chand(), this);
  }
  // Remove call's pollent from channel's interested_parties.
  grpc_polling_entity_del_from_pollset_set(pollent(),
                                           chand()->interested_parties_);
  // Note: There's no need to actually remove the call from the queue
  // here, because that will be done in
  // ResolverQueuedCallCanceller::CancelLocked() or
  // ClientChannel::ReprocessQueuedResolverCalls().
}
1800
// Queues this call to wait for a resolver result.  Called under
// resolution_mu_ (see CheckResolution()).
void ClientChannel::CallData::AddCallToResolverQueuedCallsLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list",
            chand(), this);
  }
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent(),
                                         chand()->interested_parties_);
  // Add to queue.
  chand()->resolver_queued_calls_.insert(this);
  // Lets the subclass set up cancellation handling for the queued call.
  OnAddToQueueLocked();
}
1814
// Applies the channel's service config to this call via the given
// ConfigSelector: creates the per-call ServiceConfigCallData, runs the
// selector, and applies method-level params (timeout, wait_for_ready).
// Returns an error if config_selector carries an error status or if the
// selector rejects the call.
grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
    const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
            chand(), this);
  }
  if (!config_selector.ok()) return config_selector.status();
  // Create a ClientChannelServiceConfigCallData for the call. This stores
  // a ref to the ServiceConfig and caches the right set of parsed configs
  // to use for the call. The ClientChannelServiceConfigCallData will store
  // itself in the call context, so that it can be accessed by filters
  // below us in the stack, and it will be cleaned up when the call ends.
  auto* service_config_call_data =
      arena()->New<ClientChannelServiceConfigCallData>(arena(), call_context());
  // Use the ConfigSelector to determine the config for the call.
  absl::Status call_config_status =
      (*config_selector)
          ->GetCallConfig(
              {send_initial_metadata(), arena(), service_config_call_data});
  if (!call_config_status.ok()) {
    return absl_status_to_grpc_error(
        MaybeRewriteIllegalStatusCode(call_config_status, "ConfigSelector"));
  }
  // Apply our own method params to the call.
  auto* method_params = static_cast<ClientChannelMethodParsedConfig*>(
      service_config_call_data->GetMethodParsedConfig(
          chand()->service_config_parser_index_));
  if (method_params != nullptr) {
    // If the deadline from the service config is shorter than the one
    // from the client API, reset the deadline timer.
    if (chand()->deadline_checking_enabled_ &&
        method_params->timeout() != Duration::Zero()) {
      ResetDeadline(method_params->timeout());
    }
    // If the service config set wait_for_ready and the application
    // did not explicitly set it, use the value from the service config.
    auto* wait_for_ready =
        send_initial_metadata()->GetOrCreatePointer(WaitForReady());
    if (method_params->wait_for_ready().has_value() &&
        !wait_for_ready->explicitly_set) {
      wait_for_ready->value = method_params->wait_for_ready().value();
    }
  }
  return absl::OkStatus();
}
1860
// Checks whether a resolver result is available for this call.  Returns
// nullopt if the call was queued to wait for one; otherwise returns the
// (possibly error) status of applying the service config to the call.
absl::optional<absl::Status> ClientChannel::CallData::CheckResolution(
    bool was_queued) {
  // Check if we have a resolver result to use.
  absl::StatusOr<RefCountedPtr<ConfigSelector>> config_selector;
  {
    MutexLock lock(&chand()->resolution_mu_);
    bool result_ready = CheckResolutionLocked(&config_selector);
    // If no result is available, queue the call.
    if (!result_ready) {
      AddCallToResolverQueuedCallsLocked();
      return absl::nullopt;
    }
  }
  // We have a result. Apply service config to call.
  grpc_error_handle error = ApplyServiceConfigToCallLocked(config_selector);
  // ConfigSelector must be unreffed inside the WorkSerializer.
  if (config_selector.ok()) {
    chand()->work_serializer_->Run(
        [config_selector = std::move(*config_selector)]() mutable {
          config_selector.reset();
        },
        DEBUG_LOCATION);
  }
  // Handle errors.
  if (!error.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: error applying config to call: error=%s",
              chand(), this, StatusToString(error).c_str());
    }
    return error;
  }
  // If the call was queued, add trace annotation.
  if (was_queued) {
    auto* call_tracer = static_cast<CallTracerAnnotationInterface*>(
        call_context()[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
    if (call_tracer != nullptr) {
      call_tracer->RecordAnnotation("Delayed name resolution complete.");
    }
  }
  return absl::OkStatus();
}
1903
// Called under resolution_mu_.  Returns true and fills in
// *config_selector (with either the channel's ConfigSelector or an error
// status) when a resolution outcome is available; returns false when the
// call must be queued to wait for one.
bool ClientChannel::CallData::CheckResolutionLocked(
    absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector) {
  // If we don't yet have a resolver result, we need to queue the call
  // until we get one.
  if (GPR_UNLIKELY(!chand()->received_service_config_data_)) {
    // If the resolver returned transient failure before returning the
    // first service config, fail any non-wait_for_ready calls.
    absl::Status resolver_error = chand()->resolver_transient_failure_error_;
    if (!resolver_error.ok() &&
        !send_initial_metadata()->GetOrCreatePointer(WaitForReady())->value) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: resolution failed, failing call",
                chand(), this);
      }
      // Propagate the resolver's error status to the caller.
      *config_selector = absl_status_to_grpc_error(resolver_error);
      return true;
    }
    // Either the resolver has not yet returned a result, or it has
    // returned transient failure but the call is wait_for_ready. In
    // either case, queue the call.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: no resolver result yet", chand(),
              this);
    }
    return false;
  }
  // Result found.
  *config_selector = chand()->config_selector_;
  dynamic_filters_ = chand()->dynamic_filters_;
  return true;
}
1935
1936 //
1937 // FilterBasedCallData implementation
1938 //
1939
// Constructs call data for a filter-based call.  Takes a ref to the call's
// path slice and initializes deadline state, using an infinite deadline
// when deadline checking is disabled on the channel.
ClientChannel::FilterBasedCallData::FilterBasedCallData(
    grpc_call_element* elem, const grpc_call_element_args& args)
    : path_(CSliceRef(args.path)),
      call_context_(args.context),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      deadline_state_(elem, args,
                      GPR_LIKELY(static_cast<ClientChannel*>(elem->channel_data)
                                     ->deadline_checking_enabled_)
                          ? args.deadline
                          : Timestamp::InfFuture()) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: created call", chand(), this);
  }
}
1955
~FilterBasedCallData()1956 ClientChannel::FilterBasedCallData::~FilterBasedCallData() {
1957 CSliceUnref(path_);
1958 // Make sure there are no remaining pending batches.
1959 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1960 GPR_ASSERT(pending_batches_[i] == nullptr);
1961 }
1962 }
1963
Init(grpc_call_element * elem,const grpc_call_element_args * args)1964 grpc_error_handle ClientChannel::FilterBasedCallData::Init(
1965 grpc_call_element* elem, const grpc_call_element_args* args) {
1966 new (elem->call_data) FilterBasedCallData(elem, *args);
1967 return absl::OkStatus();
1968 }
1969
// Channel-stack vtable hook: destroys the call data.  If a dynamic call
// exists, then_schedule_closure is deferred until the dynamic call's stack
// is destroyed; otherwise it is scheduled immediately.
void ClientChannel::FilterBasedCallData::Destroy(
    grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
    grpc_closure* then_schedule_closure) {
  auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
  // Keep the dynamic call alive past the dtor of the call data itself.
  RefCountedPtr<DynamicFilters::Call> dynamic_call =
      std::move(calld->dynamic_call_);
  calld->~FilterBasedCallData();
  if (GPR_LIKELY(dynamic_call != nullptr)) {
    dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
  } else {
    // TODO(yashkt) : This can potentially be a Closure::Run
    ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, absl::OkStatus());
  }
}
1984
// Channel-stack vtable hook: entry point for every batch from the layer
// above.  Fast path: once a dynamic call exists, batches are passed
// straight down without taking the resolution mutex.  Until then, batches
// are queued; the batch carrying send_initial_metadata triggers name
// resolution (exiting IDLE if needed) and dynamic-call creation.
void ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
  // Skip the log if grpc_trace_channel already logs the batch elsewhere.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace) &&
      !GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: batch started from above: %s", chand,
            calld, grpc_transport_stream_op_batch_string(batch, false).c_str());
  }
  if (GPR_LIKELY(chand->deadline_checking_enabled_)) {
    grpc_deadline_state_client_start_transport_stream_op_batch(
        &calld->deadline_state_, batch);
  }
  // Intercept recv_trailing_metadata to commit the call, in case we wind up
  // failing the call before we get down to the retry or LB call layer.
  if (batch->recv_trailing_metadata) {
    calld->original_recv_trailing_metadata_ready_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_,
                      RecvTrailingMetadataReadyForConfigSelectorCommitCallback,
                      calld, nullptr);
    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &calld->recv_trailing_metadata_ready_;
  }
  // If we already have a dynamic call, pass the batch down to it.
  // Note that once we have done so, we do not need to acquire the channel's
  // resolution mutex, which is more efficient (especially for streaming calls).
  if (calld->dynamic_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on dynamic_call=%p",
              chand, calld, calld->dynamic_call_.get());
    }
    calld->dynamic_call_->StartTransportStreamOpBatch(batch);
    return;
  }
  // We do not yet have a dynamic call.
  //
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(!calld->cancel_error_.ok())) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, StatusToString(calld->cancel_error_).c_str());
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, calld->cancel_error_, calld->call_combiner());
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations. This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    calld->cancel_error_ = batch->payload->cancel_stream.cancel_error;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, StatusToString(calld->cancel_error_).c_str());
    }
    // Fail all pending batches.
    calld->PendingBatchesFail(calld->cancel_error_, NoYieldCallCombiner);
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, calld->cancel_error_, calld->call_combiner());
    return;
  }
  // Add the batch to the pending list.
  calld->PendingBatchesAdd(batch);
  // For batches containing a send_initial_metadata op, acquire the
  // channel's resolution mutex to apply the service config to the call,
  // after which we will create a dynamic call.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: grabbing resolution mutex to apply service "
              "config",
              chand, calld);
    }
    // If we're still in IDLE, we need to start resolving.
    if (GPR_UNLIKELY(chand->CheckConnectivityState(false) ==
                     GRPC_CHANNEL_IDLE)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: triggering exit idle", chand,
                calld);
      }
      // Bounce into the control plane work serializer to start resolving.
      GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ExitIdle");
      chand->work_serializer_->Run(
          [chand]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
            chand->CheckConnectivityState(/*try_to_connect=*/true);
            GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "ExitIdle");
          },
          DEBUG_LOCATION);
    }
    calld->TryCheckResolution(/*was_queued=*/false);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner(),
                            "batch does not include send_initial_metadata");
  }
}
2092
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)2093 void ClientChannel::FilterBasedCallData::SetPollent(
2094 grpc_call_element* elem, grpc_polling_entity* pollent) {
2095 auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
2096 calld->pollent_ = pollent;
2097 }
2098
// Maps a batch to its slot in pending_batches_, keyed by the first op the
// batch contains.  At most one pending batch may occupy each slot
// (asserted in PendingBatchesAdd()).
size_t ClientChannel::FilterBasedCallData::GetBatchIndex(
    grpc_transport_stream_op_batch* batch) {
  // Note: It is important the send_initial_metadata be the first entry
  // here, since the code in CheckResolution() assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}
2111
// This is called via the call combiner, so access to calld is synchronized.
// Stores the batch in its slot until a dynamic call exists to take it
// (see PendingBatchesResume()/PendingBatchesFail()).
void ClientChannel::FilterBasedCallData::PendingBatchesAdd(
    grpc_transport_stream_op_batch* batch) {
  const size_t idx = GetBatchIndex(batch);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR,
            chand(), this, idx);
  }
  grpc_transport_stream_op_batch*& pending = pending_batches_[idx];
  // Only one batch per slot may be pending at a time.
  GPR_ASSERT(pending == nullptr);
  pending = batch;
}
2125
2126 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error_handle error)2127 void ClientChannel::FilterBasedCallData::FailPendingBatchInCallCombiner(
2128 void* arg, grpc_error_handle error) {
2129 grpc_transport_stream_op_batch* batch =
2130 static_cast<grpc_transport_stream_op_batch*>(arg);
2131 auto* calld =
2132 static_cast<FilterBasedCallData*>(batch->handler_private.extra_arg);
2133 // Note: This will release the call combiner.
2134 grpc_transport_stream_op_batch_finish_with_failure(batch, error,
2135 calld->call_combiner());
2136 }
2137
// This is called via the call combiner, so access to calld is synchronized.
// Fails every pending batch with the given error via the call combiner.
// The predicate decides whether running the closure list also yields the
// call combiner.
void ClientChannel::FilterBasedCallData::PendingBatchesFail(
    grpc_error_handle error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {
  GPR_ASSERT(!error.ok());
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            chand(), this, num_batches, StatusToString(error).c_str());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, error,
                   "PendingBatchesFail");
      // Slot is cleared; the closure now owns the batch.
      batch = nullptr;
    }
  }
  if (yield_call_combiner_predicate(closures)) {
    closures.RunClosures(call_combiner());
  } else {
    closures.RunClosuresWithoutYielding(call_combiner());
  }
}
2171
2172 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error_handle)2173 void ClientChannel::FilterBasedCallData::ResumePendingBatchInCallCombiner(
2174 void* arg, grpc_error_handle /*ignored*/) {
2175 grpc_transport_stream_op_batch* batch =
2176 static_cast<grpc_transport_stream_op_batch*>(arg);
2177 auto* calld =
2178 static_cast<FilterBasedCallData*>(batch->handler_private.extra_arg);
2179 // Note: This will release the call combiner.
2180 calld->dynamic_call_->StartTransportStreamOpBatch(batch);
2181 }
2182
// This is called via the call combiner, so access to calld is synchronized.
// Sends all previously queued batches down to the newly created dynamic
// call stack, then yields the call combiner.
void ClientChannel::FilterBasedCallData::PendingBatchesResume() {
  // Retries not enabled; send down batches as-is.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " pending batches on dynamic_call=%p",
            chand(), this, num_batches, dynamic_call_.get());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      // Clear the slot so the batch is only resumed once.
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        ResumePendingBatchInCallCombiner, batch, nullptr);
      closures.Add(&batch->handler_private.closure, absl::OkStatus(),
                   "resuming pending batch from client channel call");
      batch = nullptr;
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner());
}
2211
2212 // A class to handle the call combiner cancellation callback for a
2213 // queued pick.
2214 class ClientChannel::FilterBasedCallData::ResolverQueuedCallCanceller {
2215 public:
ResolverQueuedCallCanceller(FilterBasedCallData * calld)2216 explicit ResolverQueuedCallCanceller(FilterBasedCallData* calld)
2217 : calld_(calld) {
2218 GRPC_CALL_STACK_REF(calld->owning_call(), "ResolverQueuedCallCanceller");
2219 GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
2220 grpc_schedule_on_exec_ctx);
2221 calld->call_combiner()->SetNotifyOnCancel(&closure_);
2222 }
2223
2224 private:
CancelLocked(void * arg,grpc_error_handle error)2225 static void CancelLocked(void* arg, grpc_error_handle error) {
2226 auto* self = static_cast<ResolverQueuedCallCanceller*>(arg);
2227 auto* calld = self->calld_;
2228 auto* chand = calld->chand();
2229 {
2230 MutexLock lock(&chand->resolution_mu_);
2231 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2232 gpr_log(GPR_INFO,
2233 "chand=%p calld=%p: cancelling resolver queued pick: "
2234 "error=%s self=%p calld->resolver_pick_canceller=%p",
2235 chand, calld, StatusToString(error).c_str(), self,
2236 calld->resolver_call_canceller_);
2237 }
2238 if (calld->resolver_call_canceller_ == self && !error.ok()) {
2239 // Remove pick from list of queued picks.
2240 calld->RemoveCallFromResolverQueuedCallsLocked();
2241 chand->resolver_queued_calls_.erase(calld);
2242 // Fail pending batches on the call.
2243 calld->PendingBatchesFail(error,
2244 YieldCallCombinerIfPendingBatchesFound);
2245 }
2246 }
2247 GRPC_CALL_STACK_UNREF(calld->owning_call(), "ResolvingQueuedCallCanceller");
2248 delete self;
2249 }
2250
2251 FilterBasedCallData* calld_;
2252 grpc_closure closure_;
2253 };
2254
TryCheckResolution(bool was_queued)2255 void ClientChannel::FilterBasedCallData::TryCheckResolution(bool was_queued) {
2256 auto result = CheckResolution(was_queued);
2257 if (result.has_value()) {
2258 if (!result->ok()) {
2259 PendingBatchesFail(*result, YieldCallCombiner);
2260 return;
2261 }
2262 CreateDynamicCall();
2263 }
2264 }
2265
// Called when this call is added to the resolver queue; sets up
// cancellation handling for the queued call.
void ClientChannel::FilterBasedCallData::OnAddToQueueLocked() {
  // Register call combiner cancellation callback.
  resolver_call_canceller_ = new ResolverQueuedCallCanceller(this);
}
2270
// Called (with the channel's resolution mutex held) to re-attempt
// resolution for this previously queued call.
void ClientChannel::FilterBasedCallData::RetryCheckResolutionLocked() {
  // Lame the call combiner canceller.
  resolver_call_canceller_ = nullptr;
  // Do an async callback to resume call processing, so that we're not
  // doing it while holding the channel's resolution mutex.
  chand()->owning_stack_->EventEngine()->Run([this]() {
    // EventEngine callbacks run outside any ExecCtx, so create one here.
    ApplicationCallbackExecCtx application_exec_ctx;
    ExecCtx exec_ctx;
    TryCheckResolution(/*was_queued=*/true);
  });
}
2282
CreateDynamicCall()2283 void ClientChannel::FilterBasedCallData::CreateDynamicCall() {
2284 DynamicFilters::Call::Args args = {dynamic_filters(), pollent_, path_,
2285 call_start_time_, deadline_, arena(),
2286 call_context_, call_combiner()};
2287 grpc_error_handle error;
2288 DynamicFilters* channel_stack = args.channel_stack.get();
2289 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2290 gpr_log(
2291 GPR_INFO,
2292 "chand=%p calld=%p: creating dynamic call stack on channel_stack=%p",
2293 chand(), this, channel_stack);
2294 }
2295 dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
2296 if (!error.ok()) {
2297 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2298 gpr_log(GPR_INFO,
2299 "chand=%p calld=%p: failed to create dynamic call: error=%s",
2300 chand(), this, StatusToString(error).c_str());
2301 }
2302 PendingBatchesFail(error, YieldCallCombiner);
2303 return;
2304 }
2305 PendingBatchesResume();
2306 }
2307
2308 void ClientChannel::FilterBasedCallData::
RecvTrailingMetadataReadyForConfigSelectorCommitCallback(void * arg,grpc_error_handle error)2309 RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
2310 void* arg, grpc_error_handle error) {
2311 auto* calld = static_cast<FilterBasedCallData*>(arg);
2312 auto* chand = calld->chand();
2313 auto* service_config_call_data =
2314 static_cast<ClientChannelServiceConfigCallData*>(
2315 calld->call_context()[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
2316 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2317 gpr_log(GPR_INFO,
2318 "chand=%p calld=%p: got recv_trailing_metadata_ready: error=%s "
2319 "service_config_call_data=%p",
2320 chand, calld, StatusToString(error).c_str(),
2321 service_config_call_data);
2322 }
2323 if (service_config_call_data != nullptr) {
2324 service_config_call_data->Commit();
2325 }
2326 // Chain to original callback.
2327 Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_,
2328 error);
2329 }
2330
2331 //
2332 // ClientChannel::LoadBalancedCall::LbCallState
2333 //
2334
// Per-pick state handed to LB policies: exposes arena allocation and
// ConfigSelector-provided call attributes for the pick in progress.
class ClientChannel::LoadBalancedCall::LbCallState
    : public ClientChannelLbCallState {
 public:
  explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {}

  // Allocates memory on the call's arena (reclaimed with the call).
  void* Alloc(size_t size) override { return lb_call_->arena()->Alloc(size); }

  // Internal API to allow first-party LB policies to access per-call
  // attributes set by the ConfigSelector.
  ServiceConfigCallData::CallAttributeInterface* GetCallAttribute(
      UniqueTypeName type) const override;

 private:
  LoadBalancedCall* lb_call_;  // Not owned.
};
2350
2351 //
2352 // ClientChannel::LoadBalancedCall::Metadata
2353 //
2354
// Adapts a grpc_metadata_batch to the LB policy API's metadata
// interface.  Does not own the batch; a null batch yields no-ops and
// empty results.
class ClientChannel::LoadBalancedCall::Metadata
    : public LoadBalancingPolicy::MetadataInterface {
 public:
  explicit Metadata(grpc_metadata_batch* batch) : batch_(batch) {}

  void Add(absl::string_view key, absl::string_view value) override {
    if (batch_ == nullptr) return;
    // Gross, egregious hack to support legacy grpclb behavior.
    // TODO(ctiller): Use a promise context for this once that plumbing is done.
    if (key == GrpcLbClientStatsMetadata::key()) {
      // The "value" here is actually a pointer smuggled through a
      // string_view, not string data.
      batch_->Set(
          GrpcLbClientStatsMetadata(),
          const_cast<GrpcLbClientStats*>(
              reinterpret_cast<const GrpcLbClientStats*>(value.data())));
      return;
    }
    batch_->Append(key, Slice::FromStaticString(value),
                   [key](absl::string_view error, const Slice& value) {
                     gpr_log(GPR_ERROR, "%s",
                             absl::StrCat(error, " key:", key,
                                          " value:", value.as_string_view())
                                 .c_str());
                   });
  }

  std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector()
      override {
    if (batch_ == nullptr) return {};
    Encoder encoder;
    batch_->Encode(&encoder);
    return encoder.Take();
  }

  absl::optional<absl::string_view> Lookup(absl::string_view key,
                                           std::string* buffer) const override {
    if (batch_ == nullptr) return absl::nullopt;
    return batch_->GetStringValue(key, buffer);
  }

 private:
  // Collects metadata entries into (key, value) string pairs for
  // TestOnlyCopyToVector().
  class Encoder {
   public:
    void Encode(const Slice& key, const Slice& value) {
      out_.emplace_back(std::string(key.as_string_view()),
                        std::string(value.as_string_view()));
    }

    template <class Which>
    void Encode(Which, const typename Which::ValueType& value) {
      auto value_slice = Which::Encode(value);
      out_.emplace_back(std::string(Which::key()),
                        std::string(value_slice.as_string_view()));
    }

    // These metadata elements are deliberately excluded from the copy.
    void Encode(GrpcTimeoutMetadata,
                const typename GrpcTimeoutMetadata::ValueType&) {}
    void Encode(HttpPathMetadata, const Slice&) {}
    void Encode(HttpMethodMetadata,
                const typename HttpMethodMetadata::ValueType&) {}

    std::vector<std::pair<std::string, std::string>> Take() {
      return std::move(out_);
    }

   private:
    std::vector<std::pair<std::string, std::string>> out_;
  };

  grpc_metadata_batch* batch_;
};
2425
2426 //
2427 // ClientChannel::LoadBalancedCall::LbCallState
2428 //
2429
2430 ServiceConfigCallData::CallAttributeInterface*
GetCallAttribute(UniqueTypeName type) const2431 ClientChannel::LoadBalancedCall::LbCallState::GetCallAttribute(
2432 UniqueTypeName type) const {
2433 auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
2434 lb_call_->call_context()[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
2435 return service_config_call_data->GetCallAttribute(type);
2436 }
2437
2438 //
2439 // ClientChannel::LoadBalancedCall::BackendMetricAccessor
2440 //
2441
2442 class ClientChannel::LoadBalancedCall::BackendMetricAccessor
2443 : public LoadBalancingPolicy::BackendMetricAccessor {
2444 public:
BackendMetricAccessor(LoadBalancedCall * lb_call,grpc_metadata_batch * recv_trailing_metadata)2445 BackendMetricAccessor(LoadBalancedCall* lb_call,
2446 grpc_metadata_batch* recv_trailing_metadata)
2447 : lb_call_(lb_call), recv_trailing_metadata_(recv_trailing_metadata) {}
2448
GetBackendMetricData()2449 const BackendMetricData* GetBackendMetricData() override {
2450 if (lb_call_->backend_metric_data_ == nullptr &&
2451 recv_trailing_metadata_ != nullptr) {
2452 if (const auto* md = recv_trailing_metadata_->get_pointer(
2453 EndpointLoadMetricsBinMetadata())) {
2454 BackendMetricAllocator allocator(lb_call_->arena());
2455 lb_call_->backend_metric_data_ =
2456 ParseBackendMetricData(md->as_string_view(), &allocator);
2457 }
2458 }
2459 return lb_call_->backend_metric_data_;
2460 }
2461
2462 private:
2463 class BackendMetricAllocator : public BackendMetricAllocatorInterface {
2464 public:
BackendMetricAllocator(Arena * arena)2465 explicit BackendMetricAllocator(Arena* arena) : arena_(arena) {}
2466
AllocateBackendMetricData()2467 BackendMetricData* AllocateBackendMetricData() override {
2468 return arena_->New<BackendMetricData>();
2469 }
2470
AllocateString(size_t size)2471 char* AllocateString(size_t size) override {
2472 return static_cast<char*>(arena_->Alloc(size));
2473 }
2474
2475 private:
2476 Arena* arena_;
2477 };
2478
2479 LoadBalancedCall* lb_call_;
2480 grpc_metadata_batch* recv_trailing_metadata_;
2481 };
2482
2483 //
2484 // ClientChannel::LoadBalancedCall
2485 //
2486
2487 namespace {
2488
CreateCallAttemptTracer(grpc_call_context_element * context,bool is_transparent_retry)2489 ClientCallTracer::CallAttemptTracer* CreateCallAttemptTracer(
2490 grpc_call_context_element* context, bool is_transparent_retry) {
2491 auto* call_tracer = static_cast<ClientCallTracer*>(
2492 context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
2493 if (call_tracer == nullptr) return nullptr;
2494 auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry);
2495 context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
2496 return tracer;
2497 }
2498
2499 } // namespace
2500
// Constructs a load-balanced call.  `on_commit` runs once the LB pick is
// committed; `is_transparent_retry` is forwarded to the call tracer when
// starting a new attempt.
ClientChannel::LoadBalancedCall::LoadBalancedCall(
    ClientChannel* chand, grpc_call_context_element* call_context,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
    : InternallyRefCounted(
          // Enable refcount tracing only when LB call tracing is on.
          GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)
              ? "LoadBalancedCall"
              : nullptr),
      chand_(chand),
      on_commit_(std::move(on_commit)) {
  // Starts a new attempt on the tracer (if any) and stores the attempt
  // tracer in the call context.
  CreateCallAttemptTracer(call_context, is_transparent_retry);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: created", chand_, this);
  }
}
2515
ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
  // Backend metric data is arena-allocated (see BackendMetricAllocator),
  // so we invoke its destructor explicitly; the arena reclaims storage.
  if (backend_metric_data_ != nullptr) {
    backend_metric_data_->BackendMetricData::~BackendMetricData();
  }
}
2521
void ClientChannel::LoadBalancedCall::Orphan() {
  // Compute latency and report it to the tracer.
  if (call_attempt_tracer() != nullptr) {
    gpr_timespec latency =
        gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_);
    call_attempt_tracer()->RecordEnd(latency);
  }
  // Drop the ref; actual destruction happens when all refs are released.
  Unref();
}
2531
// Reports call completion with `status` to the call attempt tracer and to
// the LB policy's subchannel call tracker (if one was installed by the
// pick).  `recv_trailing_metadata` and `transport_stream_stats` may be
// null (e.g., when the call is orphaned before trailing metadata arrives).
void ClientChannel::LoadBalancedCall::RecordCallCompletion(
    absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
    grpc_transport_stream_stats* transport_stream_stats,
    absl::string_view peer_address) {
  // If we have a tracer, notify it.
  if (call_attempt_tracer() != nullptr) {
    call_attempt_tracer()->RecordReceivedTrailingMetadata(
        status, recv_trailing_metadata, transport_stream_stats);
  }
  // If the LB policy requested a callback for trailing metadata, invoke
  // the callback.
  if (lb_subchannel_call_tracker_ != nullptr) {
    Metadata trailing_metadata(recv_trailing_metadata);
    BackendMetricAccessor backend_metric_accessor(this, recv_trailing_metadata);
    LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
        peer_address, status, &trailing_metadata, &backend_metric_accessor};
    lb_subchannel_call_tracker_->Finish(args);
    // The tracker is one-shot per call; release it after Finish().
    lb_subchannel_call_tracker_.reset();
  }
}
2552
// Undoes the pollset_set linkage established when the call was queued for
// an LB pick.  Caller holds the channel's LB mutex.
void ClientChannel::LoadBalancedCall::RemoveCallFromLbQueuedCallsLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
            chand_, this);
  }
  // Remove pollset_set linkage.
  grpc_polling_entity_del_from_pollset_set(pollent(),
                                           chand_->interested_parties_);
  // Note: There's no need to actually remove the call from the queue
  // here, because that will be done in either
  // LbQueuedCallCanceller::CancelLocked() or
  // in ClientChannel::UpdateStateAndPickerLocked().
}
2566
// Queues this call to wait for a new LB picker.  Caller holds the
// channel's LB mutex.
void ClientChannel::LoadBalancedCall::AddCallToLbQueuedCallsLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
            chand_, this);
  }
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent(),
                                         chand_->interested_parties_);
  // Add to queue.
  chand_->lb_queued_calls_.insert(this);
  // Let the subclass register any cancellation handling it needs.
  OnAddToQueueLocked();
}
2580
// Attempts an LB pick for this call.  Returns:
// - absl::nullopt if the pick was queued (retried when a new picker
//   arrives),
// - OK if the pick succeeded (the pick is committed), or
// - a non-OK status if the pick failed definitively.
absl::optional<absl::Status> ClientChannel::LoadBalancedCall::PickSubchannel(
    bool was_queued) {
  // We may accumulate multiple pickers here, because if a picker says
  // to queue the call, we check again to see if the picker has been
  // updated before we queue it.
  // We need to unref pickers in the WorkSerializer.
  std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers;
  auto cleanup = absl::MakeCleanup([&]() {
    chand_->work_serializer_->Run(
        [pickers = std::move(pickers)]() mutable {
          for (auto& picker : pickers) {
            picker.reset(DEBUG_LOCATION, "PickSubchannel");
          }
        },
        DEBUG_LOCATION);
  });
  // Grab mutex and take a ref to the picker.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: grabbing LB mutex to get picker",
            chand_, this);
  }
  {
    MutexLock lock(&chand_->lb_mu_);
    pickers.emplace_back(chand_->picker_);
  }
  while (true) {
    // Do pick.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p lb_call=%p: performing pick with picker=%p",
              chand_, this, pickers.back().get());
    }
    grpc_error_handle error;
    bool pick_complete = PickSubchannelImpl(pickers.back().get(), &error);
    if (!pick_complete) {
      MutexLock lock(&chand_->lb_mu_);
      // If picker has been swapped out since we grabbed it, try again.
      if (chand_->picker_ != pickers.back()) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
          gpr_log(GPR_INFO,
                  "chand=%p lb_call=%p: pick not complete, but picker changed",
                  chand_, this);
        }
        pickers.emplace_back(chand_->picker_);
        continue;
      }
      // Otherwise queue the pick to try again later when we get a new picker.
      AddCallToLbQueuedCallsLocked();
      return absl::nullopt;
    }
    // Pick is complete.
    // If it was queued, add a trace annotation.
    if (was_queued && call_attempt_tracer() != nullptr) {
      call_attempt_tracer()->RecordAnnotation("Delayed LB pick complete.");
    }
    // If the pick failed, fail the call.
    if (!error.ok()) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
                chand_, this, StatusToString(error).c_str());
      }
      return error;
    }
    // Pick succeeded.
    Commit();
    return absl::OkStatus();
  }
}
2649
// Runs a single pick against `picker`.  Returns true if the pick is
// complete (on failure or drop, *error is set); returns false if the pick
// must be queued (queue result, missing connected subchannel, or a failed
// pick with wait_for_ready set).
bool ClientChannel::LoadBalancedCall::PickSubchannelImpl(
    LoadBalancingPolicy::SubchannelPicker* picker, grpc_error_handle* error) {
  GPR_ASSERT(connected_subchannel_ == nullptr);
  // Perform LB pick.
  LoadBalancingPolicy::PickArgs pick_args;
  Slice* path = send_initial_metadata()->get_pointer(HttpPathMetadata());
  GPR_ASSERT(path != nullptr);
  pick_args.path = path->as_string_view();
  LbCallState lb_call_state(this);
  pick_args.call_state = &lb_call_state;
  Metadata initial_metadata(send_initial_metadata());
  pick_args.initial_metadata = &initial_metadata;
  auto result = picker->Pick(pick_args);
  return HandlePickResult<bool>(
      &result,
      // CompletePick
      [this](LoadBalancingPolicy::PickResult::Complete* complete_pick) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
          gpr_log(GPR_INFO,
                  "chand=%p lb_call=%p: LB pick succeeded: subchannel=%p",
                  chand_, this, complete_pick->subchannel.get());
        }
        GPR_ASSERT(complete_pick->subchannel != nullptr);
        // Grab a ref to the connected subchannel while we're still
        // holding the data plane mutex.
        SubchannelWrapper* subchannel =
            static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
        connected_subchannel_ = subchannel->connected_subchannel();
        // If the subchannel has no connected subchannel (e.g., if the
        // subchannel has moved out of state READY but the LB policy hasn't
        // yet seen that change and given us a new picker), then just
        // queue the pick.  We'll try again as soon as we get a new picker.
        if (connected_subchannel_ == nullptr) {
          if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
            gpr_log(GPR_INFO,
                    "chand=%p lb_call=%p: subchannel returned by LB picker "
                    "has no connected subchannel; queueing pick",
                    chand_, this);
          }
          return false;
        }
        // Start the per-call tracker the LB policy attached (if any).
        lb_subchannel_call_tracker_ =
            std::move(complete_pick->subchannel_call_tracker);
        if (lb_subchannel_call_tracker_ != nullptr) {
          lb_subchannel_call_tracker_->Start();
        }
        return true;
      },
      // QueuePick
      [this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
          gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick queued", chand_,
                  this);
        }
        return false;
      },
      // FailPick
      [this, &error](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
          gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick failed: %s", chand_,
                  this, fail_pick->status.ToString().c_str());
        }
        // If wait_for_ready is false, then the error indicates the RPC
        // attempt's final status.
        if (!send_initial_metadata()
                 ->GetOrCreatePointer(WaitForReady())
                 ->value) {
          *error = absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
              std::move(fail_pick->status), "LB pick"));
          return true;
        }
        // If wait_for_ready is true, then queue to retry when we get a new
        // picker.
        return false;
      },
      // DropPick
      [this, &error](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
          gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick dropped: %s", chand_,
                  this, drop_pick->status.ToString().c_str());
        }
        // Mark the error as an LB drop so that it is not retried.
        *error = grpc_error_set_int(
            absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
                std::move(drop_pick->status), "LB drop")),
            StatusIntProperty::kLbPolicyDrop, 1);
        return true;
      });
}
2738
2739 //
2740 // ClientChannel::FilterBasedLoadBalancedCall
2741 //
2742
// Constructs a filter-based LB call from the call element args.
// `on_call_destruction_complete`, if non-null, is scheduled on the
// ExecCtx when this object is destroyed.
ClientChannel::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall(
    ClientChannel* chand, const grpc_call_element_args& args,
    grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
    : LoadBalancedCall(chand, args.context, std::move(on_commit),
                       is_transparent_retry),
      deadline_(args.deadline),
      arena_(args.arena),
      call_context_(args.context),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      pollent_(pollent),
      on_call_destruction_complete_(on_call_destruction_complete) {}
2756
~FilterBasedLoadBalancedCall()2757 ClientChannel::FilterBasedLoadBalancedCall::~FilterBasedLoadBalancedCall() {
2758 // Make sure there are no remaining pending batches.
2759 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2760 GPR_ASSERT(pending_batches_[i] == nullptr);
2761 }
2762 if (on_call_destruction_complete_ != nullptr) {
2763 ExecCtx::Run(DEBUG_LOCATION, on_call_destruction_complete_,
2764 absl::OkStatus());
2765 }
2766 }
2767
Orphan()2768 void ClientChannel::FilterBasedLoadBalancedCall::Orphan() {
2769 // If the recv_trailing_metadata op was never started, then notify
2770 // about call completion here, as best we can. We assume status
2771 // CANCELLED in this case.
2772 if (recv_trailing_metadata_ == nullptr) {
2773 RecordCallCompletion(absl::CancelledError("call cancelled"), nullptr,
2774 nullptr, "");
2775 }
2776 // Delegate to parent.
2777 LoadBalancedCall::Orphan();
2778 }
2779
GetBatchIndex(grpc_transport_stream_op_batch * batch)2780 size_t ClientChannel::FilterBasedLoadBalancedCall::GetBatchIndex(
2781 grpc_transport_stream_op_batch* batch) {
2782 // Note: It is important the send_initial_metadata be the first entry
2783 // here, since the code in PickSubchannelImpl() assumes it will be.
2784 if (batch->send_initial_metadata) return 0;
2785 if (batch->send_message) return 1;
2786 if (batch->send_trailing_metadata) return 2;
2787 if (batch->recv_initial_metadata) return 3;
2788 if (batch->recv_message) return 4;
2789 if (batch->recv_trailing_metadata) return 5;
2790 GPR_UNREACHABLE_CODE(return (size_t)-1);
2791 }
2792
2793 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_transport_stream_op_batch * batch)2794 void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesAdd(
2795 grpc_transport_stream_op_batch* batch) {
2796 const size_t idx = GetBatchIndex(batch);
2797 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2798 gpr_log(GPR_INFO,
2799 "chand=%p lb_call=%p: adding pending batch at index %" PRIuPTR,
2800 chand(), this, idx);
2801 }
2802 GPR_ASSERT(pending_batches_[idx] == nullptr);
2803 pending_batches_[idx] = batch;
2804 }
2805
2806 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error_handle error)2807 void ClientChannel::FilterBasedLoadBalancedCall::FailPendingBatchInCallCombiner(
2808 void* arg, grpc_error_handle error) {
2809 grpc_transport_stream_op_batch* batch =
2810 static_cast<grpc_transport_stream_op_batch*>(arg);
2811 auto* self = static_cast<FilterBasedLoadBalancedCall*>(
2812 batch->handler_private.extra_arg);
2813 // Note: This will release the call combiner.
2814 grpc_transport_stream_op_batch_finish_with_failure(batch, error,
2815 self->call_combiner_);
2816 }
2817
// This is called via the call combiner, so access to calld is synchronized.
// Fails all pending batches with the given (non-OK) error.  The error is
// also stashed in failure_error_ for later use; whether the call combiner
// is yielded afterwards is decided by yield_call_combiner_predicate.
void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesFail(
    grpc_error_handle error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {
  GPR_ASSERT(!error.ok());
  // Remember the error so later code can observe why the call failed.
  failure_error_ = error;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: failing %" PRIuPTR " pending batches: %s",
            chand(), this, num_batches, StatusToString(error).c_str());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      // Clear the slot so the batch cannot be failed twice.
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, error,
                   "PendingBatchesFail");
      batch = nullptr;
    }
  }
  if (yield_call_combiner_predicate(closures)) {
    closures.RunClosures(call_combiner_);
  } else {
    closures.RunClosuresWithoutYielding(call_combiner_);
  }
}
2852
2853 // This is called via the call combiner, so access to calld is synchronized.
2854 void ClientChannel::FilterBasedLoadBalancedCall::
ResumePendingBatchInCallCombiner(void * arg,grpc_error_handle)2855 ResumePendingBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
2856 grpc_transport_stream_op_batch* batch =
2857 static_cast<grpc_transport_stream_op_batch*>(arg);
2858 SubchannelCall* subchannel_call =
2859 static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2860 // Note: This will release the call combiner.
2861 subchannel_call->StartTransportStreamOpBatch(batch);
2862 }
2863
// This is called via the call combiner, so access to calld is synchronized.
// Sends all previously queued batches down to the subchannel call, then
// yields the call combiner.
void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesResume() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: starting %" PRIuPTR
            " pending batches on subchannel_call=%p",
            chand(), this, num_batches, subchannel_call_.get());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      // The subchannel call is carried through the closure's extra_arg;
      // clear the slot so the batch is only resumed once.
      batch->handler_private.extra_arg = subchannel_call_.get();
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        ResumePendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, absl::OkStatus(),
                   "resuming pending batch from LB call");
      batch = nullptr;
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner_);
}
2892
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)2893 void ClientChannel::FilterBasedLoadBalancedCall::StartTransportStreamOpBatch(
2894 grpc_transport_stream_op_batch* batch) {
2895 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace) ||
2896 GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) {
2897 gpr_log(GPR_INFO,
2898 "chand=%p lb_call=%p: batch started from above: %s, "
2899 "call_attempt_tracer()=%p",
2900 chand(), this,
2901 grpc_transport_stream_op_batch_string(batch, false).c_str(),
2902 call_attempt_tracer());
2903 }
2904 // Handle call tracing.
2905 if (call_attempt_tracer() != nullptr) {
2906 // Record send ops in tracer.
2907 if (batch->cancel_stream) {
2908 call_attempt_tracer()->RecordCancel(
2909 batch->payload->cancel_stream.cancel_error);
2910 }
2911 if (batch->send_initial_metadata) {
2912 call_attempt_tracer()->RecordSendInitialMetadata(
2913 batch->payload->send_initial_metadata.send_initial_metadata);
2914 }
2915 if (batch->send_trailing_metadata) {
2916 call_attempt_tracer()->RecordSendTrailingMetadata(
2917 batch->payload->send_trailing_metadata.send_trailing_metadata);
2918 }
2919 // Intercept recv ops.
2920 if (batch->recv_initial_metadata) {
2921 recv_initial_metadata_ =
2922 batch->payload->recv_initial_metadata.recv_initial_metadata;
2923 original_recv_initial_metadata_ready_ =
2924 batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
2925 GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
2926 this, nullptr);
2927 batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
2928 &recv_initial_metadata_ready_;
2929 }
2930 }
2931 // Intercept recv_trailing_metadata even if there is no call tracer,
2932 // since we may need to notify the LB policy about trailing metadata.
2933 if (batch->recv_trailing_metadata) {
2934 recv_trailing_metadata_ =
2935 batch->payload->recv_trailing_metadata.recv_trailing_metadata;
2936 transport_stream_stats_ =
2937 batch->payload->recv_trailing_metadata.collect_stats;
2938 original_recv_trailing_metadata_ready_ =
2939 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
2940 GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
2941 this, nullptr);
2942 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
2943 &recv_trailing_metadata_ready_;
2944 }
2945 // If we've already gotten a subchannel call, pass the batch down to it.
2946 // Note that once we have picked a subchannel, we do not need to acquire
2947 // the channel's data plane mutex, which is more efficient (especially for
2948 // streaming calls).
2949 if (subchannel_call_ != nullptr) {
2950 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2951 gpr_log(GPR_INFO,
2952 "chand=%p lb_call=%p: starting batch on subchannel_call=%p",
2953 chand(), this, subchannel_call_.get());
2954 }
2955 subchannel_call_->StartTransportStreamOpBatch(batch);
2956 return;
2957 }
2958 // We do not yet have a subchannel call.
2959 //
2960 // If we've previously been cancelled, immediately fail any new batches.
2961 if (GPR_UNLIKELY(!cancel_error_.ok())) {
2962 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2963 gpr_log(GPR_INFO, "chand=%p lb_call=%p: failing batch with error: %s",
2964 chand(), this, StatusToString(cancel_error_).c_str());
2965 }
2966 // Note: This will release the call combiner.
2967 grpc_transport_stream_op_batch_finish_with_failure(batch, cancel_error_,
2968 call_combiner_);
2969 return;
2970 }
2971 // Handle cancellation.
2972 if (GPR_UNLIKELY(batch->cancel_stream)) {
2973 // Stash a copy of cancel_error in our call data, so that we can use
2974 // it for subsequent operations. This ensures that if the call is
2975 // cancelled before any batches are passed down (e.g., if the deadline
2976 // is in the past when the call starts), we can return the right
2977 // error to the caller when the first batch does get passed down.
2978 cancel_error_ = batch->payload->cancel_stream.cancel_error;
2979 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2980 gpr_log(GPR_INFO, "chand=%p lb_call=%p: recording cancel_error=%s",
2981 chand(), this, StatusToString(cancel_error_).c_str());
2982 }
2983 // Fail all pending batches.
2984 PendingBatchesFail(cancel_error_, NoYieldCallCombiner);
2985 // Note: This will release the call combiner.
2986 grpc_transport_stream_op_batch_finish_with_failure(batch, cancel_error_,
2987 call_combiner_);
2988 return;
2989 }
2990 // Add the batch to the pending list.
2991 PendingBatchesAdd(batch);
2992 // For batches containing a send_initial_metadata op, acquire the
2993 // channel's LB mutex to pick a subchannel.
2994 if (GPR_LIKELY(batch->send_initial_metadata)) {
2995 TryPick(/*was_queued=*/false);
2996 } else {
2997 // For all other batches, release the call combiner.
2998 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2999 gpr_log(GPR_INFO,
3000 "chand=%p lb_call=%p: saved batch, yielding call combiner",
3001 chand(), this);
3002 }
3003 GRPC_CALL_COMBINER_STOP(call_combiner_,
3004 "batch does not include send_initial_metadata");
3005 }
3006 }
3007
RecvInitialMetadataReady(void * arg,grpc_error_handle error)3008 void ClientChannel::FilterBasedLoadBalancedCall::RecvInitialMetadataReady(
3009 void* arg, grpc_error_handle error) {
3010 auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
3011 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
3012 gpr_log(GPR_INFO,
3013 "chand=%p lb_call=%p: got recv_initial_metadata_ready: error=%s",
3014 self->chand(), self, StatusToString(error).c_str());
3015 }
3016 if (error.ok()) {
3017 // recv_initial_metadata_flags is not populated for clients
3018 self->call_attempt_tracer()->RecordReceivedInitialMetadata(
3019 self->recv_initial_metadata_);
3020 auto* peer_string = self->recv_initial_metadata_->get_pointer(PeerString());
3021 if (peer_string != nullptr) self->peer_string_ = peer_string->Ref();
3022 }
3023 Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
3024 error);
3025 }
3026
// Closure invoked when trailing metadata arrives from the transport.
// Installed unconditionally (even without a call tracer), because the LB
// policy's subchannel call tracker may need to see the final status.
// Determines the call's final status, records completion with the tracer
// and/or LB tracker, then chains to the original callback, substituting
// any stashed failure_error_ for the transport's error.
void ClientChannel::FilterBasedLoadBalancedCall::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {
  auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: got recv_trailing_metadata_ready: error=%s "
            "call_attempt_tracer()=%p lb_subchannel_call_tracker_=%p "
            "failure_error_=%s",
            self->chand(), self, StatusToString(error).c_str(),
            self->call_attempt_tracer(), self->lb_subchannel_call_tracker(),
            StatusToString(self->failure_error_).c_str());
  }
  // Check if we have a tracer or an LB callback to invoke.
  if (self->call_attempt_tracer() != nullptr ||
      self->lb_subchannel_call_tracker() != nullptr) {
    // Get the call's status.
    absl::Status status;
    if (!error.ok()) {
      // Get status from error.
      grpc_status_code code;
      std::string message;
      grpc_error_get_status(error, self->deadline_, &code, &message,
                            /*http_error=*/nullptr, /*error_string=*/nullptr);
      status = absl::Status(static_cast<absl::StatusCode>(code), message);
    } else {
      // Get status from headers.
      // Missing grpc-status is treated as UNKNOWN.
      const auto& md = *self->recv_trailing_metadata_;
      grpc_status_code code =
          md.get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
      if (code != GRPC_STATUS_OK) {
        absl::string_view message;
        if (const auto* grpc_message = md.get_pointer(GrpcMessageMetadata())) {
          message = grpc_message->as_string_view();
        }
        status = absl::Status(static_cast<absl::StatusCode>(code), message);
      }
    }
    // Peer string was captured in RecvInitialMetadataReady, if the
    // server reported one.
    absl::string_view peer_string;
    if (self->peer_string_.has_value()) {
      peer_string = self->peer_string_->as_string_view();
    }
    self->RecordCallCompletion(status, self->recv_trailing_metadata_,
                               self->transport_stream_stats_, peer_string);
  }
  // Chain to original callback.
  // If an earlier failure was stashed, surface that error instead of the
  // transport's, consuming it in the process.
  if (!self->failure_error_.ok()) {
    error = self->failure_error_;
    self->failure_error_ = absl::OkStatus();
  }
  Closure::Run(DEBUG_LOCATION, self->original_recv_trailing_metadata_ready_,
               error);
}
3079
// A class to handle the call combiner cancellation callback for a
// queued pick.
// TODO(roth): When we implement hedging support, we won't be able to
// register a call combiner cancellation closure for each LB pick,
// because there may be multiple LB picks happening in parallel.
// Instead, we will probably need to maintain a list in the CallData
// object of pending LB picks to be cancelled when the closure runs.
class ClientChannel::FilterBasedLoadBalancedCall::LbQueuedCallCanceller {
 public:
  // Takes a ref to the LB call and registers itself with the call
  // combiner; the object deletes itself when CancelLocked runs.
  explicit LbQueuedCallCanceller(
      RefCountedPtr<FilterBasedLoadBalancedCall> lb_call)
      : lb_call_(std::move(lb_call)) {
    // Hold the call stack alive until CancelLocked runs, since the
    // closure may fire after the call would otherwise be destroyed.
    GRPC_CALL_STACK_REF(lb_call_->owning_call_, "LbQueuedCallCanceller");
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, nullptr);
    lb_call_->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  // Call combiner cancellation callback.  Runs either when the call is
  // cancelled (error set) or when the canceller is lamed (error OK).
  static void CancelLocked(void* arg, grpc_error_handle error) {
    auto* self = static_cast<LbQueuedCallCanceller*>(arg);
    auto* lb_call = self->lb_call_.get();
    auto* chand = lb_call->chand();
    {
      MutexLock lock(&chand->lb_mu_);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p lb_call=%p: cancelling queued pick: "
                "error=%s self=%p calld->pick_canceller=%p",
                chand, lb_call, StatusToString(error).c_str(), self,
                lb_call->lb_call_canceller_);
      }
      // Act only if this canceller is still the active one (it may have
      // been lamed by RetryPickLocked) and this is a real cancellation.
      if (lb_call->lb_call_canceller_ == self && !error.ok()) {
        lb_call->Commit();
        // Remove pick from list of queued picks.
        lb_call->RemoveCallFromLbQueuedCallsLocked();
        // Remove from queued picks list.
        chand->lb_queued_calls_.erase(lb_call);
        // Fail pending batches on the call.
        lb_call->PendingBatchesFail(error,
                                    YieldCallCombinerIfPendingBatchesFound);
      }
    }
    // Unref lb_call before unreffing the call stack, since unreffing
    // the call stack may destroy the arena in which lb_call is allocated.
    auto* owning_call = lb_call->owning_call_;
    self->lb_call_.reset();
    GRPC_CALL_STACK_UNREF(owning_call, "LbQueuedCallCanceller");
    delete self;
  }

  RefCountedPtr<FilterBasedLoadBalancedCall> lb_call_;
  grpc_closure closure_;
};
3133
TryPick(bool was_queued)3134 void ClientChannel::FilterBasedLoadBalancedCall::TryPick(bool was_queued) {
3135 auto result = PickSubchannel(was_queued);
3136 if (result.has_value()) {
3137 if (!result->ok()) {
3138 PendingBatchesFail(*result, YieldCallCombiner);
3139 return;
3140 }
3141 CreateSubchannelCall();
3142 }
3143 }
3144
OnAddToQueueLocked()3145 void ClientChannel::FilterBasedLoadBalancedCall::OnAddToQueueLocked() {
3146 // Register call combiner cancellation callback.
3147 lb_call_canceller_ = new LbQueuedCallCanceller(Ref());
3148 }
3149
RetryPickLocked()3150 void ClientChannel::FilterBasedLoadBalancedCall::RetryPickLocked() {
3151 // Lame the call combiner canceller.
3152 lb_call_canceller_ = nullptr;
3153 // Do an async callback to resume call processing, so that we're not
3154 // doing it while holding the channel's LB mutex.
3155 // TODO(roth): We should really be using EventEngine::Run() here
3156 // instead of ExecCtx::Run(). Unfortunately, doing that seems to cause
3157 // a flaky TSAN failure for reasons that I do not fully understand.
3158 // However, given that we are working toward eliminating this code as
3159 // part of the promise conversion, it doesn't seem worth further
3160 // investigation right now.
3161 ExecCtx::Run(DEBUG_LOCATION, NewClosure([this](grpc_error_handle) {
3162 // If there are a lot of queued calls here, resuming them
3163 // all may cause us to stay inside C-core for a long period
3164 // of time. All of that work would be done using the same
3165 // ExecCtx instance and therefore the same cached value of
3166 // "now". The longer it takes to finish all of this work
3167 // and exit from C-core, the more stale the cached value of
3168 // "now" may become. This can cause problems whereby (e.g.)
3169 // we calculate a timer deadline based on the stale value,
3170 // which results in the timer firing too early. To avoid
3171 // this, we invalidate the cached value for each call we
3172 // process.
3173 ExecCtx::Get()->InvalidateNow();
3174 TryPick(/*was_queued=*/true);
3175 }),
3176 absl::OkStatus());
3177 }
3178
CreateSubchannelCall()3179 void ClientChannel::FilterBasedLoadBalancedCall::CreateSubchannelCall() {
3180 Slice* path = send_initial_metadata()->get_pointer(HttpPathMetadata());
3181 GPR_ASSERT(path != nullptr);
3182 SubchannelCall::Args call_args = {
3183 connected_subchannel()->Ref(), pollent_, path->Ref(), /*start_time=*/0,
3184 deadline_, arena_,
3185 // TODO(roth): When we implement hedging support, we will probably
3186 // need to use a separate call context for each subchannel call.
3187 call_context_, call_combiner_};
3188 grpc_error_handle error;
3189 subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
3190 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
3191 gpr_log(GPR_INFO,
3192 "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand(),
3193 this, subchannel_call_.get(), StatusToString(error).c_str());
3194 }
3195 if (on_call_destruction_complete_ != nullptr) {
3196 subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
3197 on_call_destruction_complete_ = nullptr;
3198 }
3199 if (GPR_UNLIKELY(!error.ok())) {
3200 PendingBatchesFail(error, YieldCallCombiner);
3201 } else {
3202 PendingBatchesResume();
3203 }
3204 }
3205
3206 } // namespace grpc_core
3207