1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/filters/client_channel/client_channel.h"
20 
21 #include <inttypes.h>
22 #include <limits.h>
23 
24 #include <algorithm>
25 #include <functional>
26 #include <new>
27 #include <type_traits>
28 #include <utility>
29 #include <vector>
30 
31 #include "absl/cleanup/cleanup.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/cord.h"
35 #include "absl/strings/numbers.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/strings/str_join.h"
38 #include "absl/strings/string_view.h"
39 #include "absl/types/optional.h"
40 #include "absl/types/variant.h"
41 
42 #include <grpc/event_engine/event_engine.h>
43 #include <grpc/slice.h>
44 #include <grpc/status.h>
45 #include <grpc/support/json.h>
46 #include <grpc/support/log.h>
47 #include <grpc/support/string_util.h>
48 #include <grpc/support/time.h>
49 
50 #include "src/core/ext/filters/client_channel/backend_metric.h"
51 #include "src/core/ext/filters/client_channel/backup_poller.h"
52 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
53 #include "src/core/ext/filters/client_channel/client_channel_internal.h"
54 #include "src/core/ext/filters/client_channel/client_channel_service_config.h"
55 #include "src/core/ext/filters/client_channel/config_selector.h"
56 #include "src/core/ext/filters/client_channel/dynamic_filters.h"
57 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
58 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
59 #include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
60 #include "src/core/ext/filters/client_channel/retry_filter.h"
61 #include "src/core/ext/filters/client_channel/subchannel.h"
62 #include "src/core/ext/filters/client_channel/subchannel_interface_internal.h"
63 #include "src/core/ext/filters/deadline/deadline_filter.h"
64 #include "src/core/lib/channel/channel_args.h"
65 #include "src/core/lib/channel/channel_stack.h"
66 #include "src/core/lib/channel/channel_trace.h"
67 #include "src/core/lib/channel/status_util.h"
68 #include "src/core/lib/config/core_configuration.h"
69 #include "src/core/lib/debug/trace.h"
70 #include "src/core/lib/gpr/useful.h"
71 #include "src/core/lib/gprpp/debug_location.h"
72 #include "src/core/lib/gprpp/status_helper.h"
73 #include "src/core/lib/gprpp/sync.h"
74 #include "src/core/lib/gprpp/unique_type_name.h"
75 #include "src/core/lib/gprpp/work_serializer.h"
76 #include "src/core/lib/handshaker/proxy_mapper_registry.h"
77 #include "src/core/lib/iomgr/exec_ctx.h"
78 #include "src/core/lib/iomgr/polling_entity.h"
79 #include "src/core/lib/iomgr/pollset_set.h"
80 #include "src/core/lib/json/json.h"
81 #include "src/core/lib/load_balancing/lb_policy_registry.h"
82 #include "src/core/lib/load_balancing/subchannel_interface.h"
83 #include "src/core/lib/resolver/resolver_registry.h"
84 #include "src/core/lib/resolver/server_address.h"
85 #include "src/core/lib/service_config/service_config_call_data.h"
86 #include "src/core/lib/service_config/service_config_impl.h"
87 #include "src/core/lib/slice/slice.h"
88 #include "src/core/lib/slice/slice_internal.h"
89 #include "src/core/lib/surface/channel.h"
90 #include "src/core/lib/transport/connectivity_state.h"
91 #include "src/core/lib/transport/error_utils.h"
92 #include "src/core/lib/transport/metadata_batch.h"
93 
94 //
95 // Client channel filter
96 //
97 
98 namespace grpc_core {
99 
using internal::ClientChannelMethodParsedConfig;

// Trace flags for this filter, all disabled by default; they cover
// channel-level, call-level, and LB-call-level events, respectively.
TraceFlag grpc_client_channel_trace(false, "client_channel");
TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
TraceFlag grpc_client_channel_lb_call_trace(false, "client_channel_lb_call");
105 
106 //
107 // ClientChannel::CallData definition
108 //
109 
// Per-call state for the client channel filter.  This base class owns
// the logic for interacting with the channel's name-resolution
// machinery; the pure-virtual accessors below are implemented by a
// subclass that supplies the actual call plumbing.
class ClientChannel::CallData {
 public:
  // Removes the call from the channel's list of calls queued
  // for name resolution.
  void RemoveCallFromResolverQueuedCallsLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);

  // Called by the channel for each queued call when a new resolution
  // result becomes available.
  virtual void RetryCheckResolutionLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) = 0;

  // The dynamic filter stack selected for this call; set once a
  // resolver result has been applied to the call.
  RefCountedPtr<DynamicFilters> dynamic_filters() const {
    return dynamic_filters_;
  }

 protected:
  CallData() = default;
  virtual ~CallData() = default;

  // Checks whether a resolver result is available.  The following
  // outcomes are possible:
  // - No resolver result is available yet.  The call will be queued and
  //   absl::nullopt will be returned.  Later, when a resolver result
  //   becomes available, RetryCheckResolutionLocked() will be called.
  // - The resolver has returned a transient failure.  If the call is
  //   not wait_for_ready, a non-OK status will be returned.  (If the
  //   call *is* wait_for_ready, it will be queued instead.)
  // - There is a valid resolver result.  The service config will be
  //   stored in the call context and an OK status will be returned.
  absl::optional<absl::Status> CheckResolution(bool was_queued);

 private:
  // Accessors for data stored in the subclass.
  virtual ClientChannel* chand() const = 0;
  virtual Arena* arena() const = 0;
  virtual grpc_polling_entity* pollent() const = 0;
  virtual grpc_metadata_batch* send_initial_metadata() = 0;
  virtual grpc_call_context_element* call_context() const = 0;

  // Helper function for CheckResolution().  Returns true if the call
  // can continue (i.e., there is a valid resolution result, or there is
  // an invalid resolution result but the call is not wait_for_ready).
  bool CheckResolutionLocked(
      absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);

  // Adds the call to the channel's list of calls queued for name resolution.
  void AddCallToResolverQueuedCallsLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);

  // Called when adding the call to the resolver queue.  No-op by
  // default; subclasses may override (e.g., to install a cancellation
  // callback for the queued call).
  virtual void OnAddToQueueLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) {}

  // Applies service config to the call.  Must be invoked once we know
  // that the resolver has returned results to the channel.
  // If an error is returned, the error indicates the status with which
  // the call should be failed.
  grpc_error_handle ApplyServiceConfigToCallLocked(
      const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector);

  // Called to reset the deadline based on the service config obtained
  // from the resolver.
  virtual void ResetDeadline(Duration timeout) = 0;

  RefCountedPtr<DynamicFilters> dynamic_filters_;
};
178 
// CallData implementation used by the filter-based (non-promise)
// channel stack.  Handles pending-batch queuing, deadline handling,
// and hand-off of batches to the dynamic filter stack once a resolver
// result has been applied to the call.
class ClientChannel::FilterBasedCallData : public ClientChannel::CallData {
 public:
  static grpc_error_handle Init(grpc_call_element* elem,
                                const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* final_info,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

 private:
  class ResolverQueuedCallCanceller;

  FilterBasedCallData(grpc_call_element* elem,
                      const grpc_call_element_args& args);
  ~FilterBasedCallData() override;

  // These values are stashed in deadline_state_, so we read them back
  // from there rather than duplicating them here.
  grpc_call_element* elem() const { return deadline_state_.elem; }
  grpc_call_stack* owning_call() const { return deadline_state_.call_stack; }
  CallCombiner* call_combiner() const { return deadline_state_.call_combiner; }

  ClientChannel* chand() const override {
    return static_cast<ClientChannel*>(elem()->channel_data);
  }
  Arena* arena() const override { return deadline_state_.arena; }
  grpc_polling_entity* pollent() const override { return pollent_; }
  grpc_metadata_batch* send_initial_metadata() override {
    // Batch index 0 is reserved for the batch carrying send_initial_metadata
    // (see GetBatchIndex()).
    return pending_batches_[0]
        ->payload->send_initial_metadata.send_initial_metadata;
  }
  grpc_call_context_element* call_context() const override {
    return call_context_;
  }

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  static void FailPendingBatchInCallCombiner(void* arg,
                                             grpc_error_handle error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility for
  // yielding the call combiner.
  void PendingBatchesFail(
      grpc_error_handle error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg,
                                               grpc_error_handle ignored);
  // Resumes all pending batches on dynamic_call_.
  void PendingBatchesResume();

  // Called to check for a resolution result, both when the call is
  // initially started and when it is queued and the channel gets a new
  // resolution result.
  void TryCheckResolution(bool was_queued);

  void OnAddToQueueLocked() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);

  void RetryCheckResolutionLocked() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);

  // Shrinks the call's deadline if the per-method timeout from the
  // service config yields an earlier deadline than the one currently
  // in effect; never extends the deadline.
  void ResetDeadline(Duration timeout) override {
    const Timestamp per_method_deadline =
        Timestamp::FromCycleCounterRoundUp(call_start_time_) + timeout;
    if (per_method_deadline < deadline_) {
      deadline_ = per_method_deadline;
      grpc_deadline_state_reset(&deadline_state_, deadline_);
    }
  }

  void CreateDynamicCall();

  static void RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
      void* arg, grpc_error_handle error);

  grpc_slice path_;  // Request path.
  grpc_call_context_element* call_context_;
  gpr_cycle_counter call_start_time_;
  Timestamp deadline_;

  // State for handling deadlines.
  grpc_deadline_state deadline_state_;

  grpc_polling_entity* pollent_ = nullptr;

  // Accessed while holding ClientChannel::resolution_mu_.
  ResolverQueuedCallCanceller* resolver_call_canceller_
      ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = nullptr;

  grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
  grpc_closure recv_trailing_metadata_ready_;

  RefCountedPtr<DynamicFilters::Call> dynamic_call_;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the LB call and are not intercepting any of
  // its callbacks).
  grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};

  // Set when we get a cancel_stream op.
  grpc_error_handle cancel_error_;
};
297 
298 //
299 // Filter vtable
300 //
301 
// Vtable registering this filter with the channel stack; entries are
// positional and follow the field order of grpc_channel_filter.
const grpc_channel_filter ClientChannel::kFilterVtable = {
    ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch,
    nullptr,  // NOTE(review): unused slot (presumably the promise-based
              // entry point) — confirm against grpc_channel_filter.
    ClientChannel::StartTransportOp,
    sizeof(ClientChannel::FilterBasedCallData),
    ClientChannel::FilterBasedCallData::Init,
    ClientChannel::FilterBasedCallData::SetPollent,
    ClientChannel::FilterBasedCallData::Destroy,
    sizeof(ClientChannel),
    ClientChannel::Init,
    grpc_channel_stack_no_post_init,
    ClientChannel::Destroy,
    ClientChannel::GetChannelInfo,
    "client-channel",
};
317 
318 //
319 // dynamic termination filter
320 //
321 
322 namespace {
323 
// Channel-level state for the dynamic termination filter.  This filter
// terminates the dynamic filter stack (Init() asserts it is the last
// filter); its CallData creates a load-balanced call on the owning
// ClientChannel.
class DynamicTerminationFilter {
 public:
  class CallData;

  static const grpc_channel_filter kFilterVtable;

  static grpc_error_handle Init(grpc_channel_element* elem,
                                grpc_channel_element_args* args) {
    GPR_ASSERT(args->is_last);
    GPR_ASSERT(elem->filter == &kFilterVtable);
    new (elem->channel_data) DynamicTerminationFilter(args->channel_args);
    return absl::OkStatus();
  }

  static void Destroy(grpc_channel_element* elem) {
    // Placement-destroy only; the channel stack owns the storage.
    auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
    chand->~DynamicTerminationFilter();
  }

  // Will never be called.
  static void StartTransportOp(grpc_channel_element* /*elem*/,
                               grpc_transport_op* /*op*/) {}
  static void GetChannelInfo(grpc_channel_element* /*elem*/,
                             const grpc_channel_info* /*info*/) {}

 private:
  explicit DynamicTerminationFilter(const ChannelArgs& args)
      : chand_(args.GetObject<ClientChannel>()) {}

  // The client channel, retrieved from the channel args; not owned.
  ClientChannel* chand_;
};
355 
356 class DynamicTerminationFilter::CallData {
357  public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)358   static grpc_error_handle Init(grpc_call_element* elem,
359                                 const grpc_call_element_args* args) {
360     new (elem->call_data) CallData(*args);
361     return absl::OkStatus();
362   }
363 
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure * then_schedule_closure)364   static void Destroy(grpc_call_element* elem,
365                       const grpc_call_final_info* /*final_info*/,
366                       grpc_closure* then_schedule_closure) {
367     auto* calld = static_cast<CallData*>(elem->call_data);
368     RefCountedPtr<SubchannelCall> subchannel_call;
369     if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
370       subchannel_call = calld->lb_call_->subchannel_call();
371     }
372     calld->~CallData();
373     if (GPR_LIKELY(subchannel_call != nullptr)) {
374       subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
375     } else {
376       // TODO(yashkt) : This can potentially be a Closure::Run
377       ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, absl::OkStatus());
378     }
379   }
380 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)381   static void StartTransportStreamOpBatch(
382       grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
383     auto* calld = static_cast<CallData*>(elem->call_data);
384     calld->lb_call_->StartTransportStreamOpBatch(batch);
385   }
386 
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)387   static void SetPollent(grpc_call_element* elem,
388                          grpc_polling_entity* pollent) {
389     auto* calld = static_cast<CallData*>(elem->call_data);
390     auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
391     ClientChannel* client_channel = chand->chand_;
392     grpc_call_element_args args = {calld->owning_call_,  nullptr,
393                                    calld->call_context_, calld->path_,
394                                    /*start_time=*/0,     calld->deadline_,
395                                    calld->arena_,        calld->call_combiner_};
396     auto* service_config_call_data =
397         static_cast<ClientChannelServiceConfigCallData*>(
398             calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
399     calld->lb_call_ = client_channel->CreateLoadBalancedCall(
400         args, pollent, nullptr,
401         [service_config_call_data]() { service_config_call_data->Commit(); },
402         /*is_transparent_retry=*/false);
403     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
404       gpr_log(GPR_INFO,
405               "chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand,
406               client_channel, calld->lb_call_.get());
407     }
408   }
409 
410  private:
CallData(const grpc_call_element_args & args)411   explicit CallData(const grpc_call_element_args& args)
412       : path_(CSliceRef(args.path)),
413         deadline_(args.deadline),
414         arena_(args.arena),
415         owning_call_(args.call_stack),
416         call_combiner_(args.call_combiner),
417         call_context_(args.context) {}
418 
~CallData()419   ~CallData() { CSliceUnref(path_); }
420 
421   grpc_slice path_;  // Request path.
422   Timestamp deadline_;
423   Arena* arena_;
424   grpc_call_stack* owning_call_;
425   CallCombiner* call_combiner_;
426   grpc_call_context_element* call_context_;
427 
428   OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall> lb_call_;
429 };
430 
// Vtable for the dynamic termination filter; entries are positional and
// follow the field order of grpc_channel_filter.
const grpc_channel_filter DynamicTerminationFilter::kFilterVtable = {
    DynamicTerminationFilter::CallData::StartTransportStreamOpBatch,
    nullptr,  // NOTE(review): unused slot (presumably the promise-based
              // entry point) — confirm against grpc_channel_filter.
    DynamicTerminationFilter::StartTransportOp,
    sizeof(DynamicTerminationFilter::CallData),
    DynamicTerminationFilter::CallData::Init,
    DynamicTerminationFilter::CallData::SetPollent,
    DynamicTerminationFilter::CallData::Destroy,
    sizeof(DynamicTerminationFilter),
    DynamicTerminationFilter::Init,
    grpc_channel_stack_no_post_init,
    DynamicTerminationFilter::Destroy,
    DynamicTerminationFilter::GetChannelInfo,
    "dynamic_filter_termination",
};
446 
447 }  // namespace
448 
449 //
450 // ClientChannel::ResolverResultHandler
451 //
452 
// Receives results from the resolver and forwards them to the channel,
// hopping into the channel's work_serializer.  Holds a ref on the
// channel stack for as long as the resolver is running.
class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler {
 public:
  explicit ResolverResultHandler(ClientChannel* chand) : chand_(chand) {
    // Keep the channel stack alive while the resolver is running.
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler");
  }

  // Destruction of this handler indicates that resolver shutdown has
  // completed.
  ~ResolverResultHandler() override {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: resolver shutdown complete", chand_);
    }
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
  }

  void ReportResult(Resolver::Result result) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->OnResolverResultChangedLocked(std::move(result));
  }

 private:
  ClientChannel* chand_;  // Not owned.
};
474 
475 //
476 // ClientChannel::SubchannelWrapper
477 //
478 
479 // This class is a wrapper for Subchannel that hides details of the
480 // channel's implementation (such as the connected subchannel) from the
481 // LB policy API.
482 //
483 // Note that no synchronization is needed here, because even if the
484 // underlying subchannel is shared between channels, this wrapper will only
485 // be used within one channel, so it will always be synchronized by the
486 // control plane work_serializer.
487 class ClientChannel::SubchannelWrapper : public SubchannelInterface {
488  public:
SubchannelWrapper(ClientChannel * chand,RefCountedPtr<Subchannel> subchannel)489   SubchannelWrapper(ClientChannel* chand, RefCountedPtr<Subchannel> subchannel)
490       : SubchannelInterface(GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)
491                                 ? "SubchannelWrapper"
492                                 : nullptr),
493         chand_(chand),
494         subchannel_(std::move(subchannel)) {
495     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
496       gpr_log(GPR_INFO,
497               "chand=%p: creating subchannel wrapper %p for subchannel %p",
498               chand, this, subchannel_.get());
499     }
500     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
501     if (chand_->channelz_node_ != nullptr) {
502       auto* subchannel_node = subchannel_->channelz_node();
503       if (subchannel_node != nullptr) {
504         auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
505         if (it == chand_->subchannel_refcount_map_.end()) {
506           chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid());
507           it = chand_->subchannel_refcount_map_.emplace(subchannel_.get(), 0)
508                    .first;
509         }
510         ++it->second;
511       }
512     }
513     chand_->subchannel_wrappers_.insert(this);
514   }
515 
~SubchannelWrapper()516   ~SubchannelWrapper() override {
517     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
518       gpr_log(GPR_INFO,
519               "chand=%p: destroying subchannel wrapper %p for subchannel %p",
520               chand_, this, subchannel_.get());
521     }
522     chand_->subchannel_wrappers_.erase(this);
523     if (chand_->channelz_node_ != nullptr) {
524       auto* subchannel_node = subchannel_->channelz_node();
525       if (subchannel_node != nullptr) {
526         auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
527         GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
528         --it->second;
529         if (it->second == 0) {
530           chand_->channelz_node_->RemoveChildSubchannel(
531               subchannel_node->uuid());
532           chand_->subchannel_refcount_map_.erase(it);
533         }
534       }
535     }
536     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
537   }
538 
WatchConnectivityState(std::unique_ptr<ConnectivityStateWatcherInterface> watcher)539   void WatchConnectivityState(
540       std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override
541       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
542     auto& watcher_wrapper = watcher_map_[watcher.get()];
543     GPR_ASSERT(watcher_wrapper == nullptr);
544     watcher_wrapper = new WatcherWrapper(std::move(watcher),
545                                          Ref(DEBUG_LOCATION, "WatcherWrapper"));
546     subchannel_->WatchConnectivityState(
547         RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
548             watcher_wrapper));
549   }
550 
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)551   void CancelConnectivityStateWatch(ConnectivityStateWatcherInterface* watcher)
552       override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
553     auto it = watcher_map_.find(watcher);
554     GPR_ASSERT(it != watcher_map_.end());
555     subchannel_->CancelConnectivityStateWatch(it->second);
556     watcher_map_.erase(it);
557   }
558 
connected_subchannel() const559   RefCountedPtr<ConnectedSubchannel> connected_subchannel() const {
560     return subchannel_->connected_subchannel();
561   }
562 
RequestConnection()563   void RequestConnection() override { subchannel_->RequestConnection(); }
564 
ResetBackoff()565   void ResetBackoff() override { subchannel_->ResetBackoff(); }
566 
AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher)567   void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override
568       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
569     std::unique_ptr<InternalSubchannelDataWatcherInterface> internal_watcher(
570         static_cast<InternalSubchannelDataWatcherInterface*>(
571             watcher.release()));
572     internal_watcher->SetSubchannel(subchannel_.get());
573     data_watchers_.push_back(std::move(internal_watcher));
574   }
575 
ThrottleKeepaliveTime(int new_keepalive_time)576   void ThrottleKeepaliveTime(int new_keepalive_time) {
577     subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
578   }
579 
580  private:
581   // This wrapper provides a bridge between the internal Subchannel API
582   // and the SubchannelInterface API that we expose to LB policies.
583   // It implements Subchannel::ConnectivityStateWatcherInterface and wraps
584   // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
585   // that was passed in by the LB policy.  We pass an instance of this
586   // class to the underlying Subchannel, and when we get updates from
587   // the subchannel, we pass those on to the wrapped watcher to return
588   // the update to the LB policy.
589   //
590   // This class handles things like hopping into the WorkSerializer
591   // before passing notifications to the LB policy and propagating
592   // keepalive information betwen subchannels.
593   class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface {
594    public:
WatcherWrapper(std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher,RefCountedPtr<SubchannelWrapper> parent)595     WatcherWrapper(
596         std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
597             watcher,
598         RefCountedPtr<SubchannelWrapper> parent)
599         : watcher_(std::move(watcher)), parent_(std::move(parent)) {}
600 
~WatcherWrapper()601     ~WatcherWrapper() override {
602       auto* parent = parent_.release();  // ref owned by lambda
603       parent->chand_->work_serializer_->Run(
604           [parent]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
605               *parent_->chand_->work_serializer_) {
606             parent->Unref(DEBUG_LOCATION, "WatcherWrapper");
607           },
608           DEBUG_LOCATION);
609     }
610 
OnConnectivityStateChange(grpc_connectivity_state state,const absl::Status & status)611     void OnConnectivityStateChange(grpc_connectivity_state state,
612                                    const absl::Status& status) override {
613       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
614         gpr_log(GPR_INFO,
615                 "chand=%p: connectivity change for subchannel wrapper %p "
616                 "subchannel %p; hopping into work_serializer",
617                 parent_->chand_, parent_.get(), parent_->subchannel_.get());
618       }
619       Ref().release();  // ref owned by lambda
620       parent_->chand_->work_serializer_->Run(
621           [this, state, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
622               *parent_->chand_->work_serializer_) {
623             ApplyUpdateInControlPlaneWorkSerializer(state, status);
624             Unref();
625           },
626           DEBUG_LOCATION);
627     }
628 
interested_parties()629     grpc_pollset_set* interested_parties() override {
630       return watcher_->interested_parties();
631     }
632 
633    private:
ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,const absl::Status & status)634     void ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,
635                                                  const absl::Status& status)
636         ABSL_EXCLUSIVE_LOCKS_REQUIRED(*parent_->chand_->work_serializer_) {
637       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
638         gpr_log(GPR_INFO,
639                 "chand=%p: processing connectivity change in work serializer "
640                 "for subchannel wrapper %p subchannel %p watcher=%p "
641                 "state=%s status=%s",
642                 parent_->chand_, parent_.get(), parent_->subchannel_.get(),
643                 watcher_.get(), ConnectivityStateName(state),
644                 status.ToString().c_str());
645       }
646       absl::optional<absl::Cord> keepalive_throttling =
647           status.GetPayload(kKeepaliveThrottlingKey);
648       if (keepalive_throttling.has_value()) {
649         int new_keepalive_time = -1;
650         if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
651                              &new_keepalive_time)) {
652           if (new_keepalive_time > parent_->chand_->keepalive_time_) {
653             parent_->chand_->keepalive_time_ = new_keepalive_time;
654             if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
655               gpr_log(GPR_INFO, "chand=%p: throttling keepalive time to %d",
656                       parent_->chand_, parent_->chand_->keepalive_time_);
657             }
658             // Propagate the new keepalive time to all subchannels. This is so
659             // that new transports created by any subchannel (and not just the
660             // subchannel that received the GOAWAY), use the new keepalive time.
661             for (auto* subchannel_wrapper :
662                  parent_->chand_->subchannel_wrappers_) {
663               subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
664             }
665           }
666         } else {
667           gpr_log(GPR_ERROR, "chand=%p: Illegal keepalive throttling value %s",
668                   parent_->chand_,
669                   std::string(keepalive_throttling.value()).c_str());
670         }
671       }
672       // Propagate status only in state TF.
673       // We specifically want to avoid propagating the status for
674       // state IDLE that the real subchannel gave us only for the
675       // purpose of keepalive propagation.
676       watcher_->OnConnectivityStateChange(
677           state,
678           state == GRPC_CHANNEL_TRANSIENT_FAILURE ? status : absl::OkStatus());
679     }
680 
681     std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
682         watcher_;
683     RefCountedPtr<SubchannelWrapper> parent_;
684   };
685 
686   ClientChannel* chand_;
687   RefCountedPtr<Subchannel> subchannel_;
688   // Maps from the address of the watcher passed to us by the LB policy
689   // to the address of the WrapperWatcher that we passed to the underlying
690   // subchannel.  This is needed so that when the LB policy calls
691   // CancelConnectivityStateWatch() with its watcher, we know the
692   // corresponding WrapperWatcher to cancel on the underlying subchannel.
693   std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_
694       ABSL_GUARDED_BY(*chand_->work_serializer_);
695   std::vector<std::unique_ptr<InternalSubchannelDataWatcherInterface>>
696       data_watchers_ ABSL_GUARDED_BY(*chand_->work_serializer_);
697 };
698 
699 //
700 // ClientChannel::ExternalConnectivityWatcher
701 //
702 
// Watches the channel's connectivity state on behalf of an external
// (C-core surface API) caller.  *state holds the state the caller last
// saw; on_complete is scheduled when the watch completes or is
// cancelled.  The object is ref-counted: one ref lives in the
// channel's external_watchers_ map, and the ref from construction is
// handed to AddWatcherLocked() via the WorkSerializer.
ClientChannel::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
    ClientChannel* chand, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* on_complete,
    grpc_closure* watcher_timer_init)
    : chand_(chand),
      pollent_(pollent),
      initial_state_(*state),
      state_(state),
      on_complete_(on_complete),
      watcher_timer_init_(watcher_timer_init) {
  // Let the channel poll on behalf of this watcher's polling entity.
  grpc_polling_entity_add_to_pollset_set(&pollent_,
                                         chand_->interested_parties_);
  // Keep the channel stack alive for the watcher's lifetime
  // (released in the destructor).
  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
  {
    MutexLock lock(&chand_->external_watchers_mu_);
    // Will be deleted when the watch is complete.
    GPR_ASSERT(chand->external_watchers_[on_complete] == nullptr);
    // Store a ref to the watcher in the external_watchers_ map.
    chand->external_watchers_[on_complete] =
        Ref(DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked");
  }
  // Pass the ref from creating the object to Start().
  chand_->work_serializer_->Run(
      [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
        // The ref is passed to AddWatcherLocked().
        AddWatcherLocked();
      },
      DEBUG_LOCATION);
}
732 
ClientChannel::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
  // Undo the pollset_set linkage established in the constructor.
  grpc_polling_entity_del_from_pollset_set(&pollent_,
                                           chand_->interested_parties_);
  // Release the channel stack ref taken in the constructor.
  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                           "ExternalConnectivityWatcher");
}
739 
740 void ClientChannel::ExternalConnectivityWatcher::
RemoveWatcherFromExternalWatchersMap(ClientChannel * chand,grpc_closure * on_complete,bool cancel)741     RemoveWatcherFromExternalWatchersMap(ClientChannel* chand,
742                                          grpc_closure* on_complete,
743                                          bool cancel) {
744   RefCountedPtr<ExternalConnectivityWatcher> watcher;
745   {
746     MutexLock lock(&chand->external_watchers_mu_);
747     auto it = chand->external_watchers_.find(on_complete);
748     if (it != chand->external_watchers_.end()) {
749       watcher = std::move(it->second);
750       chand->external_watchers_.erase(it);
751     }
752   }
753   // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock
754   // the mutex before calling it.
755   if (watcher != nullptr && cancel) watcher->Cancel();
756 }
757 
// Called by the state tracker when the channel's connectivity state
// changes.  Reports the new state to the external caller exactly once;
// races with Cancel() are resolved via the done_ flag.
void ClientChannel::ExternalConnectivityWatcher::Notify(
    grpc_connectivity_state state, const absl::Status& /* status */) {
  // Claim the one-shot completion; if Cancel() already won the race,
  // there is nothing left to do.
  bool done = false;
  if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
                                     std::memory_order_relaxed)) {
    return;  // Already done.
  }
  // Remove external watcher.
  ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
      chand_, on_complete_, /*cancel=*/false);
  // Report new state to the user.
  *state_ = state;
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, absl::OkStatus());
  // Hop back into the work_serializer to clean up.
  // Not needed in state SHUTDOWN, because the tracker will
  // automatically remove all watchers in that case.
  // Note: The callback takes a ref in case the ref inside the state tracker
  // gets removed before the callback runs via a SHUTDOWN notification.
  if (state != GRPC_CHANNEL_SHUTDOWN) {
    Ref(DEBUG_LOCATION, "RemoveWatcherLocked()").release();
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          RemoveWatcherLocked();
          Unref(DEBUG_LOCATION, "RemoveWatcherLocked()");
        },
        DEBUG_LOCATION);
  }
}
786 
// Cancels the watch.  Races with Notify() are resolved via the done_
// flag, so the caller's on_complete closure runs exactly once.
void ClientChannel::ExternalConnectivityWatcher::Cancel() {
  bool done = false;
  if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
                                     std::memory_order_relaxed)) {
    return;  // Already done.
  }
  // Tell the caller the watch was cancelled rather than completed.
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, absl::CancelledError());
  // Hop back into the work_serializer to clean up.
  // Note: The callback takes a ref in case the ref inside the state tracker
  // gets removed before the callback runs via a SHUTDOWN notification.
  Ref(DEBUG_LOCATION, "RemoveWatcherLocked()").release();
  chand_->work_serializer_->Run(
      [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
        RemoveWatcherLocked();
        Unref(DEBUG_LOCATION, "RemoveWatcherLocked()");
      },
      DEBUG_LOCATION);
}
805 
// Runs in the WorkSerializer.  Registers this watcher with the
// channel's connectivity state tracker.
void ClientChannel::ExternalConnectivityWatcher::AddWatcherLocked() {
  // Run the caller-provided watcher_timer_init_ closure before
  // registering the watcher.
  Closure::Run(DEBUG_LOCATION, watcher_timer_init_, absl::OkStatus());
  // Add new watcher. Pass the ref of the object from creation to OrphanablePtr.
  chand_->state_tracker_.AddWatcher(
      initial_state_, OrphanablePtr<ConnectivityStateWatcherInterface>(this));
}
812 
// Runs in the WorkSerializer.  Removing the watcher from the tracker
// drops the ref passed in via OrphanablePtr in AddWatcherLocked().
void ClientChannel::ExternalConnectivityWatcher::RemoveWatcherLocked() {
  chand_->state_tracker_.RemoveWatcher(this);
}
816 
817 //
818 // ClientChannel::ConnectivityWatcherAdder
819 //
820 
// Helper for adding an async connectivity state watcher to the
// channel's state tracker from outside the WorkSerializer.  The object
// deletes itself once the watcher has been handed off.
class ClientChannel::ConnectivityWatcherAdder {
 public:
  ConnectivityWatcherAdder(
      ClientChannel* chand, grpc_connectivity_state initial_state,
      OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)
      : chand_(chand),
        initial_state_(initial_state),
        watcher_(std::move(watcher)) {
    // Keep the channel stack alive until AddWatcherLocked() runs.
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          AddWatcherLocked();
        },
        DEBUG_LOCATION);
  }

 private:
  // Hands the watcher to the state tracker, then self-destructs.
  void AddWatcherLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_));
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    delete this;
  }

  ClientChannel* chand_;
  grpc_connectivity_state initial_state_;
  OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher_;
};
849 
850 //
851 // ClientChannel::ConnectivityWatcherRemover
852 //
853 
// Helper for removing an async connectivity state watcher from the
// channel's state tracker from outside the WorkSerializer.  The object
// deletes itself once the removal has been performed.
class ClientChannel::ConnectivityWatcherRemover {
 public:
  ConnectivityWatcherRemover(ClientChannel* chand,
                             AsyncConnectivityStateWatcherInterface* watcher)
      : chand_(chand), watcher_(watcher) {
    // Keep the channel stack alive until RemoveWatcherLocked() runs.
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          RemoveWatcherLocked();
        },
        DEBUG_LOCATION);
  }

 private:
  // Removes the watcher from the state tracker, then self-destructs.
  void RemoveWatcherLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->state_tracker_.RemoveWatcher(watcher_);
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ConnectivityWatcherRemover");
    delete this;
  }

  ClientChannel* chand_;
  AsyncConnectivityStateWatcherInterface* watcher_;
};
879 
880 //
881 // ClientChannel::ClientChannelControlHelper
882 //
883 
// The ChannelControlHelper that the channel passes to its LB policy.
// Translates LB-policy requests (subchannel creation, state updates,
// re-resolution, tracing) into operations on the ClientChannel.  All
// methods that touch channel state run in the WorkSerializer.
class ClientChannel::ClientChannelControlHelper
    : public LoadBalancingPolicy::ChannelControlHelper {
 public:
  explicit ClientChannelControlHelper(ClientChannel* chand) : chand_(chand) {
    // Keep the channel stack alive while the helper exists.
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
  }

  ~ClientChannelControlHelper() override {
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ClientChannelControlHelper");
  }

  // Creates a subchannel for the given address, wrapped in a
  // SubchannelWrapper.  Returns null if the channel is shutting down or
  // the factory fails.
  RefCountedPtr<SubchannelInterface> CreateSubchannel(
      ServerAddress address, const ChannelArgs& args) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return nullptr;  // Shutting down.
    ChannelArgs subchannel_args = ClientChannel::MakeSubchannelArgs(
        args, address.args(), chand_->subchannel_pool_,
        chand_->default_authority_);
    // Create subchannel.
    RefCountedPtr<Subchannel> subchannel =
        chand_->client_channel_factory_->CreateSubchannel(address.address(),
                                                          subchannel_args);
    if (subchannel == nullptr) return nullptr;
    // Make sure the subchannel has updated keepalive time.
    subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_);
    // Create and return wrapper for the subchannel.
    return MakeRefCounted<SubchannelWrapper>(chand_, std::move(subchannel));
  }

  // Applies a connectivity state / picker update from the LB policy to
  // the channel, unless the channel is shutting down.
  void UpdateState(grpc_connectivity_state state, const absl::Status& status,
                   RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)
      override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      const char* extra = chand_->disconnect_error_.ok()
                              ? ""
                              : " (ignoring -- channel shutting down)";
      gpr_log(GPR_INFO, "chand=%p: update: state=%s status=(%s) picker=%p%s",
              chand_, ConnectivityStateName(state), status.ToString().c_str(),
              picker.get(), extra);
    }
    // Do update only if not shutting down.
    if (chand_->disconnect_error_.ok()) {
      chand_->UpdateStateAndPickerLocked(state, status, "helper",
                                         std::move(picker));
    }
  }

  // Asks the resolver to re-resolve on behalf of the LB policy.
  void RequestReresolution() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand_);
    }
    chand_->resolver_->RequestReresolutionLocked();
  }

  absl::string_view GetAuthority() override {
    return chand_->default_authority_;
  }

  grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
    return chand_->owning_stack_->EventEngine();
  }

  // Records a channelz trace event for the channel, if channelz is
  // enabled.
  void AddTraceEvent(TraceSeverity severity, absl::string_view message) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (chand_->channelz_node_ != nullptr) {
      chand_->channelz_node_->AddTraceEvent(
          ConvertSeverityEnum(severity),
          grpc_slice_from_copied_buffer(message.data(), message.size()));
    }
  }

 private:
  // Maps the LB API's TraceSeverity onto channelz's severity enum.
  static channelz::ChannelTrace::Severity ConvertSeverityEnum(
      TraceSeverity severity) {
    if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
    if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
    return channelz::ChannelTrace::Error;
  }

  ClientChannel* chand_;
};
970 
971 //
972 // ClientChannel implementation
973 //
974 
GetFromChannel(Channel * channel)975 ClientChannel* ClientChannel::GetFromChannel(Channel* channel) {
976   grpc_channel_element* elem =
977       grpc_channel_stack_last_element(channel->channel_stack());
978   if (elem->filter != &kFilterVtable) return nullptr;
979   return static_cast<ClientChannel*>(elem->channel_data);
980 }
981 
// Channel filter init callback.  Constructs a ClientChannel in the
// storage provided by the channel stack; construction failures are
// reported via the returned error rather than an exception.
grpc_error_handle ClientChannel::Init(grpc_channel_element* elem,
                                      grpc_channel_element_args* args) {
  // The client channel filter must be the last filter in the stack.
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &kFilterVtable);
  grpc_error_handle error;
  new (elem->channel_data) ClientChannel(args, &error);
  return error;
}
990 
Destroy(grpc_channel_element * elem)991 void ClientChannel::Destroy(grpc_channel_element* elem) {
992   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
993   chand->~ClientChannel();
994 }
995 
996 namespace {
997 
GetSubchannelPool(const ChannelArgs & args)998 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
999     const ChannelArgs& args) {
1000   if (args.GetBool(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL).value_or(false)) {
1001     return MakeRefCounted<LocalSubchannelPool>();
1002   }
1003   return GlobalSubchannelPool::instance();
1004 }
1005 
1006 }  // namespace
1007 
// Constructs the client channel filter.  On failure, sets *error and
// returns early with the object in a safely-destructible state; the
// stack's Init() callback propagates the error to the caller.
ClientChannel::ClientChannel(grpc_channel_element_args* args,
                             grpc_error_handle* error)
    : channel_args_(args->channel_args),
      deadline_checking_enabled_(grpc_deadline_checking_enabled(channel_args_)),
      owning_stack_(args->channel_stack),
      client_channel_factory_(channel_args_.GetObject<ClientChannelFactory>()),
      channelz_node_(channel_args_.GetObject<channelz::ChannelNode>()),
      interested_parties_(grpc_pollset_set_create()),
      service_config_parser_index_(
          internal::ClientChannelServiceConfigParser::ParserIndex()),
      work_serializer_(std::make_shared<WorkSerializer>()),
      state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
      subchannel_pool_(GetSubchannelPool(channel_args_)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
            this, owning_stack_);
  }
  // Start backup polling.
  grpc_client_channel_start_backup_polling(interested_parties_);
  // Check client channel factory.
  if (client_channel_factory_ == nullptr) {
    *error = GRPC_ERROR_CREATE(
        "Missing client channel factory in args for client channel filter");
    return;
  }
  // Get default service config.  If none is specified via the client API,
  // we use an empty config.
  absl::optional<absl::string_view> service_config_json =
      channel_args_.GetString(GRPC_ARG_SERVICE_CONFIG);
  if (!service_config_json.has_value()) service_config_json = "{}";
  *error = absl::OkStatus();
  auto service_config =
      ServiceConfigImpl::Create(channel_args_, *service_config_json);
  if (!service_config.ok()) {
    *error = absl_status_to_grpc_error(service_config.status());
    return;
  }
  default_service_config_ = std::move(*service_config);
  // Get URI to resolve, using proxy mapper if needed.
  absl::optional<std::string> server_uri =
      channel_args_.GetOwnedString(GRPC_ARG_SERVER_URI);
  if (!server_uri.has_value()) {
    *error = GRPC_ERROR_CREATE(
        "target URI channel arg missing or wrong type in client channel "
        "filter");
    return;
  }
  // The proxy mapper may rewrite the target; if it declines, fall back
  // to the original server URI.
  uri_to_resolve_ = CoreConfiguration::Get()
                        .proxy_mapper_registry()
                        .MapName(*server_uri, &channel_args_)
                        .value_or(*server_uri);
  // Make sure the URI to resolve is valid, so that we know that
  // resolver creation will succeed later.
  if (!CoreConfiguration::Get().resolver_registry().IsValidTarget(
          uri_to_resolve_)) {
    *error = GRPC_ERROR_CREATE(
        absl::StrCat("the target uri is not valid: ", uri_to_resolve_));
    return;
  }
  // Strip out service config channel arg, so that it doesn't affect
  // subchannel uniqueness when the args flow down to that layer.
  channel_args_ = channel_args_.Remove(GRPC_ARG_SERVICE_CONFIG);
  // Set initial keepalive time.
  auto keepalive_arg = channel_args_.GetInt(GRPC_ARG_KEEPALIVE_TIME_MS);
  if (keepalive_arg.has_value()) {
    keepalive_time_ = Clamp(*keepalive_arg, 1, INT_MAX);
  } else {
    keepalive_time_ = -1;  // unset
  }
  // Set default authority.
  absl::optional<std::string> default_authority =
      channel_args_.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY);
  if (!default_authority.has_value()) {
    default_authority_ =
        CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
            *server_uri);
  } else {
    default_authority_ = std::move(*default_authority);
  }
  // Success.
  *error = absl::OkStatus();
}
1090 
ClientChannel::~ClientChannel() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
  }
  // Shut down the resolver and LB policy before tearing down polling.
  DestroyResolverAndLbPolicyLocked();
  // Stop backup polling.
  grpc_client_channel_stop_backup_polling(interested_parties_);
  grpc_pollset_set_destroy(interested_parties_);
}
1100 
// Creates a new load-balanced call for this channel.  The call object
// is allocated on the call's arena (args.arena->New), so the returned
// OrphanablePtr controls its lifecycle but the arena owns the storage.
OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall>
ClientChannel::CreateLoadBalancedCall(
    const grpc_call_element_args& args, grpc_polling_entity* pollent,
    grpc_closure* on_call_destruction_complete,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry) {
  return OrphanablePtr<FilterBasedLoadBalancedCall>(
      args.arena->New<FilterBasedLoadBalancedCall>(
          this, args, pollent, on_call_destruction_complete,
          std::move(on_commit), is_transparent_retry));
}
1111 
// Builds the channel args used to create a subchannel by merging the
// channel-level args with the per-address args.
ChannelArgs ClientChannel::MakeSubchannelArgs(
    const ChannelArgs& channel_args, const ChannelArgs& address_args,
    const RefCountedPtr<SubchannelPoolInterface>& subchannel_pool,
    const std::string& channel_default_authority) {
  // Note that we start with the channel-level args and then apply the
  // per-address args, so that if a value is present in both, the one
  // in the channel-level args is used.  This is particularly important
  // for the GRPC_ARG_DEFAULT_AUTHORITY arg, which we want to allow
  // resolvers to set on a per-address basis only if the application
  // did not explicitly set it at the channel level.
  ChannelArgs merged = channel_args.UnionWith(address_args);
  merged = merged.SetObject(subchannel_pool);
  // If we haven't already set the default authority arg (i.e., it
  // was not explicitly set by the application nor overridden by
  // the resolver), add it from the channel's default.
  merged =
      merged.SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, channel_default_authority);
  // Remove channel args that should not affect subchannel uniqueness.
  return merged.Remove(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME)
      .Remove(GRPC_ARG_INHIBIT_HEALTH_CHECKING)
      .Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE);
}
1134 
// Retries resolution handling for every call that was queued waiting
// for a resolver result, then empties the queue.
void ClientChannel::ReprocessQueuedResolverCalls() {
  for (CallData* calld : resolver_queued_calls_) {
    calld->RemoveCallFromResolverQueuedCallsLocked();
    calld->RetryCheckResolutionLocked();
  }
  resolver_queued_calls_.clear();
}
1142 
1143 namespace {
1144 
// Selects the LB policy config for the channel, in order of preference:
// 1. The LB policy config from the service config, if any.
// 2. The deprecated loadBalancingPolicy name from the service config.
// 3. The GRPC_ARG_LB_POLICY_NAME channel arg.
// 4. pick_first, as the final fallback.
RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
    const Resolver::Result& resolver_result,
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
  // Prefer the LB policy config found in the service config.
  if (parsed_service_config->parsed_lb_config() != nullptr) {
    return parsed_service_config->parsed_lb_config();
  }
  // Try the deprecated LB policy name from the service config.
  // If not, try the setting from channel args.
  absl::optional<absl::string_view> policy_name;
  if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
    policy_name = parsed_service_config->parsed_deprecated_lb_policy();
  } else {
    policy_name = resolver_result.args.GetString(GRPC_ARG_LB_POLICY_NAME);
    bool requires_config = false;
    // A policy specified via channel arg must exist and must not
    // require a config (we have none to give it); otherwise fall back
    // to pick_first.
    if (policy_name.has_value() &&
        (!CoreConfiguration::Get()
              .lb_policy_registry()
              .LoadBalancingPolicyExists(*policy_name, &requires_config) ||
         requires_config)) {
      if (requires_config) {
        gpr_log(GPR_ERROR,
                "LB policy: %s passed through channel_args must not "
                "require a config. Using pick_first instead.",
                std::string(*policy_name).c_str());
      } else {
        gpr_log(GPR_ERROR,
                "LB policy: %s passed through channel_args does not exist. "
                "Using pick_first instead.",
                std::string(*policy_name).c_str());
      }
      policy_name = "pick_first";
    }
  }
  // Use pick_first if nothing was specified above.
  if (!policy_name.has_value()) policy_name = "pick_first";
  // Now that we have the policy name, construct an empty config for it.
  Json config_json = Json::FromArray({Json::FromObject({
      {std::string(*policy_name), Json::FromObject({})},
  })});
  auto lb_policy_config =
      CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
          config_json);
  // The policy name came from one of three places:
  // - The deprecated loadBalancingPolicy field in the service config,
  //   in which case the code in ClientChannelServiceConfigParser
  //   already verified that the policy does not require a config.
  // - One of the hard-coded values here, all of which are known to not
  //   require a config.
  // - A channel arg, in which case we check that the specified policy exists
  //   and accepts an empty config. If not, we revert to using pick_first
  //   lb_policy
  GPR_ASSERT(lb_policy_config.ok());
  return std::move(*lb_policy_config);
}
1201 
1202 }  // namespace
1203 
// Handles a new result from the resolver: chooses the service config
// (falling back to the previous or default config as needed), creates
// or updates the LB policy, applies data-plane config changes, and
// records channelz trace events for notable transitions.
void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
  // Handle race conditions.
  if (resolver_ == nullptr) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
  }
  // Grab resolver result health callback.
  auto resolver_callback = std::move(result.result_health_callback);
  absl::Status resolver_result_status;
  // We only want to trace the address resolution in the follow cases:
  // (a) Address resolution resulted in service config change.
  // (b) Address resolution that causes number of backends to go from
  //     zero to non-zero.
  // (c) Address resolution that causes number of backends to go from
  //     non-zero to zero.
  // (d) Address resolution that causes a new LB policy to be created.
  //
  // We track a list of strings to eventually be concatenated and traced.
  std::vector<const char*> trace_strings;
  const bool resolution_contains_addresses =
      result.addresses.ok() && !result.addresses->empty();
  if (!resolution_contains_addresses &&
      previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became empty");
  } else if (resolution_contains_addresses &&
             !previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became non-empty");
  }
  previous_resolution_contained_addresses_ = resolution_contains_addresses;
  // Keeps the error string alive until trace_strings is consumed below.
  std::string service_config_error_string_storage;
  if (!result.service_config.ok()) {
    service_config_error_string_storage =
        result.service_config.status().ToString();
    trace_strings.push_back(service_config_error_string_storage.c_str());
  }
  // Choose the service config.
  RefCountedPtr<ServiceConfig> service_config;
  RefCountedPtr<ConfigSelector> config_selector;
  if (!result.service_config.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
              this, result.service_config.status().ToString().c_str());
    }
    // If the service config was invalid, then fallback to the
    // previously returned service config.
    if (saved_service_config_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: resolver returned invalid service config. "
                "Continuing to use previous service config.",
                this);
      }
      service_config = saved_service_config_;
      config_selector = saved_config_selector_;
    } else {
      // We received a service config error and we don't have a
      // previous service config to fall back to.  Put the channel into
      // TRANSIENT_FAILURE.
      OnResolverErrorLocked(result.service_config.status());
      trace_strings.push_back("no valid service config");
      resolver_result_status =
          absl::UnavailableError("no valid service config");
    }
  } else if (*result.service_config == nullptr) {
    // Resolver did not return any service config.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: resolver returned no service config. Using default "
              "service config for channel.",
              this);
    }
    service_config = default_service_config_;
  } else {
    // Use ServiceConfig and ConfigSelector returned by resolver.
    service_config = std::move(*result.service_config);
    config_selector = result.args.GetObjectRef<ConfigSelector>();
  }
  // Note: The only case in which service_config is null here is if the resolver
  // returned a service config error and we don't have a previous service
  // config to fall back to.
  if (service_config != nullptr) {
    // Extract global config for client channel.
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
        static_cast<const internal::ClientChannelGlobalParsedConfig*>(
            service_config->GetGlobalParsedConfig(
                service_config_parser_index_));
    // Choose LB policy config.
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config =
        ChooseLbPolicy(result, parsed_service_config);
    // Check if the ServiceConfig has changed.
    const bool service_config_changed =
        saved_service_config_ == nullptr ||
        service_config->json_string() != saved_service_config_->json_string();
    // Check if the ConfigSelector has changed.
    const bool config_selector_changed = !ConfigSelector::Equals(
        saved_config_selector_.get(), config_selector.get());
    // If either has changed, apply the global parameters now.
    if (service_config_changed || config_selector_changed) {
      // Update service config in control plane.
      UpdateServiceConfigInControlPlaneLocked(
          std::move(service_config), std::move(config_selector),
          std::string(lb_policy_config->name()));
    } else if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
    }
    // Create or update LB policy, as needed.
    resolver_result_status = CreateOrUpdateLbPolicyLocked(
        std::move(lb_policy_config),
        parsed_service_config->health_check_service_name(), std::move(result));
    if (service_config_changed || config_selector_changed) {
      // Start using new service config for calls.
      // This needs to happen after the LB policy has been updated, since
      // the ConfigSelector may need the LB policy to know about new
      // destinations before it can send RPCs to those destinations.
      UpdateServiceConfigInDataPlaneLocked();
      // TODO(ncteisen): might be worth somehow including a snippet of the
      // config in the trace, at the risk of bloating the trace logs.
      trace_strings.push_back("Service config changed");
    }
  }
  // Invoke resolver callback if needed.
  if (resolver_callback != nullptr) {
    resolver_callback(std::move(resolver_result_status));
  }
  // Add channel trace event.
  if (!trace_strings.empty()) {
    std::string message =
        absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
    if (channelz_node_ != nullptr) {
      channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
                                    grpc_slice_from_cpp_string(message));
    }
  }
}
1338 
// Handles a resolution failure reported by the resolver.  If no LB
// policy exists yet, puts the channel into TRANSIENT_FAILURE and fails
// any calls queued waiting for resolution; otherwise the existing LB
// policy continues to drive the channel's connectivity state.
void ClientChannel::OnResolverErrorLocked(absl::Status status) {
  if (resolver_ == nullptr) return;  // Shutting down.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this,
            status.ToString().c_str());
  }
  // If we already have an LB policy from a previous resolution
  // result, then we continue to let it set the connectivity state.
  // Otherwise, we go into TRANSIENT_FAILURE.
  if (lb_policy_ == nullptr) {
    // Update connectivity state.
    UpdateStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
                      "resolver failure");
    {
      MutexLock lock(&resolution_mu_);
      // Update resolver transient failure.
      resolver_transient_failure_error_ =
          MaybeRewriteIllegalStatusCode(status, "resolver");
      ReprocessQueuedResolverCalls();
    }
  }
}
1361 
// Pushes a new resolution result into the LB policy, creating the
// policy first if it does not yet exist.
//
// Args:
//   lb_policy_config: parsed LB policy config from the service config.
//   health_check_service_name: if set, propagated to the LB policy via
//       the GRPC_ARG_HEALTH_CHECK_SERVICE_NAME channel arg.
//   result: resolver result; its addresses, resolution_note, and args
//       are consumed.
//
// Returns the status from the LB policy's UpdateLocked().
absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked(
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
    const absl::optional<std::string>& health_check_service_name,
    Resolver::Result result) {
  // Construct update.
  LoadBalancingPolicy::UpdateArgs update_args;
  update_args.addresses = std::move(result.addresses);
  update_args.config = std::move(lb_policy_config);
  update_args.resolution_note = std::move(result.resolution_note);
  // Remove the config selector from channel args so that we're not holding
  // unnecessary refs that cause it to be destroyed somewhere other than in the
  // WorkSerializer.
  update_args.args = result.args.Remove(GRPC_ARG_CONFIG_SELECTOR);
  // Add health check service name to channel args.
  if (health_check_service_name.has_value()) {
    update_args.args = update_args.args.Set(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME,
                                            *health_check_service_name);
  }
  // Create policy if needed.  This happens on the first resolution
  // result (or on the first one after DestroyResolverAndLbPolicyLocked()
  // has cleared the old policy).
  if (lb_policy_ == nullptr) {
    lb_policy_ = CreateLbPolicyLocked(update_args.args);
  }
  // Update the policy.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
            lb_policy_.get());
  }
  return lb_policy_->UpdateLocked(std::move(update_args));
}
1391 
// Creates a new LB policy (wrapped in a ChildPolicyHandler) and returns
// it.  Also resets the channel to CONNECTING with a queueing picker,
// since the new policy will not necessarily produce a picker
// synchronously.
OrphanablePtr<LoadBalancingPolicy> ClientChannel::CreateLbPolicyLocked(
    const ChannelArgs& args) {
  // The LB policy will start in state CONNECTING but will not
  // necessarily send us an update synchronously, so set state to
  // CONNECTING (in case the resolver had previously failed and put the
  // channel into TRANSIENT_FAILURE) and make sure we have a queueing picker.
  UpdateStateAndPickerLocked(
      GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
      MakeRefCounted<LoadBalancingPolicy::QueuePicker>(nullptr));
  // Now create the LB policy.
  LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.work_serializer = work_serializer_;
  lb_policy_args.channel_control_helper =
      std::make_unique<ClientChannelControlHelper>(this);
  lb_policy_args.args = args;
  OrphanablePtr<LoadBalancingPolicy> lb_policy =
      MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
                                         &grpc_client_channel_trace);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created new LB policy %p", this,
            lb_policy.get());
  }
  // Link the policy's pollset_set into the channel's, so the policy's
  // I/O is covered by the channel's pollers.  Unlinked again in
  // DestroyResolverAndLbPolicyLocked().
  grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
                                   interested_parties_);
  return lb_policy;
}
1419 
// Installs a new service config and ConfigSelector for control-plane
// use, and refreshes the snapshot reported by GetChannelInfo().
//
// Note: this does NOT yet affect the data plane; that half is done
// later by UpdateServiceConfigInDataPlaneLocked(), after the LB policy
// has been updated.
void ClientChannel::UpdateServiceConfigInControlPlaneLocked(
    RefCountedPtr<ServiceConfig> service_config,
    RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) {
  std::string service_config_json(service_config->json_string());
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: using service config: \"%s\"", this,
            service_config_json.c_str());
  }
  // Save service config.
  saved_service_config_ = std::move(service_config);
  // Swap out the data used by GetChannelInfo(), which is guarded by
  // info_mu_ because it can be read from arbitrary threads.
  {
    MutexLock lock(&info_mu_);
    info_lb_policy_name_ = std::move(lb_policy_name);
    info_service_config_json_ = std::move(service_config_json);
  }
  // Save config selector.
  saved_config_selector_ = std::move(config_selector);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: using ConfigSelector %p", this,
            saved_config_selector_.get());
  }
}
1443 
// Makes the previously saved service config / ConfigSelector visible to
// the data plane: rebuilds the dynamic filter stack and swaps the new
// state in under resolution_mu_, then reprocesses queued calls.
void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
  // Grab ref to service config.
  RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
  // Grab ref to config selector.  Use default if resolver didn't supply one.
  RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
            saved_config_selector_.get());
  }
  if (config_selector == nullptr) {
    config_selector =
        MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
  }
  ChannelArgs new_args =
      channel_args_.SetObject(this).SetObject(service_config);
  // Retries are on by default unless disabled via channel arg or a
  // minimal stack is requested.
  bool enable_retries =
      !new_args.WantMinimalStack() &&
      new_args.GetBool(GRPC_ARG_ENABLE_RETRIES).value_or(true);
  // Construct dynamic filter stack.  The terminal filter is either the
  // retry filter or the plain dynamic-termination filter.
  std::vector<const grpc_channel_filter*> filters =
      config_selector->GetFilters();
  if (enable_retries) {
    filters.push_back(&kRetryFilterVtable);
  } else {
    filters.push_back(&DynamicTerminationFilter::kFilterVtable);
  }
  RefCountedPtr<DynamicFilters> dynamic_filters =
      DynamicFilters::Create(new_args, std::move(filters));
  GPR_ASSERT(dynamic_filters != nullptr);
  // Grab data plane lock to update service config.
  //
  // We defer unreffing the old values (and deallocating memory) until
  // after releasing the lock to keep the critical section small.
  {
    MutexLock lock(&resolution_mu_);
    resolver_transient_failure_error_ = absl::OkStatus();
    // Update service config.
    received_service_config_data_ = true;
    // Old values will be unreffed after lock is released.
    service_config_.swap(service_config);
    config_selector_.swap(config_selector);
    dynamic_filters_.swap(dynamic_filters);
    // Re-process queued calls asynchronously.
    ReprocessQueuedResolverCalls();
  }
  // Old values will be unreffed after lock is released when they go out
  // of scope.
}
1492 
// Creates and starts the resolver for uri_to_resolve_, moving the
// channel to CONNECTING while the initial resolution is in flight.
void ClientChannel::CreateResolverLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: starting name resolution for %s", this,
            uri_to_resolve_.c_str());
  }
  resolver_ = CoreConfiguration::Get().resolver_registry().CreateResolver(
      uri_to_resolve_, channel_args_, interested_parties_, work_serializer_,
      std::make_unique<ResolverResultHandler>(this));
  // Since the validity of the args was checked when the channel was created,
  // CreateResolver() must return a non-null result.
  GPR_ASSERT(resolver_ != nullptr);
  UpdateStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status(),
                    "started resolving");
  resolver_->StartLocked();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
  }
}
1511 
// Shuts down the resolver and LB policy (if any) and clears all
// resolution-derived state (service config, config selector, dynamic
// filters).  No-op if the resolver was never created or was already
// destroyed.
void ClientChannel::DestroyResolverAndLbPolicyLocked() {
  if (resolver_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: shutting down resolver=%p", this,
              resolver_.get());
    }
    resolver_.reset();
    // Clear resolution state.
    saved_service_config_.reset();
    saved_config_selector_.reset();
    // Acquire resolution lock to update config selector and associated state.
    // To minimize lock contention, we wait to unref these objects until
    // after we release the lock.
    RefCountedPtr<ServiceConfig> service_config_to_unref;
    RefCountedPtr<ConfigSelector> config_selector_to_unref;
    RefCountedPtr<DynamicFilters> dynamic_filters_to_unref;
    {
      MutexLock lock(&resolution_mu_);
      received_service_config_data_ = false;
      service_config_to_unref = std::move(service_config_);
      config_selector_to_unref = std::move(config_selector_);
      dynamic_filters_to_unref = std::move(dynamic_filters_);
    }
    // Clear LB policy if set.
    if (lb_policy_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
        gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
                lb_policy_.get());
      }
      // Unlink the policy's pollset_set from the channel's before
      // destroying the policy (reverses CreateLbPolicyLocked()).
      grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                       interested_parties_);
      lb_policy_.reset();
    }
  }
}
1547 
// Sets the channel's connectivity state and, if channelz is enabled,
// records the transition there as well (state + trace event).
void ClientChannel::UpdateStateLocked(grpc_connectivity_state state,
                                      const absl::Status& status,
                                      const char* reason) {
  state_tracker_.SetState(state, status, reason);
  if (channelz_node_ != nullptr) {
    channelz_node_->SetConnectivityState(state);
    channelz_node_->AddTraceEvent(
        channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string(
            channelz::ChannelNode::GetChannelConnectivityStateChangeString(
                state)));
  }
}
1561 
// Like UpdateStateLocked(), but also swaps in a new picker under lb_mu_
// and re-dispatches every LB-queued call against it.
void ClientChannel::UpdateStateAndPickerLocked(
    grpc_connectivity_state state, const absl::Status& status,
    const char* reason,
    RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
  UpdateStateLocked(state, status, reason);
  // Grab the LB lock to update the picker and trigger reprocessing of the
  // queued picks.
  // Old picker will be unreffed after releasing the lock.
  MutexLock lock(&lb_mu_);
  picker_.swap(picker);
  // Reprocess queued picks.
  for (LoadBalancedCall* call : lb_queued_calls_) {
    call->RemoveCallFromLbQueuedCallsLocked();
    call->RetryPickLocked();
  }
  lb_queued_calls_.clear();
}
1579 
1580 namespace {
1581 
1582 // TODO(roth): Remove this in favor of the gprpp Match() function once
1583 // we can do that without breaking lock annotations.
1584 template <typename T>
HandlePickResult(LoadBalancingPolicy::PickResult * result,std::function<T (LoadBalancingPolicy::PickResult::Complete *)> complete_func,std::function<T (LoadBalancingPolicy::PickResult::Queue *)> queue_func,std::function<T (LoadBalancingPolicy::PickResult::Fail *)> fail_func,std::function<T (LoadBalancingPolicy::PickResult::Drop *)> drop_func)1585 T HandlePickResult(
1586     LoadBalancingPolicy::PickResult* result,
1587     std::function<T(LoadBalancingPolicy::PickResult::Complete*)> complete_func,
1588     std::function<T(LoadBalancingPolicy::PickResult::Queue*)> queue_func,
1589     std::function<T(LoadBalancingPolicy::PickResult::Fail*)> fail_func,
1590     std::function<T(LoadBalancingPolicy::PickResult::Drop*)> drop_func) {
1591   auto* complete_pick =
1592       absl::get_if<LoadBalancingPolicy::PickResult::Complete>(&result->result);
1593   if (complete_pick != nullptr) {
1594     return complete_func(complete_pick);
1595   }
1596   auto* queue_pick =
1597       absl::get_if<LoadBalancingPolicy::PickResult::Queue>(&result->result);
1598   if (queue_pick != nullptr) {
1599     return queue_func(queue_pick);
1600   }
1601   auto* fail_pick =
1602       absl::get_if<LoadBalancingPolicy::PickResult::Fail>(&result->result);
1603   if (fail_pick != nullptr) {
1604     return fail_func(fail_pick);
1605   }
1606   auto* drop_pick =
1607       absl::get_if<LoadBalancingPolicy::PickResult::Drop>(&result->result);
1608   GPR_ASSERT(drop_pick != nullptr);
1609   return drop_func(drop_pick);
1610 }
1611 
1612 }  // namespace
1613 
// Sends a ping on the connected subchannel chosen by the current
// picker.  Fails if the channel is not READY, or if the pick does not
// complete with a connected subchannel.
grpc_error_handle ClientChannel::DoPingLocked(grpc_transport_op* op) {
  if (state_tracker_.state() != GRPC_CHANNEL_READY) {
    return GRPC_ERROR_CREATE("channel not connected");
  }
  LoadBalancingPolicy::PickResult result;
  {
    // The picker is guarded by lb_mu_; do the pick inside the lock but
    // act on the result outside it.
    MutexLock lock(&lb_mu_);
    result = picker_->Pick(LoadBalancingPolicy::PickArgs());
  }
  return HandlePickResult<grpc_error_handle>(
      &result,
      // Complete pick.
      [op](LoadBalancingPolicy::PickResult::Complete* complete_pick)
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(*ClientChannel::work_serializer_) {
            SubchannelWrapper* subchannel = static_cast<SubchannelWrapper*>(
                complete_pick->subchannel.get());
            RefCountedPtr<ConnectedSubchannel> connected_subchannel =
                subchannel->connected_subchannel();
            // The picked subchannel may have disconnected between the
            // pick and now.
            if (connected_subchannel == nullptr) {
              return GRPC_ERROR_CREATE("LB pick for ping not connected");
            }
            connected_subchannel->Ping(op->send_ping.on_initiate,
                                       op->send_ping.on_ack);
            return absl::OkStatus();
          },
      // Queue pick.
      [](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
        return GRPC_ERROR_CREATE("LB picker queued call");
      },
      // Fail pick.
      [](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
        return absl_status_to_grpc_error(fail_pick->status);
      },
      // Drop pick.
      [](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
        return absl_status_to_grpc_error(drop_pick->status);
      });
}
1652 
// Executes a transport op inside the WorkSerializer (scheduled by
// StartTransportOp()).  Handles connectivity watches, pings, backoff
// reset, and disconnect/IDLE transitions, then releases the stack ref
// taken by StartTransportOp() and signals op completion.
void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) {
  // Connectivity watch.
  if (op->start_connectivity_watch != nullptr) {
    state_tracker_.AddWatcher(op->start_connectivity_watch_state,
                              std::move(op->start_connectivity_watch));
  }
  if (op->stop_connectivity_watch != nullptr) {
    state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
  }
  // Ping.
  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    grpc_error_handle error = DoPingLocked(op);
    // On failure, both ping closures are run with the error; on
    // success, DoPingLocked() has handed them to the subchannel.
    if (!error.ok()) {
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate, error);
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error);
    }
    op->bind_pollset = nullptr;
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }
  // Reset backoff.
  if (op->reset_connect_backoff) {
    if (lb_policy_ != nullptr) {
      lb_policy_->ResetBackoffLocked();
    }
  }
  // Disconnect or enter IDLE.
  if (!op->disconnect_with_error.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", this,
              StatusToString(op->disconnect_with_error).c_str());
    }
    DestroyResolverAndLbPolicyLocked();
    intptr_t value;
    // A disconnect error tagged with ChannelConnectivityState==IDLE is
    // a request to go idle rather than a real shutdown.
    if (grpc_error_get_int(op->disconnect_with_error,
                           StatusIntProperty::ChannelConnectivityState,
                           &value) &&
        static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
      if (disconnect_error_.ok()) {  // Ignore if we're shutting down.
        // Enter IDLE state.
        UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, absl::Status(),
                                   "channel entering IDLE", nullptr);
        // TODO(roth): Do we need to check for any queued picks here, in
        // case there's a race condition in the client_idle filter?
        // And maybe also check for calls in the resolver queue?
      }
    } else {
      // Disconnect.
      GPR_ASSERT(disconnect_error_.ok());
      disconnect_error_ = op->disconnect_with_error;
      UpdateStateAndPickerLocked(
          GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
          MakeRefCounted<LoadBalancingPolicy::TransientFailurePicker>(
              grpc_error_to_absl_status(op->disconnect_with_error)));
      // TODO(roth): If this happens when we're still waiting for a
      // resolver result, we need to trigger failures for all calls in
      // the resolver queue here.
    }
  }
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op");
  ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
}
1715 
// Channel-filter vtable entry point for transport ops.  Handles
// bind_pollset inline (it touches only the pollset_set), then hops into
// the control-plane WorkSerializer for everything else.
void ClientChannel::StartTransportOp(grpc_channel_element* elem,
                                     grpc_transport_op* op) {
  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
  GPR_ASSERT(op->set_accept_stream == false);
  // Handle bind_pollset.
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
  }
  // Pop into control plane work_serializer for remaining ops.
  // The stack ref is released in StartTransportOpLocked().
  GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
  chand->work_serializer_->Run(
      [chand, op]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
        chand->StartTransportOpLocked(op);
      },
      DEBUG_LOCATION);
}
1732 
GetChannelInfo(grpc_channel_element * elem,const grpc_channel_info * info)1733 void ClientChannel::GetChannelInfo(grpc_channel_element* elem,
1734                                    const grpc_channel_info* info) {
1735   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
1736   MutexLock lock(&chand->info_mu_);
1737   if (info->lb_policy_name != nullptr) {
1738     *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.c_str());
1739   }
1740   if (info->service_config_json != nullptr) {
1741     *info->service_config_json =
1742         gpr_strdup(chand->info_service_config_json_.c_str());
1743   }
1744 }
1745 
// Triggers a connection attempt: kicks an existing LB policy out of
// IDLE, or starts name resolution if it has not started yet.  Releases
// the "TryToConnect" stack ref taken in CheckConnectivityState().
void ClientChannel::TryToConnectLocked() {
  if (lb_policy_ != nullptr) {
    lb_policy_->ExitIdleLocked();
  } else if (resolver_ == nullptr) {
    CreateResolverLocked();
  }
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
}
1754 
// Returns the channel's current connectivity state.  If try_to_connect
// is true and the channel is IDLE, schedules TryToConnectLocked() on
// the WorkSerializer (taking a "TryToConnect" stack ref that the locked
// method releases).
grpc_connectivity_state ClientChannel::CheckConnectivityState(
    bool try_to_connect) {
  // state_tracker_ is guarded by work_serializer_, which we're not
  // holding here.  But the one method of state_tracker_ that *is*
  // thread-safe to call without external synchronization is the state()
  // method, so we can disable thread-safety analysis for this one read.
  grpc_connectivity_state out = ABSL_TS_UNCHECKED_READ(state_tracker_).state();
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
    work_serializer_->Run([this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
                              *work_serializer_) { TryToConnectLocked(); },
                          DEBUG_LOCATION);
  }
  return out;
}
1770 
// Registers an async connectivity-state watcher on the channel.
void ClientChannel::AddConnectivityWatcher(
    grpc_connectivity_state initial_state,
    OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
  // Not a leak: ConnectivityWatcherAdder presumably hops into the
  // WorkSerializer and deletes itself once the watcher is registered --
  // see its definition elsewhere in this file (TODO confirm).
  new ConnectivityWatcherAdder(this, initial_state, std::move(watcher));
}
1776 
// Unregisters a previously added connectivity-state watcher.
void ClientChannel::RemoveConnectivityWatcher(
    AsyncConnectivityStateWatcherInterface* watcher) {
  // Not a leak: ConnectivityWatcherRemover presumably deletes itself
  // after removing the watcher in the WorkSerializer (TODO confirm).
  new ConnectivityWatcherRemover(this, watcher);
}
1781 
1782 //
1783 // CallData implementation
1784 //
1785 
// Undoes the pollset linkage set up when this call was queued waiting
// for a resolver result.  Held under resolution_mu_.
void ClientChannel::CallData::RemoveCallFromResolverQueuedCallsLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: removing from resolver queued picks list",
            chand(), this);
  }
  // Remove call's pollent from channel's interested_parties.
  grpc_polling_entity_del_from_pollset_set(pollent(),
                                           chand()->interested_parties_);
  // Note: There's no need to actually remove the call from the queue
  // here, because that will be done in
  // ResolverQueuedCallCanceller::CancelLocked() or
  // ClientChannel::ReprocessQueuedResolverCalls().
}
1800 
// Queues this call to wait for a resolver result.  Held under
// resolution_mu_.
void ClientChannel::CallData::AddCallToResolverQueuedCallsLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list",
            chand(), this);
  }
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent(),
                                         chand()->interested_parties_);
  // Add to queue.
  chand()->resolver_queued_calls_.insert(this);
  // Subclass hook (e.g., to install a cancellation handler).
  OnAddToQueueLocked();
}
1814 
// Applies the channel's service config to this call: creates the
// per-call ServiceConfigCallData, runs the ConfigSelector to pick the
// call config, and applies the client-channel method params (timeout,
// wait_for_ready) to the call.
//
// Returns non-OK if config_selector holds an error status or if the
// ConfigSelector rejects the call.
grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
    const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
            chand(), this);
  }
  if (!config_selector.ok()) return config_selector.status();
  // Create a ClientChannelServiceConfigCallData for the call.  This stores
  // a ref to the ServiceConfig and caches the right set of parsed configs
  // to use for the call.  The ClientChannelServiceConfigCallData will store
  // itself in the call context, so that it can be accessed by filters
  // below us in the stack, and it will be cleaned up when the call ends.
  auto* service_config_call_data =
      arena()->New<ClientChannelServiceConfigCallData>(arena(), call_context());
  // Use the ConfigSelector to determine the config for the call.
  absl::Status call_config_status =
      (*config_selector)
          ->GetCallConfig(
              {send_initial_metadata(), arena(), service_config_call_data});
  if (!call_config_status.ok()) {
    return absl_status_to_grpc_error(
        MaybeRewriteIllegalStatusCode(call_config_status, "ConfigSelector"));
  }
  // Apply our own method params to the call.
  auto* method_params = static_cast<ClientChannelMethodParsedConfig*>(
      service_config_call_data->GetMethodParsedConfig(
          chand()->service_config_parser_index_));
  if (method_params != nullptr) {
    // If the deadline from the service config is shorter than the one
    // from the client API, reset the deadline timer.
    if (chand()->deadline_checking_enabled_ &&
        method_params->timeout() != Duration::Zero()) {
      ResetDeadline(method_params->timeout());
    }
    // If the service config set wait_for_ready and the application
    // did not explicitly set it, use the value from the service config.
    auto* wait_for_ready =
        send_initial_metadata()->GetOrCreatePointer(WaitForReady());
    if (method_params->wait_for_ready().has_value() &&
        !wait_for_ready->explicitly_set) {
      wait_for_ready->value = method_params->wait_for_ready().value();
    }
  }
  return absl::OkStatus();
}
1860 
// Checks whether a resolver result is available for this call and, if
// so, applies the service config to it.
//
// Returns:
//   absl::nullopt  - no result yet; the call has been queued.
//   OkStatus       - config applied; call may proceed.
//   non-OK status  - the call must be failed with that status.
absl::optional<absl::Status> ClientChannel::CallData::CheckResolution(
    bool was_queued) {
  // Check if we have a resolver result to use.
  absl::StatusOr<RefCountedPtr<ConfigSelector>> config_selector;
  {
    MutexLock lock(&chand()->resolution_mu_);
    bool result_ready = CheckResolutionLocked(&config_selector);
    // If no result is available, queue the call.
    if (!result_ready) {
      AddCallToResolverQueuedCallsLocked();
      return absl::nullopt;
    }
  }
  // We have a result.  Apply service config to call.
  grpc_error_handle error = ApplyServiceConfigToCallLocked(config_selector);
  // ConfigSelector must be unreffed inside the WorkSerializer.
  if (config_selector.ok()) {
    chand()->work_serializer_->Run(
        [config_selector = std::move(*config_selector)]() mutable {
          config_selector.reset();
        },
        DEBUG_LOCATION);
  }
  // Handle errors.
  if (!error.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: error applying config to call: error=%s",
              chand(), this, StatusToString(error).c_str());
    }
    return error;
  }
  // If the call was queued, add trace annotation.
  if (was_queued) {
    auto* call_tracer = static_cast<CallTracerAnnotationInterface*>(
        call_context()[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
    if (call_tracer != nullptr) {
      call_tracer->RecordAnnotation("Delayed name resolution complete.");
    }
  }
  return absl::OkStatus();
}
1903 
// Checks (under resolution_mu_) whether resolution has produced data
// this call can use.  Returns true and fills *config_selector (with
// either the current selector or an error status to fail the call
// with); returns false if the call should be queued.
bool ClientChannel::CallData::CheckResolutionLocked(
    absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector) {
  // If we don't yet have a resolver result, we need to queue the call
  // until we get one.
  if (GPR_UNLIKELY(!chand()->received_service_config_data_)) {
    // If the resolver returned transient failure before returning the
    // first service config, fail any non-wait_for_ready calls.
    absl::Status resolver_error = chand()->resolver_transient_failure_error_;
    if (!resolver_error.ok() &&
        !send_initial_metadata()->GetOrCreatePointer(WaitForReady())->value) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: resolution failed, failing call",
                chand(), this);
      }
      // Store the error status in the StatusOr; the caller will fail
      // the call with it.
      *config_selector = absl_status_to_grpc_error(resolver_error);
      return true;
    }
    // Either the resolver has not yet returned a result, or it has
    // returned transient failure but the call is wait_for_ready.  In
    // either case, queue the call.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: no resolver result yet", chand(),
              this);
    }
    return false;
  }
  // Result found.
  *config_selector = chand()->config_selector_;
  dynamic_filters_ = chand()->dynamic_filters_;
  return true;
}
1935 
1936 //
1937 // FilterBasedCallData implementation
1938 //
1939 
// Constructs the per-call state for the filter-based call path.
// deadline_state_ is initialized with the real deadline only when
// deadline checking is enabled on the channel; otherwise it is
// effectively disabled via InfFuture().
ClientChannel::FilterBasedCallData::FilterBasedCallData(
    grpc_call_element* elem, const grpc_call_element_args& args)
    : path_(CSliceRef(args.path)),  // owned ref; released in the dtor
      call_context_(args.context),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      deadline_state_(elem, args,
                      GPR_LIKELY(static_cast<ClientChannel*>(elem->channel_data)
                                     ->deadline_checking_enabled_)
                          ? args.deadline
                          : Timestamp::InfFuture()) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: created call", chand(), this);
  }
}
1955 
// Releases the path slice and asserts that every pending batch was
// drained before destruction.
ClientChannel::FilterBasedCallData::~FilterBasedCallData() {
  CSliceUnref(path_);
  // Make sure there are no remaining pending batches.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    GPR_ASSERT(pending_batches_[i] == nullptr);
  }
}
1963 
// Call-filter vtable hook: placement-constructs the call data into the
// pre-allocated elem->call_data slot.  Cannot fail.
grpc_error_handle ClientChannel::FilterBasedCallData::Init(
    grpc_call_element* elem, const grpc_call_element_args* args) {
  new (elem->call_data) FilterBasedCallData(elem, *args);
  return absl::OkStatus();
}
1969 
// Call-filter vtable hook: destroys the call data.  If a dynamic call
// exists, then_schedule_closure is handed to it so it runs after the
// dynamic call stack is destroyed; otherwise it is scheduled directly.
void ClientChannel::FilterBasedCallData::Destroy(
    grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
    grpc_closure* then_schedule_closure) {
  auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
  // Keep the dynamic call alive past the explicit destructor call below.
  RefCountedPtr<DynamicFilters::Call> dynamic_call =
      std::move(calld->dynamic_call_);
  calld->~FilterBasedCallData();
  if (GPR_LIKELY(dynamic_call != nullptr)) {
    dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
  } else {
    // TODO(yashkt) : This can potentially be a Closure::Run
    ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, absl::OkStatus());
  }
}
1984 
// Filter vtable StartTransportStreamOpBatch: entry point for batches
// arriving from the filter above.  Until a dynamic call exists, batches
// are buffered in pending_batches_; once one exists, batches bypass the
// resolution mutex and go straight down.
void ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
  // Log only if channel-level tracing isn't already logging batches.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace) &&
      !GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: batch started from above: %s", chand,
            calld, grpc_transport_stream_op_batch_string(batch, false).c_str());
  }
  if (GPR_LIKELY(chand->deadline_checking_enabled_)) {
    grpc_deadline_state_client_start_transport_stream_op_batch(
        &calld->deadline_state_, batch);
  }
  // Intercept recv_trailing_metadata to commit the call, in case we wind up
  // failing the call before we get down to the retry or LB call layer.
  if (batch->recv_trailing_metadata) {
    calld->original_recv_trailing_metadata_ready_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_,
                      RecvTrailingMetadataReadyForConfigSelectorCommitCallback,
                      calld, nullptr);
    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &calld->recv_trailing_metadata_ready_;
  }
  // If we already have a dynamic call, pass the batch down to it.
  // Note that once we have done so, we do not need to acquire the channel's
  // resolution mutex, which is more efficient (especially for streaming calls).
  if (calld->dynamic_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on dynamic_call=%p",
              chand, calld, calld->dynamic_call_.get());
    }
    calld->dynamic_call_->StartTransportStreamOpBatch(batch);
    return;
  }
  // We do not yet have a dynamic call.
  //
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(!calld->cancel_error_.ok())) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, StatusToString(calld->cancel_error_).c_str());
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, calld->cancel_error_, calld->call_combiner());
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    calld->cancel_error_ = batch->payload->cancel_stream.cancel_error;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, StatusToString(calld->cancel_error_).c_str());
    }
    // Fail all pending batches.
    calld->PendingBatchesFail(calld->cancel_error_, NoYieldCallCombiner);
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, calld->cancel_error_, calld->call_combiner());
    return;
  }
  // Add the batch to the pending list.
  calld->PendingBatchesAdd(batch);
  // For batches containing a send_initial_metadata op, acquire the
  // channel's resolution mutex to apply the service config to the call,
  // after which we will create a dynamic call.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: grabbing resolution mutex to apply service "
              "config",
              chand, calld);
    }
    // If we're still in IDLE, we need to start resolving.
    if (GPR_UNLIKELY(chand->CheckConnectivityState(false) ==
                     GRPC_CHANNEL_IDLE)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: triggering exit idle", chand,
                calld);
      }
      // Bounce into the control plane work serializer to start resolving.
      // The channel stack ref keeps chand alive until the closure runs.
      GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ExitIdle");
      chand->work_serializer_->Run(
          [chand]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
            chand->CheckConnectivityState(/*try_to_connect=*/true);
            GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "ExitIdle");
          },
          DEBUG_LOCATION);
    }
    calld->TryCheckResolution(/*was_queued=*/false);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner(),
                            "batch does not include send_initial_metadata");
  }
}
2092 
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)2093 void ClientChannel::FilterBasedCallData::SetPollent(
2094     grpc_call_element* elem, grpc_polling_entity* pollent) {
2095   auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
2096   calld->pollent_ = pollent;
2097 }
2098 
GetBatchIndex(grpc_transport_stream_op_batch * batch)2099 size_t ClientChannel::FilterBasedCallData::GetBatchIndex(
2100     grpc_transport_stream_op_batch* batch) {
2101   // Note: It is important the send_initial_metadata be the first entry
2102   // here, since the code in CheckResolution() assumes it will be.
2103   if (batch->send_initial_metadata) return 0;
2104   if (batch->send_message) return 1;
2105   if (batch->send_trailing_metadata) return 2;
2106   if (batch->recv_initial_metadata) return 3;
2107   if (batch->recv_message) return 4;
2108   if (batch->recv_trailing_metadata) return 5;
2109   GPR_UNREACHABLE_CODE(return (size_t)-1);
2110 }
2111 
2112 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_transport_stream_op_batch * batch)2113 void ClientChannel::FilterBasedCallData::PendingBatchesAdd(
2114     grpc_transport_stream_op_batch* batch) {
2115   const size_t idx = GetBatchIndex(batch);
2116   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2117     gpr_log(GPR_INFO,
2118             "chand=%p calld=%p: adding pending batch at index %" PRIuPTR,
2119             chand(), this, idx);
2120   }
2121   grpc_transport_stream_op_batch*& pending = pending_batches_[idx];
2122   GPR_ASSERT(pending == nullptr);
2123   pending = batch;
2124 }
2125 
2126 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error_handle error)2127 void ClientChannel::FilterBasedCallData::FailPendingBatchInCallCombiner(
2128     void* arg, grpc_error_handle error) {
2129   grpc_transport_stream_op_batch* batch =
2130       static_cast<grpc_transport_stream_op_batch*>(arg);
2131   auto* calld =
2132       static_cast<FilterBasedCallData*>(batch->handler_private.extra_arg);
2133   // Note: This will release the call combiner.
2134   grpc_transport_stream_op_batch_finish_with_failure(batch, error,
2135                                                      calld->call_combiner());
2136 }
2137 
// This is called via the call combiner, so access to calld is synchronized.
// Fails all pending batches with the given (non-OK) error, clearing each
// slot.  Whether the call combiner is yielded when the closures run is
// decided by yield_call_combiner_predicate.
void ClientChannel::FilterBasedCallData::PendingBatchesFail(
    grpc_error_handle error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {
  GPR_ASSERT(!error.ok());
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            chand(), this, num_batches, StatusToString(error).c_str());
  }
  // Collect one closure per pending batch, clearing the slots as we go.
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, error,
                   "PendingBatchesFail");
      batch = nullptr;
    }
  }
  if (yield_call_combiner_predicate(closures)) {
    closures.RunClosures(call_combiner());
  } else {
    closures.RunClosuresWithoutYielding(call_combiner());
  }
}
2171 
2172 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error_handle)2173 void ClientChannel::FilterBasedCallData::ResumePendingBatchInCallCombiner(
2174     void* arg, grpc_error_handle /*ignored*/) {
2175   grpc_transport_stream_op_batch* batch =
2176       static_cast<grpc_transport_stream_op_batch*>(arg);
2177   auto* calld =
2178       static_cast<FilterBasedCallData*>(batch->handler_private.extra_arg);
2179   // Note: This will release the call combiner.
2180   calld->dynamic_call_->StartTransportStreamOpBatch(batch);
2181 }
2182 
// This is called via the call combiner, so access to calld is synchronized.
// Sends all pending batches down to the (now-created) dynamic call,
// clearing the slots.
void ClientChannel::FilterBasedCallData::PendingBatchesResume() {
  // Retries not enabled; send down batches as-is.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " pending batches on dynamic_call=%p",
            chand(), this, num_batches, dynamic_call_.get());
  }
  // Collect one resume closure per pending batch, clearing each slot.
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        ResumePendingBatchInCallCombiner, batch, nullptr);
      closures.Add(&batch->handler_private.closure, absl::OkStatus(),
                   "resuming pending batch from client channel call");
      batch = nullptr;
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner());
}
2211 
2212 // A class to handle the call combiner cancellation callback for a
2213 // queued pick.
2214 class ClientChannel::FilterBasedCallData::ResolverQueuedCallCanceller {
2215  public:
ResolverQueuedCallCanceller(FilterBasedCallData * calld)2216   explicit ResolverQueuedCallCanceller(FilterBasedCallData* calld)
2217       : calld_(calld) {
2218     GRPC_CALL_STACK_REF(calld->owning_call(), "ResolverQueuedCallCanceller");
2219     GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
2220                       grpc_schedule_on_exec_ctx);
2221     calld->call_combiner()->SetNotifyOnCancel(&closure_);
2222   }
2223 
2224  private:
CancelLocked(void * arg,grpc_error_handle error)2225   static void CancelLocked(void* arg, grpc_error_handle error) {
2226     auto* self = static_cast<ResolverQueuedCallCanceller*>(arg);
2227     auto* calld = self->calld_;
2228     auto* chand = calld->chand();
2229     {
2230       MutexLock lock(&chand->resolution_mu_);
2231       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2232         gpr_log(GPR_INFO,
2233                 "chand=%p calld=%p: cancelling resolver queued pick: "
2234                 "error=%s self=%p calld->resolver_pick_canceller=%p",
2235                 chand, calld, StatusToString(error).c_str(), self,
2236                 calld->resolver_call_canceller_);
2237       }
2238       if (calld->resolver_call_canceller_ == self && !error.ok()) {
2239         // Remove pick from list of queued picks.
2240         calld->RemoveCallFromResolverQueuedCallsLocked();
2241         chand->resolver_queued_calls_.erase(calld);
2242         // Fail pending batches on the call.
2243         calld->PendingBatchesFail(error,
2244                                   YieldCallCombinerIfPendingBatchesFound);
2245       }
2246     }
2247     GRPC_CALL_STACK_UNREF(calld->owning_call(), "ResolvingQueuedCallCanceller");
2248     delete self;
2249   }
2250 
2251   FilterBasedCallData* calld_;
2252   grpc_closure closure_;
2253 };
2254 
TryCheckResolution(bool was_queued)2255 void ClientChannel::FilterBasedCallData::TryCheckResolution(bool was_queued) {
2256   auto result = CheckResolution(was_queued);
2257   if (result.has_value()) {
2258     if (!result->ok()) {
2259       PendingBatchesFail(*result, YieldCallCombiner);
2260       return;
2261     }
2262     CreateDynamicCall();
2263   }
2264 }
2265 
// Hook invoked when this call is added to the resolver queued-calls list.
void ClientChannel::FilterBasedCallData::OnAddToQueueLocked() {
  // Register call combiner cancellation callback.
  resolver_call_canceller_ = new ResolverQueuedCallCanceller(this);
}
2270 
// Hook invoked when a queued call should re-check resolution (e.g., after
// a new resolver result arrives).
void ClientChannel::FilterBasedCallData::RetryCheckResolutionLocked() {
  // Lame the call combiner canceller.
  resolver_call_canceller_ = nullptr;
  // Do an async callback to resume call processing, so that we're not
  // doing it while holding the channel's resolution mutex.
  chand()->owning_stack_->EventEngine()->Run([this]() {
    // Set up ExecCtx scopes for the closure-based code run below.
    ApplicationCallbackExecCtx application_exec_ctx;
    ExecCtx exec_ctx;
    TryCheckResolution(/*was_queued=*/true);
  });
}
2282 
// Creates the call on the dynamic filter stack and then sends down all
// pending batches.  On failure, fails the pending batches instead.
void ClientChannel::FilterBasedCallData::CreateDynamicCall() {
  DynamicFilters::Call::Args args = {dynamic_filters(), pollent_,       path_,
                                     call_start_time_,  deadline_,      arena(),
                                     call_context_,     call_combiner()};
  grpc_error_handle error;
  DynamicFilters* channel_stack = args.channel_stack.get();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(
        GPR_INFO,
        "chand=%p calld=%p: creating dynamic call stack on channel_stack=%p",
        chand(), this, channel_stack);
  }
  dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
  if (!error.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: failed to create dynamic call: error=%s",
              chand(), this, StatusToString(error).c_str());
    }
    PendingBatchesFail(error, YieldCallCombiner);
    return;
  }
  PendingBatchesResume();
}
2307 
// Intercepted recv_trailing_metadata_ready callback: commits the service
// config call data (if any) before chaining to the original callback.
void ClientChannel::FilterBasedCallData::
    RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
        void* arg, grpc_error_handle error) {
  auto* calld = static_cast<FilterBasedCallData*>(arg);
  auto* chand = calld->chand();
  auto* service_config_call_data =
      static_cast<ClientChannelServiceConfigCallData*>(
          calld->call_context()[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_trailing_metadata_ready: error=%s "
            "service_config_call_data=%p",
            chand, calld, StatusToString(error).c_str(),
            service_config_call_data);
  }
  if (service_config_call_data != nullptr) {
    service_config_call_data->Commit();
  }
  // Chain to original callback.
  Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_,
               error);
}
2330 
2331 //
2332 // ClientChannel::LoadBalancedCall::LbCallState
2333 //
2334 
// Per-call state exposed to LB policies for a LoadBalancedCall.
// Allocations are served from the call's arena.
class ClientChannel::LoadBalancedCall::LbCallState
    : public ClientChannelLbCallState {
 public:
  explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {}

  // Arena-backed allocation; freed when the call's arena is destroyed.
  void* Alloc(size_t size) override { return lb_call_->arena()->Alloc(size); }

  // Internal API to allow first-party LB policies to access per-call
  // attributes set by the ConfigSelector.
  ServiceConfigCallData::CallAttributeInterface* GetCallAttribute(
      UniqueTypeName type) const override;

 private:
  LoadBalancedCall* lb_call_;  // Not owned.
};
2350 
2351 //
2352 // ClientChannel::LoadBalancedCall::Metadata
2353 //
2354 
// Adapter exposing a grpc_metadata_batch to LB policies via the
// LoadBalancingPolicy::MetadataInterface API.  Does not own the batch;
// the batch pointer may be null, in which case all ops are no-ops.
class ClientChannel::LoadBalancedCall::Metadata
    : public LoadBalancingPolicy::MetadataInterface {
 public:
  explicit Metadata(grpc_metadata_batch* batch) : batch_(batch) {}

  void Add(absl::string_view key, absl::string_view value) override {
    if (batch_ == nullptr) return;
    // Gross, egregious hack to support legacy grpclb behavior.
    // TODO(ctiller): Use a promise context for this once that plumbing is done.
    if (key == GrpcLbClientStatsMetadata::key()) {
      batch_->Set(
          GrpcLbClientStatsMetadata(),
          const_cast<GrpcLbClientStats*>(
              reinterpret_cast<const GrpcLbClientStats*>(value.data())));
      return;
    }
    batch_->Append(key, Slice::FromStaticString(value),
                   [key](absl::string_view error, const Slice& value) {
                     // Log (but otherwise ignore) append validation errors.
                     gpr_log(GPR_ERROR, "%s",
                             absl::StrCat(error, " key:", key,
                                          " value:", value.as_string_view())
                                 .c_str());
                   });
  }

  std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector()
      override {
    if (batch_ == nullptr) return {};
    Encoder encoder;
    batch_->Encode(&encoder);
    return encoder.Take();
  }

  absl::optional<absl::string_view> Lookup(absl::string_view key,
                                           std::string* buffer) const override {
    if (batch_ == nullptr) return absl::nullopt;
    return batch_->GetStringValue(key, buffer);
  }

 private:
  // Metadata-batch encoder that copies entries into a vector of
  // (key, value) string pairs for TestOnlyCopyToVector().
  class Encoder {
   public:
    void Encode(const Slice& key, const Slice& value) {
      out_.emplace_back(std::string(key.as_string_view()),
                        std::string(value.as_string_view()));
    }

    template <class Which>
    void Encode(Which, const typename Which::ValueType& value) {
      auto value_slice = Which::Encode(value);
      out_.emplace_back(std::string(Which::key()),
                        std::string(value_slice.as_string_view()));
    }

    // These metadata elements are deliberately excluded from the output.
    void Encode(GrpcTimeoutMetadata,
                const typename GrpcTimeoutMetadata::ValueType&) {}
    void Encode(HttpPathMetadata, const Slice&) {}
    void Encode(HttpMethodMetadata,
                const typename HttpMethodMetadata::ValueType&) {}

    std::vector<std::pair<std::string, std::string>> Take() {
      return std::move(out_);
    }

   private:
    std::vector<std::pair<std::string, std::string>> out_;
  };

  grpc_metadata_batch* batch_;  // Not owned; may be null.
};
2425 
2426 //
2427 // ClientChannel::LoadBalancedCall::LbCallState
2428 //
2429 
2430 ServiceConfigCallData::CallAttributeInterface*
GetCallAttribute(UniqueTypeName type) const2431 ClientChannel::LoadBalancedCall::LbCallState::GetCallAttribute(
2432     UniqueTypeName type) const {
2433   auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
2434       lb_call_->call_context()[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
2435   return service_config_call_data->GetCallAttribute(type);
2436 }
2437 
2438 //
2439 // ClientChannel::LoadBalancedCall::BackendMetricAccessor
2440 //
2441 
// Gives LB policies access to backend metric data carried in the call's
// trailing metadata.  Parsing is done lazily, at most once, and the
// parsed result is cached on the LoadBalancedCall (arena-allocated).
class ClientChannel::LoadBalancedCall::BackendMetricAccessor
    : public LoadBalancingPolicy::BackendMetricAccessor {
 public:
  BackendMetricAccessor(LoadBalancedCall* lb_call,
                        grpc_metadata_batch* recv_trailing_metadata)
      : lb_call_(lb_call), recv_trailing_metadata_(recv_trailing_metadata) {}

  const BackendMetricData* GetBackendMetricData() override {
    // Parse the serialized metrics only on first access.
    if (lb_call_->backend_metric_data_ == nullptr &&
        recv_trailing_metadata_ != nullptr) {
      if (const auto* md = recv_trailing_metadata_->get_pointer(
              EndpointLoadMetricsBinMetadata())) {
        BackendMetricAllocator allocator(lb_call_->arena());
        lb_call_->backend_metric_data_ =
            ParseBackendMetricData(md->as_string_view(), &allocator);
      }
    }
    return lb_call_->backend_metric_data_;
  }

 private:
  // Allocates parsed backend-metric storage from the call's arena, so it
  // lives exactly as long as the call.
  class BackendMetricAllocator : public BackendMetricAllocatorInterface {
   public:
    explicit BackendMetricAllocator(Arena* arena) : arena_(arena) {}

    BackendMetricData* AllocateBackendMetricData() override {
      return arena_->New<BackendMetricData>();
    }

    char* AllocateString(size_t size) override {
      return static_cast<char*>(arena_->Alloc(size));
    }

   private:
    Arena* arena_;  // Not owned.
  };

  LoadBalancedCall* lb_call_;                    // Not owned.
  grpc_metadata_batch* recv_trailing_metadata_;  // Not owned; may be null.
};
2482 
2483 //
2484 // ClientChannel::LoadBalancedCall
2485 //
2486 
2487 namespace {
2488 
CreateCallAttemptTracer(grpc_call_context_element * context,bool is_transparent_retry)2489 ClientCallTracer::CallAttemptTracer* CreateCallAttemptTracer(
2490     grpc_call_context_element* context, bool is_transparent_retry) {
2491   auto* call_tracer = static_cast<ClientCallTracer*>(
2492       context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
2493   if (call_tracer == nullptr) return nullptr;
2494   auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry);
2495   context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
2496   return tracer;
2497 }
2498 
2499 }  // namespace
2500 
// Constructor: on_commit is invoked when the call is committed (see
// Commit()); is_transparent_retry is forwarded to the attempt tracer.
ClientChannel::LoadBalancedCall::LoadBalancedCall(
    ClientChannel* chand, grpc_call_context_element* call_context,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
    : InternallyRefCounted(
          // Pass a trace label only when LB call tracing is enabled.
          GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)
              ? "LoadBalancedCall"
              : nullptr),
      chand_(chand),
      on_commit_(std::move(on_commit)) {
  // Start a per-attempt tracer (if the call has a ClientCallTracer) and
  // publish it in the call context.
  CreateCallAttemptTracer(call_context, is_transparent_retry);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: created", chand_, this);
  }
}
2515 
ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
  // backend_metric_data_ is allocated from the call's arena (see
  // BackendMetricAllocator), so we run its destructor explicitly rather
  // than deleting it; the arena reclaims the storage.
  if (backend_metric_data_ != nullptr) {
    backend_metric_data_->BackendMetricData::~BackendMetricData();
  }
}
2521 
void ClientChannel::LoadBalancedCall::Orphan() {
  // Compute latency and report it to the tracer.
  if (call_attempt_tracer() != nullptr) {
    gpr_timespec latency =
        gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_);
    call_attempt_tracer()->RecordEnd(latency);
  }
  // Release our ref; this may destroy the object.
  Unref();
}
2531 
// Reports call completion to the attempt tracer and to the LB policy's
// subchannel call tracker (if either is present).  recv_trailing_metadata
// and transport_stream_stats may be null.
void ClientChannel::LoadBalancedCall::RecordCallCompletion(
    absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
    grpc_transport_stream_stats* transport_stream_stats,
    absl::string_view peer_address) {
  // If we have a tracer, notify it.
  if (call_attempt_tracer() != nullptr) {
    call_attempt_tracer()->RecordReceivedTrailingMetadata(
        status, recv_trailing_metadata, transport_stream_stats);
  }
  // If the LB policy requested a callback for trailing metadata, invoke
  // the callback.
  if (lb_subchannel_call_tracker_ != nullptr) {
    Metadata trailing_metadata(recv_trailing_metadata);
    BackendMetricAccessor backend_metric_accessor(this, recv_trailing_metadata);
    LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
        peer_address, status, &trailing_metadata, &backend_metric_accessor};
    lb_subchannel_call_tracker_->Finish(args);
    lb_subchannel_call_tracker_.reset();
  }
}
2552 
// Undoes the pollset_set linkage established when the call was queued
// for an LB pick.
void ClientChannel::LoadBalancedCall::RemoveCallFromLbQueuedCallsLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
            chand_, this);
  }
  // Remove pollset_set linkage.
  grpc_polling_entity_del_from_pollset_set(pollent(),
                                           chand_->interested_parties_);
  // Note: There's no need to actually remove the call from the queue
  // here, because that will be done in either
  // LbQueuedCallCanceller::CancelLocked() or
  // in ClientChannel::UpdateStateAndPickerLocked().
}
2566 
// Queues this call to wait for a new LB picker, linking its pollent to
// the channel's interested_parties so I/O can still be driven.
void ClientChannel::LoadBalancedCall::AddCallToLbQueuedCallsLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
            chand_, this);
  }
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent(),
                                         chand_->interested_parties_);
  // Add to queue.
  chand_->lb_queued_calls_.insert(this);
  OnAddToQueueLocked();
}
2580 
// Attempts an LB pick for this call.  Returns OK if the pick succeeded,
// a non-OK status if the pick failed, or nullopt if the call was queued
// to wait for a new picker.
absl::optional<absl::Status> ClientChannel::LoadBalancedCall::PickSubchannel(
    bool was_queued) {
  // We may accumulate multiple pickers here, because if a picker says
  // to queue the call, we check again to see if the picker has been
  // updated before we queue it.
  // We need to unref pickers in the WorkSerializer.
  std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers;
  auto cleanup = absl::MakeCleanup([&]() {
    chand_->work_serializer_->Run(
        [pickers = std::move(pickers)]() mutable {
          for (auto& picker : pickers) {
            picker.reset(DEBUG_LOCATION, "PickSubchannel");
          }
        },
        DEBUG_LOCATION);
  });
  // Grab mutex and take a ref to the picker.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: grabbing LB mutex to get picker",
            chand_, this);
  }
  {
    MutexLock lock(&chand_->lb_mu_);
    pickers.emplace_back(chand_->picker_);
  }
  while (true) {
    // Do pick.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p lb_call=%p: performing pick with picker=%p",
              chand_, this, pickers.back().get());
    }
    grpc_error_handle error;
    bool pick_complete = PickSubchannelImpl(pickers.back().get(), &error);
    if (!pick_complete) {
      MutexLock lock(&chand_->lb_mu_);
      // If picker has been swapped out since we grabbed it, try again.
      if (chand_->picker_ != pickers.back()) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
          gpr_log(GPR_INFO,
                  "chand=%p lb_call=%p: pick not complete, but picker changed",
                  chand_, this);
        }
        pickers.emplace_back(chand_->picker_);
        continue;
      }
      // Otherwise queue the pick to try again later when we get a new picker.
      AddCallToLbQueuedCallsLocked();
      return absl::nullopt;
    }
    // Pick is complete.
    // If it was queued, add a trace annotation.
    if (was_queued && call_attempt_tracer() != nullptr) {
      call_attempt_tracer()->RecordAnnotation("Delayed LB pick complete.");
    }
    // If the pick failed, fail the call.
    if (!error.ok()) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
                chand_, this, StatusToString(error).c_str());
      }
      return error;
    }
    // Pick succeeded.
    Commit();
    return absl::OkStatus();
  }
}
2649 
// Performs an LB pick using the given picker.  Returns true if the pick
// completed — either successfully (connected_subchannel_ is set) or with
// a terminal failure/drop reported via *error — or false if the pick
// must be queued and retried when a new picker becomes available.
bool ClientChannel::LoadBalancedCall::PickSubchannelImpl(
    LoadBalancingPolicy::SubchannelPicker* picker, grpc_error_handle* error) {
  GPR_ASSERT(connected_subchannel_ == nullptr);
  // Perform LB pick.
  LoadBalancingPolicy::PickArgs pick_args;
  // The :path header is always set by the surface, so it must be present.
  Slice* path = send_initial_metadata()->get_pointer(HttpPathMetadata());
  GPR_ASSERT(path != nullptr);
  pick_args.path = path->as_string_view();
  LbCallState lb_call_state(this);
  pick_args.call_state = &lb_call_state;
  Metadata initial_metadata(send_initial_metadata());
  pick_args.initial_metadata = &initial_metadata;
  auto result = picker->Pick(pick_args);
  // Dispatch on the pick result type; each handler returns whether the
  // pick is complete (true) or should be queued (false).
  return HandlePickResult<bool>(
      &result,
      // CompletePick
      [this](LoadBalancingPolicy::PickResult::Complete* complete_pick) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
          gpr_log(GPR_INFO,
                  "chand=%p lb_call=%p: LB pick succeeded: subchannel=%p",
                  chand_, this, complete_pick->subchannel.get());
        }
        GPR_ASSERT(complete_pick->subchannel != nullptr);
        // Grab a ref to the connected subchannel while we're still
        // holding the data plane mutex.
        SubchannelWrapper* subchannel =
            static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
        connected_subchannel_ = subchannel->connected_subchannel();
        // If the subchannel has no connected subchannel (e.g., if the
        // subchannel has moved out of state READY but the LB policy hasn't
        // yet seen that change and given us a new picker), then just
        // queue the pick.  We'll try again as soon as we get a new picker.
        if (connected_subchannel_ == nullptr) {
          if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
            gpr_log(GPR_INFO,
                    "chand=%p lb_call=%p: subchannel returned by LB picker "
                    "has no connected subchannel; queueing pick",
                    chand_, this);
          }
          return false;
        }
        // Take ownership of the per-call tracker (if any) and tell it
        // the call is starting.
        lb_subchannel_call_tracker_ =
            std::move(complete_pick->subchannel_call_tracker);
        if (lb_subchannel_call_tracker_ != nullptr) {
          lb_subchannel_call_tracker_->Start();
        }
        return true;
      },
      // QueuePick
      [this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
          gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick queued", chand_,
                  this);
        }
        return false;
      },
      // FailPick
      [this, &error](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
          gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick failed: %s", chand_,
                  this, fail_pick->status.ToString().c_str());
        }
        // If wait_for_ready is false, then the error indicates the RPC
        // attempt's final status.
        if (!send_initial_metadata()
                 ->GetOrCreatePointer(WaitForReady())
                 ->value) {
          *error = absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
              std::move(fail_pick->status), "LB pick"));
          return true;
        }
        // If wait_for_ready is true, then queue to retry when we get a new
        // picker.
        return false;
      },
      // DropPick
      [this, &error](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
          gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick dropped: %s", chand_,
                  this, drop_pick->status.ToString().c_str());
        }
        // Drops are terminal; mark the error so that upper layers (e.g.,
        // the retry code) can tell this was an LB-policy drop.
        *error = grpc_error_set_int(
            absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
                std::move(drop_pick->status), "LB drop")),
            StatusIntProperty::kLbPolicyDrop, 1);
        return true;
      });
}
2738 
2739 //
2740 // ClientChannel::FilterBasedLoadBalancedCall
2741 //
2742 
// Constructs a filter-based LB call.  Fields are cached from the call
// element args so they remain usable after the args go away.
// |on_call_destruction_complete| is either handed off to the subchannel
// call once one is created (see CreateSubchannelCall()) or, if no
// subchannel call was ever created, run from the destructor.
ClientChannel::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall(
    ClientChannel* chand, const grpc_call_element_args& args,
    grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
    : LoadBalancedCall(chand, args.context, std::move(on_commit),
                       is_transparent_retry),
      deadline_(args.deadline),
      arena_(args.arena),
      call_context_(args.context),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      pollent_(pollent),
      on_call_destruction_complete_(on_call_destruction_complete) {}
2756 
~FilterBasedLoadBalancedCall()2757 ClientChannel::FilterBasedLoadBalancedCall::~FilterBasedLoadBalancedCall() {
2758   // Make sure there are no remaining pending batches.
2759   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2760     GPR_ASSERT(pending_batches_[i] == nullptr);
2761   }
2762   if (on_call_destruction_complete_ != nullptr) {
2763     ExecCtx::Run(DEBUG_LOCATION, on_call_destruction_complete_,
2764                  absl::OkStatus());
2765   }
2766 }
2767 
// Called when the last external ref to the LB call goes away.
void ClientChannel::FilterBasedLoadBalancedCall::Orphan() {
  // If the recv_trailing_metadata op was never started, then notify
  // about call completion here, as best we can.  We assume status
  // CANCELLED in this case.
  if (recv_trailing_metadata_ == nullptr) {
    RecordCallCompletion(absl::CancelledError("call cancelled"), nullptr,
                         nullptr, "");
  }
  // Delegate to parent.
  LoadBalancedCall::Orphan();
}
2779 
GetBatchIndex(grpc_transport_stream_op_batch * batch)2780 size_t ClientChannel::FilterBasedLoadBalancedCall::GetBatchIndex(
2781     grpc_transport_stream_op_batch* batch) {
2782   // Note: It is important the send_initial_metadata be the first entry
2783   // here, since the code in PickSubchannelImpl() assumes it will be.
2784   if (batch->send_initial_metadata) return 0;
2785   if (batch->send_message) return 1;
2786   if (batch->send_trailing_metadata) return 2;
2787   if (batch->recv_initial_metadata) return 3;
2788   if (batch->recv_message) return 4;
2789   if (batch->recv_trailing_metadata) return 5;
2790   GPR_UNREACHABLE_CODE(return (size_t)-1);
2791 }
2792 
2793 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_transport_stream_op_batch * batch)2794 void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesAdd(
2795     grpc_transport_stream_op_batch* batch) {
2796   const size_t idx = GetBatchIndex(batch);
2797   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2798     gpr_log(GPR_INFO,
2799             "chand=%p lb_call=%p: adding pending batch at index %" PRIuPTR,
2800             chand(), this, idx);
2801   }
2802   GPR_ASSERT(pending_batches_[idx] == nullptr);
2803   pending_batches_[idx] = batch;
2804 }
2805 
2806 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error_handle error)2807 void ClientChannel::FilterBasedLoadBalancedCall::FailPendingBatchInCallCombiner(
2808     void* arg, grpc_error_handle error) {
2809   grpc_transport_stream_op_batch* batch =
2810       static_cast<grpc_transport_stream_op_batch*>(arg);
2811   auto* self = static_cast<FilterBasedLoadBalancedCall*>(
2812       batch->handler_private.extra_arg);
2813   // Note: This will release the call combiner.
2814   grpc_transport_stream_op_batch_finish_with_failure(batch, error,
2815                                                      self->call_combiner_);
2816 }
2817 
2818 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesFail(grpc_error_handle error,YieldCallCombinerPredicate yield_call_combiner_predicate)2819 void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesFail(
2820     grpc_error_handle error,
2821     YieldCallCombinerPredicate yield_call_combiner_predicate) {
2822   GPR_ASSERT(!error.ok());
2823   failure_error_ = error;
2824   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2825     size_t num_batches = 0;
2826     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2827       if (pending_batches_[i] != nullptr) ++num_batches;
2828     }
2829     gpr_log(GPR_INFO,
2830             "chand=%p lb_call=%p: failing %" PRIuPTR " pending batches: %s",
2831             chand(), this, num_batches, StatusToString(error).c_str());
2832   }
2833   CallCombinerClosureList closures;
2834   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2835     grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2836     if (batch != nullptr) {
2837       batch->handler_private.extra_arg = this;
2838       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2839                         FailPendingBatchInCallCombiner, batch,
2840                         grpc_schedule_on_exec_ctx);
2841       closures.Add(&batch->handler_private.closure, error,
2842                    "PendingBatchesFail");
2843       batch = nullptr;
2844     }
2845   }
2846   if (yield_call_combiner_predicate(closures)) {
2847     closures.RunClosures(call_combiner_);
2848   } else {
2849     closures.RunClosuresWithoutYielding(call_combiner_);
2850   }
2851 }
2852 
2853 // This is called via the call combiner, so access to calld is synchronized.
2854 void ClientChannel::FilterBasedLoadBalancedCall::
ResumePendingBatchInCallCombiner(void * arg,grpc_error_handle)2855     ResumePendingBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
2856   grpc_transport_stream_op_batch* batch =
2857       static_cast<grpc_transport_stream_op_batch*>(arg);
2858   SubchannelCall* subchannel_call =
2859       static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2860   // Note: This will release the call combiner.
2861   subchannel_call->StartTransportStreamOpBatch(batch);
2862 }
2863 
2864 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesResume()2865 void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesResume() {
2866   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2867     size_t num_batches = 0;
2868     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2869       if (pending_batches_[i] != nullptr) ++num_batches;
2870     }
2871     gpr_log(GPR_INFO,
2872             "chand=%p lb_call=%p: starting %" PRIuPTR
2873             " pending batches on subchannel_call=%p",
2874             chand(), this, num_batches, subchannel_call_.get());
2875   }
2876   CallCombinerClosureList closures;
2877   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2878     grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2879     if (batch != nullptr) {
2880       batch->handler_private.extra_arg = subchannel_call_.get();
2881       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2882                         ResumePendingBatchInCallCombiner, batch,
2883                         grpc_schedule_on_exec_ctx);
2884       closures.Add(&batch->handler_private.closure, absl::OkStatus(),
2885                    "resuming pending batch from LB call");
2886       batch = nullptr;
2887     }
2888   }
2889   // Note: This will release the call combiner.
2890   closures.RunClosures(call_combiner_);
2891 }
2892 
// Entry point for batches coming down from the filter above.  Records
// send ops in the call tracer, intercepts recv-op callbacks, and then
// either forwards the batch to an existing subchannel call, fails it
// (if previously cancelled), or queues it until an LB pick completes.
// Runs under the call combiner.
void ClientChannel::FilterBasedLoadBalancedCall::StartTransportStreamOpBatch(
    grpc_transport_stream_op_batch* batch) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace) ||
      GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: batch started from above: %s, "
            "call_attempt_tracer()=%p",
            chand(), this,
            grpc_transport_stream_op_batch_string(batch, false).c_str(),
            call_attempt_tracer());
  }
  // Handle call tracing.
  if (call_attempt_tracer() != nullptr) {
    // Record send ops in tracer.
    if (batch->cancel_stream) {
      call_attempt_tracer()->RecordCancel(
          batch->payload->cancel_stream.cancel_error);
    }
    if (batch->send_initial_metadata) {
      call_attempt_tracer()->RecordSendInitialMetadata(
          batch->payload->send_initial_metadata.send_initial_metadata);
    }
    if (batch->send_trailing_metadata) {
      call_attempt_tracer()->RecordSendTrailingMetadata(
          batch->payload->send_trailing_metadata.send_trailing_metadata);
    }
    // Intercept recv ops.
    // Note: recv_initial_metadata is intercepted only when a tracer is
    // present; RecvInitialMetadataReady() relies on this invariant.
    if (batch->recv_initial_metadata) {
      recv_initial_metadata_ =
          batch->payload->recv_initial_metadata.recv_initial_metadata;
      original_recv_initial_metadata_ready_ =
          batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
      GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
                        this, nullptr);
      batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
          &recv_initial_metadata_ready_;
    }
  }
  // Intercept recv_trailing_metadata even if there is no call tracer,
  // since we may need to notify the LB policy about trailing metadata.
  if (batch->recv_trailing_metadata) {
    recv_trailing_metadata_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata;
    transport_stream_stats_ =
        batch->payload->recv_trailing_metadata.collect_stats;
    original_recv_trailing_metadata_ready_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
                      this, nullptr);
    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &recv_trailing_metadata_ready_;
  }
  // If we've already gotten a subchannel call, pass the batch down to it.
  // Note that once we have picked a subchannel, we do not need to acquire
  // the channel's data plane mutex, which is more efficient (especially for
  // streaming calls).
  if (subchannel_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p lb_call=%p: starting batch on subchannel_call=%p",
              chand(), this, subchannel_call_.get());
    }
    subchannel_call_->StartTransportStreamOpBatch(batch);
    return;
  }
  // We do not yet have a subchannel call.
  //
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(!cancel_error_.ok())) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p lb_call=%p: failing batch with error: %s",
              chand(), this, StatusToString(cancel_error_).c_str());
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(batch, cancel_error_,
                                                       call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    cancel_error_ = batch->payload->cancel_stream.cancel_error;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p lb_call=%p: recording cancel_error=%s",
              chand(), this, StatusToString(cancel_error_).c_str());
    }
    // Fail all pending batches.
    PendingBatchesFail(cancel_error_, NoYieldCallCombiner);
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(batch, cancel_error_,
                                                       call_combiner_);
    return;
  }
  // Add the batch to the pending list.
  PendingBatchesAdd(batch);
  // For batches containing a send_initial_metadata op, acquire the
  // channel's LB mutex to pick a subchannel.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    TryPick(/*was_queued=*/false);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p lb_call=%p: saved batch, yielding call combiner",
              chand(), this);
    }
    GRPC_CALL_COMBINER_STOP(call_combiner_,
                            "batch does not include send_initial_metadata");
  }
}
3007 
// Intercepted recv_initial_metadata callback.  This closure is installed
// only when call_attempt_tracer() != nullptr (see
// StartTransportStreamOpBatch()), so the tracer is dereferenced here
// without a null check.
void ClientChannel::FilterBasedLoadBalancedCall::RecvInitialMetadataReady(
    void* arg, grpc_error_handle error) {
  auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: got recv_initial_metadata_ready: error=%s",
            self->chand(), self, StatusToString(error).c_str());
  }
  if (error.ok()) {
    // recv_initial_metadata_flags is not populated for clients
    self->call_attempt_tracer()->RecordReceivedInitialMetadata(
        self->recv_initial_metadata_);
    // Save the peer string (if present) for use when recording call
    // completion in RecvTrailingMetadataReady().
    auto* peer_string = self->recv_initial_metadata_->get_pointer(PeerString());
    if (peer_string != nullptr) self->peer_string_ = peer_string->Ref();
  }
  // Chain to the original callback.
  Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
               error);
}
3026 
// Intercepted recv_trailing_metadata callback.  Computes the call's
// final status (from the error or from the trailing metadata) and
// reports completion to the call tracer and/or LB subchannel call
// tracker, then chains to the original callback.
void ClientChannel::FilterBasedLoadBalancedCall::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {
  auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: got recv_trailing_metadata_ready: error=%s "
            "call_attempt_tracer()=%p lb_subchannel_call_tracker_=%p "
            "failure_error_=%s",
            self->chand(), self, StatusToString(error).c_str(),
            self->call_attempt_tracer(), self->lb_subchannel_call_tracker(),
            StatusToString(self->failure_error_).c_str());
  }
  // Check if we have a tracer or an LB callback to invoke.
  if (self->call_attempt_tracer() != nullptr ||
      self->lb_subchannel_call_tracker() != nullptr) {
    // Get the call's status.
    absl::Status status;
    if (!error.ok()) {
      // Get status from error.
      grpc_status_code code;
      std::string message;
      grpc_error_get_status(error, self->deadline_, &code, &message,
                            /*http_error=*/nullptr, /*error_string=*/nullptr);
      status = absl::Status(static_cast<absl::StatusCode>(code), message);
    } else {
      // Get status from headers.
      const auto& md = *self->recv_trailing_metadata_;
      grpc_status_code code =
          md.get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
      if (code != GRPC_STATUS_OK) {
        absl::string_view message;
        if (const auto* grpc_message = md.get_pointer(GrpcMessageMetadata())) {
          message = grpc_message->as_string_view();
        }
        status = absl::Status(static_cast<absl::StatusCode>(code), message);
      }
      // Note: if code == GRPC_STATUS_OK, status stays default-constructed
      // (OK).
    }
    absl::string_view peer_string;
    if (self->peer_string_.has_value()) {
      peer_string = self->peer_string_->as_string_view();
    }
    self->RecordCallCompletion(status, self->recv_trailing_metadata_,
                               self->transport_stream_stats_, peer_string);
  }
  // Chain to original callback.
  // If the batches were failed locally (see PendingBatchesFail()),
  // report the stashed failure error instead of the transport's error.
  if (!self->failure_error_.ok()) {
    error = self->failure_error_;
    self->failure_error_ = absl::OkStatus();
  }
  Closure::Run(DEBUG_LOCATION, self->original_recv_trailing_metadata_ready_,
               error);
}
3079 
// A class to handle the call combiner cancellation callback for a
// queued pick.
// TODO(roth): When we implement hedging support, we won't be able to
// register a call combiner cancellation closure for each LB pick,
// because there may be multiple LB picks happening in parallel.
// Instead, we will probably need to maintain a list in the CallData
// object of pending LB picks to be cancelled when the closure runs.
class ClientChannel::FilterBasedLoadBalancedCall::LbQueuedCallCanceller {
 public:
  // Takes a ref to the LB call and a ref to its owning call stack; both
  // are released when the cancellation closure runs.
  explicit LbQueuedCallCanceller(
      RefCountedPtr<FilterBasedLoadBalancedCall> lb_call)
      : lb_call_(std::move(lb_call)) {
    GRPC_CALL_STACK_REF(lb_call_->owning_call_, "LbQueuedCallCanceller");
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, nullptr);
    lb_call_->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  // Invoked by the call combiner when the call is cancelled (or when a
  // new cancellation closure replaces this one, in which case error is
  // OK and this is a no-op).
  static void CancelLocked(void* arg, grpc_error_handle error) {
    auto* self = static_cast<LbQueuedCallCanceller*>(arg);
    auto* lb_call = self->lb_call_.get();
    auto* chand = lb_call->chand();
    {
      MutexLock lock(&chand->lb_mu_);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p lb_call=%p: cancelling queued pick: "
                "error=%s self=%p calld->pick_canceller=%p",
                chand, lb_call, StatusToString(error).c_str(), self,
                lb_call->lb_call_canceller_);
      }
      // Only act if this canceller is still the active one (RetryPickLocked()
      // lames a canceller by clearing lb_call_canceller_) and this is a
      // real cancellation.
      if (lb_call->lb_call_canceller_ == self && !error.ok()) {
        lb_call->Commit();
        // Remove pick from list of queued picks.
        lb_call->RemoveCallFromLbQueuedCallsLocked();
        // Remove from queued picks list.
        chand->lb_queued_calls_.erase(lb_call);
        // Fail pending batches on the call.
        lb_call->PendingBatchesFail(error,
                                    YieldCallCombinerIfPendingBatchesFound);
      }
    }
    // Unref lb_call before unreffing the call stack, since unreffing
    // the call stack may destroy the arena in which lb_call is allocated.
    auto* owning_call = lb_call->owning_call_;
    self->lb_call_.reset();
    GRPC_CALL_STACK_UNREF(owning_call, "LbQueuedCallCanceller");
    delete self;
  }

  RefCountedPtr<FilterBasedLoadBalancedCall> lb_call_;
  grpc_closure closure_;
};
3133 
TryPick(bool was_queued)3134 void ClientChannel::FilterBasedLoadBalancedCall::TryPick(bool was_queued) {
3135   auto result = PickSubchannel(was_queued);
3136   if (result.has_value()) {
3137     if (!result->ok()) {
3138       PendingBatchesFail(*result, YieldCallCombiner);
3139       return;
3140     }
3141     CreateSubchannelCall();
3142   }
3143 }
3144 
// Called (under the channel's LB mutex) when this call is added to the
// queued-picks list.
void ClientChannel::FilterBasedLoadBalancedCall::OnAddToQueueLocked() {
  // Register call combiner cancellation callback.
  lb_call_canceller_ = new LbQueuedCallCanceller(Ref());
}
3149 
// Called (under the channel's LB mutex) when a new picker is available
// and this queued call should retry its pick.
void ClientChannel::FilterBasedLoadBalancedCall::RetryPickLocked() {
  // Lame the call combiner canceller.
  // (CancelLocked() checks lb_call_canceller_ against itself, so clearing
  // it here makes any already-registered canceller a no-op.)
  lb_call_canceller_ = nullptr;
  // Do an async callback to resume call processing, so that we're not
  // doing it while holding the channel's LB mutex.
  // TODO(roth): We should really be using EventEngine::Run() here
  // instead of ExecCtx::Run().  Unfortunately, doing that seems to cause
  // a flaky TSAN failure for reasons that I do not fully understand.
  // However, given that we are working toward eliminating this code as
  // part of the promise conversion, it doesn't seem worth further
  // investigation right now.
  ExecCtx::Run(DEBUG_LOCATION, NewClosure([this](grpc_error_handle) {
                 // If there are a lot of queued calls here, resuming them
                 // all may cause us to stay inside C-core for a long period
                 // of time. All of that work would be done using the same
                 // ExecCtx instance and therefore the same cached value of
                 // "now". The longer it takes to finish all of this work
                 // and exit from C-core, the more stale the cached value of
                 // "now" may become. This can cause problems whereby (e.g.)
                 // we calculate a timer deadline based on the stale value,
                 // which results in the timer firing too early. To avoid
                 // this, we invalidate the cached value for each call we
                 // process.
                 ExecCtx::Get()->InvalidateNow();
                 TryPick(/*was_queued=*/true);
               }),
               absl::OkStatus());
}
3178 
// Creates the subchannel call on the picked connected subchannel and
// then either resumes all pending batches on it or, if creation failed,
// fails them.
void ClientChannel::FilterBasedLoadBalancedCall::CreateSubchannelCall() {
  Slice* path = send_initial_metadata()->get_pointer(HttpPathMetadata());
  GPR_ASSERT(path != nullptr);
  SubchannelCall::Args call_args = {
      connected_subchannel()->Ref(), pollent_, path->Ref(), /*start_time=*/0,
      deadline_, arena_,
      // TODO(roth): When we implement hedging support, we will probably
      // need to use a separate call context for each subchannel call.
      call_context_, call_combiner_};
  grpc_error_handle error;
  subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand(),
            this, subchannel_call_.get(), StatusToString(error).c_str());
  }
  // Hand off the destruction-complete closure to the subchannel call,
  // so it runs after the call stack is destroyed.
  // NOTE(review): this assumes Create() returns a non-null call even on
  // error — confirm against SubchannelCall::Create().
  if (on_call_destruction_complete_ != nullptr) {
    subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
    on_call_destruction_complete_ = nullptr;
  }
  if (GPR_UNLIKELY(!error.ok())) {
    PendingBatchesFail(error, YieldCallCombiner);
  } else {
    PendingBatchesResume();
  }
}
3205 
3206 }  // namespace grpc_core
3207