1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
22
23 #include <inttypes.h>
24
25 #include <algorithm>
26 #include <cmath>
27 #include <initializer_list>
28 #include <ostream>
29 #include <string>
30 #include <tuple>
31 #include <vector>
32
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_format.h"
35 #include "absl/strings/str_join.h"
36
37 #include <grpc/support/log.h>
38
39 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
40 #include "src/core/lib/experiments/experiments.h"
41 #include "src/core/lib/gpr/useful.h"
42 #include "src/core/lib/resource_quota/memory_quota.h"
43
// Trace flag for flow control, constructed with the name "flowctl" and `false`
// as its first argument (presumably the default-enabled state -- confirm
// against TraceFlag). UpdateSetting() in this file checks
// grpc_flowctl_trace.enabled() before logging a setting change.
grpc_core::TraceFlag grpc_flowctl_trace(false, "flowctl");
45
46 namespace grpc_core {
47 namespace chttp2 {
48
49 TestOnlyTransportTargetWindowEstimatesMocker*
50 g_test_only_transport_target_window_estimates_mocker;
51
namespace {

// Upper bound (2^31 - 1) applied when clamping the size of a single window
// update computed in this file.
constexpr int64_t kMaxWindowUpdateSize = (int64_t{1} << 31) - 1;

}  // namespace
57
UrgencyString(Urgency u)58 const char* FlowControlAction::UrgencyString(Urgency u) {
59 switch (u) {
60 case Urgency::NO_ACTION_NEEDED:
61 return "no-action";
62 case Urgency::UPDATE_IMMEDIATELY:
63 return "now";
64 case Urgency::QUEUE_UPDATE:
65 return "queue";
66 default:
67 GPR_UNREACHABLE_CODE(return "unknown");
68 }
69 GPR_UNREACHABLE_CODE(return "unknown");
70 }
71
operator <<(std::ostream & out,FlowControlAction::Urgency u)72 std::ostream& operator<<(std::ostream& out, FlowControlAction::Urgency u) {
73 return out << FlowControlAction::UrgencyString(u);
74 }
75
DebugString() const76 std::string FlowControlAction::DebugString() const {
77 std::vector<std::string> segments;
78 if (send_transport_update_ != Urgency::NO_ACTION_NEEDED) {
79 segments.push_back(
80 absl::StrCat("t:", UrgencyString(send_transport_update_)));
81 }
82 if (send_stream_update_ != Urgency::NO_ACTION_NEEDED) {
83 segments.push_back(absl::StrCat("s:", UrgencyString(send_stream_update_)));
84 }
85 if (send_initial_window_update_ != Urgency::NO_ACTION_NEEDED) {
86 segments.push_back(
87 absl::StrCat("iw=", initial_window_size_, ":",
88 UrgencyString(send_initial_window_update_)));
89 }
90 if (send_max_frame_size_update_ != Urgency::NO_ACTION_NEEDED) {
91 segments.push_back(
92 absl::StrCat("mf=", max_frame_size_, ":",
93 UrgencyString(send_max_frame_size_update_)));
94 }
95 if (segments.empty()) return "no action";
96 return absl::StrJoin(segments, ",");
97 }
98
operator <<(std::ostream & out,const FlowControlAction & action)99 std::ostream& operator<<(std::ostream& out, const FlowControlAction& action) {
100 return out << action.DebugString();
101 }
102
// Builds the transport-level flow control state.
//   name             - forwarded to the BDP estimator.
//   enable_bdp_probe - stored; gates the retargeting of window and frame sizes
//                      performed in PeriodicUpdate().
//   memory_owner     - stored as a pointer and queried for memory pressure.
// NOTE(review): the PID controller's initial control value comes from
// TargetLogBdp(), which reads memory_owner_ and bdp_estimator_. This relies on
// those members being declared (and therefore initialized) before
// pid_controller_ -- confirm against the class declaration.
TransportFlowControl::TransportFlowControl(absl::string_view name,
                                           bool enable_bdp_probe,
                                           MemoryOwner* memory_owner)
    : memory_owner_(memory_owner),
      enable_bdp_probe_(enable_bdp_probe),
      bdp_estimator_(name),
      // The control value is a log2-scale BDP target (see TargetLogBdp()),
      // bounded here to [-1, 25].
      pid_controller_(PidController::Args()
                          .set_gain_p(4)
                          .set_gain_i(8)
                          .set_gain_d(0)
                          .set_initial_control_value(TargetLogBdp())
                          .set_min_control_value(-1)
                          .set_max_control_value(25)
                          .set_integral_range(10)),
      // Reference point for the elapsed-time computation in SmoothLogBdp().
      last_pid_update_(Timestamp::Now()) {}
118
MaybeSendUpdate(bool writing_anyway)119 uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) {
120 const uint32_t target_announced_window =
121 static_cast<uint32_t>(target_window());
122 if ((writing_anyway || announced_window_ <= target_announced_window / 2) &&
123 announced_window_ != target_announced_window) {
124 const uint32_t announce =
125 static_cast<uint32_t>(Clamp(target_announced_window - announced_window_,
126 int64_t{0}, kMaxWindowUpdateSize));
127 announced_window_ += announce;
128 return announce;
129 }
130 return 0;
131 }
132
StreamFlowControl(TransportFlowControl * tfc)133 StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc) : tfc_(tfc) {}
134
RecvData(int64_t incoming_frame_size)135 absl::Status StreamFlowControl::IncomingUpdateContext::RecvData(
136 int64_t incoming_frame_size) {
137 return tfc_upd_.RecvData(incoming_frame_size, [this, incoming_frame_size]() {
138 int64_t acked_stream_window =
139 sfc_->announced_window_delta_ + sfc_->tfc_->acked_init_window();
140 if (incoming_frame_size > acked_stream_window) {
141 return absl::InternalError(absl::StrFormat(
142 "frame of size %" PRId64 " overflows local window of %" PRId64,
143 incoming_frame_size, acked_stream_window));
144 }
145
146 tfc_upd_.UpdateAnnouncedWindowDelta(&sfc_->announced_window_delta_,
147 -incoming_frame_size);
148 sfc_->min_progress_size_ -=
149 std::min(sfc_->min_progress_size_, incoming_frame_size);
150 return absl::OkStatus();
151 });
152 }
153
RecvData(int64_t incoming_frame_size,absl::FunctionRef<absl::Status ()> stream)154 absl::Status TransportFlowControl::IncomingUpdateContext::RecvData(
155 int64_t incoming_frame_size, absl::FunctionRef<absl::Status()> stream) {
156 if (incoming_frame_size > tfc_->announced_window_) {
157 return absl::InternalError(absl::StrFormat(
158 "frame of size %" PRId64 " overflows local window of %" PRId64,
159 incoming_frame_size, tfc_->announced_window_));
160 }
161 absl::Status error = stream();
162 if (!error.ok()) return error;
163 tfc_->announced_window_ -= incoming_frame_size;
164 return absl::OkStatus();
165 }
166
target_window() const167 int64_t TransportFlowControl::target_window() const {
168 // See comment above announced_stream_total_over_incoming_window_ for the
169 // logic behind this decision.
170 return static_cast<uint32_t>(
171 std::min(static_cast<int64_t>((1u << 31) - 1),
172 announced_stream_total_over_incoming_window_ +
173 target_initial_window_size_));
174 }
175
UpdateAction(FlowControlAction action)176 FlowControlAction TransportFlowControl::UpdateAction(FlowControlAction action) {
177 const int64_t target = target_window();
178 // round up so that one byte targets are sent.
179 const int64_t send_threshold = (target + 1) / 2;
180 if (announced_window_ < send_threshold) {
181 action.set_send_transport_update(
182 FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
183 }
184 return action;
185 }
186
// Adjusts `target` according to `memory_pressure` and returns the result:
//  - pressure above 0.8: the target is scaled down linearly, reaching zero at
//    a pressure of 0.9 and staying there beyond it;
//  - pressure below 0.1 with a target below 22: the target is interpolated
//    between 22 (at zero pressure) and its original value (at 0.1);
//  - otherwise the target is returned unchanged.
static double AdjustForMemoryPressure(double memory_pressure, double target) {
  constexpr double kLowMemPressure = 0.1;
  constexpr double kZeroTarget = 22;
  constexpr double kHighMemPressure = 0.8;
  constexpr double kMaxMemPressure = 0.9;
  // do not increase window under heavy memory pressure.
  if (memory_pressure > kHighMemPressure) {
    const double reduction =
        std::min(1.0, (memory_pressure - kHighMemPressure) /
                          (kMaxMemPressure - kHighMemPressure));
    return target * (1 - reduction);
  }
  if (memory_pressure < kLowMemPressure && target < kZeroTarget) {
    return (target - kZeroTarget) * memory_pressure / kLowMemPressure +
           kZeroTarget;
  }
  return target;
}
203
TargetLogBdp()204 double TransportFlowControl::TargetLogBdp() {
205 return AdjustForMemoryPressure(
206 memory_owner_->is_valid()
207 ? memory_owner_->GetPressureInfo().pressure_control_value
208 : 0.0,
209 1 + log2(bdp_estimator_.EstimateBdp()));
210 }
211
SmoothLogBdp(double value)212 double TransportFlowControl::SmoothLogBdp(double value) {
213 Timestamp now = Timestamp::Now();
214 double bdp_error = value - pid_controller_.last_control_value();
215 const double dt = (now - last_pid_update_).seconds();
216 last_pid_update_ = now;
217 // Limit dt to 100ms
218 const double kMaxDt = 0.1;
219 return pid_controller_.Update(bdp_error, dt > kMaxDt ? kMaxDt : dt);
220 }
221
222 double
TargetInitialWindowSizeBasedOnMemoryPressureAndBdp() const223 TransportFlowControl::TargetInitialWindowSizeBasedOnMemoryPressureAndBdp()
224 const {
225 const double bdp = bdp_estimator_.EstimateBdp() * 2.0;
226 const double memory_pressure =
227 memory_owner_->GetPressureInfo().pressure_control_value;
228 // Linear interpolation between two values.
229 // Given a line segment between the two points (t_min, a), and (t_max, b),
230 // and a value t such that t_min <= t <= t_max, return the value on the line
231 // segment at t.
232 auto lerp = [](double t, double t_min, double t_max, double a, double b) {
233 return a + (b - a) * (t - t_min) / (t_max - t_min);
234 };
235 // We split memory pressure into three broad regions:
236 // 1. Low memory pressure, the "anything goes" case - we assume no memory
237 // pressure concerns and advertise a huge window to keep things flowing.
238 // 2. Moderate memory pressure, the "adjust to BDP" case - we linearly ramp
239 // down window size to 2*BDP - which should still allow bytes to flow, but
240 // is arguably more considered.
241 // 3. High memory pressure - past 50% we linearly ramp down window size from
242 // BDP to 0 - at which point senders effectively must request to send bytes
243 // to us.
244 //
245 // ▲
246 // │
247 // 4mb ────┤---------x----
248 // │ -----
249 // BDP ────┤ ----x---
250 // │ ----
251 // │ -----
252 // │ ----
253 // │ -----
254 // │ ---x
255 // ├─────────┬─────────────┬────────────────────────┬─────►
256 // │Anything │Adjust to │Drop to zero │
257 // │Goes │BDP │ │
258 // 0% 20% 50% 100% memory
259 // pressure
260 const double kAnythingGoesPressure = 0.2;
261 const double kAdjustedToBdpPressure = 0.5;
262 const double kOneMegabyte = 1024.0 * 1024.0;
263 const double kAnythingGoesWindow = std::max(4.0 * kOneMegabyte, bdp);
264 if (memory_pressure < kAnythingGoesPressure) {
265 return kAnythingGoesWindow;
266 } else if (memory_pressure < kAdjustedToBdpPressure) {
267 return lerp(memory_pressure, kAnythingGoesPressure, kAdjustedToBdpPressure,
268 kAnythingGoesWindow, bdp);
269 } else if (memory_pressure < 1.0) {
270 return lerp(memory_pressure, kAdjustedToBdpPressure, 1.0, bdp, 0);
271 } else {
272 return 0;
273 }
274 }
275
UpdateSetting(grpc_chttp2_setting_id id,int64_t * desired_value,uint32_t new_desired_value,FlowControlAction * action,FlowControlAction & (FlowControlAction::* set)(FlowControlAction::Urgency,uint32_t))276 void TransportFlowControl::UpdateSetting(
277 grpc_chttp2_setting_id id, int64_t* desired_value,
278 uint32_t new_desired_value, FlowControlAction* action,
279 FlowControlAction& (FlowControlAction::*set)(FlowControlAction::Urgency,
280 uint32_t)) {
281 new_desired_value =
282 Clamp(new_desired_value, grpc_chttp2_settings_parameters[id].min_value,
283 grpc_chttp2_settings_parameters[id].max_value);
284 if (new_desired_value != *desired_value) {
285 if (grpc_flowctl_trace.enabled()) {
286 gpr_log(GPR_INFO, "[flowctl] UPDATE SETTING %s from %" PRId64 " to %d",
287 grpc_chttp2_settings_parameters[id].name, *desired_value,
288 new_desired_value);
289 }
290 // Reaching zero can only happen for initial window size, and if it occurs
291 // we really want to wake up writes and ensure all the queued stream
292 // window updates are flushed, since stream flow control operates
293 // differently at zero window size.
294 FlowControlAction::Urgency urgency =
295 FlowControlAction::Urgency::QUEUE_UPDATE;
296 if (*desired_value == 0 || new_desired_value == 0) {
297 urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
298 }
299 *desired_value = new_desired_value;
300 (action->*set)(urgency, *desired_value);
301 }
302 }
303
SetAckedInitialWindow(uint32_t value)304 FlowControlAction TransportFlowControl::SetAckedInitialWindow(uint32_t value) {
305 acked_init_window_ = value;
306 FlowControlAction action;
307 if (acked_init_window_ != target_initial_window_size_) {
308 FlowControlAction::Urgency urgency =
309 FlowControlAction::Urgency::QUEUE_UPDATE;
310 if (acked_init_window_ == 0 || target_initial_window_size_ == 0) {
311 urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
312 }
313 action.set_send_initial_window_update(urgency, target_initial_window_size_);
314 }
315 return action;
316 }
317
PeriodicUpdate()318 FlowControlAction TransportFlowControl::PeriodicUpdate() {
319 FlowControlAction action;
320 if (enable_bdp_probe_) {
321 // get bdp estimate and update initial_window accordingly.
322 // target might change based on how much memory pressure we are under
323 // TODO(ncteisen): experiment with setting target to be huge under low
324 // memory pressure.
325 uint32_t target = static_cast<uint32_t>(RoundUpToPowerOf2(
326 Clamp(IsMemoryPressureControllerEnabled()
327 ? TargetInitialWindowSizeBasedOnMemoryPressureAndBdp()
328 : pow(2, SmoothLogBdp(TargetLogBdp())),
329 0.0, static_cast<double>(kMaxInitialWindowSize))));
330 if (target < kMinPositiveInitialWindowSize) target = 0;
331 if (g_test_only_transport_target_window_estimates_mocker != nullptr) {
332 // Hook for simulating unusual flow control situations in tests.
333 target = g_test_only_transport_target_window_estimates_mocker
334 ->ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
335 target_initial_window_size_ /* current target */);
336 }
337 // Though initial window 'could' drop to 0, we keep the floor at
338 // kMinInitialWindowSize
339 UpdateSetting(GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
340 &target_initial_window_size_, target, &action,
341 &FlowControlAction::set_send_initial_window_update);
342 // we target the max of BDP or bandwidth in microseconds.
343 UpdateSetting(GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, &target_frame_size_,
344 target, &action,
345 &FlowControlAction::set_send_max_frame_size_update);
346
347 if (IsTcpFrameSizeTuningEnabled()) {
348 // Advertise PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE to peer. By advertising
349 // PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE to the peer, we are informing the
350 // peer that we have tcp frame size tuning enabled and we inform it of our
351 // prefered rx frame sizes. The prefered rx frame size is determined as:
352 // Clamp(target_frame_size_ * 2, 16384, 0x7fffffff). In the future, this
353 // maybe updated to a different function of the memory pressure.
354 UpdateSetting(
355 GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE,
356 &target_preferred_rx_crypto_frame_size_,
357 Clamp(static_cast<unsigned int>(target_frame_size_ * 2), 16384u,
358 0x7ffffffu),
359 &action,
360 &FlowControlAction::set_preferred_rx_crypto_frame_size_update);
361 }
362 }
363 return UpdateAction(action);
364 }
365
MaybeSendUpdate()366 uint32_t StreamFlowControl::MaybeSendUpdate() {
367 TransportFlowControl::IncomingUpdateContext tfc_upd(tfc_);
368 const int64_t announce = DesiredAnnounceSize();
369 pending_size_ = absl::nullopt;
370 tfc_upd.UpdateAnnouncedWindowDelta(&announced_window_delta_, announce);
371 GPR_ASSERT(DesiredAnnounceSize() == 0);
372 std::ignore = tfc_upd.MakeAction();
373 return static_cast<uint32_t>(announce);
374 }
375
DesiredAnnounceSize() const376 int64_t StreamFlowControl::DesiredAnnounceSize() const {
377 int64_t desired_window_delta = [this]() {
378 if (min_progress_size_ == 0) {
379 if (pending_size_.has_value() &&
380 announced_window_delta_ < -*pending_size_) {
381 return -*pending_size_;
382 } else {
383 return announced_window_delta_;
384 }
385 } else {
386 return std::min(min_progress_size_, kMaxWindowDelta);
387 }
388 }();
389 return Clamp(desired_window_delta - announced_window_delta_, int64_t{0},
390 kMaxWindowUpdateSize);
391 }
392
UpdateAction(FlowControlAction action)393 FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) {
394 const int64_t desired_announce_size = DesiredAnnounceSize();
395 if (desired_announce_size > 0) {
396 FlowControlAction::Urgency urgency =
397 FlowControlAction::Urgency::QUEUE_UPDATE;
398 // Size at which we probably want to wake up and write regardless of whether
399 // we *have* to.
400 // Currently set at half the initial window size or 8kb (whichever is
401 // greater). 8kb means we don't send rapidly unnecessarily when the initial
402 // window size is small.
403 const int64_t hurry_up_size = std::max(
404 static_cast<int64_t>(tfc_->sent_init_window()) / 2, int64_t{8192});
405 if (desired_announce_size > hurry_up_size) {
406 urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
407 }
408 // min_progress_size_ > 0 means we have a reader ready to read.
409 if (min_progress_size_ > 0) {
410 // If we're into initial window to receive that data we should wake up and
411 // send an update.
412 if (announced_window_delta_ < 0) {
413 urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
414 } else if (announced_window_delta_ == 0 &&
415 tfc_->sent_init_window() == 0) {
416 // Special case when initial window size is zero, meaning that
417 // announced_window_delta cannot become negative (it may already be so
418 // however).
419 urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
420 }
421 }
422 action.set_send_stream_update(urgency);
423 }
424 return action;
425 }
426
SetPendingSize(int64_t pending_size)427 void StreamFlowControl::IncomingUpdateContext::SetPendingSize(
428 int64_t pending_size) {
429 GPR_ASSERT(pending_size >= 0);
430 sfc_->pending_size_ = pending_size;
431 }
432
433 } // namespace chttp2
434 } // namespace grpc_core
435