1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h"
20 
21 #include <algorithm>
22 #include <cmath>
23 #include <limits>
24 #include <utility>
25 #include <vector>
26 
27 #include "absl/functional/any_invocable.h"
28 
29 #include <grpc/support/log.h>
30 
31 namespace grpc_core {
32 
namespace {

// Largest integer weight a backend can be assigned after scaling; the scaled
// weights are stored as uint16_t, so this is the top of that range.
constexpr uint16_t kMaxWeight = std::numeric_limits<uint16_t>::max();

// With M denoting the mean of all known weights, StaticStrideScheduler caps
// from above every known weight that is larger than M*kMaxRatio (down to
// M*kMaxRatio).
//
// The purpose is to bound the number of rounds needed for picks.
constexpr double kMaxRatio = 10.0;

// With M denoting the mean of all known weights, StaticStrideScheduler caps
// from below every known weight to M*kMinRatio.
//
// This is a performance optimization for edge cases in which channels with
// large weights are non-accepting (so WeightedRoundRobin retries picking them
// over and over again) while there are also channels with near-zero weights
// that are possibly accepting. Without kMinRatio, WeightedRoundRobin could
// need thousands of picks before it reaches a single channel with near-zero
// weight. This was a part of what happened in b/276292666.
//
// The current value of 0.01 was chosen without any experimenting. It should
// ensure that WeightedRoundRobin doesn't do much more than an order of 100
// picks of non-accepting channels with high weights in such corner cases. But
// it also makes WeightedRoundRobin send slightly more requests than zero to
// potentially very bad tasks (those that would have near-zero weights). This
// is not necessarily a downside, though. Perhaps this is not a problem at all
// and we should increase this value (to 0.05 or 0.1) to save CPU cycles.
//
// Note that this class treats weights that are exactly equal to zero as
// unknown and thus needing to be replaced with M. This behavior itself makes
// sense (fresh channels without feedback information will get an average flow
// of requests). However, it follows from this that this class will replace
// weight = 0 with M, but weight = epsilon with M*kMinRatio, and this step
// function is logically faulty. A demonstration of this is that the function
// that computes weights in WeightedRoundRobin
// (http://google3/production/rpc/stubs/core/loadbalancing/weightedroundrobin.cc;l=324-325;rcl=514986476)
// will cap some epsilon values to zero. There should be a clear distinction
// between "task is new, weight is unknown" and "task is unhealthy, weight is
// very low". A better solution would be to not mix "unknown" and "weight" into
// a single value but to represent weights as std::optional<float> or, if
// memory usage is a concern, to use NaN as the indicator of unknown weight.
constexpr double kMinRatio = 0.01;

}  // namespace
79 
Make(absl::Span<const float> float_weights,absl::AnyInvocable<uint32_t ()> next_sequence_func)80 absl::optional<StaticStrideScheduler> StaticStrideScheduler::Make(
81     absl::Span<const float> float_weights,
82     absl::AnyInvocable<uint32_t()> next_sequence_func) {
83   if (float_weights.empty()) return absl::nullopt;
84   if (float_weights.size() == 1) return absl::nullopt;
85 
86   // TODO(b/190488683): should we normalize negative weights to 0?
87 
88   const size_t n = float_weights.size();
89   size_t num_zero_weight_channels = 0;
90   double sum = 0;
91   float unscaled_max = 0;
92   for (const float weight : float_weights) {
93     sum += weight;
94     unscaled_max = std::max(unscaled_max, weight);
95     if (weight == 0) {
96       ++num_zero_weight_channels;
97     }
98   }
99 
100   if (num_zero_weight_channels == n) return absl::nullopt;
101 
102   // Mean of non-zero weights before scaling to `kMaxWeight`.
103   const double unscaled_mean =
104       sum / static_cast<double>(n - num_zero_weight_channels);
105   const double ratio = unscaled_max / unscaled_mean;
106 
107   // Adjust max value such that ratio does not exceed kMaxRatio. This should
108   // ensure that we on average do at most kMaxRatio rounds for picks.
109   if (ratio > kMaxRatio) {
110     unscaled_max = kMaxRatio * unscaled_mean;
111   }
112 
113   // Scale weights such that the largest is equal to `kMaxWeight`. This should
114   // be accurate enough once we convert to an integer. Quantisation errors won't
115   // be measurable on borg.
116   // TODO(b/190488683): it may be more stable over updates if we try to keep
117   // `scaling_factor` consistent, and only change it when we can't accurately
118   // represent the new weights.
119   const double scaling_factor = kMaxWeight / unscaled_max;
120 
121   // Note that since we cap the weights to stay within kMaxRatio, `mean` might
122   // not match the actual mean of the values that end up in the scheduler.
123   const uint16_t mean = std::lround(scaling_factor * unscaled_mean);
124 
125   // We compute weight_lower_bound and cap it to 1 from below so that in the
126   // worst case we represent tiny weights as 1 but not as 0 (which would cause
127   // an infinite loop as in b/276292666). This capping to 1 is probably only
128   // useful in case someone misconfigures kMinRatio to be very small.
129   //
130   // NOMUTANTS -- We have tests for this expression, but they are not precise
131   // enough to catch errors of plus/minus 1, what mutation testing does.
132   const uint16_t weight_lower_bound =
133       std::max(static_cast<uint16_t>(1),
134                static_cast<uint16_t>(std::lround(mean * kMinRatio)));
135 
136   std::vector<uint16_t> weights;
137   weights.reserve(n);
138   for (size_t i = 0; i < n; ++i) {
139     if (float_weights[i] == 0) {  // Weight is unknown.
140       weights.push_back(mean);
141     } else {
142       const double float_weight_capped_from_above =
143           std::min(float_weights[i], unscaled_max);
144       const uint16_t weight =
145           std::lround(float_weight_capped_from_above * scaling_factor);
146       weights.push_back(std::max(weight, weight_lower_bound));
147     }
148   }
149 
150   GPR_ASSERT(weights.size() == float_weights.size());
151   return StaticStrideScheduler{std::move(weights),
152                                std::move(next_sequence_func)};
153 }
154 
StaticStrideScheduler(std::vector<uint16_t> weights,absl::AnyInvocable<uint32_t ()> next_sequence_func)155 StaticStrideScheduler::StaticStrideScheduler(
156     std::vector<uint16_t> weights,
157     absl::AnyInvocable<uint32_t()> next_sequence_func)
158     : next_sequence_func_(std::move(next_sequence_func)),
159       weights_(std::move(weights)) {
160   GPR_ASSERT(next_sequence_func_ != nullptr);
161 }
162 
Pick() const163 size_t StaticStrideScheduler::Pick() const {
164   while (true) {
165     const uint32_t sequence = next_sequence_func_();
166 
167     // The sequence number is split in two: the lower %n gives the index of the
168     // backend, and the rest gives the number of times we've iterated through
169     // all backends. `generation` is used to deterministically decide whether
170     // we pick or skip the backend on this iteration, in proportion to the
171     // backend's weight.
172     const uint64_t backend_index = sequence % weights_.size();
173     const uint64_t generation = sequence / weights_.size();
174     const uint64_t weight = weights_[backend_index];
175 
176     // We pick a backend `weight` times per `kMaxWeight` generations. The
177     // multiply and modulus ~evenly spread out the picks for a given backend
178     // between different generations. The offset by `backend_index` helps to
179     // reduce the chance of multiple consecutive non-picks: if we have two
180     // consecutive backends with an equal, say, 80% weight of the max, with no
181     // offset we would see 1/5 generations that skipped both.
182     // TODO(b/190488683): add test for offset efficacy.
183     const uint16_t kOffset = kMaxWeight / 2;
184     const uint16_t mod =
185         (weight * generation + backend_index * kOffset) % kMaxWeight;
186 
187     if (mod < kMaxWeight - weight) {
188       // Probability of skipping = 1 - mean(weights) / max(weights).
189       // For a typical large-scale service using RR, max task utilization will
190       // be ~100% when mean utilization is ~80%. So ~20% of picks will be
191       // skipped.
192       continue;
193     }
194     return backend_index;
195   }
196 }
197 
198 }  // namespace grpc_core
199