1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/xds/xds_client.h"
20 
21 #include <inttypes.h>
22 #include <string.h>
23 
24 #include <algorithm>
25 #include <type_traits>
26 
27 #include "absl/strings/match.h"
28 #include "absl/strings/str_cat.h"
29 #include "absl/strings/str_join.h"
30 #include "absl/strings/str_split.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/strings/strip.h"
33 #include "absl/types/optional.h"
34 #include "upb/mem/arena.h"
35 
36 #include <grpc/event_engine/event_engine.h>
37 #include <grpc/support/log.h>
38 
39 #include "src/core/ext/xds/xds_api.h"
40 #include "src/core/ext/xds/xds_bootstrap.h"
41 #include "src/core/ext/xds/xds_client_stats.h"
42 #include "src/core/lib/backoff/backoff.h"
43 #include "src/core/lib/gprpp/debug_location.h"
44 #include "src/core/lib/gprpp/orphanable.h"
45 #include "src/core/lib/gprpp/ref_counted_ptr.h"
46 #include "src/core/lib/gprpp/sync.h"
47 #include "src/core/lib/iomgr/exec_ctx.h"
48 #include "src/core/lib/uri/uri_parser.h"
49 
// Retry backoff parameters for the ADS and LRS streams; these feed the
// BackOff::Options built in RetryableCall's constructor (initial delay in
// seconds, per-attempt multiplier, maximum delay in seconds, jitter factor).
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_XDS_RECONNECT_JITTER 0.2
// Lower bound for the load-reporting interval, in milliseconds.
// NOTE(review): not referenced in this part of the file; presumably applied
// to the interval requested by the LRS server — verify at the use site.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
55 
56 namespace grpc_core {
57 
58 using ::grpc_event_engine::experimental::EventEngine;
59 
// Trace flag guarding the XdsClient's verbose logging (checked with
// GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) throughout this file).
// Constructed with `false`, presumably meaning off unless enabled by name.
TraceFlag grpc_xds_client_trace(false, "xds_client");
// Trace flag for ref-count logging; e.g. ChannelState passes a trace name to
// its DualRefCounted base only when this flag is enabled.
TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
62 
63 //
64 // Internal class declarations
65 //
66 
// An xds call wrapper that can restart a call upon failure. Holds a ref to
// the xds channel. The template parameter is the kind of wrapped xds call.
//
// In this file T is AdsCallState or LrsCallState.  T must be constructible
// from RefCountedPtr<RetryableCall<T>> and must provide seen_response(), since
// RetryableCall's method definitions rely on both.
template <typename T>
class XdsClient::ChannelState::RetryableCall
    : public InternallyRefCounted<RetryableCall<T>> {
 public:
  // Starts the first call attempt immediately.
  explicit RetryableCall(WeakRefCountedPtr<ChannelState> chand);

  // Disable thread-safety analysis because this method is called via
  // OrphanablePtr<>, but there's no way to pass the lock annotation
  // through there.
  // Marks the wrapper as shutting down, destroys the active call, cancels a
  // pending retry timer, and releases the initial ref.
  void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;

  // Invoked when the wrapped call ends (presumably by T itself): resets the
  // backoff if the call saw a response, destroys it, and schedules a retry.
  void OnCallFinishedLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

  // The active call, or nullptr while waiting in retry backoff.
  T* calld() const { return calld_.get(); }
  ChannelState* chand() const { return chand_.get(); }

  // NOTE(review): defined outside this view; presumably reports whether this
  // wrapper is still the one installed on the channel.
  bool IsCurrentCallOnChannel() const;

 private:
  // Creates a new T unless shutting down.  NOTE(review): despite the
  // "Locked" suffix this has no lock annotation; it is also invoked from the
  // constructor.
  void StartNewCallLocked();
  // Schedules OnRetryTimer() after the next backoff delay.
  void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

  // Retry-timer callback; starts a new call unless the timer was cancelled
  // by Orphan() or we are shutting down.
  void OnRetryTimer();

  // The wrapped xds call that talks to the xds server. It's instantiated
  // every time we start a new call. It's null during call retry backoff.
  OrphanablePtr<T> calld_;
  // The owning xds channel.
  WeakRefCountedPtr<ChannelState> chand_;

  // Retry state.
  BackOff backoff_;
  // Set while a retry timer is pending; cleared by OnRetryTimer() or Orphan().
  absl::optional<EventEngine::TaskHandle> timer_handle_
      ABSL_GUARDED_BY(&XdsClient::mu_);

  // Set by Orphan(); prevents starting new calls or retry timers.
  bool shutting_down_ = false;
};
106 
107 // Contains an ADS call to the xds server.
108 class XdsClient::ChannelState::AdsCallState
109     : public InternallyRefCounted<AdsCallState> {
110  public:
111   // The ctor and dtor should not be used directly.
112   explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
113 
114   void Orphan() override;
115 
parent() const116   RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
chand() const117   ChannelState* chand() const { return parent_->chand(); }
xds_client() const118   XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const119   bool seen_response() const { return seen_response_; }
120 
121   void SubscribeLocked(const XdsResourceType* type, const XdsResourceName& name,
122                        bool delay_send)
123       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
124   void UnsubscribeLocked(const XdsResourceType* type,
125                          const XdsResourceName& name, bool delay_unsubscription)
126       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
127 
128   bool HasSubscribedResources() const;
129 
130  private:
131   class AdsResponseParser : public XdsApi::AdsResponseParserInterface {
132    public:
133     struct Result {
134       const XdsResourceType* type;
135       std::string type_url;
136       std::string version;
137       std::string nonce;
138       std::vector<std::string> errors;
139       std::map<std::string /*authority*/, std::set<XdsResourceKey>>
140           resources_seen;
141       bool have_valid_resources = false;
142     };
143 
AdsResponseParser(AdsCallState * ads_call_state)144     explicit AdsResponseParser(AdsCallState* ads_call_state)
145         : ads_call_state_(ads_call_state) {}
146 
147     absl::Status ProcessAdsResponseFields(AdsResponseFields fields) override
148         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
149 
150     void ParseResource(upb_Arena* arena, size_t idx, absl::string_view type_url,
151                        absl::string_view resource_name,
152                        absl::string_view serialized_resource) override
153         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
154 
155     void ResourceWrapperParsingFailed(size_t idx,
156                                       absl::string_view message) override;
157 
TakeResult()158     Result TakeResult() { return std::move(result_); }
159 
160    private:
xds_client() const161     XdsClient* xds_client() const { return ads_call_state_->xds_client(); }
162 
163     AdsCallState* ads_call_state_;
164     const Timestamp update_time_ = Timestamp::Now();
165     Result result_;
166   };
167 
168   class ResourceTimer : public InternallyRefCounted<ResourceTimer> {
169    public:
ResourceTimer(const XdsResourceType * type,const XdsResourceName & name)170     ResourceTimer(const XdsResourceType* type, const XdsResourceName& name)
171         : type_(type), name_(name) {}
172 
173     // Disable thread-safety analysis because this method is called via
174     // OrphanablePtr<>, but there's no way to pass the lock annotation
175     // through there.
Orphan()176     void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS {
177       MaybeCancelTimer();
178       Unref(DEBUG_LOCATION, "Orphan");
179     }
180 
MarkSubscriptionSendStarted()181     void MarkSubscriptionSendStarted()
182         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
183       subscription_sent_ = true;
184     }
185 
MaybeMarkSubscriptionSendComplete(RefCountedPtr<AdsCallState> ads_calld)186     void MaybeMarkSubscriptionSendComplete(
187         RefCountedPtr<AdsCallState> ads_calld)
188         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
189       if (subscription_sent_) MaybeStartTimer(std::move(ads_calld));
190     }
191 
MarkSeen()192     void MarkSeen() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
193       resource_seen_ = true;
194       MaybeCancelTimer();
195     }
196 
MaybeCancelTimer()197     void MaybeCancelTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
198       if (timer_handle_.has_value() &&
199           ads_calld_->xds_client()->engine()->Cancel(*timer_handle_)) {
200         timer_handle_.reset();
201       }
202     }
203 
204    private:
MaybeStartTimer(RefCountedPtr<AdsCallState> ads_calld)205     void MaybeStartTimer(RefCountedPtr<AdsCallState> ads_calld)
206         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
207       // Don't start timer if we've already either seen the resource or
208       // marked it as non-existing.
209       // Note: There are edge cases where we can have seen the resource
210       // before we have sent the initial subscription request, such as
211       // when we unsubscribe and then resubscribe to a given resource
212       // and then get a response containing that resource, all while a
213       // send_message op is in flight.
214       if (resource_seen_) return;
215       // Don't start timer if we haven't yet sent the initial subscription
216       // request for the resource.
217       if (!subscription_sent_) return;
218       // Don't start timer if it's already running.
219       if (timer_handle_.has_value()) return;
220       // Check if we already have a cached version of this resource
221       // (i.e., if this is the initial request for the resource after an
222       // ADS stream restart).  If so, we don't start the timer, because
223       // (a) we already have the resource and (b) the server may
224       // optimize by not resending the resource that we already have.
225       auto& authority_state =
226           ads_calld->xds_client()->authority_state_map_[name_.authority];
227       ResourceState& state = authority_state.resource_map[type_][name_.key];
228       if (state.resource != nullptr) return;
229       // Start timer.
230       ads_calld_ = std::move(ads_calld);
231       timer_handle_ = ads_calld_->xds_client()->engine()->RunAfter(
232           ads_calld_->xds_client()->request_timeout_,
233           [self = Ref(DEBUG_LOCATION, "timer")]() {
234             ApplicationCallbackExecCtx callback_exec_ctx;
235             ExecCtx exec_ctx;
236             self->OnTimer();
237           });
238     }
239 
OnTimer()240     void OnTimer() {
241       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
242         gpr_log(GPR_INFO,
243                 "[xds_client %p] xds server %s: timeout obtaining resource "
244                 "{type=%s name=%s} from xds server",
245                 ads_calld_->xds_client(),
246                 ads_calld_->chand()->server_.server_uri().c_str(),
247                 std::string(type_->type_url()).c_str(),
248                 XdsClient::ConstructFullXdsResourceName(
249                     name_.authority, type_->type_url(), name_.key)
250                     .c_str());
251       }
252       {
253         MutexLock lock(&ads_calld_->xds_client()->mu_);
254         timer_handle_.reset();
255         resource_seen_ = true;
256         auto& authority_state =
257             ads_calld_->xds_client()->authority_state_map_[name_.authority];
258         ResourceState& state = authority_state.resource_map[type_][name_.key];
259         state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
260         ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist(
261             state.watchers);
262       }
263       ads_calld_->xds_client()->work_serializer_.DrainQueue();
264       ads_calld_.reset();
265     }
266 
267     const XdsResourceType* type_;
268     const XdsResourceName name_;
269 
270     RefCountedPtr<AdsCallState> ads_calld_;
271     // True if we have sent the initial subscription request for this
272     // resource on this ADS stream.
273     bool subscription_sent_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
274     // True if we have either (a) seen the resource in a response on this
275     // stream or (b) declared the resource to not exist due to the timer
276     // firing.
277     bool resource_seen_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
278     absl::optional<EventEngine::TaskHandle> timer_handle_
279         ABSL_GUARDED_BY(&XdsClient::mu_);
280   };
281 
282   class StreamEventHandler
283       : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler {
284    public:
StreamEventHandler(RefCountedPtr<AdsCallState> ads_calld)285     explicit StreamEventHandler(RefCountedPtr<AdsCallState> ads_calld)
286         : ads_calld_(std::move(ads_calld)) {}
287 
OnRequestSent(bool ok)288     void OnRequestSent(bool ok) override { ads_calld_->OnRequestSent(ok); }
OnRecvMessage(absl::string_view payload)289     void OnRecvMessage(absl::string_view payload) override {
290       ads_calld_->OnRecvMessage(payload);
291     }
OnStatusReceived(absl::Status status)292     void OnStatusReceived(absl::Status status) override {
293       ads_calld_->OnStatusReceived(std::move(status));
294     }
295 
296    private:
297     RefCountedPtr<AdsCallState> ads_calld_;
298   };
299 
300   struct ResourceTypeState {
301     // Nonce and status for this resource type.
302     std::string nonce;
303     absl::Status status;
304 
305     // Subscribed resources of this type.
306     std::map<std::string /*authority*/,
307              std::map<XdsResourceKey, OrphanablePtr<ResourceTimer>>>
308         subscribed_resources;
309   };
310 
311   void SendMessageLocked(const XdsResourceType* type)
312       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
313 
314   void OnRequestSent(bool ok);
315   void OnRecvMessage(absl::string_view payload);
316   void OnStatusReceived(absl::Status status);
317 
318   bool IsCurrentCallOnChannel() const;
319 
320   // Constructs a list of resource names of a given type for an ADS
321   // request.  Also starts the timer for each resource if needed.
322   std::vector<std::string> ResourceNamesForRequest(const XdsResourceType* type)
323       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
324 
325   // The owning RetryableCall<>.
326   RefCountedPtr<RetryableCall<AdsCallState>> parent_;
327 
328   OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall> call_;
329 
330   bool sent_initial_message_ = false;
331   bool seen_response_ = false;
332 
333   const XdsResourceType* send_message_pending_
334       ABSL_GUARDED_BY(&XdsClient::mu_) = nullptr;
335 
336   // Resource types for which requests need to be sent.
337   std::set<const XdsResourceType*> buffered_requests_;
338 
339   // State for each resource type.
340   std::map<const XdsResourceType*, ResourceTypeState> state_map_;
341 };
342 
343 // Contains an LRS call to the xds server.
344 class XdsClient::ChannelState::LrsCallState
345     : public InternallyRefCounted<LrsCallState> {
346  public:
347   // The ctor and dtor should not be used directly.
348   explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
349 
350   void Orphan() override;
351 
352   void MaybeStartReportingLocked()
353       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
354 
parent()355   RetryableCall<LrsCallState>* parent() { return parent_.get(); }
chand() const356   ChannelState* chand() const { return parent_->chand(); }
xds_client() const357   XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const358   bool seen_response() const { return seen_response_; }
359 
360  private:
361   class StreamEventHandler
362       : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler {
363    public:
StreamEventHandler(RefCountedPtr<LrsCallState> lrs_calld)364     explicit StreamEventHandler(RefCountedPtr<LrsCallState> lrs_calld)
365         : lrs_calld_(std::move(lrs_calld)) {}
366 
OnRequestSent(bool ok)367     void OnRequestSent(bool ok) override { lrs_calld_->OnRequestSent(ok); }
OnRecvMessage(absl::string_view payload)368     void OnRecvMessage(absl::string_view payload) override {
369       lrs_calld_->OnRecvMessage(payload);
370     }
OnStatusReceived(absl::Status status)371     void OnStatusReceived(absl::Status status) override {
372       lrs_calld_->OnStatusReceived(std::move(status));
373     }
374 
375    private:
376     RefCountedPtr<LrsCallState> lrs_calld_;
377   };
378 
379   // Reports client-side load stats according to a fixed interval.
380   class Reporter : public InternallyRefCounted<Reporter> {
381    public:
Reporter(RefCountedPtr<LrsCallState> parent,Duration report_interval)382     Reporter(RefCountedPtr<LrsCallState> parent, Duration report_interval)
383         : parent_(std::move(parent)), report_interval_(report_interval) {
384       ScheduleNextReportLocked();
385     }
386 
387     // Disable thread-safety analysis because this method is called via
388     // OrphanablePtr<>, but there's no way to pass the lock annotation
389     // through there.
390     void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
391 
392     void OnReportDoneLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
393 
394    private:
395     void ScheduleNextReportLocked()
396         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
397     bool OnNextReportTimer();
398     bool SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
399 
IsCurrentReporterOnCall() const400     bool IsCurrentReporterOnCall() const {
401       return this == parent_->reporter_.get();
402     }
xds_client() const403     XdsClient* xds_client() const { return parent_->xds_client(); }
404 
405     // The owning LRS call.
406     RefCountedPtr<LrsCallState> parent_;
407 
408     // The load reporting state.
409     const Duration report_interval_;
410     bool last_report_counters_were_zero_ = false;
411     absl::optional<EventEngine::TaskHandle> timer_handle_
412         ABSL_GUARDED_BY(&XdsClient::mu_);
413   };
414 
415   void OnRequestSent(bool ok);
416   void OnRecvMessage(absl::string_view payload);
417   void OnStatusReceived(absl::Status status);
418 
419   bool IsCurrentCallOnChannel() const;
420 
421   // The owning RetryableCall<>.
422   RefCountedPtr<RetryableCall<LrsCallState>> parent_;
423 
424   OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall> call_;
425 
426   bool seen_response_ = false;
427   bool send_message_pending_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
428 
429   // Load reporting state.
430   bool send_all_clusters_ = false;
431   std::set<std::string> cluster_names_;  // Asked for by the LRS server.
432   Duration load_reporting_interval_;
433   OrphanablePtr<Reporter> reporter_;
434 };
435 
436 //
437 // XdsClient::ChannelState
438 //
439 
ChannelState(WeakRefCountedPtr<XdsClient> xds_client,const XdsBootstrap::XdsServer & server)440 XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
441                                       const XdsBootstrap::XdsServer& server)
442     : DualRefCounted<ChannelState>(
443           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
444               ? "ChannelState"
445               : nullptr),
446       xds_client_(std::move(xds_client)),
447       server_(server) {
448   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
449     gpr_log(GPR_INFO, "[xds_client %p] creating channel %p for server %s",
450             xds_client_.get(), this, server.server_uri().c_str());
451   }
452   absl::Status status;
453   transport_ = xds_client_->transport_factory_->Create(
454       server,
455       [self = WeakRef(DEBUG_LOCATION, "OnConnectivityFailure")](
456           absl::Status status) {
457         self->OnConnectivityFailure(std::move(status));
458       },
459       &status);
460   GPR_ASSERT(transport_ != nullptr);
461   if (!status.ok()) SetChannelStatusLocked(std::move(status));
462 }
463 
~ChannelState()464 XdsClient::ChannelState::~ChannelState() {
465   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
466     gpr_log(GPR_INFO, "[xds_client %p] destroying xds channel %p for server %s",
467             xds_client(), this, server_.server_uri().c_str());
468   }
469   xds_client_.reset(DEBUG_LOCATION, "ChannelState");
470 }
471 
// This method should only ever be called when holding the lock, but we can't
// use an ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation, because Orphan() will be
// called from DualRefCounted::Unref, which cannot have a lock annotation for
// a lock in this subclass.
//
// Shuts the channel down: later SetChannelStatusLocked() calls become no-ops,
// the transport is destroyed, the channel is removed from the XdsClient's
// server->channel map, and the ADS and LRS retryable calls are orphaned.
void XdsClient::ChannelState::Orphan() ABSL_NO_THREAD_SAFETY_ANALYSIS {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO, "[xds_client %p] orphaning xds channel %p for server %s",
            xds_client(), this, server_.server_uri().c_str());
  }
  // Checked by SetChannelStatusLocked() so no further status is reported.
  shutting_down_ = true;
  transport_.reset();
  // At this time, all strong refs are removed, remove from channel map to
  // prevent subsequent subscription from trying to use this ChannelState as
  // it is shutting down.
  xds_client_->xds_server_channel_map_.erase(&server_);
  ads_calld_.reset();
  lrs_calld_.reset();
}
490 
ResetBackoff()491 void XdsClient::ChannelState::ResetBackoff() { transport_->ResetBackoff(); }
492 
ads_calld() const493 XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
494     const {
495   return ads_calld_->calld();
496 }
497 
lrs_calld() const498 XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
499     const {
500   return lrs_calld_->calld();
501 }
502 
MaybeStartLrsCall()503 void XdsClient::ChannelState::MaybeStartLrsCall() {
504   if (lrs_calld_ != nullptr) return;
505   lrs_calld_.reset(new RetryableCall<LrsCallState>(
506       WeakRef(DEBUG_LOCATION, "ChannelState+lrs")));
507 }
508 
StopLrsCallLocked()509 void XdsClient::ChannelState::StopLrsCallLocked() {
510   xds_client_->xds_load_report_server_map_.erase(&server_);
511   lrs_calld_.reset();
512 }
513 
SubscribeLocked(const XdsResourceType * type,const XdsResourceName & name)514 void XdsClient::ChannelState::SubscribeLocked(const XdsResourceType* type,
515                                               const XdsResourceName& name) {
516   if (ads_calld_ == nullptr) {
517     // Start the ADS call if this is the first request.
518     ads_calld_.reset(new RetryableCall<AdsCallState>(
519         WeakRef(DEBUG_LOCATION, "ChannelState+ads")));
520     // Note: AdsCallState's ctor will automatically subscribe to all
521     // resources that the XdsClient already has watchers for, so we can
522     // return here.
523     return;
524   }
525   // If the ADS call is in backoff state, we don't need to do anything now
526   // because when the call is restarted it will resend all necessary requests.
527   if (ads_calld() == nullptr) return;
528   // Subscribe to this resource if the ADS call is active.
529   ads_calld()->SubscribeLocked(type, name, /*delay_send=*/false);
530 }
531 
UnsubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_unsubscription)532 void XdsClient::ChannelState::UnsubscribeLocked(const XdsResourceType* type,
533                                                 const XdsResourceName& name,
534                                                 bool delay_unsubscription) {
535   if (ads_calld_ != nullptr) {
536     auto* calld = ads_calld_->calld();
537     if (calld != nullptr) {
538       calld->UnsubscribeLocked(type, name, delay_unsubscription);
539       if (!calld->HasSubscribedResources()) {
540         ads_calld_.reset();
541       }
542     }
543   }
544 }
545 
OnConnectivityFailure(absl::Status status)546 void XdsClient::ChannelState::OnConnectivityFailure(absl::Status status) {
547   {
548     MutexLock lock(&xds_client_->mu_);
549     SetChannelStatusLocked(std::move(status));
550   }
551   xds_client_->work_serializer_.DrainQueue();
552 }
553 
SetChannelStatusLocked(absl::Status status)554 void XdsClient::ChannelState::SetChannelStatusLocked(absl::Status status) {
555   if (shutting_down_) return;
556   status = absl::Status(status.code(), absl::StrCat("xDS channel for server ",
557                                                     server_.server_uri(), ": ",
558                                                     status.message()));
559   gpr_log(GPR_INFO, "[xds_client %p] %s", xds_client(),
560           status.ToString().c_str());
561   // If the node ID is set, append that to the status message that we send to
562   // the watchers, so that it will appear in log messages visible to users.
563   const auto* node = xds_client_->bootstrap_->node();
564   if (node != nullptr) {
565     status = absl::Status(
566         status.code(),
567         absl::StrCat(status.message(),
568                      " (node ID:", xds_client_->bootstrap_->node()->id(), ")"));
569   }
570   // Save status in channel, so that we can immediately generate an
571   // error for any new watchers that may be started.
572   status_ = status;
573   // Find all watchers for this channel.
574   std::set<RefCountedPtr<ResourceWatcherInterface>> watchers;
575   for (const auto& a : xds_client_->authority_state_map_) {  // authority
576     if (a.second.channel_state != this) continue;
577     for (const auto& t : a.second.resource_map) {  // type
578       for (const auto& r : t.second) {             // resource id
579         for (const auto& w : r.second.watchers) {  // watchers
580           watchers.insert(w.second);
581         }
582       }
583     }
584   }
585   // Enqueue notification for the watchers.
586   xds_client_->work_serializer_.Schedule(
587       [watchers = std::move(watchers), status = std::move(status)]()
588           ABSL_EXCLUSIVE_LOCKS_REQUIRED(xds_client_->work_serializer_) {
589             for (const auto& watcher : watchers) {
590               watcher->OnError(status);
591             }
592           },
593       DEBUG_LOCATION);
594 }
595 
596 //
597 // XdsClient::ChannelState::RetryableCall<>
598 //
599 
600 template <typename T>
RetryableCall(WeakRefCountedPtr<ChannelState> chand)601 XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
602     WeakRefCountedPtr<ChannelState> chand)
603     : chand_(std::move(chand)),
604       backoff_(BackOff::Options()
605                    .set_initial_backoff(Duration::Seconds(
606                        GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS))
607                    .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
608                    .set_jitter(GRPC_XDS_RECONNECT_JITTER)
609                    .set_max_backoff(Duration::Seconds(
610                        GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS))) {
611   StartNewCallLocked();
612 }
613 
614 template <typename T>
Orphan()615 void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
616   shutting_down_ = true;
617   calld_.reset();
618   if (timer_handle_.has_value()) {
619     chand()->xds_client()->engine()->Cancel(*timer_handle_);
620     timer_handle_.reset();
621   }
622   this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
623 }
624 
625 template <typename T>
OnCallFinishedLocked()626 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
627   // If we saw a response on the current stream, reset backoff.
628   if (calld_->seen_response()) backoff_.Reset();
629   calld_.reset();
630   // Start retry timer.
631   StartRetryTimerLocked();
632 }
633 
634 template <typename T>
StartNewCallLocked()635 void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
636   if (shutting_down_) return;
637   GPR_ASSERT(chand_->transport_ != nullptr);
638   GPR_ASSERT(calld_ == nullptr);
639   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
640     gpr_log(GPR_INFO,
641             "[xds_client %p] xds server %s: start new call from retryable "
642             "call %p",
643             chand()->xds_client(), chand()->server_.server_uri().c_str(), this);
644   }
645   calld_ = MakeOrphanable<T>(
646       this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
647 }
648 
649 template <typename T>
StartRetryTimerLocked()650 void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
651   if (shutting_down_) return;
652   const Timestamp next_attempt_time = backoff_.NextAttemptTime();
653   const Duration timeout =
654       std::max(next_attempt_time - Timestamp::Now(), Duration::Zero());
655   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
656     gpr_log(GPR_INFO,
657             "[xds_client %p] xds server %s: call attempt failed; "
658             "retry timer will fire in %" PRId64 "ms.",
659             chand()->xds_client(), chand()->server_.server_uri().c_str(),
660             timeout.millis());
661   }
662   timer_handle_ = chand()->xds_client()->engine()->RunAfter(
663       timeout,
664       [self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() {
665         ApplicationCallbackExecCtx callback_exec_ctx;
666         ExecCtx exec_ctx;
667         self->OnRetryTimer();
668       });
669 }
670 
671 template <typename T>
OnRetryTimer()672 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer() {
673   MutexLock lock(&chand_->xds_client()->mu_);
674   if (timer_handle_.has_value()) {
675     timer_handle_.reset();
676     if (shutting_down_) return;
677     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
678       gpr_log(GPR_INFO,
679               "[xds_client %p] xds server %s: retry timer fired (retryable "
680               "call: %p)",
681               chand()->xds_client(), chand()->server_.server_uri().c_str(),
682               this);
683     }
684     StartNewCallLocked();
685   }
686 }
687 
688 //
689 // XdsClient::ChannelState::AdsCallState::AdsResponseParser
690 //
691 
692 absl::Status XdsClient::ChannelState::AdsCallState::AdsResponseParser::
ProcessAdsResponseFields(AdsResponseFields fields)693     ProcessAdsResponseFields(AdsResponseFields fields) {
694   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
695     gpr_log(
696         GPR_INFO,
697         "[xds_client %p] xds server %s: received ADS response: type_url=%s, "
698         "version=%s, nonce=%s, num_resources=%" PRIuPTR,
699         ads_call_state_->xds_client(),
700         ads_call_state_->chand()->server_.server_uri().c_str(),
701         fields.type_url.c_str(), fields.version.c_str(), fields.nonce.c_str(),
702         fields.num_resources);
703   }
704   result_.type =
705       ads_call_state_->xds_client()->GetResourceTypeLocked(fields.type_url);
706   if (result_.type == nullptr) {
707     return absl::InvalidArgumentError(
708         absl::StrCat("unknown resource type ", fields.type_url));
709   }
710   result_.type_url = std::move(fields.type_url);
711   result_.version = std::move(fields.version);
712   result_.nonce = std::move(fields.nonce);
713   return absl::OkStatus();
714 }
715 
716 namespace {
717 
718 // Build a resource metadata struct for ADS result accepting methods and CSDS.
CreateResourceMetadataAcked(std::string serialized_proto,std::string version,Timestamp update_time)719 XdsApi::ResourceMetadata CreateResourceMetadataAcked(
720     std::string serialized_proto, std::string version, Timestamp update_time) {
721   XdsApi::ResourceMetadata resource_metadata;
722   resource_metadata.serialized_proto = std::move(serialized_proto);
723   resource_metadata.update_time = update_time;
724   resource_metadata.version = std::move(version);
725   resource_metadata.client_status = XdsApi::ResourceMetadata::ACKED;
726   return resource_metadata;
727 }
728 
729 // Update resource_metadata for NACK.
UpdateResourceMetadataNacked(const std::string & version,const std::string & details,Timestamp update_time,XdsApi::ResourceMetadata * resource_metadata)730 void UpdateResourceMetadataNacked(const std::string& version,
731                                   const std::string& details,
732                                   Timestamp update_time,
733                                   XdsApi::ResourceMetadata* resource_metadata) {
734   resource_metadata->client_status = XdsApi::ResourceMetadata::NACKED;
735   resource_metadata->failed_version = version;
736   resource_metadata->failed_details = details;
737   resource_metadata->failed_update_time = update_time;
738 }
739 
740 }  // namespace
741 
ParseResource(upb_Arena * arena,size_t idx,absl::string_view type_url,absl::string_view resource_name,absl::string_view serialized_resource)742 void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
743     upb_Arena* arena, size_t idx, absl::string_view type_url,
744     absl::string_view resource_name, absl::string_view serialized_resource) {
745   std::string error_prefix = absl::StrCat(
746       "resource index ", idx, ": ",
747       resource_name.empty() ? "" : absl::StrCat(resource_name, ": "));
748   // Check the type_url of the resource.
749   if (result_.type_url != type_url) {
750     result_.errors.emplace_back(
751         absl::StrCat(error_prefix, "incorrect resource type \"", type_url,
752                      "\" (should be \"", result_.type_url, "\")"));
753     return;
754   }
755   // Parse the resource.
756   XdsResourceType::DecodeContext context = {
757       xds_client(), ads_call_state_->chand()->server_, &grpc_xds_client_trace,
758       xds_client()->symtab_.ptr(), arena};
759   XdsResourceType::DecodeResult decode_result =
760       result_.type->Decode(context, serialized_resource);
761   // If we didn't already have the resource name from the Resource
762   // wrapper, try to get it from the decoding result.
763   if (resource_name.empty()) {
764     if (decode_result.name.has_value()) {
765       resource_name = *decode_result.name;
766       error_prefix =
767           absl::StrCat("resource index ", idx, ": ", resource_name, ": ");
768     } else {
769       // We don't have any way of determining the resource name, so
770       // there's nothing more we can do here.
771       result_.errors.emplace_back(absl::StrCat(
772           error_prefix, decode_result.resource.status().ToString()));
773       return;
774     }
775   }
776   // If decoding failed, make sure we include the error in the NACK.
777   const absl::Status& decode_status = decode_result.resource.status();
778   if (!decode_status.ok()) {
779     result_.errors.emplace_back(
780         absl::StrCat(error_prefix, decode_status.ToString()));
781   }
782   // Check the resource name.
783   auto parsed_resource_name =
784       xds_client()->ParseXdsResourceName(resource_name, result_.type);
785   if (!parsed_resource_name.ok()) {
786     result_.errors.emplace_back(
787         absl::StrCat(error_prefix, "Cannot parse xDS resource name"));
788     return;
789   }
790   // Cancel resource-does-not-exist timer, if needed.
791   auto timer_it = ads_call_state_->state_map_.find(result_.type);
792   if (timer_it != ads_call_state_->state_map_.end()) {
793     auto it = timer_it->second.subscribed_resources.find(
794         parsed_resource_name->authority);
795     if (it != timer_it->second.subscribed_resources.end()) {
796       auto res_it = it->second.find(parsed_resource_name->key);
797       if (res_it != it->second.end()) {
798         res_it->second->MarkSeen();
799       }
800     }
801   }
802   // Lookup the authority in the cache.
803   auto authority_it =
804       xds_client()->authority_state_map_.find(parsed_resource_name->authority);
805   if (authority_it == xds_client()->authority_state_map_.end()) {
806     return;  // Skip resource -- we don't have a subscription for it.
807   }
808   // Found authority, so look up type.
809   AuthorityState& authority_state = authority_it->second;
810   auto type_it = authority_state.resource_map.find(result_.type);
811   if (type_it == authority_state.resource_map.end()) {
812     return;  // Skip resource -- we don't have a subscription for it.
813   }
814   auto& type_map = type_it->second;
815   // Found type, so look up resource key.
816   auto it = type_map.find(parsed_resource_name->key);
817   if (it == type_map.end()) {
818     return;  // Skip resource -- we don't have a subscription for it.
819   }
820   ResourceState& resource_state = it->second;
821   // If needed, record that we've seen this resource.
822   if (result_.type->AllResourcesRequiredInSotW()) {
823     result_.resources_seen[parsed_resource_name->authority].insert(
824         parsed_resource_name->key);
825   }
826   // If we previously ignored the resource's deletion, log that we're
827   // now re-adding it.
828   if (resource_state.ignored_deletion) {
829     gpr_log(GPR_INFO,
830             "[xds_client %p] xds server %s: server returned new version of "
831             "resource for which we previously ignored a deletion: type %s "
832             "name %s",
833             xds_client(),
834             ads_call_state_->chand()->server_.server_uri().c_str(),
835             std::string(type_url).c_str(), std::string(resource_name).c_str());
836     resource_state.ignored_deletion = false;
837   }
838   // Update resource state based on whether the resource is valid.
839   if (!decode_status.ok()) {
840     xds_client()->NotifyWatchersOnErrorLocked(
841         resource_state.watchers,
842         absl::UnavailableError(
843             absl::StrCat("invalid resource: ", decode_status.ToString())));
844     UpdateResourceMetadataNacked(result_.version, decode_status.ToString(),
845                                  update_time_, &resource_state.meta);
846     return;
847   }
848   // Resource is valid.
849   result_.have_valid_resources = true;
850   // If it didn't change, ignore it.
851   if (resource_state.resource != nullptr &&
852       result_.type->ResourcesEqual(resource_state.resource.get(),
853                                    decode_result.resource->get())) {
854     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
855       gpr_log(GPR_INFO,
856               "[xds_client %p] %s resource %s identical to current, ignoring.",
857               xds_client(), result_.type_url.c_str(),
858               std::string(resource_name).c_str());
859     }
860     return;
861   }
862   // Update the resource state.
863   resource_state.resource = std::move(*decode_result.resource);
864   resource_state.meta = CreateResourceMetadataAcked(
865       std::string(serialized_resource), result_.version, update_time_);
866   // Notify watchers.
867   auto& watchers_list = resource_state.watchers;
868   auto* value =
869       result_.type->CopyResource(resource_state.resource.get()).release();
870   xds_client()->work_serializer_.Schedule(
871       [watchers_list, value]()
872           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) {
873             for (const auto& p : watchers_list) {
874               p.first->OnGenericResourceChanged(value);
875             }
876             delete value;
877           },
878       DEBUG_LOCATION);
879 }
880 
881 void XdsClient::ChannelState::AdsCallState::AdsResponseParser::
ResourceWrapperParsingFailed(size_t idx,absl::string_view message)882     ResourceWrapperParsingFailed(size_t idx, absl::string_view message) {
883   result_.errors.emplace_back(
884       absl::StrCat("resource index ", idx, ": ", message));
885 }
886 
887 //
888 // XdsClient::ChannelState::AdsCallState
889 //
890 
AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent)891 XdsClient::ChannelState::AdsCallState::AdsCallState(
892     RefCountedPtr<RetryableCall<AdsCallState>> parent)
893     : InternallyRefCounted<AdsCallState>(
894           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
895               ? "AdsCallState"
896               : nullptr),
897       parent_(std::move(parent)) {
898   GPR_ASSERT(xds_client() != nullptr);
899   // Init the ADS call.
900   const char* method =
901       "/envoy.service.discovery.v3.AggregatedDiscoveryService/"
902       "StreamAggregatedResources";
903   call_ = chand()->transport_->CreateStreamingCall(
904       method, std::make_unique<StreamEventHandler>(
905                   // Passing the initial ref here.  This ref will go away when
906                   // the StreamEventHandler is destroyed.
907                   RefCountedPtr<AdsCallState>(this)));
908   GPR_ASSERT(call_ != nullptr);
909   // Start the call.
910   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
911     gpr_log(GPR_INFO,
912             "[xds_client %p] xds server %s: starting ADS call "
913             "(calld: %p, call: %p)",
914             xds_client(), chand()->server_.server_uri().c_str(), this,
915             call_.get());
916   }
917   // If this is a reconnect, add any necessary subscriptions from what's
918   // already in the cache.
919   for (const auto& a : xds_client()->authority_state_map_) {
920     const std::string& authority = a.first;
921     // Skip authorities that are not using this xDS channel.
922     if (a.second.channel_state != chand()) continue;
923     for (const auto& t : a.second.resource_map) {
924       const XdsResourceType* type = t.first;
925       for (const auto& r : t.second) {
926         const XdsResourceKey& resource_key = r.first;
927         SubscribeLocked(type, {authority, resource_key}, /*delay_send=*/true);
928       }
929     }
930   }
931   // Send initial message if we added any subscriptions above.
932   for (const auto& p : state_map_) {
933     SendMessageLocked(p.first);
934   }
935 }
936 
Orphan()937 void XdsClient::ChannelState::AdsCallState::Orphan() {
938   state_map_.clear();
939   // Note that the initial ref is held by the StreamEventHandler, which
940   // will be destroyed when call_ is destroyed, which may not happen
941   // here, since there may be other refs held to call_ by internal callbacks.
942   call_.reset();
943 }
944 
SendMessageLocked(const XdsResourceType * type)945 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
946     const XdsResourceType* type)
947     ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
948   // Buffer message sending if an existing message is in flight.
949   if (send_message_pending_ != nullptr) {
950     buffered_requests_.insert(type);
951     return;
952   }
953   auto& state = state_map_[type];
954   std::string serialized_message = xds_client()->api_.CreateAdsRequest(
955       type->type_url(), chand()->resource_type_version_map_[type], state.nonce,
956       ResourceNamesForRequest(type), state.status, !sent_initial_message_);
957   sent_initial_message_ = true;
958   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
959     gpr_log(GPR_INFO,
960             "[xds_client %p] xds server %s: sending ADS request: type=%s "
961             "version=%s nonce=%s error=%s",
962             xds_client(), chand()->server_.server_uri().c_str(),
963             std::string(type->type_url()).c_str(),
964             chand()->resource_type_version_map_[type].c_str(),
965             state.nonce.c_str(), state.status.ToString().c_str());
966   }
967   state.status = absl::OkStatus();
968   call_->SendMessage(std::move(serialized_message));
969   send_message_pending_ = type;
970 }
971 
SubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_send)972 void XdsClient::ChannelState::AdsCallState::SubscribeLocked(
973     const XdsResourceType* type, const XdsResourceName& name, bool delay_send) {
974   auto& state = state_map_[type].subscribed_resources[name.authority][name.key];
975   if (state == nullptr) {
976     state = MakeOrphanable<ResourceTimer>(type, name);
977     if (!delay_send) SendMessageLocked(type);
978   }
979 }
980 
UnsubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_unsubscription)981 void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked(
982     const XdsResourceType* type, const XdsResourceName& name,
983     bool delay_unsubscription) {
984   auto& type_state_map = state_map_[type];
985   auto& authority_map = type_state_map.subscribed_resources[name.authority];
986   authority_map.erase(name.key);
987   if (authority_map.empty()) {
988     type_state_map.subscribed_resources.erase(name.authority);
989   }
990   // Don't need to send unsubscription message if this was the last
991   // resource we were subscribed to, since we'll be closing the stream
992   // immediately in that case.
993   if (!delay_unsubscription && HasSubscribedResources()) {
994     SendMessageLocked(type);
995   }
996 }
997 
HasSubscribedResources() const998 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
999   for (const auto& p : state_map_) {
1000     if (!p.second.subscribed_resources.empty()) return true;
1001   }
1002   return false;
1003 }
1004 
OnRequestSent(bool ok)1005 void XdsClient::ChannelState::AdsCallState::OnRequestSent(bool ok) {
1006   MutexLock lock(&xds_client()->mu_);
1007   // For each resource that was in the message we just sent, start the
1008   // resource timer if needed.
1009   if (ok) {
1010     auto& resource_type_state = state_map_[send_message_pending_];
1011     for (const auto& p : resource_type_state.subscribed_resources) {
1012       for (auto& q : p.second) {
1013         q.second->MaybeMarkSubscriptionSendComplete(
1014             Ref(DEBUG_LOCATION, "ResourceTimer"));
1015       }
1016     }
1017   }
1018   send_message_pending_ = nullptr;
1019   if (ok && IsCurrentCallOnChannel()) {
1020     // Continue to send another pending message if any.
1021     // TODO(roth): The current code to handle buffered messages has the
1022     // advantage of sending only the most recent list of resource names for
1023     // each resource type (no matter how many times that resource type has
1024     // been requested to send while the current message sending is still
1025     // pending). But its disadvantage is that we send the requests in fixed
1026     // order of resource types. We need to fix this if we are seeing some
1027     // resource type(s) starved due to frequent requests of other resource
1028     // type(s).
1029     auto it = buffered_requests_.begin();
1030     if (it != buffered_requests_.end()) {
1031       SendMessageLocked(*it);
1032       buffered_requests_.erase(it);
1033     }
1034   }
1035 }
1036 
OnRecvMessage(absl::string_view payload)1037 void XdsClient::ChannelState::AdsCallState::OnRecvMessage(
1038     absl::string_view payload) {
1039   {
1040     MutexLock lock(&xds_client()->mu_);
1041     if (!IsCurrentCallOnChannel()) return;
1042     // Parse and validate the response.
1043     AdsResponseParser parser(this);
1044     absl::Status status = xds_client()->api_.ParseAdsResponse(payload, &parser);
1045     if (!status.ok()) {
1046       // Ignore unparsable response.
1047       gpr_log(GPR_ERROR,
1048               "[xds_client %p] xds server %s: error parsing ADS response (%s) "
1049               "-- ignoring",
1050               xds_client(), chand()->server_.server_uri().c_str(),
1051               status.ToString().c_str());
1052     } else {
1053       seen_response_ = true;
1054       chand()->status_ = absl::OkStatus();
1055       AdsResponseParser::Result result = parser.TakeResult();
1056       // Update nonce.
1057       auto& state = state_map_[result.type];
1058       state.nonce = result.nonce;
1059       // If we got an error, set state.status so that we'll NACK the update.
1060       if (!result.errors.empty()) {
1061         state.status = absl::UnavailableError(
1062             absl::StrCat("xDS response validation errors: [",
1063                          absl::StrJoin(result.errors, "; "), "]"));
1064         gpr_log(GPR_ERROR,
1065                 "[xds_client %p] xds server %s: ADS response invalid for "
1066                 "resource "
1067                 "type %s version %s, will NACK: nonce=%s status=%s",
1068                 xds_client(), chand()->server_.server_uri().c_str(),
1069                 result.type_url.c_str(), result.version.c_str(),
1070                 state.nonce.c_str(), state.status.ToString().c_str());
1071       }
1072       // Delete resources not seen in update if needed.
1073       if (result.type->AllResourcesRequiredInSotW()) {
1074         for (auto& a : xds_client()->authority_state_map_) {
1075           const std::string& authority = a.first;
1076           AuthorityState& authority_state = a.second;
1077           // Skip authorities that are not using this xDS channel.
1078           if (authority_state.channel_state != chand()) continue;
1079           auto seen_authority_it = result.resources_seen.find(authority);
1080           // Find this resource type.
1081           auto type_it = authority_state.resource_map.find(result.type);
1082           if (type_it == authority_state.resource_map.end()) continue;
1083           // Iterate over resource ids.
1084           for (auto& r : type_it->second) {
1085             const XdsResourceKey& resource_key = r.first;
1086             ResourceState& resource_state = r.second;
1087             if (seen_authority_it == result.resources_seen.end() ||
1088                 seen_authority_it->second.find(resource_key) ==
1089                     seen_authority_it->second.end()) {
1090               // If the resource was newly requested but has not yet been
1091               // received, we don't want to generate an error for the
1092               // watchers, because this ADS response may be in reaction to an
1093               // earlier request that did not yet request the new resource, so
1094               // its absence from the response does not necessarily indicate
1095               // that the resource does not exist.  For that case, we rely on
1096               // the request timeout instead.
1097               if (resource_state.resource == nullptr) continue;
1098               if (chand()->server_.IgnoreResourceDeletion()) {
1099                 if (!resource_state.ignored_deletion) {
1100                   gpr_log(GPR_ERROR,
1101                           "[xds_client %p] xds server %s: ignoring deletion "
1102                           "for resource type %s name %s",
1103                           xds_client(), chand()->server_.server_uri().c_str(),
1104                           result.type_url.c_str(),
1105                           XdsClient::ConstructFullXdsResourceName(
1106                               authority, result.type_url.c_str(), resource_key)
1107                               .c_str());
1108                   resource_state.ignored_deletion = true;
1109                 }
1110               } else {
1111                 resource_state.resource.reset();
1112                 resource_state.meta.client_status =
1113                     XdsApi::ResourceMetadata::DOES_NOT_EXIST;
1114                 xds_client()->NotifyWatchersOnResourceDoesNotExist(
1115                     resource_state.watchers);
1116               }
1117             }
1118           }
1119         }
1120       }
1121       // If we had valid resources or the update was empty, update the version.
1122       if (result.have_valid_resources || result.errors.empty()) {
1123         chand()->resource_type_version_map_[result.type] =
1124             std::move(result.version);
1125         // Start load reporting if needed.
1126         auto& lrs_call = chand()->lrs_calld_;
1127         if (lrs_call != nullptr) {
1128           LrsCallState* lrs_calld = lrs_call->calld();
1129           if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1130         }
1131       }
1132       // Send ACK or NACK.
1133       SendMessageLocked(result.type);
1134     }
1135   }
1136   xds_client()->work_serializer_.DrainQueue();
1137 }
1138 
OnStatusReceived(absl::Status status)1139 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1140     absl::Status status) {
1141   {
1142     MutexLock lock(&xds_client()->mu_);
1143     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1144       gpr_log(GPR_INFO,
1145               "[xds_client %p] xds server %s: ADS call status received "
1146               "(chand=%p, ads_calld=%p, call=%p): %s",
1147               xds_client(), chand()->server_.server_uri().c_str(), chand(),
1148               this, call_.get(), status.ToString().c_str());
1149     }
1150     // Cancel any does-not-exist timers that may be pending.
1151     for (const auto& p : state_map_) {
1152       for (const auto& q : p.second.subscribed_resources) {
1153         for (auto& r : q.second) {
1154           r.second->MaybeCancelTimer();
1155         }
1156       }
1157     }
1158     // Ignore status from a stale call.
1159     if (IsCurrentCallOnChannel()) {
1160       // Try to restart the call.
1161       parent_->OnCallFinishedLocked();
1162       // If we didn't receive a response on the stream, report the
1163       // stream failure as a connectivity failure, which will report the
1164       // error to all watchers of resources on this channel.
1165       if (!seen_response_) {
1166         chand()->SetChannelStatusLocked(absl::UnavailableError(
1167             absl::StrCat("xDS call failed with no responses received; status: ",
1168                          status.ToString())));
1169       }
1170     }
1171   }
1172   xds_client()->work_serializer_.DrainQueue();
1173 }
1174 
IsCurrentCallOnChannel() const1175 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1176   // If the retryable ADS call is null (which only happens when the xds
1177   // channel is shutting down), all the ADS calls are stale.
1178   if (chand()->ads_calld_ == nullptr) return false;
1179   return this == chand()->ads_calld_->calld();
1180 }
1181 
1182 std::vector<std::string>
ResourceNamesForRequest(const XdsResourceType * type)1183 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1184     const XdsResourceType* type) {
1185   std::vector<std::string> resource_names;
1186   auto it = state_map_.find(type);
1187   if (it != state_map_.end()) {
1188     for (auto& a : it->second.subscribed_resources) {
1189       const std::string& authority = a.first;
1190       for (auto& p : a.second) {
1191         const XdsResourceKey& resource_key = p.first;
1192         resource_names.emplace_back(XdsClient::ConstructFullXdsResourceName(
1193             authority, type->type_url(), resource_key));
1194         OrphanablePtr<ResourceTimer>& resource_timer = p.second;
1195         resource_timer->MarkSubscriptionSendStarted();
1196       }
1197     }
1198   }
1199   return resource_names;
1200 }
1201 
1202 //
1203 // XdsClient::ChannelState::LrsCallState::Reporter
1204 //
1205 
Orphan()1206 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1207   if (timer_handle_.has_value() &&
1208       xds_client()->engine()->Cancel(*timer_handle_)) {
1209     timer_handle_.reset();
1210     Unref(DEBUG_LOCATION, "Orphan");
1211   }
1212 }
1213 
1214 void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked()1215     ScheduleNextReportLocked() {
1216   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1217     gpr_log(GPR_INFO,
1218             "[xds_client %p] xds server %s: scheduling load report timer",
1219             xds_client(), parent_->chand()->server_.server_uri().c_str());
1220   }
1221   timer_handle_ = xds_client()->engine()->RunAfter(report_interval_, [this]() {
1222     ApplicationCallbackExecCtx callback_exec_ctx;
1223     ExecCtx exec_ctx;
1224     if (OnNextReportTimer()) {
1225       Unref(DEBUG_LOCATION, "OnNextReportTimer()");
1226     }
1227   });
1228 }
1229 
OnNextReportTimer()1230 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer() {
1231   MutexLock lock(&xds_client()->mu_);
1232   timer_handle_.reset();
1233   if (!IsCurrentReporterOnCall()) return true;
1234   SendReportLocked();
1235   return false;
1236 }
1237 
1238 namespace {
1239 
LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap & snapshot)1240 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1241   for (const auto& p : snapshot) {
1242     const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1243     if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1244     for (const auto& q : cluster_snapshot.locality_stats) {
1245       const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1246       if (!locality_snapshot.IsZero()) return false;
1247     }
1248   }
1249   return true;
1250 }
1251 
1252 }  // namespace
1253 
SendReportLocked()1254 bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1255   // Construct snapshot from all reported stats.
1256   XdsApi::ClusterLoadReportMap snapshot =
1257       xds_client()->BuildLoadReportSnapshotLocked(parent_->chand()->server_,
1258                                                   parent_->send_all_clusters_,
1259                                                   parent_->cluster_names_);
1260   // Skip client load report if the counters were all zero in the last
1261   // report and they are still zero in this one.
1262   const bool old_val = last_report_counters_were_zero_;
1263   last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1264   if (old_val && last_report_counters_were_zero_) {
1265     auto it = xds_client()->xds_load_report_server_map_.find(
1266         &parent_->chand()->server_);
1267     if (it == xds_client()->xds_load_report_server_map_.end() ||
1268         it->second.load_report_map.empty()) {
1269       it->second.channel_state->StopLrsCallLocked();
1270       return true;
1271     }
1272     ScheduleNextReportLocked();
1273     return false;
1274   }
1275   // Send a request that contains the snapshot.
1276   std::string serialized_payload =
1277       xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1278   parent_->call_->SendMessage(std::move(serialized_payload));
1279   parent_->send_message_pending_ = true;
1280   return false;
1281 }
1282 
OnReportDoneLocked()1283 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked() {
1284   // If a reporter starts a send_message op, then the reporting interval
1285   // changes and we destroy that reporter and create a new one, and then
1286   // the send_message op started by the old reporter finishes, this
1287   // method will be called even though it was for a completion started
1288   // by the old reporter.  In that case, the timer will be pending, so
1289   // we just ignore the completion and wait for the timer to fire.
1290   if (timer_handle_.has_value()) return;
1291   // If there are no more registered stats to report, cancel the call.
1292   auto it = xds_client()->xds_load_report_server_map_.find(
1293       &parent_->chand()->server_);
1294   if (it == xds_client()->xds_load_report_server_map_.end()) return;
1295   if (it->second.load_report_map.empty()) {
1296     if (it->second.channel_state != nullptr) {
1297       it->second.channel_state->StopLrsCallLocked();
1298     }
1299     return;
1300   }
1301   // Otherwise, schedule the next load report.
1302   ScheduleNextReportLocked();
1303 }
1304 
1305 //
1306 // XdsClient::ChannelState::LrsCallState
1307 //
1308 
LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent)1309 XdsClient::ChannelState::LrsCallState::LrsCallState(
1310     RefCountedPtr<RetryableCall<LrsCallState>> parent)
1311     : InternallyRefCounted<LrsCallState>(
1312           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
1313               ? "LrsCallState"
1314               : nullptr),
1315       parent_(std::move(parent)) {
1316   // Init the LRS call. Note that the call will progress every time there's
1317   // activity in xds_client()->interested_parties_, which is comprised of
1318   // the polling entities from client_channel.
1319   GPR_ASSERT(xds_client() != nullptr);
1320   const char* method =
1321       "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats";
1322   call_ = chand()->transport_->CreateStreamingCall(
1323       method, std::make_unique<StreamEventHandler>(
1324                   // Passing the initial ref here.  This ref will go away when
1325                   // the StreamEventHandler is destroyed.
1326                   RefCountedPtr<LrsCallState>(this)));
1327   GPR_ASSERT(call_ != nullptr);
1328   // Start the call.
1329   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1330     gpr_log(GPR_INFO,
1331             "[xds_client %p] xds server %s: starting LRS call (calld=%p, "
1332             "call=%p)",
1333             xds_client(), chand()->server_.server_uri().c_str(), this,
1334             call_.get());
1335   }
1336   // Send the initial request.
1337   std::string serialized_payload = xds_client()->api_.CreateLrsInitialRequest();
1338   call_->SendMessage(std::move(serialized_payload));
1339   send_message_pending_ = true;
1340 }
1341 
Orphan()1342 void XdsClient::ChannelState::LrsCallState::Orphan() {
1343   reporter_.reset();
1344   // Note that the initial ref is held by the StreamEventHandler, which
1345   // will be destroyed when call_ is destroyed, which may not happen
1346   // here, since there may be other refs held to call_ by internal callbacks.
1347   call_.reset();
1348 }
1349 
MaybeStartReportingLocked()1350 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1351   // Don't start again if already started.
1352   if (reporter_ != nullptr) return;
1353   // Don't start if the previous send_message op (of the initial request or
1354   // the last report of the previous reporter) hasn't completed.
1355   if (call_ != nullptr && send_message_pending_) return;
1356   // Don't start if no LRS response has arrived.
1357   if (!seen_response()) return;
1358   // Don't start if the ADS call hasn't received any valid response. Note that
1359   // this must be the first channel because it is the current channel but its
1360   // ADS call hasn't seen any response.
1361   if (chand()->ads_calld_ == nullptr ||
1362       chand()->ads_calld_->calld() == nullptr ||
1363       !chand()->ads_calld_->calld()->seen_response()) {
1364     return;
1365   }
1366   // Start reporting.
1367   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1368     gpr_log(GPR_INFO, "[xds_client %p] xds server %s: creating load reporter",
1369             xds_client(), chand()->server_.server_uri().c_str());
1370   }
1371   reporter_ = MakeOrphanable<Reporter>(
1372       Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1373 }
1374 
OnRequestSent(bool)1375 void XdsClient::ChannelState::LrsCallState::OnRequestSent(bool /*ok*/) {
1376   MutexLock lock(&xds_client()->mu_);
1377   send_message_pending_ = false;
1378   if (reporter_ != nullptr) {
1379     reporter_->OnReportDoneLocked();
1380   } else {
1381     MaybeStartReportingLocked();
1382   }
1383 }
1384 
OnRecvMessage(absl::string_view payload)1385 void XdsClient::ChannelState::LrsCallState::OnRecvMessage(
1386     absl::string_view payload) {
1387   MutexLock lock(&xds_client()->mu_);
1388   // If we're no longer the current call, ignore the result.
1389   if (!IsCurrentCallOnChannel()) return;
1390   // Parse the response.
1391   bool send_all_clusters = false;
1392   std::set<std::string> new_cluster_names;
1393   Duration new_load_reporting_interval;
1394   absl::Status status = xds_client()->api_.ParseLrsResponse(
1395       payload, &send_all_clusters, &new_cluster_names,
1396       &new_load_reporting_interval);
1397   if (!status.ok()) {
1398     gpr_log(GPR_ERROR,
1399             "[xds_client %p] xds server %s: LRS response parsing failed: %s",
1400             xds_client(), chand()->server_.server_uri().c_str(),
1401             status.ToString().c_str());
1402     return;
1403   }
1404   seen_response_ = true;
1405   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1406     gpr_log(
1407         GPR_INFO,
1408         "[xds_client %p] xds server %s: LRS response received, %" PRIuPTR
1409         " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1410         "ms",
1411         xds_client(), chand()->server_.server_uri().c_str(),
1412         new_cluster_names.size(), send_all_clusters,
1413         new_load_reporting_interval.millis());
1414     size_t i = 0;
1415     for (const auto& name : new_cluster_names) {
1416       gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
1417               xds_client(), i++, name.c_str());
1418     }
1419   }
1420   if (new_load_reporting_interval <
1421       Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS)) {
1422     new_load_reporting_interval =
1423         Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1424     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1425       gpr_log(GPR_INFO,
1426               "[xds_client %p] xds server %s: increased load_report_interval "
1427               "to minimum value %dms",
1428               xds_client(), chand()->server_.server_uri().c_str(),
1429               GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1430     }
1431   }
1432   // Ignore identical update.
1433   if (send_all_clusters == send_all_clusters_ &&
1434       cluster_names_ == new_cluster_names &&
1435       load_reporting_interval_ == new_load_reporting_interval) {
1436     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1437       gpr_log(GPR_INFO,
1438               "[xds_client %p] xds server %s: incoming LRS response identical "
1439               "to current, ignoring.",
1440               xds_client(), chand()->server_.server_uri().c_str());
1441     }
1442     return;
1443   }
1444   // Stop current load reporting (if any) to adopt the new config.
1445   reporter_.reset();
1446   // Record the new config.
1447   send_all_clusters_ = send_all_clusters;
1448   cluster_names_ = std::move(new_cluster_names);
1449   load_reporting_interval_ = new_load_reporting_interval;
1450   // Try starting sending load report.
1451   MaybeStartReportingLocked();
1452 }
1453 
OnStatusReceived(absl::Status status)1454 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1455     absl::Status status) {
1456   MutexLock lock(&xds_client()->mu_);
1457   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1458     gpr_log(GPR_INFO,
1459             "[xds_client %p] xds server %s: LRS call status received "
1460             "(chand=%p, calld=%p, call=%p): %s",
1461             xds_client(), chand()->server_.server_uri().c_str(), chand(), this,
1462             call_.get(), status.ToString().c_str());
1463   }
1464   // Ignore status from a stale call.
1465   if (IsCurrentCallOnChannel()) {
1466     // Try to restart the call.
1467     parent_->OnCallFinishedLocked();
1468   }
1469 }
1470 
IsCurrentCallOnChannel() const1471 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1472   // If the retryable LRS call is null (which only happens when the xds
1473   // channel is shutting down), all the LRS calls are stale.
1474   if (chand()->lrs_calld_ == nullptr) return false;
1475   return this == chand()->lrs_calld_->calld();
1476 }
1477 
1478 //
1479 // XdsClient
1480 //
1481 
XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,OrphanablePtr<XdsTransportFactory> transport_factory,std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,std::string user_agent_name,std::string user_agent_version,Duration resource_request_timeout)1482 XdsClient::XdsClient(
1483     std::unique_ptr<XdsBootstrap> bootstrap,
1484     OrphanablePtr<XdsTransportFactory> transport_factory,
1485     std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
1486     std::string user_agent_name, std::string user_agent_version,
1487     Duration resource_request_timeout)
1488     : DualRefCounted<XdsClient>(
1489           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
1490                                                                   : nullptr),
1491       bootstrap_(std::move(bootstrap)),
1492       transport_factory_(std::move(transport_factory)),
1493       request_timeout_(resource_request_timeout),
1494       xds_federation_enabled_(XdsFederationEnabled()),
1495       api_(this, &grpc_xds_client_trace, bootstrap_->node(), &symtab_,
1496            std::move(user_agent_name), std::move(user_agent_version)),
1497       engine_(std::move(engine)) {
1498   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1499     gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1500   }
1501   GPR_ASSERT(bootstrap_ != nullptr);
1502   if (bootstrap_->node() != nullptr) {
1503     gpr_log(GPR_INFO, "[xds_client %p] xDS node ID: %s", this,
1504             bootstrap_->node()->id().c_str());
1505   }
1506 }
1507 
~XdsClient()1508 XdsClient::~XdsClient() {
1509   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1510     gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1511   }
1512 }
1513 
Orphan()1514 void XdsClient::Orphan() {
1515   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1516     gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1517   }
1518   MutexLock lock(&mu_);
1519   shutting_down_ = true;
1520   // Clear cache and any remaining watchers that may not have been cancelled.
1521   authority_state_map_.clear();
1522   invalid_watchers_.clear();
1523   // We may still be sending lingering queued load report data, so don't
1524   // just clear the load reporting map, but we do want to clear the refs
1525   // we're holding to the ChannelState objects, to make sure that
1526   // everything shuts down properly.
1527   for (auto& p : xds_load_report_server_map_) {
1528     p.second.channel_state.reset(DEBUG_LOCATION, "XdsClient::Orphan()");
1529   }
1530 }
1531 
GetOrCreateChannelStateLocked(const XdsBootstrap::XdsServer & server,const char * reason)1532 RefCountedPtr<XdsClient::ChannelState> XdsClient::GetOrCreateChannelStateLocked(
1533     const XdsBootstrap::XdsServer& server, const char* reason) {
1534   auto it = xds_server_channel_map_.find(&server);
1535   if (it != xds_server_channel_map_.end()) {
1536     return it->second->Ref(DEBUG_LOCATION, reason);
1537   }
1538   // Channel not found, so create a new one.
1539   auto channel_state = MakeRefCounted<ChannelState>(
1540       WeakRef(DEBUG_LOCATION, "ChannelState"), server);
1541   xds_server_channel_map_[&server] = channel_state.get();
1542   return channel_state;
1543 }
1544 
WatchResource(const XdsResourceType * type,absl::string_view name,RefCountedPtr<ResourceWatcherInterface> watcher)1545 void XdsClient::WatchResource(const XdsResourceType* type,
1546                               absl::string_view name,
1547                               RefCountedPtr<ResourceWatcherInterface> watcher) {
1548   ResourceWatcherInterface* w = watcher.get();
1549   // Lambda for handling failure cases.
1550   auto fail = [&](absl::Status status) mutable {
1551     {
1552       MutexLock lock(&mu_);
1553       MaybeRegisterResourceTypeLocked(type);
1554       invalid_watchers_[w] = watcher;
1555     }
1556     work_serializer_.Run(
1557         [watcher = std::move(watcher), status = std::move(status)]()
1558             ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1559               watcher->OnError(status);
1560             },
1561         DEBUG_LOCATION);
1562   };
1563   auto resource_name = ParseXdsResourceName(name, type);
1564   if (!resource_name.ok()) {
1565     fail(absl::UnavailableError(
1566         absl::StrCat("Unable to parse resource name ", name)));
1567     return;
1568   }
1569   // Find server to use.
1570   const XdsBootstrap::XdsServer* xds_server = nullptr;
1571   absl::string_view authority_name = resource_name->authority;
1572   if (absl::ConsumePrefix(&authority_name, "xdstp:")) {
1573     auto* authority = bootstrap_->LookupAuthority(std::string(authority_name));
1574     if (authority == nullptr) {
1575       fail(absl::UnavailableError(
1576           absl::StrCat("authority \"", authority_name,
1577                        "\" not present in bootstrap config")));
1578       return;
1579     }
1580     xds_server = authority->server();
1581   }
1582   if (xds_server == nullptr) xds_server = &bootstrap_->server();
1583   // Canonify the xDS server instance, so that we make sure we're using
1584   // the same instance as will be used in AddClusterDropStats() and
1585   // AddClusterLocalityStats().  This may yield a different result than
1586   // the logic above if the same server is listed both in the authority
1587   // and as the top-level server.
1588   // TODO(roth): This is really ugly -- need to find a better way to
1589   // index the xDS server than by address here.
1590   xds_server = bootstrap_->FindXdsServer(*xds_server);
1591   {
1592     MutexLock lock(&mu_);
1593     MaybeRegisterResourceTypeLocked(type);
1594     AuthorityState& authority_state =
1595         authority_state_map_[resource_name->authority];
1596     ResourceState& resource_state =
1597         authority_state.resource_map[type][resource_name->key];
1598     resource_state.watchers[w] = watcher;
1599     // If we already have a cached value for the resource, notify the new
1600     // watcher immediately.
1601     if (resource_state.resource != nullptr) {
1602       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1603         gpr_log(GPR_INFO,
1604                 "[xds_client %p] returning cached listener data for %s", this,
1605                 std::string(name).c_str());
1606       }
1607       auto* value = type->CopyResource(resource_state.resource.get()).release();
1608       work_serializer_.Schedule(
1609           [watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1610             watcher->OnGenericResourceChanged(value);
1611             delete value;
1612           },
1613           DEBUG_LOCATION);
1614     } else if (resource_state.meta.client_status ==
1615                XdsApi::ResourceMetadata::DOES_NOT_EXIST) {
1616       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1617         gpr_log(GPR_INFO,
1618                 "[xds_client %p] reporting cached does-not-exist for %s", this,
1619                 std::string(name).c_str());
1620       }
1621       work_serializer_.Schedule(
1622           [watcher]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1623             watcher->OnResourceDoesNotExist();
1624           },
1625           DEBUG_LOCATION);
1626     } else if (resource_state.meta.client_status ==
1627                XdsApi::ResourceMetadata::NACKED) {
1628       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1629         gpr_log(
1630             GPR_INFO,
1631             "[xds_client %p] reporting cached validation failure for %s: %s",
1632             this, std::string(name).c_str(),
1633             resource_state.meta.failed_details.c_str());
1634       }
1635       std::string details = resource_state.meta.failed_details;
1636       const auto* node = bootstrap_->node();
1637       if (node != nullptr) {
1638         absl::StrAppend(&details, " (node ID:", bootstrap_->node()->id(), ")");
1639       }
1640       work_serializer_.Schedule(
1641           [watcher, details = std::move(details)]()
1642               ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1643                 watcher->OnError(absl::UnavailableError(
1644                     absl::StrCat("invalid resource: ", details)));
1645               },
1646           DEBUG_LOCATION);
1647     }
1648     // If the authority doesn't yet have a channel, set it, creating it if
1649     // needed.
1650     if (authority_state.channel_state == nullptr) {
1651       authority_state.channel_state =
1652           GetOrCreateChannelStateLocked(*xds_server, "start watch");
1653     }
1654     absl::Status channel_status = authority_state.channel_state->status();
1655     if (!channel_status.ok()) {
1656       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1657         gpr_log(GPR_INFO,
1658                 "[xds_client %p] returning cached channel error for %s: %s",
1659                 this, std::string(name).c_str(),
1660                 channel_status.ToString().c_str());
1661       }
1662       work_serializer_.Schedule(
1663           [watcher = std::move(watcher), status = std::move(channel_status)]()
1664               ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) mutable {
1665                 watcher->OnError(std::move(status));
1666               },
1667           DEBUG_LOCATION);
1668     }
1669     authority_state.channel_state->SubscribeLocked(type, *resource_name);
1670   }
1671   work_serializer_.DrainQueue();
1672 }
1673 
CancelResourceWatch(const XdsResourceType * type,absl::string_view name,ResourceWatcherInterface * watcher,bool delay_unsubscription)1674 void XdsClient::CancelResourceWatch(const XdsResourceType* type,
1675                                     absl::string_view name,
1676                                     ResourceWatcherInterface* watcher,
1677                                     bool delay_unsubscription) {
1678   auto resource_name = ParseXdsResourceName(name, type);
1679   MutexLock lock(&mu_);
1680   // We cannot be sure whether the watcher is in invalid_watchers_ or in
1681   // authority_state_map_, so we check both, just to be safe.
1682   invalid_watchers_.erase(watcher);
1683   // Find authority.
1684   if (!resource_name.ok()) return;
1685   auto authority_it = authority_state_map_.find(resource_name->authority);
1686   if (authority_it == authority_state_map_.end()) return;
1687   AuthorityState& authority_state = authority_it->second;
1688   // Find type map.
1689   auto type_it = authority_state.resource_map.find(type);
1690   if (type_it == authority_state.resource_map.end()) return;
1691   auto& type_map = type_it->second;
1692   // Find resource key.
1693   auto resource_it = type_map.find(resource_name->key);
1694   if (resource_it == type_map.end()) return;
1695   ResourceState& resource_state = resource_it->second;
1696   // Remove watcher.
1697   resource_state.watchers.erase(watcher);
1698   // Clean up empty map entries, if any.
1699   if (resource_state.watchers.empty()) {
1700     if (resource_state.ignored_deletion) {
1701       gpr_log(GPR_INFO,
1702               "[xds_client %p] unsubscribing from a resource for which we "
1703               "previously ignored a deletion: type %s name %s",
1704               this, std::string(type->type_url()).c_str(),
1705               std::string(name).c_str());
1706     }
1707     authority_state.channel_state->UnsubscribeLocked(type, *resource_name,
1708                                                      delay_unsubscription);
1709     type_map.erase(resource_it);
1710     if (type_map.empty()) {
1711       authority_state.resource_map.erase(type_it);
1712       if (authority_state.resource_map.empty()) {
1713         authority_state.channel_state.reset();
1714       }
1715     }
1716   }
1717 }
1718 
MaybeRegisterResourceTypeLocked(const XdsResourceType * resource_type)1719 void XdsClient::MaybeRegisterResourceTypeLocked(
1720     const XdsResourceType* resource_type) {
1721   auto it = resource_types_.find(resource_type->type_url());
1722   if (it != resource_types_.end()) {
1723     GPR_ASSERT(it->second == resource_type);
1724     return;
1725   }
1726   resource_types_.emplace(resource_type->type_url(), resource_type);
1727   resource_type->InitUpbSymtab(this, symtab_.ptr());
1728 }
1729 
GetResourceTypeLocked(absl::string_view resource_type)1730 const XdsResourceType* XdsClient::GetResourceTypeLocked(
1731     absl::string_view resource_type) {
1732   auto it = resource_types_.find(resource_type);
1733   if (it != resource_types_.end()) return it->second;
1734   return nullptr;
1735 }
1736 
ParseXdsResourceName(absl::string_view name,const XdsResourceType * type)1737 absl::StatusOr<XdsClient::XdsResourceName> XdsClient::ParseXdsResourceName(
1738     absl::string_view name, const XdsResourceType* type) {
1739   // Old-style names use the empty string for authority.
1740   // authority is prefixed with "old:" to indicate that it's an old-style
1741   // name.
1742   if (!xds_federation_enabled_ || !absl::StartsWith(name, "xdstp:")) {
1743     return XdsResourceName{"old:", {std::string(name), {}}};
1744   }
1745   // New style name.  Parse URI.
1746   auto uri = URI::Parse(name);
1747   if (!uri.ok()) return uri.status();
1748   // Split the resource type off of the path to get the id.
1749   std::pair<absl::string_view, absl::string_view> path_parts = absl::StrSplit(
1750       absl::StripPrefix(uri->path(), "/"), absl::MaxSplits('/', 1));
1751   if (type->type_url() != path_parts.first) {
1752     return absl::InvalidArgumentError(
1753         "xdstp URI path must indicate valid xDS resource type");
1754   }
1755   // Canonicalize order of query params.
1756   std::vector<URI::QueryParam> query_params;
1757   for (const auto& p : uri->query_parameter_map()) {
1758     query_params.emplace_back(
1759         URI::QueryParam{std::string(p.first), std::string(p.second)});
1760   }
1761   return XdsResourceName{
1762       absl::StrCat("xdstp:", uri->authority()),
1763       {std::string(path_parts.second), std::move(query_params)}};
1764 }
1765 
ConstructFullXdsResourceName(absl::string_view authority,absl::string_view resource_type,const XdsResourceKey & key)1766 std::string XdsClient::ConstructFullXdsResourceName(
1767     absl::string_view authority, absl::string_view resource_type,
1768     const XdsResourceKey& key) {
1769   if (absl::ConsumePrefix(&authority, "xdstp:")) {
1770     auto uri = URI::Create("xdstp", std::string(authority),
1771                            absl::StrCat("/", resource_type, "/", key.id),
1772                            key.query_params, /*fragment=*/"");
1773     GPR_ASSERT(uri.ok());
1774     return uri->ToString();
1775   }
1776   // Old-style name.
1777   return key.id;
1778 }
1779 
AddClusterDropStats(const XdsBootstrap::XdsServer & xds_server,absl::string_view cluster_name,absl::string_view eds_service_name)1780 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
1781     const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
1782     absl::string_view eds_service_name) {
1783   const auto* server = bootstrap_->FindXdsServer(xds_server);
1784   if (server == nullptr) return nullptr;
1785   auto key =
1786       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1787   RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
1788   {
1789     MutexLock lock(&mu_);
1790     // We jump through some hoops here to make sure that the const
1791     // XdsBootstrap::XdsServer& and absl::string_views
1792     // stored in the XdsClusterDropStats object point to the
1793     // XdsBootstrap::XdsServer and strings
1794     // in the load_report_map_ key, so that they have the same lifetime.
1795     auto server_it =
1796         xds_load_report_server_map_.emplace(server, LoadReportServer()).first;
1797     if (server_it->second.channel_state == nullptr) {
1798       server_it->second.channel_state = GetOrCreateChannelStateLocked(
1799           *server, "load report map (drop stats)");
1800     }
1801     auto load_report_it = server_it->second.load_report_map
1802                               .emplace(std::move(key), LoadReportState())
1803                               .first;
1804     LoadReportState& load_report_state = load_report_it->second;
1805     if (load_report_state.drop_stats != nullptr) {
1806       cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
1807     }
1808     if (cluster_drop_stats == nullptr) {
1809       if (load_report_state.drop_stats != nullptr) {
1810         load_report_state.deleted_drop_stats +=
1811             load_report_state.drop_stats->GetSnapshotAndReset();
1812       }
1813       cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
1814           Ref(DEBUG_LOCATION, "DropStats"), *server,
1815           load_report_it->first.first /*cluster_name*/,
1816           load_report_it->first.second /*eds_service_name*/);
1817       load_report_state.drop_stats = cluster_drop_stats.get();
1818     }
1819     server_it->second.channel_state->MaybeStartLrsCall();
1820   }
1821   work_serializer_.DrainQueue();
1822   return cluster_drop_stats;
1823 }
1824 
RemoveClusterDropStats(const XdsBootstrap::XdsServer & xds_server,absl::string_view cluster_name,absl::string_view eds_service_name,XdsClusterDropStats * cluster_drop_stats)1825 void XdsClient::RemoveClusterDropStats(
1826     const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
1827     absl::string_view eds_service_name,
1828     XdsClusterDropStats* cluster_drop_stats) {
1829   const auto* server = bootstrap_->FindXdsServer(xds_server);
1830   if (server == nullptr) return;
1831   MutexLock lock(&mu_);
1832   auto server_it = xds_load_report_server_map_.find(server);
1833   if (server_it == xds_load_report_server_map_.end()) return;
1834   auto load_report_it = server_it->second.load_report_map.find(
1835       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
1836   if (load_report_it == server_it->second.load_report_map.end()) return;
1837   LoadReportState& load_report_state = load_report_it->second;
1838   if (load_report_state.drop_stats == cluster_drop_stats) {
1839     // Record final snapshot in deleted_drop_stats, which will be
1840     // added to the next load report.
1841     load_report_state.deleted_drop_stats +=
1842         load_report_state.drop_stats->GetSnapshotAndReset();
1843     load_report_state.drop_stats = nullptr;
1844   }
1845 }
1846 
AddClusterLocalityStats(const XdsBootstrap::XdsServer & xds_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> locality)1847 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
1848     const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
1849     absl::string_view eds_service_name,
1850     RefCountedPtr<XdsLocalityName> locality) {
1851   const auto* server = bootstrap_->FindXdsServer(xds_server);
1852   if (server == nullptr) return nullptr;
1853   auto key =
1854       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1855   RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
1856   {
1857     MutexLock lock(&mu_);
1858     // We jump through some hoops here to make sure that the const
1859     // XdsBootstrap::XdsServer& and absl::string_views
1860     // stored in the XdsClusterDropStats object point to the
1861     // XdsBootstrap::XdsServer and strings
1862     // in the load_report_map_ key, so that they have the same lifetime.
1863     auto server_it =
1864         xds_load_report_server_map_.emplace(server, LoadReportServer()).first;
1865     if (server_it->second.channel_state == nullptr) {
1866       server_it->second.channel_state = GetOrCreateChannelStateLocked(
1867           *server, "load report map (locality stats)");
1868     }
1869     auto load_report_it = server_it->second.load_report_map
1870                               .emplace(std::move(key), LoadReportState())
1871                               .first;
1872     LoadReportState& load_report_state = load_report_it->second;
1873     LoadReportState::LocalityState& locality_state =
1874         load_report_state.locality_stats[locality];
1875     if (locality_state.locality_stats != nullptr) {
1876       cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
1877     }
1878     if (cluster_locality_stats == nullptr) {
1879       if (locality_state.locality_stats != nullptr) {
1880         locality_state.deleted_locality_stats +=
1881             locality_state.locality_stats->GetSnapshotAndReset();
1882       }
1883       cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
1884           Ref(DEBUG_LOCATION, "LocalityStats"), *server,
1885           load_report_it->first.first /*cluster_name*/,
1886           load_report_it->first.second /*eds_service_name*/,
1887           std::move(locality));
1888       locality_state.locality_stats = cluster_locality_stats.get();
1889     }
1890     server_it->second.channel_state->MaybeStartLrsCall();
1891   }
1892   work_serializer_.DrainQueue();
1893   return cluster_locality_stats;
1894 }
1895 
RemoveClusterLocalityStats(const XdsBootstrap::XdsServer & xds_server,absl::string_view cluster_name,absl::string_view eds_service_name,const RefCountedPtr<XdsLocalityName> & locality,XdsClusterLocalityStats * cluster_locality_stats)1896 void XdsClient::RemoveClusterLocalityStats(
1897     const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
1898     absl::string_view eds_service_name,
1899     const RefCountedPtr<XdsLocalityName>& locality,
1900     XdsClusterLocalityStats* cluster_locality_stats) {
1901   const auto* server = bootstrap_->FindXdsServer(xds_server);
1902   if (server == nullptr) return;
1903   MutexLock lock(&mu_);
1904   auto server_it = xds_load_report_server_map_.find(server);
1905   if (server_it == xds_load_report_server_map_.end()) return;
1906   auto load_report_it = server_it->second.load_report_map.find(
1907       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
1908   if (load_report_it == server_it->second.load_report_map.end()) return;
1909   LoadReportState& load_report_state = load_report_it->second;
1910   auto locality_it = load_report_state.locality_stats.find(locality);
1911   if (locality_it == load_report_state.locality_stats.end()) return;
1912   LoadReportState::LocalityState& locality_state = locality_it->second;
1913   if (locality_state.locality_stats == cluster_locality_stats) {
1914     // Record final snapshot in deleted_locality_stats, which will be
1915     // added to the next load report.
1916     locality_state.deleted_locality_stats +=
1917         locality_state.locality_stats->GetSnapshotAndReset();
1918     locality_state.locality_stats = nullptr;
1919   }
1920 }
1921 
ResetBackoff()1922 void XdsClient::ResetBackoff() {
1923   MutexLock lock(&mu_);
1924   for (auto& p : xds_server_channel_map_) {
1925     p.second->ResetBackoff();
1926   }
1927 }
1928 
NotifyWatchersOnErrorLocked(const std::map<ResourceWatcherInterface *,RefCountedPtr<ResourceWatcherInterface>> & watchers,absl::Status status)1929 void XdsClient::NotifyWatchersOnErrorLocked(
1930     const std::map<ResourceWatcherInterface*,
1931                    RefCountedPtr<ResourceWatcherInterface>>& watchers,
1932     absl::Status status) {
1933   const auto* node = bootstrap_->node();
1934   if (node != nullptr) {
1935     status = absl::Status(
1936         status.code(),
1937         absl::StrCat(status.message(), " (node ID:", node->id(), ")"));
1938   }
1939   work_serializer_.Schedule(
1940       [watchers, status = std::move(status)]()
1941           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1942             for (const auto& p : watchers) {
1943               p.first->OnError(status);
1944             }
1945           },
1946       DEBUG_LOCATION);
1947 }
1948 
NotifyWatchersOnResourceDoesNotExist(const std::map<ResourceWatcherInterface *,RefCountedPtr<ResourceWatcherInterface>> & watchers)1949 void XdsClient::NotifyWatchersOnResourceDoesNotExist(
1950     const std::map<ResourceWatcherInterface*,
1951                    RefCountedPtr<ResourceWatcherInterface>>& watchers) {
1952   work_serializer_.Schedule(
1953       [watchers]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1954         for (const auto& p : watchers) {
1955           p.first->OnResourceDoesNotExist();
1956         }
1957       },
1958       DEBUG_LOCATION);
1959 }
1960 
BuildLoadReportSnapshotLocked(const XdsBootstrap::XdsServer & xds_server,bool send_all_clusters,const std::set<std::string> & clusters)1961 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
1962     const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
1963     const std::set<std::string>& clusters) {
1964   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1965     gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
1966   }
1967   XdsApi::ClusterLoadReportMap snapshot_map;
1968   auto server_it = xds_load_report_server_map_.find(&xds_server);
1969   if (server_it == xds_load_report_server_map_.end()) return snapshot_map;
1970   auto& load_report_map = server_it->second.load_report_map;
1971   for (auto load_report_it = load_report_map.begin();
1972        load_report_it != load_report_map.end();) {
1973     // Cluster key is cluster and EDS service name.
1974     const auto& cluster_key = load_report_it->first;
1975     LoadReportState& load_report = load_report_it->second;
1976     // If the CDS response for a cluster indicates to use LRS but the
1977     // LRS server does not say that it wants reports for this cluster,
1978     // then we'll have stats objects here whose data we're not going to
1979     // include in the load report.  However, we still need to clear out
1980     // the data from the stats objects, so that if the LRS server starts
1981     // asking for the data in the future, we don't incorrectly include
1982     // data from previous reporting intervals in that future report.
1983     const bool record_stats =
1984         send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
1985     XdsApi::ClusterLoadReport snapshot;
1986     // Aggregate drop stats.
1987     snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
1988     if (load_report.drop_stats != nullptr) {
1989       snapshot.dropped_requests +=
1990           load_report.drop_stats->GetSnapshotAndReset();
1991       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1992         gpr_log(GPR_INFO,
1993                 "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
1994                 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
1995                 load_report.drop_stats);
1996       }
1997     }
1998     // Aggregate locality stats.
1999     for (auto it = load_report.locality_stats.begin();
2000          it != load_report.locality_stats.end();) {
2001       const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2002       auto& locality_state = it->second;
2003       XdsClusterLocalityStats::Snapshot& locality_snapshot =
2004           snapshot.locality_stats[locality_name];
2005       locality_snapshot = std::move(locality_state.deleted_locality_stats);
2006       if (locality_state.locality_stats != nullptr) {
2007         locality_snapshot +=
2008             locality_state.locality_stats->GetSnapshotAndReset();
2009         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2010           gpr_log(GPR_INFO,
2011                   "[xds_client %p] cluster=%s eds_service_name=%s "
2012                   "locality=%s locality_stats=%p",
2013                   this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2014                   locality_name->AsHumanReadableString().c_str(),
2015                   locality_state.locality_stats);
2016         }
2017       }
2018       // If the only thing left in this entry was final snapshots from
2019       // deleted locality stats objects, remove the entry.
2020       if (locality_state.locality_stats == nullptr) {
2021         it = load_report.locality_stats.erase(it);
2022       } else {
2023         ++it;
2024       }
2025     }
2026     // Compute load report interval.
2027     const Timestamp now = Timestamp::Now();
2028     snapshot.load_report_interval = now - load_report.last_report_time;
2029     load_report.last_report_time = now;
2030     // Record snapshot.
2031     if (record_stats) {
2032       snapshot_map[cluster_key] = std::move(snapshot);
2033     }
2034     // If the only thing left in this entry was final snapshots from
2035     // deleted stats objects, remove the entry.
2036     if (load_report.locality_stats.empty() &&
2037         load_report.drop_stats == nullptr) {
2038       load_report_it = load_report_map.erase(load_report_it);
2039     } else {
2040       ++load_report_it;
2041     }
2042   }
2043   return snapshot_map;
2044 }
2045 
DumpClientConfigBinary()2046 std::string XdsClient::DumpClientConfigBinary() {
2047   MutexLock lock(&mu_);
2048   XdsApi::ResourceTypeMetadataMap resource_type_metadata_map;
2049   for (const auto& a : authority_state_map_) {  // authority
2050     const std::string& authority = a.first;
2051     for (const auto& t : a.second.resource_map) {  // type
2052       const XdsResourceType* type = t.first;
2053       auto& resource_metadata_map =
2054           resource_type_metadata_map[type->type_url()];
2055       for (const auto& r : t.second) {  // resource id
2056         const XdsResourceKey& resource_key = r.first;
2057         const ResourceState& resource_state = r.second;
2058         resource_metadata_map[ConstructFullXdsResourceName(
2059             authority, type->type_url(), resource_key)] = &resource_state.meta;
2060       }
2061     }
2062   }
2063   // Assemble config dump messages
2064   return api_.AssembleClientConfig(resource_type_metadata_map);
2065 }
2066 
2067 }  // namespace grpc_core
2068