1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <stdint.h>
20 #include <string.h>
21
22 #include <algorithm>
23 #include <functional>
24 #include <map>
25 #include <memory>
26 #include <string>
27 #include <type_traits>
28 #include <utility>
29 #include <vector>
30
31 #include "absl/meta/type_traits.h"
32 #include "absl/random/random.h"
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/match.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/strings/str_format.h"
38 #include "absl/strings/str_join.h"
39 #include "absl/strings/str_replace.h"
40 #include "absl/strings/string_view.h"
41 #include "absl/strings/strip.h"
42 #include "absl/types/optional.h"
43 #include "absl/types/variant.h"
44 #include "re2/re2.h"
45
46 #include <grpc/impl/channel_arg_names.h>
47 #include <grpc/slice.h>
48 #include <grpc/status.h>
49 #include <grpc/support/log.h>
50
51 #include "src/core/client_channel/client_channel_internal.h"
52 #include "src/core/client_channel/config_selector.h"
53 #include "src/core/ext/xds/xds_bootstrap.h"
54 #include "src/core/ext/xds/xds_bootstrap_grpc.h"
55 #include "src/core/ext/xds/xds_client_grpc.h"
56 #include "src/core/ext/xds/xds_http_filters.h"
57 #include "src/core/ext/xds/xds_listener.h"
58 #include "src/core/ext/xds/xds_route_config.h"
59 #include "src/core/ext/xds/xds_routing.h"
60 #include "src/core/lib/channel/channel_args.h"
61 #include "src/core/lib/channel/channel_fwd.h"
62 #include "src/core/lib/channel/channel_stack.h"
63 #include "src/core/lib/channel/context.h"
64 #include "src/core/lib/channel/promise_based_filter.h"
65 #include "src/core/lib/channel/status_util.h"
66 #include "src/core/lib/config/core_configuration.h"
67 #include "src/core/lib/debug/trace.h"
68 #include "src/core/lib/experiments/experiments.h"
69 #include "src/core/lib/gprpp/debug_location.h"
70 #include "src/core/lib/gprpp/dual_ref_counted.h"
71 #include "src/core/lib/gprpp/match.h"
72 #include "src/core/lib/gprpp/orphanable.h"
73 #include "src/core/lib/gprpp/ref_counted.h"
74 #include "src/core/lib/gprpp/ref_counted_ptr.h"
75 #include "src/core/lib/gprpp/time.h"
76 #include "src/core/lib/gprpp/work_serializer.h"
77 #include "src/core/lib/gprpp/xxhash_inline.h"
78 #include "src/core/lib/iomgr/iomgr_fwd.h"
79 #include "src/core/lib/iomgr/pollset_set.h"
80 #include "src/core/lib/promise/arena_promise.h"
81 #include "src/core/lib/promise/context.h"
82 #include "src/core/lib/resource_quota/arena.h"
83 #include "src/core/lib/slice/slice.h"
84 #include "src/core/lib/transport/metadata_batch.h"
85 #include "src/core/lib/transport/transport.h"
86 #include "src/core/lib/uri/uri_parser.h"
87 #include "src/core/load_balancing/ring_hash/ring_hash.h"
88 #include "src/core/resolver/endpoint_addresses.h"
89 #include "src/core/resolver/resolver.h"
90 #include "src/core/resolver/resolver_factory.h"
91 #include "src/core/resolver/xds/xds_dependency_manager.h"
92 #include "src/core/resolver/xds/xds_resolver_attributes.h"
93 #include "src/core/resolver/xds/xds_resolver_trace.h"
94 #include "src/core/service_config/service_config.h"
95 #include "src/core/service_config/service_config_impl.h"
96
97 namespace grpc_core {
98
99 namespace {
100
101 //
102 // XdsResolver
103 //
104
105 class XdsResolver final : public Resolver {
106 public:
XdsResolver(ResolverArgs args,std::string data_plane_authority)107 XdsResolver(ResolverArgs args, std::string data_plane_authority)
108 : work_serializer_(std::move(args.work_serializer)),
109 result_handler_(std::move(args.result_handler)),
110 args_(std::move(args.args)),
111 interested_parties_(args.pollset_set),
112 uri_(std::move(args.uri)),
113 data_plane_authority_(std::move(data_plane_authority)),
114 channel_id_(absl::Uniform<uint64_t>(absl::BitGen())) {
115 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
116 gpr_log(
117 GPR_INFO,
118 "[xds_resolver %p] created for URI %s; data plane authority is %s",
119 this, uri_.ToString().c_str(), data_plane_authority_.c_str());
120 }
121 }
122
~XdsResolver()123 ~XdsResolver() override {
124 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
125 gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this);
126 }
127 }
128
129 void StartLocked() override;
130
131 void ShutdownLocked() override;
132
ResetBackoffLocked()133 void ResetBackoffLocked() override {
134 if (xds_client_ != nullptr) xds_client_->ResetBackoff();
135 }
136
137 private:
138 class XdsWatcher final : public XdsDependencyManager::Watcher {
139 public:
XdsWatcher(RefCountedPtr<XdsResolver> resolver)140 explicit XdsWatcher(RefCountedPtr<XdsResolver> resolver)
141 : resolver_(std::move(resolver)) {}
142
OnUpdate(RefCountedPtr<const XdsDependencyManager::XdsConfig> config)143 void OnUpdate(
144 RefCountedPtr<const XdsDependencyManager::XdsConfig> config) override {
145 resolver_->OnUpdate(std::move(config));
146 }
147
OnError(absl::string_view context,absl::Status status)148 void OnError(absl::string_view context, absl::Status status) override {
149 resolver_->OnError(context, std::move(status));
150 }
151
OnResourceDoesNotExist(std::string context)152 void OnResourceDoesNotExist(std::string context) override {
153 resolver_->OnResourceDoesNotExist(std::move(context));
154 }
155
156 private:
157 RefCountedPtr<XdsResolver> resolver_;
158 };
159
160 // An entry in the map of clusters that need to be present in the LB
161 // policy config. The map holds a weak ref. One strong ref is held by
162 // the ConfigSelector, and another is held by each call assigned to
163 // the cluster by the ConfigSelector. The ref for each call is held
164 // until the call is committed. When the strong refs go away, we hop
165 // back into the WorkSerializer to remove the entry from the map.
166 class ClusterRef final : public DualRefCounted<ClusterRef> {
167 public:
ClusterRef(RefCountedPtr<XdsResolver> resolver,RefCountedPtr<XdsDependencyManager::ClusterSubscription> cluster_subscription,absl::string_view cluster_key)168 ClusterRef(RefCountedPtr<XdsResolver> resolver,
169 RefCountedPtr<XdsDependencyManager::ClusterSubscription>
170 cluster_subscription,
171 absl::string_view cluster_key)
172 : resolver_(std::move(resolver)),
173 cluster_subscription_(std::move(cluster_subscription)),
174 cluster_key_(cluster_key) {}
175
Orphaned()176 void Orphaned() override {
177 XdsResolver* resolver_ptr = resolver_.get();
178 resolver_ptr->work_serializer_->Run(
179 [resolver = std::move(resolver_)]() {
180 resolver->MaybeRemoveUnusedClusters();
181 },
182 DEBUG_LOCATION);
183 cluster_subscription_.reset();
184 }
185
cluster_key() const186 const std::string& cluster_key() const { return cluster_key_; }
187
188 private:
189 RefCountedPtr<XdsResolver> resolver_;
190 RefCountedPtr<XdsDependencyManager::ClusterSubscription>
191 cluster_subscription_;
192 std::string cluster_key_;
193 };
194
195 // A routing data including cluster refs and routes table held by the
196 // XdsConfigSelector. A ref to this map will be taken by each call processed
197 // by the XdsConfigSelector, stored in a the call's call attributes, and later
198 // unreffed by the ClusterSelection filter.
199 class RouteConfigData final : public RefCounted<RouteConfigData> {
200 public:
201 struct RouteEntry {
202 struct ClusterWeightState {
203 uint32_t range_end;
204 absl::string_view cluster;
205 RefCountedPtr<ServiceConfig> method_config;
206
operator ==grpc_core::__anon886d5f1d0111::XdsResolver::RouteConfigData::RouteEntry::ClusterWeightState207 bool operator==(const ClusterWeightState& other) const {
208 return range_end == other.range_end && cluster == other.cluster &&
209 MethodConfigsEqual(method_config.get(),
210 other.method_config.get());
211 }
212 };
213
214 XdsRouteConfigResource::Route route;
215 RefCountedPtr<ServiceConfig> method_config;
216 std::vector<ClusterWeightState> weighted_cluster_state;
217
RouteEntrygrpc_core::__anon886d5f1d0111::XdsResolver::RouteConfigData::RouteEntry218 explicit RouteEntry(const XdsRouteConfigResource::Route& r) : route(r) {}
219
operator ==grpc_core::__anon886d5f1d0111::XdsResolver::RouteConfigData::RouteEntry220 bool operator==(const RouteEntry& other) const {
221 return route == other.route &&
222 weighted_cluster_state == other.weighted_cluster_state &&
223 MethodConfigsEqual(method_config.get(),
224 other.method_config.get());
225 }
226 };
227
228 static absl::StatusOr<RefCountedPtr<RouteConfigData>> Create(
229 XdsResolver* resolver, const Duration& default_max_stream_duration);
230
operator ==(const RouteConfigData & other) const231 bool operator==(const RouteConfigData& other) const {
232 return clusters_ == other.clusters_ && routes_ == other.routes_;
233 }
234
FindClusterRef(absl::string_view name) const235 RefCountedPtr<ClusterRef> FindClusterRef(absl::string_view name) const {
236 auto it = clusters_.find(name);
237 if (it == clusters_.end()) {
238 return nullptr;
239 }
240 return it->second;
241 }
242
243 RouteEntry* GetRouteForRequest(absl::string_view path,
244 grpc_metadata_batch* initial_metadata);
245
246 private:
247 class RouteListIterator;
248
249 static absl::StatusOr<RefCountedPtr<ServiceConfig>> CreateMethodConfig(
250 XdsResolver* resolver, const XdsRouteConfigResource::Route& route,
251 const XdsRouteConfigResource::Route::RouteAction::ClusterWeight*
252 cluster_weight);
253
MethodConfigsEqual(const ServiceConfig * sc1,const ServiceConfig * sc2)254 static bool MethodConfigsEqual(const ServiceConfig* sc1,
255 const ServiceConfig* sc2) {
256 if (sc1 == nullptr) return sc2 == nullptr;
257 if (sc2 == nullptr) return false;
258 return sc1->json_string() == sc2->json_string();
259 }
260
261 absl::Status AddRouteEntry(XdsResolver* resolver,
262 const XdsRouteConfigResource::Route& route,
263 const Duration& default_max_stream_duration);
264
265 std::map<absl::string_view, RefCountedPtr<ClusterRef>> clusters_;
266 std::vector<RouteEntry> routes_;
267 };
268
269 class XdsConfigSelector final : public ConfigSelector {
270 public:
271 XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
272 RefCountedPtr<RouteConfigData> route_config_data);
273 ~XdsConfigSelector() override;
274
name() const275 const char* name() const override { return "XdsConfigSelector"; }
276
Equals(const ConfigSelector * other) const277 bool Equals(const ConfigSelector* other) const override {
278 const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
279 // Don't need to compare resolver_, since that will always be the same.
280 return *route_config_data_ == *other_xds->route_config_data_ &&
281 filters_ == other_xds->filters_;
282 }
283
284 absl::Status GetCallConfig(GetCallConfigArgs args) override;
285
GetFilters()286 std::vector<const grpc_channel_filter*> GetFilters() override {
287 return filters_;
288 }
289
290 private:
291 RefCountedPtr<XdsResolver> resolver_;
292 RefCountedPtr<RouteConfigData> route_config_data_;
293 std::vector<const grpc_channel_filter*> filters_;
294 };
295
296 class XdsRouteStateAttributeImpl final : public XdsRouteStateAttribute {
297 public:
XdsRouteStateAttributeImpl(RefCountedPtr<RouteConfigData> route_config_data,RouteConfigData::RouteEntry * route)298 explicit XdsRouteStateAttributeImpl(
299 RefCountedPtr<RouteConfigData> route_config_data,
300 RouteConfigData::RouteEntry* route)
301 : route_config_data_(std::move(route_config_data)), route_(route) {}
302
303 // This method can be called only once. The first call will release
304 // the reference to the cluster map, and subsequent calls will return
305 // nullptr.
306 RefCountedPtr<ClusterRef> LockAndGetCluster(absl::string_view cluster_name);
307
308 bool HasClusterForRoute(absl::string_view cluster_name) const override;
309
310 private:
311 RefCountedPtr<RouteConfigData> route_config_data_;
312 RouteConfigData::RouteEntry* route_;
313 };
314
315 class ClusterSelectionFilter final
316 : public ImplementChannelFilter<ClusterSelectionFilter> {
317 public:
318 const static grpc_channel_filter kFilter;
319
Create(const ChannelArgs &,ChannelFilter::Args filter_args)320 static absl::StatusOr<ClusterSelectionFilter> Create(
321 const ChannelArgs& /* unused */, ChannelFilter::Args filter_args) {
322 return ClusterSelectionFilter(filter_args);
323 }
324
325 // Construct a promise for one call.
326 class Call {
327 public:
328 void OnClientInitialMetadata(ClientMetadata& md);
329 static const NoInterceptor OnServerInitialMetadata;
330 static const NoInterceptor OnServerTrailingMetadata;
331 static const NoInterceptor OnClientToServerMessage;
332 static const NoInterceptor OnServerToClientMessage;
333 static const NoInterceptor OnFinalize;
334 };
335
336 private:
ClusterSelectionFilter(ChannelFilter::Args filter_args)337 explicit ClusterSelectionFilter(ChannelFilter::Args filter_args)
338 : filter_args_(filter_args) {}
339
340 ChannelFilter::Args filter_args_;
341 };
342
GetOrCreateClusterRef(absl::string_view cluster_key,absl::string_view cluster_name)343 RefCountedPtr<ClusterRef> GetOrCreateClusterRef(
344 absl::string_view cluster_key, absl::string_view cluster_name) {
345 auto it = cluster_ref_map_.find(cluster_key);
346 if (it == cluster_ref_map_.end()) {
347 RefCountedPtr<XdsDependencyManager::ClusterSubscription> subscription;
348 if (!cluster_name.empty()) {
349 // The cluster ref will hold a subscription to ensure that the
350 // XdsDependencyManager stays subscribed to the CDS resource as
351 // long as the cluster ref exists.
352 subscription = dependency_mgr_->GetClusterSubscription(cluster_name);
353 }
354 auto cluster = MakeRefCounted<ClusterRef>(
355 RefAsSubclass<XdsResolver>(), std::move(subscription), cluster_key);
356 cluster_ref_map_.emplace(cluster->cluster_key(), cluster->WeakRef());
357 return cluster;
358 }
359 return it->second->Ref();
360 }
361
362 void OnUpdate(RefCountedPtr<const XdsDependencyManager::XdsConfig> config);
363 void OnError(absl::string_view context, absl::Status status);
364 void OnResourceDoesNotExist(std::string context);
365
366 absl::StatusOr<RefCountedPtr<ServiceConfig>> CreateServiceConfig();
367 void GenerateResult();
368 void MaybeRemoveUnusedClusters();
369
370 std::shared_ptr<WorkSerializer> work_serializer_;
371 std::unique_ptr<ResultHandler> result_handler_;
372 ChannelArgs args_;
373 grpc_pollset_set* interested_parties_;
374 URI uri_;
375 RefCountedPtr<GrpcXdsClient> xds_client_;
376 std::string lds_resource_name_;
377 std::string data_plane_authority_;
378 const uint64_t channel_id_;
379
380 OrphanablePtr<XdsDependencyManager> dependency_mgr_;
381 RefCountedPtr<const XdsDependencyManager::XdsConfig> current_config_;
382 std::map<absl::string_view, WeakRefCountedPtr<ClusterRef>> cluster_ref_map_;
383 };
384
385 const NoInterceptor
386 XdsResolver::ClusterSelectionFilter::Call::OnServerInitialMetadata;
387 const NoInterceptor
388 XdsResolver::ClusterSelectionFilter::Call::OnServerTrailingMetadata;
389 const NoInterceptor
390 XdsResolver::ClusterSelectionFilter::Call::OnClientToServerMessage;
391 const NoInterceptor
392 XdsResolver::ClusterSelectionFilter::Call::OnServerToClientMessage;
393 const NoInterceptor XdsResolver::ClusterSelectionFilter::Call::OnFinalize;
394
395 //
396 // XdsResolver::RouteConfigData::RouteListIterator
397 //
398
399 // Implementation of XdsRouting::RouteListIterator for getting the matching
400 // route for a request.
401 class XdsResolver::RouteConfigData::RouteListIterator final
402 : public XdsRouting::RouteListIterator {
403 public:
RouteListIterator(const RouteConfigData * route_table)404 explicit RouteListIterator(const RouteConfigData* route_table)
405 : route_table_(route_table) {}
406
Size() const407 size_t Size() const override { return route_table_->routes_.size(); }
408
GetMatchersForRoute(size_t index) const409 const XdsRouteConfigResource::Route::Matchers& GetMatchersForRoute(
410 size_t index) const override {
411 return route_table_->routes_[index].route.matchers;
412 }
413
414 private:
415 const RouteConfigData* route_table_;
416 };
417
418 //
419 // XdsResolver::RouteConfigData
420 //
421
422 absl::StatusOr<RefCountedPtr<XdsResolver::RouteConfigData>>
Create(XdsResolver * resolver,const Duration & default_max_stream_duration)423 XdsResolver::RouteConfigData::Create(
424 XdsResolver* resolver, const Duration& default_max_stream_duration) {
425 auto data = MakeRefCounted<RouteConfigData>();
426 // Reserve the necessary entries up-front to avoid reallocation as we add
427 // elements. This is necessary because the string_view in the entry's
428 // weighted_cluster_state field points to the memory in the route field, so
429 // moving the entry in a reallocation will cause the string_view to point to
430 // invalid data.
431 data->routes_.reserve(resolver->current_config_->virtual_host->routes.size());
432 for (auto& route : resolver->current_config_->virtual_host->routes) {
433 absl::Status status =
434 data->AddRouteEntry(resolver, route, default_max_stream_duration);
435 if (!status.ok()) {
436 return status;
437 }
438 }
439 return data;
440 }
441
442 XdsResolver::RouteConfigData::RouteEntry*
GetRouteForRequest(absl::string_view path,grpc_metadata_batch * initial_metadata)443 XdsResolver::RouteConfigData::GetRouteForRequest(
444 absl::string_view path, grpc_metadata_batch* initial_metadata) {
445 auto route_index = XdsRouting::GetRouteForRequest(RouteListIterator(this),
446 path, initial_metadata);
447 if (!route_index.has_value()) {
448 return nullptr;
449 }
450 return &routes_[*route_index];
451 }
452
453 absl::StatusOr<RefCountedPtr<ServiceConfig>>
CreateMethodConfig(XdsResolver * resolver,const XdsRouteConfigResource::Route & route,const XdsRouteConfigResource::Route::RouteAction::ClusterWeight * cluster_weight)454 XdsResolver::RouteConfigData::CreateMethodConfig(
455 XdsResolver* resolver, const XdsRouteConfigResource::Route& route,
456 const XdsRouteConfigResource::Route::RouteAction::ClusterWeight*
457 cluster_weight) {
458 std::vector<std::string> fields;
459 const auto& route_action =
460 absl::get<XdsRouteConfigResource::Route::RouteAction>(route.action);
461 // Set retry policy if any.
462 if (route_action.retry_policy.has_value() &&
463 !route_action.retry_policy->retry_on.Empty()) {
464 std::vector<std::string> retry_parts;
465 retry_parts.push_back(absl::StrFormat(
466 "\"retryPolicy\": {\n"
467 " \"maxAttempts\": %d,\n"
468 " \"initialBackoff\": \"%s\",\n"
469 " \"maxBackoff\": \"%s\",\n"
470 " \"backoffMultiplier\": 2,\n",
471 route_action.retry_policy->num_retries + 1,
472 route_action.retry_policy->retry_back_off.base_interval.ToJsonString(),
473 route_action.retry_policy->retry_back_off.max_interval.ToJsonString()));
474 std::vector<std::string> code_parts;
475 if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_CANCELLED)) {
476 code_parts.push_back(" \"CANCELLED\"");
477 }
478 if (route_action.retry_policy->retry_on.Contains(
479 GRPC_STATUS_DEADLINE_EXCEEDED)) {
480 code_parts.push_back(" \"DEADLINE_EXCEEDED\"");
481 }
482 if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_INTERNAL)) {
483 code_parts.push_back(" \"INTERNAL\"");
484 }
485 if (route_action.retry_policy->retry_on.Contains(
486 GRPC_STATUS_RESOURCE_EXHAUSTED)) {
487 code_parts.push_back(" \"RESOURCE_EXHAUSTED\"");
488 }
489 if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_UNAVAILABLE)) {
490 code_parts.push_back(" \"UNAVAILABLE\"");
491 }
492 retry_parts.push_back(
493 absl::StrFormat(" \"retryableStatusCodes\": [\n %s ]\n",
494 absl::StrJoin(code_parts, ",\n")));
495 retry_parts.push_back(" }");
496 fields.emplace_back(absl::StrJoin(retry_parts, ""));
497 }
498 // Set timeout.
499 if (route_action.max_stream_duration.has_value() &&
500 (route_action.max_stream_duration != Duration::Zero())) {
501 fields.emplace_back(
502 absl::StrFormat(" \"timeout\": \"%s\"",
503 route_action.max_stream_duration->ToJsonString()));
504 }
505 // Handle xDS HTTP filters.
506 const auto& hcm = absl::get<XdsListenerResource::HttpConnectionManager>(
507 resolver->current_config_->listener->listener);
508 auto result = XdsRouting::GeneratePerHTTPFilterConfigs(
509 static_cast<const GrpcXdsBootstrap&>(resolver->xds_client_->bootstrap())
510 .http_filter_registry(),
511 hcm.http_filters, *resolver->current_config_->virtual_host, route,
512 cluster_weight, resolver->args_);
513 if (!result.ok()) return result.status();
514 for (const auto& p : result->per_filter_configs) {
515 fields.emplace_back(absl::StrCat(" \"", p.first, "\": [\n",
516 absl::StrJoin(p.second, ",\n"),
517 "\n ]"));
518 }
519 // Construct service config.
520 if (!fields.empty()) {
521 std::string json = absl::StrCat(
522 "{\n"
523 " \"methodConfig\": [ {\n"
524 " \"name\": [\n"
525 " {}\n"
526 " ],\n"
527 " ",
528 absl::StrJoin(fields, ",\n"),
529 "\n } ]\n"
530 "}");
531 return ServiceConfigImpl::Create(result->args, json.c_str());
532 }
533 return nullptr;
534 }
535
AddRouteEntry(XdsResolver * resolver,const XdsRouteConfigResource::Route & route,const Duration & default_max_stream_duration)536 absl::Status XdsResolver::RouteConfigData::AddRouteEntry(
537 XdsResolver* resolver, const XdsRouteConfigResource::Route& route,
538 const Duration& default_max_stream_duration) {
539 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
540 gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
541 resolver, this, route.ToString().c_str());
542 }
543 routes_.emplace_back(route);
544 auto* route_entry = &routes_.back();
545 auto maybe_add_cluster = [&](absl::string_view cluster_key,
546 absl::string_view cluster_name) {
547 if (clusters_.find(cluster_key) != clusters_.end()) return;
548 auto cluster_state =
549 resolver->GetOrCreateClusterRef(cluster_key, cluster_name);
550 absl::string_view key = cluster_state->cluster_key();
551 clusters_.emplace(key, std::move(cluster_state));
552 };
553 auto* route_action = absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
554 &route_entry->route.action);
555 if (route_action != nullptr) {
556 // If the route doesn't specify a timeout, set its timeout to the global
557 // one.
558 if (!route_action->max_stream_duration.has_value()) {
559 route_action->max_stream_duration = default_max_stream_duration;
560 }
561 absl::Status status = Match(
562 route_action->action,
563 // cluster name
564 [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName&
565 cluster_name) {
566 auto result =
567 CreateMethodConfig(resolver, route_entry->route, nullptr);
568 if (!result.ok()) {
569 return result.status();
570 }
571 route_entry->method_config = std::move(*result);
572 maybe_add_cluster(absl::StrCat("cluster:", cluster_name.cluster_name),
573 cluster_name.cluster_name);
574 return absl::OkStatus();
575 },
576 // WeightedClusters
577 [&](const std::vector<
578 XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
579 weighted_clusters) {
580 uint32_t end = 0;
581 for (const auto& weighted_cluster : weighted_clusters) {
582 auto result = CreateMethodConfig(resolver, route_entry->route,
583 &weighted_cluster);
584 if (!result.ok()) {
585 return result.status();
586 }
587 RouteEntry::ClusterWeightState cluster_weight_state;
588 cluster_weight_state.method_config = std::move(*result);
589 end += weighted_cluster.weight;
590 cluster_weight_state.range_end = end;
591 cluster_weight_state.cluster = weighted_cluster.name;
592 route_entry->weighted_cluster_state.push_back(
593 std::move(cluster_weight_state));
594 maybe_add_cluster(absl::StrCat("cluster:", weighted_cluster.name),
595 weighted_cluster.name);
596 }
597 return absl::OkStatus();
598 },
599 // ClusterSpecifierPlugin
600 [&](const XdsRouteConfigResource::Route::RouteAction::
601 ClusterSpecifierPluginName& cluster_specifier_plugin_name) {
602 auto result =
603 CreateMethodConfig(resolver, route_entry->route, nullptr);
604 if (!result.ok()) {
605 return result.status();
606 }
607 route_entry->method_config = std::move(*result);
608 maybe_add_cluster(
609 absl::StrCat(
610 "cluster_specifier_plugin:",
611 cluster_specifier_plugin_name.cluster_specifier_plugin_name),
612 /*subscription_name=*/"");
613 return absl::OkStatus();
614 });
615 if (!status.ok()) {
616 return status;
617 }
618 }
619 return absl::OkStatus();
620 }
621
622 //
623 // XdsResolver::XdsConfigSelector
624 //
625
XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,RefCountedPtr<RouteConfigData> route_config_data)626 XdsResolver::XdsConfigSelector::XdsConfigSelector(
627 RefCountedPtr<XdsResolver> resolver,
628 RefCountedPtr<RouteConfigData> route_config_data)
629 : resolver_(std::move(resolver)),
630 route_config_data_(std::move(route_config_data)) {
631 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
632 gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
633 resolver_.get(), this);
634 }
635 // Populate filter list.
636 const auto& http_filter_registry =
637 static_cast<const GrpcXdsBootstrap&>(resolver_->xds_client_->bootstrap())
638 .http_filter_registry();
639 const auto& hcm = absl::get<XdsListenerResource::HttpConnectionManager>(
640 resolver_->current_config_->listener->listener);
641 for (const auto& http_filter : hcm.http_filters) {
642 // Find filter. This is guaranteed to succeed, because it's checked
643 // at config validation time in the XdsApi code.
644 const XdsHttpFilterImpl* filter_impl =
645 http_filter_registry.GetFilterForType(
646 http_filter.config.config_proto_type_name);
647 GPR_ASSERT(filter_impl != nullptr);
648 // Add C-core filter to list.
649 if (filter_impl->channel_filter() != nullptr) {
650 filters_.push_back(filter_impl->channel_filter());
651 }
652 }
653 filters_.push_back(&ClusterSelectionFilter::kFilter);
654 }
655
~XdsConfigSelector()656 XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
657 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
658 gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
659 resolver_.get(), this);
660 }
661 route_config_data_.reset();
662 if (!IsWorkSerializerDispatchEnabled()) {
663 resolver_->MaybeRemoveUnusedClusters();
664 return;
665 }
666 resolver_->work_serializer_->Run(
667 [resolver = std::move(resolver_)]() {
668 resolver->MaybeRemoveUnusedClusters();
669 },
670 DEBUG_LOCATION);
671 }
672
HeaderHashHelper(const XdsRouteConfigResource::Route::RouteAction::HashPolicy::Header & header_policy,grpc_metadata_batch * initial_metadata)673 absl::optional<uint64_t> HeaderHashHelper(
674 const XdsRouteConfigResource::Route::RouteAction::HashPolicy::Header&
675 header_policy,
676 grpc_metadata_batch* initial_metadata) {
677 std::string value_buffer;
678 absl::optional<absl::string_view> header_value = XdsRouting::GetHeaderValue(
679 initial_metadata, header_policy.header_name, &value_buffer);
680 if (!header_value.has_value()) return absl::nullopt;
681 if (header_policy.regex != nullptr) {
682 // If GetHeaderValue() did not already store the value in
683 // value_buffer, copy it there now, so we can modify it.
684 if (header_value->data() != value_buffer.data()) {
685 value_buffer = std::string(*header_value);
686 }
687 RE2::GlobalReplace(&value_buffer, *header_policy.regex,
688 header_policy.regex_substitution);
689 header_value = value_buffer;
690 }
691 return XXH64(header_value->data(), header_value->size(), 0);
692 }
693
GetCallConfig(GetCallConfigArgs args)694 absl::Status XdsResolver::XdsConfigSelector::GetCallConfig(
695 GetCallConfigArgs args) {
696 Slice* path = args.initial_metadata->get_pointer(HttpPathMetadata());
697 GPR_ASSERT(path != nullptr);
698 auto* entry = route_config_data_->GetRouteForRequest(path->as_string_view(),
699 args.initial_metadata);
700 if (entry == nullptr) {
701 return absl::UnavailableError(
702 "No matching route found in xDS route config");
703 }
704 // Found a route match
705 const auto* route_action =
706 absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
707 &entry->route.action);
708 if (route_action == nullptr) {
709 return absl::UnavailableError("Matching route has inappropriate action");
710 }
711 std::string cluster_name;
712 RefCountedPtr<ServiceConfig> method_config;
713 Match(
714 route_action->action,
715 // cluster name
716 [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName&
717 action_cluster_name) {
718 cluster_name =
719 absl::StrCat("cluster:", action_cluster_name.cluster_name);
720 method_config = entry->method_config;
721 },
722 // WeightedClusters
723 [&](const std::vector<
724 XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
725 /*weighted_clusters*/) {
726 const uint32_t key = absl::Uniform<uint32_t>(
727 absl::BitGen(), 0, entry->weighted_cluster_state.back().range_end);
728 // Find the index in weighted clusters corresponding to key.
729 size_t mid = 0;
730 size_t start_index = 0;
731 size_t end_index = entry->weighted_cluster_state.size() - 1;
732 size_t index = 0;
733 while (end_index > start_index) {
734 mid = (start_index + end_index) / 2;
735 if (entry->weighted_cluster_state[mid].range_end > key) {
736 end_index = mid;
737 } else if (entry->weighted_cluster_state[mid].range_end < key) {
738 start_index = mid + 1;
739 } else {
740 index = mid + 1;
741 break;
742 }
743 }
744 if (index == 0) index = start_index;
745 GPR_ASSERT(entry->weighted_cluster_state[index].range_end > key);
746 cluster_name = absl::StrCat(
747 "cluster:", entry->weighted_cluster_state[index].cluster);
748 method_config = entry->weighted_cluster_state[index].method_config;
749 },
750 // ClusterSpecifierPlugin
751 [&](const XdsRouteConfigResource::Route::RouteAction::
752 ClusterSpecifierPluginName& cluster_specifier_plugin_name) {
753 cluster_name = absl::StrCat(
754 "cluster_specifier_plugin:",
755 cluster_specifier_plugin_name.cluster_specifier_plugin_name);
756 method_config = entry->method_config;
757 });
758 auto cluster = route_config_data_->FindClusterRef(cluster_name);
759 GPR_ASSERT(cluster != nullptr);
760 // Generate a hash.
761 absl::optional<uint64_t> hash;
762 for (const auto& hash_policy : route_action->hash_policies) {
763 absl::optional<uint64_t> new_hash = Match(
764 hash_policy.policy,
765 [&](const XdsRouteConfigResource::Route::RouteAction::HashPolicy::
766 Header& header) {
767 return HeaderHashHelper(header, args.initial_metadata);
768 },
769 [&](const XdsRouteConfigResource::Route::RouteAction::HashPolicy::
770 ChannelId&) -> absl::optional<uint64_t> {
771 return resolver_->channel_id_;
772 });
773 if (new_hash.has_value()) {
774 // Rotating the old value prevents duplicate hash rules from cancelling
775 // each other out and preserves all of the entropy
776 const uint64_t old_value =
777 hash.has_value() ? ((*hash << 1) | (*hash >> 63)) : 0;
778 hash = old_value ^ *new_hash;
779 }
780 // If the policy is a terminal policy and a hash has been generated,
781 // ignore the rest of the hash policies.
782 if (hash_policy.terminal && hash.has_value()) {
783 break;
784 }
785 }
786 if (!hash.has_value()) {
787 hash = absl::Uniform<uint64_t>(absl::BitGen());
788 }
789 // Populate service config call data.
790 if (method_config != nullptr) {
791 auto* parsed_method_configs =
792 method_config->GetMethodParsedConfigVector(grpc_empty_slice());
793 args.service_config_call_data->SetServiceConfig(std::move(method_config),
794 parsed_method_configs);
795 }
796 args.service_config_call_data->SetCallAttribute(
797 args.arena->New<XdsClusterAttribute>(cluster->cluster_key()));
798 args.service_config_call_data->SetCallAttribute(
799 args.arena->New<RequestHashAttribute>(*hash));
800 args.service_config_call_data->SetCallAttribute(
801 args.arena->ManagedNew<XdsRouteStateAttributeImpl>(route_config_data_,
802 entry));
803 return absl::OkStatus();
804 }
805
806 //
807 // XdsResolver::XdsRouteStateAttributeImpl
808 //
809
HasClusterForRoute(absl::string_view cluster_name) const810 bool XdsResolver::XdsRouteStateAttributeImpl::HasClusterForRoute(
811 absl::string_view cluster_name) const {
812 // Found a route match
813 const auto* route_action =
814 absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
815 &static_cast<RouteConfigData::RouteEntry*>(route_)->route.action);
816 if (route_action == nullptr) return false;
817 return Match(
818 route_action->action,
819 [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName& name) {
820 return name.cluster_name == cluster_name;
821 },
822 [&](const std::vector<
823 XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
824 clusters) {
825 for (const auto& cluster : clusters) {
826 if (cluster.name == cluster_name) {
827 return true;
828 }
829 }
830 return false;
831 },
832 [&](const XdsRouteConfigResource::Route::RouteAction::
833 ClusterSpecifierPluginName& /* name */) { return false; });
834 }
835
836 RefCountedPtr<XdsResolver::ClusterRef>
LockAndGetCluster(absl::string_view cluster_name)837 XdsResolver::XdsRouteStateAttributeImpl::LockAndGetCluster(
838 absl::string_view cluster_name) {
839 if (route_config_data_ == nullptr) {
840 return nullptr;
841 }
842 auto cluster = route_config_data_->FindClusterRef(cluster_name);
843 route_config_data_.reset();
844 return cluster;
845 }
846
847 //
848 // XdsResolver::ClusterSelectionFilter
849 //
850
851 const grpc_channel_filter XdsResolver::ClusterSelectionFilter::kFilter =
852 MakePromiseBasedFilter<ClusterSelectionFilter, FilterEndpoint::kClient,
853 kFilterExaminesServerInitialMetadata>(
854 "cluster_selection_filter");
855
OnClientInitialMetadata(ClientMetadata &)856 void XdsResolver::ClusterSelectionFilter::Call::OnClientInitialMetadata(
857 ClientMetadata&) {
858 auto* service_config_call_data =
859 static_cast<ClientChannelServiceConfigCallData*>(
860 GetContext<grpc_call_context_element>()
861 [GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA]
862 .value);
863 GPR_ASSERT(service_config_call_data != nullptr);
864 auto* route_state_attribute = static_cast<XdsRouteStateAttributeImpl*>(
865 service_config_call_data->GetCallAttribute<XdsRouteStateAttribute>());
866 auto* cluster_name_attribute =
867 service_config_call_data->GetCallAttribute<XdsClusterAttribute>();
868 if (route_state_attribute != nullptr && cluster_name_attribute != nullptr) {
869 auto cluster = route_state_attribute->LockAndGetCluster(
870 cluster_name_attribute->cluster());
871 if (cluster != nullptr) {
872 service_config_call_data->SetOnCommit(
873 [cluster = std::move(cluster)]() mutable { cluster.reset(); });
874 }
875 }
876 }
877
878 //
879 // XdsResolver
880 //
881
StartLocked()882 void XdsResolver::StartLocked() {
883 auto xds_client =
884 GrpcXdsClient::GetOrCreate(uri_.ToString(), args_, "xds resolver");
885 if (!xds_client.ok()) {
886 gpr_log(GPR_ERROR,
887 "Failed to create xds client -- channel will remain in "
888 "TRANSIENT_FAILURE: %s",
889 xds_client.status().ToString().c_str());
890 absl::Status status = absl::UnavailableError(absl::StrCat(
891 "Failed to create XdsClient: ", xds_client.status().message()));
892 Result result;
893 result.addresses = status;
894 result.service_config = std::move(status);
895 result.args = args_;
896 result_handler_->ReportResult(std::move(result));
897 return;
898 }
899 xds_client_ = std::move(*xds_client);
900 grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
901 interested_parties_);
902 // Determine LDS resource name.
903 std::string resource_name_fragment(absl::StripPrefix(uri_.path(), "/"));
904 if (!uri_.authority().empty()) {
905 // target_uri.authority is set case
906 const auto* authority_config =
907 static_cast<const GrpcXdsBootstrap::GrpcAuthority*>(
908 xds_client_->bootstrap().LookupAuthority(uri_.authority()));
909 if (authority_config == nullptr) {
910 absl::Status status = absl::UnavailableError(
911 absl::StrCat("Invalid target URI -- authority not found for ",
912 uri_.authority().c_str()));
913 Result result;
914 result.addresses = status;
915 result.service_config = std::move(status);
916 result.args = args_;
917 result_handler_->ReportResult(std::move(result));
918 return;
919 }
920 std::string name_template =
921 authority_config->client_listener_resource_name_template();
922 if (name_template.empty()) {
923 name_template = absl::StrCat(
924 "xdstp://", URI::PercentEncodeAuthority(uri_.authority()),
925 "/envoy.config.listener.v3.Listener/%s");
926 }
927 lds_resource_name_ = absl::StrReplaceAll(
928 name_template,
929 {{"%s", URI::PercentEncodePath(resource_name_fragment)}});
930 } else {
931 // target_uri.authority not set
932 absl::string_view name_template =
933 static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
934 .client_default_listener_resource_name_template();
935 if (name_template.empty()) {
936 name_template = "%s";
937 }
938 if (absl::StartsWith(name_template, "xdstp:")) {
939 resource_name_fragment = URI::PercentEncodePath(resource_name_fragment);
940 }
941 lds_resource_name_ =
942 absl::StrReplaceAll(name_template, {{"%s", resource_name_fragment}});
943 }
944 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
945 gpr_log(GPR_INFO, "[xds_resolver %p] Started with lds_resource_name %s.",
946 this, lds_resource_name_.c_str());
947 }
948 // Start watch for xDS config.
949 dependency_mgr_ = MakeOrphanable<XdsDependencyManager>(
950 xds_client_, work_serializer_,
951 std::make_unique<XdsWatcher>(RefAsSubclass<XdsResolver>()),
952 data_plane_authority_, lds_resource_name_, args_, interested_parties_);
953 }
954
ShutdownLocked()955 void XdsResolver::ShutdownLocked() {
956 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
957 gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
958 }
959 if (xds_client_ != nullptr) {
960 dependency_mgr_.reset();
961 grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
962 interested_parties_);
963 xds_client_.reset(DEBUG_LOCATION, "xds resolver");
964 }
965 }
966
OnUpdate(RefCountedPtr<const XdsDependencyManager::XdsConfig> config)967 void XdsResolver::OnUpdate(
968 RefCountedPtr<const XdsDependencyManager::XdsConfig> config) {
969 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
970 gpr_log(GPR_INFO, "[xds_resolver %p] received updated xDS config", this);
971 }
972 if (xds_client_ == nullptr) return;
973 current_config_ = std::move(config);
974 GenerateResult();
975 }
976
OnError(absl::string_view context,absl::Status status)977 void XdsResolver::OnError(absl::string_view context, absl::Status status) {
978 gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s: %s",
979 this, std::string(context).c_str(), status.ToString().c_str());
980 if (xds_client_ == nullptr) return;
981 status =
982 absl::UnavailableError(absl::StrCat(context, ": ", status.ToString()));
983 Result result;
984 result.addresses = status;
985 result.service_config = std::move(status);
986 result.args =
987 args_.SetObject(xds_client_.Ref(DEBUG_LOCATION, "xds resolver result"));
988 result_handler_->ReportResult(std::move(result));
989 }
990
OnResourceDoesNotExist(std::string context)991 void XdsResolver::OnResourceDoesNotExist(std::string context) {
992 gpr_log(GPR_ERROR,
993 "[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
994 "update and returning empty service config",
995 this);
996 if (xds_client_ == nullptr) return;
997 current_config_.reset();
998 Result result;
999 result.addresses.emplace();
1000 result.service_config = ServiceConfigImpl::Create(args_, "{}");
1001 GPR_ASSERT(result.service_config.ok());
1002 result.resolution_note = std::move(context);
1003 result.args = args_;
1004 result_handler_->ReportResult(std::move(result));
1005 }
1006
1007 absl::StatusOr<RefCountedPtr<ServiceConfig>>
CreateServiceConfig()1008 XdsResolver::CreateServiceConfig() {
1009 std::vector<std::string> clusters;
1010 for (const auto& cluster : cluster_ref_map_) {
1011 absl::string_view child_name = cluster.first;
1012 if (absl::ConsumePrefix(&child_name, "cluster_specifier_plugin:")) {
1013 clusters.push_back(absl::StrFormat(
1014 " \"%s\":{\n"
1015 " \"childPolicy\": %s\n"
1016 " }",
1017 cluster.first,
1018 current_config_->route_config->cluster_specifier_plugin_map.at(
1019 std::string(child_name))));
1020 } else {
1021 absl::ConsumePrefix(&child_name, "cluster:");
1022 clusters.push_back(
1023 absl::StrFormat(" \"%s\":{\n"
1024 " \"childPolicy\":[ {\n"
1025 " \"cds_experimental\":{\n"
1026 " \"cluster\": \"%s\"\n"
1027 " }\n"
1028 " } ]\n"
1029 " }",
1030 cluster.first, child_name));
1031 }
1032 }
1033 std::vector<std::string> config_parts;
1034 config_parts.push_back(
1035 "{\n"
1036 " \"loadBalancingConfig\":[\n"
1037 " { \"xds_cluster_manager_experimental\":{\n"
1038 " \"children\":{\n");
1039 config_parts.push_back(absl::StrJoin(clusters, ",\n"));
1040 config_parts.push_back(
1041 " }\n"
1042 " } }\n"
1043 " ]\n"
1044 "}");
1045 std::string json = absl::StrJoin(config_parts, "");
1046 return ServiceConfigImpl::Create(args_, json.c_str());
1047 }
1048
GenerateResult()1049 void XdsResolver::GenerateResult() {
1050 if (xds_client_ == nullptr || current_config_ == nullptr) return;
1051 // First create XdsConfigSelector, which may add new entries to the cluster
1052 // state map.
1053 const auto& hcm = absl::get<XdsListenerResource::HttpConnectionManager>(
1054 current_config_->listener->listener);
1055 auto route_config_data =
1056 RouteConfigData::Create(this, hcm.http_max_stream_duration);
1057 if (!route_config_data.ok()) {
1058 OnError("could not create ConfigSelector",
1059 absl::UnavailableError(route_config_data.status().message()));
1060 return;
1061 }
1062 auto config_selector = MakeRefCounted<XdsConfigSelector>(
1063 RefAsSubclass<XdsResolver>(), std::move(*route_config_data));
1064 // Now create the service config.
1065 Result result;
1066 result.addresses.emplace();
1067 result.service_config = CreateServiceConfig();
1068 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
1069 gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
1070 result.service_config.ok()
1071 ? std::string((*result.service_config)->json_string()).c_str()
1072 : result.service_config.status().ToString().c_str());
1073 }
1074 result.args =
1075 args_.SetObject(xds_client_.Ref(DEBUG_LOCATION, "xds resolver result"))
1076 .SetObject(config_selector)
1077 .SetObject(current_config_)
1078 .SetObject(dependency_mgr_->Ref());
1079 result_handler_->ReportResult(std::move(result));
1080 }
1081
MaybeRemoveUnusedClusters()1082 void XdsResolver::MaybeRemoveUnusedClusters() {
1083 bool update_needed = false;
1084 for (auto it = cluster_ref_map_.begin(); it != cluster_ref_map_.end();) {
1085 RefCountedPtr<ClusterRef> cluster_state = it->second->RefIfNonZero();
1086 if (cluster_state != nullptr) {
1087 ++it;
1088 } else {
1089 update_needed = true;
1090 it = cluster_ref_map_.erase(it);
1091 }
1092 }
1093 if (update_needed) GenerateResult();
1094 }
1095
1096 //
1097 // XdsResolverFactory
1098 //
1099
1100 class XdsResolverFactory final : public ResolverFactory {
1101 public:
scheme() const1102 absl::string_view scheme() const override { return "xds"; }
1103
IsValidUri(const URI & uri) const1104 bool IsValidUri(const URI& uri) const override {
1105 if (uri.path().empty() || uri.path().back() == '/') {
1106 gpr_log(GPR_ERROR,
1107 "URI path does not contain valid data plane authority");
1108 return false;
1109 }
1110 return true;
1111 }
1112
CreateResolver(ResolverArgs args) const1113 OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
1114 if (!IsValidUri(args.uri)) return nullptr;
1115 std::string authority = GetDataPlaneAuthority(args.args, args.uri);
1116 return MakeOrphanable<XdsResolver>(std::move(args), std::move(authority));
1117 }
1118
1119 private:
GetDataPlaneAuthority(const ChannelArgs & args,const URI & uri) const1120 std::string GetDataPlaneAuthority(const ChannelArgs& args,
1121 const URI& uri) const {
1122 absl::optional<absl::string_view> authority =
1123 args.GetString(GRPC_ARG_DEFAULT_AUTHORITY);
1124 if (authority.has_value()) return URI::PercentEncodeAuthority(*authority);
1125 return GetDefaultAuthority(uri);
1126 }
1127 };
1128
1129 } // namespace
1130
RegisterXdsResolver(CoreConfiguration::Builder * builder)1131 void RegisterXdsResolver(CoreConfiguration::Builder* builder) {
1132 builder->resolver_registry()->RegisterResolverFactory(
1133 std::make_unique<XdsResolverFactory>());
1134 }
1135
1136 } // namespace grpc_core
1137