xref: /aosp_15_r20/external/grpc-grpc/src/core/resolver/xds/xds_resolver.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <stdint.h>
20 #include <string.h>
21 
22 #include <algorithm>
23 #include <functional>
24 #include <map>
25 #include <memory>
26 #include <string>
27 #include <type_traits>
28 #include <utility>
29 #include <vector>
30 
31 #include "absl/meta/type_traits.h"
32 #include "absl/random/random.h"
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/match.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/strings/str_format.h"
38 #include "absl/strings/str_join.h"
39 #include "absl/strings/str_replace.h"
40 #include "absl/strings/string_view.h"
41 #include "absl/strings/strip.h"
42 #include "absl/types/optional.h"
43 #include "absl/types/variant.h"
44 #include "re2/re2.h"
45 
46 #include <grpc/impl/channel_arg_names.h>
47 #include <grpc/slice.h>
48 #include <grpc/status.h>
49 #include <grpc/support/log.h>
50 
51 #include "src/core/client_channel/client_channel_internal.h"
52 #include "src/core/client_channel/config_selector.h"
53 #include "src/core/ext/xds/xds_bootstrap.h"
54 #include "src/core/ext/xds/xds_bootstrap_grpc.h"
55 #include "src/core/ext/xds/xds_client_grpc.h"
56 #include "src/core/ext/xds/xds_http_filters.h"
57 #include "src/core/ext/xds/xds_listener.h"
58 #include "src/core/ext/xds/xds_route_config.h"
59 #include "src/core/ext/xds/xds_routing.h"
60 #include "src/core/lib/channel/channel_args.h"
61 #include "src/core/lib/channel/channel_fwd.h"
62 #include "src/core/lib/channel/channel_stack.h"
63 #include "src/core/lib/channel/context.h"
64 #include "src/core/lib/channel/promise_based_filter.h"
65 #include "src/core/lib/channel/status_util.h"
66 #include "src/core/lib/config/core_configuration.h"
67 #include "src/core/lib/debug/trace.h"
68 #include "src/core/lib/experiments/experiments.h"
69 #include "src/core/lib/gprpp/debug_location.h"
70 #include "src/core/lib/gprpp/dual_ref_counted.h"
71 #include "src/core/lib/gprpp/match.h"
72 #include "src/core/lib/gprpp/orphanable.h"
73 #include "src/core/lib/gprpp/ref_counted.h"
74 #include "src/core/lib/gprpp/ref_counted_ptr.h"
75 #include "src/core/lib/gprpp/time.h"
76 #include "src/core/lib/gprpp/work_serializer.h"
77 #include "src/core/lib/gprpp/xxhash_inline.h"
78 #include "src/core/lib/iomgr/iomgr_fwd.h"
79 #include "src/core/lib/iomgr/pollset_set.h"
80 #include "src/core/lib/promise/arena_promise.h"
81 #include "src/core/lib/promise/context.h"
82 #include "src/core/lib/resource_quota/arena.h"
83 #include "src/core/lib/slice/slice.h"
84 #include "src/core/lib/transport/metadata_batch.h"
85 #include "src/core/lib/transport/transport.h"
86 #include "src/core/lib/uri/uri_parser.h"
87 #include "src/core/load_balancing/ring_hash/ring_hash.h"
88 #include "src/core/resolver/endpoint_addresses.h"
89 #include "src/core/resolver/resolver.h"
90 #include "src/core/resolver/resolver_factory.h"
91 #include "src/core/resolver/xds/xds_dependency_manager.h"
92 #include "src/core/resolver/xds/xds_resolver_attributes.h"
93 #include "src/core/resolver/xds/xds_resolver_trace.h"
94 #include "src/core/service_config/service_config.h"
95 #include "src/core/service_config/service_config_impl.h"
96 
97 namespace grpc_core {
98 
99 namespace {
100 
101 //
102 // XdsResolver
103 //
104 
105 class XdsResolver final : public Resolver {
106  public:
XdsResolver(ResolverArgs args,std::string data_plane_authority)107   XdsResolver(ResolverArgs args, std::string data_plane_authority)
108       : work_serializer_(std::move(args.work_serializer)),
109         result_handler_(std::move(args.result_handler)),
110         args_(std::move(args.args)),
111         interested_parties_(args.pollset_set),
112         uri_(std::move(args.uri)),
113         data_plane_authority_(std::move(data_plane_authority)),
114         channel_id_(absl::Uniform<uint64_t>(absl::BitGen())) {
115     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
116       gpr_log(
117           GPR_INFO,
118           "[xds_resolver %p] created for URI %s; data plane authority is %s",
119           this, uri_.ToString().c_str(), data_plane_authority_.c_str());
120     }
121   }
122 
~XdsResolver()123   ~XdsResolver() override {
124     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
125       gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this);
126     }
127   }
128 
129   void StartLocked() override;
130 
131   void ShutdownLocked() override;
132 
ResetBackoffLocked()133   void ResetBackoffLocked() override {
134     if (xds_client_ != nullptr) xds_client_->ResetBackoff();
135   }
136 
137  private:
138   class XdsWatcher final : public XdsDependencyManager::Watcher {
139    public:
XdsWatcher(RefCountedPtr<XdsResolver> resolver)140     explicit XdsWatcher(RefCountedPtr<XdsResolver> resolver)
141         : resolver_(std::move(resolver)) {}
142 
OnUpdate(RefCountedPtr<const XdsDependencyManager::XdsConfig> config)143     void OnUpdate(
144         RefCountedPtr<const XdsDependencyManager::XdsConfig> config) override {
145       resolver_->OnUpdate(std::move(config));
146     }
147 
OnError(absl::string_view context,absl::Status status)148     void OnError(absl::string_view context, absl::Status status) override {
149       resolver_->OnError(context, std::move(status));
150     }
151 
OnResourceDoesNotExist(std::string context)152     void OnResourceDoesNotExist(std::string context) override {
153       resolver_->OnResourceDoesNotExist(std::move(context));
154     }
155 
156    private:
157     RefCountedPtr<XdsResolver> resolver_;
158   };
159 
160   // An entry in the map of clusters that need to be present in the LB
161   // policy config.  The map holds a weak ref.  One strong ref is held by
162   // the ConfigSelector, and another is held by each call assigned to
163   // the cluster by the ConfigSelector.  The ref for each call is held
164   // until the call is committed.  When the strong refs go away, we hop
165   // back into the WorkSerializer to remove the entry from the map.
166   class ClusterRef final : public DualRefCounted<ClusterRef> {
167    public:
ClusterRef(RefCountedPtr<XdsResolver> resolver,RefCountedPtr<XdsDependencyManager::ClusterSubscription> cluster_subscription,absl::string_view cluster_key)168     ClusterRef(RefCountedPtr<XdsResolver> resolver,
169                RefCountedPtr<XdsDependencyManager::ClusterSubscription>
170                    cluster_subscription,
171                absl::string_view cluster_key)
172         : resolver_(std::move(resolver)),
173           cluster_subscription_(std::move(cluster_subscription)),
174           cluster_key_(cluster_key) {}
175 
Orphaned()176     void Orphaned() override {
177       XdsResolver* resolver_ptr = resolver_.get();
178       resolver_ptr->work_serializer_->Run(
179           [resolver = std::move(resolver_)]() {
180             resolver->MaybeRemoveUnusedClusters();
181           },
182           DEBUG_LOCATION);
183       cluster_subscription_.reset();
184     }
185 
cluster_key() const186     const std::string& cluster_key() const { return cluster_key_; }
187 
188    private:
189     RefCountedPtr<XdsResolver> resolver_;
190     RefCountedPtr<XdsDependencyManager::ClusterSubscription>
191         cluster_subscription_;
192     std::string cluster_key_;
193   };
194 
195   // A routing data including cluster refs and routes table held by the
196   // XdsConfigSelector. A ref to this map will be taken by each call processed
197   // by the XdsConfigSelector, stored in a the call's call attributes, and later
198   // unreffed by the ClusterSelection filter.
199   class RouteConfigData final : public RefCounted<RouteConfigData> {
200    public:
201     struct RouteEntry {
202       struct ClusterWeightState {
203         uint32_t range_end;
204         absl::string_view cluster;
205         RefCountedPtr<ServiceConfig> method_config;
206 
operator ==grpc_core::__anon886d5f1d0111::XdsResolver::RouteConfigData::RouteEntry::ClusterWeightState207         bool operator==(const ClusterWeightState& other) const {
208           return range_end == other.range_end && cluster == other.cluster &&
209                  MethodConfigsEqual(method_config.get(),
210                                     other.method_config.get());
211         }
212       };
213 
214       XdsRouteConfigResource::Route route;
215       RefCountedPtr<ServiceConfig> method_config;
216       std::vector<ClusterWeightState> weighted_cluster_state;
217 
RouteEntrygrpc_core::__anon886d5f1d0111::XdsResolver::RouteConfigData::RouteEntry218       explicit RouteEntry(const XdsRouteConfigResource::Route& r) : route(r) {}
219 
operator ==grpc_core::__anon886d5f1d0111::XdsResolver::RouteConfigData::RouteEntry220       bool operator==(const RouteEntry& other) const {
221         return route == other.route &&
222                weighted_cluster_state == other.weighted_cluster_state &&
223                MethodConfigsEqual(method_config.get(),
224                                   other.method_config.get());
225       }
226     };
227 
228     static absl::StatusOr<RefCountedPtr<RouteConfigData>> Create(
229         XdsResolver* resolver, const Duration& default_max_stream_duration);
230 
operator ==(const RouteConfigData & other) const231     bool operator==(const RouteConfigData& other) const {
232       return clusters_ == other.clusters_ && routes_ == other.routes_;
233     }
234 
FindClusterRef(absl::string_view name) const235     RefCountedPtr<ClusterRef> FindClusterRef(absl::string_view name) const {
236       auto it = clusters_.find(name);
237       if (it == clusters_.end()) {
238         return nullptr;
239       }
240       return it->second;
241     }
242 
243     RouteEntry* GetRouteForRequest(absl::string_view path,
244                                    grpc_metadata_batch* initial_metadata);
245 
246    private:
247     class RouteListIterator;
248 
249     static absl::StatusOr<RefCountedPtr<ServiceConfig>> CreateMethodConfig(
250         XdsResolver* resolver, const XdsRouteConfigResource::Route& route,
251         const XdsRouteConfigResource::Route::RouteAction::ClusterWeight*
252             cluster_weight);
253 
MethodConfigsEqual(const ServiceConfig * sc1,const ServiceConfig * sc2)254     static bool MethodConfigsEqual(const ServiceConfig* sc1,
255                                    const ServiceConfig* sc2) {
256       if (sc1 == nullptr) return sc2 == nullptr;
257       if (sc2 == nullptr) return false;
258       return sc1->json_string() == sc2->json_string();
259     }
260 
261     absl::Status AddRouteEntry(XdsResolver* resolver,
262                                const XdsRouteConfigResource::Route& route,
263                                const Duration& default_max_stream_duration);
264 
265     std::map<absl::string_view, RefCountedPtr<ClusterRef>> clusters_;
266     std::vector<RouteEntry> routes_;
267   };
268 
269   class XdsConfigSelector final : public ConfigSelector {
270    public:
271     XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
272                       RefCountedPtr<RouteConfigData> route_config_data);
273     ~XdsConfigSelector() override;
274 
name() const275     const char* name() const override { return "XdsConfigSelector"; }
276 
Equals(const ConfigSelector * other) const277     bool Equals(const ConfigSelector* other) const override {
278       const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
279       // Don't need to compare resolver_, since that will always be the same.
280       return *route_config_data_ == *other_xds->route_config_data_ &&
281              filters_ == other_xds->filters_;
282     }
283 
284     absl::Status GetCallConfig(GetCallConfigArgs args) override;
285 
GetFilters()286     std::vector<const grpc_channel_filter*> GetFilters() override {
287       return filters_;
288     }
289 
290    private:
291     RefCountedPtr<XdsResolver> resolver_;
292     RefCountedPtr<RouteConfigData> route_config_data_;
293     std::vector<const grpc_channel_filter*> filters_;
294   };
295 
296   class XdsRouteStateAttributeImpl final : public XdsRouteStateAttribute {
297    public:
XdsRouteStateAttributeImpl(RefCountedPtr<RouteConfigData> route_config_data,RouteConfigData::RouteEntry * route)298     explicit XdsRouteStateAttributeImpl(
299         RefCountedPtr<RouteConfigData> route_config_data,
300         RouteConfigData::RouteEntry* route)
301         : route_config_data_(std::move(route_config_data)), route_(route) {}
302 
303     // This method can be called only once. The first call will release
304     // the reference to the cluster map, and subsequent calls will return
305     // nullptr.
306     RefCountedPtr<ClusterRef> LockAndGetCluster(absl::string_view cluster_name);
307 
308     bool HasClusterForRoute(absl::string_view cluster_name) const override;
309 
310    private:
311     RefCountedPtr<RouteConfigData> route_config_data_;
312     RouteConfigData::RouteEntry* route_;
313   };
314 
315   class ClusterSelectionFilter final
316       : public ImplementChannelFilter<ClusterSelectionFilter> {
317    public:
318     const static grpc_channel_filter kFilter;
319 
Create(const ChannelArgs &,ChannelFilter::Args filter_args)320     static absl::StatusOr<ClusterSelectionFilter> Create(
321         const ChannelArgs& /* unused */, ChannelFilter::Args filter_args) {
322       return ClusterSelectionFilter(filter_args);
323     }
324 
325     // Construct a promise for one call.
326     class Call {
327      public:
328       void OnClientInitialMetadata(ClientMetadata& md);
329       static const NoInterceptor OnServerInitialMetadata;
330       static const NoInterceptor OnServerTrailingMetadata;
331       static const NoInterceptor OnClientToServerMessage;
332       static const NoInterceptor OnServerToClientMessage;
333       static const NoInterceptor OnFinalize;
334     };
335 
336    private:
ClusterSelectionFilter(ChannelFilter::Args filter_args)337     explicit ClusterSelectionFilter(ChannelFilter::Args filter_args)
338         : filter_args_(filter_args) {}
339 
340     ChannelFilter::Args filter_args_;
341   };
342 
GetOrCreateClusterRef(absl::string_view cluster_key,absl::string_view cluster_name)343   RefCountedPtr<ClusterRef> GetOrCreateClusterRef(
344       absl::string_view cluster_key, absl::string_view cluster_name) {
345     auto it = cluster_ref_map_.find(cluster_key);
346     if (it == cluster_ref_map_.end()) {
347       RefCountedPtr<XdsDependencyManager::ClusterSubscription> subscription;
348       if (!cluster_name.empty()) {
349         // The cluster ref will hold a subscription to ensure that the
350         // XdsDependencyManager stays subscribed to the CDS resource as
351         // long as the cluster ref exists.
352         subscription = dependency_mgr_->GetClusterSubscription(cluster_name);
353       }
354       auto cluster = MakeRefCounted<ClusterRef>(
355           RefAsSubclass<XdsResolver>(), std::move(subscription), cluster_key);
356       cluster_ref_map_.emplace(cluster->cluster_key(), cluster->WeakRef());
357       return cluster;
358     }
359     return it->second->Ref();
360   }
361 
362   void OnUpdate(RefCountedPtr<const XdsDependencyManager::XdsConfig> config);
363   void OnError(absl::string_view context, absl::Status status);
364   void OnResourceDoesNotExist(std::string context);
365 
366   absl::StatusOr<RefCountedPtr<ServiceConfig>> CreateServiceConfig();
367   void GenerateResult();
368   void MaybeRemoveUnusedClusters();
369 
370   std::shared_ptr<WorkSerializer> work_serializer_;
371   std::unique_ptr<ResultHandler> result_handler_;
372   ChannelArgs args_;
373   grpc_pollset_set* interested_parties_;
374   URI uri_;
375   RefCountedPtr<GrpcXdsClient> xds_client_;
376   std::string lds_resource_name_;
377   std::string data_plane_authority_;
378   const uint64_t channel_id_;
379 
380   OrphanablePtr<XdsDependencyManager> dependency_mgr_;
381   RefCountedPtr<const XdsDependencyManager::XdsConfig> current_config_;
382   std::map<absl::string_view, WeakRefCountedPtr<ClusterRef>> cluster_ref_map_;
383 };
384 
385 const NoInterceptor
386     XdsResolver::ClusterSelectionFilter::Call::OnServerInitialMetadata;
387 const NoInterceptor
388     XdsResolver::ClusterSelectionFilter::Call::OnServerTrailingMetadata;
389 const NoInterceptor
390     XdsResolver::ClusterSelectionFilter::Call::OnClientToServerMessage;
391 const NoInterceptor
392     XdsResolver::ClusterSelectionFilter::Call::OnServerToClientMessage;
393 const NoInterceptor XdsResolver::ClusterSelectionFilter::Call::OnFinalize;
394 
395 //
396 // XdsResolver::RouteConfigData::RouteListIterator
397 //
398 
399 // Implementation of XdsRouting::RouteListIterator for getting the matching
400 // route for a request.
401 class XdsResolver::RouteConfigData::RouteListIterator final
402     : public XdsRouting::RouteListIterator {
403  public:
RouteListIterator(const RouteConfigData * route_table)404   explicit RouteListIterator(const RouteConfigData* route_table)
405       : route_table_(route_table) {}
406 
Size() const407   size_t Size() const override { return route_table_->routes_.size(); }
408 
GetMatchersForRoute(size_t index) const409   const XdsRouteConfigResource::Route::Matchers& GetMatchersForRoute(
410       size_t index) const override {
411     return route_table_->routes_[index].route.matchers;
412   }
413 
414  private:
415   const RouteConfigData* route_table_;
416 };
417 
418 //
419 // XdsResolver::RouteConfigData
420 //
421 
422 absl::StatusOr<RefCountedPtr<XdsResolver::RouteConfigData>>
Create(XdsResolver * resolver,const Duration & default_max_stream_duration)423 XdsResolver::RouteConfigData::Create(
424     XdsResolver* resolver, const Duration& default_max_stream_duration) {
425   auto data = MakeRefCounted<RouteConfigData>();
426   // Reserve the necessary entries up-front to avoid reallocation as we add
427   // elements. This is necessary because the string_view in the entry's
428   // weighted_cluster_state field points to the memory in the route field, so
429   // moving the entry in a reallocation will cause the string_view to point to
430   // invalid data.
431   data->routes_.reserve(resolver->current_config_->virtual_host->routes.size());
432   for (auto& route : resolver->current_config_->virtual_host->routes) {
433     absl::Status status =
434         data->AddRouteEntry(resolver, route, default_max_stream_duration);
435     if (!status.ok()) {
436       return status;
437     }
438   }
439   return data;
440 }
441 
442 XdsResolver::RouteConfigData::RouteEntry*
GetRouteForRequest(absl::string_view path,grpc_metadata_batch * initial_metadata)443 XdsResolver::RouteConfigData::GetRouteForRequest(
444     absl::string_view path, grpc_metadata_batch* initial_metadata) {
445   auto route_index = XdsRouting::GetRouteForRequest(RouteListIterator(this),
446                                                     path, initial_metadata);
447   if (!route_index.has_value()) {
448     return nullptr;
449   }
450   return &routes_[*route_index];
451 }
452 
453 absl::StatusOr<RefCountedPtr<ServiceConfig>>
CreateMethodConfig(XdsResolver * resolver,const XdsRouteConfigResource::Route & route,const XdsRouteConfigResource::Route::RouteAction::ClusterWeight * cluster_weight)454 XdsResolver::RouteConfigData::CreateMethodConfig(
455     XdsResolver* resolver, const XdsRouteConfigResource::Route& route,
456     const XdsRouteConfigResource::Route::RouteAction::ClusterWeight*
457         cluster_weight) {
458   std::vector<std::string> fields;
459   const auto& route_action =
460       absl::get<XdsRouteConfigResource::Route::RouteAction>(route.action);
461   // Set retry policy if any.
462   if (route_action.retry_policy.has_value() &&
463       !route_action.retry_policy->retry_on.Empty()) {
464     std::vector<std::string> retry_parts;
465     retry_parts.push_back(absl::StrFormat(
466         "\"retryPolicy\": {\n"
467         "      \"maxAttempts\": %d,\n"
468         "      \"initialBackoff\": \"%s\",\n"
469         "      \"maxBackoff\": \"%s\",\n"
470         "      \"backoffMultiplier\": 2,\n",
471         route_action.retry_policy->num_retries + 1,
472         route_action.retry_policy->retry_back_off.base_interval.ToJsonString(),
473         route_action.retry_policy->retry_back_off.max_interval.ToJsonString()));
474     std::vector<std::string> code_parts;
475     if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_CANCELLED)) {
476       code_parts.push_back("        \"CANCELLED\"");
477     }
478     if (route_action.retry_policy->retry_on.Contains(
479             GRPC_STATUS_DEADLINE_EXCEEDED)) {
480       code_parts.push_back("        \"DEADLINE_EXCEEDED\"");
481     }
482     if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_INTERNAL)) {
483       code_parts.push_back("        \"INTERNAL\"");
484     }
485     if (route_action.retry_policy->retry_on.Contains(
486             GRPC_STATUS_RESOURCE_EXHAUSTED)) {
487       code_parts.push_back("        \"RESOURCE_EXHAUSTED\"");
488     }
489     if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_UNAVAILABLE)) {
490       code_parts.push_back("        \"UNAVAILABLE\"");
491     }
492     retry_parts.push_back(
493         absl::StrFormat("      \"retryableStatusCodes\": [\n %s ]\n",
494                         absl::StrJoin(code_parts, ",\n")));
495     retry_parts.push_back("    }");
496     fields.emplace_back(absl::StrJoin(retry_parts, ""));
497   }
498   // Set timeout.
499   if (route_action.max_stream_duration.has_value() &&
500       (route_action.max_stream_duration != Duration::Zero())) {
501     fields.emplace_back(
502         absl::StrFormat("    \"timeout\": \"%s\"",
503                         route_action.max_stream_duration->ToJsonString()));
504   }
505   // Handle xDS HTTP filters.
506   const auto& hcm = absl::get<XdsListenerResource::HttpConnectionManager>(
507       resolver->current_config_->listener->listener);
508   auto result = XdsRouting::GeneratePerHTTPFilterConfigs(
509       static_cast<const GrpcXdsBootstrap&>(resolver->xds_client_->bootstrap())
510           .http_filter_registry(),
511       hcm.http_filters, *resolver->current_config_->virtual_host, route,
512       cluster_weight, resolver->args_);
513   if (!result.ok()) return result.status();
514   for (const auto& p : result->per_filter_configs) {
515     fields.emplace_back(absl::StrCat("    \"", p.first, "\": [\n",
516                                      absl::StrJoin(p.second, ",\n"),
517                                      "\n    ]"));
518   }
519   // Construct service config.
520   if (!fields.empty()) {
521     std::string json = absl::StrCat(
522         "{\n"
523         "  \"methodConfig\": [ {\n"
524         "    \"name\": [\n"
525         "      {}\n"
526         "    ],\n"
527         "    ",
528         absl::StrJoin(fields, ",\n"),
529         "\n  } ]\n"
530         "}");
531     return ServiceConfigImpl::Create(result->args, json.c_str());
532   }
533   return nullptr;
534 }
535 
AddRouteEntry(XdsResolver * resolver,const XdsRouteConfigResource::Route & route,const Duration & default_max_stream_duration)536 absl::Status XdsResolver::RouteConfigData::AddRouteEntry(
537     XdsResolver* resolver, const XdsRouteConfigResource::Route& route,
538     const Duration& default_max_stream_duration) {
539   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
540     gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
541             resolver, this, route.ToString().c_str());
542   }
543   routes_.emplace_back(route);
544   auto* route_entry = &routes_.back();
545   auto maybe_add_cluster = [&](absl::string_view cluster_key,
546                                absl::string_view cluster_name) {
547     if (clusters_.find(cluster_key) != clusters_.end()) return;
548     auto cluster_state =
549         resolver->GetOrCreateClusterRef(cluster_key, cluster_name);
550     absl::string_view key = cluster_state->cluster_key();
551     clusters_.emplace(key, std::move(cluster_state));
552   };
553   auto* route_action = absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
554       &route_entry->route.action);
555   if (route_action != nullptr) {
556     // If the route doesn't specify a timeout, set its timeout to the global
557     // one.
558     if (!route_action->max_stream_duration.has_value()) {
559       route_action->max_stream_duration = default_max_stream_duration;
560     }
561     absl::Status status = Match(
562         route_action->action,
563         // cluster name
564         [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName&
565                 cluster_name) {
566           auto result =
567               CreateMethodConfig(resolver, route_entry->route, nullptr);
568           if (!result.ok()) {
569             return result.status();
570           }
571           route_entry->method_config = std::move(*result);
572           maybe_add_cluster(absl::StrCat("cluster:", cluster_name.cluster_name),
573                             cluster_name.cluster_name);
574           return absl::OkStatus();
575         },
576         // WeightedClusters
577         [&](const std::vector<
578             XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
579                 weighted_clusters) {
580           uint32_t end = 0;
581           for (const auto& weighted_cluster : weighted_clusters) {
582             auto result = CreateMethodConfig(resolver, route_entry->route,
583                                              &weighted_cluster);
584             if (!result.ok()) {
585               return result.status();
586             }
587             RouteEntry::ClusterWeightState cluster_weight_state;
588             cluster_weight_state.method_config = std::move(*result);
589             end += weighted_cluster.weight;
590             cluster_weight_state.range_end = end;
591             cluster_weight_state.cluster = weighted_cluster.name;
592             route_entry->weighted_cluster_state.push_back(
593                 std::move(cluster_weight_state));
594             maybe_add_cluster(absl::StrCat("cluster:", weighted_cluster.name),
595                               weighted_cluster.name);
596           }
597           return absl::OkStatus();
598         },
599         // ClusterSpecifierPlugin
600         [&](const XdsRouteConfigResource::Route::RouteAction::
601                 ClusterSpecifierPluginName& cluster_specifier_plugin_name) {
602           auto result =
603               CreateMethodConfig(resolver, route_entry->route, nullptr);
604           if (!result.ok()) {
605             return result.status();
606           }
607           route_entry->method_config = std::move(*result);
608           maybe_add_cluster(
609               absl::StrCat(
610                   "cluster_specifier_plugin:",
611                   cluster_specifier_plugin_name.cluster_specifier_plugin_name),
612               /*subscription_name=*/"");
613           return absl::OkStatus();
614         });
615     if (!status.ok()) {
616       return status;
617     }
618   }
619   return absl::OkStatus();
620 }
621 
622 //
623 // XdsResolver::XdsConfigSelector
624 //
625 
XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,RefCountedPtr<RouteConfigData> route_config_data)626 XdsResolver::XdsConfigSelector::XdsConfigSelector(
627     RefCountedPtr<XdsResolver> resolver,
628     RefCountedPtr<RouteConfigData> route_config_data)
629     : resolver_(std::move(resolver)),
630       route_config_data_(std::move(route_config_data)) {
631   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
632     gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
633             resolver_.get(), this);
634   }
635   // Populate filter list.
636   const auto& http_filter_registry =
637       static_cast<const GrpcXdsBootstrap&>(resolver_->xds_client_->bootstrap())
638           .http_filter_registry();
639   const auto& hcm = absl::get<XdsListenerResource::HttpConnectionManager>(
640       resolver_->current_config_->listener->listener);
641   for (const auto& http_filter : hcm.http_filters) {
642     // Find filter.  This is guaranteed to succeed, because it's checked
643     // at config validation time in the XdsApi code.
644     const XdsHttpFilterImpl* filter_impl =
645         http_filter_registry.GetFilterForType(
646             http_filter.config.config_proto_type_name);
647     GPR_ASSERT(filter_impl != nullptr);
648     // Add C-core filter to list.
649     if (filter_impl->channel_filter() != nullptr) {
650       filters_.push_back(filter_impl->channel_filter());
651     }
652   }
653   filters_.push_back(&ClusterSelectionFilter::kFilter);
654 }
655 
~XdsConfigSelector()656 XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
657   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
658     gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
659             resolver_.get(), this);
660   }
661   route_config_data_.reset();
662   if (!IsWorkSerializerDispatchEnabled()) {
663     resolver_->MaybeRemoveUnusedClusters();
664     return;
665   }
666   resolver_->work_serializer_->Run(
667       [resolver = std::move(resolver_)]() {
668         resolver->MaybeRemoveUnusedClusters();
669       },
670       DEBUG_LOCATION);
671 }
672 
HeaderHashHelper(const XdsRouteConfigResource::Route::RouteAction::HashPolicy::Header & header_policy,grpc_metadata_batch * initial_metadata)673 absl::optional<uint64_t> HeaderHashHelper(
674     const XdsRouteConfigResource::Route::RouteAction::HashPolicy::Header&
675         header_policy,
676     grpc_metadata_batch* initial_metadata) {
677   std::string value_buffer;
678   absl::optional<absl::string_view> header_value = XdsRouting::GetHeaderValue(
679       initial_metadata, header_policy.header_name, &value_buffer);
680   if (!header_value.has_value()) return absl::nullopt;
681   if (header_policy.regex != nullptr) {
682     // If GetHeaderValue() did not already store the value in
683     // value_buffer, copy it there now, so we can modify it.
684     if (header_value->data() != value_buffer.data()) {
685       value_buffer = std::string(*header_value);
686     }
687     RE2::GlobalReplace(&value_buffer, *header_policy.regex,
688                        header_policy.regex_substitution);
689     header_value = value_buffer;
690   }
691   return XXH64(header_value->data(), header_value->size(), 0);
692 }
693 
GetCallConfig(GetCallConfigArgs args)694 absl::Status XdsResolver::XdsConfigSelector::GetCallConfig(
695     GetCallConfigArgs args) {
696   Slice* path = args.initial_metadata->get_pointer(HttpPathMetadata());
697   GPR_ASSERT(path != nullptr);
698   auto* entry = route_config_data_->GetRouteForRequest(path->as_string_view(),
699                                                        args.initial_metadata);
700   if (entry == nullptr) {
701     return absl::UnavailableError(
702         "No matching route found in xDS route config");
703   }
704   // Found a route match
705   const auto* route_action =
706       absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
707           &entry->route.action);
708   if (route_action == nullptr) {
709     return absl::UnavailableError("Matching route has inappropriate action");
710   }
711   std::string cluster_name;
712   RefCountedPtr<ServiceConfig> method_config;
713   Match(
714       route_action->action,
715       // cluster name
716       [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName&
717               action_cluster_name) {
718         cluster_name =
719             absl::StrCat("cluster:", action_cluster_name.cluster_name);
720         method_config = entry->method_config;
721       },
722       // WeightedClusters
723       [&](const std::vector<
724           XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
725           /*weighted_clusters*/) {
726         const uint32_t key = absl::Uniform<uint32_t>(
727             absl::BitGen(), 0, entry->weighted_cluster_state.back().range_end);
728         // Find the index in weighted clusters corresponding to key.
729         size_t mid = 0;
730         size_t start_index = 0;
731         size_t end_index = entry->weighted_cluster_state.size() - 1;
732         size_t index = 0;
733         while (end_index > start_index) {
734           mid = (start_index + end_index) / 2;
735           if (entry->weighted_cluster_state[mid].range_end > key) {
736             end_index = mid;
737           } else if (entry->weighted_cluster_state[mid].range_end < key) {
738             start_index = mid + 1;
739           } else {
740             index = mid + 1;
741             break;
742           }
743         }
744         if (index == 0) index = start_index;
745         GPR_ASSERT(entry->weighted_cluster_state[index].range_end > key);
746         cluster_name = absl::StrCat(
747             "cluster:", entry->weighted_cluster_state[index].cluster);
748         method_config = entry->weighted_cluster_state[index].method_config;
749       },
750       // ClusterSpecifierPlugin
751       [&](const XdsRouteConfigResource::Route::RouteAction::
752               ClusterSpecifierPluginName& cluster_specifier_plugin_name) {
753         cluster_name = absl::StrCat(
754             "cluster_specifier_plugin:",
755             cluster_specifier_plugin_name.cluster_specifier_plugin_name);
756         method_config = entry->method_config;
757       });
758   auto cluster = route_config_data_->FindClusterRef(cluster_name);
759   GPR_ASSERT(cluster != nullptr);
760   // Generate a hash.
761   absl::optional<uint64_t> hash;
762   for (const auto& hash_policy : route_action->hash_policies) {
763     absl::optional<uint64_t> new_hash = Match(
764         hash_policy.policy,
765         [&](const XdsRouteConfigResource::Route::RouteAction::HashPolicy::
766                 Header& header) {
767           return HeaderHashHelper(header, args.initial_metadata);
768         },
769         [&](const XdsRouteConfigResource::Route::RouteAction::HashPolicy::
770                 ChannelId&) -> absl::optional<uint64_t> {
771           return resolver_->channel_id_;
772         });
773     if (new_hash.has_value()) {
774       // Rotating the old value prevents duplicate hash rules from cancelling
775       // each other out and preserves all of the entropy
776       const uint64_t old_value =
777           hash.has_value() ? ((*hash << 1) | (*hash >> 63)) : 0;
778       hash = old_value ^ *new_hash;
779     }
780     // If the policy is a terminal policy and a hash has been generated,
781     // ignore the rest of the hash policies.
782     if (hash_policy.terminal && hash.has_value()) {
783       break;
784     }
785   }
786   if (!hash.has_value()) {
787     hash = absl::Uniform<uint64_t>(absl::BitGen());
788   }
789   // Populate service config call data.
790   if (method_config != nullptr) {
791     auto* parsed_method_configs =
792         method_config->GetMethodParsedConfigVector(grpc_empty_slice());
793     args.service_config_call_data->SetServiceConfig(std::move(method_config),
794                                                     parsed_method_configs);
795   }
796   args.service_config_call_data->SetCallAttribute(
797       args.arena->New<XdsClusterAttribute>(cluster->cluster_key()));
798   args.service_config_call_data->SetCallAttribute(
799       args.arena->New<RequestHashAttribute>(*hash));
800   args.service_config_call_data->SetCallAttribute(
801       args.arena->ManagedNew<XdsRouteStateAttributeImpl>(route_config_data_,
802                                                          entry));
803   return absl::OkStatus();
804 }
805 
806 //
807 // XdsResolver::XdsRouteStateAttributeImpl
808 //
809 
HasClusterForRoute(absl::string_view cluster_name) const810 bool XdsResolver::XdsRouteStateAttributeImpl::HasClusterForRoute(
811     absl::string_view cluster_name) const {
812   // Found a route match
813   const auto* route_action =
814       absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
815           &static_cast<RouteConfigData::RouteEntry*>(route_)->route.action);
816   if (route_action == nullptr) return false;
817   return Match(
818       route_action->action,
819       [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName& name) {
820         return name.cluster_name == cluster_name;
821       },
822       [&](const std::vector<
823           XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
824               clusters) {
825         for (const auto& cluster : clusters) {
826           if (cluster.name == cluster_name) {
827             return true;
828           }
829         }
830         return false;
831       },
832       [&](const XdsRouteConfigResource::Route::RouteAction::
833               ClusterSpecifierPluginName& /* name */) { return false; });
834 }
835 
836 RefCountedPtr<XdsResolver::ClusterRef>
LockAndGetCluster(absl::string_view cluster_name)837 XdsResolver::XdsRouteStateAttributeImpl::LockAndGetCluster(
838     absl::string_view cluster_name) {
839   if (route_config_data_ == nullptr) {
840     return nullptr;
841   }
842   auto cluster = route_config_data_->FindClusterRef(cluster_name);
843   route_config_data_.reset();
844   return cluster;
845 }
846 
847 //
848 // XdsResolver::ClusterSelectionFilter
849 //
850 
851 const grpc_channel_filter XdsResolver::ClusterSelectionFilter::kFilter =
852     MakePromiseBasedFilter<ClusterSelectionFilter, FilterEndpoint::kClient,
853                            kFilterExaminesServerInitialMetadata>(
854         "cluster_selection_filter");
855 
OnClientInitialMetadata(ClientMetadata &)856 void XdsResolver::ClusterSelectionFilter::Call::OnClientInitialMetadata(
857     ClientMetadata&) {
858   auto* service_config_call_data =
859       static_cast<ClientChannelServiceConfigCallData*>(
860           GetContext<grpc_call_context_element>()
861               [GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA]
862                   .value);
863   GPR_ASSERT(service_config_call_data != nullptr);
864   auto* route_state_attribute = static_cast<XdsRouteStateAttributeImpl*>(
865       service_config_call_data->GetCallAttribute<XdsRouteStateAttribute>());
866   auto* cluster_name_attribute =
867       service_config_call_data->GetCallAttribute<XdsClusterAttribute>();
868   if (route_state_attribute != nullptr && cluster_name_attribute != nullptr) {
869     auto cluster = route_state_attribute->LockAndGetCluster(
870         cluster_name_attribute->cluster());
871     if (cluster != nullptr) {
872       service_config_call_data->SetOnCommit(
873           [cluster = std::move(cluster)]() mutable { cluster.reset(); });
874     }
875   }
876 }
877 
878 //
879 // XdsResolver
880 //
881 
StartLocked()882 void XdsResolver::StartLocked() {
883   auto xds_client =
884       GrpcXdsClient::GetOrCreate(uri_.ToString(), args_, "xds resolver");
885   if (!xds_client.ok()) {
886     gpr_log(GPR_ERROR,
887             "Failed to create xds client -- channel will remain in "
888             "TRANSIENT_FAILURE: %s",
889             xds_client.status().ToString().c_str());
890     absl::Status status = absl::UnavailableError(absl::StrCat(
891         "Failed to create XdsClient: ", xds_client.status().message()));
892     Result result;
893     result.addresses = status;
894     result.service_config = std::move(status);
895     result.args = args_;
896     result_handler_->ReportResult(std::move(result));
897     return;
898   }
899   xds_client_ = std::move(*xds_client);
900   grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
901                                    interested_parties_);
902   // Determine LDS resource name.
903   std::string resource_name_fragment(absl::StripPrefix(uri_.path(), "/"));
904   if (!uri_.authority().empty()) {
905     // target_uri.authority is set case
906     const auto* authority_config =
907         static_cast<const GrpcXdsBootstrap::GrpcAuthority*>(
908             xds_client_->bootstrap().LookupAuthority(uri_.authority()));
909     if (authority_config == nullptr) {
910       absl::Status status = absl::UnavailableError(
911           absl::StrCat("Invalid target URI -- authority not found for ",
912                        uri_.authority().c_str()));
913       Result result;
914       result.addresses = status;
915       result.service_config = std::move(status);
916       result.args = args_;
917       result_handler_->ReportResult(std::move(result));
918       return;
919     }
920     std::string name_template =
921         authority_config->client_listener_resource_name_template();
922     if (name_template.empty()) {
923       name_template = absl::StrCat(
924           "xdstp://", URI::PercentEncodeAuthority(uri_.authority()),
925           "/envoy.config.listener.v3.Listener/%s");
926     }
927     lds_resource_name_ = absl::StrReplaceAll(
928         name_template,
929         {{"%s", URI::PercentEncodePath(resource_name_fragment)}});
930   } else {
931     // target_uri.authority not set
932     absl::string_view name_template =
933         static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
934             .client_default_listener_resource_name_template();
935     if (name_template.empty()) {
936       name_template = "%s";
937     }
938     if (absl::StartsWith(name_template, "xdstp:")) {
939       resource_name_fragment = URI::PercentEncodePath(resource_name_fragment);
940     }
941     lds_resource_name_ =
942         absl::StrReplaceAll(name_template, {{"%s", resource_name_fragment}});
943   }
944   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
945     gpr_log(GPR_INFO, "[xds_resolver %p] Started with lds_resource_name %s.",
946             this, lds_resource_name_.c_str());
947   }
948   // Start watch for xDS config.
949   dependency_mgr_ = MakeOrphanable<XdsDependencyManager>(
950       xds_client_, work_serializer_,
951       std::make_unique<XdsWatcher>(RefAsSubclass<XdsResolver>()),
952       data_plane_authority_, lds_resource_name_, args_, interested_parties_);
953 }
954 
ShutdownLocked()955 void XdsResolver::ShutdownLocked() {
956   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
957     gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
958   }
959   if (xds_client_ != nullptr) {
960     dependency_mgr_.reset();
961     grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
962                                      interested_parties_);
963     xds_client_.reset(DEBUG_LOCATION, "xds resolver");
964   }
965 }
966 
OnUpdate(RefCountedPtr<const XdsDependencyManager::XdsConfig> config)967 void XdsResolver::OnUpdate(
968     RefCountedPtr<const XdsDependencyManager::XdsConfig> config) {
969   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
970     gpr_log(GPR_INFO, "[xds_resolver %p] received updated xDS config", this);
971   }
972   if (xds_client_ == nullptr) return;
973   current_config_ = std::move(config);
974   GenerateResult();
975 }
976 
OnError(absl::string_view context,absl::Status status)977 void XdsResolver::OnError(absl::string_view context, absl::Status status) {
978   gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s: %s",
979           this, std::string(context).c_str(), status.ToString().c_str());
980   if (xds_client_ == nullptr) return;
981   status =
982       absl::UnavailableError(absl::StrCat(context, ": ", status.ToString()));
983   Result result;
984   result.addresses = status;
985   result.service_config = std::move(status);
986   result.args =
987       args_.SetObject(xds_client_.Ref(DEBUG_LOCATION, "xds resolver result"));
988   result_handler_->ReportResult(std::move(result));
989 }
990 
OnResourceDoesNotExist(std::string context)991 void XdsResolver::OnResourceDoesNotExist(std::string context) {
992   gpr_log(GPR_ERROR,
993           "[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
994           "update and returning empty service config",
995           this);
996   if (xds_client_ == nullptr) return;
997   current_config_.reset();
998   Result result;
999   result.addresses.emplace();
1000   result.service_config = ServiceConfigImpl::Create(args_, "{}");
1001   GPR_ASSERT(result.service_config.ok());
1002   result.resolution_note = std::move(context);
1003   result.args = args_;
1004   result_handler_->ReportResult(std::move(result));
1005 }
1006 
1007 absl::StatusOr<RefCountedPtr<ServiceConfig>>
CreateServiceConfig()1008 XdsResolver::CreateServiceConfig() {
1009   std::vector<std::string> clusters;
1010   for (const auto& cluster : cluster_ref_map_) {
1011     absl::string_view child_name = cluster.first;
1012     if (absl::ConsumePrefix(&child_name, "cluster_specifier_plugin:")) {
1013       clusters.push_back(absl::StrFormat(
1014           "      \"%s\":{\n"
1015           "        \"childPolicy\": %s\n"
1016           "       }",
1017           cluster.first,
1018           current_config_->route_config->cluster_specifier_plugin_map.at(
1019               std::string(child_name))));
1020     } else {
1021       absl::ConsumePrefix(&child_name, "cluster:");
1022       clusters.push_back(
1023           absl::StrFormat("      \"%s\":{\n"
1024                           "        \"childPolicy\":[ {\n"
1025                           "          \"cds_experimental\":{\n"
1026                           "            \"cluster\": \"%s\"\n"
1027                           "          }\n"
1028                           "        } ]\n"
1029                           "       }",
1030                           cluster.first, child_name));
1031     }
1032   }
1033   std::vector<std::string> config_parts;
1034   config_parts.push_back(
1035       "{\n"
1036       "  \"loadBalancingConfig\":[\n"
1037       "    { \"xds_cluster_manager_experimental\":{\n"
1038       "      \"children\":{\n");
1039   config_parts.push_back(absl::StrJoin(clusters, ",\n"));
1040   config_parts.push_back(
1041       "    }\n"
1042       "    } }\n"
1043       "  ]\n"
1044       "}");
1045   std::string json = absl::StrJoin(config_parts, "");
1046   return ServiceConfigImpl::Create(args_, json.c_str());
1047 }
1048 
GenerateResult()1049 void XdsResolver::GenerateResult() {
1050   if (xds_client_ == nullptr || current_config_ == nullptr) return;
1051   // First create XdsConfigSelector, which may add new entries to the cluster
1052   // state map.
1053   const auto& hcm = absl::get<XdsListenerResource::HttpConnectionManager>(
1054       current_config_->listener->listener);
1055   auto route_config_data =
1056       RouteConfigData::Create(this, hcm.http_max_stream_duration);
1057   if (!route_config_data.ok()) {
1058     OnError("could not create ConfigSelector",
1059             absl::UnavailableError(route_config_data.status().message()));
1060     return;
1061   }
1062   auto config_selector = MakeRefCounted<XdsConfigSelector>(
1063       RefAsSubclass<XdsResolver>(), std::move(*route_config_data));
1064   // Now create the service config.
1065   Result result;
1066   result.addresses.emplace();
1067   result.service_config = CreateServiceConfig();
1068   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
1069     gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
1070             result.service_config.ok()
1071                 ? std::string((*result.service_config)->json_string()).c_str()
1072                 : result.service_config.status().ToString().c_str());
1073   }
1074   result.args =
1075       args_.SetObject(xds_client_.Ref(DEBUG_LOCATION, "xds resolver result"))
1076           .SetObject(config_selector)
1077           .SetObject(current_config_)
1078           .SetObject(dependency_mgr_->Ref());
1079   result_handler_->ReportResult(std::move(result));
1080 }
1081 
MaybeRemoveUnusedClusters()1082 void XdsResolver::MaybeRemoveUnusedClusters() {
1083   bool update_needed = false;
1084   for (auto it = cluster_ref_map_.begin(); it != cluster_ref_map_.end();) {
1085     RefCountedPtr<ClusterRef> cluster_state = it->second->RefIfNonZero();
1086     if (cluster_state != nullptr) {
1087       ++it;
1088     } else {
1089       update_needed = true;
1090       it = cluster_ref_map_.erase(it);
1091     }
1092   }
1093   if (update_needed) GenerateResult();
1094 }
1095 
1096 //
1097 // XdsResolverFactory
1098 //
1099 
1100 class XdsResolverFactory final : public ResolverFactory {
1101  public:
scheme() const1102   absl::string_view scheme() const override { return "xds"; }
1103 
IsValidUri(const URI & uri) const1104   bool IsValidUri(const URI& uri) const override {
1105     if (uri.path().empty() || uri.path().back() == '/') {
1106       gpr_log(GPR_ERROR,
1107               "URI path does not contain valid data plane authority");
1108       return false;
1109     }
1110     return true;
1111   }
1112 
CreateResolver(ResolverArgs args) const1113   OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
1114     if (!IsValidUri(args.uri)) return nullptr;
1115     std::string authority = GetDataPlaneAuthority(args.args, args.uri);
1116     return MakeOrphanable<XdsResolver>(std::move(args), std::move(authority));
1117   }
1118 
1119  private:
GetDataPlaneAuthority(const ChannelArgs & args,const URI & uri) const1120   std::string GetDataPlaneAuthority(const ChannelArgs& args,
1121                                     const URI& uri) const {
1122     absl::optional<absl::string_view> authority =
1123         args.GetString(GRPC_ARG_DEFAULT_AUTHORITY);
1124     if (authority.has_value()) return URI::PercentEncodeAuthority(*authority);
1125     return GetDefaultAuthority(uri);
1126   }
1127 };
1128 
1129 }  // namespace
1130 
RegisterXdsResolver(CoreConfiguration::Builder * builder)1131 void RegisterXdsResolver(CoreConfiguration::Builder* builder) {
1132   builder->resolver_registry()->RegisterResolverFactory(
1133       std::make_unique<XdsResolverFactory>());
1134 }
1135 
1136 }  // namespace grpc_core
1137