1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/xds/xds_client.h"
20
21 #include <inttypes.h>
22 #include <string.h>
23
24 #include <algorithm>
25 #include <type_traits>
26
27 #include "absl/strings/match.h"
28 #include "absl/strings/str_cat.h"
29 #include "absl/strings/str_join.h"
30 #include "absl/strings/str_split.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/strings/strip.h"
33 #include "absl/types/optional.h"
34 #include "upb/mem/arena.h"
35
36 #include <grpc/event_engine/event_engine.h>
37 #include <grpc/support/log.h>
38
39 #include "src/core/ext/xds/xds_api.h"
40 #include "src/core/ext/xds/xds_bootstrap.h"
41 #include "src/core/ext/xds/xds_client_stats.h"
42 #include "src/core/lib/backoff/backoff.h"
43 #include "src/core/lib/gprpp/debug_location.h"
44 #include "src/core/lib/gprpp/orphanable.h"
45 #include "src/core/lib/gprpp/ref_counted_ptr.h"
46 #include "src/core/lib/gprpp/sync.h"
47 #include "src/core/lib/iomgr/exec_ctx.h"
48 #include "src/core/lib/uri/uri_parser.h"
49
// Backoff parameters used when restarting a failed xds call (consumed by the
// RetryableCall<> constructor below): initial delay in seconds, growth
// multiplier, maximum delay in seconds, and jitter.
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_XDS_RECONNECT_JITTER 0.2
// NOTE(review): not referenced in this part of the file; presumably a lower
// bound (in milliseconds) on the client load reporting interval -- confirm
// against the LRS call code.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
55
56 namespace grpc_core {
57
// Shorthand for the EventEngine type used for the timers in this file.
using ::grpc_event_engine::experimental::EventEngine;

// Gates the informational xds client logging in this file. Off by default.
TraceFlag grpc_xds_client_trace(false, "xds_client");
// When enabled, ChannelState passes a trace name to its DualRefCounted<>
// base (see the ChannelState constructor). Off by default.
TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
62
63 //
64 // Internal class declarations
65 //
66
// An xds call wrapper that can restart a call upon failure. Holds a ref to
// the xds channel. The template parameter is the kind of wrapped xds call.
//
// The wrapped call type T is constructed with a ref to this object and must
// provide seen_response(), which is consulted when the call finishes to
// decide whether the retry backoff should be reset.
template <typename T>
class XdsClient::ChannelState::RetryableCall
    : public InternallyRefCounted<RetryableCall<T>> {
 public:
  // Stores the weak ref to the channel, configures the retry backoff from the
  // GRPC_XDS_* backoff macros, and immediately starts the first call.
  explicit RetryableCall(WeakRefCountedPtr<ChannelState> chand);

  // Marks this object as shutting down, drops the wrapped call, cancels any
  // pending retry timer, and releases the ref held by the owner.
  //
  // Disable thread-safety analysis because this method is called via
  // OrphanablePtr<>, but there's no way to pass the lock annotation
  // through there.
  void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;

  // Called when the wrapped call has finished: resets the backoff if the call
  // saw a response, drops the call, and starts the retry timer.
  void OnCallFinishedLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

  // Returns the wrapped call, or null while in retry backoff.
  T* calld() const { return calld_.get(); }
  // Returns the owning xds channel.
  ChannelState* chand() const { return chand_.get(); }

  // NOTE(review): defined outside this view; presumably reports whether this
  // object is still the call installed on chand() -- confirm.
  bool IsCurrentCallOnChannel() const;

 private:
  // Creates a new wrapped call unless shutting down. Asserts that no call is
  // currently active.
  void StartNewCallLocked();
  // Schedules OnRetryTimer() on the EventEngine after the next backoff delay,
  // unless shutting down.
  void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

  // Timer callback: takes XdsClient::mu_ and, if the timer is still pending
  // and we are not shutting down, starts a new call.
  void OnRetryTimer();

  // The wrapped xds call that talks to the xds server. It's instantiated
  // every time we start a new call. It's null during call retry backoff.
  OrphanablePtr<T> calld_;
  // The owning xds channel.
  WeakRefCountedPtr<ChannelState> chand_;

  // Retry state.
  BackOff backoff_;
  // Handle of the pending retry timer; empty when no timer is pending.
  absl::optional<EventEngine::TaskHandle> timer_handle_
      ABSL_GUARDED_BY(&XdsClient::mu_);

  // Set by Orphan(); once true, no new calls or retry timers are started.
  bool shutting_down_ = false;
};
106
// Contains an ADS call to the xds server.
//
// Instances are created by RetryableCall<AdsCallState>, which passes a ref
// to itself as `parent`.
class XdsClient::ChannelState::AdsCallState
    : public InternallyRefCounted<AdsCallState> {
 public:
  // The ctor and dtor should not be used directly.
  explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);

  void Orphan() override;

  // Accessors for the owning retryable call, its channel, and the XdsClient
  // that owns the channel.
  RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
  ChannelState* chand() const { return parent_->chand(); }
  XdsClient* xds_client() const { return chand()->xds_client(); }
  // Consulted by RetryableCall<> when the call finishes to decide whether
  // to reset the retry backoff.
  bool seen_response() const { return seen_response_; }

  // Subscribes to / unsubscribes from a resource on this ADS stream.
  // NOTE(review): definitions are outside this view; the exact meaning of
  // delay_send / delay_unsubscription should be confirmed there.
  void SubscribeLocked(const XdsResourceType* type, const XdsResourceName& name,
                       bool delay_send)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
  void UnsubscribeLocked(const XdsResourceType* type,
                         const XdsResourceName& name, bool delay_unsubscription)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

  // ChannelState::UnsubscribeLocked() drops the ADS call when this returns
  // false after an unsubscription.
  bool HasSubscribedResources() const;

 private:
  // Parser callbacks for a single ADS response. Accumulates the outcome in a
  // Result that the caller retrieves with TakeResult().
  class AdsResponseParser : public XdsApi::AdsResponseParserInterface {
   public:
    struct Result {
      // Resource type looked up from the response's type_url; null if the
      // type is unknown.
      const XdsResourceType* type;
      // type_url, version and nonce copied from the response fields.
      std::string type_url;
      std::string version;
      std::string nonce;
      // Per-resource error messages collected while parsing.
      std::vector<std::string> errors;
      // Resources seen in this response; only populated for types where
      // AllResourcesRequiredInSotW() is true.
      std::map<std::string /*authority*/, std::set<XdsResourceKey>>
          resources_seen;
      // Set once at least one subscribed resource decoded successfully.
      bool have_valid_resources = false;
    };

    explicit AdsResponseParser(AdsCallState* ads_call_state)
        : ads_call_state_(ads_call_state) {}

    // Records the response's type, type_url, version and nonce. Returns an
    // InvalidArgument status if the type_url is not a known resource type.
    absl::Status ProcessAdsResponseFields(AdsResponseFields fields) override
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

    // Decodes and validates one resource from the response and updates the
    // XdsClient's cached state for it.
    void ParseResource(upb_Arena* arena, size_t idx, absl::string_view type_url,
                       absl::string_view resource_name,
                       absl::string_view serialized_resource) override
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

    void ResourceWrapperParsingFailed(size_t idx,
                                      absl::string_view message) override;

    // Moves the accumulated result out of the parser.
    Result TakeResult() { return std::move(result_); }

   private:
    XdsClient* xds_client() const { return ads_call_state_->xds_client(); }

    // Not owned.
    AdsCallState* ads_call_state_;
    // Captured when the parser is constructed; used as the failure timestamp
    // for resources that are NACKed while parsing this response.
    const Timestamp update_time_ = Timestamp::Now();
    Result result_;
  };

  // Per-resource "does not exist" timer. If the timer fires before the
  // resource has been seen on this stream, the resource is marked
  // DOES_NOT_EXIST and its watchers are notified.
  class ResourceTimer : public InternallyRefCounted<ResourceTimer> {
   public:
    ResourceTimer(const XdsResourceType* type, const XdsResourceName& name)
        : type_(type), name_(name) {}

    // Cancels the timer if it is pending and releases the owner's ref.
    //
    // Disable thread-safety analysis because this method is called via
    // OrphanablePtr<>, but there's no way to pass the lock annotation
    // through there.
    void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS {
      MaybeCancelTimer();
      Unref(DEBUG_LOCATION, "Orphan");
    }

    // Records that the initial subscription request for this resource has
    // started being sent on this stream.
    void MarkSubscriptionSendStarted()
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
      subscription_sent_ = true;
    }

    // Starts the timer (if all preconditions in MaybeStartTimer() hold) once
    // the subscription send has been marked as started; otherwise a no-op.
    void MaybeMarkSubscriptionSendComplete(
        RefCountedPtr<AdsCallState> ads_calld)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
      if (subscription_sent_) MaybeStartTimer(std::move(ads_calld));
    }

    // Records that the resource was seen in a response and stops the timer.
    void MarkSeen() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
      resource_seen_ = true;
      MaybeCancelTimer();
    }

    // Cancels the pending timer, if any. The handle is cleared only when the
    // EventEngine reports that the cancellation succeeded; otherwise it is
    // left for OnTimer() to clear.
    void MaybeCancelTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
      if (timer_handle_.has_value() &&
          ads_calld_->xds_client()->engine()->Cancel(*timer_handle_)) {
        timer_handle_.reset();
      }
    }

   private:
    void MaybeStartTimer(RefCountedPtr<AdsCallState> ads_calld)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
      // Don't start timer if we've already either seen the resource or
      // marked it as non-existing.
      // Note: There are edge cases where we can have seen the resource
      // before we have sent the initial subscription request, such as
      // when we unsubscribe and then resubscribe to a given resource
      // and then get a response containing that resource, all while a
      // send_message op is in flight.
      if (resource_seen_) return;
      // Don't start timer if we haven't yet sent the initial subscription
      // request for the resource.
      if (!subscription_sent_) return;
      // Don't start timer if it's already running.
      if (timer_handle_.has_value()) return;
      // Check if we already have a cached version of this resource
      // (i.e., if this is the initial request for the resource after an
      // ADS stream restart). If so, we don't start the timer, because
      // (a) we already have the resource and (b) the server may
      // optimize by not resending the resource that we already have.
      auto& authority_state =
          ads_calld->xds_client()->authority_state_map_[name_.authority];
      ResourceState& state = authority_state.resource_map[type_][name_.key];
      if (state.resource != nullptr) return;
      // Start timer. The ADS call ref is retained so that the timer callback
      // and MaybeCancelTimer() can reach the XdsClient; the callback itself
      // holds a ref to this ResourceTimer.
      ads_calld_ = std::move(ads_calld);
      timer_handle_ = ads_calld_->xds_client()->engine()->RunAfter(
          ads_calld_->xds_client()->request_timeout_,
          [self = Ref(DEBUG_LOCATION, "timer")]() {
            ApplicationCallbackExecCtx callback_exec_ctx;
            ExecCtx exec_ctx;
            self->OnTimer();
          });
    }

    // Timer callback: marks the resource as not existing, notifies its
    // watchers, drains the XdsClient's work serializer, and then releases
    // the ADS call ref.
    void OnTimer() {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
        gpr_log(GPR_INFO,
                "[xds_client %p] xds server %s: timeout obtaining resource "
                "{type=%s name=%s} from xds server",
                ads_calld_->xds_client(),
                ads_calld_->chand()->server_.server_uri().c_str(),
                std::string(type_->type_url()).c_str(),
                XdsClient::ConstructFullXdsResourceName(
                    name_.authority, type_->type_url(), name_.key)
                    .c_str());
      }
      {
        MutexLock lock(&ads_calld_->xds_client()->mu_);
        timer_handle_.reset();
        // Prevents the timer from being restarted for this resource.
        resource_seen_ = true;
        auto& authority_state =
            ads_calld_->xds_client()->authority_state_map_[name_.authority];
        ResourceState& state = authority_state.resource_map[type_][name_.key];
        state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
        ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist(
            state.watchers);
      }
      // Drained after mu_ has been released.
      ads_calld_->xds_client()->work_serializer_.DrainQueue();
      ads_calld_.reset();
    }

    const XdsResourceType* type_;
    const XdsResourceName name_;

    // Set when the timer is started; released at the end of OnTimer().
    RefCountedPtr<AdsCallState> ads_calld_;
    // True if we have sent the initial subscription request for this
    // resource on this ADS stream.
    bool subscription_sent_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
    // True if we have either (a) seen the resource in a response on this
    // stream or (b) declared the resource to not exist due to the timer
    // firing.
    bool resource_seen_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
    // Handle of the pending timer; empty when no timer is pending.
    absl::optional<EventEngine::TaskHandle> timer_handle_
        ABSL_GUARDED_BY(&XdsClient::mu_);
  };

  // Adapter that forwards streaming-call events to the owning AdsCallState.
  // Holds a ref to the AdsCallState for as long as the handler exists.
  class StreamEventHandler
      : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler {
   public:
    explicit StreamEventHandler(RefCountedPtr<AdsCallState> ads_calld)
        : ads_calld_(std::move(ads_calld)) {}

    void OnRequestSent(bool ok) override { ads_calld_->OnRequestSent(ok); }
    void OnRecvMessage(absl::string_view payload) override {
      ads_calld_->OnRecvMessage(payload);
    }
    void OnStatusReceived(absl::Status status) override {
      ads_calld_->OnStatusReceived(std::move(status));
    }

   private:
    RefCountedPtr<AdsCallState> ads_calld_;
  };

  struct ResourceTypeState {
    // Nonce and status for this resource type.
    std::string nonce;
    absl::Status status;

    // Subscribed resources of this type.
    std::map<std::string /*authority*/,
             std::map<XdsResourceKey, OrphanablePtr<ResourceTimer>>>
        subscribed_resources;
  };

  void SendMessageLocked(const XdsResourceType* type)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

  // Stream event entry points, invoked via StreamEventHandler.
  void OnRequestSent(bool ok);
  void OnRecvMessage(absl::string_view payload);
  void OnStatusReceived(absl::Status status);

  bool IsCurrentCallOnChannel() const;

  // Constructs a list of resource names of a given type for an ADS
  // request. Also starts the timer for each resource if needed.
  std::vector<std::string> ResourceNamesForRequest(const XdsResourceType* type)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

  // The owning RetryableCall<>.
  RefCountedPtr<RetryableCall<AdsCallState>> parent_;

  // The underlying streaming call on the transport.
  OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall> call_;

  bool sent_initial_message_ = false;
  bool seen_response_ = false;

  // NOTE(review): presumably the resource type whose request is currently
  // being sent, or null if no send is in flight -- confirm in the
  // SendMessageLocked()/OnRequestSent() definitions.
  const XdsResourceType* send_message_pending_
      ABSL_GUARDED_BY(&XdsClient::mu_) = nullptr;

  // Resource types for which requests need to be sent.
  std::set<const XdsResourceType*> buffered_requests_;

  // State for each resource type.
  std::map<const XdsResourceType*, ResourceTypeState> state_map_;
};
342
// Contains an LRS call to the xds server.
//
// Instances are created by RetryableCall<LrsCallState>, which passes a ref
// to itself as `parent`.
class XdsClient::ChannelState::LrsCallState
    : public InternallyRefCounted<LrsCallState> {
 public:
  // The ctor and dtor should not be used directly.
  explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);

  void Orphan() override;

  // NOTE(review): defined outside this view; presumably creates the Reporter
  // once the preconditions for load reporting are met -- confirm.
  void MaybeStartReportingLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

  // Accessors for the owning retryable call, its channel, and the XdsClient
  // that owns the channel.
  RetryableCall<LrsCallState>* parent() { return parent_.get(); }
  ChannelState* chand() const { return parent_->chand(); }
  XdsClient* xds_client() const { return chand()->xds_client(); }
  // Consulted by RetryableCall<> when the call finishes to decide whether
  // to reset the retry backoff.
  bool seen_response() const { return seen_response_; }

 private:
  // Adapter that forwards streaming-call events to the owning LrsCallState.
  // Holds a ref to the LrsCallState for as long as the handler exists.
  class StreamEventHandler
      : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler {
   public:
    explicit StreamEventHandler(RefCountedPtr<LrsCallState> lrs_calld)
        : lrs_calld_(std::move(lrs_calld)) {}

    void OnRequestSent(bool ok) override { lrs_calld_->OnRequestSent(ok); }
    void OnRecvMessage(absl::string_view payload) override {
      lrs_calld_->OnRecvMessage(payload);
    }
    void OnStatusReceived(absl::Status status) override {
      lrs_calld_->OnStatusReceived(std::move(status));
    }

   private:
    RefCountedPtr<LrsCallState> lrs_calld_;
  };

  // Reports client-side load stats according to a fixed interval.
  class Reporter : public InternallyRefCounted<Reporter> {
   public:
    // Schedules the first report immediately on construction.
    Reporter(RefCountedPtr<LrsCallState> parent, Duration report_interval)
        : parent_(std::move(parent)), report_interval_(report_interval) {
      ScheduleNextReportLocked();
    }

    // Disable thread-safety analysis because this method is called via
    // OrphanablePtr<>, but there's no way to pass the lock annotation
    // through there.
    void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;

    void OnReportDoneLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

   private:
    void ScheduleNextReportLocked()
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
    // NOTE(review): the meaning of the bool results of the next two methods
    // is defined outside this view -- confirm before relying on it.
    bool OnNextReportTimer();
    bool SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

    // True if this reporter is the one currently installed on the LRS call.
    bool IsCurrentReporterOnCall() const {
      return this == parent_->reporter_.get();
    }
    XdsClient* xds_client() const { return parent_->xds_client(); }

    // The owning LRS call.
    RefCountedPtr<LrsCallState> parent_;

    // The load reporting state.
    const Duration report_interval_;
    bool last_report_counters_were_zero_ = false;
    // Handle of the pending report timer; empty when no timer is pending.
    absl::optional<EventEngine::TaskHandle> timer_handle_
        ABSL_GUARDED_BY(&XdsClient::mu_);
  };

  // Stream event entry points, invoked via StreamEventHandler.
  void OnRequestSent(bool ok);
  void OnRecvMessage(absl::string_view payload);
  void OnStatusReceived(absl::Status status);

  bool IsCurrentCallOnChannel() const;

  // The owning RetryableCall<>.
  RefCountedPtr<RetryableCall<LrsCallState>> parent_;

  // The underlying streaming call on the transport.
  OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall> call_;

  bool seen_response_ = false;
  bool send_message_pending_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;

  // Load reporting state.
  bool send_all_clusters_ = false;
  std::set<std::string> cluster_names_;  // Asked for by the LRS server.
  Duration load_reporting_interval_;
  OrphanablePtr<Reporter> reporter_;
};
435
436 //
437 // XdsClient::ChannelState
438 //
439
ChannelState(WeakRefCountedPtr<XdsClient> xds_client,const XdsBootstrap::XdsServer & server)440 XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
441 const XdsBootstrap::XdsServer& server)
442 : DualRefCounted<ChannelState>(
443 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
444 ? "ChannelState"
445 : nullptr),
446 xds_client_(std::move(xds_client)),
447 server_(server) {
448 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
449 gpr_log(GPR_INFO, "[xds_client %p] creating channel %p for server %s",
450 xds_client_.get(), this, server.server_uri().c_str());
451 }
452 absl::Status status;
453 transport_ = xds_client_->transport_factory_->Create(
454 server,
455 [self = WeakRef(DEBUG_LOCATION, "OnConnectivityFailure")](
456 absl::Status status) {
457 self->OnConnectivityFailure(std::move(status));
458 },
459 &status);
460 GPR_ASSERT(transport_ != nullptr);
461 if (!status.ok()) SetChannelStatusLocked(std::move(status));
462 }
463
~ChannelState()464 XdsClient::ChannelState::~ChannelState() {
465 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
466 gpr_log(GPR_INFO, "[xds_client %p] destroying xds channel %p for server %s",
467 xds_client(), this, server_.server_uri().c_str());
468 }
469 xds_client_.reset(DEBUG_LOCATION, "ChannelState");
470 }
471
472 // This method should only ever be called when holding the lock, but we can't
473 // use a ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation, because Orphan() will be
474 // called from DualRefCounted::Unref, which cannot have a lock annotation for
475 // a lock in this subclass.
Orphan()476 void XdsClient::ChannelState::Orphan() ABSL_NO_THREAD_SAFETY_ANALYSIS {
477 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
478 gpr_log(GPR_INFO, "[xds_client %p] orphaning xds channel %p for server %s",
479 xds_client(), this, server_.server_uri().c_str());
480 }
481 shutting_down_ = true;
482 transport_.reset();
483 // At this time, all strong refs are removed, remove from channel map to
484 // prevent subsequent subscription from trying to use this ChannelState as
485 // it is shutting down.
486 xds_client_->xds_server_channel_map_.erase(&server_);
487 ads_calld_.reset();
488 lrs_calld_.reset();
489 }
490
ResetBackoff()491 void XdsClient::ChannelState::ResetBackoff() { transport_->ResetBackoff(); }
492
ads_calld() const493 XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
494 const {
495 return ads_calld_->calld();
496 }
497
lrs_calld() const498 XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
499 const {
500 return lrs_calld_->calld();
501 }
502
MaybeStartLrsCall()503 void XdsClient::ChannelState::MaybeStartLrsCall() {
504 if (lrs_calld_ != nullptr) return;
505 lrs_calld_.reset(new RetryableCall<LrsCallState>(
506 WeakRef(DEBUG_LOCATION, "ChannelState+lrs")));
507 }
508
StopLrsCallLocked()509 void XdsClient::ChannelState::StopLrsCallLocked() {
510 xds_client_->xds_load_report_server_map_.erase(&server_);
511 lrs_calld_.reset();
512 }
513
SubscribeLocked(const XdsResourceType * type,const XdsResourceName & name)514 void XdsClient::ChannelState::SubscribeLocked(const XdsResourceType* type,
515 const XdsResourceName& name) {
516 if (ads_calld_ == nullptr) {
517 // Start the ADS call if this is the first request.
518 ads_calld_.reset(new RetryableCall<AdsCallState>(
519 WeakRef(DEBUG_LOCATION, "ChannelState+ads")));
520 // Note: AdsCallState's ctor will automatically subscribe to all
521 // resources that the XdsClient already has watchers for, so we can
522 // return here.
523 return;
524 }
525 // If the ADS call is in backoff state, we don't need to do anything now
526 // because when the call is restarted it will resend all necessary requests.
527 if (ads_calld() == nullptr) return;
528 // Subscribe to this resource if the ADS call is active.
529 ads_calld()->SubscribeLocked(type, name, /*delay_send=*/false);
530 }
531
UnsubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_unsubscription)532 void XdsClient::ChannelState::UnsubscribeLocked(const XdsResourceType* type,
533 const XdsResourceName& name,
534 bool delay_unsubscription) {
535 if (ads_calld_ != nullptr) {
536 auto* calld = ads_calld_->calld();
537 if (calld != nullptr) {
538 calld->UnsubscribeLocked(type, name, delay_unsubscription);
539 if (!calld->HasSubscribedResources()) {
540 ads_calld_.reset();
541 }
542 }
543 }
544 }
545
OnConnectivityFailure(absl::Status status)546 void XdsClient::ChannelState::OnConnectivityFailure(absl::Status status) {
547 {
548 MutexLock lock(&xds_client_->mu_);
549 SetChannelStatusLocked(std::move(status));
550 }
551 xds_client_->work_serializer_.DrainQueue();
552 }
553
SetChannelStatusLocked(absl::Status status)554 void XdsClient::ChannelState::SetChannelStatusLocked(absl::Status status) {
555 if (shutting_down_) return;
556 status = absl::Status(status.code(), absl::StrCat("xDS channel for server ",
557 server_.server_uri(), ": ",
558 status.message()));
559 gpr_log(GPR_INFO, "[xds_client %p] %s", xds_client(),
560 status.ToString().c_str());
561 // If the node ID is set, append that to the status message that we send to
562 // the watchers, so that it will appear in log messages visible to users.
563 const auto* node = xds_client_->bootstrap_->node();
564 if (node != nullptr) {
565 status = absl::Status(
566 status.code(),
567 absl::StrCat(status.message(),
568 " (node ID:", xds_client_->bootstrap_->node()->id(), ")"));
569 }
570 // Save status in channel, so that we can immediately generate an
571 // error for any new watchers that may be started.
572 status_ = status;
573 // Find all watchers for this channel.
574 std::set<RefCountedPtr<ResourceWatcherInterface>> watchers;
575 for (const auto& a : xds_client_->authority_state_map_) { // authority
576 if (a.second.channel_state != this) continue;
577 for (const auto& t : a.second.resource_map) { // type
578 for (const auto& r : t.second) { // resource id
579 for (const auto& w : r.second.watchers) { // watchers
580 watchers.insert(w.second);
581 }
582 }
583 }
584 }
585 // Enqueue notification for the watchers.
586 xds_client_->work_serializer_.Schedule(
587 [watchers = std::move(watchers), status = std::move(status)]()
588 ABSL_EXCLUSIVE_LOCKS_REQUIRED(xds_client_->work_serializer_) {
589 for (const auto& watcher : watchers) {
590 watcher->OnError(status);
591 }
592 },
593 DEBUG_LOCATION);
594 }
595
596 //
597 // XdsClient::ChannelState::RetryableCall<>
598 //
599
600 template <typename T>
RetryableCall(WeakRefCountedPtr<ChannelState> chand)601 XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
602 WeakRefCountedPtr<ChannelState> chand)
603 : chand_(std::move(chand)),
604 backoff_(BackOff::Options()
605 .set_initial_backoff(Duration::Seconds(
606 GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS))
607 .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
608 .set_jitter(GRPC_XDS_RECONNECT_JITTER)
609 .set_max_backoff(Duration::Seconds(
610 GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS))) {
611 StartNewCallLocked();
612 }
613
614 template <typename T>
Orphan()615 void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
616 shutting_down_ = true;
617 calld_.reset();
618 if (timer_handle_.has_value()) {
619 chand()->xds_client()->engine()->Cancel(*timer_handle_);
620 timer_handle_.reset();
621 }
622 this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
623 }
624
625 template <typename T>
OnCallFinishedLocked()626 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
627 // If we saw a response on the current stream, reset backoff.
628 if (calld_->seen_response()) backoff_.Reset();
629 calld_.reset();
630 // Start retry timer.
631 StartRetryTimerLocked();
632 }
633
634 template <typename T>
StartNewCallLocked()635 void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
636 if (shutting_down_) return;
637 GPR_ASSERT(chand_->transport_ != nullptr);
638 GPR_ASSERT(calld_ == nullptr);
639 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
640 gpr_log(GPR_INFO,
641 "[xds_client %p] xds server %s: start new call from retryable "
642 "call %p",
643 chand()->xds_client(), chand()->server_.server_uri().c_str(), this);
644 }
645 calld_ = MakeOrphanable<T>(
646 this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
647 }
648
649 template <typename T>
StartRetryTimerLocked()650 void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
651 if (shutting_down_) return;
652 const Timestamp next_attempt_time = backoff_.NextAttemptTime();
653 const Duration timeout =
654 std::max(next_attempt_time - Timestamp::Now(), Duration::Zero());
655 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
656 gpr_log(GPR_INFO,
657 "[xds_client %p] xds server %s: call attempt failed; "
658 "retry timer will fire in %" PRId64 "ms.",
659 chand()->xds_client(), chand()->server_.server_uri().c_str(),
660 timeout.millis());
661 }
662 timer_handle_ = chand()->xds_client()->engine()->RunAfter(
663 timeout,
664 [self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() {
665 ApplicationCallbackExecCtx callback_exec_ctx;
666 ExecCtx exec_ctx;
667 self->OnRetryTimer();
668 });
669 }
670
671 template <typename T>
OnRetryTimer()672 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer() {
673 MutexLock lock(&chand_->xds_client()->mu_);
674 if (timer_handle_.has_value()) {
675 timer_handle_.reset();
676 if (shutting_down_) return;
677 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
678 gpr_log(GPR_INFO,
679 "[xds_client %p] xds server %s: retry timer fired (retryable "
680 "call: %p)",
681 chand()->xds_client(), chand()->server_.server_uri().c_str(),
682 this);
683 }
684 StartNewCallLocked();
685 }
686 }
687
688 //
689 // XdsClient::ChannelState::AdsCallState::AdsResponseParser
690 //
691
692 absl::Status XdsClient::ChannelState::AdsCallState::AdsResponseParser::
ProcessAdsResponseFields(AdsResponseFields fields)693 ProcessAdsResponseFields(AdsResponseFields fields) {
694 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
695 gpr_log(
696 GPR_INFO,
697 "[xds_client %p] xds server %s: received ADS response: type_url=%s, "
698 "version=%s, nonce=%s, num_resources=%" PRIuPTR,
699 ads_call_state_->xds_client(),
700 ads_call_state_->chand()->server_.server_uri().c_str(),
701 fields.type_url.c_str(), fields.version.c_str(), fields.nonce.c_str(),
702 fields.num_resources);
703 }
704 result_.type =
705 ads_call_state_->xds_client()->GetResourceTypeLocked(fields.type_url);
706 if (result_.type == nullptr) {
707 return absl::InvalidArgumentError(
708 absl::StrCat("unknown resource type ", fields.type_url));
709 }
710 result_.type_url = std::move(fields.type_url);
711 result_.version = std::move(fields.version);
712 result_.nonce = std::move(fields.nonce);
713 return absl::OkStatus();
714 }
715
716 namespace {
717
718 // Build a resource metadata struct for ADS result accepting methods and CSDS.
CreateResourceMetadataAcked(std::string serialized_proto,std::string version,Timestamp update_time)719 XdsApi::ResourceMetadata CreateResourceMetadataAcked(
720 std::string serialized_proto, std::string version, Timestamp update_time) {
721 XdsApi::ResourceMetadata resource_metadata;
722 resource_metadata.serialized_proto = std::move(serialized_proto);
723 resource_metadata.update_time = update_time;
724 resource_metadata.version = std::move(version);
725 resource_metadata.client_status = XdsApi::ResourceMetadata::ACKED;
726 return resource_metadata;
727 }
728
729 // Update resource_metadata for NACK.
UpdateResourceMetadataNacked(const std::string & version,const std::string & details,Timestamp update_time,XdsApi::ResourceMetadata * resource_metadata)730 void UpdateResourceMetadataNacked(const std::string& version,
731 const std::string& details,
732 Timestamp update_time,
733 XdsApi::ResourceMetadata* resource_metadata) {
734 resource_metadata->client_status = XdsApi::ResourceMetadata::NACKED;
735 resource_metadata->failed_version = version;
736 resource_metadata->failed_details = details;
737 resource_metadata->failed_update_time = update_time;
738 }
739
740 } // namespace
741
ParseResource(upb_Arena * arena,size_t idx,absl::string_view type_url,absl::string_view resource_name,absl::string_view serialized_resource)742 void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
743 upb_Arena* arena, size_t idx, absl::string_view type_url,
744 absl::string_view resource_name, absl::string_view serialized_resource) {
745 std::string error_prefix = absl::StrCat(
746 "resource index ", idx, ": ",
747 resource_name.empty() ? "" : absl::StrCat(resource_name, ": "));
748 // Check the type_url of the resource.
749 if (result_.type_url != type_url) {
750 result_.errors.emplace_back(
751 absl::StrCat(error_prefix, "incorrect resource type \"", type_url,
752 "\" (should be \"", result_.type_url, "\")"));
753 return;
754 }
755 // Parse the resource.
756 XdsResourceType::DecodeContext context = {
757 xds_client(), ads_call_state_->chand()->server_, &grpc_xds_client_trace,
758 xds_client()->symtab_.ptr(), arena};
759 XdsResourceType::DecodeResult decode_result =
760 result_.type->Decode(context, serialized_resource);
761 // If we didn't already have the resource name from the Resource
762 // wrapper, try to get it from the decoding result.
763 if (resource_name.empty()) {
764 if (decode_result.name.has_value()) {
765 resource_name = *decode_result.name;
766 error_prefix =
767 absl::StrCat("resource index ", idx, ": ", resource_name, ": ");
768 } else {
769 // We don't have any way of determining the resource name, so
770 // there's nothing more we can do here.
771 result_.errors.emplace_back(absl::StrCat(
772 error_prefix, decode_result.resource.status().ToString()));
773 return;
774 }
775 }
776 // If decoding failed, make sure we include the error in the NACK.
777 const absl::Status& decode_status = decode_result.resource.status();
778 if (!decode_status.ok()) {
779 result_.errors.emplace_back(
780 absl::StrCat(error_prefix, decode_status.ToString()));
781 }
782 // Check the resource name.
783 auto parsed_resource_name =
784 xds_client()->ParseXdsResourceName(resource_name, result_.type);
785 if (!parsed_resource_name.ok()) {
786 result_.errors.emplace_back(
787 absl::StrCat(error_prefix, "Cannot parse xDS resource name"));
788 return;
789 }
790 // Cancel resource-does-not-exist timer, if needed.
791 auto timer_it = ads_call_state_->state_map_.find(result_.type);
792 if (timer_it != ads_call_state_->state_map_.end()) {
793 auto it = timer_it->second.subscribed_resources.find(
794 parsed_resource_name->authority);
795 if (it != timer_it->second.subscribed_resources.end()) {
796 auto res_it = it->second.find(parsed_resource_name->key);
797 if (res_it != it->second.end()) {
798 res_it->second->MarkSeen();
799 }
800 }
801 }
802 // Lookup the authority in the cache.
803 auto authority_it =
804 xds_client()->authority_state_map_.find(parsed_resource_name->authority);
805 if (authority_it == xds_client()->authority_state_map_.end()) {
806 return; // Skip resource -- we don't have a subscription for it.
807 }
808 // Found authority, so look up type.
809 AuthorityState& authority_state = authority_it->second;
810 auto type_it = authority_state.resource_map.find(result_.type);
811 if (type_it == authority_state.resource_map.end()) {
812 return; // Skip resource -- we don't have a subscription for it.
813 }
814 auto& type_map = type_it->second;
815 // Found type, so look up resource key.
816 auto it = type_map.find(parsed_resource_name->key);
817 if (it == type_map.end()) {
818 return; // Skip resource -- we don't have a subscription for it.
819 }
820 ResourceState& resource_state = it->second;
821 // If needed, record that we've seen this resource.
822 if (result_.type->AllResourcesRequiredInSotW()) {
823 result_.resources_seen[parsed_resource_name->authority].insert(
824 parsed_resource_name->key);
825 }
826 // If we previously ignored the resource's deletion, log that we're
827 // now re-adding it.
828 if (resource_state.ignored_deletion) {
829 gpr_log(GPR_INFO,
830 "[xds_client %p] xds server %s: server returned new version of "
831 "resource for which we previously ignored a deletion: type %s "
832 "name %s",
833 xds_client(),
834 ads_call_state_->chand()->server_.server_uri().c_str(),
835 std::string(type_url).c_str(), std::string(resource_name).c_str());
836 resource_state.ignored_deletion = false;
837 }
838 // Update resource state based on whether the resource is valid.
839 if (!decode_status.ok()) {
840 xds_client()->NotifyWatchersOnErrorLocked(
841 resource_state.watchers,
842 absl::UnavailableError(
843 absl::StrCat("invalid resource: ", decode_status.ToString())));
844 UpdateResourceMetadataNacked(result_.version, decode_status.ToString(),
845 update_time_, &resource_state.meta);
846 return;
847 }
848 // Resource is valid.
849 result_.have_valid_resources = true;
850 // If it didn't change, ignore it.
851 if (resource_state.resource != nullptr &&
852 result_.type->ResourcesEqual(resource_state.resource.get(),
853 decode_result.resource->get())) {
854 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
855 gpr_log(GPR_INFO,
856 "[xds_client %p] %s resource %s identical to current, ignoring.",
857 xds_client(), result_.type_url.c_str(),
858 std::string(resource_name).c_str());
859 }
860 return;
861 }
862 // Update the resource state.
863 resource_state.resource = std::move(*decode_result.resource);
864 resource_state.meta = CreateResourceMetadataAcked(
865 std::string(serialized_resource), result_.version, update_time_);
866 // Notify watchers.
867 auto& watchers_list = resource_state.watchers;
868 auto* value =
869 result_.type->CopyResource(resource_state.resource.get()).release();
870 xds_client()->work_serializer_.Schedule(
871 [watchers_list, value]()
872 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) {
873 for (const auto& p : watchers_list) {
874 p.first->OnGenericResourceChanged(value);
875 }
876 delete value;
877 },
878 DEBUG_LOCATION);
879 }
880
881 void XdsClient::ChannelState::AdsCallState::AdsResponseParser::
ResourceWrapperParsingFailed(size_t idx,absl::string_view message)882 ResourceWrapperParsingFailed(size_t idx, absl::string_view message) {
883 result_.errors.emplace_back(
884 absl::StrCat("resource index ", idx, ": ", message));
885 }
886
887 //
888 // XdsClient::ChannelState::AdsCallState
889 //
890
AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent)891 XdsClient::ChannelState::AdsCallState::AdsCallState(
892 RefCountedPtr<RetryableCall<AdsCallState>> parent)
893 : InternallyRefCounted<AdsCallState>(
894 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
895 ? "AdsCallState"
896 : nullptr),
897 parent_(std::move(parent)) {
898 GPR_ASSERT(xds_client() != nullptr);
899 // Init the ADS call.
900 const char* method =
901 "/envoy.service.discovery.v3.AggregatedDiscoveryService/"
902 "StreamAggregatedResources";
903 call_ = chand()->transport_->CreateStreamingCall(
904 method, std::make_unique<StreamEventHandler>(
905 // Passing the initial ref here. This ref will go away when
906 // the StreamEventHandler is destroyed.
907 RefCountedPtr<AdsCallState>(this)));
908 GPR_ASSERT(call_ != nullptr);
909 // Start the call.
910 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
911 gpr_log(GPR_INFO,
912 "[xds_client %p] xds server %s: starting ADS call "
913 "(calld: %p, call: %p)",
914 xds_client(), chand()->server_.server_uri().c_str(), this,
915 call_.get());
916 }
917 // If this is a reconnect, add any necessary subscriptions from what's
918 // already in the cache.
919 for (const auto& a : xds_client()->authority_state_map_) {
920 const std::string& authority = a.first;
921 // Skip authorities that are not using this xDS channel.
922 if (a.second.channel_state != chand()) continue;
923 for (const auto& t : a.second.resource_map) {
924 const XdsResourceType* type = t.first;
925 for (const auto& r : t.second) {
926 const XdsResourceKey& resource_key = r.first;
927 SubscribeLocked(type, {authority, resource_key}, /*delay_send=*/true);
928 }
929 }
930 }
931 // Send initial message if we added any subscriptions above.
932 for (const auto& p : state_map_) {
933 SendMessageLocked(p.first);
934 }
935 }
936
Orphan()937 void XdsClient::ChannelState::AdsCallState::Orphan() {
938 state_map_.clear();
939 // Note that the initial ref is held by the StreamEventHandler, which
940 // will be destroyed when call_ is destroyed, which may not happen
941 // here, since there may be other refs held to call_ by internal callbacks.
942 call_.reset();
943 }
944
SendMessageLocked(const XdsResourceType * type)945 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
946 const XdsResourceType* type)
947 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
948 // Buffer message sending if an existing message is in flight.
949 if (send_message_pending_ != nullptr) {
950 buffered_requests_.insert(type);
951 return;
952 }
953 auto& state = state_map_[type];
954 std::string serialized_message = xds_client()->api_.CreateAdsRequest(
955 type->type_url(), chand()->resource_type_version_map_[type], state.nonce,
956 ResourceNamesForRequest(type), state.status, !sent_initial_message_);
957 sent_initial_message_ = true;
958 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
959 gpr_log(GPR_INFO,
960 "[xds_client %p] xds server %s: sending ADS request: type=%s "
961 "version=%s nonce=%s error=%s",
962 xds_client(), chand()->server_.server_uri().c_str(),
963 std::string(type->type_url()).c_str(),
964 chand()->resource_type_version_map_[type].c_str(),
965 state.nonce.c_str(), state.status.ToString().c_str());
966 }
967 state.status = absl::OkStatus();
968 call_->SendMessage(std::move(serialized_message));
969 send_message_pending_ = type;
970 }
971
SubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_send)972 void XdsClient::ChannelState::AdsCallState::SubscribeLocked(
973 const XdsResourceType* type, const XdsResourceName& name, bool delay_send) {
974 auto& state = state_map_[type].subscribed_resources[name.authority][name.key];
975 if (state == nullptr) {
976 state = MakeOrphanable<ResourceTimer>(type, name);
977 if (!delay_send) SendMessageLocked(type);
978 }
979 }
980
UnsubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_unsubscription)981 void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked(
982 const XdsResourceType* type, const XdsResourceName& name,
983 bool delay_unsubscription) {
984 auto& type_state_map = state_map_[type];
985 auto& authority_map = type_state_map.subscribed_resources[name.authority];
986 authority_map.erase(name.key);
987 if (authority_map.empty()) {
988 type_state_map.subscribed_resources.erase(name.authority);
989 }
990 // Don't need to send unsubscription message if this was the last
991 // resource we were subscribed to, since we'll be closing the stream
992 // immediately in that case.
993 if (!delay_unsubscription && HasSubscribedResources()) {
994 SendMessageLocked(type);
995 }
996 }
997
HasSubscribedResources() const998 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
999 for (const auto& p : state_map_) {
1000 if (!p.second.subscribed_resources.empty()) return true;
1001 }
1002 return false;
1003 }
1004
OnRequestSent(bool ok)1005 void XdsClient::ChannelState::AdsCallState::OnRequestSent(bool ok) {
1006 MutexLock lock(&xds_client()->mu_);
1007 // For each resource that was in the message we just sent, start the
1008 // resource timer if needed.
1009 if (ok) {
1010 auto& resource_type_state = state_map_[send_message_pending_];
1011 for (const auto& p : resource_type_state.subscribed_resources) {
1012 for (auto& q : p.second) {
1013 q.second->MaybeMarkSubscriptionSendComplete(
1014 Ref(DEBUG_LOCATION, "ResourceTimer"));
1015 }
1016 }
1017 }
1018 send_message_pending_ = nullptr;
1019 if (ok && IsCurrentCallOnChannel()) {
1020 // Continue to send another pending message if any.
1021 // TODO(roth): The current code to handle buffered messages has the
1022 // advantage of sending only the most recent list of resource names for
1023 // each resource type (no matter how many times that resource type has
1024 // been requested to send while the current message sending is still
1025 // pending). But its disadvantage is that we send the requests in fixed
1026 // order of resource types. We need to fix this if we are seeing some
1027 // resource type(s) starved due to frequent requests of other resource
1028 // type(s).
1029 auto it = buffered_requests_.begin();
1030 if (it != buffered_requests_.end()) {
1031 SendMessageLocked(*it);
1032 buffered_requests_.erase(it);
1033 }
1034 }
1035 }
1036
OnRecvMessage(absl::string_view payload)1037 void XdsClient::ChannelState::AdsCallState::OnRecvMessage(
1038 absl::string_view payload) {
1039 {
1040 MutexLock lock(&xds_client()->mu_);
1041 if (!IsCurrentCallOnChannel()) return;
1042 // Parse and validate the response.
1043 AdsResponseParser parser(this);
1044 absl::Status status = xds_client()->api_.ParseAdsResponse(payload, &parser);
1045 if (!status.ok()) {
1046 // Ignore unparsable response.
1047 gpr_log(GPR_ERROR,
1048 "[xds_client %p] xds server %s: error parsing ADS response (%s) "
1049 "-- ignoring",
1050 xds_client(), chand()->server_.server_uri().c_str(),
1051 status.ToString().c_str());
1052 } else {
1053 seen_response_ = true;
1054 chand()->status_ = absl::OkStatus();
1055 AdsResponseParser::Result result = parser.TakeResult();
1056 // Update nonce.
1057 auto& state = state_map_[result.type];
1058 state.nonce = result.nonce;
1059 // If we got an error, set state.status so that we'll NACK the update.
1060 if (!result.errors.empty()) {
1061 state.status = absl::UnavailableError(
1062 absl::StrCat("xDS response validation errors: [",
1063 absl::StrJoin(result.errors, "; "), "]"));
1064 gpr_log(GPR_ERROR,
1065 "[xds_client %p] xds server %s: ADS response invalid for "
1066 "resource "
1067 "type %s version %s, will NACK: nonce=%s status=%s",
1068 xds_client(), chand()->server_.server_uri().c_str(),
1069 result.type_url.c_str(), result.version.c_str(),
1070 state.nonce.c_str(), state.status.ToString().c_str());
1071 }
1072 // Delete resources not seen in update if needed.
1073 if (result.type->AllResourcesRequiredInSotW()) {
1074 for (auto& a : xds_client()->authority_state_map_) {
1075 const std::string& authority = a.first;
1076 AuthorityState& authority_state = a.second;
1077 // Skip authorities that are not using this xDS channel.
1078 if (authority_state.channel_state != chand()) continue;
1079 auto seen_authority_it = result.resources_seen.find(authority);
1080 // Find this resource type.
1081 auto type_it = authority_state.resource_map.find(result.type);
1082 if (type_it == authority_state.resource_map.end()) continue;
1083 // Iterate over resource ids.
1084 for (auto& r : type_it->second) {
1085 const XdsResourceKey& resource_key = r.first;
1086 ResourceState& resource_state = r.second;
1087 if (seen_authority_it == result.resources_seen.end() ||
1088 seen_authority_it->second.find(resource_key) ==
1089 seen_authority_it->second.end()) {
1090 // If the resource was newly requested but has not yet been
1091 // received, we don't want to generate an error for the
1092 // watchers, because this ADS response may be in reaction to an
1093 // earlier request that did not yet request the new resource, so
1094 // its absence from the response does not necessarily indicate
1095 // that the resource does not exist. For that case, we rely on
1096 // the request timeout instead.
1097 if (resource_state.resource == nullptr) continue;
1098 if (chand()->server_.IgnoreResourceDeletion()) {
1099 if (!resource_state.ignored_deletion) {
1100 gpr_log(GPR_ERROR,
1101 "[xds_client %p] xds server %s: ignoring deletion "
1102 "for resource type %s name %s",
1103 xds_client(), chand()->server_.server_uri().c_str(),
1104 result.type_url.c_str(),
1105 XdsClient::ConstructFullXdsResourceName(
1106 authority, result.type_url.c_str(), resource_key)
1107 .c_str());
1108 resource_state.ignored_deletion = true;
1109 }
1110 } else {
1111 resource_state.resource.reset();
1112 resource_state.meta.client_status =
1113 XdsApi::ResourceMetadata::DOES_NOT_EXIST;
1114 xds_client()->NotifyWatchersOnResourceDoesNotExist(
1115 resource_state.watchers);
1116 }
1117 }
1118 }
1119 }
1120 }
1121 // If we had valid resources or the update was empty, update the version.
1122 if (result.have_valid_resources || result.errors.empty()) {
1123 chand()->resource_type_version_map_[result.type] =
1124 std::move(result.version);
1125 // Start load reporting if needed.
1126 auto& lrs_call = chand()->lrs_calld_;
1127 if (lrs_call != nullptr) {
1128 LrsCallState* lrs_calld = lrs_call->calld();
1129 if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1130 }
1131 }
1132 // Send ACK or NACK.
1133 SendMessageLocked(result.type);
1134 }
1135 }
1136 xds_client()->work_serializer_.DrainQueue();
1137 }
1138
OnStatusReceived(absl::Status status)1139 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1140 absl::Status status) {
1141 {
1142 MutexLock lock(&xds_client()->mu_);
1143 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1144 gpr_log(GPR_INFO,
1145 "[xds_client %p] xds server %s: ADS call status received "
1146 "(chand=%p, ads_calld=%p, call=%p): %s",
1147 xds_client(), chand()->server_.server_uri().c_str(), chand(),
1148 this, call_.get(), status.ToString().c_str());
1149 }
1150 // Cancel any does-not-exist timers that may be pending.
1151 for (const auto& p : state_map_) {
1152 for (const auto& q : p.second.subscribed_resources) {
1153 for (auto& r : q.second) {
1154 r.second->MaybeCancelTimer();
1155 }
1156 }
1157 }
1158 // Ignore status from a stale call.
1159 if (IsCurrentCallOnChannel()) {
1160 // Try to restart the call.
1161 parent_->OnCallFinishedLocked();
1162 // If we didn't receive a response on the stream, report the
1163 // stream failure as a connectivity failure, which will report the
1164 // error to all watchers of resources on this channel.
1165 if (!seen_response_) {
1166 chand()->SetChannelStatusLocked(absl::UnavailableError(
1167 absl::StrCat("xDS call failed with no responses received; status: ",
1168 status.ToString())));
1169 }
1170 }
1171 }
1172 xds_client()->work_serializer_.DrainQueue();
1173 }
1174
IsCurrentCallOnChannel() const1175 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1176 // If the retryable ADS call is null (which only happens when the xds
1177 // channel is shutting down), all the ADS calls are stale.
1178 if (chand()->ads_calld_ == nullptr) return false;
1179 return this == chand()->ads_calld_->calld();
1180 }
1181
1182 std::vector<std::string>
ResourceNamesForRequest(const XdsResourceType * type)1183 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1184 const XdsResourceType* type) {
1185 std::vector<std::string> resource_names;
1186 auto it = state_map_.find(type);
1187 if (it != state_map_.end()) {
1188 for (auto& a : it->second.subscribed_resources) {
1189 const std::string& authority = a.first;
1190 for (auto& p : a.second) {
1191 const XdsResourceKey& resource_key = p.first;
1192 resource_names.emplace_back(XdsClient::ConstructFullXdsResourceName(
1193 authority, type->type_url(), resource_key));
1194 OrphanablePtr<ResourceTimer>& resource_timer = p.second;
1195 resource_timer->MarkSubscriptionSendStarted();
1196 }
1197 }
1198 }
1199 return resource_names;
1200 }
1201
1202 //
1203 // XdsClient::ChannelState::LrsCallState::Reporter
1204 //
1205
Orphan()1206 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1207 if (timer_handle_.has_value() &&
1208 xds_client()->engine()->Cancel(*timer_handle_)) {
1209 timer_handle_.reset();
1210 Unref(DEBUG_LOCATION, "Orphan");
1211 }
1212 }
1213
1214 void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked()1215 ScheduleNextReportLocked() {
1216 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1217 gpr_log(GPR_INFO,
1218 "[xds_client %p] xds server %s: scheduling load report timer",
1219 xds_client(), parent_->chand()->server_.server_uri().c_str());
1220 }
1221 timer_handle_ = xds_client()->engine()->RunAfter(report_interval_, [this]() {
1222 ApplicationCallbackExecCtx callback_exec_ctx;
1223 ExecCtx exec_ctx;
1224 if (OnNextReportTimer()) {
1225 Unref(DEBUG_LOCATION, "OnNextReportTimer()");
1226 }
1227 });
1228 }
1229
OnNextReportTimer()1230 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer() {
1231 MutexLock lock(&xds_client()->mu_);
1232 timer_handle_.reset();
1233 if (!IsCurrentReporterOnCall()) return true;
1234 SendReportLocked();
1235 return false;
1236 }
1237
1238 namespace {
1239
LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap & snapshot)1240 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1241 for (const auto& p : snapshot) {
1242 const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1243 if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1244 for (const auto& q : cluster_snapshot.locality_stats) {
1245 const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1246 if (!locality_snapshot.IsZero()) return false;
1247 }
1248 }
1249 return true;
1250 }
1251
1252 } // namespace
1253
SendReportLocked()1254 bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1255 // Construct snapshot from all reported stats.
1256 XdsApi::ClusterLoadReportMap snapshot =
1257 xds_client()->BuildLoadReportSnapshotLocked(parent_->chand()->server_,
1258 parent_->send_all_clusters_,
1259 parent_->cluster_names_);
1260 // Skip client load report if the counters were all zero in the last
1261 // report and they are still zero in this one.
1262 const bool old_val = last_report_counters_were_zero_;
1263 last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1264 if (old_val && last_report_counters_were_zero_) {
1265 auto it = xds_client()->xds_load_report_server_map_.find(
1266 &parent_->chand()->server_);
1267 if (it == xds_client()->xds_load_report_server_map_.end() ||
1268 it->second.load_report_map.empty()) {
1269 it->second.channel_state->StopLrsCallLocked();
1270 return true;
1271 }
1272 ScheduleNextReportLocked();
1273 return false;
1274 }
1275 // Send a request that contains the snapshot.
1276 std::string serialized_payload =
1277 xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1278 parent_->call_->SendMessage(std::move(serialized_payload));
1279 parent_->send_message_pending_ = true;
1280 return false;
1281 }
1282
OnReportDoneLocked()1283 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked() {
1284 // If a reporter starts a send_message op, then the reporting interval
1285 // changes and we destroy that reporter and create a new one, and then
1286 // the send_message op started by the old reporter finishes, this
1287 // method will be called even though it was for a completion started
1288 // by the old reporter. In that case, the timer will be pending, so
1289 // we just ignore the completion and wait for the timer to fire.
1290 if (timer_handle_.has_value()) return;
1291 // If there are no more registered stats to report, cancel the call.
1292 auto it = xds_client()->xds_load_report_server_map_.find(
1293 &parent_->chand()->server_);
1294 if (it == xds_client()->xds_load_report_server_map_.end()) return;
1295 if (it->second.load_report_map.empty()) {
1296 if (it->second.channel_state != nullptr) {
1297 it->second.channel_state->StopLrsCallLocked();
1298 }
1299 return;
1300 }
1301 // Otherwise, schedule the next load report.
1302 ScheduleNextReportLocked();
1303 }
1304
1305 //
1306 // XdsClient::ChannelState::LrsCallState
1307 //
1308
LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent)1309 XdsClient::ChannelState::LrsCallState::LrsCallState(
1310 RefCountedPtr<RetryableCall<LrsCallState>> parent)
1311 : InternallyRefCounted<LrsCallState>(
1312 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
1313 ? "LrsCallState"
1314 : nullptr),
1315 parent_(std::move(parent)) {
1316 // Init the LRS call. Note that the call will progress every time there's
1317 // activity in xds_client()->interested_parties_, which is comprised of
1318 // the polling entities from client_channel.
1319 GPR_ASSERT(xds_client() != nullptr);
1320 const char* method =
1321 "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats";
1322 call_ = chand()->transport_->CreateStreamingCall(
1323 method, std::make_unique<StreamEventHandler>(
1324 // Passing the initial ref here. This ref will go away when
1325 // the StreamEventHandler is destroyed.
1326 RefCountedPtr<LrsCallState>(this)));
1327 GPR_ASSERT(call_ != nullptr);
1328 // Start the call.
1329 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1330 gpr_log(GPR_INFO,
1331 "[xds_client %p] xds server %s: starting LRS call (calld=%p, "
1332 "call=%p)",
1333 xds_client(), chand()->server_.server_uri().c_str(), this,
1334 call_.get());
1335 }
1336 // Send the initial request.
1337 std::string serialized_payload = xds_client()->api_.CreateLrsInitialRequest();
1338 call_->SendMessage(std::move(serialized_payload));
1339 send_message_pending_ = true;
1340 }
1341
Orphan()1342 void XdsClient::ChannelState::LrsCallState::Orphan() {
1343 reporter_.reset();
1344 // Note that the initial ref is held by the StreamEventHandler, which
1345 // will be destroyed when call_ is destroyed, which may not happen
1346 // here, since there may be other refs held to call_ by internal callbacks.
1347 call_.reset();
1348 }
1349
MaybeStartReportingLocked()1350 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1351 // Don't start again if already started.
1352 if (reporter_ != nullptr) return;
1353 // Don't start if the previous send_message op (of the initial request or
1354 // the last report of the previous reporter) hasn't completed.
1355 if (call_ != nullptr && send_message_pending_) return;
1356 // Don't start if no LRS response has arrived.
1357 if (!seen_response()) return;
1358 // Don't start if the ADS call hasn't received any valid response. Note that
1359 // this must be the first channel because it is the current channel but its
1360 // ADS call hasn't seen any response.
1361 if (chand()->ads_calld_ == nullptr ||
1362 chand()->ads_calld_->calld() == nullptr ||
1363 !chand()->ads_calld_->calld()->seen_response()) {
1364 return;
1365 }
1366 // Start reporting.
1367 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1368 gpr_log(GPR_INFO, "[xds_client %p] xds server %s: creating load reporter",
1369 xds_client(), chand()->server_.server_uri().c_str());
1370 }
1371 reporter_ = MakeOrphanable<Reporter>(
1372 Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1373 }
1374
OnRequestSent(bool)1375 void XdsClient::ChannelState::LrsCallState::OnRequestSent(bool /*ok*/) {
1376 MutexLock lock(&xds_client()->mu_);
1377 send_message_pending_ = false;
1378 if (reporter_ != nullptr) {
1379 reporter_->OnReportDoneLocked();
1380 } else {
1381 MaybeStartReportingLocked();
1382 }
1383 }
1384
OnRecvMessage(absl::string_view payload)1385 void XdsClient::ChannelState::LrsCallState::OnRecvMessage(
1386 absl::string_view payload) {
1387 MutexLock lock(&xds_client()->mu_);
1388 // If we're no longer the current call, ignore the result.
1389 if (!IsCurrentCallOnChannel()) return;
1390 // Parse the response.
1391 bool send_all_clusters = false;
1392 std::set<std::string> new_cluster_names;
1393 Duration new_load_reporting_interval;
1394 absl::Status status = xds_client()->api_.ParseLrsResponse(
1395 payload, &send_all_clusters, &new_cluster_names,
1396 &new_load_reporting_interval);
1397 if (!status.ok()) {
1398 gpr_log(GPR_ERROR,
1399 "[xds_client %p] xds server %s: LRS response parsing failed: %s",
1400 xds_client(), chand()->server_.server_uri().c_str(),
1401 status.ToString().c_str());
1402 return;
1403 }
1404 seen_response_ = true;
1405 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1406 gpr_log(
1407 GPR_INFO,
1408 "[xds_client %p] xds server %s: LRS response received, %" PRIuPTR
1409 " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1410 "ms",
1411 xds_client(), chand()->server_.server_uri().c_str(),
1412 new_cluster_names.size(), send_all_clusters,
1413 new_load_reporting_interval.millis());
1414 size_t i = 0;
1415 for (const auto& name : new_cluster_names) {
1416 gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
1417 xds_client(), i++, name.c_str());
1418 }
1419 }
1420 if (new_load_reporting_interval <
1421 Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS)) {
1422 new_load_reporting_interval =
1423 Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1424 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1425 gpr_log(GPR_INFO,
1426 "[xds_client %p] xds server %s: increased load_report_interval "
1427 "to minimum value %dms",
1428 xds_client(), chand()->server_.server_uri().c_str(),
1429 GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1430 }
1431 }
1432 // Ignore identical update.
1433 if (send_all_clusters == send_all_clusters_ &&
1434 cluster_names_ == new_cluster_names &&
1435 load_reporting_interval_ == new_load_reporting_interval) {
1436 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1437 gpr_log(GPR_INFO,
1438 "[xds_client %p] xds server %s: incoming LRS response identical "
1439 "to current, ignoring.",
1440 xds_client(), chand()->server_.server_uri().c_str());
1441 }
1442 return;
1443 }
1444 // Stop current load reporting (if any) to adopt the new config.
1445 reporter_.reset();
1446 // Record the new config.
1447 send_all_clusters_ = send_all_clusters;
1448 cluster_names_ = std::move(new_cluster_names);
1449 load_reporting_interval_ = new_load_reporting_interval;
1450 // Try starting sending load report.
1451 MaybeStartReportingLocked();
1452 }
1453
OnStatusReceived(absl::Status status)1454 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1455 absl::Status status) {
1456 MutexLock lock(&xds_client()->mu_);
1457 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1458 gpr_log(GPR_INFO,
1459 "[xds_client %p] xds server %s: LRS call status received "
1460 "(chand=%p, calld=%p, call=%p): %s",
1461 xds_client(), chand()->server_.server_uri().c_str(), chand(), this,
1462 call_.get(), status.ToString().c_str());
1463 }
1464 // Ignore status from a stale call.
1465 if (IsCurrentCallOnChannel()) {
1466 // Try to restart the call.
1467 parent_->OnCallFinishedLocked();
1468 }
1469 }
1470
IsCurrentCallOnChannel() const1471 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1472 // If the retryable LRS call is null (which only happens when the xds
1473 // channel is shutting down), all the LRS calls are stale.
1474 if (chand()->lrs_calld_ == nullptr) return false;
1475 return this == chand()->lrs_calld_->calld();
1476 }
1477
1478 //
1479 // XdsClient
1480 //
1481
XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,OrphanablePtr<XdsTransportFactory> transport_factory,std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,std::string user_agent_name,std::string user_agent_version,Duration resource_request_timeout)1482 XdsClient::XdsClient(
1483 std::unique_ptr<XdsBootstrap> bootstrap,
1484 OrphanablePtr<XdsTransportFactory> transport_factory,
1485 std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
1486 std::string user_agent_name, std::string user_agent_version,
1487 Duration resource_request_timeout)
1488 : DualRefCounted<XdsClient>(
1489 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
1490 : nullptr),
1491 bootstrap_(std::move(bootstrap)),
1492 transport_factory_(std::move(transport_factory)),
1493 request_timeout_(resource_request_timeout),
1494 xds_federation_enabled_(XdsFederationEnabled()),
1495 api_(this, &grpc_xds_client_trace, bootstrap_->node(), &symtab_,
1496 std::move(user_agent_name), std::move(user_agent_version)),
1497 engine_(std::move(engine)) {
1498 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1499 gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1500 }
1501 GPR_ASSERT(bootstrap_ != nullptr);
1502 if (bootstrap_->node() != nullptr) {
1503 gpr_log(GPR_INFO, "[xds_client %p] xDS node ID: %s", this,
1504 bootstrap_->node()->id().c_str());
1505 }
1506 }
1507
~XdsClient()1508 XdsClient::~XdsClient() {
1509 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1510 gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1511 }
1512 }
1513
Orphan()1514 void XdsClient::Orphan() {
1515 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1516 gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1517 }
1518 MutexLock lock(&mu_);
1519 shutting_down_ = true;
1520 // Clear cache and any remaining watchers that may not have been cancelled.
1521 authority_state_map_.clear();
1522 invalid_watchers_.clear();
1523 // We may still be sending lingering queued load report data, so don't
1524 // just clear the load reporting map, but we do want to clear the refs
1525 // we're holding to the ChannelState objects, to make sure that
1526 // everything shuts down properly.
1527 for (auto& p : xds_load_report_server_map_) {
1528 p.second.channel_state.reset(DEBUG_LOCATION, "XdsClient::Orphan()");
1529 }
1530 }
1531
GetOrCreateChannelStateLocked(const XdsBootstrap::XdsServer & server,const char * reason)1532 RefCountedPtr<XdsClient::ChannelState> XdsClient::GetOrCreateChannelStateLocked(
1533 const XdsBootstrap::XdsServer& server, const char* reason) {
1534 auto it = xds_server_channel_map_.find(&server);
1535 if (it != xds_server_channel_map_.end()) {
1536 return it->second->Ref(DEBUG_LOCATION, reason);
1537 }
1538 // Channel not found, so create a new one.
1539 auto channel_state = MakeRefCounted<ChannelState>(
1540 WeakRef(DEBUG_LOCATION, "ChannelState"), server);
1541 xds_server_channel_map_[&server] = channel_state.get();
1542 return channel_state;
1543 }
1544
WatchResource(const XdsResourceType * type,absl::string_view name,RefCountedPtr<ResourceWatcherInterface> watcher)1545 void XdsClient::WatchResource(const XdsResourceType* type,
1546 absl::string_view name,
1547 RefCountedPtr<ResourceWatcherInterface> watcher) {
1548 ResourceWatcherInterface* w = watcher.get();
1549 // Lambda for handling failure cases.
1550 auto fail = [&](absl::Status status) mutable {
1551 {
1552 MutexLock lock(&mu_);
1553 MaybeRegisterResourceTypeLocked(type);
1554 invalid_watchers_[w] = watcher;
1555 }
1556 work_serializer_.Run(
1557 [watcher = std::move(watcher), status = std::move(status)]()
1558 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1559 watcher->OnError(status);
1560 },
1561 DEBUG_LOCATION);
1562 };
1563 auto resource_name = ParseXdsResourceName(name, type);
1564 if (!resource_name.ok()) {
1565 fail(absl::UnavailableError(
1566 absl::StrCat("Unable to parse resource name ", name)));
1567 return;
1568 }
1569 // Find server to use.
1570 const XdsBootstrap::XdsServer* xds_server = nullptr;
1571 absl::string_view authority_name = resource_name->authority;
1572 if (absl::ConsumePrefix(&authority_name, "xdstp:")) {
1573 auto* authority = bootstrap_->LookupAuthority(std::string(authority_name));
1574 if (authority == nullptr) {
1575 fail(absl::UnavailableError(
1576 absl::StrCat("authority \"", authority_name,
1577 "\" not present in bootstrap config")));
1578 return;
1579 }
1580 xds_server = authority->server();
1581 }
1582 if (xds_server == nullptr) xds_server = &bootstrap_->server();
1583 // Canonify the xDS server instance, so that we make sure we're using
1584 // the same instance as will be used in AddClusterDropStats() and
1585 // AddClusterLocalityStats(). This may yield a different result than
1586 // the logic above if the same server is listed both in the authority
1587 // and as the top-level server.
1588 // TODO(roth): This is really ugly -- need to find a better way to
1589 // index the xDS server than by address here.
1590 xds_server = bootstrap_->FindXdsServer(*xds_server);
1591 {
1592 MutexLock lock(&mu_);
1593 MaybeRegisterResourceTypeLocked(type);
1594 AuthorityState& authority_state =
1595 authority_state_map_[resource_name->authority];
1596 ResourceState& resource_state =
1597 authority_state.resource_map[type][resource_name->key];
1598 resource_state.watchers[w] = watcher;
1599 // If we already have a cached value for the resource, notify the new
1600 // watcher immediately.
1601 if (resource_state.resource != nullptr) {
1602 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1603 gpr_log(GPR_INFO,
1604 "[xds_client %p] returning cached listener data for %s", this,
1605 std::string(name).c_str());
1606 }
1607 auto* value = type->CopyResource(resource_state.resource.get()).release();
1608 work_serializer_.Schedule(
1609 [watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1610 watcher->OnGenericResourceChanged(value);
1611 delete value;
1612 },
1613 DEBUG_LOCATION);
1614 } else if (resource_state.meta.client_status ==
1615 XdsApi::ResourceMetadata::DOES_NOT_EXIST) {
1616 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1617 gpr_log(GPR_INFO,
1618 "[xds_client %p] reporting cached does-not-exist for %s", this,
1619 std::string(name).c_str());
1620 }
1621 work_serializer_.Schedule(
1622 [watcher]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1623 watcher->OnResourceDoesNotExist();
1624 },
1625 DEBUG_LOCATION);
1626 } else if (resource_state.meta.client_status ==
1627 XdsApi::ResourceMetadata::NACKED) {
1628 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1629 gpr_log(
1630 GPR_INFO,
1631 "[xds_client %p] reporting cached validation failure for %s: %s",
1632 this, std::string(name).c_str(),
1633 resource_state.meta.failed_details.c_str());
1634 }
1635 std::string details = resource_state.meta.failed_details;
1636 const auto* node = bootstrap_->node();
1637 if (node != nullptr) {
1638 absl::StrAppend(&details, " (node ID:", bootstrap_->node()->id(), ")");
1639 }
1640 work_serializer_.Schedule(
1641 [watcher, details = std::move(details)]()
1642 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1643 watcher->OnError(absl::UnavailableError(
1644 absl::StrCat("invalid resource: ", details)));
1645 },
1646 DEBUG_LOCATION);
1647 }
1648 // If the authority doesn't yet have a channel, set it, creating it if
1649 // needed.
1650 if (authority_state.channel_state == nullptr) {
1651 authority_state.channel_state =
1652 GetOrCreateChannelStateLocked(*xds_server, "start watch");
1653 }
1654 absl::Status channel_status = authority_state.channel_state->status();
1655 if (!channel_status.ok()) {
1656 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1657 gpr_log(GPR_INFO,
1658 "[xds_client %p] returning cached channel error for %s: %s",
1659 this, std::string(name).c_str(),
1660 channel_status.ToString().c_str());
1661 }
1662 work_serializer_.Schedule(
1663 [watcher = std::move(watcher), status = std::move(channel_status)]()
1664 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) mutable {
1665 watcher->OnError(std::move(status));
1666 },
1667 DEBUG_LOCATION);
1668 }
1669 authority_state.channel_state->SubscribeLocked(type, *resource_name);
1670 }
1671 work_serializer_.DrainQueue();
1672 }
1673
CancelResourceWatch(const XdsResourceType * type,absl::string_view name,ResourceWatcherInterface * watcher,bool delay_unsubscription)1674 void XdsClient::CancelResourceWatch(const XdsResourceType* type,
1675 absl::string_view name,
1676 ResourceWatcherInterface* watcher,
1677 bool delay_unsubscription) {
1678 auto resource_name = ParseXdsResourceName(name, type);
1679 MutexLock lock(&mu_);
1680 // We cannot be sure whether the watcher is in invalid_watchers_ or in
1681 // authority_state_map_, so we check both, just to be safe.
1682 invalid_watchers_.erase(watcher);
1683 // Find authority.
1684 if (!resource_name.ok()) return;
1685 auto authority_it = authority_state_map_.find(resource_name->authority);
1686 if (authority_it == authority_state_map_.end()) return;
1687 AuthorityState& authority_state = authority_it->second;
1688 // Find type map.
1689 auto type_it = authority_state.resource_map.find(type);
1690 if (type_it == authority_state.resource_map.end()) return;
1691 auto& type_map = type_it->second;
1692 // Find resource key.
1693 auto resource_it = type_map.find(resource_name->key);
1694 if (resource_it == type_map.end()) return;
1695 ResourceState& resource_state = resource_it->second;
1696 // Remove watcher.
1697 resource_state.watchers.erase(watcher);
1698 // Clean up empty map entries, if any.
1699 if (resource_state.watchers.empty()) {
1700 if (resource_state.ignored_deletion) {
1701 gpr_log(GPR_INFO,
1702 "[xds_client %p] unsubscribing from a resource for which we "
1703 "previously ignored a deletion: type %s name %s",
1704 this, std::string(type->type_url()).c_str(),
1705 std::string(name).c_str());
1706 }
1707 authority_state.channel_state->UnsubscribeLocked(type, *resource_name,
1708 delay_unsubscription);
1709 type_map.erase(resource_it);
1710 if (type_map.empty()) {
1711 authority_state.resource_map.erase(type_it);
1712 if (authority_state.resource_map.empty()) {
1713 authority_state.channel_state.reset();
1714 }
1715 }
1716 }
1717 }
1718
MaybeRegisterResourceTypeLocked(const XdsResourceType * resource_type)1719 void XdsClient::MaybeRegisterResourceTypeLocked(
1720 const XdsResourceType* resource_type) {
1721 auto it = resource_types_.find(resource_type->type_url());
1722 if (it != resource_types_.end()) {
1723 GPR_ASSERT(it->second == resource_type);
1724 return;
1725 }
1726 resource_types_.emplace(resource_type->type_url(), resource_type);
1727 resource_type->InitUpbSymtab(this, symtab_.ptr());
1728 }
1729
GetResourceTypeLocked(absl::string_view resource_type)1730 const XdsResourceType* XdsClient::GetResourceTypeLocked(
1731 absl::string_view resource_type) {
1732 auto it = resource_types_.find(resource_type);
1733 if (it != resource_types_.end()) return it->second;
1734 return nullptr;
1735 }
1736
ParseXdsResourceName(absl::string_view name,const XdsResourceType * type)1737 absl::StatusOr<XdsClient::XdsResourceName> XdsClient::ParseXdsResourceName(
1738 absl::string_view name, const XdsResourceType* type) {
1739 // Old-style names use the empty string for authority.
1740 // authority is prefixed with "old:" to indicate that it's an old-style
1741 // name.
1742 if (!xds_federation_enabled_ || !absl::StartsWith(name, "xdstp:")) {
1743 return XdsResourceName{"old:", {std::string(name), {}}};
1744 }
1745 // New style name. Parse URI.
1746 auto uri = URI::Parse(name);
1747 if (!uri.ok()) return uri.status();
1748 // Split the resource type off of the path to get the id.
1749 std::pair<absl::string_view, absl::string_view> path_parts = absl::StrSplit(
1750 absl::StripPrefix(uri->path(), "/"), absl::MaxSplits('/', 1));
1751 if (type->type_url() != path_parts.first) {
1752 return absl::InvalidArgumentError(
1753 "xdstp URI path must indicate valid xDS resource type");
1754 }
1755 // Canonicalize order of query params.
1756 std::vector<URI::QueryParam> query_params;
1757 for (const auto& p : uri->query_parameter_map()) {
1758 query_params.emplace_back(
1759 URI::QueryParam{std::string(p.first), std::string(p.second)});
1760 }
1761 return XdsResourceName{
1762 absl::StrCat("xdstp:", uri->authority()),
1763 {std::string(path_parts.second), std::move(query_params)}};
1764 }
1765
ConstructFullXdsResourceName(absl::string_view authority,absl::string_view resource_type,const XdsResourceKey & key)1766 std::string XdsClient::ConstructFullXdsResourceName(
1767 absl::string_view authority, absl::string_view resource_type,
1768 const XdsResourceKey& key) {
1769 if (absl::ConsumePrefix(&authority, "xdstp:")) {
1770 auto uri = URI::Create("xdstp", std::string(authority),
1771 absl::StrCat("/", resource_type, "/", key.id),
1772 key.query_params, /*fragment=*/"");
1773 GPR_ASSERT(uri.ok());
1774 return uri->ToString();
1775 }
1776 // Old-style name.
1777 return key.id;
1778 }
1779
AddClusterDropStats(const XdsBootstrap::XdsServer & xds_server,absl::string_view cluster_name,absl::string_view eds_service_name)1780 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
1781 const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
1782 absl::string_view eds_service_name) {
1783 const auto* server = bootstrap_->FindXdsServer(xds_server);
1784 if (server == nullptr) return nullptr;
1785 auto key =
1786 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1787 RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
1788 {
1789 MutexLock lock(&mu_);
1790 // We jump through some hoops here to make sure that the const
1791 // XdsBootstrap::XdsServer& and absl::string_views
1792 // stored in the XdsClusterDropStats object point to the
1793 // XdsBootstrap::XdsServer and strings
1794 // in the load_report_map_ key, so that they have the same lifetime.
1795 auto server_it =
1796 xds_load_report_server_map_.emplace(server, LoadReportServer()).first;
1797 if (server_it->second.channel_state == nullptr) {
1798 server_it->second.channel_state = GetOrCreateChannelStateLocked(
1799 *server, "load report map (drop stats)");
1800 }
1801 auto load_report_it = server_it->second.load_report_map
1802 .emplace(std::move(key), LoadReportState())
1803 .first;
1804 LoadReportState& load_report_state = load_report_it->second;
1805 if (load_report_state.drop_stats != nullptr) {
1806 cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
1807 }
1808 if (cluster_drop_stats == nullptr) {
1809 if (load_report_state.drop_stats != nullptr) {
1810 load_report_state.deleted_drop_stats +=
1811 load_report_state.drop_stats->GetSnapshotAndReset();
1812 }
1813 cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
1814 Ref(DEBUG_LOCATION, "DropStats"), *server,
1815 load_report_it->first.first /*cluster_name*/,
1816 load_report_it->first.second /*eds_service_name*/);
1817 load_report_state.drop_stats = cluster_drop_stats.get();
1818 }
1819 server_it->second.channel_state->MaybeStartLrsCall();
1820 }
1821 work_serializer_.DrainQueue();
1822 return cluster_drop_stats;
1823 }
1824
RemoveClusterDropStats(const XdsBootstrap::XdsServer & xds_server,absl::string_view cluster_name,absl::string_view eds_service_name,XdsClusterDropStats * cluster_drop_stats)1825 void XdsClient::RemoveClusterDropStats(
1826 const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
1827 absl::string_view eds_service_name,
1828 XdsClusterDropStats* cluster_drop_stats) {
1829 const auto* server = bootstrap_->FindXdsServer(xds_server);
1830 if (server == nullptr) return;
1831 MutexLock lock(&mu_);
1832 auto server_it = xds_load_report_server_map_.find(server);
1833 if (server_it == xds_load_report_server_map_.end()) return;
1834 auto load_report_it = server_it->second.load_report_map.find(
1835 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
1836 if (load_report_it == server_it->second.load_report_map.end()) return;
1837 LoadReportState& load_report_state = load_report_it->second;
1838 if (load_report_state.drop_stats == cluster_drop_stats) {
1839 // Record final snapshot in deleted_drop_stats, which will be
1840 // added to the next load report.
1841 load_report_state.deleted_drop_stats +=
1842 load_report_state.drop_stats->GetSnapshotAndReset();
1843 load_report_state.drop_stats = nullptr;
1844 }
1845 }
1846
AddClusterLocalityStats(const XdsBootstrap::XdsServer & xds_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> locality)1847 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
1848 const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
1849 absl::string_view eds_service_name,
1850 RefCountedPtr<XdsLocalityName> locality) {
1851 const auto* server = bootstrap_->FindXdsServer(xds_server);
1852 if (server == nullptr) return nullptr;
1853 auto key =
1854 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1855 RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
1856 {
1857 MutexLock lock(&mu_);
1858 // We jump through some hoops here to make sure that the const
1859 // XdsBootstrap::XdsServer& and absl::string_views
1860 // stored in the XdsClusterDropStats object point to the
1861 // XdsBootstrap::XdsServer and strings
1862 // in the load_report_map_ key, so that they have the same lifetime.
1863 auto server_it =
1864 xds_load_report_server_map_.emplace(server, LoadReportServer()).first;
1865 if (server_it->second.channel_state == nullptr) {
1866 server_it->second.channel_state = GetOrCreateChannelStateLocked(
1867 *server, "load report map (locality stats)");
1868 }
1869 auto load_report_it = server_it->second.load_report_map
1870 .emplace(std::move(key), LoadReportState())
1871 .first;
1872 LoadReportState& load_report_state = load_report_it->second;
1873 LoadReportState::LocalityState& locality_state =
1874 load_report_state.locality_stats[locality];
1875 if (locality_state.locality_stats != nullptr) {
1876 cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
1877 }
1878 if (cluster_locality_stats == nullptr) {
1879 if (locality_state.locality_stats != nullptr) {
1880 locality_state.deleted_locality_stats +=
1881 locality_state.locality_stats->GetSnapshotAndReset();
1882 }
1883 cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
1884 Ref(DEBUG_LOCATION, "LocalityStats"), *server,
1885 load_report_it->first.first /*cluster_name*/,
1886 load_report_it->first.second /*eds_service_name*/,
1887 std::move(locality));
1888 locality_state.locality_stats = cluster_locality_stats.get();
1889 }
1890 server_it->second.channel_state->MaybeStartLrsCall();
1891 }
1892 work_serializer_.DrainQueue();
1893 return cluster_locality_stats;
1894 }
1895
RemoveClusterLocalityStats(const XdsBootstrap::XdsServer & xds_server,absl::string_view cluster_name,absl::string_view eds_service_name,const RefCountedPtr<XdsLocalityName> & locality,XdsClusterLocalityStats * cluster_locality_stats)1896 void XdsClient::RemoveClusterLocalityStats(
1897 const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
1898 absl::string_view eds_service_name,
1899 const RefCountedPtr<XdsLocalityName>& locality,
1900 XdsClusterLocalityStats* cluster_locality_stats) {
1901 const auto* server = bootstrap_->FindXdsServer(xds_server);
1902 if (server == nullptr) return;
1903 MutexLock lock(&mu_);
1904 auto server_it = xds_load_report_server_map_.find(server);
1905 if (server_it == xds_load_report_server_map_.end()) return;
1906 auto load_report_it = server_it->second.load_report_map.find(
1907 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
1908 if (load_report_it == server_it->second.load_report_map.end()) return;
1909 LoadReportState& load_report_state = load_report_it->second;
1910 auto locality_it = load_report_state.locality_stats.find(locality);
1911 if (locality_it == load_report_state.locality_stats.end()) return;
1912 LoadReportState::LocalityState& locality_state = locality_it->second;
1913 if (locality_state.locality_stats == cluster_locality_stats) {
1914 // Record final snapshot in deleted_locality_stats, which will be
1915 // added to the next load report.
1916 locality_state.deleted_locality_stats +=
1917 locality_state.locality_stats->GetSnapshotAndReset();
1918 locality_state.locality_stats = nullptr;
1919 }
1920 }
1921
ResetBackoff()1922 void XdsClient::ResetBackoff() {
1923 MutexLock lock(&mu_);
1924 for (auto& p : xds_server_channel_map_) {
1925 p.second->ResetBackoff();
1926 }
1927 }
1928
NotifyWatchersOnErrorLocked(const std::map<ResourceWatcherInterface *,RefCountedPtr<ResourceWatcherInterface>> & watchers,absl::Status status)1929 void XdsClient::NotifyWatchersOnErrorLocked(
1930 const std::map<ResourceWatcherInterface*,
1931 RefCountedPtr<ResourceWatcherInterface>>& watchers,
1932 absl::Status status) {
1933 const auto* node = bootstrap_->node();
1934 if (node != nullptr) {
1935 status = absl::Status(
1936 status.code(),
1937 absl::StrCat(status.message(), " (node ID:", node->id(), ")"));
1938 }
1939 work_serializer_.Schedule(
1940 [watchers, status = std::move(status)]()
1941 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1942 for (const auto& p : watchers) {
1943 p.first->OnError(status);
1944 }
1945 },
1946 DEBUG_LOCATION);
1947 }
1948
NotifyWatchersOnResourceDoesNotExist(const std::map<ResourceWatcherInterface *,RefCountedPtr<ResourceWatcherInterface>> & watchers)1949 void XdsClient::NotifyWatchersOnResourceDoesNotExist(
1950 const std::map<ResourceWatcherInterface*,
1951 RefCountedPtr<ResourceWatcherInterface>>& watchers) {
1952 work_serializer_.Schedule(
1953 [watchers]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1954 for (const auto& p : watchers) {
1955 p.first->OnResourceDoesNotExist();
1956 }
1957 },
1958 DEBUG_LOCATION);
1959 }
1960
BuildLoadReportSnapshotLocked(const XdsBootstrap::XdsServer & xds_server,bool send_all_clusters,const std::set<std::string> & clusters)1961 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
1962 const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
1963 const std::set<std::string>& clusters) {
1964 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1965 gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
1966 }
1967 XdsApi::ClusterLoadReportMap snapshot_map;
1968 auto server_it = xds_load_report_server_map_.find(&xds_server);
1969 if (server_it == xds_load_report_server_map_.end()) return snapshot_map;
1970 auto& load_report_map = server_it->second.load_report_map;
1971 for (auto load_report_it = load_report_map.begin();
1972 load_report_it != load_report_map.end();) {
1973 // Cluster key is cluster and EDS service name.
1974 const auto& cluster_key = load_report_it->first;
1975 LoadReportState& load_report = load_report_it->second;
1976 // If the CDS response for a cluster indicates to use LRS but the
1977 // LRS server does not say that it wants reports for this cluster,
1978 // then we'll have stats objects here whose data we're not going to
1979 // include in the load report. However, we still need to clear out
1980 // the data from the stats objects, so that if the LRS server starts
1981 // asking for the data in the future, we don't incorrectly include
1982 // data from previous reporting intervals in that future report.
1983 const bool record_stats =
1984 send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
1985 XdsApi::ClusterLoadReport snapshot;
1986 // Aggregate drop stats.
1987 snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
1988 if (load_report.drop_stats != nullptr) {
1989 snapshot.dropped_requests +=
1990 load_report.drop_stats->GetSnapshotAndReset();
1991 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1992 gpr_log(GPR_INFO,
1993 "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
1994 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
1995 load_report.drop_stats);
1996 }
1997 }
1998 // Aggregate locality stats.
1999 for (auto it = load_report.locality_stats.begin();
2000 it != load_report.locality_stats.end();) {
2001 const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2002 auto& locality_state = it->second;
2003 XdsClusterLocalityStats::Snapshot& locality_snapshot =
2004 snapshot.locality_stats[locality_name];
2005 locality_snapshot = std::move(locality_state.deleted_locality_stats);
2006 if (locality_state.locality_stats != nullptr) {
2007 locality_snapshot +=
2008 locality_state.locality_stats->GetSnapshotAndReset();
2009 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2010 gpr_log(GPR_INFO,
2011 "[xds_client %p] cluster=%s eds_service_name=%s "
2012 "locality=%s locality_stats=%p",
2013 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2014 locality_name->AsHumanReadableString().c_str(),
2015 locality_state.locality_stats);
2016 }
2017 }
2018 // If the only thing left in this entry was final snapshots from
2019 // deleted locality stats objects, remove the entry.
2020 if (locality_state.locality_stats == nullptr) {
2021 it = load_report.locality_stats.erase(it);
2022 } else {
2023 ++it;
2024 }
2025 }
2026 // Compute load report interval.
2027 const Timestamp now = Timestamp::Now();
2028 snapshot.load_report_interval = now - load_report.last_report_time;
2029 load_report.last_report_time = now;
2030 // Record snapshot.
2031 if (record_stats) {
2032 snapshot_map[cluster_key] = std::move(snapshot);
2033 }
2034 // If the only thing left in this entry was final snapshots from
2035 // deleted stats objects, remove the entry.
2036 if (load_report.locality_stats.empty() &&
2037 load_report.drop_stats == nullptr) {
2038 load_report_it = load_report_map.erase(load_report_it);
2039 } else {
2040 ++load_report_it;
2041 }
2042 }
2043 return snapshot_map;
2044 }
2045
DumpClientConfigBinary()2046 std::string XdsClient::DumpClientConfigBinary() {
2047 MutexLock lock(&mu_);
2048 XdsApi::ResourceTypeMetadataMap resource_type_metadata_map;
2049 for (const auto& a : authority_state_map_) { // authority
2050 const std::string& authority = a.first;
2051 for (const auto& t : a.second.resource_map) { // type
2052 const XdsResourceType* type = t.first;
2053 auto& resource_metadata_map =
2054 resource_type_metadata_map[type->type_url()];
2055 for (const auto& r : t.second) { // resource id
2056 const XdsResourceKey& resource_key = r.first;
2057 const ResourceState& resource_state = r.second;
2058 resource_metadata_map[ConstructFullXdsResourceName(
2059 authority, type->type_url(), resource_key)] = &resource_state.meta;
2060 }
2061 }
2062 }
2063 // Assemble config dump messages
2064 return api_.AssembleClientConfig(resource_type_metadata_map);
2065 }
2066
2067 } // namespace grpc_core
2068