1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
22 
23 #include <inttypes.h>
24 
25 #include <algorithm>
26 #include <cmath>
27 #include <initializer_list>
28 #include <ostream>
29 #include <string>
30 #include <tuple>
31 #include <vector>
32 
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_format.h"
35 #include "absl/strings/str_join.h"
36 
37 #include <grpc/support/log.h>
38 
39 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
40 #include "src/core/lib/experiments/experiments.h"
41 #include "src/core/lib/gpr/useful.h"
42 #include "src/core/lib/resource_quota/memory_quota.h"
43 
44 grpc_core::TraceFlag grpc_flowctl_trace(false, "flowctl");
45 
46 namespace grpc_core {
47 namespace chttp2 {
48 
49 TestOnlyTransportTargetWindowEstimatesMocker*
50     g_test_only_transport_target_window_estimates_mocker;
51 
namespace {

// Upper bound (2^31 - 1, the largest 31-bit value) applied to the window
// increments computed in this file.
constexpr int64_t kMaxWindowUpdateSize = (int64_t{1} << 31) - 1;

}  // namespace
57 
UrgencyString(Urgency u)58 const char* FlowControlAction::UrgencyString(Urgency u) {
59   switch (u) {
60     case Urgency::NO_ACTION_NEEDED:
61       return "no-action";
62     case Urgency::UPDATE_IMMEDIATELY:
63       return "now";
64     case Urgency::QUEUE_UPDATE:
65       return "queue";
66     default:
67       GPR_UNREACHABLE_CODE(return "unknown");
68   }
69   GPR_UNREACHABLE_CODE(return "unknown");
70 }
71 
operator <<(std::ostream & out,FlowControlAction::Urgency u)72 std::ostream& operator<<(std::ostream& out, FlowControlAction::Urgency u) {
73   return out << FlowControlAction::UrgencyString(u);
74 }
75 
DebugString() const76 std::string FlowControlAction::DebugString() const {
77   std::vector<std::string> segments;
78   if (send_transport_update_ != Urgency::NO_ACTION_NEEDED) {
79     segments.push_back(
80         absl::StrCat("t:", UrgencyString(send_transport_update_)));
81   }
82   if (send_stream_update_ != Urgency::NO_ACTION_NEEDED) {
83     segments.push_back(absl::StrCat("s:", UrgencyString(send_stream_update_)));
84   }
85   if (send_initial_window_update_ != Urgency::NO_ACTION_NEEDED) {
86     segments.push_back(
87         absl::StrCat("iw=", initial_window_size_, ":",
88                      UrgencyString(send_initial_window_update_)));
89   }
90   if (send_max_frame_size_update_ != Urgency::NO_ACTION_NEEDED) {
91     segments.push_back(
92         absl::StrCat("mf=", max_frame_size_, ":",
93                      UrgencyString(send_max_frame_size_update_)));
94   }
95   if (segments.empty()) return "no action";
96   return absl::StrJoin(segments, ",");
97 }
98 
operator <<(std::ostream & out,const FlowControlAction & action)99 std::ostream& operator<<(std::ostream& out, const FlowControlAction& action) {
100   return out << action.DebugString();
101 }
102 
TransportFlowControl(absl::string_view name,bool enable_bdp_probe,MemoryOwner * memory_owner)103 TransportFlowControl::TransportFlowControl(absl::string_view name,
104                                            bool enable_bdp_probe,
105                                            MemoryOwner* memory_owner)
106     : memory_owner_(memory_owner),
107       enable_bdp_probe_(enable_bdp_probe),
108       bdp_estimator_(name),
109       pid_controller_(PidController::Args()
110                           .set_gain_p(4)
111                           .set_gain_i(8)
112                           .set_gain_d(0)
113                           .set_initial_control_value(TargetLogBdp())
114                           .set_min_control_value(-1)
115                           .set_max_control_value(25)
116                           .set_integral_range(10)),
117       last_pid_update_(Timestamp::Now()) {}
118 
MaybeSendUpdate(bool writing_anyway)119 uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) {
120   const uint32_t target_announced_window =
121       static_cast<uint32_t>(target_window());
122   if ((writing_anyway || announced_window_ <= target_announced_window / 2) &&
123       announced_window_ != target_announced_window) {
124     const uint32_t announce =
125         static_cast<uint32_t>(Clamp(target_announced_window - announced_window_,
126                                     int64_t{0}, kMaxWindowUpdateSize));
127     announced_window_ += announce;
128     return announce;
129   }
130   return 0;
131 }
132 
StreamFlowControl(TransportFlowControl * tfc)133 StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc) : tfc_(tfc) {}
134 
RecvData(int64_t incoming_frame_size)135 absl::Status StreamFlowControl::IncomingUpdateContext::RecvData(
136     int64_t incoming_frame_size) {
137   return tfc_upd_.RecvData(incoming_frame_size, [this, incoming_frame_size]() {
138     int64_t acked_stream_window =
139         sfc_->announced_window_delta_ + sfc_->tfc_->acked_init_window();
140     if (incoming_frame_size > acked_stream_window) {
141       return absl::InternalError(absl::StrFormat(
142           "frame of size %" PRId64 " overflows local window of %" PRId64,
143           incoming_frame_size, acked_stream_window));
144     }
145 
146     tfc_upd_.UpdateAnnouncedWindowDelta(&sfc_->announced_window_delta_,
147                                         -incoming_frame_size);
148     sfc_->min_progress_size_ -=
149         std::min(sfc_->min_progress_size_, incoming_frame_size);
150     return absl::OkStatus();
151   });
152 }
153 
RecvData(int64_t incoming_frame_size,absl::FunctionRef<absl::Status ()> stream)154 absl::Status TransportFlowControl::IncomingUpdateContext::RecvData(
155     int64_t incoming_frame_size, absl::FunctionRef<absl::Status()> stream) {
156   if (incoming_frame_size > tfc_->announced_window_) {
157     return absl::InternalError(absl::StrFormat(
158         "frame of size %" PRId64 " overflows local window of %" PRId64,
159         incoming_frame_size, tfc_->announced_window_));
160   }
161   absl::Status error = stream();
162   if (!error.ok()) return error;
163   tfc_->announced_window_ -= incoming_frame_size;
164   return absl::OkStatus();
165 }
166 
target_window() const167 int64_t TransportFlowControl::target_window() const {
168   // See comment above announced_stream_total_over_incoming_window_ for the
169   // logic behind this decision.
170   return static_cast<uint32_t>(
171       std::min(static_cast<int64_t>((1u << 31) - 1),
172                announced_stream_total_over_incoming_window_ +
173                    target_initial_window_size_));
174 }
175 
UpdateAction(FlowControlAction action)176 FlowControlAction TransportFlowControl::UpdateAction(FlowControlAction action) {
177   const int64_t target = target_window();
178   // round up so that one byte targets are sent.
179   const int64_t send_threshold = (target + 1) / 2;
180   if (announced_window_ < send_threshold) {
181     action.set_send_transport_update(
182         FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
183   }
184   return action;
185 }
186 
// Takes a target expressed in log2 bytes (TargetLogBdp passes
// 1 + log2(BDP)) and adjusts it for the memory pressure of the system:
//   * pressure below kLowMemPressure: targets below kZeroTarget are pulled
//     linearly up toward kZeroTarget, reaching it at zero pressure
//     (2^22 bytes = 4 MiB once exponentiated);
//   * pressure above kHighMemPressure: the window is not grown; the target
//     is scaled down linearly, reaching zero at kMaxMemPressure and beyond;
//   * otherwise the target is returned unchanged.
static double AdjustForMemoryPressure(double memory_pressure, double target) {
  constexpr double kLowMemPressure = 0.1;
  constexpr double kZeroTarget = 22;
  constexpr double kHighMemPressure = 0.8;
  constexpr double kMaxMemPressure = 0.9;
  if (memory_pressure < kLowMemPressure && target < kZeroTarget) {
    return kZeroTarget +
           (target - kZeroTarget) * memory_pressure / kLowMemPressure;
  }
  if (memory_pressure > kHighMemPressure) {
    const double ramp = (memory_pressure - kHighMemPressure) /
                        (kMaxMemPressure - kHighMemPressure);
    return target * (1 - std::min(1.0, ramp));
  }
  return target;
}
203 
TargetLogBdp()204 double TransportFlowControl::TargetLogBdp() {
205   return AdjustForMemoryPressure(
206       memory_owner_->is_valid()
207           ? memory_owner_->GetPressureInfo().pressure_control_value
208           : 0.0,
209       1 + log2(bdp_estimator_.EstimateBdp()));
210 }
211 
SmoothLogBdp(double value)212 double TransportFlowControl::SmoothLogBdp(double value) {
213   Timestamp now = Timestamp::Now();
214   double bdp_error = value - pid_controller_.last_control_value();
215   const double dt = (now - last_pid_update_).seconds();
216   last_pid_update_ = now;
217   // Limit dt to 100ms
218   const double kMaxDt = 0.1;
219   return pid_controller_.Update(bdp_error, dt > kMaxDt ? kMaxDt : dt);
220 }
221 
222 double
TargetInitialWindowSizeBasedOnMemoryPressureAndBdp() const223 TransportFlowControl::TargetInitialWindowSizeBasedOnMemoryPressureAndBdp()
224     const {
225   const double bdp = bdp_estimator_.EstimateBdp() * 2.0;
226   const double memory_pressure =
227       memory_owner_->GetPressureInfo().pressure_control_value;
228   // Linear interpolation between two values.
229   // Given a line segment between the two points (t_min, a), and (t_max, b),
230   // and a value t such that t_min <= t <= t_max, return the value on the line
231   // segment at t.
232   auto lerp = [](double t, double t_min, double t_max, double a, double b) {
233     return a + (b - a) * (t - t_min) / (t_max - t_min);
234   };
235   // We split memory pressure into three broad regions:
236   // 1. Low memory pressure, the "anything goes" case - we assume no memory
237   //    pressure concerns and advertise a huge window to keep things flowing.
238   // 2. Moderate memory pressure, the "adjust to BDP" case - we linearly ramp
239   //    down window size to 2*BDP - which should still allow bytes to flow, but
240   //    is arguably more considered.
241   // 3. High memory pressure - past 50% we linearly ramp down window size from
242   //    BDP to 0 - at which point senders effectively must request to send bytes
243   //    to us.
244   //
245   //          ▲
246   //          │
247   //  4mb ────┤---------x----
248   //          │              -----
249   //  BDP ────┤                   ----x---
250   //          │                           ----
251   //          │                               -----
252   //          │                                    ----
253   //          │                                        -----
254   //          │                                             ---x
255   //          ├─────────┬─────────────┬────────────────────────┬─────►
256   //          │Anything │Adjust to    │Drop to zero            │
257   //          │Goes     │BDP          │                        │
258   //          0%        20%           50%                      100% memory
259   //                                                                pressure
260   const double kAnythingGoesPressure = 0.2;
261   const double kAdjustedToBdpPressure = 0.5;
262   const double kOneMegabyte = 1024.0 * 1024.0;
263   const double kAnythingGoesWindow = std::max(4.0 * kOneMegabyte, bdp);
264   if (memory_pressure < kAnythingGoesPressure) {
265     return kAnythingGoesWindow;
266   } else if (memory_pressure < kAdjustedToBdpPressure) {
267     return lerp(memory_pressure, kAnythingGoesPressure, kAdjustedToBdpPressure,
268                 kAnythingGoesWindow, bdp);
269   } else if (memory_pressure < 1.0) {
270     return lerp(memory_pressure, kAdjustedToBdpPressure, 1.0, bdp, 0);
271   } else {
272     return 0;
273   }
274 }
275 
UpdateSetting(grpc_chttp2_setting_id id,int64_t * desired_value,uint32_t new_desired_value,FlowControlAction * action,FlowControlAction & (FlowControlAction::* set)(FlowControlAction::Urgency,uint32_t))276 void TransportFlowControl::UpdateSetting(
277     grpc_chttp2_setting_id id, int64_t* desired_value,
278     uint32_t new_desired_value, FlowControlAction* action,
279     FlowControlAction& (FlowControlAction::*set)(FlowControlAction::Urgency,
280                                                  uint32_t)) {
281   new_desired_value =
282       Clamp(new_desired_value, grpc_chttp2_settings_parameters[id].min_value,
283             grpc_chttp2_settings_parameters[id].max_value);
284   if (new_desired_value != *desired_value) {
285     if (grpc_flowctl_trace.enabled()) {
286       gpr_log(GPR_INFO, "[flowctl] UPDATE SETTING %s from %" PRId64 " to %d",
287               grpc_chttp2_settings_parameters[id].name, *desired_value,
288               new_desired_value);
289     }
290     // Reaching zero can only happen for initial window size, and if it occurs
291     // we really want to wake up writes and ensure all the queued stream
292     // window updates are flushed, since stream flow control operates
293     // differently at zero window size.
294     FlowControlAction::Urgency urgency =
295         FlowControlAction::Urgency::QUEUE_UPDATE;
296     if (*desired_value == 0 || new_desired_value == 0) {
297       urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
298     }
299     *desired_value = new_desired_value;
300     (action->*set)(urgency, *desired_value);
301   }
302 }
303 
SetAckedInitialWindow(uint32_t value)304 FlowControlAction TransportFlowControl::SetAckedInitialWindow(uint32_t value) {
305   acked_init_window_ = value;
306   FlowControlAction action;
307   if (acked_init_window_ != target_initial_window_size_) {
308     FlowControlAction::Urgency urgency =
309         FlowControlAction::Urgency::QUEUE_UPDATE;
310     if (acked_init_window_ == 0 || target_initial_window_size_ == 0) {
311       urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
312     }
313     action.set_send_initial_window_update(urgency, target_initial_window_size_);
314   }
315   return action;
316 }
317 
PeriodicUpdate()318 FlowControlAction TransportFlowControl::PeriodicUpdate() {
319   FlowControlAction action;
320   if (enable_bdp_probe_) {
321     // get bdp estimate and update initial_window accordingly.
322     // target might change based on how much memory pressure we are under
323     // TODO(ncteisen): experiment with setting target to be huge under low
324     // memory pressure.
325     uint32_t target = static_cast<uint32_t>(RoundUpToPowerOf2(
326         Clamp(IsMemoryPressureControllerEnabled()
327                   ? TargetInitialWindowSizeBasedOnMemoryPressureAndBdp()
328                   : pow(2, SmoothLogBdp(TargetLogBdp())),
329               0.0, static_cast<double>(kMaxInitialWindowSize))));
330     if (target < kMinPositiveInitialWindowSize) target = 0;
331     if (g_test_only_transport_target_window_estimates_mocker != nullptr) {
332       // Hook for simulating unusual flow control situations in tests.
333       target = g_test_only_transport_target_window_estimates_mocker
334                    ->ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
335                        target_initial_window_size_ /* current target */);
336     }
337     // Though initial window 'could' drop to 0, we keep the floor at
338     // kMinInitialWindowSize
339     UpdateSetting(GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
340                   &target_initial_window_size_, target, &action,
341                   &FlowControlAction::set_send_initial_window_update);
342     // we target the max of BDP or bandwidth in microseconds.
343     UpdateSetting(GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, &target_frame_size_,
344                   target, &action,
345                   &FlowControlAction::set_send_max_frame_size_update);
346 
347     if (IsTcpFrameSizeTuningEnabled()) {
348       // Advertise PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE to peer. By advertising
349       // PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE to the peer, we are informing the
350       // peer that we have tcp frame size tuning enabled and we inform it of our
351       // prefered rx frame sizes. The prefered rx frame size is determined as:
352       // Clamp(target_frame_size_ * 2, 16384, 0x7fffffff). In the future, this
353       // maybe updated to a different function of the memory pressure.
354       UpdateSetting(
355           GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE,
356           &target_preferred_rx_crypto_frame_size_,
357           Clamp(static_cast<unsigned int>(target_frame_size_ * 2), 16384u,
358                 0x7ffffffu),
359           &action,
360           &FlowControlAction::set_preferred_rx_crypto_frame_size_update);
361     }
362   }
363   return UpdateAction(action);
364 }
365 
MaybeSendUpdate()366 uint32_t StreamFlowControl::MaybeSendUpdate() {
367   TransportFlowControl::IncomingUpdateContext tfc_upd(tfc_);
368   const int64_t announce = DesiredAnnounceSize();
369   pending_size_ = absl::nullopt;
370   tfc_upd.UpdateAnnouncedWindowDelta(&announced_window_delta_, announce);
371   GPR_ASSERT(DesiredAnnounceSize() == 0);
372   std::ignore = tfc_upd.MakeAction();
373   return static_cast<uint32_t>(announce);
374 }
375 
DesiredAnnounceSize() const376 int64_t StreamFlowControl::DesiredAnnounceSize() const {
377   int64_t desired_window_delta = [this]() {
378     if (min_progress_size_ == 0) {
379       if (pending_size_.has_value() &&
380           announced_window_delta_ < -*pending_size_) {
381         return -*pending_size_;
382       } else {
383         return announced_window_delta_;
384       }
385     } else {
386       return std::min(min_progress_size_, kMaxWindowDelta);
387     }
388   }();
389   return Clamp(desired_window_delta - announced_window_delta_, int64_t{0},
390                kMaxWindowUpdateSize);
391 }
392 
UpdateAction(FlowControlAction action)393 FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) {
394   const int64_t desired_announce_size = DesiredAnnounceSize();
395   if (desired_announce_size > 0) {
396     FlowControlAction::Urgency urgency =
397         FlowControlAction::Urgency::QUEUE_UPDATE;
398     // Size at which we probably want to wake up and write regardless of whether
399     // we *have* to.
400     // Currently set at half the initial window size or 8kb (whichever is
401     // greater). 8kb means we don't send rapidly unnecessarily when the initial
402     // window size is small.
403     const int64_t hurry_up_size = std::max(
404         static_cast<int64_t>(tfc_->sent_init_window()) / 2, int64_t{8192});
405     if (desired_announce_size > hurry_up_size) {
406       urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
407     }
408     // min_progress_size_ > 0 means we have a reader ready to read.
409     if (min_progress_size_ > 0) {
410       // If we're into initial window to receive that data we should wake up and
411       // send an update.
412       if (announced_window_delta_ < 0) {
413         urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
414       } else if (announced_window_delta_ == 0 &&
415                  tfc_->sent_init_window() == 0) {
416         // Special case when initial window size is zero, meaning that
417         // announced_window_delta cannot become negative (it may already be so
418         // however).
419         urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
420       }
421     }
422     action.set_send_stream_update(urgency);
423   }
424   return action;
425 }
426 
SetPendingSize(int64_t pending_size)427 void StreamFlowControl::IncomingUpdateContext::SetPendingSize(
428     int64_t pending_size) {
429   GPR_ASSERT(pending_size >= 0);
430   sfc_->pending_size_ = pending_size;
431 }
432 
433 }  // namespace chttp2
434 }  // namespace grpc_core
435