1 //
2 // Copyright 2020 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 // Implementation of the Route Lookup Service (RLS) LB policy
18 //
19 // The policy queries a route lookup service for the name of the actual service
20 // to use. A child policy that recognizes the name as a field of its
21 // configuration will take further load balancing action on the request.
22
23 #include <grpc/support/port_platform.h>
24
25 #include "src/core/load_balancing/rls/rls.h"
26
27 #include <inttypes.h>
28 #include <stdlib.h>
29 #include <string.h>
30
31 #include <algorithm>
32 #include <deque>
33 #include <list>
34 #include <map>
35 #include <memory>
36 #include <random>
37 #include <set>
38 #include <string>
39 #include <type_traits>
40 #include <unordered_map>
41 #include <utility>
42 #include <vector>
43
44 #include "absl/base/thread_annotations.h"
45 #include "absl/hash/hash.h"
46 #include "absl/random/random.h"
47 #include "absl/status/status.h"
48 #include "absl/status/statusor.h"
49 #include "absl/strings/str_cat.h"
50 #include "absl/strings/str_format.h"
51 #include "absl/strings/str_join.h"
52 #include "absl/strings/string_view.h"
53 #include "absl/types/optional.h"
54 #include "upb/base/string_view.h"
55 #include "upb/mem/arena.hpp"
56
57 #include <grpc/byte_buffer.h>
58 #include <grpc/byte_buffer_reader.h>
59 #include <grpc/event_engine/event_engine.h>
60 #include <grpc/grpc.h>
61 #include <grpc/impl/channel_arg_names.h>
62 #include <grpc/impl/connectivity_state.h>
63 #include <grpc/impl/propagation_bits.h>
64 #include <grpc/slice.h>
65 #include <grpc/status.h>
66 #include <grpc/support/json.h>
67 #include <grpc/support/log.h>
68
69 #include "src/core/client_channel/client_channel_filter.h"
70 #include "src/core/lib/backoff/backoff.h"
71 #include "src/core/lib/channel/channel_args.h"
72 #include "src/core/lib/channel/channelz.h"
73 #include "src/core/lib/channel/metrics.h"
74 #include "src/core/lib/config/core_configuration.h"
75 #include "src/core/lib/debug/trace.h"
76 #include "src/core/lib/gprpp/debug_location.h"
77 #include "src/core/lib/gprpp/dual_ref_counted.h"
78 #include "src/core/lib/gprpp/match.h"
79 #include "src/core/lib/gprpp/orphanable.h"
80 #include "src/core/lib/gprpp/ref_counted_ptr.h"
81 #include "src/core/lib/gprpp/status_helper.h"
82 #include "src/core/lib/gprpp/sync.h"
83 #include "src/core/lib/gprpp/time.h"
84 #include "src/core/lib/gprpp/uuid_v4.h"
85 #include "src/core/lib/gprpp/validation_errors.h"
86 #include "src/core/lib/gprpp/work_serializer.h"
87 #include "src/core/lib/iomgr/closure.h"
88 #include "src/core/lib/iomgr/error.h"
89 #include "src/core/lib/iomgr/exec_ctx.h"
90 #include "src/core/lib/iomgr/pollset_set.h"
91 #include "src/core/lib/json/json.h"
92 #include "src/core/lib/json/json_args.h"
93 #include "src/core/lib/json/json_object_loader.h"
94 #include "src/core/lib/json/json_writer.h"
95 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
96 #include "src/core/lib/slice/slice.h"
97 #include "src/core/lib/slice/slice_internal.h"
98 #include "src/core/lib/surface/call.h"
99 #include "src/core/lib/surface/channel.h"
100 #include "src/core/lib/transport/connectivity_state.h"
101 #include "src/core/lib/transport/error_utils.h"
102 #include "src/core/load_balancing/child_policy_handler.h"
103 #include "src/core/load_balancing/delegating_helper.h"
104 #include "src/core/load_balancing/lb_policy.h"
105 #include "src/core/load_balancing/lb_policy_factory.h"
106 #include "src/core/load_balancing/lb_policy_registry.h"
107 #include "src/core/resolver/endpoint_addresses.h"
108 #include "src/core/resolver/resolver_registry.h"
109 #include "src/core/service_config/service_config_impl.h"
110 #include "src/proto/grpc/lookup/v1/rls.upb.h"
111
112 using ::grpc_event_engine::experimental::EventEngine;
113
114 namespace grpc_core {
115
// Trace flag "rls_lb" for this policy; off by default.
TraceFlag grpc_lb_rls_trace(false, "rls_lb");
117
118 namespace {
119
120 constexpr absl::string_view kMetricLabelRlsServerTarget =
121 "grpc.lb.rls.server_target";
122 constexpr absl::string_view kMetricLabelRlsInstanceUuid =
123 "grpc.lb.rls.instance_uuid";
124 constexpr absl::string_view kMetricRlsDataPlaneTarget =
125 "grpc.lb.rls.data_plane_target";
126 constexpr absl::string_view kMetricLabelPickResult = "grpc.lb.pick_result";
127
128 const auto kMetricCacheSize =
129 GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
130 "grpc.lb.rls.cache_size", "EXPERIMENTAL. Size of the RLS cache.", "By",
131 {kMetricLabelTarget, kMetricLabelRlsServerTarget,
132 kMetricLabelRlsInstanceUuid},
133 {}, false);
134
135 const auto kMetricCacheEntries =
136 GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
137 "grpc.lb.rls.cache_entries",
138 "EXPERIMENTAL. Number of entries in the RLS cache.", "{entry}",
139 {kMetricLabelTarget, kMetricLabelRlsServerTarget,
140 kMetricLabelRlsInstanceUuid},
141 {}, false);
142
143 const auto kMetricDefaultTargetPicks =
144 GlobalInstrumentsRegistry::RegisterUInt64Counter(
145 "grpc.lb.rls.default_target_picks",
146 "EXPERIMENTAL. Number of LB picks sent to the default target.",
147 "{pick}",
148 {kMetricLabelTarget, kMetricLabelRlsServerTarget,
149 kMetricRlsDataPlaneTarget, kMetricLabelPickResult},
150 {}, false);
151
152 const auto kMetricTargetPicks =
153 GlobalInstrumentsRegistry::RegisterUInt64Counter(
154 "grpc.lb.rls.target_picks",
155 "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that "
156 "if the default target is also returned by the RLS server, RPCs sent "
157 "to that target from the cache will be counted in this metric, not "
158 "in grpc.rls.default_target_picks.",
159 "{pick}",
160 {kMetricLabelTarget, kMetricLabelRlsServerTarget,
161 kMetricRlsDataPlaneTarget, kMetricLabelPickResult},
162 {}, false);
163
164 const auto kMetricFailedPicks =
165 GlobalInstrumentsRegistry::RegisterUInt64Counter(
166 "grpc.lb.rls.failed_picks",
167 "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS "
168 "request or the RLS channel being throttled.",
169 "{pick}", {kMetricLabelTarget, kMetricLabelRlsServerTarget}, {}, false);
170
171 constexpr absl::string_view kRls = "rls_experimental";
172 const char kGrpc[] = "grpc";
173 const char* kRlsRequestPath = "/grpc.lookup.v1.RouteLookupService/RouteLookup";
174 const char* kFakeTargetFieldValue = "fake_target_field_value";
175 const char* kRlsHeaderKey = "x-google-rls-data";
176
177 const Duration kDefaultLookupServiceTimeout = Duration::Seconds(10);
178 const Duration kMaxMaxAge = Duration::Minutes(5);
179 const Duration kMinExpirationTime = Duration::Seconds(5);
180 const Duration kCacheBackoffInitial = Duration::Seconds(1);
181 const double kCacheBackoffMultiplier = 1.6;
182 const double kCacheBackoffJitter = 0.2;
183 const Duration kCacheBackoffMax = Duration::Minutes(2);
184 const Duration kDefaultThrottleWindowSize = Duration::Seconds(30);
185 const double kDefaultThrottleRatioForSuccesses = 2.0;
186 const int kDefaultThrottlePadding = 8;
187 const Duration kCacheCleanupTimerInterval = Duration::Minutes(1);
188 const int64_t kMaxCacheSizeBytes = 5 * 1024 * 1024;
189
190 // Parsed RLS LB policy configuration.
class RlsLbConfig final : public LoadBalancingPolicy::Config {
 public:
  // Per-path instructions for building the RLS request key map from a
  // call's headers plus fixed host/service/method/constant entries.
  struct KeyBuilder {
    std::map<std::string /*key*/, std::vector<std::string /*header*/>>
        header_keys;
    std::string host_key;
    std::string service_key;
    std::string method_key;
    std::map<std::string /*key*/, std::string /*value*/> constant_keys;
  };
  using KeyBuilderMap = std::unordered_map<std::string /*path*/, KeyBuilder>;

  // The "routeLookupConfig" portion of the policy config, with defaults
  // applied from the constants above.
  struct RouteLookupConfig {
    KeyBuilderMap key_builder_map;
    std::string lookup_service;
    Duration lookup_service_timeout = kDefaultLookupServiceTimeout;
    Duration max_age = kMaxMaxAge;
    Duration stale_age = kMaxMaxAge;
    int64_t cache_size_bytes = 0;
    std::string default_target;

    static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
    void JsonPostLoad(const Json& json, const JsonArgs& args,
                      ValidationErrors* errors);
  };

  RlsLbConfig() = default;

  // Non-copyable, non-movable.
  RlsLbConfig(const RlsLbConfig&) = delete;
  RlsLbConfig& operator=(const RlsLbConfig&) = delete;

  RlsLbConfig(RlsLbConfig&& other) = delete;
  RlsLbConfig& operator=(RlsLbConfig&& other) = delete;

  absl::string_view name() const override { return kRls; }

  // Accessors for the parsed route-lookup config.
  const KeyBuilderMap& key_builder_map() const {
    return route_lookup_config_.key_builder_map;
  }
  const std::string& lookup_service() const {
    return route_lookup_config_.lookup_service;
  }
  Duration lookup_service_timeout() const {
    return route_lookup_config_.lookup_service_timeout;
  }
  Duration max_age() const { return route_lookup_config_.max_age; }
  Duration stale_age() const { return route_lookup_config_.stale_age; }
  int64_t cache_size_bytes() const {
    return route_lookup_config_.cache_size_bytes;
  }
  const std::string& default_target() const {
    return route_lookup_config_.default_target;
  }
  // Service config to use on the channel to the RLS server.
  const std::string& rls_channel_service_config() const {
    return rls_channel_service_config_;
  }
  const Json& child_policy_config() const { return child_policy_config_; }
  const std::string& child_policy_config_target_field_name() const {
    return child_policy_config_target_field_name_;
  }
  RefCountedPtr<LoadBalancingPolicy::Config>
  default_child_policy_parsed_config() const {
    return default_child_policy_parsed_config_;
  }

  static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
  void JsonPostLoad(const Json& json, const JsonArgs&,
                    ValidationErrors* errors);

 private:
  RouteLookupConfig route_lookup_config_;
  std::string rls_channel_service_config_;
  Json child_policy_config_;
  std::string child_policy_config_target_field_name_;
  RefCountedPtr<LoadBalancingPolicy::Config>
      default_child_policy_parsed_config_;
};
268
269 // RLS LB policy.
class RlsLb final : public LoadBalancingPolicy {
 public:
  explicit RlsLb(Args args);

  absl::string_view name() const override { return kRls; }
  absl::Status UpdateLocked(UpdateArgs args) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;

 private:
  // Key to access entries in the cache and the request map.
  struct RequestKey {
    std::map<std::string, std::string> key_map;

    bool operator==(const RequestKey& rhs) const {
      return key_map == rhs.key_map;
    }

    template <typename H>
    friend H AbslHashValue(H h, const RequestKey& key) {
      std::hash<std::string> string_hasher;
      for (auto& kv : key.key_map) {
        h = H::combine(std::move(h), string_hasher(kv.first),
                       string_hasher(kv.second));
      }
      return h;
    }

    // Approximate memory footprint: the struct itself plus the lengths of
    // all key and value strings (used for cache accounting).
    size_t Size() const {
      size_t size = sizeof(RequestKey);
      for (auto& kv : key_map) {
        size += kv.first.length() + kv.second.length();
      }
      return size;
    }

    std::string ToString() const {
      return absl::StrCat(
          "{", absl::StrJoin(key_map, ",", absl::PairFormatter("=")), "}");
    }
  };

  // Data from an RLS response.
  struct ResponseInfo {
    absl::Status status;
    std::vector<std::string> targets;
    std::string header_data;

    std::string ToString() const {
      return absl::StrFormat("{status=%s, targets=[%s], header_data=\"%s\"}",
                             status.ToString(), absl::StrJoin(targets, ","),
                             header_data);
    }
  };

  // Wraps a child policy for a given RLS target.
  class ChildPolicyWrapper final : public DualRefCounted<ChildPolicyWrapper> {
   public:
    ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy, std::string target);

    const std::string& target() const { return target_; }

    // Delegates the pick to the child policy's most recent picker.
    PickResult Pick(PickArgs args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
      return picker_->Pick(args);
    }

    // Updates for the child policy are handled in two phases:
    // 1. In StartUpdate(), we parse and validate the new child policy
    //    config and store the parsed config.
    // 2. In MaybeFinishUpdate(), we actually pass the parsed config to the
    //    child policy's UpdateLocked() method.
    //
    // The reason we do this is to avoid deadlocks. In StartUpdate(),
    // if the new config fails to validate, then we need to set
    // picker_ to an instance that will fail all requests, which
    // requires holding the lock. However, we cannot call the child
    // policy's UpdateLocked() method from MaybeFinishUpdate() while
    // holding the lock, since that would cause a deadlock: the child's
    // UpdateLocked() will call the helper's UpdateState() method, which
    // will try to acquire the lock to set picker_. So StartUpdate() is
    // called while we are still holding the lock, but MaybeFinishUpdate()
    // is called after releasing it.
    //
    // Both methods grab the data they need from the parent object.
    void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
    absl::Status MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_);

    void ExitIdleLocked() {
      if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
    }

    void ResetBackoffLocked() {
      if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
    }

    // Gets the connectivity state of the child policy. Once the child policy
    // reports TRANSIENT_FAILURE, the function will always return
    // TRANSIENT_FAILURE state instead of the actual state of the child policy
    // until the child policy reports another READY state.
    grpc_connectivity_state connectivity_state() const
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
      return connectivity_state_;
    }

   private:
    // ChannelControlHelper object that allows the child policy to update state
    // with the wrapper.
    class ChildPolicyHelper final : public DelegatingChannelControlHelper {
     public:
      explicit ChildPolicyHelper(WeakRefCountedPtr<ChildPolicyWrapper> wrapper)
          : wrapper_(std::move(wrapper)) {}
      ~ChildPolicyHelper() override {
        wrapper_.reset(DEBUG_LOCATION, "ChildPolicyHelper");
      }

      void UpdateState(grpc_connectivity_state state,
                       const absl::Status& status,
                       RefCountedPtr<SubchannelPicker> picker) override;

     private:
      ChannelControlHelper* parent_helper() const override {
        return wrapper_->lb_policy_->channel_control_helper();
      }

      WeakRefCountedPtr<ChildPolicyWrapper> wrapper_;
    };

    // Note: We are forced to disable lock analysis here because
    // Orphan() is called by Unref() which is called by RefCountedPtr<>, which
    // cannot have lock annotations for this particular caller.
    void Orphaned() override ABSL_NO_THREAD_SAFETY_ANALYSIS;

    RefCountedPtr<RlsLb> lb_policy_;
    std::string target_;

    bool is_shutdown_ = false;

    OrphanablePtr<ChildPolicyHandler> child_policy_;
    // Config parsed by StartUpdate(), consumed by MaybeFinishUpdate().
    RefCountedPtr<LoadBalancingPolicy::Config> pending_config_;

    grpc_connectivity_state connectivity_state_ ABSL_GUARDED_BY(&RlsLb::mu_) =
        GRPC_CHANNEL_CONNECTING;
    RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_
        ABSL_GUARDED_BY(&RlsLb::mu_);
  };

  // A picker that uses the cache and the request map in the LB policy
  // (synchronized via a mutex) to determine how to route requests.
  class Picker final : public LoadBalancingPolicy::SubchannelPicker {
   public:
    explicit Picker(RefCountedPtr<RlsLb> lb_policy);

    PickResult Pick(PickArgs args) override;

   private:
    // Routes the pick to the default target if one is configured;
    // otherwise fails the pick with the given status.
    PickResult PickFromDefaultTargetOrFail(const char* reason, PickArgs args,
                                           absl::Status status)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

    RefCountedPtr<RlsLb> lb_policy_;
    RefCountedPtr<RlsLbConfig> config_;
    RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
  };

  // An LRU cache with adjustable size.
  class Cache final {
   public:
    using Iterator = std::list<RequestKey>::iterator;

    class Entry final : public InternallyRefCounted<Entry> {
     public:
      Entry(RefCountedPtr<RlsLb> lb_policy, const RequestKey& key);

      // Notify the entry when it's evicted from the cache. Performs shut down.
      // Note: We are forced to disable lock analysis here because
      // Orphan() is called by OrphanablePtr<>, which cannot have lock
      // annotations for this particular caller.
      void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;

      // Accessors for the entry's backoff and expiration state.
      const absl::Status& status() const
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
        return status_;
      }
      Timestamp backoff_time() const
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
        return backoff_time_;
      }
      Timestamp backoff_expiration_time() const
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
        return backoff_expiration_time_;
      }
      Timestamp data_expiration_time() const
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
        return data_expiration_time_;
      }
      const std::string& header_data() const
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
        return header_data_;
      }
      Timestamp stale_time() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
        return stale_time_;
      }
      Timestamp min_expiration_time() const
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
        return min_expiration_time_;
      }

      // Transfers ownership of the backoff state out of the entry
      // (e.g., to carry it over into a new RLS request).
      std::unique_ptr<BackOff> TakeBackoffState()
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
        return std::move(backoff_state_);
      }

      // Cache size of entry.
      size_t Size() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

      // Pick subchannel for request based on the entry's state.
      PickResult Pick(PickArgs args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

      // If the cache entry is in backoff state, resets the backoff and, if
      // applicable, its backoff timer. The method does not update the LB
      // policy's picker; the caller is responsible for that if necessary.
      void ResetBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

      // Check if the entry should be removed by the clean-up timer.
      bool ShouldRemove() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

      // Check if the entry can be evicted from the cache, i.e. the
      // min_expiration_time_ has passed.
      bool CanEvict() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

      // Updates the entry upon reception of a new RLS response.
      // Returns a list of child policy wrappers on which FinishUpdate()
      // needs to be called after releasing the lock.
      std::vector<ChildPolicyWrapper*> OnRlsResponseLocked(
          ResponseInfo response, std::unique_ptr<BackOff> backoff_state)
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

      // Moves entry to the end of the LRU list.
      void MarkUsed() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

     private:
      // Timer that fires when the entry's backoff period ends.
      class BackoffTimer final : public InternallyRefCounted<BackoffTimer> {
       public:
        BackoffTimer(RefCountedPtr<Entry> entry, Timestamp backoff_time);

        // Note: We are forced to disable lock analysis here because
        // Orphan() is called by OrphanablePtr<>, which cannot have lock
        // annotations for this particular caller.
        void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;

       private:
        void OnBackoffTimerLocked();

        RefCountedPtr<Entry> entry_;
        absl::optional<EventEngine::TaskHandle> backoff_timer_task_handle_
            ABSL_GUARDED_BY(&RlsLb::mu_);
      };

      RefCountedPtr<RlsLb> lb_policy_;

      bool is_shutdown_ ABSL_GUARDED_BY(&RlsLb::mu_) = false;

      // Backoff states
      absl::Status status_ ABSL_GUARDED_BY(&RlsLb::mu_);
      std::unique_ptr<BackOff> backoff_state_ ABSL_GUARDED_BY(&RlsLb::mu_);
      Timestamp backoff_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
          Timestamp::InfPast();
      Timestamp backoff_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
          Timestamp::InfPast();
      OrphanablePtr<BackoffTimer> backoff_timer_;

      // RLS response states
      std::vector<RefCountedPtr<ChildPolicyWrapper>> child_policy_wrappers_
          ABSL_GUARDED_BY(&RlsLb::mu_);
      std::string header_data_ ABSL_GUARDED_BY(&RlsLb::mu_);
      Timestamp data_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
          Timestamp::InfPast();
      Timestamp stale_time_ ABSL_GUARDED_BY(&RlsLb::mu_) = Timestamp::InfPast();

      Timestamp min_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_);
      // Position of this entry's key in the cache's LRU list.
      Cache::Iterator lru_iterator_ ABSL_GUARDED_BY(&RlsLb::mu_);
    };

    explicit Cache(RlsLb* lb_policy);

    // Finds an entry from the cache that corresponds to a key. If an entry is
    // not found, nullptr is returned. Otherwise, the entry is considered
    // recently used and its order in the LRU list of the cache is updated.
    Entry* Find(const RequestKey& key)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

    // Finds an entry from the cache that corresponds to a key. If an entry is
    // not found, an entry is created, inserted in the cache, and returned to
    // the caller. Otherwise, the entry found is returned to the caller. The
    // entry returned to the user is considered recently used and its order in
    // the LRU list of the cache is updated.
    Entry* FindOrInsert(const RequestKey& key)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

    // Resizes the cache. If the new cache size is greater than the current size
    // of the cache, do nothing. Otherwise, evict the oldest entries that
    // exceed the new size limit of the cache.
    void Resize(size_t bytes) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

    // Resets backoff of all the cache entries.
    void ResetAllBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

    // Shutdown the cache; clean-up and orphan all the stored cache entries.
    void Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

    // Reports cache size/entry-count metrics via the given reporter.
    void ReportMetricsLocked(CallbackMetricReporter& reporter)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

   private:
    // Shared logic for starting the cleanup timer
    void StartCleanupTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

    void OnCleanupTimer();

    // Returns the entry size for a given key.
    static size_t EntrySizeForKey(const RequestKey& key);

    // Evicts oversized cache elements when the current size is greater than
    // the specified limit.
    void MaybeShrinkSize(size_t bytes)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

    RlsLb* lb_policy_;

    size_t size_limit_ ABSL_GUARDED_BY(&RlsLb::mu_) = 0;
    size_t size_ ABSL_GUARDED_BY(&RlsLb::mu_) = 0;

    std::list<RequestKey> lru_list_ ABSL_GUARDED_BY(&RlsLb::mu_);
    std::unordered_map<RequestKey, OrphanablePtr<Entry>, absl::Hash<RequestKey>>
        map_ ABSL_GUARDED_BY(&RlsLb::mu_);
    absl::optional<EventEngine::TaskHandle> cleanup_timer_handle_;
  };

  // Channel for communicating with the RLS server.
  // Contains throttling logic for RLS requests.
  class RlsChannel final : public InternallyRefCounted<RlsChannel> {
   public:
    explicit RlsChannel(RefCountedPtr<RlsLb> lb_policy);

    // Shuts down the channel.
    void Orphan() override;

    // Starts an RLS call.
    // If stale_entry is non-null, it points to the entry containing
    // stale data for the key.
    void StartRlsCall(const RequestKey& key, Cache::Entry* stale_entry)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

    // Reports the result of an RLS call to the throttle.
    void ReportResponseLocked(bool response_succeeded)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

    // Checks if a proposed RLS call should be throttled.
    bool ShouldThrottle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
      return throttle_.ShouldThrottle();
    }

    // Resets the channel's backoff.
    void ResetBackoff();

    Channel* channel() const { return channel_.get(); }

   private:
    // Watches the state of the RLS channel. Notifies the LB policy when
    // the channel was previously in TRANSIENT_FAILURE and then becomes READY.
    class StateWatcher final : public AsyncConnectivityStateWatcherInterface {
     public:
      explicit StateWatcher(RefCountedPtr<RlsChannel> rls_channel)
          : AsyncConnectivityStateWatcherInterface(
                rls_channel->lb_policy_->work_serializer()),
            rls_channel_(std::move(rls_channel)) {}

     private:
      void OnConnectivityStateChange(grpc_connectivity_state new_state,
                                     const absl::Status& status) override;

      RefCountedPtr<RlsChannel> rls_channel_;
      bool was_transient_failure_ = false;
    };

    // Throttle state for RLS requests.
    class Throttle final {
     public:
      // NOTE(review): ratio_for_successes is taken as float but stored in a
      // double member, with a double default constant — presumably benign
      // narrowing; confirm before changing.
      explicit Throttle(
          Duration window_size = kDefaultThrottleWindowSize,
          float ratio_for_successes = kDefaultThrottleRatioForSuccesses,
          int padding = kDefaultThrottlePadding)
          : window_size_(window_size),
            ratio_for_successes_(ratio_for_successes),
            padding_(padding) {}

      bool ShouldThrottle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

      void RegisterResponse(bool success)
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

     private:
      Duration window_size_;
      double ratio_for_successes_;
      int padding_;
      std::mt19937 rng_{std::random_device()()};

      // Logged timestamp of requests.
      std::deque<Timestamp> requests_ ABSL_GUARDED_BY(&RlsLb::mu_);

      // Logged timestamps of failures.
      std::deque<Timestamp> failures_ ABSL_GUARDED_BY(&RlsLb::mu_);
    };

    RefCountedPtr<RlsLb> lb_policy_;
    bool is_shutdown_ = false;

    OrphanablePtr<Channel> channel_;
    RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;
    StateWatcher* watcher_ = nullptr;
    Throttle throttle_ ABSL_GUARDED_BY(&RlsLb::mu_);
  };

  // A pending RLS request. Instances will be tracked in request_map_.
  class RlsRequest final : public InternallyRefCounted<RlsRequest> {
   public:
    // Asynchronously starts a call on rls_channel for key.
    // Stores backoff_state, which will be transferred to the data cache
    // if the RLS request fails.
    RlsRequest(RefCountedPtr<RlsLb> lb_policy, RlsLb::RequestKey key,
               RefCountedPtr<RlsChannel> rls_channel,
               std::unique_ptr<BackOff> backoff_state,
               grpc_lookup_v1_RouteLookupRequest_Reason reason,
               std::string stale_header_data);
    ~RlsRequest() override;

    // Shuts down the request. If the request is still in flight, it is
    // cancelled, in which case no response will be added to the cache.
    void Orphan() override;

   private:
    // Callback to be invoked to start the call.
    static void StartCall(void* arg, grpc_error_handle error);

    // Helper for StartCall() that runs within the WorkSerializer.
    void StartCallLocked();

    // Callback to be invoked when the call is completed.
    static void OnRlsCallComplete(void* arg, grpc_error_handle error);

    // Call completion callback running on LB policy WorkSerializer.
    void OnRlsCallCompleteLocked(grpc_error_handle error);

    // Serializes the RouteLookupRequest proto for key_.
    grpc_byte_buffer* MakeRequestProto();
    // Deserializes the RouteLookupResponse proto from recv_message_.
    ResponseInfo ParseResponseProto();

    RefCountedPtr<RlsLb> lb_policy_;
    RlsLb::RequestKey key_;
    RefCountedPtr<RlsChannel> rls_channel_;
    std::unique_ptr<BackOff> backoff_state_;
    grpc_lookup_v1_RouteLookupRequest_Reason reason_;
    std::string stale_header_data_;

    // RLS call state.
    Timestamp deadline_;
    grpc_closure call_start_cb_;
    grpc_closure call_complete_cb_;
    grpc_call* call_ = nullptr;
    grpc_byte_buffer* send_message_ = nullptr;
    grpc_metadata_array recv_initial_metadata_;
    grpc_byte_buffer* recv_message_ = nullptr;
    grpc_metadata_array recv_trailing_metadata_;
    grpc_status_code status_recv_;
    grpc_slice status_details_recv_;
  };

  void ShutdownLocked() override;

  // Returns a new picker to the channel to trigger reprocessing of
  // pending picks. Schedules the actual picker update on the ExecCtx
  // to be run later, so it's safe to invoke this while holding the lock.
  void UpdatePickerAsync();
  // Hops into work serializer and calls UpdatePickerLocked().
  static void UpdatePickerCallback(void* arg, grpc_error_handle error);
  // Updates the picker in the work serializer.
  void UpdatePickerLocked() ABSL_LOCKS_EXCLUDED(&mu_);

  // Increments the pick-count metric for the given counter handle,
  // target, and pick result.
  void MaybeExportPickCount(
      GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle,
      absl::string_view target, const PickResult& pick_result);

  // Unique identifier for this policy instance, used as a metric label.
  const std::string instance_uuid_;

  // Mutex to guard LB policy state that is accessed by the picker.
  Mutex mu_;
  bool is_shutdown_ ABSL_GUARDED_BY(mu_) = false;
  bool update_in_progress_ = false;
  Cache cache_ ABSL_GUARDED_BY(mu_);
  // Maps an RLS request key to an RlsRequest object that represents a pending
  // RLS request.
  std::unordered_map<RequestKey, OrphanablePtr<RlsRequest>,
                     absl::Hash<RequestKey>>
      request_map_ ABSL_GUARDED_BY(mu_);
  // The channel on which RLS requests are sent.
  // Note that this channel may be swapped out when the RLS policy gets
  // an update. However, when that happens, any existing entries in
  // request_map_ will continue to use the previous channel.
  OrphanablePtr<RlsChannel> rls_channel_ ABSL_GUARDED_BY(mu_);

  // Accessed only from within WorkSerializer.
  absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses_;
  ChannelArgs channel_args_;
  RefCountedPtr<RlsLbConfig> config_;
  RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
  std::map<std::string /*target*/, ChildPolicyWrapper*> child_policy_map_;

  // Must be after mu_, so that it is destroyed before mu_.
  std::unique_ptr<RegisteredMetricCallback> registered_metric_callback_;
};
789
790 //
791 // RlsLb::ChildPolicyWrapper
792 //
793
ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,std::string target)794 RlsLb::ChildPolicyWrapper::ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,
795 std::string target)
796 : DualRefCounted<ChildPolicyWrapper>(
797 GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "ChildPolicyWrapper"
798 : nullptr),
799 lb_policy_(std::move(lb_policy)),
800 target_(std::move(target)),
801 picker_(MakeRefCounted<QueuePicker>(nullptr)) {
802 lb_policy_->child_policy_map_.emplace(target_, this);
803 }
804
Orphaned()805 void RlsLb::ChildPolicyWrapper::Orphaned() {
806 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
807 gpr_log(GPR_INFO, "[rlslb %p] ChildPolicyWrapper=%p [%s]: shutdown",
808 lb_policy_.get(), this, target_.c_str());
809 }
810 is_shutdown_ = true;
811 lb_policy_->child_policy_map_.erase(target_);
812 if (child_policy_ != nullptr) {
813 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
814 lb_policy_->interested_parties());
815 child_policy_.reset();
816 }
817 picker_.reset();
818 }
819
InsertOrUpdateChildPolicyField(const std::string & field,const std::string & value,const Json & config,ValidationErrors * errors)820 absl::optional<Json> InsertOrUpdateChildPolicyField(const std::string& field,
821 const std::string& value,
822 const Json& config,
823 ValidationErrors* errors) {
824 if (config.type() != Json::Type::kArray) {
825 errors->AddError("is not an array");
826 return absl::nullopt;
827 }
828 const size_t original_num_errors = errors->size();
829 Json::Array array;
830 for (size_t i = 0; i < config.array().size(); ++i) {
831 const Json& child_json = config.array()[i];
832 ValidationErrors::ScopedField json_field(errors, absl::StrCat("[", i, "]"));
833 if (child_json.type() != Json::Type::kObject) {
834 errors->AddError("is not an object");
835 } else {
836 const Json::Object& child = child_json.object();
837 if (child.size() != 1) {
838 errors->AddError("child policy object contains more than one field");
839 } else {
840 const std::string& child_name = child.begin()->first;
841 ValidationErrors::ScopedField json_field(
842 errors, absl::StrCat("[\"", child_name, "\"]"));
843 const Json& child_config_json = child.begin()->second;
844 if (child_config_json.type() != Json::Type::kObject) {
845 errors->AddError("child policy config is not an object");
846 } else {
847 Json::Object child_config = child_config_json.object();
848 child_config[field] = Json::FromString(value);
849 array.emplace_back(Json::FromObject(
850 {{child_name, Json::FromObject(std::move(child_config))}}));
851 }
852 }
853 }
854 }
855 if (errors->size() != original_num_errors) return absl::nullopt;
856 return Json::FromArray(std::move(array));
857 }
858
// Builds and parses the child policy config for this target.  On success,
// stores the parsed config in pending_config_ for MaybeFinishUpdate() to
// apply.  On parse failure, reports TRANSIENT_FAILURE via the picker and
// drops the child policy.
void RlsLb::ChildPolicyWrapper::StartUpdate() {
  ValidationErrors errors;
  auto child_policy_config = InsertOrUpdateChildPolicyField(
      lb_policy_->config_->child_policy_config_target_field_name(), target_,
      lb_policy_->config_->child_policy_config(), &errors);
  // NOTE: this insertion is expected to always succeed here, presumably
  // because the base config was validated when the RLS LB config was
  // parsed -- confirm.
  GPR_ASSERT(child_policy_config.has_value());
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(
        GPR_INFO,
        "[rlslb %p] ChildPolicyWrapper=%p [%s]: validating update, config: %s",
        lb_policy_.get(), this, target_.c_str(),
        JsonDump(*child_policy_config).c_str());
  }
  auto config =
      CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
          *child_policy_config);
  // Returned RLS target fails the validation.
  if (!config.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
      gpr_log(GPR_INFO,
              "[rlslb %p] ChildPolicyWrapper=%p [%s]: config failed to parse: "
              "%s",
              lb_policy_.get(), this, target_.c_str(),
              config.status().ToString().c_str());
    }
    pending_config_.reset();
    // Fail picks for this target until a valid config arrives.
    picker_ = MakeRefCounted<TransientFailurePicker>(
        absl::UnavailableError(config.status().message()));
    child_policy_.reset();
  } else {
    pending_config_ = std::move(*config);
  }
}
892
// Applies the config prepared by StartUpdate() to the child policy,
// lazily creating the child policy handler on first use.  Returns the
// status of the child's UpdateLocked().
// NOTE(review): this calls into the child policy, so it presumably must
// run without holding mu_ -- confirm against callers.
absl::Status RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() {
  // If pending_config_ is not set, that means StartUpdate() failed, so
  // there's nothing to do here.
  if (pending_config_ == nullptr) return absl::OkStatus();
  // If child policy doesn't yet exist, create it.
  if (child_policy_ == nullptr) {
    Args create_args;
    create_args.work_serializer = lb_policy_->work_serializer();
    create_args.channel_control_helper = std::make_unique<ChildPolicyHelper>(
        WeakRef(DEBUG_LOCATION, "ChildPolicyHelper"));
    create_args.args = lb_policy_->channel_args_;
    child_policy_ = MakeOrphanable<ChildPolicyHandler>(std::move(create_args),
                                                       &grpc_lb_rls_trace);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
      gpr_log(GPR_INFO,
              "[rlslb %p] ChildPolicyWrapper=%p [%s], created new child policy "
              "handler %p",
              lb_policy_.get(), this, target_.c_str(), child_policy_.get());
    }
    // Link the child's pollset_set to the parent's; undone in Orphaned().
    grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
                                     lb_policy_->interested_parties());
  }
  // Send the child the updated config.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO,
            "[rlslb %p] ChildPolicyWrapper=%p [%s], updating child policy "
            "handler %p",
            lb_policy_.get(), this, target_.c_str(), child_policy_.get());
  }
  UpdateArgs update_args;
  update_args.config = std::move(pending_config_);
  update_args.addresses = lb_policy_->addresses_;
  update_args.args = lb_policy_->channel_args_;
  return child_policy_->UpdateLocked(std::move(update_args));
}
928
929 //
930 // RlsLb::ChildPolicyWrapper::ChildPolicyHelper
931 //
932
// Receives connectivity updates from the child policy, records them on
// the wrapper, and triggers an RLS-level picker refresh.
void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
    grpc_connectivity_state state, const absl::Status& status,
    RefCountedPtr<SubchannelPicker> picker) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO,
            "[rlslb %p] ChildPolicyWrapper=%p [%s] ChildPolicyHelper=%p: "
            "UpdateState(state=%s, status=%s, picker=%p)",
            wrapper_->lb_policy_.get(), wrapper_.get(),
            wrapper_->target_.c_str(), this, ConnectivityStateName(state),
            status.ToString().c_str(), picker.get());
  }
  {
    MutexLock lock(&wrapper_->lb_policy_->mu_);
    if (wrapper_->is_shutdown_) return;
    // TODO(roth): It looks like this ignores subsequent TF updates that
    // might change the status used to fail picks, which seems wrong.
    // Once in TRANSIENT_FAILURE, stay there until the child reports READY.
    if (wrapper_->connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
        state != GRPC_CHANNEL_READY) {
      return;
    }
    wrapper_->connectivity_state_ = state;
    GPR_DEBUG_ASSERT(picker != nullptr);
    // Defensive: in production builds, keep the previous picker if a null
    // one ever arrives.
    if (picker != nullptr) {
      wrapper_->picker_ = std::move(picker);
    }
  }
  // Done outside the lock, since it re-enters LB policy code.
  wrapper_->lb_policy_->UpdatePickerLocked();
}
961
962 //
963 // RlsLb::Picker
964 //
965
966 // Builds the key to be used for a request based on path and initial_metadata.
// Returns the RLS key map for an RPC, per the key builder configured for
// the RPC's method (or its service-wide wildcard).  Returns an empty map
// if no key builder matches the path.
std::map<std::string, std::string> BuildKeyMap(
    const RlsLbConfig::KeyBuilderMap& key_builder_map, absl::string_view path,
    absl::string_view host,
    const LoadBalancingPolicy::MetadataInterface* initial_metadata) {
  size_t last_slash_pos = path.npos;  // May need this a few times, so cache it.
  // Find key builder for this path.
  auto it = key_builder_map.find(std::string(path));
  if (it == key_builder_map.end()) {
    // Didn't find exact match, try method wildcard.
    // A wildcard entry is keyed by "/service/" (service prefix incl. slash).
    last_slash_pos = path.rfind('/');
    GPR_DEBUG_ASSERT(last_slash_pos != path.npos);
    if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
    std::string service(path.substr(0, last_slash_pos + 1));
    it = key_builder_map.find(service);
    if (it == key_builder_map.end()) return {};
  }
  const RlsLbConfig::KeyBuilder* key_builder = &it->second;
  // Construct key map using key builder.
  std::map<std::string, std::string> key_map;
  // Add header keys.  For each key, the first header (in configured
  // order) that is present in the metadata wins.
  for (const auto& p : key_builder->header_keys) {
    const std::string& key = p.first;
    const std::vector<std::string>& header_names = p.second;
    for (const std::string& header_name : header_names) {
      std::string buffer;
      absl::optional<absl::string_view> value =
          initial_metadata->Lookup(header_name, &buffer);
      if (value.has_value()) {
        key_map[key] = std::string(*value);
        break;
      }
    }
  }
  // Add constant keys.
  key_map.insert(key_builder->constant_keys.begin(),
                 key_builder->constant_keys.end());
  // Add host key.
  if (!key_builder->host_key.empty()) {
    key_map[key_builder->host_key] = std::string(host);
  }
  // Add service key.  Recompute last_slash_pos only if the exact-match
  // branch above skipped it.
  if (!key_builder->service_key.empty()) {
    if (last_slash_pos == path.npos) {
      last_slash_pos = path.rfind('/');
      GPR_DEBUG_ASSERT(last_slash_pos != path.npos);
      if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
    }
    // Path is "/service/method"; extract "service".
    key_map[key_builder->service_key] =
        std::string(path.substr(1, last_slash_pos - 1));
  }
  // Add method key.
  if (!key_builder->method_key.empty()) {
    if (last_slash_pos == path.npos) {
      last_slash_pos = path.rfind('/');
      GPR_DEBUG_ASSERT(last_slash_pos != path.npos);
      if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
    }
    key_map[key_builder->method_key] =
        std::string(path.substr(last_slash_pos + 1));
  }
  return key_map;
}
1029
// Snapshots the policy's current config and default child policy, so the
// picker stays consistent even if the policy is updated later.
// NOTE(review): config_ is initialized from the lb_policy_ *member* after
// the parameter has been moved into it; this relies on lb_policy_ being
// declared before config_ in the class -- confirm member order.
RlsLb::Picker::Picker(RefCountedPtr<RlsLb> lb_policy)
    : lb_policy_(std::move(lb_policy)), config_(lb_policy_->config_) {
  if (lb_policy_->default_child_policy_ != nullptr) {
    default_child_policy_ =
        lb_policy_->default_child_policy_->Ref(DEBUG_LOCATION, "Picker");
  }
}
1037
// Picks a target for an RPC.  Depending on cache state this may start an
// RLS request, serve from the cache, fall back to the default target,
// fail the pick, or queue it until the pending RLS response arrives.
LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) {
  // Construct key for request.
  RequestKey key = {
      BuildKeyMap(config_->key_builder_map(), args.path,
                  lb_policy_->channel_control_helper()->GetAuthority(),
                  args.initial_metadata)};
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO, "[rlslb %p] picker=%p: request keys: %s",
            lb_policy_.get(), this, key.ToString().c_str());
  }
  Timestamp now = Timestamp::Now();
  MutexLock lock(&lb_policy_->mu_);
  if (lb_policy_->is_shutdown_) {
    return PickResult::Fail(
        absl::UnavailableError("LB policy already shut down"));
  }
  // Check if there's a cache entry.
  Cache::Entry* entry = lb_policy_->cache_.Find(key);
  // If there is no cache entry, or if the cache entry is not in backoff
  // and has a stale time in the past, and there is not already a
  // pending RLS request for this key, then try to start a new RLS request.
  if ((entry == nullptr ||
       (entry->stale_time() < now && entry->backoff_time() < now)) &&
      lb_policy_->request_map_.find(key) == lb_policy_->request_map_.end()) {
    // Check if requests are being throttled.
    if (lb_policy_->rls_channel_->ShouldThrottle()) {
      // Request is throttled.
      // If there is no non-expired data in the cache, then we use the
      // default target if set, or else we fail the pick.
      if (entry == nullptr || entry->data_expiration_time() < now) {
        return PickFromDefaultTargetOrFail(
            "RLS call throttled", args,
            absl::UnavailableError("RLS request throttled"));
      }
    }
    // NOTE(review): when throttled but the cache still holds unexpired
    // (stale) data, control falls through and an RLS call is started
    // anyway -- confirm this refresh is intended to bypass throttling.
    // Start the RLS call.  Passes the entry only if it has unexpired data
    // so the response can be merged into it.
    lb_policy_->rls_channel_->StartRlsCall(
        key, (entry == nullptr || entry->data_expiration_time() < now) ? nullptr
                                                                       : entry);
  }
  // If the cache entry exists, see if it has usable data.
  if (entry != nullptr) {
    // If the entry has non-expired data, use it.
    if (entry->data_expiration_time() >= now) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
        gpr_log(GPR_INFO, "[rlslb %p] picker=%p: using cache entry %p",
                lb_policy_.get(), this, entry);
      }
      return entry->Pick(args);
    }
    // If the entry is in backoff, then use the default target if set,
    // or else fail the pick.
    if (entry->backoff_time() >= now) {
      return PickFromDefaultTargetOrFail(
          "RLS call in backoff", args,
          absl::UnavailableError(absl::StrCat("RLS request failed: ",
                                              entry->status().ToString())));
    }
  }
  // RLS call pending. Queue the pick.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO, "[rlslb %p] picker=%p: RLS request pending; queuing pick",
            lb_policy_.get(), this);
  }
  return PickResult::Queue();
}
1104
// Delegates the pick to the default target's child policy if one is
// configured (recording a default-target-pick metric); otherwise records
// a failed-pick metric and fails the pick with \a status.
LoadBalancingPolicy::PickResult RlsLb::Picker::PickFromDefaultTargetOrFail(
    const char* reason, PickArgs args, absl::Status status) {
  if (default_child_policy_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
      gpr_log(GPR_INFO, "[rlslb %p] picker=%p: %s; using default target",
              lb_policy_.get(), this, reason);
    }
    auto pick_result = default_child_policy_->Pick(args);
    lb_policy_->MaybeExportPickCount(kMetricDefaultTargetPicks,
                                     config_->default_target(), pick_result);
    return pick_result;
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO, "[rlslb %p] picker=%p: %s; failing pick",
            lb_policy_.get(), this, reason);
  }
  auto& stats_plugins =
      lb_policy_->channel_control_helper()->GetStatsPluginGroup();
  stats_plugins.AddCounter(kMetricFailedPicks, 1,
                           {lb_policy_->channel_control_helper()->GetTarget(),
                            config_->lookup_service()},
                           {});
  return PickResult::Fail(std::move(status));
}
1129
1130 //
1131 // RlsLb::Cache::Entry::BackoffTimer
1132 //
1133
// Schedules a timer that fires at \a backoff_time.  The EventEngine
// callback hops onto the LB policy's WorkSerializer before touching any
// policy state.
RlsLb::Cache::Entry::BackoffTimer::BackoffTimer(RefCountedPtr<Entry> entry,
                                                Timestamp backoff_time)
    : entry_(std::move(entry)) {
  backoff_timer_task_handle_ =
      entry_->lb_policy_->channel_control_helper()->GetEventEngine()->RunAfter(
          backoff_time - Timestamp::Now(),
          [self = Ref(DEBUG_LOCATION, "BackoffTimer")]() mutable {
            ApplicationCallbackExecCtx callback_exec_ctx;
            ExecCtx exec_ctx;
            // Grab a raw pointer before moving self into the inner lambda;
            // evaluation order of the object expression vs. the capture
            // init is otherwise a use-after-move hazard.
            auto self_ptr = self.get();
            self_ptr->entry_->lb_policy_->work_serializer()->Run(
                [self = std::move(self)]() { self->OnBackoffTimerLocked(); },
                DEBUG_LOCATION);
          });
}
1149
// Cancels the pending timer if possible.  The handle is cleared
// unconditionally; OnBackoffTimerLocked() checks it, so a callback that
// already fired becomes a no-op after this.
void RlsLb::Cache::Entry::BackoffTimer::Orphan() {
  if (backoff_timer_task_handle_.has_value() &&
      entry_->lb_policy_->channel_control_helper()->GetEventEngine()->Cancel(
          *backoff_timer_task_handle_)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
      gpr_log(GPR_INFO, "[rlslb %p] cache entry=%p %s, backoff timer canceled",
              entry_->lb_policy_.get(), entry_.get(),
              entry_->is_shutdown_ ? "(shut down)"
                                   : entry_->lru_iterator_->ToString().c_str());
    }
  }
  backoff_timer_task_handle_.reset();
  Unref(DEBUG_LOCATION, "Orphan");
}
1164
// Timer callback, run in the WorkSerializer.  Clears the timer handle and
// triggers a picker update so that picks queued during backoff are
// re-processed.
void RlsLb::Cache::Entry::BackoffTimer::OnBackoffTimerLocked() {
  {
    MutexLock lock(&entry_->lb_policy_->mu_);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
      gpr_log(GPR_INFO, "[rlslb %p] cache entry=%p %s, backoff timer fired",
              entry_->lb_policy_.get(), entry_.get(),
              entry_->is_shutdown_ ? "(shut down)"
                                   : entry_->lru_iterator_->ToString().c_str());
    }
    // Skip the update if Orphaned
    if (!backoff_timer_task_handle_.has_value()) return;
    backoff_timer_task_handle_.reset();
  }
  // The pick was in backoff state and there could be a pick queued if
  // wait_for_ready is true. We'll update the picker for that case.
  entry_->lb_policy_->UpdatePickerLocked();
}
1182
1183 //
1184 // RlsLb::Cache::Entry
1185 //
1186
MakeCacheEntryBackoff()1187 std::unique_ptr<BackOff> MakeCacheEntryBackoff() {
1188 return std::make_unique<BackOff>(
1189 BackOff::Options()
1190 .set_initial_backoff(kCacheBackoffInitial)
1191 .set_multiplier(kCacheBackoffMultiplier)
1192 .set_jitter(kCacheBackoffJitter)
1193 .set_max_backoff(kCacheBackoffMax));
1194 }
1195
// Constructs a cache entry for \a key, linking it in at the
// most-recently-used end of the cache's LRU list.  The entry is protected
// from eviction until min_expiration_time_ (see CanEvict()).
RlsLb::Cache::Entry::Entry(RefCountedPtr<RlsLb> lb_policy,
                           const RequestKey& key)
    : InternallyRefCounted<Entry>(
          GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "CacheEntry" : nullptr),
      lb_policy_(std::move(lb_policy)),
      backoff_state_(MakeCacheEntryBackoff()),
      min_expiration_time_(Timestamp::Now() + kMinExpirationTime),
      lru_iterator_(lb_policy_->cache_.lru_list_.insert(
          lb_policy_->cache_.lru_list_.end(), key)) {}
1205
// Called when the entry is evicted from the cache.  Unlinks the entry
// from the LRU list, cancels any backoff timer, and drops its child
// policy refs.
void RlsLb::Cache::Entry::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO, "[rlslb %p] cache entry=%p %s: cache entry evicted",
            lb_policy_.get(), this, lru_iterator_->ToString().c_str());
  }
  is_shutdown_ = true;
  lb_policy_->cache_.lru_list_.erase(lru_iterator_);
  lru_iterator_ = lb_policy_->cache_.lru_list_.end();  // Just in case.
  backoff_state_.reset();
  if (backoff_timer_ != nullptr) {
    backoff_timer_.reset();
    // Canceling the timer means queued picks won't be retried by it, so
    // kick off a picker update instead.
    lb_policy_->UpdatePickerAsync();
  }
  child_policy_wrappers_.clear();
  Unref(DEBUG_LOCATION, "Orphan");
}
1222
// Returns the number of bytes this entry is charged against the cache's
// size limit.
size_t RlsLb::Cache::Entry::Size() const {
  // lru_iterator_ is not valid once we're shut down.
  GPR_ASSERT(!is_shutdown_);
  return lb_policy_->cache_.EntrySizeForKey(*lru_iterator_);
}
1228
// Delegates the pick to the first target in the entry's list that is not
// in TRANSIENT_FAILURE (or the last target if all are), attaching the RLS
// header data from the response.
// NOTE(review): if child_policy_wrappers_ is empty, child_policy_wrapper
// stays null and is dereferenced below -- presumably an RLS response
// always yields at least one target; confirm.
LoadBalancingPolicy::PickResult RlsLb::Cache::Entry::Pick(PickArgs args) {
  size_t i = 0;
  ChildPolicyWrapper* child_policy_wrapper = nullptr;
  // Skip targets before the last one that are in state TRANSIENT_FAILURE.
  for (; i < child_policy_wrappers_.size(); ++i) {
    child_policy_wrapper = child_policy_wrappers_[i].get();
    if (child_policy_wrapper->connectivity_state() ==
            GRPC_CHANNEL_TRANSIENT_FAILURE &&
        i < child_policy_wrappers_.size() - 1) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
        gpr_log(GPR_INFO,
                "[rlslb %p] cache entry=%p %s: target %s (%" PRIuPTR
                " of %" PRIuPTR ") in state TRANSIENT_FAILURE; skipping",
                lb_policy_.get(), this, lru_iterator_->ToString().c_str(),
                child_policy_wrapper->target().c_str(), i,
                child_policy_wrappers_.size());
      }
      continue;
    }
    break;
  }
  // Child policy not in TRANSIENT_FAILURE or is the last target in
  // the list, so delegate.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO,
            "[rlslb %p] cache entry=%p %s: target %s (%" PRIuPTR " of %" PRIuPTR
            ") in state %s; delegating",
            lb_policy_.get(), this, lru_iterator_->ToString().c_str(),
            child_policy_wrapper->target().c_str(), i,
            child_policy_wrappers_.size(),
            ConnectivityStateName(child_policy_wrapper->connectivity_state()));
  }
  // Add header data.
  // Note that even if the target we're using is in TRANSIENT_FAILURE,
  // the pick might still succeed (e.g., if the child is ring_hash), so
  // we need to pass the right header info down in all cases.
  if (!header_data_.empty()) {
    // Copy into call-scoped storage, since header_data_ may be freed
    // before the call finishes using the metadata.
    char* copied_header_data =
        static_cast<char*>(args.call_state->Alloc(header_data_.length() + 1));
    strcpy(copied_header_data, header_data_.c_str());
    args.initial_metadata->Add(kRlsHeaderKey, copied_header_data);
  }
  auto pick_result = child_policy_wrapper->Pick(args);
  lb_policy_->MaybeExportPickCount(kMetricTargetPicks,
                                   child_policy_wrapper->target(), pick_result);
  return pick_result;
}
1276
// Clears backoff so the next pick may immediately start a new RLS
// request.  Note: backoff_expiration_time_ is left alone; it only affects
// removal in the cleanup sweep (see ShouldRemove()).
void RlsLb::Cache::Entry::ResetBackoff() {
  backoff_time_ = Timestamp::InfPast();
  backoff_timer_.reset();
}
1281
ShouldRemove() const1282 bool RlsLb::Cache::Entry::ShouldRemove() const {
1283 Timestamp now = Timestamp::Now();
1284 return data_expiration_time_ < now && backoff_expiration_time_ < now;
1285 }
1286
CanEvict() const1287 bool RlsLb::Cache::Entry::CanEvict() const {
1288 Timestamp now = Timestamp::Now();
1289 return min_expiration_time_ < now;
1290 }
1291
MarkUsed()1292 void RlsLb::Cache::Entry::MarkUsed() {
1293 auto& lru_list = lb_policy_->cache_.lru_list_;
1294 auto new_it = lru_list.insert(lru_list.end(), *lru_iterator_);
1295 lru_list.erase(lru_iterator_);
1296 lru_iterator_ = new_it;
1297 }
1298
// Handles an RLS response (or failure) for this entry.  Runs while
// holding the policy's mu_ (see the lock annotation on the lambda below).
// Returns the list of newly created child policy wrappers; the caller is
// expected to finish their updates after releasing the lock.
std::vector<RlsLb::ChildPolicyWrapper*>
RlsLb::Cache::Entry::OnRlsResponseLocked(
    ResponseInfo response, std::unique_ptr<BackOff> backoff_state) {
  // Move the entry to the end of the LRU list.
  MarkUsed();
  // If the request failed, store the failed status and update the
  // backoff state.
  if (!response.status.ok()) {
    status_ = response.status;
    // Reuse the backoff state carried over from the request, if any, so
    // repeated failures keep escalating.
    if (backoff_state != nullptr) {
      backoff_state_ = std::move(backoff_state);
    } else {
      backoff_state_ = MakeCacheEntryBackoff();
    }
    backoff_time_ = backoff_state_->NextAttemptTime();
    Timestamp now = Timestamp::Now();
    backoff_expiration_time_ = now + (backoff_time_ - now) * 2;
    backoff_timer_ = MakeOrphanable<BackoffTimer>(
        Ref(DEBUG_LOCATION, "BackoffTimer"), backoff_time_);
    lb_policy_->UpdatePickerAsync();
    return {};
  }
  // Request succeeded, so store the result.
  header_data_ = std::move(response.header_data);
  Timestamp now = Timestamp::Now();
  data_expiration_time_ = now + lb_policy_->config_->max_age();
  stale_time_ = now + lb_policy_->config_->stale_age();
  status_ = absl::OkStatus();
  backoff_state_.reset();
  backoff_time_ = Timestamp::InfPast();
  backoff_expiration_time_ = Timestamp::InfPast();
  // Check if we need to update this list of targets.
  bool targets_changed = [&]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
    if (child_policy_wrappers_.size() != response.targets.size()) return true;
    for (size_t i = 0; i < response.targets.size(); ++i) {
      if (child_policy_wrappers_[i]->target() != response.targets[i]) {
        return true;
      }
    }
    return false;
  }();
  if (!targets_changed) {
    // Targets didn't change, so we're not updating the list of child
    // policies. Return a new picker so that any queued requests can be
    // re-processed.
    lb_policy_->UpdatePickerAsync();
    return {};
  }
  // Target list changed, so update it.
  std::set<absl::string_view> old_targets;
  for (RefCountedPtr<ChildPolicyWrapper>& child_policy_wrapper :
       child_policy_wrappers_) {
    old_targets.emplace(child_policy_wrapper->target());
  }
  bool update_picker = false;
  std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
  std::vector<RefCountedPtr<ChildPolicyWrapper>> new_child_policy_wrappers;
  new_child_policy_wrappers.reserve(response.targets.size());
  for (std::string& target : response.targets) {
    // Reuse an existing wrapper for this target if some other cache entry
    // already created one; otherwise create (and start updating) a new one.
    auto it = lb_policy_->child_policy_map_.find(target);
    if (it == lb_policy_->child_policy_map_.end()) {
      auto new_child = MakeRefCounted<ChildPolicyWrapper>(
          lb_policy_.Ref(DEBUG_LOCATION, "ChildPolicyWrapper"), target);
      new_child->StartUpdate();
      child_policies_to_finish_update.push_back(new_child.get());
      new_child_policy_wrappers.emplace_back(std::move(new_child));
    } else {
      new_child_policy_wrappers.emplace_back(
          it->second->Ref(DEBUG_LOCATION, "CacheEntry"));
      // If the target already existed but was not previously used for
      // this key, then we'll need to update the picker, since we
      // didn't actually create a new child policy, which would have
      // triggered an RLS picker update when it returned its first picker.
      if (old_targets.find(target) == old_targets.end()) {
        update_picker = true;
      }
    }
  }
  child_policy_wrappers_ = std::move(new_child_policy_wrappers);
  if (update_picker) {
    lb_policy_->UpdatePickerAsync();
  }
  return child_policies_to_finish_update;
}
1383
1384 //
1385 // RlsLb::Cache
1386 //
1387
// The periodic cleanup timer runs for the lifetime of the cache; it is
// canceled in Shutdown().
RlsLb::Cache::Cache(RlsLb* lb_policy) : lb_policy_(lb_policy) {
  StartCleanupTimer();
}
1391
Find(const RequestKey & key)1392 RlsLb::Cache::Entry* RlsLb::Cache::Find(const RequestKey& key) {
1393 auto it = map_.find(key);
1394 if (it == map_.end()) return nullptr;
1395 it->second->MarkUsed();
1396 return it->second.get();
1397 }
1398
// Returns the cache entry for \a key, creating it if absent.  Before
// inserting, evicts LRU entries as needed to make room within the size
// limit.  Existing entries have their LRU position refreshed.
RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) {
  auto it = map_.find(key);
  // If not found, create new entry.
  if (it == map_.end()) {
    size_t entry_size = EntrySizeForKey(key);
    // Shrink to (size_limit_ - entry_size), clamped at zero, so the new
    // entry fits under the limit.
    MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size));
    Entry* entry = new Entry(
        lb_policy_->RefAsSubclass<RlsLb>(DEBUG_LOCATION, "CacheEntry"), key);
    map_.emplace(key, OrphanablePtr<Entry>(entry));
    size_ += entry_size;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
      gpr_log(GPR_INFO, "[rlslb %p] key=%s: cache entry added, entry=%p",
              lb_policy_, key.ToString().c_str(), entry);
    }
    return entry;
  }
  // Entry found, so use it.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO, "[rlslb %p] key=%s: found cache entry %p", lb_policy_,
            key.ToString().c_str(), it->second.get());
  }
  it->second->MarkUsed();
  return it->second.get();
}
1423
// Updates the cache's size limit and immediately evicts LRU entries as
// needed to fit within the new limit.
void RlsLb::Cache::Resize(size_t bytes) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO, "[rlslb %p] resizing cache to %" PRIuPTR " bytes",
            lb_policy_, bytes);
  }
  size_limit_ = bytes;
  MaybeShrinkSize(size_limit_);
}
1432
ResetAllBackoff()1433 void RlsLb::Cache::ResetAllBackoff() {
1434 for (auto& p : map_) {
1435 p.second->ResetBackoff();
1436 }
1437 lb_policy_->UpdatePickerAsync();
1438 }
1439
// Empties the cache and cancels the periodic cleanup timer.
void RlsLb::Cache::Shutdown() {
  map_.clear();
  lru_list_.clear();
  if (cleanup_timer_handle_.has_value() &&
      lb_policy_->channel_control_helper()->GetEventEngine()->Cancel(
          *cleanup_timer_handle_)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
      gpr_log(GPR_INFO, "[rlslb %p] cache cleanup timer canceled", lb_policy_);
    }
  }
  // Cleared unconditionally; OnCleanupTimer() checks this to become a
  // no-op if its callback was already in flight.
  cleanup_timer_handle_.reset();
}
1452
// Reports the cache's current byte size and entry count.  The
// instance_uuid_ label distinguishes multiple RLS policy instances
// within the same process.
void RlsLb::Cache::ReportMetricsLocked(CallbackMetricReporter& reporter) {
  reporter.Report(
      kMetricCacheSize, size_,
      {lb_policy_->channel_control_helper()->GetTarget(),
       lb_policy_->config_->lookup_service(), lb_policy_->instance_uuid_},
      {});
  reporter.Report(
      kMetricCacheEntries, map_.size(),
      {lb_policy_->channel_control_helper()->GetTarget(),
       lb_policy_->config_->lookup_service(), lb_policy_->instance_uuid_},
      {});
}
1465
// (Re)arms the periodic cache cleanup timer.  The callback holds a ref to
// the RlsLb policy, which transitively keeps this Cache alive, so
// capturing `this` is safe.
void RlsLb::Cache::StartCleanupTimer() {
  cleanup_timer_handle_ =
      lb_policy_->channel_control_helper()->GetEventEngine()->RunAfter(
          kCacheCleanupTimerInterval,
          [this, lb_policy = lb_policy_->Ref(DEBUG_LOCATION,
                                             "CacheCleanupTimer")]() mutable {
            ApplicationCallbackExecCtx callback_exec_ctx;
            ExecCtx exec_ctx;
            // Hop onto the WorkSerializer before touching policy state.
            lb_policy_->work_serializer()->Run(
                [this, lb_policy = std::move(lb_policy)]() {
                  // The lb_policy ref is held until the callback completes
                  OnCleanupTimer();
                },
                DEBUG_LOCATION);
          });
}
1482
// Periodic sweep: removes entries that are both expired and past their
// minimum lifetime, then re-arms the timer.  Becomes a no-op if the timer
// was canceled or the policy is shutting down.
void RlsLb::Cache::OnCleanupTimer() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO, "[rlslb %p] cache cleanup timer fired", lb_policy_);
  }
  MutexLock lock(&lb_policy_->mu_);
  if (!cleanup_timer_handle_.has_value()) return;
  if (lb_policy_->is_shutdown_) return;
  for (auto it = map_.begin(); it != map_.end();) {
    if (GPR_UNLIKELY(it->second->ShouldRemove() && it->second->CanEvict())) {
      size_ -= it->second->Size();
      it = map_.erase(it);
    } else {
      ++it;
    }
  }
  StartCleanupTimer();
}
1500
EntrySizeForKey(const RequestKey & key)1501 size_t RlsLb::Cache::EntrySizeForKey(const RequestKey& key) {
1502 // Key is stored twice, once in LRU list and again in the cache map.
1503 return (key.Size() * 2) + sizeof(Entry);
1504 }
1505
// Evicts least-recently-used entries until the cache size is at most
// \a bytes.  Stops early at the first entry still within its minimum
// lifetime, so the cache can temporarily remain over the target size.
void RlsLb::Cache::MaybeShrinkSize(size_t bytes) {
  while (size_ > bytes) {
    auto lru_it = lru_list_.begin();
    if (GPR_UNLIKELY(lru_it == lru_list_.end())) break;
    auto map_it = map_.find(*lru_it);
    GPR_ASSERT(map_it != map_.end());
    if (!map_it->second->CanEvict()) break;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
      gpr_log(GPR_INFO, "[rlslb %p] LRU eviction: removing entry %p %s",
              lb_policy_, map_it->second.get(), lru_it->ToString().c_str());
    }
    size_ -= map_it->second->Size();
    // Erasing the map entry orphans the Entry, which also unlinks the key
    // from the LRU list.
    map_.erase(map_it);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO,
            "[rlslb %p] LRU pass complete: desired size=%" PRIuPTR
            " size=%" PRIuPTR,
            lb_policy_, bytes, size_);
  }
}
1527
1528 //
1529 // RlsLb::RlsChannel::StateWatcher
1530 //
1531
// Watches the RLS channel's connectivity.  When the channel transitions
// from TRANSIENT_FAILURE back to READY, resets backoff on all cache
// entries (rationale in the comment below).
void RlsLb::RlsChannel::StateWatcher::OnConnectivityStateChange(
    grpc_connectivity_state new_state, const absl::Status& status) {
  auto* lb_policy = rls_channel_->lb_policy_.get();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO,
            "[rlslb %p] RlsChannel=%p StateWatcher=%p: "
            "state changed to %s (%s)",
            lb_policy, rls_channel_.get(), this,
            ConnectivityStateName(new_state), status.ToString().c_str());
  }
  if (rls_channel_->is_shutdown_) return;
  MutexLock lock(&lb_policy->mu_);
  if (new_state == GRPC_CHANNEL_READY && was_transient_failure_) {
    was_transient_failure_ = false;
    // Reset the backoff of all cache entries, so that we don't
    // double-penalize if an RLS request fails while the channel is
    // down, since the throttling for the channel being down is handled
    // at the channel level instead of in the individual cache entries.
    lb_policy->cache_.ResetAllBackoff();
  } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
    was_transient_failure_ = true;
  }
}
1555
1556 //
1557 // RlsLb::RlsChannel::Throttle
1558 //
1559
ShouldThrottle()1560 bool RlsLb::RlsChannel::Throttle::ShouldThrottle() {
1561 Timestamp now = Timestamp::Now();
1562 while (!requests_.empty() && now - requests_.front() > window_size_) {
1563 requests_.pop_front();
1564 }
1565 while (!failures_.empty() && now - failures_.front() > window_size_) {
1566 failures_.pop_front();
1567 }
1568 // Compute probability of throttling.
1569 float num_requests = requests_.size();
1570 float num_successes = num_requests - failures_.size();
1571 // Note: it's possible that this ratio will be negative, in which case
1572 // no throttling will be done.
1573 float throttle_probability =
1574 (num_requests - (num_successes * ratio_for_successes_)) /
1575 (num_requests + padding_);
1576 // Generate a random number for the request.
1577 std::uniform_real_distribution<float> dist(0, 1.0);
1578 // Check if we should throttle the request.
1579 bool throttle = dist(rng_) < throttle_probability;
1580 // If we're throttling, record the request and the failure.
1581 if (throttle) {
1582 requests_.push_back(now);
1583 failures_.push_back(now);
1584 }
1585 return throttle;
1586 }
1587
RegisterResponse(bool success)1588 void RlsLb::RlsChannel::Throttle::RegisterResponse(bool success) {
1589 Timestamp now = Timestamp::Now();
1590 requests_.push_back(now);
1591 if (!success) failures_.push_back(now);
1592 }
1593
1594 //
1595 // RlsLb::RlsChannel
1596 //
1597
// Creates the channel to the RLS lookup service.  Reuses the parent
// channel's credentials and authority, optionally pins a service
// config, links the new channel into channelz under the parent, and
// starts a connectivity watch used to reset cache backoff on READY.
RlsLb::RlsChannel::RlsChannel(RefCountedPtr<RlsLb> lb_policy)
    : InternallyRefCounted<RlsChannel>(
          GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "RlsChannel" : nullptr),
      lb_policy_(std::move(lb_policy)) {
  // Get channel creds from parent channel.
  // Note that we are using the "unsafe" channel creds here, which do
  // include any associated call creds.  This is safe in this case,
  // because we are using the parent channel's authority on the RLS channel.
  auto creds =
      lb_policy_->channel_control_helper()->GetUnsafeChannelCredentials();
  // Use the parent channel's authority.
  auto authority = lb_policy_->channel_control_helper()->GetAuthority();
  ChannelArgs args = ChannelArgs()
                         .Set(GRPC_ARG_DEFAULT_AUTHORITY, authority)
                         .Set(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL, 1);
  // Propagate fake security connector expected targets, if any.
  // (This is ugly, but it seems better than propagating all channel args
  // from the parent channel by default and then having a giant
  // exclude list of args to strip out, like we do in grpclb.)
  absl::optional<absl::string_view> fake_security_expected_targets =
      lb_policy_->channel_args_.GetString(
          GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS);
  if (fake_security_expected_targets.has_value()) {
    args = args.Set(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS,
                    *fake_security_expected_targets);
  }
  // Add service config args if needed.
  const std::string& service_config =
      lb_policy_->config_->rls_channel_service_config();
  if (!service_config.empty()) {
    args = args.Set(GRPC_ARG_SERVICE_CONFIG, service_config)
               .Set(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION, 1);
  }
  channel_.reset(Channel::FromC(
      grpc_channel_create(lb_policy_->config_->lookup_service().c_str(),
                          creds.get(), args.ToC().get())));
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO, "[rlslb %p] RlsChannel=%p: created channel %p for %s",
            lb_policy_.get(), this, channel_.get(),
            lb_policy_->config_->lookup_service().c_str());
  }
  // channel_ may be null if channel creation failed (e.g. bad target URI).
  if (channel_ != nullptr) {
    // Set up channelz linkage.
    channelz::ChannelNode* child_channelz_node = channel_->channelz_node();
    auto parent_channelz_node =
        lb_policy_->channel_args_.GetObjectRef<channelz::ChannelNode>();
    if (child_channelz_node != nullptr && parent_channelz_node != nullptr) {
      parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
      parent_channelz_node_ = std::move(parent_channelz_node);
    }
    // Start connectivity watch.
    // The watcher is owned by the channel; watcher_ is retained only so
    // the watch can be cancelled in Orphan().
    watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
    channel_->AddConnectivityWatcher(
        GRPC_CHANNEL_IDLE,
        OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
  }
}
1655
// Shuts down the RLS channel: unlinks it from channelz, cancels the
// connectivity watch, releases the channel, and drops the initial ref.
void RlsLb::RlsChannel::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO, "[rlslb %p] RlsChannel=%p, channel=%p: shutdown",
            lb_policy_.get(), this, channel_.get());
  }
  is_shutdown_ = true;
  if (channel_ != nullptr) {
    // Remove channelz linkage.
    if (parent_channelz_node_ != nullptr) {
      channelz::ChannelNode* child_channelz_node = channel_->channelz_node();
      GPR_ASSERT(child_channelz_node != nullptr);
      parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid());
    }
    // Stop connectivity watch.
    if (watcher_ != nullptr) {
      channel_->RemoveConnectivityWatcher(watcher_);
      watcher_ = nullptr;
    }
    channel_.reset();
  }
  Unref(DEBUG_LOCATION, "Orphan");
}
1678
StartRlsCall(const RequestKey & key,Cache::Entry * stale_entry)1679 void RlsLb::RlsChannel::StartRlsCall(const RequestKey& key,
1680 Cache::Entry* stale_entry) {
1681 std::unique_ptr<BackOff> backoff_state;
1682 grpc_lookup_v1_RouteLookupRequest_Reason reason =
1683 grpc_lookup_v1_RouteLookupRequest_REASON_MISS;
1684 std::string stale_header_data;
1685 if (stale_entry != nullptr) {
1686 backoff_state = stale_entry->TakeBackoffState();
1687 reason = grpc_lookup_v1_RouteLookupRequest_REASON_STALE;
1688 stale_header_data = stale_entry->header_data();
1689 }
1690 lb_policy_->request_map_.emplace(
1691 key, MakeOrphanable<RlsRequest>(
1692 lb_policy_.Ref(DEBUG_LOCATION, "RlsRequest"), key,
1693 lb_policy_->rls_channel_->Ref(DEBUG_LOCATION, "RlsRequest"),
1694 std::move(backoff_state), reason, std::move(stale_header_data)));
1695 }
1696
// Feeds an RLS response outcome into the throttling window.
// Called while holding RlsLb::mu_.
void RlsLb::RlsChannel::ReportResponseLocked(bool response_succeeded) {
  throttle_.RegisterResponse(response_succeeded);
}
1700
// Resets connection backoff on the RLS channel itself (used when the
// parent policy is asked to reset backoff).
void RlsLb::RlsChannel::ResetBackoff() {
  GPR_DEBUG_ASSERT(channel_ != nullptr);
  channel_->ResetConnectionBackoff();
}
1705
1706 //
1707 // RlsLb::RlsRequest
1708 //
1709
// Constructs a pending RLS lookup.  The actual call is started
// asynchronously via ExecCtx so that we don't start it while holding
// any locks held by the caller.
RlsLb::RlsRequest::RlsRequest(RefCountedPtr<RlsLb> lb_policy, RequestKey key,
                              RefCountedPtr<RlsChannel> rls_channel,
                              std::unique_ptr<BackOff> backoff_state,
                              grpc_lookup_v1_RouteLookupRequest_Reason reason,
                              std::string stale_header_data)
    : InternallyRefCounted<RlsRequest>(
          GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "RlsRequest" : nullptr),
      lb_policy_(std::move(lb_policy)),
      key_(std::move(key)),
      rls_channel_(std::move(rls_channel)),
      backoff_state_(std::move(backoff_state)),
      reason_(reason),
      stale_header_data_(std::move(stale_header_data)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO,
            "[rlslb %p] rls_request=%p: RLS request created for key %s",
            lb_policy_.get(), this, key_.ToString().c_str());
  }
  GRPC_CLOSURE_INIT(&call_complete_cb_, OnRlsCallComplete, this, nullptr);
  // The ref taken here is released in StartCall() after the hop into
  // the work serializer.
  ExecCtx::Run(
      DEBUG_LOCATION,
      GRPC_CLOSURE_INIT(&call_start_cb_, StartCall,
                        Ref(DEBUG_LOCATION, "StartCall").release(), nullptr),
      absl::OkStatus());
}
1735
// The call must have been cleaned up (or never started) by the time
// the request is destroyed.
RlsLb::RlsRequest::~RlsRequest() { GPR_ASSERT(call_ == nullptr); }
1737
// Cancels the in-flight RLS call (its completion callback still runs
// and performs cleanup) and drops the initial ref.
void RlsLb::RlsRequest::Orphan() {
  if (call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
      gpr_log(GPR_INFO, "[rlslb %p] rls_request=%p %s: cancelling RLS call",
              lb_policy_.get(), this, key_.ToString().c_str());
    }
    grpc_call_cancel_internal(call_);
  }
  Unref(DEBUG_LOCATION, "Orphan");
}
1748
// ExecCtx closure: hops into the policy's work serializer to start the
// call.  `arg` carries the ref taken in the constructor, which is
// released after StartCallLocked() runs.
void RlsLb::RlsRequest::StartCall(void* arg, grpc_error_handle /*error*/) {
  auto* request = static_cast<RlsRequest*>(arg);
  request->lb_policy_->work_serializer()->Run(
      [request]() {
        request->StartCallLocked();
        request->Unref(DEBUG_LOCATION, "StartCall");
      },
      DEBUG_LOCATION);
}
1758
// Creates the RLS call on the lookup channel and starts a single batch
// containing all six ops.  Runs in the work serializer.
void RlsLb::RlsRequest::StartCallLocked() {
  {
    MutexLock lock(&lb_policy_->mu_);
    if (lb_policy_->is_shutdown_) return;
  }
  Timestamp now = Timestamp::Now();
  deadline_ = now + lb_policy_->config_->lookup_service_timeout();
  grpc_metadata_array_init(&recv_initial_metadata_);
  grpc_metadata_array_init(&recv_trailing_metadata_);
  call_ = rls_channel_->channel()->CreateCall(
      /*parent_call=*/nullptr, GRPC_PROPAGATE_DEFAULTS, /*cq=*/nullptr,
      lb_policy_->interested_parties(),
      Slice::FromStaticString(kRlsRequestPath), /*authority=*/absl::nullopt,
      deadline_, /*registered_method=*/true);
  // Build the op batch: send metadata + request message + close, then
  // receive metadata, the response message, and the final status.
  grpc_op ops[6];
  memset(ops, 0, sizeof(ops));
  grpc_op* op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  ++op;
  op->op = GRPC_OP_SEND_MESSAGE;
  send_message_ = MakeRequestProto();
  op->data.send_message.send_message = send_message_;
  ++op;
  op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
  ++op;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &recv_initial_metadata_;
  ++op;
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_message_;
  ++op;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata_;
  op->data.recv_status_on_client.status = &status_recv_;
  op->data.recv_status_on_client.status_details = &status_details_recv_;
  ++op;
  // Ref held by the batch; released in OnRlsCallComplete().
  Ref(DEBUG_LOCATION, "OnRlsCallComplete").release();
  auto call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), &call_complete_cb_);
  GPR_ASSERT(call_error == GRPC_CALL_OK);
}
1801
// Batch-completion closure: hops into the work serializer to process
// the response, then releases the ref taken in StartCallLocked().
void RlsLb::RlsRequest::OnRlsCallComplete(void* arg, grpc_error_handle error) {
  auto* request = static_cast<RlsRequest*>(arg);
  request->lb_policy_->work_serializer()->Run(
      [request, error]() {
        request->OnRlsCallCompleteLocked(error);
        request->Unref(DEBUG_LOCATION, "OnRlsCallComplete");
      },
      DEBUG_LOCATION);
}
1811
// Processes the completed RLS call: converts the result to a
// ResponseInfo, releases all call resources, hands the response to the
// cache under the lock, and then finishes child-policy updates outside
// the lock.  Runs in the work serializer.
void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    std::string status_message(StringViewFromSlice(status_details_recv_));
    gpr_log(GPR_INFO,
            "[rlslb %p] rls_request=%p %s, error=%s, status={%d, %s} RLS call "
            "response received",
            lb_policy_.get(), this, key_.ToString().c_str(),
            StatusToString(error).c_str(), status_recv_,
            status_message.c_str());
  }
  // Parse response.
  // Precedence: transport-level error, then non-OK call status, then
  // the response proto itself.
  ResponseInfo response;
  if (!error.ok()) {
    grpc_status_code code;
    std::string message;
    grpc_error_get_status(error, deadline_, &code, &message,
                          /*http_error=*/nullptr, /*error_string=*/nullptr);
    response.status =
        absl::Status(static_cast<absl::StatusCode>(code), message);
  } else if (status_recv_ != GRPC_STATUS_OK) {
    response.status = absl::Status(static_cast<absl::StatusCode>(status_recv_),
                                   StringViewFromSlice(status_details_recv_));
  } else {
    response = ParseResponseProto();
  }
  // Clean up call state.
  grpc_byte_buffer_destroy(send_message_);
  grpc_byte_buffer_destroy(recv_message_);
  grpc_metadata_array_destroy(&recv_initial_metadata_);
  grpc_metadata_array_destroy(&recv_trailing_metadata_);
  CSliceUnref(status_details_recv_);
  grpc_call_unref(call_);
  call_ = nullptr;
  // Return result to cache.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO, "[rlslb %p] rls_request=%p %s: response info: %s",
            lb_policy_.get(), this, key_.ToString().c_str(),
            response.ToString().c_str());
  }
  std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
  {
    MutexLock lock(&lb_policy_->mu_);
    // If the policy is shutting down, skip the cache update; the
    // request map is torn down by ShutdownLocked().
    if (lb_policy_->is_shutdown_) return;
    rls_channel_->ReportResponseLocked(response.status.ok());
    Cache::Entry* cache_entry = lb_policy_->cache_.FindOrInsert(key_);
    child_policies_to_finish_update = cache_entry->OnRlsResponseLocked(
        std::move(response), std::move(backoff_state_));
    lb_policy_->request_map_.erase(key_);
  }
  // Now that we've released the lock, finish the update on any newly
  // created child policies.
  for (ChildPolicyWrapper* child : child_policies_to_finish_update) {
    // TODO(roth): If the child reports an error with the update, we
    // need to propagate that back to the resolver somehow.
    (void)child->MaybeFinishUpdate();
  }
}
1869
// Serializes the RouteLookupRequest proto (target type, key map,
// reason, and stale header data) into a byte buffer for sending.
// The upb arena owns all proto storage; only the serialized bytes
// outlive this function (copied into the slice).
grpc_byte_buffer* RlsLb::RlsRequest::MakeRequestProto() {
  upb::Arena arena;
  grpc_lookup_v1_RouteLookupRequest* req =
      grpc_lookup_v1_RouteLookupRequest_new(arena.ptr());
  grpc_lookup_v1_RouteLookupRequest_set_target_type(
      req, upb_StringView_FromDataAndSize(kGrpc, sizeof(kGrpc) - 1));
  for (const auto& kv : key_.key_map) {
    grpc_lookup_v1_RouteLookupRequest_key_map_set(
        req, upb_StringView_FromDataAndSize(kv.first.data(), kv.first.size()),
        upb_StringView_FromDataAndSize(kv.second.data(), kv.second.size()),
        arena.ptr());
  }
  grpc_lookup_v1_RouteLookupRequest_set_reason(req, reason_);
  if (!stale_header_data_.empty()) {
    grpc_lookup_v1_RouteLookupRequest_set_stale_header_data(
        req, upb_StringView_FromDataAndSize(stale_header_data_.data(),
                                            stale_header_data_.size()));
  }
  size_t len;
  char* buf =
      grpc_lookup_v1_RouteLookupRequest_serialize(req, arena.ptr(), &len);
  grpc_slice send_slice = grpc_slice_from_copied_buffer(buf, len);
  grpc_byte_buffer* byte_buffer = grpc_raw_byte_buffer_create(&send_slice, 1);
  // The byte buffer holds its own ref on the slice.
  CSliceUnref(send_slice);
  return byte_buffer;
}
1896
// Parses the received RouteLookupResponse proto into a ResponseInfo.
// Returns an error status if the proto cannot be parsed or contains no
// targets; otherwise copies out the target list and header data.
RlsLb::ResponseInfo RlsLb::RlsRequest::ParseResponseProto() {
  ResponseInfo response_info;
  upb::Arena arena;
  grpc_byte_buffer_reader bbr;
  grpc_byte_buffer_reader_init(&bbr, recv_message_);
  grpc_slice recv_slice = grpc_byte_buffer_reader_readall(&bbr);
  grpc_byte_buffer_reader_destroy(&bbr);
  grpc_lookup_v1_RouteLookupResponse* response =
      grpc_lookup_v1_RouteLookupResponse_parse(
          reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(recv_slice)),
          GRPC_SLICE_LENGTH(recv_slice), arena.ptr());
  CSliceUnref(recv_slice);
  if (response == nullptr) {
    response_info.status = absl::InternalError("cannot parse RLS response");
    return response_info;
  }
  size_t num_targets;
  const upb_StringView* targets_strview =
      grpc_lookup_v1_RouteLookupResponse_targets(response, &num_targets);
  if (num_targets == 0) {
    response_info.status =
        absl::InvalidArgumentError("RLS response has no target entry");
    return response_info;
  }
  response_info.targets.reserve(num_targets);
  for (size_t i = 0; i < num_targets; ++i) {
    response_info.targets.emplace_back(targets_strview[i].data,
                                       targets_strview[i].size);
  }
  upb_StringView header_data_strview =
      grpc_lookup_v1_RouteLookupResponse_header_data(response);
  response_info.header_data =
      std::string(header_data_strview.data, header_data_strview.size);
  return response_info;
}
1932
1933 //
1934 // RlsLb
1935 //
1936
GenerateUUID()1937 std::string GenerateUUID() {
1938 absl::uniform_int_distribution<uint64_t> distribution;
1939 absl::BitGen bitgen;
1940 uint64_t hi = distribution(bitgen);
1941 uint64_t lo = distribution(bitgen);
1942 return GenerateUUIDv4(hi, lo);
1943 }
1944
// Constructs the RLS LB policy.  The instance UUID (overridable via a
// test-only channel arg) labels this policy instance in metrics, and a
// callback is registered to report cache size/entry gauges.
RlsLb::RlsLb(Args args)
    : LoadBalancingPolicy(std::move(args)),
      instance_uuid_(channel_args()
                         .GetOwnedString(GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID)
                         .value_or(GenerateUUID())),
      cache_(this),
      registered_metric_callback_(
          channel_control_helper()->GetStatsPluginGroup().RegisterCallback(
              [this](CallbackMetricReporter& reporter) {
                MutexLock lock(&mu_);
                cache_.ReportMetricsLocked(reporter);
              },
              {kMetricCacheSize, kMetricCacheEntries})) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO, "[rlslb %p] policy created", this);
  }
}
1962
EndpointsEqual(const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> endpoints1,const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> endpoints2)1963 bool EndpointsEqual(
1964 const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> endpoints1,
1965 const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>
1966 endpoints2) {
1967 if (endpoints1.status() != endpoints2.status()) return false;
1968 if (endpoints1.ok()) {
1969 std::vector<EndpointAddresses> e1_list;
1970 (*endpoints1)->ForEach([&](const EndpointAddresses& endpoint) {
1971 e1_list.push_back(endpoint);
1972 });
1973 size_t i = 0;
1974 bool different = false;
1975 (*endpoints2)->ForEach([&](const EndpointAddresses& endpoint) {
1976 if (endpoint != e1_list[i++]) different = true;
1977 });
1978 if (different) return false;
1979 if (i != e1_list.size()) return false;
1980 }
1981 return true;
1982 }
1983
// Applies a config/address update from the resolver.  Swaps in the new
// config, addresses, and channel args; recreates the RLS channel and
// resizes the cache if their config changed; (re)creates the default
// child policy if the default target changed; and propagates the
// update to child policies (StartUpdate under the lock, then
// MaybeFinishUpdate outside it, since the latter can call back into
// this policy).  Finally pushes a new picker unconditionally.
absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO, "[rlslb %p] policy updated", this);
  }
  // Suppresses picker updates triggered by children while the update
  // is being propagated; cleared before UpdatePickerLocked() below.
  update_in_progress_ = true;
  // Swap out config.
  RefCountedPtr<RlsLbConfig> old_config = std::move(config_);
  config_ = args.config.TakeAsSubclass<RlsLbConfig>();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) &&
      (old_config == nullptr ||
       old_config->child_policy_config() != config_->child_policy_config())) {
    gpr_log(GPR_INFO, "[rlslb %p] updated child policy config: %s", this,
            JsonDump(config_->child_policy_config()).c_str());
  }
  // Swap out addresses.
  // If the new address list is an error and we have an existing address list,
  // stick with the existing addresses.
  absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> old_addresses;
  if (args.addresses.ok()) {
    old_addresses = std::move(addresses_);
    addresses_ = std::move(args.addresses);
  } else {
    old_addresses = addresses_;
  }
  // Swap out channel args.
  channel_args_ = std::move(args.args);
  // Determine whether we need to update all child policies.
  bool update_child_policies =
      old_config == nullptr ||
      old_config->child_policy_config() != config_->child_policy_config() ||
      !EndpointsEqual(old_addresses, addresses_) || args.args != channel_args_;
  // If default target changes, swap out child policy.
  bool created_default_child = false;
  if (old_config == nullptr ||
      config_->default_target() != old_config->default_target()) {
    if (config_->default_target().empty()) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
        gpr_log(GPR_INFO, "[rlslb %p] unsetting default target", this);
      }
      default_child_policy_.reset();
    } else {
      auto it = child_policy_map_.find(config_->default_target());
      if (it == child_policy_map_.end()) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
          gpr_log(GPR_INFO, "[rlslb %p] creating new default target", this);
        }
        default_child_policy_ = MakeRefCounted<ChildPolicyWrapper>(
            RefAsSubclass<RlsLb>(DEBUG_LOCATION, "ChildPolicyWrapper"),
            config_->default_target());
        created_default_child = true;
      } else {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
          gpr_log(GPR_INFO,
                  "[rlslb %p] using existing child for default target", this);
        }
        default_child_policy_ =
            it->second->Ref(DEBUG_LOCATION, "DefaultChildPolicy");
      }
    }
  }
  // Now grab the lock to swap out the state it guards.
  {
    MutexLock lock(&mu_);
    // Swap out RLS channel if needed.
    if (old_config == nullptr ||
        config_->lookup_service() != old_config->lookup_service()) {
      rls_channel_ = MakeOrphanable<RlsChannel>(
          RefAsSubclass<RlsLb>(DEBUG_LOCATION, "RlsChannel"));
    }
    // Resize cache if needed.
    if (old_config == nullptr ||
        config_->cache_size_bytes() != old_config->cache_size_bytes()) {
      cache_.Resize(static_cast<size_t>(config_->cache_size_bytes()));
    }
    // Start update of child policies if needed.
    if (update_child_policies) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
        gpr_log(GPR_INFO, "[rlslb %p] starting child policy updates", this);
      }
      for (auto& p : child_policy_map_) {
        p.second->StartUpdate();
      }
    } else if (created_default_child) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
        gpr_log(GPR_INFO, "[rlslb %p] starting default child policy update",
                this);
      }
      default_child_policy_->StartUpdate();
    }
  }
  // Now that we've released the lock, finish update of child policies.
  std::vector<std::string> errors;
  if (update_child_policies) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
      gpr_log(GPR_INFO, "[rlslb %p] finishing child policy updates", this);
    }
    for (auto& p : child_policy_map_) {
      absl::Status status = p.second->MaybeFinishUpdate();
      if (!status.ok()) {
        errors.emplace_back(
            absl::StrCat("target ", p.first, ": ", status.ToString()));
      }
    }
  } else if (created_default_child) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
      gpr_log(GPR_INFO, "[rlslb %p] finishing default child policy update",
              this);
    }
    absl::Status status = default_child_policy_->MaybeFinishUpdate();
    if (!status.ok()) {
      errors.emplace_back(absl::StrCat("target ", config_->default_target(),
                                       ": ", status.ToString()));
    }
  }
  update_in_progress_ = false;
  // In principle, we need to update the picker here only if the config
  // fields used by the picker have changed.  However, it seems fragile
  // to check individual fields, since the picker logic could change in
  // the future to use additional config fields, and we might not
  // remember to update the code here.  So for now, we just unconditionally
  // update the picker here, even though it's probably redundant.
  UpdatePickerLocked();
  // Return status.
  if (!errors.empty()) {
    return absl::UnavailableError(absl::StrCat(
        "errors from children: [", absl::StrJoin(errors, "; "), "]"));
  }
  return absl::OkStatus();
}
2113
ExitIdleLocked()2114 void RlsLb::ExitIdleLocked() {
2115 MutexLock lock(&mu_);
2116 for (auto& child_entry : child_policy_map_) {
2117 child_entry.second->ExitIdleLocked();
2118 }
2119 }
2120
ResetBackoffLocked()2121 void RlsLb::ResetBackoffLocked() {
2122 {
2123 MutexLock lock(&mu_);
2124 rls_channel_->ResetBackoff();
2125 cache_.ResetAllBackoff();
2126 }
2127 for (auto& child : child_policy_map_) {
2128 child.second->ResetBackoffLocked();
2129 }
2130 }
2131
// Tears down the policy: unregisters the metrics callback, then under
// the lock marks shutdown and releases config, cache, pending
// requests, the RLS channel, and the default child policy.
void RlsLb::ShutdownLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO, "[rlslb %p] policy shutdown", this);
  }
  // Must happen before taking mu_, since the metrics callback itself
  // acquires mu_.
  registered_metric_callback_.reset();
  MutexLock lock(&mu_);
  is_shutdown_ = true;
  config_.reset(DEBUG_LOCATION, "ShutdownLocked");
  channel_args_ = ChannelArgs();
  cache_.Shutdown();
  request_map_.clear();
  rls_channel_.reset();
  default_child_policy_.reset();
}
2146
// Schedules a picker update asynchronously.
void RlsLb::UpdatePickerAsync() {
  // Run via the ExecCtx, since the caller may be holding the lock, and
  // we don't want to be doing that when we hop into the WorkSerializer,
  // in case the WorkSerializer callback happens to run inline.
  ExecCtx::Run(
      DEBUG_LOCATION,
      GRPC_CLOSURE_CREATE(UpdatePickerCallback,
                          Ref(DEBUG_LOCATION, "UpdatePickerCallback").release(),
                          grpc_schedule_on_exec_ctx),
      absl::OkStatus());
}
2158
// ExecCtx closure for UpdatePickerAsync(): hops into the work
// serializer, adopts the ref taken by the scheduler, updates the
// picker, and releases the ref.
void RlsLb::UpdatePickerCallback(void* arg, grpc_error_handle /*error*/) {
  auto* rls_lb = static_cast<RlsLb*>(arg);
  rls_lb->work_serializer()->Run(
      [rls_lb]() {
        RefCountedPtr<RlsLb> lb_policy(rls_lb);
        lb_policy->UpdatePickerLocked();
        lb_policy.reset(DEBUG_LOCATION, "UpdatePickerCallback");
      },
      DEBUG_LOCATION);
}
2169
// Aggregates child connectivity states and reports a new picker to the
// channel.  Aggregation: any READY child => READY; else any CONNECTING
// => CONNECTING; else any IDLE => IDLE; else TRANSIENT_FAILURE.  With
// no children at all, reports IDLE.
void RlsLb::UpdatePickerLocked() {
  // If we're in the process of propagating an update from our parent to
  // our children, ignore any updates that come from the children.  We
  // will instead return a new picker once the update has been seen by
  // all children.  This avoids unnecessary picker churn while an update
  // is being propagated to our children.
  if (update_in_progress_) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO, "[rlslb %p] updating picker", this);
  }
  grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
  if (!child_policy_map_.empty()) {
    state = GRPC_CHANNEL_TRANSIENT_FAILURE;
    int num_idle = 0;
    int num_connecting = 0;
    {
      MutexLock lock(&mu_);
      if (is_shutdown_) return;
      for (auto& p : child_policy_map_) {
        grpc_connectivity_state child_state = p.second->connectivity_state();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
          gpr_log(GPR_INFO, "[rlslb %p] target %s in state %s", this,
                  p.second->target().c_str(),
                  ConnectivityStateName(child_state));
        }
        if (child_state == GRPC_CHANNEL_READY) {
          state = GRPC_CHANNEL_READY;
          break;
        } else if (child_state == GRPC_CHANNEL_CONNECTING) {
          ++num_connecting;
        } else if (child_state == GRPC_CHANNEL_IDLE) {
          ++num_idle;
        }
      }
      if (state != GRPC_CHANNEL_READY) {
        if (num_connecting > 0) {
          state = GRPC_CHANNEL_CONNECTING;
        } else if (num_idle > 0) {
          state = GRPC_CHANNEL_IDLE;
        }
      }
    }
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
    gpr_log(GPR_INFO, "[rlslb %p] reporting state %s", this,
            ConnectivityStateName(state));
  }
  absl::Status status;
  if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
    status = absl::UnavailableError("no children available");
  }
  channel_control_helper()->UpdateState(
      state, status,
      MakeRefCounted<Picker>(RefAsSubclass<RlsLb>(DEBUG_LOCATION, "Picker")));
}
2225
// Increments the per-target pick counter metric, labeling the result
// as complete/fail/drop.  Queued picks are not reported.
void RlsLb::MaybeExportPickCount(
    GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle,
    absl::string_view target, const PickResult& pick_result) {
  absl::string_view pick_result_string = Match(
      pick_result.result,
      [](const LoadBalancingPolicy::PickResult::Complete&) {
        return "complete";
      },
      [](const LoadBalancingPolicy::PickResult::Queue&) { return ""; },
      [](const LoadBalancingPolicy::PickResult::Fail&) { return "fail"; },
      [](const LoadBalancingPolicy::PickResult::Drop&) { return "drop"; });
  if (pick_result_string.empty()) return;  // Don't report queued picks.
  auto& stats_plugins = channel_control_helper()->GetStatsPluginGroup();
  stats_plugins.AddCounter(
      handle, 1,
      {channel_control_helper()->GetTarget(), config_->lookup_service(), target,
       pick_result_string},
      {});
}
2245
2246 //
2247 // RlsLbFactory
2248 //
2249
// JSON representation of one entry in the RLS "grpcKeybuilders" config
// list: which service/method names it applies to, plus the header,
// extra, and constant keys used to build the RLS request key map.
struct GrpcKeyBuilder {
  // A service/method pair this key builder matches; method may be
  // omitted to match all methods of the service.
  struct Name {
    std::string service;
    std::string method;

    static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
      static const auto* loader = JsonObjectLoader<Name>()
                                      .Field("service", &Name::service)
                                      .OptionalField("method", &Name::method)
                                      .Finish();
      return loader;
    }
  };

  // Maps a key name to a list of request header names whose values
  // populate that key.
  struct NameMatcher {
    std::string key;
    std::vector<std::string> names;
    // Present only to reject configs that set it; gRPC does not
    // support requiredMatch.
    absl::optional<bool> required_match;

    static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
      static const auto* loader =
          JsonObjectLoader<NameMatcher>()
              .Field("key", &NameMatcher::key)
              .Field("names", &NameMatcher::names)
              .OptionalField("requiredMatch", &NameMatcher::required_match)
              .Finish();
      return loader;
    }

    void JsonPostLoad(const Json&, const JsonArgs&, ValidationErrors* errors) {
      // key must be non-empty.
      {
        ValidationErrors::ScopedField field(errors, ".key");
        if (!errors->FieldHasErrors() && key.empty()) {
          errors->AddError("must be non-empty");
        }
      }
      // List of header names must be non-empty.
      {
        ValidationErrors::ScopedField field(errors, ".names");
        if (!errors->FieldHasErrors() && names.empty()) {
          errors->AddError("must be non-empty");
        }
        // Individual header names must be non-empty.
        for (size_t i = 0; i < names.size(); ++i) {
          ValidationErrors::ScopedField field(errors,
                                              absl::StrCat("[", i, "]"));
          if (!errors->FieldHasErrors() && names[i].empty()) {
            errors->AddError("must be non-empty");
          }
        }
      }
      // requiredMatch must not be present.
      {
        ValidationErrors::ScopedField field(errors, ".requiredMatch");
        if (required_match.has_value()) {
          errors->AddError("must not be present");
        }
      }
    }
  };

  // Key names under which the host, service, and method of the RPC
  // are added to the key map, if configured.
  struct ExtraKeys {
    absl::optional<std::string> host_key;
    absl::optional<std::string> service_key;
    absl::optional<std::string> method_key;

    static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
      static const auto* loader =
          JsonObjectLoader<ExtraKeys>()
              .OptionalField("host", &ExtraKeys::host_key)
              .OptionalField("service", &ExtraKeys::service_key)
              .OptionalField("method", &ExtraKeys::method_key)
              .Finish();
      return loader;
    }

    void JsonPostLoad(const Json&, const JsonArgs&, ValidationErrors* errors) {
      // Each extra key, if present, must be a non-empty string.
      auto check_field = [&](const std::string& field_name,
                             absl::optional<std::string>* struct_field) {
        ValidationErrors::ScopedField field(errors,
                                            absl::StrCat(".", field_name));
        if (struct_field->has_value() && (*struct_field)->empty()) {
          errors->AddError("must be non-empty if set");
        }
      };
      check_field("host", &host_key);
      check_field("service", &service_key);
      check_field("method", &method_key);
    }
  };

  std::vector<Name> names;
  std::vector<NameMatcher> headers;
  ExtraKeys extra_keys;
  std::map<std::string /*key*/, std::string /*value*/> constant_keys;

  static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
    static const auto* loader =
        JsonObjectLoader<GrpcKeyBuilder>()
            .Field("names", &GrpcKeyBuilder::names)
            .OptionalField("headers", &GrpcKeyBuilder::headers)
            .OptionalField("extraKeys", &GrpcKeyBuilder::extra_keys)
            .OptionalField("constantKeys", &GrpcKeyBuilder::constant_keys)
            .Finish();
    return loader;
  }

  void JsonPostLoad(const Json&, const JsonArgs&, ValidationErrors* errors) {
    // The names field must be non-empty.
    {
      ValidationErrors::ScopedField field(errors, ".names");
      if (!errors->FieldHasErrors() && names.empty()) {
        errors->AddError("must be non-empty");
      }
    }
    // Make sure no key in constantKeys is empty.
    if (constant_keys.find("") != constant_keys.end()) {
      ValidationErrors::ScopedField field(errors, ".constantKeys[\"\"]");
      errors->AddError("key must be non-empty");
    }
    // Check for duplicate keys.
    // All key names (header, constant, and extra) share one namespace
    // in the request key map, so duplicates are rejected across all
    // three sources.
    std::set<absl::string_view> keys_seen;
    auto duplicate_key_check_func = [&keys_seen, errors](
                                        const std::string& key,
                                        const std::string& field_name) {
      if (key.empty()) return;  // Already generated an error about this.
      ValidationErrors::ScopedField field(errors, field_name);
      auto it = keys_seen.find(key);
      if (it != keys_seen.end()) {
        errors->AddError(absl::StrCat("duplicate key \"", key, "\""));
      } else {
        keys_seen.insert(key);
      }
    };
    for (size_t i = 0; i < headers.size(); ++i) {
      NameMatcher& header = headers[i];
      duplicate_key_check_func(header.key,
                               absl::StrCat(".headers[", i, "].key"));
    }
    for (const auto& p : constant_keys) {
      duplicate_key_check_func(
          p.first, absl::StrCat(".constantKeys[\"", p.first, "\"]"));
    }
    if (extra_keys.host_key.has_value()) {
      duplicate_key_check_func(*extra_keys.host_key, ".extraKeys.host");
    }
    if (extra_keys.service_key.has_value()) {
      duplicate_key_check_func(*extra_keys.service_key, ".extraKeys.service");
    }
    if (extra_keys.method_key.has_value()) {
      duplicate_key_check_func(*extra_keys.method_key, ".extraKeys.method");
    }
  }
};
2405
JsonLoader(const JsonArgs &)2406 const JsonLoaderInterface* RlsLbConfig::RouteLookupConfig::JsonLoader(
2407 const JsonArgs&) {
2408 static const auto* loader =
2409 JsonObjectLoader<RouteLookupConfig>()
2410 // Note: Some fields require manual processing and are handled in
2411 // JsonPostLoad() instead.
2412 .Field("lookupService", &RouteLookupConfig::lookup_service)
2413 .OptionalField("lookupServiceTimeout",
2414 &RouteLookupConfig::lookup_service_timeout)
2415 .OptionalField("maxAge", &RouteLookupConfig::max_age)
2416 .OptionalField("staleAge", &RouteLookupConfig::stale_age)
2417 .Field("cacheSizeBytes", &RouteLookupConfig::cache_size_bytes)
2418 .OptionalField("defaultTarget", &RouteLookupConfig::default_target)
2419 .Finish();
2420 return loader;
2421 }
2422
// Performs the parts of RouteLookupConfig parsing that the declarative
// loader cannot express: builds the path -> KeyBuilder map from the
// grpcKeybuilders list, validates the lookup service URI, and clamps /
// cross-checks the age and cache-size fields. Errors are accumulated in
// `errors` rather than aborting early.
void RlsLbConfig::RouteLookupConfig::JsonPostLoad(const Json& json,
                                                  const JsonArgs& args,
                                                  ValidationErrors* errors) {
  // Parse grpcKeybuilders.
  auto grpc_keybuilders = LoadJsonObjectField<std::vector<GrpcKeyBuilder>>(
      json.object(), args, "grpcKeybuilders", errors);
  if (grpc_keybuilders.has_value()) {
    ValidationErrors::ScopedField field(errors, ".grpcKeybuilders");
    for (size_t i = 0; i < grpc_keybuilders->size(); ++i) {
      ValidationErrors::ScopedField field(errors, absl::StrCat("[", i, "]"));
      auto& grpc_keybuilder = (*grpc_keybuilders)[i];
      // Construct KeyBuilder.
      RlsLbConfig::KeyBuilder key_builder;
      for (const auto& header : grpc_keybuilder.headers) {
        key_builder.header_keys.emplace(header.key, header.names);
      }
      // Moving out of the optionals is safe: grpc_keybuilder is consumed
      // after this iteration.
      if (grpc_keybuilder.extra_keys.host_key.has_value()) {
        key_builder.host_key = std::move(*grpc_keybuilder.extra_keys.host_key);
      }
      if (grpc_keybuilder.extra_keys.service_key.has_value()) {
        key_builder.service_key =
            std::move(*grpc_keybuilder.extra_keys.service_key);
      }
      if (grpc_keybuilder.extra_keys.method_key.has_value()) {
        key_builder.method_key =
            std::move(*grpc_keybuilder.extra_keys.method_key);
      }
      key_builder.constant_keys = std::move(grpc_keybuilder.constant_keys);
      // Add entries to map.
      // Each keybuilder may apply to several "/service/method" paths; the
      // same KeyBuilder is copied under each path, and a path claimed by
      // two keybuilders is an error.
      for (const auto& name : grpc_keybuilder.names) {
        std::string path = absl::StrCat("/", name.service, "/", name.method);
        bool inserted = key_builder_map.emplace(path, key_builder).second;
        if (!inserted) {
          errors->AddError(absl::StrCat("duplicate entry for \"", path, "\""));
        }
      }
    }
  }
  // Validate lookupService.
  {
    ValidationErrors::ScopedField field(errors, ".lookupService");
    if (!errors->FieldHasErrors() &&
        !CoreConfiguration::Get().resolver_registry().IsValidTarget(
            lookup_service)) {
      errors->AddError("must be valid gRPC target URI");
    }
  }
  // Clamp maxAge to the max allowed value.
  // Note: clamping happens before the staleAge comparison below, so a
  // staleAge above the cap is also effectively clamped.
  if (max_age > kMaxMaxAge) max_age = kMaxMaxAge;
  // If staleAge is set, then maxAge must also be set.
  // Presence is checked against the raw JSON, since the fields themselves
  // always hold a (possibly default) value after loading.
  if (json.object().find("staleAge") != json.object().end() &&
      json.object().find("maxAge") == json.object().end()) {
    ValidationErrors::ScopedField field(errors, ".maxAge");
    errors->AddError("must be set if staleAge is set");
  }
  // Ignore staleAge if greater than or equal to maxAge.
  if (stale_age >= max_age) stale_age = max_age;
  // Validate cacheSizeBytes.
  {
    ValidationErrors::ScopedField field(errors, ".cacheSizeBytes");
    if (!errors->FieldHasErrors() && cache_size_bytes <= 0) {
      errors->AddError("must be greater than 0");
    }
  }
  // Clamp cacheSizeBytes to the max allowed value.
  if (cache_size_bytes > kMaxCacheSizeBytes) {
    cache_size_bytes = kMaxCacheSizeBytes;
  }
  // Validate defaultTarget.
  // The field may be absent entirely, but if present it must be non-empty.
  {
    ValidationErrors::ScopedField field(errors, ".defaultTarget");
    if (!errors->FieldHasErrors() &&
        json.object().find("defaultTarget") != json.object().end() &&
        default_target.empty()) {
      errors->AddError("must be non-empty if set");
    }
  }
}
2501
JsonLoader(const JsonArgs &)2502 const JsonLoaderInterface* RlsLbConfig::JsonLoader(const JsonArgs&) {
2503 static const auto* loader =
2504 JsonObjectLoader<RlsLbConfig>()
2505 // Note: Some fields require manual processing and are handled in
2506 // JsonPostLoad() instead.
2507 .Field("routeLookupConfig", &RlsLbConfig::route_lookup_config_)
2508 .Field("childPolicyConfigTargetFieldName",
2509 &RlsLbConfig::child_policy_config_target_field_name_)
2510 .Finish();
2511 return loader;
2512 }
2513
// Handles the RlsLbConfig fields that need manual processing: validates
// the optional RLS-channel service config, checks the target field name,
// and parses/normalizes the childPolicy list.
void RlsLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
                               ValidationErrors* errors) {
  // Parse routeLookupChannelServiceConfig.
  auto it = json.object().find("routeLookupChannelServiceConfig");
  if (it != json.object().end()) {
    ValidationErrors::ScopedField field(errors,
                                       ".routeLookupChannelServiceConfig");
    // Don't need to save the result here, just need the errors (if any).
    ServiceConfigImpl::Create(ChannelArgs(), it->second, errors);
  }
  // Validate childPolicyConfigTargetFieldName.
  {
    ValidationErrors::ScopedField field(errors,
                                       ".childPolicyConfigTargetFieldName");
    if (!errors->FieldHasErrors() &&
        child_policy_config_target_field_name_.empty()) {
      errors->AddError("must be non-empty");
    }
  }
  // Parse childPolicy.
  {
    ValidationErrors::ScopedField field(errors, ".childPolicy");
    auto it = json.object().find("childPolicy");
    if (it == json.object().end()) {
      errors->AddError("field not present");
    } else {
      // Add target to all child policy configs in the list.
      // A placeholder target is used when no default target is configured,
      // so that the config can still be validated.
      std::string target = route_lookup_config_.default_target.empty()
                               ? kFakeTargetFieldValue
                               : route_lookup_config_.default_target;
      auto child_policy_config = InsertOrUpdateChildPolicyField(
          child_policy_config_target_field_name_, target, it->second, errors);
      if (child_policy_config.has_value()) {
        child_policy_config_ = std::move(*child_policy_config);
        // Parse the config.
        auto parsed_config =
            CoreConfiguration::Get()
                .lb_policy_registry()
                .ParseLoadBalancingConfig(child_policy_config_);
        if (!parsed_config.ok()) {
          errors->AddError(parsed_config.status().message());
        } else {
          // Find the chosen config and return it in JSON form.
          // We remove all non-selected configs, and in the selected config,
          // we leave the target field in place, set to the default value.
          // This slightly optimizes what we need to do later when we update
          // a child policy for a given target.
          // Note: the assignment's right-hand side (a copy of `config`) is
          // fully constructed before child_policy_config_ is replaced, and
          // the loop breaks immediately, so iterating the array being
          // replaced is safe here.
          for (const Json& config : child_policy_config_.array()) {
            if (config.object().begin()->first == (*parsed_config)->name()) {
              child_policy_config_ = Json::FromArray({config});
              break;
            }
          }
          // If default target is set, set the default child config.
          if (!route_lookup_config_.default_target.empty()) {
            default_child_policy_parsed_config_ = std::move(*parsed_config);
          }
        }
      }
    }
  }
}
2576
// Factory registered with the LB policy registry: constructs RlsLb policy
// instances and parses their JSON service config into RlsLbConfig.
class RlsLbFactory final : public LoadBalancingPolicyFactory {
 public:
  // Name under which this policy is registered (see kRls).
  absl::string_view name() const override { return kRls; }

  // Creates a new RLS LB policy instance, transferring ownership of args.
  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
      LoadBalancingPolicy::Args args) const override {
    return MakeOrphanable<RlsLb>(std::move(args));
  }

  // Parses and validates the policy's JSON config. On failure the returned
  // status carries the accumulated validation errors.
  absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
  ParseLoadBalancingConfig(const Json& json) const override {
    // Fix: error prefix previously read "errors validing" (typo).
    return LoadFromJson<RefCountedPtr<RlsLbConfig>>(
        json, JsonArgs(), "errors validating RLS LB policy config");
  }
};
2592
2593 } // namespace
2594
RegisterRlsLbPolicy(CoreConfiguration::Builder * builder)2595 void RegisterRlsLbPolicy(CoreConfiguration::Builder* builder) {
2596 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
2597 std::make_unique<RlsLbFactory>());
2598 }
2599
2600 } // namespace grpc_core
2601