xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/rls/rls.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2020 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 // Implementation of the Route Lookup Service (RLS) LB policy
18 //
19 // The policy queries a route lookup service for the name of the actual service
20 // to use. A child policy that recognizes the name as a field of its
21 // configuration will take further load balancing action on the request.
22 
23 #include <grpc/support/port_platform.h>
24 
25 #include "src/core/load_balancing/rls/rls.h"
26 
27 #include <inttypes.h>
28 #include <stdlib.h>
29 #include <string.h>
30 
31 #include <algorithm>
32 #include <deque>
33 #include <list>
34 #include <map>
35 #include <memory>
36 #include <random>
37 #include <set>
38 #include <string>
39 #include <type_traits>
40 #include <unordered_map>
41 #include <utility>
42 #include <vector>
43 
44 #include "absl/base/thread_annotations.h"
45 #include "absl/hash/hash.h"
46 #include "absl/random/random.h"
47 #include "absl/status/status.h"
48 #include "absl/status/statusor.h"
49 #include "absl/strings/str_cat.h"
50 #include "absl/strings/str_format.h"
51 #include "absl/strings/str_join.h"
52 #include "absl/strings/string_view.h"
53 #include "absl/types/optional.h"
54 #include "upb/base/string_view.h"
55 #include "upb/mem/arena.hpp"
56 
57 #include <grpc/byte_buffer.h>
58 #include <grpc/byte_buffer_reader.h>
59 #include <grpc/event_engine/event_engine.h>
60 #include <grpc/grpc.h>
61 #include <grpc/impl/channel_arg_names.h>
62 #include <grpc/impl/connectivity_state.h>
63 #include <grpc/impl/propagation_bits.h>
64 #include <grpc/slice.h>
65 #include <grpc/status.h>
66 #include <grpc/support/json.h>
67 #include <grpc/support/log.h>
68 
69 #include "src/core/client_channel/client_channel_filter.h"
70 #include "src/core/lib/backoff/backoff.h"
71 #include "src/core/lib/channel/channel_args.h"
72 #include "src/core/lib/channel/channelz.h"
73 #include "src/core/lib/channel/metrics.h"
74 #include "src/core/lib/config/core_configuration.h"
75 #include "src/core/lib/debug/trace.h"
76 #include "src/core/lib/gprpp/debug_location.h"
77 #include "src/core/lib/gprpp/dual_ref_counted.h"
78 #include "src/core/lib/gprpp/match.h"
79 #include "src/core/lib/gprpp/orphanable.h"
80 #include "src/core/lib/gprpp/ref_counted_ptr.h"
81 #include "src/core/lib/gprpp/status_helper.h"
82 #include "src/core/lib/gprpp/sync.h"
83 #include "src/core/lib/gprpp/time.h"
84 #include "src/core/lib/gprpp/uuid_v4.h"
85 #include "src/core/lib/gprpp/validation_errors.h"
86 #include "src/core/lib/gprpp/work_serializer.h"
87 #include "src/core/lib/iomgr/closure.h"
88 #include "src/core/lib/iomgr/error.h"
89 #include "src/core/lib/iomgr/exec_ctx.h"
90 #include "src/core/lib/iomgr/pollset_set.h"
91 #include "src/core/lib/json/json.h"
92 #include "src/core/lib/json/json_args.h"
93 #include "src/core/lib/json/json_object_loader.h"
94 #include "src/core/lib/json/json_writer.h"
95 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
96 #include "src/core/lib/slice/slice.h"
97 #include "src/core/lib/slice/slice_internal.h"
98 #include "src/core/lib/surface/call.h"
99 #include "src/core/lib/surface/channel.h"
100 #include "src/core/lib/transport/connectivity_state.h"
101 #include "src/core/lib/transport/error_utils.h"
102 #include "src/core/load_balancing/child_policy_handler.h"
103 #include "src/core/load_balancing/delegating_helper.h"
104 #include "src/core/load_balancing/lb_policy.h"
105 #include "src/core/load_balancing/lb_policy_factory.h"
106 #include "src/core/load_balancing/lb_policy_registry.h"
107 #include "src/core/resolver/endpoint_addresses.h"
108 #include "src/core/resolver/resolver_registry.h"
109 #include "src/core/service_config/service_config_impl.h"
110 #include "src/proto/grpc/lookup/v1/rls.upb.h"
111 
112 using ::grpc_event_engine::experimental::EventEngine;
113 
114 namespace grpc_core {
115 
116 TraceFlag grpc_lb_rls_trace(false, "rls_lb");
117 
118 namespace {
119 
120 constexpr absl::string_view kMetricLabelRlsServerTarget =
121     "grpc.lb.rls.server_target";
122 constexpr absl::string_view kMetricLabelRlsInstanceUuid =
123     "grpc.lb.rls.instance_uuid";
124 constexpr absl::string_view kMetricRlsDataPlaneTarget =
125     "grpc.lb.rls.data_plane_target";
126 constexpr absl::string_view kMetricLabelPickResult = "grpc.lb.pick_result";
127 
128 const auto kMetricCacheSize =
129     GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
130         "grpc.lb.rls.cache_size", "EXPERIMENTAL.  Size of the RLS cache.", "By",
131         {kMetricLabelTarget, kMetricLabelRlsServerTarget,
132          kMetricLabelRlsInstanceUuid},
133         {}, false);
134 
135 const auto kMetricCacheEntries =
136     GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
137         "grpc.lb.rls.cache_entries",
138         "EXPERIMENTAL.  Number of entries in the RLS cache.", "{entry}",
139         {kMetricLabelTarget, kMetricLabelRlsServerTarget,
140          kMetricLabelRlsInstanceUuid},
141         {}, false);
142 
143 const auto kMetricDefaultTargetPicks =
144     GlobalInstrumentsRegistry::RegisterUInt64Counter(
145         "grpc.lb.rls.default_target_picks",
146         "EXPERIMENTAL.  Number of LB picks sent to the default target.",
147         "{pick}",
148         {kMetricLabelTarget, kMetricLabelRlsServerTarget,
149          kMetricRlsDataPlaneTarget, kMetricLabelPickResult},
150         {}, false);
151 
152 const auto kMetricTargetPicks =
153     GlobalInstrumentsRegistry::RegisterUInt64Counter(
154         "grpc.lb.rls.target_picks",
155         "EXPERIMENTAL.  Number of LB picks sent to each RLS target.  Note that "
156         "if the default target is also returned by the RLS server, RPCs sent "
157         "to that target from the cache will be counted in this metric, not "
158         "in grpc.rls.default_target_picks.",
159         "{pick}",
160         {kMetricLabelTarget, kMetricLabelRlsServerTarget,
161          kMetricRlsDataPlaneTarget, kMetricLabelPickResult},
162         {}, false);
163 
164 const auto kMetricFailedPicks =
165     GlobalInstrumentsRegistry::RegisterUInt64Counter(
166         "grpc.lb.rls.failed_picks",
167         "EXPERIMENTAL.  Number of LB picks failed due to either a failed RLS "
168         "request or the RLS channel being throttled.",
169         "{pick}", {kMetricLabelTarget, kMetricLabelRlsServerTarget}, {}, false);
170 
171 constexpr absl::string_view kRls = "rls_experimental";
172 const char kGrpc[] = "grpc";
173 const char* kRlsRequestPath = "/grpc.lookup.v1.RouteLookupService/RouteLookup";
174 const char* kFakeTargetFieldValue = "fake_target_field_value";
175 const char* kRlsHeaderKey = "x-google-rls-data";
176 
177 const Duration kDefaultLookupServiceTimeout = Duration::Seconds(10);
178 const Duration kMaxMaxAge = Duration::Minutes(5);
179 const Duration kMinExpirationTime = Duration::Seconds(5);
180 const Duration kCacheBackoffInitial = Duration::Seconds(1);
181 const double kCacheBackoffMultiplier = 1.6;
182 const double kCacheBackoffJitter = 0.2;
183 const Duration kCacheBackoffMax = Duration::Minutes(2);
184 const Duration kDefaultThrottleWindowSize = Duration::Seconds(30);
185 const double kDefaultThrottleRatioForSuccesses = 2.0;
186 const int kDefaultThrottlePadding = 8;
187 const Duration kCacheCleanupTimerInterval = Duration::Minutes(1);
188 const int64_t kMaxCacheSizeBytes = 5 * 1024 * 1024;
189 
190 // Parsed RLS LB policy configuration.
191 class RlsLbConfig final : public LoadBalancingPolicy::Config {
192  public:
193   struct KeyBuilder {
194     std::map<std::string /*key*/, std::vector<std::string /*header*/>>
195         header_keys;
196     std::string host_key;
197     std::string service_key;
198     std::string method_key;
199     std::map<std::string /*key*/, std::string /*value*/> constant_keys;
200   };
201   using KeyBuilderMap = std::unordered_map<std::string /*path*/, KeyBuilder>;
202 
203   struct RouteLookupConfig {
204     KeyBuilderMap key_builder_map;
205     std::string lookup_service;
206     Duration lookup_service_timeout = kDefaultLookupServiceTimeout;
207     Duration max_age = kMaxMaxAge;
208     Duration stale_age = kMaxMaxAge;
209     int64_t cache_size_bytes = 0;
210     std::string default_target;
211 
212     static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
213     void JsonPostLoad(const Json& json, const JsonArgs& args,
214                       ValidationErrors* errors);
215   };
216 
217   RlsLbConfig() = default;
218 
219   RlsLbConfig(const RlsLbConfig&) = delete;
220   RlsLbConfig& operator=(const RlsLbConfig&) = delete;
221 
222   RlsLbConfig(RlsLbConfig&& other) = delete;
223   RlsLbConfig& operator=(RlsLbConfig&& other) = delete;
224 
name() const225   absl::string_view name() const override { return kRls; }
226 
key_builder_map() const227   const KeyBuilderMap& key_builder_map() const {
228     return route_lookup_config_.key_builder_map;
229   }
lookup_service() const230   const std::string& lookup_service() const {
231     return route_lookup_config_.lookup_service;
232   }
lookup_service_timeout() const233   Duration lookup_service_timeout() const {
234     return route_lookup_config_.lookup_service_timeout;
235   }
max_age() const236   Duration max_age() const { return route_lookup_config_.max_age; }
stale_age() const237   Duration stale_age() const { return route_lookup_config_.stale_age; }
cache_size_bytes() const238   int64_t cache_size_bytes() const {
239     return route_lookup_config_.cache_size_bytes;
240   }
default_target() const241   const std::string& default_target() const {
242     return route_lookup_config_.default_target;
243   }
rls_channel_service_config() const244   const std::string& rls_channel_service_config() const {
245     return rls_channel_service_config_;
246   }
child_policy_config() const247   const Json& child_policy_config() const { return child_policy_config_; }
child_policy_config_target_field_name() const248   const std::string& child_policy_config_target_field_name() const {
249     return child_policy_config_target_field_name_;
250   }
251   RefCountedPtr<LoadBalancingPolicy::Config>
default_child_policy_parsed_config() const252   default_child_policy_parsed_config() const {
253     return default_child_policy_parsed_config_;
254   }
255 
256   static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
257   void JsonPostLoad(const Json& json, const JsonArgs&,
258                     ValidationErrors* errors);
259 
260  private:
261   RouteLookupConfig route_lookup_config_;
262   std::string rls_channel_service_config_;
263   Json child_policy_config_;
264   std::string child_policy_config_target_field_name_;
265   RefCountedPtr<LoadBalancingPolicy::Config>
266       default_child_policy_parsed_config_;
267 };
268 
269 // RLS LB policy.
270 class RlsLb final : public LoadBalancingPolicy {
271  public:
272   explicit RlsLb(Args args);
273 
name() const274   absl::string_view name() const override { return kRls; }
275   absl::Status UpdateLocked(UpdateArgs args) override;
276   void ExitIdleLocked() override;
277   void ResetBackoffLocked() override;
278 
279  private:
280   // Key to access entries in the cache and the request map.
281   struct RequestKey {
282     std::map<std::string, std::string> key_map;
283 
operator ==grpc_core::__anon52bb9b7c0111::RlsLb::RequestKey284     bool operator==(const RequestKey& rhs) const {
285       return key_map == rhs.key_map;
286     }
287 
288     template <typename H>
AbslHashValue(H h,const RequestKey & key)289     friend H AbslHashValue(H h, const RequestKey& key) {
290       std::hash<std::string> string_hasher;
291       for (auto& kv : key.key_map) {
292         h = H::combine(std::move(h), string_hasher(kv.first),
293                        string_hasher(kv.second));
294       }
295       return h;
296     }
297 
Sizegrpc_core::__anon52bb9b7c0111::RlsLb::RequestKey298     size_t Size() const {
299       size_t size = sizeof(RequestKey);
300       for (auto& kv : key_map) {
301         size += kv.first.length() + kv.second.length();
302       }
303       return size;
304     }
305 
ToStringgrpc_core::__anon52bb9b7c0111::RlsLb::RequestKey306     std::string ToString() const {
307       return absl::StrCat(
308           "{", absl::StrJoin(key_map, ",", absl::PairFormatter("=")), "}");
309     }
310   };
311 
312   // Data from an RLS response.
313   struct ResponseInfo {
314     absl::Status status;
315     std::vector<std::string> targets;
316     std::string header_data;
317 
ToStringgrpc_core::__anon52bb9b7c0111::RlsLb::ResponseInfo318     std::string ToString() const {
319       return absl::StrFormat("{status=%s, targets=[%s], header_data=\"%s\"}",
320                              status.ToString(), absl::StrJoin(targets, ","),
321                              header_data);
322     }
323   };
324 
325   // Wraps a child policy for a given RLS target.
326   class ChildPolicyWrapper final : public DualRefCounted<ChildPolicyWrapper> {
327    public:
328     ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy, std::string target);
329 
target() const330     const std::string& target() const { return target_; }
331 
Pick(PickArgs args)332     PickResult Pick(PickArgs args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
333       return picker_->Pick(args);
334     }
335 
336     // Updates for the child policy are handled in two phases:
337     // 1. In StartUpdate(), we parse and validate the new child policy
338     //    config and store the parsed config.
339     // 2. In MaybeFinishUpdate(), we actually pass the parsed config to the
340     //    child policy's UpdateLocked() method.
341     //
342     // The reason we do this is to avoid deadlocks.  In StartUpdate(),
343     // if the new config fails to validate, then we need to set
344     // picker_ to an instance that will fail all requests, which
345     // requires holding the lock.  However, we cannot call the child
346     // policy's UpdateLocked() method from MaybeFinishUpdate() while
347     // holding the lock, since that would cause a deadlock: the child's
348     // UpdateLocked() will call the helper's UpdateState() method, which
349     // will try to acquire the lock to set picker_.  So StartUpdate() is
350     // called while we are still holding the lock, but MaybeFinishUpdate()
351     // is called after releasing it.
352     //
353     // Both methods grab the data they need from the parent object.
354     void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
355     absl::Status MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_);
356 
ExitIdleLocked()357     void ExitIdleLocked() {
358       if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
359     }
360 
ResetBackoffLocked()361     void ResetBackoffLocked() {
362       if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
363     }
364 
365     // Gets the connectivity state of the child policy. Once the child policy
366     // reports TRANSIENT_FAILURE, the function will always return
367     // TRANSIENT_FAILURE state instead of the actual state of the child policy
368     // until the child policy reports another READY state.
connectivity_state() const369     grpc_connectivity_state connectivity_state() const
370         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
371       return connectivity_state_;
372     }
373 
374    private:
375     // ChannelControlHelper object that allows the child policy to update state
376     // with the wrapper.
377     class ChildPolicyHelper final : public DelegatingChannelControlHelper {
378      public:
ChildPolicyHelper(WeakRefCountedPtr<ChildPolicyWrapper> wrapper)379       explicit ChildPolicyHelper(WeakRefCountedPtr<ChildPolicyWrapper> wrapper)
380           : wrapper_(std::move(wrapper)) {}
~ChildPolicyHelper()381       ~ChildPolicyHelper() override {
382         wrapper_.reset(DEBUG_LOCATION, "ChildPolicyHelper");
383       }
384 
385       void UpdateState(grpc_connectivity_state state,
386                        const absl::Status& status,
387                        RefCountedPtr<SubchannelPicker> picker) override;
388 
389      private:
parent_helper() const390       ChannelControlHelper* parent_helper() const override {
391         return wrapper_->lb_policy_->channel_control_helper();
392       }
393 
394       WeakRefCountedPtr<ChildPolicyWrapper> wrapper_;
395     };
396 
397     // Note: We are forced to disable lock analysis here because
398     // Orphan() is called by Unref() which is called by RefCountedPtr<>, which
399     // cannot have lock annotations for this particular caller.
400     void Orphaned() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
401 
402     RefCountedPtr<RlsLb> lb_policy_;
403     std::string target_;
404 
405     bool is_shutdown_ = false;
406 
407     OrphanablePtr<ChildPolicyHandler> child_policy_;
408     RefCountedPtr<LoadBalancingPolicy::Config> pending_config_;
409 
410     grpc_connectivity_state connectivity_state_ ABSL_GUARDED_BY(&RlsLb::mu_) =
411         GRPC_CHANNEL_CONNECTING;
412     RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_
413         ABSL_GUARDED_BY(&RlsLb::mu_);
414   };
415 
416   // A picker that uses the cache and the request map in the LB policy
417   // (synchronized via a mutex) to determine how to route requests.
418   class Picker final : public LoadBalancingPolicy::SubchannelPicker {
419    public:
420     explicit Picker(RefCountedPtr<RlsLb> lb_policy);
421 
422     PickResult Pick(PickArgs args) override;
423 
424    private:
425     PickResult PickFromDefaultTargetOrFail(const char* reason, PickArgs args,
426                                            absl::Status status)
427         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
428 
429     RefCountedPtr<RlsLb> lb_policy_;
430     RefCountedPtr<RlsLbConfig> config_;
431     RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
432   };
433 
434   // An LRU cache with adjustable size.
435   class Cache final {
436    public:
437     using Iterator = std::list<RequestKey>::iterator;
438 
439     class Entry final : public InternallyRefCounted<Entry> {
440      public:
441       Entry(RefCountedPtr<RlsLb> lb_policy, const RequestKey& key);
442 
443       // Notify the entry when it's evicted from the cache. Performs shut down.
444       // Note: We are forced to disable lock analysis here because
445       // Orphan() is called by OrphanablePtr<>, which cannot have lock
446       // annotations for this particular caller.
447       void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
448 
status() const449       const absl::Status& status() const
450           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
451         return status_;
452       }
backoff_time() const453       Timestamp backoff_time() const
454           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
455         return backoff_time_;
456       }
backoff_expiration_time() const457       Timestamp backoff_expiration_time() const
458           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
459         return backoff_expiration_time_;
460       }
data_expiration_time() const461       Timestamp data_expiration_time() const
462           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
463         return data_expiration_time_;
464       }
header_data() const465       const std::string& header_data() const
466           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
467         return header_data_;
468       }
stale_time() const469       Timestamp stale_time() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
470         return stale_time_;
471       }
min_expiration_time() const472       Timestamp min_expiration_time() const
473           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
474         return min_expiration_time_;
475       }
476 
TakeBackoffState()477       std::unique_ptr<BackOff> TakeBackoffState()
478           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
479         return std::move(backoff_state_);
480       }
481 
482       // Cache size of entry.
483       size_t Size() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
484 
485       // Pick subchannel for request based on the entry's state.
486       PickResult Pick(PickArgs args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
487 
488       // If the cache entry is in backoff state, resets the backoff and, if
489       // applicable, its backoff timer. The method does not update the LB
490       // policy's picker; the caller is responsible for that if necessary.
491       void ResetBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
492 
493       // Check if the entry should be removed by the clean-up timer.
494       bool ShouldRemove() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
495 
496       // Check if the entry can be evicted from the cache, i.e. the
497       // min_expiration_time_ has passed.
498       bool CanEvict() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
499 
500       // Updates the entry upon reception of a new RLS response.
501       // Returns a list of child policy wrappers on which FinishUpdate()
502       // needs to be called after releasing the lock.
503       std::vector<ChildPolicyWrapper*> OnRlsResponseLocked(
504           ResponseInfo response, std::unique_ptr<BackOff> backoff_state)
505           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
506 
507       // Moves entry to the end of the LRU list.
508       void MarkUsed() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
509 
510      private:
511       class BackoffTimer final : public InternallyRefCounted<BackoffTimer> {
512        public:
513         BackoffTimer(RefCountedPtr<Entry> entry, Timestamp backoff_time);
514 
515         // Note: We are forced to disable lock analysis here because
516         // Orphan() is called by OrphanablePtr<>, which cannot have lock
517         // annotations for this particular caller.
518         void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
519 
520        private:
521         void OnBackoffTimerLocked();
522 
523         RefCountedPtr<Entry> entry_;
524         absl::optional<EventEngine::TaskHandle> backoff_timer_task_handle_
525             ABSL_GUARDED_BY(&RlsLb::mu_);
526       };
527 
528       RefCountedPtr<RlsLb> lb_policy_;
529 
530       bool is_shutdown_ ABSL_GUARDED_BY(&RlsLb::mu_) = false;
531 
532       // Backoff states
533       absl::Status status_ ABSL_GUARDED_BY(&RlsLb::mu_);
534       std::unique_ptr<BackOff> backoff_state_ ABSL_GUARDED_BY(&RlsLb::mu_);
535       Timestamp backoff_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
536           Timestamp::InfPast();
537       Timestamp backoff_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
538           Timestamp::InfPast();
539       OrphanablePtr<BackoffTimer> backoff_timer_;
540 
541       // RLS response states
542       std::vector<RefCountedPtr<ChildPolicyWrapper>> child_policy_wrappers_
543           ABSL_GUARDED_BY(&RlsLb::mu_);
544       std::string header_data_ ABSL_GUARDED_BY(&RlsLb::mu_);
545       Timestamp data_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
546           Timestamp::InfPast();
547       Timestamp stale_time_ ABSL_GUARDED_BY(&RlsLb::mu_) = Timestamp::InfPast();
548 
549       Timestamp min_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_);
550       Cache::Iterator lru_iterator_ ABSL_GUARDED_BY(&RlsLb::mu_);
551     };
552 
553     explicit Cache(RlsLb* lb_policy);
554 
555     // Finds an entry from the cache that corresponds to a key. If an entry is
556     // not found, nullptr is returned. Otherwise, the entry is considered
557     // recently used and its order in the LRU list of the cache is updated.
558     Entry* Find(const RequestKey& key)
559         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
560 
561     // Finds an entry from the cache that corresponds to a key. If an entry is
562     // not found, an entry is created, inserted in the cache, and returned to
563     // the caller. Otherwise, the entry found is returned to the caller. The
564     // entry returned to the user is considered recently used and its order in
565     // the LRU list of the cache is updated.
566     Entry* FindOrInsert(const RequestKey& key)
567         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
568 
569     // Resizes the cache. If the new cache size is greater than the current size
570     // of the cache, do nothing. Otherwise, evict the oldest entries that
571     // exceed the new size limit of the cache.
572     void Resize(size_t bytes) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
573 
574     // Resets backoff of all the cache entries.
575     void ResetAllBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
576 
577     // Shutdown the cache; clean-up and orphan all the stored cache entries.
578     void Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
579 
580     void ReportMetricsLocked(CallbackMetricReporter& reporter)
581         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
582 
583    private:
584     // Shared logic for starting the cleanup timer
585     void StartCleanupTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
586 
587     void OnCleanupTimer();
588 
589     // Returns the entry size for a given key.
590     static size_t EntrySizeForKey(const RequestKey& key);
591 
592     // Evicts oversized cache elements when the current size is greater than
593     // the specified limit.
594     void MaybeShrinkSize(size_t bytes)
595         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
596 
597     RlsLb* lb_policy_;
598 
599     size_t size_limit_ ABSL_GUARDED_BY(&RlsLb::mu_) = 0;
600     size_t size_ ABSL_GUARDED_BY(&RlsLb::mu_) = 0;
601 
602     std::list<RequestKey> lru_list_ ABSL_GUARDED_BY(&RlsLb::mu_);
603     std::unordered_map<RequestKey, OrphanablePtr<Entry>, absl::Hash<RequestKey>>
604         map_ ABSL_GUARDED_BY(&RlsLb::mu_);
605     absl::optional<EventEngine::TaskHandle> cleanup_timer_handle_;
606   };
607 
608   // Channel for communicating with the RLS server.
609   // Contains throttling logic for RLS requests.
610   class RlsChannel final : public InternallyRefCounted<RlsChannel> {
611    public:
612     explicit RlsChannel(RefCountedPtr<RlsLb> lb_policy);
613 
614     // Shuts down the channel.
615     void Orphan() override;
616 
617     // Starts an RLS call.
618     // If stale_entry is non-null, it points to the entry containing
619     // stale data for the key.
620     void StartRlsCall(const RequestKey& key, Cache::Entry* stale_entry)
621         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
622 
623     // Reports the result of an RLS call to the throttle.
624     void ReportResponseLocked(bool response_succeeded)
625         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
626 
627     // Checks if a proposed RLS call should be throttled.
ShouldThrottle()628     bool ShouldThrottle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
629       return throttle_.ShouldThrottle();
630     }
631 
632     // Resets the channel's backoff.
633     void ResetBackoff();
634 
channel() const635     Channel* channel() const { return channel_.get(); }
636 
637    private:
638     // Watches the state of the RLS channel. Notifies the LB policy when
639     // the channel was previously in TRANSIENT_FAILURE and then becomes READY.
640     class StateWatcher final : public AsyncConnectivityStateWatcherInterface {
641      public:
StateWatcher(RefCountedPtr<RlsChannel> rls_channel)642       explicit StateWatcher(RefCountedPtr<RlsChannel> rls_channel)
643           : AsyncConnectivityStateWatcherInterface(
644                 rls_channel->lb_policy_->work_serializer()),
645             rls_channel_(std::move(rls_channel)) {}
646 
647      private:
648       void OnConnectivityStateChange(grpc_connectivity_state new_state,
649                                      const absl::Status& status) override;
650 
651       RefCountedPtr<RlsChannel> rls_channel_;
652       bool was_transient_failure_ = false;
653     };
654 
655     // Throttle state for RLS requests.
656     class Throttle final {
657      public:
Throttle(Duration window_size=kDefaultThrottleWindowSize,float ratio_for_successes=kDefaultThrottleRatioForSuccesses,int padding=kDefaultThrottlePadding)658       explicit Throttle(
659           Duration window_size = kDefaultThrottleWindowSize,
660           float ratio_for_successes = kDefaultThrottleRatioForSuccesses,
661           int padding = kDefaultThrottlePadding)
662           : window_size_(window_size),
663             ratio_for_successes_(ratio_for_successes),
664             padding_(padding) {}
665 
666       bool ShouldThrottle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
667 
668       void RegisterResponse(bool success)
669           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
670 
671      private:
672       Duration window_size_;
673       double ratio_for_successes_;
674       int padding_;
675       std::mt19937 rng_{std::random_device()()};
676 
677       // Logged timestamp of requests.
678       std::deque<Timestamp> requests_ ABSL_GUARDED_BY(&RlsLb::mu_);
679 
680       // Logged timestamps of failures.
681       std::deque<Timestamp> failures_ ABSL_GUARDED_BY(&RlsLb::mu_);
682     };
683 
684     RefCountedPtr<RlsLb> lb_policy_;
685     bool is_shutdown_ = false;
686 
687     OrphanablePtr<Channel> channel_;
688     RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;
689     StateWatcher* watcher_ = nullptr;
690     Throttle throttle_ ABSL_GUARDED_BY(&RlsLb::mu_);
691   };
692 
  // A pending RLS request.  Instances will be tracked in request_map_.
  // The request holds refs to both the LB policy and the RlsChannel it was
  // started on, so it keeps using that channel even if the policy later
  // swaps out RlsLb::rls_channel_ (see the comment on that member).
  class RlsRequest final : public InternallyRefCounted<RlsRequest> {
   public:
    // Asynchronously starts a call on rls_channel for key.
    // Stores backoff_state, which will be transferred to the data cache
    // if the RLS request fails.
    // `reason` is the RouteLookupRequest reason to send; `stale_header_data`
    // is presumably the header data of a stale cache entry being refreshed
    // (empty otherwise) -- NOTE(review): confirm against StartRlsCall().
    RlsRequest(RefCountedPtr<RlsLb> lb_policy, RlsLb::RequestKey key,
               RefCountedPtr<RlsChannel> rls_channel,
               std::unique_ptr<BackOff> backoff_state,
               grpc_lookup_v1_RouteLookupRequest_Reason reason,
               std::string stale_header_data);
    ~RlsRequest() override;

    // Shuts down the request.  If the request is still in flight, it is
    // cancelled, in which case no response will be added to the cache.
    void Orphan() override;

   private:
    // Callback to be invoked to start the call.
    static void StartCall(void* arg, grpc_error_handle error);

    // Helper for StartCall() that runs within the WorkSerializer.
    void StartCallLocked();

    // Callback to be invoked when the call is completed.
    static void OnRlsCallComplete(void* arg, grpc_error_handle error);

    // Call completion callback running on LB policy WorkSerializer.
    void OnRlsCallCompleteLocked(grpc_error_handle error);

    // Builds the serialized request message to send.  Presumably derived
    // from key_, reason_ and stale_header_data_ -- see the definition.
    grpc_byte_buffer* MakeRequestProto();
    // Converts the received response into a ResponseInfo (the type consumed
    // by Cache::Entry::OnRlsResponseLocked()).
    ResponseInfo ParseResponseProto();

    // Strong ref to the owning LB policy.
    RefCountedPtr<RlsLb> lb_policy_;
    // The key being looked up.
    RlsLb::RequestKey key_;
    // The channel this request was started on.
    RefCountedPtr<RlsChannel> rls_channel_;
    // Carried-over backoff state (possibly null); handed to the data cache
    // if the request fails.
    std::unique_ptr<BackOff> backoff_state_;
    // Reason field for the outgoing RouteLookupRequest.
    grpc_lookup_v1_RouteLookupRequest_Reason reason_;
    // Header data passed in at construction (see constructor note).
    std::string stale_header_data_;

    // RLS call state.
    Timestamp deadline_;
    // Closures for the StartCall() and OnRlsCallComplete() callbacks.
    grpc_closure call_start_cb_;
    grpc_closure call_complete_cb_;
    // The underlying call; null until started.
    grpc_call* call_ = nullptr;
    // Buffers and metadata arrays used by the call's batch operations.
    grpc_byte_buffer* send_message_ = nullptr;
    grpc_metadata_array recv_initial_metadata_;
    grpc_byte_buffer* recv_message_ = nullptr;
    grpc_metadata_array recv_trailing_metadata_;
    grpc_status_code status_recv_;
    grpc_slice status_details_recv_;
  };
745 
  // LoadBalancingPolicy override.
  void ShutdownLocked() override;

  // Returns a new picker to the channel to trigger reprocessing of
  // pending picks.  Schedules the actual picker update on the ExecCtx
  // to be run later, so it's safe to invoke this while holding the lock.
  void UpdatePickerAsync();
  // Hops into work serializer and calls UpdatePickerLocked().
  static void UpdatePickerCallback(void* arg, grpc_error_handle error);
  // Updates the picker in the work serializer.  Must NOT be called with
  // mu_ held (callers release the lock first).
  void UpdatePickerLocked() ABSL_LOCKS_EXCLUDED(&mu_);

  // Reports `pick_result` for `target` against the counter `handle`.
  // NOTE(review): which result kinds are counted is decided in the
  // definition -- check there before relying on it.
  void MaybeExportPickCount(
      GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle,
      absl::string_view target, const PickResult& pick_result);

  // Identifies this policy instance; used as a label value for the cache
  // metrics reported by Cache::ReportMetricsLocked().
  const std::string instance_uuid_;

  // Mutex to guard LB policy state that is accessed by the picker.
  Mutex mu_;
  bool is_shutdown_ ABSL_GUARDED_BY(mu_) = false;
  // Not annotated as guarded by mu_.  NOTE(review): presumably only touched
  // from within the WorkSerializer -- confirm.
  bool update_in_progress_ = false;
  // Data cache of RLS responses, keyed by RequestKey.
  Cache cache_ ABSL_GUARDED_BY(mu_);
  // Maps an RLS request key to an RlsRequest object that represents a pending
  // RLS request.
  std::unordered_map<RequestKey, OrphanablePtr<RlsRequest>,
                     absl::Hash<RequestKey>>
      request_map_ ABSL_GUARDED_BY(mu_);
  // The channel on which RLS requests are sent.
  // Note that this channel may be swapped out when the RLS policy gets
  // an update.  However, when that happens, any existing entries in
  // request_map_ will continue to use the previous channel.
  OrphanablePtr<RlsChannel> rls_channel_ ABSL_GUARDED_BY(mu_);

  // Accessed only from within WorkSerializer.
  absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses_;
  ChannelArgs channel_args_;
  RefCountedPtr<RlsLbConfig> config_;
  // Wrapper for the configured default target, or null if there is none.
  // Each Picker takes its own ref to it.
  RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
  // Index of live child policy wrappers by target.  Non-owning: each
  // ChildPolicyWrapper inserts itself in its constructor and removes itself
  // in Orphaned().
  std::map<std::string /*target*/, ChildPolicyWrapper*> child_policy_map_;

  // Must be after mu_, so that it is destroyed before mu_.
  std::unique_ptr<RegisteredMetricCallback> registered_metric_callback_;
788 };
789 
790 //
791 // RlsLb::ChildPolicyWrapper
792 //
793 
ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,std::string target)794 RlsLb::ChildPolicyWrapper::ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,
795                                               std::string target)
796     : DualRefCounted<ChildPolicyWrapper>(
797           GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "ChildPolicyWrapper"
798                                                      : nullptr),
799       lb_policy_(std::move(lb_policy)),
800       target_(std::move(target)),
801       picker_(MakeRefCounted<QueuePicker>(nullptr)) {
802   lb_policy_->child_policy_map_.emplace(target_, this);
803 }
804 
Orphaned()805 void RlsLb::ChildPolicyWrapper::Orphaned() {
806   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
807     gpr_log(GPR_INFO, "[rlslb %p] ChildPolicyWrapper=%p [%s]: shutdown",
808             lb_policy_.get(), this, target_.c_str());
809   }
810   is_shutdown_ = true;
811   lb_policy_->child_policy_map_.erase(target_);
812   if (child_policy_ != nullptr) {
813     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
814                                      lb_policy_->interested_parties());
815     child_policy_.reset();
816   }
817   picker_.reset();
818 }
819 
InsertOrUpdateChildPolicyField(const std::string & field,const std::string & value,const Json & config,ValidationErrors * errors)820 absl::optional<Json> InsertOrUpdateChildPolicyField(const std::string& field,
821                                                     const std::string& value,
822                                                     const Json& config,
823                                                     ValidationErrors* errors) {
824   if (config.type() != Json::Type::kArray) {
825     errors->AddError("is not an array");
826     return absl::nullopt;
827   }
828   const size_t original_num_errors = errors->size();
829   Json::Array array;
830   for (size_t i = 0; i < config.array().size(); ++i) {
831     const Json& child_json = config.array()[i];
832     ValidationErrors::ScopedField json_field(errors, absl::StrCat("[", i, "]"));
833     if (child_json.type() != Json::Type::kObject) {
834       errors->AddError("is not an object");
835     } else {
836       const Json::Object& child = child_json.object();
837       if (child.size() != 1) {
838         errors->AddError("child policy object contains more than one field");
839       } else {
840         const std::string& child_name = child.begin()->first;
841         ValidationErrors::ScopedField json_field(
842             errors, absl::StrCat("[\"", child_name, "\"]"));
843         const Json& child_config_json = child.begin()->second;
844         if (child_config_json.type() != Json::Type::kObject) {
845           errors->AddError("child policy config is not an object");
846         } else {
847           Json::Object child_config = child_config_json.object();
848           child_config[field] = Json::FromString(value);
849           array.emplace_back(Json::FromObject(
850               {{child_name, Json::FromObject(std::move(child_config))}}));
851         }
852       }
853     }
854   }
855   if (errors->size() != original_num_errors) return absl::nullopt;
856   return Json::FromArray(std::move(array));
857 }
858 
StartUpdate()859 void RlsLb::ChildPolicyWrapper::StartUpdate() {
860   ValidationErrors errors;
861   auto child_policy_config = InsertOrUpdateChildPolicyField(
862       lb_policy_->config_->child_policy_config_target_field_name(), target_,
863       lb_policy_->config_->child_policy_config(), &errors);
864   GPR_ASSERT(child_policy_config.has_value());
865   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
866     gpr_log(
867         GPR_INFO,
868         "[rlslb %p] ChildPolicyWrapper=%p [%s]: validating update, config: %s",
869         lb_policy_.get(), this, target_.c_str(),
870         JsonDump(*child_policy_config).c_str());
871   }
872   auto config =
873       CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
874           *child_policy_config);
875   // Returned RLS target fails the validation.
876   if (!config.ok()) {
877     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
878       gpr_log(GPR_INFO,
879               "[rlslb %p] ChildPolicyWrapper=%p [%s]: config failed to parse: "
880               "%s",
881               lb_policy_.get(), this, target_.c_str(),
882               config.status().ToString().c_str());
883     }
884     pending_config_.reset();
885     picker_ = MakeRefCounted<TransientFailurePicker>(
886         absl::UnavailableError(config.status().message()));
887     child_policy_.reset();
888   } else {
889     pending_config_ = std::move(*config);
890   }
891 }
892 
MaybeFinishUpdate()893 absl::Status RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() {
894   // If pending_config_ is not set, that means StartUpdate() failed, so
895   // there's nothing to do here.
896   if (pending_config_ == nullptr) return absl::OkStatus();
897   // If child policy doesn't yet exist, create it.
898   if (child_policy_ == nullptr) {
899     Args create_args;
900     create_args.work_serializer = lb_policy_->work_serializer();
901     create_args.channel_control_helper = std::make_unique<ChildPolicyHelper>(
902         WeakRef(DEBUG_LOCATION, "ChildPolicyHelper"));
903     create_args.args = lb_policy_->channel_args_;
904     child_policy_ = MakeOrphanable<ChildPolicyHandler>(std::move(create_args),
905                                                        &grpc_lb_rls_trace);
906     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
907       gpr_log(GPR_INFO,
908               "[rlslb %p] ChildPolicyWrapper=%p [%s], created new child policy "
909               "handler %p",
910               lb_policy_.get(), this, target_.c_str(), child_policy_.get());
911     }
912     grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
913                                      lb_policy_->interested_parties());
914   }
915   // Send the child the updated config.
916   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
917     gpr_log(GPR_INFO,
918             "[rlslb %p] ChildPolicyWrapper=%p [%s], updating child policy "
919             "handler %p",
920             lb_policy_.get(), this, target_.c_str(), child_policy_.get());
921   }
922   UpdateArgs update_args;
923   update_args.config = std::move(pending_config_);
924   update_args.addresses = lb_policy_->addresses_;
925   update_args.args = lb_policy_->channel_args_;
926   return child_policy_->UpdateLocked(std::move(update_args));
927 }
928 
929 //
930 // RlsLb::ChildPolicyWrapper::ChildPolicyHelper
931 //
932 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)933 void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
934     grpc_connectivity_state state, const absl::Status& status,
935     RefCountedPtr<SubchannelPicker> picker) {
936   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
937     gpr_log(GPR_INFO,
938             "[rlslb %p] ChildPolicyWrapper=%p [%s] ChildPolicyHelper=%p: "
939             "UpdateState(state=%s, status=%s, picker=%p)",
940             wrapper_->lb_policy_.get(), wrapper_.get(),
941             wrapper_->target_.c_str(), this, ConnectivityStateName(state),
942             status.ToString().c_str(), picker.get());
943   }
944   {
945     MutexLock lock(&wrapper_->lb_policy_->mu_);
946     if (wrapper_->is_shutdown_) return;
947     // TODO(roth): It looks like this ignores subsequent TF updates that
948     // might change the status used to fail picks, which seems wrong.
949     if (wrapper_->connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
950         state != GRPC_CHANNEL_READY) {
951       return;
952     }
953     wrapper_->connectivity_state_ = state;
954     GPR_DEBUG_ASSERT(picker != nullptr);
955     if (picker != nullptr) {
956       wrapper_->picker_ = std::move(picker);
957     }
958   }
959   wrapper_->lb_policy_->UpdatePickerLocked();
960 }
961 
962 //
963 // RlsLb::Picker
964 //
965 
966 // Builds the key to be used for a request based on path and initial_metadata.
BuildKeyMap(const RlsLbConfig::KeyBuilderMap & key_builder_map,absl::string_view path,absl::string_view host,const LoadBalancingPolicy::MetadataInterface * initial_metadata)967 std::map<std::string, std::string> BuildKeyMap(
968     const RlsLbConfig::KeyBuilderMap& key_builder_map, absl::string_view path,
969     absl::string_view host,
970     const LoadBalancingPolicy::MetadataInterface* initial_metadata) {
971   size_t last_slash_pos = path.npos;  // May need this a few times, so cache it.
972   // Find key builder for this path.
973   auto it = key_builder_map.find(std::string(path));
974   if (it == key_builder_map.end()) {
975     // Didn't find exact match, try method wildcard.
976     last_slash_pos = path.rfind('/');
977     GPR_DEBUG_ASSERT(last_slash_pos != path.npos);
978     if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
979     std::string service(path.substr(0, last_slash_pos + 1));
980     it = key_builder_map.find(service);
981     if (it == key_builder_map.end()) return {};
982   }
983   const RlsLbConfig::KeyBuilder* key_builder = &it->second;
984   // Construct key map using key builder.
985   std::map<std::string, std::string> key_map;
986   // Add header keys.
987   for (const auto& p : key_builder->header_keys) {
988     const std::string& key = p.first;
989     const std::vector<std::string>& header_names = p.second;
990     for (const std::string& header_name : header_names) {
991       std::string buffer;
992       absl::optional<absl::string_view> value =
993           initial_metadata->Lookup(header_name, &buffer);
994       if (value.has_value()) {
995         key_map[key] = std::string(*value);
996         break;
997       }
998     }
999   }
1000   // Add constant keys.
1001   key_map.insert(key_builder->constant_keys.begin(),
1002                  key_builder->constant_keys.end());
1003   // Add host key.
1004   if (!key_builder->host_key.empty()) {
1005     key_map[key_builder->host_key] = std::string(host);
1006   }
1007   // Add service key.
1008   if (!key_builder->service_key.empty()) {
1009     if (last_slash_pos == path.npos) {
1010       last_slash_pos = path.rfind('/');
1011       GPR_DEBUG_ASSERT(last_slash_pos != path.npos);
1012       if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
1013     }
1014     key_map[key_builder->service_key] =
1015         std::string(path.substr(1, last_slash_pos - 1));
1016   }
1017   // Add method key.
1018   if (!key_builder->method_key.empty()) {
1019     if (last_slash_pos == path.npos) {
1020       last_slash_pos = path.rfind('/');
1021       GPR_DEBUG_ASSERT(last_slash_pos != path.npos);
1022       if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
1023     }
1024     key_map[key_builder->method_key] =
1025         std::string(path.substr(last_slash_pos + 1));
1026   }
1027   return key_map;
1028 }
1029 
Picker(RefCountedPtr<RlsLb> lb_policy)1030 RlsLb::Picker::Picker(RefCountedPtr<RlsLb> lb_policy)
1031     : lb_policy_(std::move(lb_policy)), config_(lb_policy_->config_) {
1032   if (lb_policy_->default_child_policy_ != nullptr) {
1033     default_child_policy_ =
1034         lb_policy_->default_child_policy_->Ref(DEBUG_LOCATION, "Picker");
1035   }
1036 }
1037 
Pick(PickArgs args)1038 LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) {
1039   // Construct key for request.
1040   RequestKey key = {
1041       BuildKeyMap(config_->key_builder_map(), args.path,
1042                   lb_policy_->channel_control_helper()->GetAuthority(),
1043                   args.initial_metadata)};
1044   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1045     gpr_log(GPR_INFO, "[rlslb %p] picker=%p: request keys: %s",
1046             lb_policy_.get(), this, key.ToString().c_str());
1047   }
1048   Timestamp now = Timestamp::Now();
1049   MutexLock lock(&lb_policy_->mu_);
1050   if (lb_policy_->is_shutdown_) {
1051     return PickResult::Fail(
1052         absl::UnavailableError("LB policy already shut down"));
1053   }
1054   // Check if there's a cache entry.
1055   Cache::Entry* entry = lb_policy_->cache_.Find(key);
1056   // If there is no cache entry, or if the cache entry is not in backoff
1057   // and has a stale time in the past, and there is not already a
1058   // pending RLS request for this key, then try to start a new RLS request.
1059   if ((entry == nullptr ||
1060        (entry->stale_time() < now && entry->backoff_time() < now)) &&
1061       lb_policy_->request_map_.find(key) == lb_policy_->request_map_.end()) {
1062     // Check if requests are being throttled.
1063     if (lb_policy_->rls_channel_->ShouldThrottle()) {
1064       // Request is throttled.
1065       // If there is no non-expired data in the cache, then we use the
1066       // default target if set, or else we fail the pick.
1067       if (entry == nullptr || entry->data_expiration_time() < now) {
1068         return PickFromDefaultTargetOrFail(
1069             "RLS call throttled", args,
1070             absl::UnavailableError("RLS request throttled"));
1071       }
1072     }
1073     // Start the RLS call.
1074     lb_policy_->rls_channel_->StartRlsCall(
1075         key, (entry == nullptr || entry->data_expiration_time() < now) ? nullptr
1076                                                                        : entry);
1077   }
1078   // If the cache entry exists, see if it has usable data.
1079   if (entry != nullptr) {
1080     // If the entry has non-expired data, use it.
1081     if (entry->data_expiration_time() >= now) {
1082       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1083         gpr_log(GPR_INFO, "[rlslb %p] picker=%p: using cache entry %p",
1084                 lb_policy_.get(), this, entry);
1085       }
1086       return entry->Pick(args);
1087     }
1088     // If the entry is in backoff, then use the default target if set,
1089     // or else fail the pick.
1090     if (entry->backoff_time() >= now) {
1091       return PickFromDefaultTargetOrFail(
1092           "RLS call in backoff", args,
1093           absl::UnavailableError(absl::StrCat("RLS request failed: ",
1094                                               entry->status().ToString())));
1095     }
1096   }
1097   // RLS call pending.  Queue the pick.
1098   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1099     gpr_log(GPR_INFO, "[rlslb %p] picker=%p: RLS request pending; queuing pick",
1100             lb_policy_.get(), this);
1101   }
1102   return PickResult::Queue();
1103 }
1104 
PickFromDefaultTargetOrFail(const char * reason,PickArgs args,absl::Status status)1105 LoadBalancingPolicy::PickResult RlsLb::Picker::PickFromDefaultTargetOrFail(
1106     const char* reason, PickArgs args, absl::Status status) {
1107   if (default_child_policy_ != nullptr) {
1108     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1109       gpr_log(GPR_INFO, "[rlslb %p] picker=%p: %s; using default target",
1110               lb_policy_.get(), this, reason);
1111     }
1112     auto pick_result = default_child_policy_->Pick(args);
1113     lb_policy_->MaybeExportPickCount(kMetricDefaultTargetPicks,
1114                                      config_->default_target(), pick_result);
1115     return pick_result;
1116   }
1117   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1118     gpr_log(GPR_INFO, "[rlslb %p] picker=%p: %s; failing pick",
1119             lb_policy_.get(), this, reason);
1120   }
1121   auto& stats_plugins =
1122       lb_policy_->channel_control_helper()->GetStatsPluginGroup();
1123   stats_plugins.AddCounter(kMetricFailedPicks, 1,
1124                            {lb_policy_->channel_control_helper()->GetTarget(),
1125                             config_->lookup_service()},
1126                            {});
1127   return PickResult::Fail(std::move(status));
1128 }
1129 
1130 //
1131 // RlsLb::Cache::Entry::BackoffTimer
1132 //
1133 
BackoffTimer(RefCountedPtr<Entry> entry,Timestamp backoff_time)1134 RlsLb::Cache::Entry::BackoffTimer::BackoffTimer(RefCountedPtr<Entry> entry,
1135                                                 Timestamp backoff_time)
1136     : entry_(std::move(entry)) {
1137   backoff_timer_task_handle_ =
1138       entry_->lb_policy_->channel_control_helper()->GetEventEngine()->RunAfter(
1139           backoff_time - Timestamp::Now(),
1140           [self = Ref(DEBUG_LOCATION, "BackoffTimer")]() mutable {
1141             ApplicationCallbackExecCtx callback_exec_ctx;
1142             ExecCtx exec_ctx;
1143             auto self_ptr = self.get();
1144             self_ptr->entry_->lb_policy_->work_serializer()->Run(
1145                 [self = std::move(self)]() { self->OnBackoffTimerLocked(); },
1146                 DEBUG_LOCATION);
1147           });
1148 }
1149 
Orphan()1150 void RlsLb::Cache::Entry::BackoffTimer::Orphan() {
1151   if (backoff_timer_task_handle_.has_value() &&
1152       entry_->lb_policy_->channel_control_helper()->GetEventEngine()->Cancel(
1153           *backoff_timer_task_handle_)) {
1154     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1155       gpr_log(GPR_INFO, "[rlslb %p] cache entry=%p %s, backoff timer canceled",
1156               entry_->lb_policy_.get(), entry_.get(),
1157               entry_->is_shutdown_ ? "(shut down)"
1158                                    : entry_->lru_iterator_->ToString().c_str());
1159     }
1160   }
1161   backoff_timer_task_handle_.reset();
1162   Unref(DEBUG_LOCATION, "Orphan");
1163 }
1164 
OnBackoffTimerLocked()1165 void RlsLb::Cache::Entry::BackoffTimer::OnBackoffTimerLocked() {
1166   {
1167     MutexLock lock(&entry_->lb_policy_->mu_);
1168     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1169       gpr_log(GPR_INFO, "[rlslb %p] cache entry=%p %s, backoff timer fired",
1170               entry_->lb_policy_.get(), entry_.get(),
1171               entry_->is_shutdown_ ? "(shut down)"
1172                                    : entry_->lru_iterator_->ToString().c_str());
1173     }
1174     // Skip the update if Orphaned
1175     if (!backoff_timer_task_handle_.has_value()) return;
1176     backoff_timer_task_handle_.reset();
1177   }
1178   // The pick was in backoff state and there could be a pick queued if
1179   // wait_for_ready is true. We'll update the picker for that case.
1180   entry_->lb_policy_->UpdatePickerLocked();
1181 }
1182 
1183 //
1184 // RlsLb::Cache::Entry
1185 //
1186 
MakeCacheEntryBackoff()1187 std::unique_ptr<BackOff> MakeCacheEntryBackoff() {
1188   return std::make_unique<BackOff>(
1189       BackOff::Options()
1190           .set_initial_backoff(kCacheBackoffInitial)
1191           .set_multiplier(kCacheBackoffMultiplier)
1192           .set_jitter(kCacheBackoffJitter)
1193           .set_max_backoff(kCacheBackoffMax));
1194 }
1195 
Entry(RefCountedPtr<RlsLb> lb_policy,const RequestKey & key)1196 RlsLb::Cache::Entry::Entry(RefCountedPtr<RlsLb> lb_policy,
1197                            const RequestKey& key)
1198     : InternallyRefCounted<Entry>(
1199           GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "CacheEntry" : nullptr),
1200       lb_policy_(std::move(lb_policy)),
1201       backoff_state_(MakeCacheEntryBackoff()),
1202       min_expiration_time_(Timestamp::Now() + kMinExpirationTime),
1203       lru_iterator_(lb_policy_->cache_.lru_list_.insert(
1204           lb_policy_->cache_.lru_list_.end(), key)) {}
1205 
Orphan()1206 void RlsLb::Cache::Entry::Orphan() {
1207   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1208     gpr_log(GPR_INFO, "[rlslb %p] cache entry=%p %s: cache entry evicted",
1209             lb_policy_.get(), this, lru_iterator_->ToString().c_str());
1210   }
1211   is_shutdown_ = true;
1212   lb_policy_->cache_.lru_list_.erase(lru_iterator_);
1213   lru_iterator_ = lb_policy_->cache_.lru_list_.end();  // Just in case.
1214   backoff_state_.reset();
1215   if (backoff_timer_ != nullptr) {
1216     backoff_timer_.reset();
1217     lb_policy_->UpdatePickerAsync();
1218   }
1219   child_policy_wrappers_.clear();
1220   Unref(DEBUG_LOCATION, "Orphan");
1221 }
1222 
Size() const1223 size_t RlsLb::Cache::Entry::Size() const {
1224   // lru_iterator_ is not valid once we're shut down.
1225   GPR_ASSERT(!is_shutdown_);
1226   return lb_policy_->cache_.EntrySizeForKey(*lru_iterator_);
1227 }
1228 
Pick(PickArgs args)1229 LoadBalancingPolicy::PickResult RlsLb::Cache::Entry::Pick(PickArgs args) {
1230   size_t i = 0;
1231   ChildPolicyWrapper* child_policy_wrapper = nullptr;
1232   // Skip targets before the last one that are in state TRANSIENT_FAILURE.
1233   for (; i < child_policy_wrappers_.size(); ++i) {
1234     child_policy_wrapper = child_policy_wrappers_[i].get();
1235     if (child_policy_wrapper->connectivity_state() ==
1236             GRPC_CHANNEL_TRANSIENT_FAILURE &&
1237         i < child_policy_wrappers_.size() - 1) {
1238       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1239         gpr_log(GPR_INFO,
1240                 "[rlslb %p] cache entry=%p %s: target %s (%" PRIuPTR
1241                 " of %" PRIuPTR ") in state TRANSIENT_FAILURE; skipping",
1242                 lb_policy_.get(), this, lru_iterator_->ToString().c_str(),
1243                 child_policy_wrapper->target().c_str(), i,
1244                 child_policy_wrappers_.size());
1245       }
1246       continue;
1247     }
1248     break;
1249   }
1250   // Child policy not in TRANSIENT_FAILURE or is the last target in
1251   // the list, so delegate.
1252   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1253     gpr_log(GPR_INFO,
1254             "[rlslb %p] cache entry=%p %s: target %s (%" PRIuPTR " of %" PRIuPTR
1255             ") in state %s; delegating",
1256             lb_policy_.get(), this, lru_iterator_->ToString().c_str(),
1257             child_policy_wrapper->target().c_str(), i,
1258             child_policy_wrappers_.size(),
1259             ConnectivityStateName(child_policy_wrapper->connectivity_state()));
1260   }
1261   // Add header data.
1262   // Note that even if the target we're using is in TRANSIENT_FAILURE,
1263   // the pick might still succeed (e.g., if the child is ring_hash), so
1264   // we need to pass the right header info down in all cases.
1265   if (!header_data_.empty()) {
1266     char* copied_header_data =
1267         static_cast<char*>(args.call_state->Alloc(header_data_.length() + 1));
1268     strcpy(copied_header_data, header_data_.c_str());
1269     args.initial_metadata->Add(kRlsHeaderKey, copied_header_data);
1270   }
1271   auto pick_result = child_policy_wrapper->Pick(args);
1272   lb_policy_->MaybeExportPickCount(kMetricTargetPicks,
1273                                    child_policy_wrapper->target(), pick_result);
1274   return pick_result;
1275 }
1276 
ResetBackoff()1277 void RlsLb::Cache::Entry::ResetBackoff() {
1278   backoff_time_ = Timestamp::InfPast();
1279   backoff_timer_.reset();
1280 }
1281 
ShouldRemove() const1282 bool RlsLb::Cache::Entry::ShouldRemove() const {
1283   Timestamp now = Timestamp::Now();
1284   return data_expiration_time_ < now && backoff_expiration_time_ < now;
1285 }
1286 
CanEvict() const1287 bool RlsLb::Cache::Entry::CanEvict() const {
1288   Timestamp now = Timestamp::Now();
1289   return min_expiration_time_ < now;
1290 }
1291 
MarkUsed()1292 void RlsLb::Cache::Entry::MarkUsed() {
1293   auto& lru_list = lb_policy_->cache_.lru_list_;
1294   auto new_it = lru_list.insert(lru_list.end(), *lru_iterator_);
1295   lru_list.erase(lru_iterator_);
1296   lru_iterator_ = new_it;
1297 }
1298 
1299 std::vector<RlsLb::ChildPolicyWrapper*>
OnRlsResponseLocked(ResponseInfo response,std::unique_ptr<BackOff> backoff_state)1300 RlsLb::Cache::Entry::OnRlsResponseLocked(
1301     ResponseInfo response, std::unique_ptr<BackOff> backoff_state) {
1302   // Move the entry to the end of the LRU list.
1303   MarkUsed();
1304   // If the request failed, store the failed status and update the
1305   // backoff state.
1306   if (!response.status.ok()) {
1307     status_ = response.status;
1308     if (backoff_state != nullptr) {
1309       backoff_state_ = std::move(backoff_state);
1310     } else {
1311       backoff_state_ = MakeCacheEntryBackoff();
1312     }
1313     backoff_time_ = backoff_state_->NextAttemptTime();
1314     Timestamp now = Timestamp::Now();
1315     backoff_expiration_time_ = now + (backoff_time_ - now) * 2;
1316     backoff_timer_ = MakeOrphanable<BackoffTimer>(
1317         Ref(DEBUG_LOCATION, "BackoffTimer"), backoff_time_);
1318     lb_policy_->UpdatePickerAsync();
1319     return {};
1320   }
1321   // Request succeeded, so store the result.
1322   header_data_ = std::move(response.header_data);
1323   Timestamp now = Timestamp::Now();
1324   data_expiration_time_ = now + lb_policy_->config_->max_age();
1325   stale_time_ = now + lb_policy_->config_->stale_age();
1326   status_ = absl::OkStatus();
1327   backoff_state_.reset();
1328   backoff_time_ = Timestamp::InfPast();
1329   backoff_expiration_time_ = Timestamp::InfPast();
1330   // Check if we need to update this list of targets.
1331   bool targets_changed = [&]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
1332     if (child_policy_wrappers_.size() != response.targets.size()) return true;
1333     for (size_t i = 0; i < response.targets.size(); ++i) {
1334       if (child_policy_wrappers_[i]->target() != response.targets[i]) {
1335         return true;
1336       }
1337     }
1338     return false;
1339   }();
1340   if (!targets_changed) {
1341     // Targets didn't change, so we're not updating the list of child
1342     // policies.  Return a new picker so that any queued requests can be
1343     // re-processed.
1344     lb_policy_->UpdatePickerAsync();
1345     return {};
1346   }
1347   // Target list changed, so update it.
1348   std::set<absl::string_view> old_targets;
1349   for (RefCountedPtr<ChildPolicyWrapper>& child_policy_wrapper :
1350        child_policy_wrappers_) {
1351     old_targets.emplace(child_policy_wrapper->target());
1352   }
1353   bool update_picker = false;
1354   std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
1355   std::vector<RefCountedPtr<ChildPolicyWrapper>> new_child_policy_wrappers;
1356   new_child_policy_wrappers.reserve(response.targets.size());
1357   for (std::string& target : response.targets) {
1358     auto it = lb_policy_->child_policy_map_.find(target);
1359     if (it == lb_policy_->child_policy_map_.end()) {
1360       auto new_child = MakeRefCounted<ChildPolicyWrapper>(
1361           lb_policy_.Ref(DEBUG_LOCATION, "ChildPolicyWrapper"), target);
1362       new_child->StartUpdate();
1363       child_policies_to_finish_update.push_back(new_child.get());
1364       new_child_policy_wrappers.emplace_back(std::move(new_child));
1365     } else {
1366       new_child_policy_wrappers.emplace_back(
1367           it->second->Ref(DEBUG_LOCATION, "CacheEntry"));
1368       // If the target already existed but was not previously used for
1369       // this key, then we'll need to update the picker, since we
1370       // didn't actually create a new child policy, which would have
1371       // triggered an RLS picker update when it returned its first picker.
1372       if (old_targets.find(target) == old_targets.end()) {
1373         update_picker = true;
1374       }
1375     }
1376   }
1377   child_policy_wrappers_ = std::move(new_child_policy_wrappers);
1378   if (update_picker) {
1379     lb_policy_->UpdatePickerAsync();
1380   }
1381   return child_policies_to_finish_update;
1382 }
1383 
1384 //
1385 // RlsLb::Cache
1386 //
1387 
Cache(RlsLb * lb_policy)1388 RlsLb::Cache::Cache(RlsLb* lb_policy) : lb_policy_(lb_policy) {
1389   StartCleanupTimer();
1390 }
1391 
Find(const RequestKey & key)1392 RlsLb::Cache::Entry* RlsLb::Cache::Find(const RequestKey& key) {
1393   auto it = map_.find(key);
1394   if (it == map_.end()) return nullptr;
1395   it->second->MarkUsed();
1396   return it->second.get();
1397 }
1398 
FindOrInsert(const RequestKey & key)1399 RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) {
1400   auto it = map_.find(key);
1401   // If not found, create new entry.
1402   if (it == map_.end()) {
1403     size_t entry_size = EntrySizeForKey(key);
1404     MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size));
1405     Entry* entry = new Entry(
1406         lb_policy_->RefAsSubclass<RlsLb>(DEBUG_LOCATION, "CacheEntry"), key);
1407     map_.emplace(key, OrphanablePtr<Entry>(entry));
1408     size_ += entry_size;
1409     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1410       gpr_log(GPR_INFO, "[rlslb %p] key=%s: cache entry added, entry=%p",
1411               lb_policy_, key.ToString().c_str(), entry);
1412     }
1413     return entry;
1414   }
1415   // Entry found, so use it.
1416   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1417     gpr_log(GPR_INFO, "[rlslb %p] key=%s: found cache entry %p", lb_policy_,
1418             key.ToString().c_str(), it->second.get());
1419   }
1420   it->second->MarkUsed();
1421   return it->second.get();
1422 }
1423 
Resize(size_t bytes)1424 void RlsLb::Cache::Resize(size_t bytes) {
1425   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1426     gpr_log(GPR_INFO, "[rlslb %p] resizing cache to %" PRIuPTR " bytes",
1427             lb_policy_, bytes);
1428   }
1429   size_limit_ = bytes;
1430   MaybeShrinkSize(size_limit_);
1431 }
1432 
ResetAllBackoff()1433 void RlsLb::Cache::ResetAllBackoff() {
1434   for (auto& p : map_) {
1435     p.second->ResetBackoff();
1436   }
1437   lb_policy_->UpdatePickerAsync();
1438 }
1439 
Shutdown()1440 void RlsLb::Cache::Shutdown() {
1441   map_.clear();
1442   lru_list_.clear();
1443   if (cleanup_timer_handle_.has_value() &&
1444       lb_policy_->channel_control_helper()->GetEventEngine()->Cancel(
1445           *cleanup_timer_handle_)) {
1446     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1447       gpr_log(GPR_INFO, "[rlslb %p] cache cleanup timer canceled", lb_policy_);
1448     }
1449   }
1450   cleanup_timer_handle_.reset();
1451 }
1452 
ReportMetricsLocked(CallbackMetricReporter & reporter)1453 void RlsLb::Cache::ReportMetricsLocked(CallbackMetricReporter& reporter) {
1454   reporter.Report(
1455       kMetricCacheSize, size_,
1456       {lb_policy_->channel_control_helper()->GetTarget(),
1457        lb_policy_->config_->lookup_service(), lb_policy_->instance_uuid_},
1458       {});
1459   reporter.Report(
1460       kMetricCacheEntries, map_.size(),
1461       {lb_policy_->channel_control_helper()->GetTarget(),
1462        lb_policy_->config_->lookup_service(), lb_policy_->instance_uuid_},
1463       {});
1464 }
1465 
StartCleanupTimer()1466 void RlsLb::Cache::StartCleanupTimer() {
1467   cleanup_timer_handle_ =
1468       lb_policy_->channel_control_helper()->GetEventEngine()->RunAfter(
1469           kCacheCleanupTimerInterval,
1470           [this, lb_policy = lb_policy_->Ref(DEBUG_LOCATION,
1471                                              "CacheCleanupTimer")]() mutable {
1472             ApplicationCallbackExecCtx callback_exec_ctx;
1473             ExecCtx exec_ctx;
1474             lb_policy_->work_serializer()->Run(
1475                 [this, lb_policy = std::move(lb_policy)]() {
1476                   // The lb_policy ref is held until the callback completes
1477                   OnCleanupTimer();
1478                 },
1479                 DEBUG_LOCATION);
1480           });
1481 }
1482 
OnCleanupTimer()1483 void RlsLb::Cache::OnCleanupTimer() {
1484   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1485     gpr_log(GPR_INFO, "[rlslb %p] cache cleanup timer fired", lb_policy_);
1486   }
1487   MutexLock lock(&lb_policy_->mu_);
1488   if (!cleanup_timer_handle_.has_value()) return;
1489   if (lb_policy_->is_shutdown_) return;
1490   for (auto it = map_.begin(); it != map_.end();) {
1491     if (GPR_UNLIKELY(it->second->ShouldRemove() && it->second->CanEvict())) {
1492       size_ -= it->second->Size();
1493       it = map_.erase(it);
1494     } else {
1495       ++it;
1496     }
1497   }
1498   StartCleanupTimer();
1499 }
1500 
EntrySizeForKey(const RequestKey & key)1501 size_t RlsLb::Cache::EntrySizeForKey(const RequestKey& key) {
1502   // Key is stored twice, once in LRU list and again in the cache map.
1503   return (key.Size() * 2) + sizeof(Entry);
1504 }
1505 
MaybeShrinkSize(size_t bytes)1506 void RlsLb::Cache::MaybeShrinkSize(size_t bytes) {
1507   while (size_ > bytes) {
1508     auto lru_it = lru_list_.begin();
1509     if (GPR_UNLIKELY(lru_it == lru_list_.end())) break;
1510     auto map_it = map_.find(*lru_it);
1511     GPR_ASSERT(map_it != map_.end());
1512     if (!map_it->second->CanEvict()) break;
1513     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1514       gpr_log(GPR_INFO, "[rlslb %p] LRU eviction: removing entry %p %s",
1515               lb_policy_, map_it->second.get(), lru_it->ToString().c_str());
1516     }
1517     size_ -= map_it->second->Size();
1518     map_.erase(map_it);
1519   }
1520   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1521     gpr_log(GPR_INFO,
1522             "[rlslb %p] LRU pass complete: desired size=%" PRIuPTR
1523             " size=%" PRIuPTR,
1524             lb_policy_, bytes, size_);
1525   }
1526 }
1527 
1528 //
1529 // RlsLb::RlsChannel::StateWatcher
1530 //
1531 
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)1532 void RlsLb::RlsChannel::StateWatcher::OnConnectivityStateChange(
1533     grpc_connectivity_state new_state, const absl::Status& status) {
1534   auto* lb_policy = rls_channel_->lb_policy_.get();
1535   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1536     gpr_log(GPR_INFO,
1537             "[rlslb %p] RlsChannel=%p StateWatcher=%p: "
1538             "state changed to %s (%s)",
1539             lb_policy, rls_channel_.get(), this,
1540             ConnectivityStateName(new_state), status.ToString().c_str());
1541   }
1542   if (rls_channel_->is_shutdown_) return;
1543   MutexLock lock(&lb_policy->mu_);
1544   if (new_state == GRPC_CHANNEL_READY && was_transient_failure_) {
1545     was_transient_failure_ = false;
1546     // Reset the backoff of all cache entries, so that we don't
1547     // double-penalize if an RLS request fails while the channel is
1548     // down, since the throttling for the channel being down is handled
1549     // at the channel level instead of in the individual cache entries.
1550     lb_policy->cache_.ResetAllBackoff();
1551   } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
1552     was_transient_failure_ = true;
1553   }
1554 }
1555 
1556 //
1557 // RlsLb::RlsChannel::Throttle
1558 //
1559 
ShouldThrottle()1560 bool RlsLb::RlsChannel::Throttle::ShouldThrottle() {
1561   Timestamp now = Timestamp::Now();
1562   while (!requests_.empty() && now - requests_.front() > window_size_) {
1563     requests_.pop_front();
1564   }
1565   while (!failures_.empty() && now - failures_.front() > window_size_) {
1566     failures_.pop_front();
1567   }
1568   // Compute probability of throttling.
1569   float num_requests = requests_.size();
1570   float num_successes = num_requests - failures_.size();
1571   // Note: it's possible that this ratio will be negative, in which case
1572   // no throttling will be done.
1573   float throttle_probability =
1574       (num_requests - (num_successes * ratio_for_successes_)) /
1575       (num_requests + padding_);
1576   // Generate a random number for the request.
1577   std::uniform_real_distribution<float> dist(0, 1.0);
1578   // Check if we should throttle the request.
1579   bool throttle = dist(rng_) < throttle_probability;
1580   // If we're throttling, record the request and the failure.
1581   if (throttle) {
1582     requests_.push_back(now);
1583     failures_.push_back(now);
1584   }
1585   return throttle;
1586 }
1587 
RegisterResponse(bool success)1588 void RlsLb::RlsChannel::Throttle::RegisterResponse(bool success) {
1589   Timestamp now = Timestamp::Now();
1590   requests_.push_back(now);
1591   if (!success) failures_.push_back(now);
1592 }
1593 
1594 //
1595 // RlsLb::RlsChannel
1596 //
1597 
RlsChannel(RefCountedPtr<RlsLb> lb_policy)1598 RlsLb::RlsChannel::RlsChannel(RefCountedPtr<RlsLb> lb_policy)
1599     : InternallyRefCounted<RlsChannel>(
1600           GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "RlsChannel" : nullptr),
1601       lb_policy_(std::move(lb_policy)) {
1602   // Get channel creds from parent channel.
1603   // Note that we are using the "unsafe" channel creds here, which do
1604   // include any associated call creds.  This is safe in this case,
1605   // because we are using the parent channel's authority on the RLS channel.
1606   auto creds =
1607       lb_policy_->channel_control_helper()->GetUnsafeChannelCredentials();
1608   // Use the parent channel's authority.
1609   auto authority = lb_policy_->channel_control_helper()->GetAuthority();
1610   ChannelArgs args = ChannelArgs()
1611                          .Set(GRPC_ARG_DEFAULT_AUTHORITY, authority)
1612                          .Set(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL, 1);
1613   // Propagate fake security connector expected targets, if any.
1614   // (This is ugly, but it seems better than propagating all channel args
1615   // from the parent channel by default and then having a giant
1616   // exclude list of args to strip out, like we do in grpclb.)
1617   absl::optional<absl::string_view> fake_security_expected_targets =
1618       lb_policy_->channel_args_.GetString(
1619           GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS);
1620   if (fake_security_expected_targets.has_value()) {
1621     args = args.Set(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS,
1622                     *fake_security_expected_targets);
1623   }
1624   // Add service config args if needed.
1625   const std::string& service_config =
1626       lb_policy_->config_->rls_channel_service_config();
1627   if (!service_config.empty()) {
1628     args = args.Set(GRPC_ARG_SERVICE_CONFIG, service_config)
1629                .Set(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION, 1);
1630   }
1631   channel_.reset(Channel::FromC(
1632       grpc_channel_create(lb_policy_->config_->lookup_service().c_str(),
1633                           creds.get(), args.ToC().get())));
1634   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1635     gpr_log(GPR_INFO, "[rlslb %p] RlsChannel=%p: created channel %p for %s",
1636             lb_policy_.get(), this, channel_.get(),
1637             lb_policy_->config_->lookup_service().c_str());
1638   }
1639   if (channel_ != nullptr) {
1640     // Set up channelz linkage.
1641     channelz::ChannelNode* child_channelz_node = channel_->channelz_node();
1642     auto parent_channelz_node =
1643         lb_policy_->channel_args_.GetObjectRef<channelz::ChannelNode>();
1644     if (child_channelz_node != nullptr && parent_channelz_node != nullptr) {
1645       parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
1646       parent_channelz_node_ = std::move(parent_channelz_node);
1647     }
1648     // Start connectivity watch.
1649     watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
1650     channel_->AddConnectivityWatcher(
1651         GRPC_CHANNEL_IDLE,
1652         OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
1653   }
1654 }
1655 
Orphan()1656 void RlsLb::RlsChannel::Orphan() {
1657   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1658     gpr_log(GPR_INFO, "[rlslb %p] RlsChannel=%p, channel=%p: shutdown",
1659             lb_policy_.get(), this, channel_.get());
1660   }
1661   is_shutdown_ = true;
1662   if (channel_ != nullptr) {
1663     // Remove channelz linkage.
1664     if (parent_channelz_node_ != nullptr) {
1665       channelz::ChannelNode* child_channelz_node = channel_->channelz_node();
1666       GPR_ASSERT(child_channelz_node != nullptr);
1667       parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid());
1668     }
1669     // Stop connectivity watch.
1670     if (watcher_ != nullptr) {
1671       channel_->RemoveConnectivityWatcher(watcher_);
1672       watcher_ = nullptr;
1673     }
1674     channel_.reset();
1675   }
1676   Unref(DEBUG_LOCATION, "Orphan");
1677 }
1678 
StartRlsCall(const RequestKey & key,Cache::Entry * stale_entry)1679 void RlsLb::RlsChannel::StartRlsCall(const RequestKey& key,
1680                                      Cache::Entry* stale_entry) {
1681   std::unique_ptr<BackOff> backoff_state;
1682   grpc_lookup_v1_RouteLookupRequest_Reason reason =
1683       grpc_lookup_v1_RouteLookupRequest_REASON_MISS;
1684   std::string stale_header_data;
1685   if (stale_entry != nullptr) {
1686     backoff_state = stale_entry->TakeBackoffState();
1687     reason = grpc_lookup_v1_RouteLookupRequest_REASON_STALE;
1688     stale_header_data = stale_entry->header_data();
1689   }
1690   lb_policy_->request_map_.emplace(
1691       key, MakeOrphanable<RlsRequest>(
1692                lb_policy_.Ref(DEBUG_LOCATION, "RlsRequest"), key,
1693                lb_policy_->rls_channel_->Ref(DEBUG_LOCATION, "RlsRequest"),
1694                std::move(backoff_state), reason, std::move(stale_header_data)));
1695 }
1696 
ReportResponseLocked(bool response_succeeded)1697 void RlsLb::RlsChannel::ReportResponseLocked(bool response_succeeded) {
1698   throttle_.RegisterResponse(response_succeeded);
1699 }
1700 
ResetBackoff()1701 void RlsLb::RlsChannel::ResetBackoff() {
1702   GPR_DEBUG_ASSERT(channel_ != nullptr);
1703   channel_->ResetConnectionBackoff();
1704 }
1705 
1706 //
1707 // RlsLb::RlsRequest
1708 //
1709 
RlsRequest(RefCountedPtr<RlsLb> lb_policy,RequestKey key,RefCountedPtr<RlsChannel> rls_channel,std::unique_ptr<BackOff> backoff_state,grpc_lookup_v1_RouteLookupRequest_Reason reason,std::string stale_header_data)1710 RlsLb::RlsRequest::RlsRequest(RefCountedPtr<RlsLb> lb_policy, RequestKey key,
1711                               RefCountedPtr<RlsChannel> rls_channel,
1712                               std::unique_ptr<BackOff> backoff_state,
1713                               grpc_lookup_v1_RouteLookupRequest_Reason reason,
1714                               std::string stale_header_data)
1715     : InternallyRefCounted<RlsRequest>(
1716           GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "RlsRequest" : nullptr),
1717       lb_policy_(std::move(lb_policy)),
1718       key_(std::move(key)),
1719       rls_channel_(std::move(rls_channel)),
1720       backoff_state_(std::move(backoff_state)),
1721       reason_(reason),
1722       stale_header_data_(std::move(stale_header_data)) {
1723   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1724     gpr_log(GPR_INFO,
1725             "[rlslb %p] rls_request=%p: RLS request created for key %s",
1726             lb_policy_.get(), this, key_.ToString().c_str());
1727   }
1728   GRPC_CLOSURE_INIT(&call_complete_cb_, OnRlsCallComplete, this, nullptr);
1729   ExecCtx::Run(
1730       DEBUG_LOCATION,
1731       GRPC_CLOSURE_INIT(&call_start_cb_, StartCall,
1732                         Ref(DEBUG_LOCATION, "StartCall").release(), nullptr),
1733       absl::OkStatus());
1734 }
1735 
~RlsRequest()1736 RlsLb::RlsRequest::~RlsRequest() { GPR_ASSERT(call_ == nullptr); }
1737 
Orphan()1738 void RlsLb::RlsRequest::Orphan() {
1739   if (call_ != nullptr) {
1740     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1741       gpr_log(GPR_INFO, "[rlslb %p] rls_request=%p %s: cancelling RLS call",
1742               lb_policy_.get(), this, key_.ToString().c_str());
1743     }
1744     grpc_call_cancel_internal(call_);
1745   }
1746   Unref(DEBUG_LOCATION, "Orphan");
1747 }
1748 
StartCall(void * arg,grpc_error_handle)1749 void RlsLb::RlsRequest::StartCall(void* arg, grpc_error_handle /*error*/) {
1750   auto* request = static_cast<RlsRequest*>(arg);
1751   request->lb_policy_->work_serializer()->Run(
1752       [request]() {
1753         request->StartCallLocked();
1754         request->Unref(DEBUG_LOCATION, "StartCall");
1755       },
1756       DEBUG_LOCATION);
1757 }
1758 
StartCallLocked()1759 void RlsLb::RlsRequest::StartCallLocked() {
1760   {
1761     MutexLock lock(&lb_policy_->mu_);
1762     if (lb_policy_->is_shutdown_) return;
1763   }
1764   Timestamp now = Timestamp::Now();
1765   deadline_ = now + lb_policy_->config_->lookup_service_timeout();
1766   grpc_metadata_array_init(&recv_initial_metadata_);
1767   grpc_metadata_array_init(&recv_trailing_metadata_);
1768   call_ = rls_channel_->channel()->CreateCall(
1769       /*parent_call=*/nullptr, GRPC_PROPAGATE_DEFAULTS, /*cq=*/nullptr,
1770       lb_policy_->interested_parties(),
1771       Slice::FromStaticString(kRlsRequestPath), /*authority=*/absl::nullopt,
1772       deadline_, /*registered_method=*/true);
1773   grpc_op ops[6];
1774   memset(ops, 0, sizeof(ops));
1775   grpc_op* op = ops;
1776   op->op = GRPC_OP_SEND_INITIAL_METADATA;
1777   ++op;
1778   op->op = GRPC_OP_SEND_MESSAGE;
1779   send_message_ = MakeRequestProto();
1780   op->data.send_message.send_message = send_message_;
1781   ++op;
1782   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
1783   ++op;
1784   op->op = GRPC_OP_RECV_INITIAL_METADATA;
1785   op->data.recv_initial_metadata.recv_initial_metadata =
1786       &recv_initial_metadata_;
1787   ++op;
1788   op->op = GRPC_OP_RECV_MESSAGE;
1789   op->data.recv_message.recv_message = &recv_message_;
1790   ++op;
1791   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1792   op->data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata_;
1793   op->data.recv_status_on_client.status = &status_recv_;
1794   op->data.recv_status_on_client.status_details = &status_details_recv_;
1795   ++op;
1796   Ref(DEBUG_LOCATION, "OnRlsCallComplete").release();
1797   auto call_error = grpc_call_start_batch_and_execute(
1798       call_, ops, static_cast<size_t>(op - ops), &call_complete_cb_);
1799   GPR_ASSERT(call_error == GRPC_CALL_OK);
1800 }
1801 
OnRlsCallComplete(void * arg,grpc_error_handle error)1802 void RlsLb::RlsRequest::OnRlsCallComplete(void* arg, grpc_error_handle error) {
1803   auto* request = static_cast<RlsRequest*>(arg);
1804   request->lb_policy_->work_serializer()->Run(
1805       [request, error]() {
1806         request->OnRlsCallCompleteLocked(error);
1807         request->Unref(DEBUG_LOCATION, "OnRlsCallComplete");
1808       },
1809       DEBUG_LOCATION);
1810 }
1811 
OnRlsCallCompleteLocked(grpc_error_handle error)1812 void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) {
1813   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1814     std::string status_message(StringViewFromSlice(status_details_recv_));
1815     gpr_log(GPR_INFO,
1816             "[rlslb %p] rls_request=%p %s, error=%s, status={%d, %s} RLS call "
1817             "response received",
1818             lb_policy_.get(), this, key_.ToString().c_str(),
1819             StatusToString(error).c_str(), status_recv_,
1820             status_message.c_str());
1821   }
1822   // Parse response.
1823   ResponseInfo response;
1824   if (!error.ok()) {
1825     grpc_status_code code;
1826     std::string message;
1827     grpc_error_get_status(error, deadline_, &code, &message,
1828                           /*http_error=*/nullptr, /*error_string=*/nullptr);
1829     response.status =
1830         absl::Status(static_cast<absl::StatusCode>(code), message);
1831   } else if (status_recv_ != GRPC_STATUS_OK) {
1832     response.status = absl::Status(static_cast<absl::StatusCode>(status_recv_),
1833                                    StringViewFromSlice(status_details_recv_));
1834   } else {
1835     response = ParseResponseProto();
1836   }
1837   // Clean up call state.
1838   grpc_byte_buffer_destroy(send_message_);
1839   grpc_byte_buffer_destroy(recv_message_);
1840   grpc_metadata_array_destroy(&recv_initial_metadata_);
1841   grpc_metadata_array_destroy(&recv_trailing_metadata_);
1842   CSliceUnref(status_details_recv_);
1843   grpc_call_unref(call_);
1844   call_ = nullptr;
1845   // Return result to cache.
1846   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1847     gpr_log(GPR_INFO, "[rlslb %p] rls_request=%p %s: response info: %s",
1848             lb_policy_.get(), this, key_.ToString().c_str(),
1849             response.ToString().c_str());
1850   }
1851   std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
1852   {
1853     MutexLock lock(&lb_policy_->mu_);
1854     if (lb_policy_->is_shutdown_) return;
1855     rls_channel_->ReportResponseLocked(response.status.ok());
1856     Cache::Entry* cache_entry = lb_policy_->cache_.FindOrInsert(key_);
1857     child_policies_to_finish_update = cache_entry->OnRlsResponseLocked(
1858         std::move(response), std::move(backoff_state_));
1859     lb_policy_->request_map_.erase(key_);
1860   }
1861   // Now that we've released the lock, finish the update on any newly
1862   // created child policies.
1863   for (ChildPolicyWrapper* child : child_policies_to_finish_update) {
1864     // TODO(roth): If the child reports an error with the update, we
1865     // need to propagate that back to the resolver somehow.
1866     (void)child->MaybeFinishUpdate();
1867   }
1868 }
1869 
MakeRequestProto()1870 grpc_byte_buffer* RlsLb::RlsRequest::MakeRequestProto() {
1871   upb::Arena arena;
1872   grpc_lookup_v1_RouteLookupRequest* req =
1873       grpc_lookup_v1_RouteLookupRequest_new(arena.ptr());
1874   grpc_lookup_v1_RouteLookupRequest_set_target_type(
1875       req, upb_StringView_FromDataAndSize(kGrpc, sizeof(kGrpc) - 1));
1876   for (const auto& kv : key_.key_map) {
1877     grpc_lookup_v1_RouteLookupRequest_key_map_set(
1878         req, upb_StringView_FromDataAndSize(kv.first.data(), kv.first.size()),
1879         upb_StringView_FromDataAndSize(kv.second.data(), kv.second.size()),
1880         arena.ptr());
1881   }
1882   grpc_lookup_v1_RouteLookupRequest_set_reason(req, reason_);
1883   if (!stale_header_data_.empty()) {
1884     grpc_lookup_v1_RouteLookupRequest_set_stale_header_data(
1885         req, upb_StringView_FromDataAndSize(stale_header_data_.data(),
1886                                             stale_header_data_.size()));
1887   }
1888   size_t len;
1889   char* buf =
1890       grpc_lookup_v1_RouteLookupRequest_serialize(req, arena.ptr(), &len);
1891   grpc_slice send_slice = grpc_slice_from_copied_buffer(buf, len);
1892   grpc_byte_buffer* byte_buffer = grpc_raw_byte_buffer_create(&send_slice, 1);
1893   CSliceUnref(send_slice);
1894   return byte_buffer;
1895 }
1896 
ParseResponseProto()1897 RlsLb::ResponseInfo RlsLb::RlsRequest::ParseResponseProto() {
1898   ResponseInfo response_info;
1899   upb::Arena arena;
1900   grpc_byte_buffer_reader bbr;
1901   grpc_byte_buffer_reader_init(&bbr, recv_message_);
1902   grpc_slice recv_slice = grpc_byte_buffer_reader_readall(&bbr);
1903   grpc_byte_buffer_reader_destroy(&bbr);
1904   grpc_lookup_v1_RouteLookupResponse* response =
1905       grpc_lookup_v1_RouteLookupResponse_parse(
1906           reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(recv_slice)),
1907           GRPC_SLICE_LENGTH(recv_slice), arena.ptr());
1908   CSliceUnref(recv_slice);
1909   if (response == nullptr) {
1910     response_info.status = absl::InternalError("cannot parse RLS response");
1911     return response_info;
1912   }
1913   size_t num_targets;
1914   const upb_StringView* targets_strview =
1915       grpc_lookup_v1_RouteLookupResponse_targets(response, &num_targets);
1916   if (num_targets == 0) {
1917     response_info.status =
1918         absl::InvalidArgumentError("RLS response has no target entry");
1919     return response_info;
1920   }
1921   response_info.targets.reserve(num_targets);
1922   for (size_t i = 0; i < num_targets; ++i) {
1923     response_info.targets.emplace_back(targets_strview[i].data,
1924                                        targets_strview[i].size);
1925   }
1926   upb_StringView header_data_strview =
1927       grpc_lookup_v1_RouteLookupResponse_header_data(response);
1928   response_info.header_data =
1929       std::string(header_data_strview.data, header_data_strview.size);
1930   return response_info;
1931 }
1932 
1933 //
1934 // RlsLb
1935 //
1936 
GenerateUUID()1937 std::string GenerateUUID() {
1938   absl::uniform_int_distribution<uint64_t> distribution;
1939   absl::BitGen bitgen;
1940   uint64_t hi = distribution(bitgen);
1941   uint64_t lo = distribution(bitgen);
1942   return GenerateUUIDv4(hi, lo);
1943 }
1944 
RlsLb(Args args)1945 RlsLb::RlsLb(Args args)
1946     : LoadBalancingPolicy(std::move(args)),
1947       instance_uuid_(channel_args()
1948                          .GetOwnedString(GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID)
1949                          .value_or(GenerateUUID())),
1950       cache_(this),
1951       registered_metric_callback_(
1952           channel_control_helper()->GetStatsPluginGroup().RegisterCallback(
1953               [this](CallbackMetricReporter& reporter) {
1954                 MutexLock lock(&mu_);
1955                 cache_.ReportMetricsLocked(reporter);
1956               },
__anon52bb9b7c0902(CallbackMetricReporter& reporter) 1957               {kMetricCacheSize, kMetricCacheEntries})) {
1958   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1959     gpr_log(GPR_INFO, "[rlslb %p] policy created", this);
1960   }
1961 }
1962 
EndpointsEqual(const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> endpoints1,const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> endpoints2)1963 bool EndpointsEqual(
1964     const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> endpoints1,
1965     const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>
1966         endpoints2) {
1967   if (endpoints1.status() != endpoints2.status()) return false;
1968   if (endpoints1.ok()) {
1969     std::vector<EndpointAddresses> e1_list;
1970     (*endpoints1)->ForEach([&](const EndpointAddresses& endpoint) {
1971       e1_list.push_back(endpoint);
1972     });
1973     size_t i = 0;
1974     bool different = false;
1975     (*endpoints2)->ForEach([&](const EndpointAddresses& endpoint) {
1976       if (endpoint != e1_list[i++]) different = true;
1977     });
1978     if (different) return false;
1979     if (i != e1_list.size()) return false;
1980   }
1981   return true;
1982 }
1983 
UpdateLocked(UpdateArgs args)1984 absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
1985   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
1986     gpr_log(GPR_INFO, "[rlslb %p] policy updated", this);
1987   }
1988   update_in_progress_ = true;
1989   // Swap out config.
1990   RefCountedPtr<RlsLbConfig> old_config = std::move(config_);
1991   config_ = args.config.TakeAsSubclass<RlsLbConfig>();
1992   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) &&
1993       (old_config == nullptr ||
1994        old_config->child_policy_config() != config_->child_policy_config())) {
1995     gpr_log(GPR_INFO, "[rlslb %p] updated child policy config: %s", this,
1996             JsonDump(config_->child_policy_config()).c_str());
1997   }
1998   // Swap out addresses.
1999   // If the new address list is an error and we have an existing address list,
2000   // stick with the existing addresses.
2001   absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> old_addresses;
2002   if (args.addresses.ok()) {
2003     old_addresses = std::move(addresses_);
2004     addresses_ = std::move(args.addresses);
2005   } else {
2006     old_addresses = addresses_;
2007   }
2008   // Swap out channel args.
2009   channel_args_ = std::move(args.args);
2010   // Determine whether we need to update all child policies.
2011   bool update_child_policies =
2012       old_config == nullptr ||
2013       old_config->child_policy_config() != config_->child_policy_config() ||
2014       !EndpointsEqual(old_addresses, addresses_) || args.args != channel_args_;
2015   // If default target changes, swap out child policy.
2016   bool created_default_child = false;
2017   if (old_config == nullptr ||
2018       config_->default_target() != old_config->default_target()) {
2019     if (config_->default_target().empty()) {
2020       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
2021         gpr_log(GPR_INFO, "[rlslb %p] unsetting default target", this);
2022       }
2023       default_child_policy_.reset();
2024     } else {
2025       auto it = child_policy_map_.find(config_->default_target());
2026       if (it == child_policy_map_.end()) {
2027         if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
2028           gpr_log(GPR_INFO, "[rlslb %p] creating new default target", this);
2029         }
2030         default_child_policy_ = MakeRefCounted<ChildPolicyWrapper>(
2031             RefAsSubclass<RlsLb>(DEBUG_LOCATION, "ChildPolicyWrapper"),
2032             config_->default_target());
2033         created_default_child = true;
2034       } else {
2035         if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
2036           gpr_log(GPR_INFO,
2037                   "[rlslb %p] using existing child for default target", this);
2038         }
2039         default_child_policy_ =
2040             it->second->Ref(DEBUG_LOCATION, "DefaultChildPolicy");
2041       }
2042     }
2043   }
2044   // Now grab the lock to swap out the state it guards.
2045   {
2046     MutexLock lock(&mu_);
2047     // Swap out RLS channel if needed.
2048     if (old_config == nullptr ||
2049         config_->lookup_service() != old_config->lookup_service()) {
2050       rls_channel_ = MakeOrphanable<RlsChannel>(
2051           RefAsSubclass<RlsLb>(DEBUG_LOCATION, "RlsChannel"));
2052     }
2053     // Resize cache if needed.
2054     if (old_config == nullptr ||
2055         config_->cache_size_bytes() != old_config->cache_size_bytes()) {
2056       cache_.Resize(static_cast<size_t>(config_->cache_size_bytes()));
2057     }
2058     // Start update of child policies if needed.
2059     if (update_child_policies) {
2060       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
2061         gpr_log(GPR_INFO, "[rlslb %p] starting child policy updates", this);
2062       }
2063       for (auto& p : child_policy_map_) {
2064         p.second->StartUpdate();
2065       }
2066     } else if (created_default_child) {
2067       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
2068         gpr_log(GPR_INFO, "[rlslb %p] starting default child policy update",
2069                 this);
2070       }
2071       default_child_policy_->StartUpdate();
2072     }
2073   }
2074   // Now that we've released the lock, finish update of child policies.
2075   std::vector<std::string> errors;
2076   if (update_child_policies) {
2077     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
2078       gpr_log(GPR_INFO, "[rlslb %p] finishing child policy updates", this);
2079     }
2080     for (auto& p : child_policy_map_) {
2081       absl::Status status = p.second->MaybeFinishUpdate();
2082       if (!status.ok()) {
2083         errors.emplace_back(
2084             absl::StrCat("target ", p.first, ": ", status.ToString()));
2085       }
2086     }
2087   } else if (created_default_child) {
2088     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
2089       gpr_log(GPR_INFO, "[rlslb %p] finishing default child policy update",
2090               this);
2091     }
2092     absl::Status status = default_child_policy_->MaybeFinishUpdate();
2093     if (!status.ok()) {
2094       errors.emplace_back(absl::StrCat("target ", config_->default_target(),
2095                                        ": ", status.ToString()));
2096     }
2097   }
2098   update_in_progress_ = false;
2099   // In principle, we need to update the picker here only if the config
2100   // fields used by the picker have changed.  However, it seems fragile
2101   // to check individual fields, since the picker logic could change in
2102   // the future to use additional config fields, and we might not
2103   // remember to update the code here.  So for now, we just unconditionally
2104   // update the picker here, even though it's probably redundant.
2105   UpdatePickerLocked();
2106   // Return status.
2107   if (!errors.empty()) {
2108     return absl::UnavailableError(absl::StrCat(
2109         "errors from children: [", absl::StrJoin(errors, "; "), "]"));
2110   }
2111   return absl::OkStatus();
2112 }
2113 
ExitIdleLocked()2114 void RlsLb::ExitIdleLocked() {
2115   MutexLock lock(&mu_);
2116   for (auto& child_entry : child_policy_map_) {
2117     child_entry.second->ExitIdleLocked();
2118   }
2119 }
2120 
ResetBackoffLocked()2121 void RlsLb::ResetBackoffLocked() {
2122   {
2123     MutexLock lock(&mu_);
2124     rls_channel_->ResetBackoff();
2125     cache_.ResetAllBackoff();
2126   }
2127   for (auto& child : child_policy_map_) {
2128     child.second->ResetBackoffLocked();
2129   }
2130 }
2131 
ShutdownLocked()2132 void RlsLb::ShutdownLocked() {
2133   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
2134     gpr_log(GPR_INFO, "[rlslb %p] policy shutdown", this);
2135   }
2136   registered_metric_callback_.reset();
2137   MutexLock lock(&mu_);
2138   is_shutdown_ = true;
2139   config_.reset(DEBUG_LOCATION, "ShutdownLocked");
2140   channel_args_ = ChannelArgs();
2141   cache_.Shutdown();
2142   request_map_.clear();
2143   rls_channel_.reset();
2144   default_child_policy_.reset();
2145 }
2146 
UpdatePickerAsync()2147 void RlsLb::UpdatePickerAsync() {
2148   // Run via the ExecCtx, since the caller may be holding the lock, and
2149   // we don't want to be doing that when we hop into the WorkSerializer,
2150   // in case the WorkSerializer callback happens to run inline.
2151   ExecCtx::Run(
2152       DEBUG_LOCATION,
2153       GRPC_CLOSURE_CREATE(UpdatePickerCallback,
2154                           Ref(DEBUG_LOCATION, "UpdatePickerCallback").release(),
2155                           grpc_schedule_on_exec_ctx),
2156       absl::OkStatus());
2157 }
2158 
UpdatePickerCallback(void * arg,grpc_error_handle)2159 void RlsLb::UpdatePickerCallback(void* arg, grpc_error_handle /*error*/) {
2160   auto* rls_lb = static_cast<RlsLb*>(arg);
2161   rls_lb->work_serializer()->Run(
2162       [rls_lb]() {
2163         RefCountedPtr<RlsLb> lb_policy(rls_lb);
2164         lb_policy->UpdatePickerLocked();
2165         lb_policy.reset(DEBUG_LOCATION, "UpdatePickerCallback");
2166       },
2167       DEBUG_LOCATION);
2168 }
2169 
UpdatePickerLocked()2170 void RlsLb::UpdatePickerLocked() {
2171   // If we're in the process of propagating an update from our parent to
2172   // our children, ignore any updates that come from the children.  We
2173   // will instead return a new picker once the update has been seen by
2174   // all children.  This avoids unnecessary picker churn while an update
2175   // is being propagated to our children.
2176   if (update_in_progress_) return;
2177   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
2178     gpr_log(GPR_INFO, "[rlslb %p] updating picker", this);
2179   }
2180   grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
2181   if (!child_policy_map_.empty()) {
2182     state = GRPC_CHANNEL_TRANSIENT_FAILURE;
2183     int num_idle = 0;
2184     int num_connecting = 0;
2185     {
2186       MutexLock lock(&mu_);
2187       if (is_shutdown_) return;
2188       for (auto& p : child_policy_map_) {
2189         grpc_connectivity_state child_state = p.second->connectivity_state();
2190         if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
2191           gpr_log(GPR_INFO, "[rlslb %p] target %s in state %s", this,
2192                   p.second->target().c_str(),
2193                   ConnectivityStateName(child_state));
2194         }
2195         if (child_state == GRPC_CHANNEL_READY) {
2196           state = GRPC_CHANNEL_READY;
2197           break;
2198         } else if (child_state == GRPC_CHANNEL_CONNECTING) {
2199           ++num_connecting;
2200         } else if (child_state == GRPC_CHANNEL_IDLE) {
2201           ++num_idle;
2202         }
2203       }
2204       if (state != GRPC_CHANNEL_READY) {
2205         if (num_connecting > 0) {
2206           state = GRPC_CHANNEL_CONNECTING;
2207         } else if (num_idle > 0) {
2208           state = GRPC_CHANNEL_IDLE;
2209         }
2210       }
2211     }
2212   }
2213   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
2214     gpr_log(GPR_INFO, "[rlslb %p] reporting state %s", this,
2215             ConnectivityStateName(state));
2216   }
2217   absl::Status status;
2218   if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
2219     status = absl::UnavailableError("no children available");
2220   }
2221   channel_control_helper()->UpdateState(
2222       state, status,
2223       MakeRefCounted<Picker>(RefAsSubclass<RlsLb>(DEBUG_LOCATION, "Picker")));
2224 }
2225 
MaybeExportPickCount(GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle,absl::string_view target,const PickResult & pick_result)2226 void RlsLb::MaybeExportPickCount(
2227     GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle,
2228     absl::string_view target, const PickResult& pick_result) {
2229   absl::string_view pick_result_string = Match(
2230       pick_result.result,
2231       [](const LoadBalancingPolicy::PickResult::Complete&) {
2232         return "complete";
2233       },
2234       [](const LoadBalancingPolicy::PickResult::Queue&) { return ""; },
2235       [](const LoadBalancingPolicy::PickResult::Fail&) { return "fail"; },
2236       [](const LoadBalancingPolicy::PickResult::Drop&) { return "drop"; });
2237   if (pick_result_string.empty()) return;  // Don't report queued picks.
2238   auto& stats_plugins = channel_control_helper()->GetStatsPluginGroup();
2239   stats_plugins.AddCounter(
2240       handle, 1,
2241       {channel_control_helper()->GetTarget(), config_->lookup_service(), target,
2242        pick_result_string},
2243       {});
2244 }
2245 
2246 //
2247 // RlsLbFactory
2248 //
2249 
2250 struct GrpcKeyBuilder {
2251   struct Name {
2252     std::string service;
2253     std::string method;
2254 
JsonLoadergrpc_core::__anon52bb9b7c0111::GrpcKeyBuilder::Name2255     static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
2256       static const auto* loader = JsonObjectLoader<Name>()
2257                                       .Field("service", &Name::service)
2258                                       .OptionalField("method", &Name::method)
2259                                       .Finish();
2260       return loader;
2261     }
2262   };
2263 
2264   struct NameMatcher {
2265     std::string key;
2266     std::vector<std::string> names;
2267     absl::optional<bool> required_match;
2268 
JsonLoadergrpc_core::__anon52bb9b7c0111::GrpcKeyBuilder::NameMatcher2269     static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
2270       static const auto* loader =
2271           JsonObjectLoader<NameMatcher>()
2272               .Field("key", &NameMatcher::key)
2273               .Field("names", &NameMatcher::names)
2274               .OptionalField("requiredMatch", &NameMatcher::required_match)
2275               .Finish();
2276       return loader;
2277     }
2278 
JsonPostLoadgrpc_core::__anon52bb9b7c0111::GrpcKeyBuilder::NameMatcher2279     void JsonPostLoad(const Json&, const JsonArgs&, ValidationErrors* errors) {
2280       // key must be non-empty.
2281       {
2282         ValidationErrors::ScopedField field(errors, ".key");
2283         if (!errors->FieldHasErrors() && key.empty()) {
2284           errors->AddError("must be non-empty");
2285         }
2286       }
2287       // List of header names must be non-empty.
2288       {
2289         ValidationErrors::ScopedField field(errors, ".names");
2290         if (!errors->FieldHasErrors() && names.empty()) {
2291           errors->AddError("must be non-empty");
2292         }
2293         // Individual header names must be non-empty.
2294         for (size_t i = 0; i < names.size(); ++i) {
2295           ValidationErrors::ScopedField field(errors,
2296                                               absl::StrCat("[", i, "]"));
2297           if (!errors->FieldHasErrors() && names[i].empty()) {
2298             errors->AddError("must be non-empty");
2299           }
2300         }
2301       }
2302       // requiredMatch must not be present.
2303       {
2304         ValidationErrors::ScopedField field(errors, ".requiredMatch");
2305         if (required_match.has_value()) {
2306           errors->AddError("must not be present");
2307         }
2308       }
2309     }
2310   };
2311 
2312   struct ExtraKeys {
2313     absl::optional<std::string> host_key;
2314     absl::optional<std::string> service_key;
2315     absl::optional<std::string> method_key;
2316 
JsonLoadergrpc_core::__anon52bb9b7c0111::GrpcKeyBuilder::ExtraKeys2317     static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
2318       static const auto* loader =
2319           JsonObjectLoader<ExtraKeys>()
2320               .OptionalField("host", &ExtraKeys::host_key)
2321               .OptionalField("service", &ExtraKeys::service_key)
2322               .OptionalField("method", &ExtraKeys::method_key)
2323               .Finish();
2324       return loader;
2325     }
2326 
JsonPostLoadgrpc_core::__anon52bb9b7c0111::GrpcKeyBuilder::ExtraKeys2327     void JsonPostLoad(const Json&, const JsonArgs&, ValidationErrors* errors) {
2328       auto check_field = [&](const std::string& field_name,
2329                              absl::optional<std::string>* struct_field) {
2330         ValidationErrors::ScopedField field(errors,
2331                                             absl::StrCat(".", field_name));
2332         if (struct_field->has_value() && (*struct_field)->empty()) {
2333           errors->AddError("must be non-empty if set");
2334         }
2335       };
2336       check_field("host", &host_key);
2337       check_field("service", &service_key);
2338       check_field("method", &method_key);
2339     }
2340   };
2341 
2342   std::vector<Name> names;
2343   std::vector<NameMatcher> headers;
2344   ExtraKeys extra_keys;
2345   std::map<std::string /*key*/, std::string /*value*/> constant_keys;
2346 
JsonLoadergrpc_core::__anon52bb9b7c0111::GrpcKeyBuilder2347   static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
2348     static const auto* loader =
2349         JsonObjectLoader<GrpcKeyBuilder>()
2350             .Field("names", &GrpcKeyBuilder::names)
2351             .OptionalField("headers", &GrpcKeyBuilder::headers)
2352             .OptionalField("extraKeys", &GrpcKeyBuilder::extra_keys)
2353             .OptionalField("constantKeys", &GrpcKeyBuilder::constant_keys)
2354             .Finish();
2355     return loader;
2356   }
2357 
JsonPostLoadgrpc_core::__anon52bb9b7c0111::GrpcKeyBuilder2358   void JsonPostLoad(const Json&, const JsonArgs&, ValidationErrors* errors) {
2359     // The names field must be non-empty.
2360     {
2361       ValidationErrors::ScopedField field(errors, ".names");
2362       if (!errors->FieldHasErrors() && names.empty()) {
2363         errors->AddError("must be non-empty");
2364       }
2365     }
2366     // Make sure no key in constantKeys is empty.
2367     if (constant_keys.find("") != constant_keys.end()) {
2368       ValidationErrors::ScopedField field(errors, ".constantKeys[\"\"]");
2369       errors->AddError("key must be non-empty");
2370     }
2371     // Check for duplicate keys.
2372     std::set<absl::string_view> keys_seen;
2373     auto duplicate_key_check_func = [&keys_seen, errors](
2374                                         const std::string& key,
2375                                         const std::string& field_name) {
2376       if (key.empty()) return;  // Already generated an error about this.
2377       ValidationErrors::ScopedField field(errors, field_name);
2378       auto it = keys_seen.find(key);
2379       if (it != keys_seen.end()) {
2380         errors->AddError(absl::StrCat("duplicate key \"", key, "\""));
2381       } else {
2382         keys_seen.insert(key);
2383       }
2384     };
2385     for (size_t i = 0; i < headers.size(); ++i) {
2386       NameMatcher& header = headers[i];
2387       duplicate_key_check_func(header.key,
2388                                absl::StrCat(".headers[", i, "].key"));
2389     }
2390     for (const auto& p : constant_keys) {
2391       duplicate_key_check_func(
2392           p.first, absl::StrCat(".constantKeys[\"", p.first, "\"]"));
2393     }
2394     if (extra_keys.host_key.has_value()) {
2395       duplicate_key_check_func(*extra_keys.host_key, ".extraKeys.host");
2396     }
2397     if (extra_keys.service_key.has_value()) {
2398       duplicate_key_check_func(*extra_keys.service_key, ".extraKeys.service");
2399     }
2400     if (extra_keys.method_key.has_value()) {
2401       duplicate_key_check_func(*extra_keys.method_key, ".extraKeys.method");
2402     }
2403   }
2404 };
2405 
JsonLoader(const JsonArgs &)2406 const JsonLoaderInterface* RlsLbConfig::RouteLookupConfig::JsonLoader(
2407     const JsonArgs&) {
2408   static const auto* loader =
2409       JsonObjectLoader<RouteLookupConfig>()
2410           // Note: Some fields require manual processing and are handled in
2411           // JsonPostLoad() instead.
2412           .Field("lookupService", &RouteLookupConfig::lookup_service)
2413           .OptionalField("lookupServiceTimeout",
2414                          &RouteLookupConfig::lookup_service_timeout)
2415           .OptionalField("maxAge", &RouteLookupConfig::max_age)
2416           .OptionalField("staleAge", &RouteLookupConfig::stale_age)
2417           .Field("cacheSizeBytes", &RouteLookupConfig::cache_size_bytes)
2418           .OptionalField("defaultTarget", &RouteLookupConfig::default_target)
2419           .Finish();
2420   return loader;
2421 }
2422 
JsonPostLoad(const Json & json,const JsonArgs & args,ValidationErrors * errors)2423 void RlsLbConfig::RouteLookupConfig::JsonPostLoad(const Json& json,
2424                                                   const JsonArgs& args,
2425                                                   ValidationErrors* errors) {
2426   // Parse grpcKeybuilders.
2427   auto grpc_keybuilders = LoadJsonObjectField<std::vector<GrpcKeyBuilder>>(
2428       json.object(), args, "grpcKeybuilders", errors);
2429   if (grpc_keybuilders.has_value()) {
2430     ValidationErrors::ScopedField field(errors, ".grpcKeybuilders");
2431     for (size_t i = 0; i < grpc_keybuilders->size(); ++i) {
2432       ValidationErrors::ScopedField field(errors, absl::StrCat("[", i, "]"));
2433       auto& grpc_keybuilder = (*grpc_keybuilders)[i];
2434       // Construct KeyBuilder.
2435       RlsLbConfig::KeyBuilder key_builder;
2436       for (const auto& header : grpc_keybuilder.headers) {
2437         key_builder.header_keys.emplace(header.key, header.names);
2438       }
2439       if (grpc_keybuilder.extra_keys.host_key.has_value()) {
2440         key_builder.host_key = std::move(*grpc_keybuilder.extra_keys.host_key);
2441       }
2442       if (grpc_keybuilder.extra_keys.service_key.has_value()) {
2443         key_builder.service_key =
2444             std::move(*grpc_keybuilder.extra_keys.service_key);
2445       }
2446       if (grpc_keybuilder.extra_keys.method_key.has_value()) {
2447         key_builder.method_key =
2448             std::move(*grpc_keybuilder.extra_keys.method_key);
2449       }
2450       key_builder.constant_keys = std::move(grpc_keybuilder.constant_keys);
2451       // Add entries to map.
2452       for (const auto& name : grpc_keybuilder.names) {
2453         std::string path = absl::StrCat("/", name.service, "/", name.method);
2454         bool inserted = key_builder_map.emplace(path, key_builder).second;
2455         if (!inserted) {
2456           errors->AddError(absl::StrCat("duplicate entry for \"", path, "\""));
2457         }
2458       }
2459     }
2460   }
2461   // Validate lookupService.
2462   {
2463     ValidationErrors::ScopedField field(errors, ".lookupService");
2464     if (!errors->FieldHasErrors() &&
2465         !CoreConfiguration::Get().resolver_registry().IsValidTarget(
2466             lookup_service)) {
2467       errors->AddError("must be valid gRPC target URI");
2468     }
2469   }
2470   // Clamp maxAge to the max allowed value.
2471   if (max_age > kMaxMaxAge) max_age = kMaxMaxAge;
2472   // If staleAge is set, then maxAge must also be set.
2473   if (json.object().find("staleAge") != json.object().end() &&
2474       json.object().find("maxAge") == json.object().end()) {
2475     ValidationErrors::ScopedField field(errors, ".maxAge");
2476     errors->AddError("must be set if staleAge is set");
2477   }
2478   // Ignore staleAge if greater than or equal to maxAge.
2479   if (stale_age >= max_age) stale_age = max_age;
2480   // Validate cacheSizeBytes.
2481   {
2482     ValidationErrors::ScopedField field(errors, ".cacheSizeBytes");
2483     if (!errors->FieldHasErrors() && cache_size_bytes <= 0) {
2484       errors->AddError("must be greater than 0");
2485     }
2486   }
2487   // Clamp cacheSizeBytes to the max allowed value.
2488   if (cache_size_bytes > kMaxCacheSizeBytes) {
2489     cache_size_bytes = kMaxCacheSizeBytes;
2490   }
2491   // Validate defaultTarget.
2492   {
2493     ValidationErrors::ScopedField field(errors, ".defaultTarget");
2494     if (!errors->FieldHasErrors() &&
2495         json.object().find("defaultTarget") != json.object().end() &&
2496         default_target.empty()) {
2497       errors->AddError("must be non-empty if set");
2498     }
2499   }
2500 }
2501 
JsonLoader(const JsonArgs &)2502 const JsonLoaderInterface* RlsLbConfig::JsonLoader(const JsonArgs&) {
2503   static const auto* loader =
2504       JsonObjectLoader<RlsLbConfig>()
2505           // Note: Some fields require manual processing and are handled in
2506           // JsonPostLoad() instead.
2507           .Field("routeLookupConfig", &RlsLbConfig::route_lookup_config_)
2508           .Field("childPolicyConfigTargetFieldName",
2509                  &RlsLbConfig::child_policy_config_target_field_name_)
2510           .Finish();
2511   return loader;
2512 }
2513 
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)2514 void RlsLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
2515                                ValidationErrors* errors) {
2516   // Parse routeLookupChannelServiceConfig.
2517   auto it = json.object().find("routeLookupChannelServiceConfig");
2518   if (it != json.object().end()) {
2519     ValidationErrors::ScopedField field(errors,
2520                                         ".routeLookupChannelServiceConfig");
2521     // Don't need to save the result here, just need the errors (if any).
2522     ServiceConfigImpl::Create(ChannelArgs(), it->second, errors);
2523   }
2524   // Validate childPolicyConfigTargetFieldName.
2525   {
2526     ValidationErrors::ScopedField field(errors,
2527                                         ".childPolicyConfigTargetFieldName");
2528     if (!errors->FieldHasErrors() &&
2529         child_policy_config_target_field_name_.empty()) {
2530       errors->AddError("must be non-empty");
2531     }
2532   }
2533   // Parse childPolicy.
2534   {
2535     ValidationErrors::ScopedField field(errors, ".childPolicy");
2536     auto it = json.object().find("childPolicy");
2537     if (it == json.object().end()) {
2538       errors->AddError("field not present");
2539     } else {
2540       // Add target to all child policy configs in the list.
2541       std::string target = route_lookup_config_.default_target.empty()
2542                                ? kFakeTargetFieldValue
2543                                : route_lookup_config_.default_target;
2544       auto child_policy_config = InsertOrUpdateChildPolicyField(
2545           child_policy_config_target_field_name_, target, it->second, errors);
2546       if (child_policy_config.has_value()) {
2547         child_policy_config_ = std::move(*child_policy_config);
2548         // Parse the config.
2549         auto parsed_config =
2550             CoreConfiguration::Get()
2551                 .lb_policy_registry()
2552                 .ParseLoadBalancingConfig(child_policy_config_);
2553         if (!parsed_config.ok()) {
2554           errors->AddError(parsed_config.status().message());
2555         } else {
2556           // Find the chosen config and return it in JSON form.
2557           // We remove all non-selected configs, and in the selected config,
2558           // we leave the target field in place, set to the default value.
2559           // This slightly optimizes what we need to do later when we update
2560           // a child policy for a given target.
2561           for (const Json& config : child_policy_config_.array()) {
2562             if (config.object().begin()->first == (*parsed_config)->name()) {
2563               child_policy_config_ = Json::FromArray({config});
2564               break;
2565             }
2566           }
2567           // If default target is set, set the default child config.
2568           if (!route_lookup_config_.default_target.empty()) {
2569             default_child_policy_parsed_config_ = std::move(*parsed_config);
2570           }
2571         }
2572       }
2573     }
2574   }
2575 }
2576 
2577 class RlsLbFactory final : public LoadBalancingPolicyFactory {
2578  public:
name() const2579   absl::string_view name() const override { return kRls; }
2580 
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const2581   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
2582       LoadBalancingPolicy::Args args) const override {
2583     return MakeOrphanable<RlsLb>(std::move(args));
2584   }
2585 
2586   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const2587   ParseLoadBalancingConfig(const Json& json) const override {
2588     return LoadFromJson<RefCountedPtr<RlsLbConfig>>(
2589         json, JsonArgs(), "errors validing RLS LB policy config");
2590   }
2591 };
2592 
2593 }  //  namespace
2594 
RegisterRlsLbPolicy(CoreConfiguration::Builder * builder)2595 void RegisterRlsLbPolicy(CoreConfiguration::Builder* builder) {
2596   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
2597       std::make_unique<RlsLbFactory>());
2598 }
2599 
2600 }  // namespace grpc_core
2601