1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
20 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <limits.h>
25 #include <stdint.h>
26 
27 #include <iosfwd>
28 #include <string>
29 #include <utility>
30 
31 #include "absl/functional/function_ref.h"
32 #include "absl/status/status.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/types/optional.h"
35 
36 #include <grpc/support/log.h>
37 
38 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
39 #include "src/core/lib/debug/trace.h"
40 #include "src/core/lib/gprpp/time.h"
41 #include "src/core/lib/resource_quota/memory_quota.h"
42 #include "src/core/lib/transport/bdp_estimator.h"
43 #include "src/core/lib/transport/pid_controller.h"
44 
// Trace flag declared here and defined in some other translation unit;
// presumably gates flow-control trace logging — verify at the definition.
extern grpc_core::TraceFlag grpc_flowctl_trace;

// Forward declaration of a test helper class.
// NOTE(review): nothing in this header declares TrickledCHTTP2 as a friend,
// so the "to make this a friend" rationale looks stale; confirm no other
// includer relies on this forward declaration before removing it.
namespace grpc {
namespace testing {
class TrickledCHTTP2;  // to make this a friend
}  // namespace testing
}  // namespace grpc
52 
53 namespace grpc_core {
54 namespace chttp2 {
55 
// Default window size (bytes) used to seed the windows tracked below.
static constexpr uint32_t kDefaultWindow = 65535;
// Default frame size (bytes) used to seed the target frame size.
static constexpr uint32_t kDefaultFrameSize = 16384;
// Largest window value: 2^31 - 1.
static constexpr int64_t kMaxWindow = (int64_t{1} << 31) - 1;
// If smaller than this, advertise zero window.
static constexpr uint32_t kMinPositiveInitialWindowSize = 1024;
// Upper bound for the initial window size: 2^30.
static constexpr uint32_t kMaxInitialWindowSize = uint32_t{1} << 30;
// The maximum per-stream flow control window delta to advertise: 2^20.
static constexpr int64_t kMaxWindowDelta = int64_t{1} << 20;
// By default, no preferred limit on the receive crypto frame size.
static constexpr int kDefaultPreferredRxCryptoFrameSize = INT_MAX;

// TODO(ctiller): clean up when flow_control_fixes is enabled by default
static constexpr uint32_t kFrameSize = 1024 * 1024;
static constexpr uint32_t kMinInitialWindowSize = 128;
69 
class StreamFlowControl;
class TransportFlowControl;

// Reported by TransportFlowControl::OutgoingUpdateContext::Finish(): whether
// the transport's remote window crossed the "stalled" (<= 0) boundary while
// the context was alive.
enum class StallEdge {
  kNoChange,   // stalled state is unchanged
  kStalled,    // remote window went from > 0 to <= 0
  kUnstalled,  // remote window went from <= 0 to > 0
};
74 
75 // Encapsulates a collections of actions the transport needs to take with
76 // regard to flow control. Each action comes with urgencies that tell the
77 // transport how quickly the action must take place.
78 class GRPC_MUST_USE_RESULT FlowControlAction {
79  public:
80   enum class Urgency : uint8_t {
81     // Nothing to be done.
82     NO_ACTION_NEEDED = 0,
83     // Initiate a write to update the initial window immediately.
84     UPDATE_IMMEDIATELY,
85     // Push the flow control update into a send buffer, to be sent
86     // out the next time a write is initiated.
87     QUEUE_UPDATE,
88   };
89 
send_stream_update()90   Urgency send_stream_update() const { return send_stream_update_; }
send_transport_update()91   Urgency send_transport_update() const { return send_transport_update_; }
send_initial_window_update()92   Urgency send_initial_window_update() const {
93     return send_initial_window_update_;
94   }
send_max_frame_size_update()95   Urgency send_max_frame_size_update() const {
96     return send_max_frame_size_update_;
97   }
preferred_rx_crypto_frame_size_update()98   Urgency preferred_rx_crypto_frame_size_update() const {
99     return preferred_rx_crypto_frame_size_update_;
100   }
initial_window_size()101   uint32_t initial_window_size() const { return initial_window_size_; }
max_frame_size()102   uint32_t max_frame_size() const { return max_frame_size_; }
preferred_rx_crypto_frame_size()103   uint32_t preferred_rx_crypto_frame_size() const {
104     return preferred_rx_crypto_frame_size_;
105   }
106 
set_send_stream_update(Urgency u)107   FlowControlAction& set_send_stream_update(Urgency u) {
108     send_stream_update_ = u;
109     return *this;
110   }
set_send_transport_update(Urgency u)111   FlowControlAction& set_send_transport_update(Urgency u) {
112     send_transport_update_ = u;
113     return *this;
114   }
set_send_initial_window_update(Urgency u,uint32_t update)115   FlowControlAction& set_send_initial_window_update(Urgency u,
116                                                     uint32_t update) {
117     send_initial_window_update_ = u;
118     initial_window_size_ = update;
119     return *this;
120   }
set_send_max_frame_size_update(Urgency u,uint32_t update)121   FlowControlAction& set_send_max_frame_size_update(Urgency u,
122                                                     uint32_t update) {
123     send_max_frame_size_update_ = u;
124     max_frame_size_ = update;
125     return *this;
126   }
set_preferred_rx_crypto_frame_size_update(Urgency u,uint32_t update)127   FlowControlAction& set_preferred_rx_crypto_frame_size_update(
128       Urgency u, uint32_t update) {
129     preferred_rx_crypto_frame_size_update_ = u;
130     preferred_rx_crypto_frame_size_ = update;
131     return *this;
132   }
133 
134   static const char* UrgencyString(Urgency u);
135   std::string DebugString() const;
136 
AssertEmpty()137   void AssertEmpty() { GPR_ASSERT(*this == FlowControlAction()); }
138 
139   bool operator==(const FlowControlAction& other) const {
140     return send_stream_update_ == other.send_stream_update_ &&
141            send_transport_update_ == other.send_transport_update_ &&
142            send_initial_window_update_ == other.send_initial_window_update_ &&
143            send_max_frame_size_update_ == other.send_max_frame_size_update_ &&
144            (send_initial_window_update_ == Urgency::NO_ACTION_NEEDED ||
145             initial_window_size_ == other.initial_window_size_) &&
146            (send_max_frame_size_update_ == Urgency::NO_ACTION_NEEDED ||
147             max_frame_size_ == other.max_frame_size_) &&
148            (preferred_rx_crypto_frame_size_update_ ==
149                 Urgency::NO_ACTION_NEEDED ||
150             preferred_rx_crypto_frame_size_ ==
151                 other.preferred_rx_crypto_frame_size_);
152   }
153 
154  private:
155   Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED;
156   Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED;
157   Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED;
158   Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED;
159   Urgency preferred_rx_crypto_frame_size_update_ = Urgency::NO_ACTION_NEEDED;
160   uint32_t initial_window_size_ = 0;
161   uint32_t max_frame_size_ = 0;
162   uint32_t preferred_rx_crypto_frame_size_ = 0;
163 };
164 
165 std::ostream& operator<<(std::ostream& out, FlowControlAction::Urgency urgency);
166 std::ostream& operator<<(std::ostream& out, const FlowControlAction& action);
167 
168 // Implementation of flow control that abides to HTTP/2 spec and attempts
169 // to be as performant as possible.
170 class TransportFlowControl final {
171  public:
172   explicit TransportFlowControl(absl::string_view name, bool enable_bdp_probe,
173                                 MemoryOwner* memory_owner);
~TransportFlowControl()174   ~TransportFlowControl() {}
175 
bdp_probe()176   bool bdp_probe() const { return enable_bdp_probe_; }
177 
178   // returns an announce if we should send a transport update to our peer,
179   // else returns zero; writing_anyway indicates if a write would happen
180   // regardless of the send - if it is false and this function returns non-zero,
181   // this announce will cause a write to occur
182   uint32_t MaybeSendUpdate(bool writing_anyway);
183 
184   // Track an update to the incoming flow control counters - that is how many
185   // tokens we report to our peer that we're willing to accept.
186   // Instantiators *must* call MakeAction before destruction of this value.
187   class IncomingUpdateContext {
188    public:
IncomingUpdateContext(TransportFlowControl * tfc)189     explicit IncomingUpdateContext(TransportFlowControl* tfc) : tfc_(tfc) {}
~IncomingUpdateContext()190     ~IncomingUpdateContext() { GPR_ASSERT(tfc_ == nullptr); }
191 
192     IncomingUpdateContext(const IncomingUpdateContext&) = delete;
193     IncomingUpdateContext& operator=(const IncomingUpdateContext&) = delete;
194 
195     // Reads the flow control data and returns an actionable struct that will
196     // tell chttp2 exactly what it needs to do
MakeAction()197     FlowControlAction MakeAction() {
198       return std::exchange(tfc_, nullptr)->UpdateAction(FlowControlAction());
199     }
200 
201     // Notify of data receipt. Returns OkStatus if the data was accepted,
202     // else an error status if the connection should be closed.
203     absl::Status RecvData(
204         int64_t incoming_frame_size, absl::FunctionRef<absl::Status()> stream =
205                                          []() { return absl::OkStatus(); });
206 
207     // Update a stream announce window delta, keeping track of how much total
208     // positive delta is present on the transport.
UpdateAnnouncedWindowDelta(int64_t * delta,int64_t change)209     void UpdateAnnouncedWindowDelta(int64_t* delta, int64_t change) {
210       if (change == 0) return;
211       if (*delta > 0) {
212         tfc_->announced_stream_total_over_incoming_window_ -= *delta;
213       }
214       *delta += change;
215       if (*delta > 0) {
216         tfc_->announced_stream_total_over_incoming_window_ += *delta;
217       }
218     }
219 
220    private:
221     TransportFlowControl* tfc_;
222   };
223 
224   // Track an update to the outgoing flow control counters - that is how many
225   // tokens our peer has said we can send.
226   class OutgoingUpdateContext {
227    public:
OutgoingUpdateContext(TransportFlowControl * tfc)228     explicit OutgoingUpdateContext(TransportFlowControl* tfc) : tfc_(tfc) {}
StreamSentData(int64_t size)229     void StreamSentData(int64_t size) { tfc_->remote_window_ -= size; }
230 
231     // we have received a WINDOW_UPDATE frame for a transport
RecvUpdate(uint32_t size)232     void RecvUpdate(uint32_t size) { tfc_->remote_window_ += size; }
233 
234     // Finish the update and check whether we became stalled or unstalled.
Finish()235     StallEdge Finish() {
236       bool is_stalled = tfc_->remote_window_ <= 0;
237       if (is_stalled != was_stalled_) {
238         return is_stalled ? StallEdge::kStalled : StallEdge::kUnstalled;
239       } else {
240         return StallEdge::kNoChange;
241       }
242     }
243 
244    private:
245     TransportFlowControl* tfc_;
246     const bool was_stalled_ = tfc_->remote_window_ <= 0;
247   };
248 
249   // Call periodically (at a low-ish rate, 100ms - 10s makes sense)
250   // to perform more complex flow control calculations and return an action
251   // to let chttp2 change its parameters
252   FlowControlAction PeriodicUpdate();
253 
254   int64_t target_window() const;
target_frame_size()255   int64_t target_frame_size() const { return target_frame_size_; }
target_preferred_rx_crypto_frame_size()256   int64_t target_preferred_rx_crypto_frame_size() const {
257     return target_preferred_rx_crypto_frame_size_;
258   }
259 
bdp_estimator()260   BdpEstimator* bdp_estimator() { return &bdp_estimator_; }
261 
acked_init_window()262   uint32_t acked_init_window() const { return acked_init_window_; }
sent_init_window()263   uint32_t sent_init_window() const { return target_initial_window_size_; }
264 
265   FlowControlAction SetAckedInitialWindow(uint32_t value);
266 
267   // Getters
remote_window()268   int64_t remote_window() const { return remote_window_; }
announced_window()269   int64_t announced_window() const { return announced_window_; }
270 
announced_stream_total_over_incoming_window()271   int64_t announced_stream_total_over_incoming_window() const {
272     return announced_stream_total_over_incoming_window_;
273   }
274 
RemoveAnnouncedWindowDelta(int64_t delta)275   void RemoveAnnouncedWindowDelta(int64_t delta) {
276     if (delta > 0) {
277       announced_stream_total_over_incoming_window_ -= delta;
278     }
279   }
280 
281  private:
282   double TargetLogBdp();
283   double SmoothLogBdp(double value);
284   double TargetInitialWindowSizeBasedOnMemoryPressureAndBdp() const;
285   static void UpdateSetting(grpc_chttp2_setting_id id, int64_t* desired_value,
286                             uint32_t new_desired_value,
287                             FlowControlAction* action,
288                             FlowControlAction& (FlowControlAction::*set)(
289                                 FlowControlAction::Urgency, uint32_t));
290 
291   FlowControlAction UpdateAction(FlowControlAction action);
292 
293   MemoryOwner* const memory_owner_;
294 
295   /// calculating what we should give for local window:
296   /// we track the total amount of flow control over initial window size
297   /// across all streams: this is data that we want to receive right now (it
298   /// has an outstanding read)
299   /// and the total amount of flow control under initial window size across all
300   /// streams: this is data we've read early
301   /// we want to adjust incoming_window such that:
302   /// incoming_window = total_over - max(bdp - total_under, 0)
303   int64_t announced_stream_total_over_incoming_window_ = 0;
304 
305   /// should we probe bdp?
306   const bool enable_bdp_probe_;
307 
308   // bdp estimation
309   BdpEstimator bdp_estimator_;
310 
311   // pid controller
312   PidController pid_controller_;
313   Timestamp last_pid_update_;
314 
315   int64_t remote_window_ = kDefaultWindow;
316   int64_t target_initial_window_size_ = kDefaultWindow;
317   int64_t target_frame_size_ = kDefaultFrameSize;
318   int64_t target_preferred_rx_crypto_frame_size_ =
319       kDefaultPreferredRxCryptoFrameSize;
320   int64_t announced_window_ = kDefaultWindow;
321   uint32_t acked_init_window_ = kDefaultWindow;
322 };
323 
324 // Implementation of flow control that abides to HTTP/2 spec and attempts
325 // to be as performant as possible.
326 class StreamFlowControl final {
327  public:
328   explicit StreamFlowControl(TransportFlowControl* tfc);
~StreamFlowControl()329   ~StreamFlowControl() {
330     tfc_->RemoveAnnouncedWindowDelta(announced_window_delta_);
331   }
332 
333   // Track an update to the incoming flow control counters - that is how many
334   // tokens we report to our peer that we're willing to accept.
335   // Instantiators *must* call MakeAction before destruction of this value.
336   class IncomingUpdateContext {
337    public:
IncomingUpdateContext(StreamFlowControl * sfc)338     explicit IncomingUpdateContext(StreamFlowControl* sfc)
339         : tfc_upd_(sfc->tfc_), sfc_(sfc) {}
340 
MakeAction()341     FlowControlAction MakeAction() {
342       return sfc_->UpdateAction(tfc_upd_.MakeAction());
343     }
344 
345     // we have received data from the wire
346     absl::Status RecvData(int64_t incoming_frame_size);
347 
348     // the application is asking for a certain amount of bytes
SetMinProgressSize(int64_t min_progress_size)349     void SetMinProgressSize(int64_t min_progress_size) {
350       sfc_->min_progress_size_ = min_progress_size;
351     }
352 
353     void SetPendingSize(int64_t pending_size);
354 
355    private:
356     TransportFlowControl::IncomingUpdateContext tfc_upd_;
357     StreamFlowControl* const sfc_;
358   };
359 
360   // Track an update to the outgoing flow control counters - that is how many
361   // tokens our peer has said we can send.
362   class OutgoingUpdateContext {
363    public:
OutgoingUpdateContext(StreamFlowControl * sfc)364     explicit OutgoingUpdateContext(StreamFlowControl* sfc)
365         : tfc_upd_(sfc->tfc_), sfc_(sfc) {}
366     // we have received a WINDOW_UPDATE frame for a stream
RecvUpdate(uint32_t size)367     void RecvUpdate(uint32_t size) { sfc_->remote_window_delta_ += size; }
368     // we have sent data on the wire, we must track this in our bookkeeping for
369     // the remote peer's flow control.
SentData(int64_t outgoing_frame_size)370     void SentData(int64_t outgoing_frame_size) {
371       tfc_upd_.StreamSentData(outgoing_frame_size);
372       sfc_->remote_window_delta_ -= outgoing_frame_size;
373     }
374 
375    private:
376     TransportFlowControl::OutgoingUpdateContext tfc_upd_;
377     StreamFlowControl* const sfc_;
378   };
379 
380   // returns an announce if we should send a stream update to our peer, else
381   // returns zero
382   uint32_t MaybeSendUpdate();
383 
remote_window_delta()384   int64_t remote_window_delta() const { return remote_window_delta_; }
announced_window_delta()385   int64_t announced_window_delta() const { return announced_window_delta_; }
min_progress_size()386   int64_t min_progress_size() const { return min_progress_size_; }
387 
388  private:
389   TransportFlowControl* const tfc_;
390   int64_t min_progress_size_ = 0;
391   int64_t remote_window_delta_ = 0;
392   int64_t announced_window_delta_ = 0;
393   absl::optional<int64_t> pending_size_;
394 
395   FlowControlAction UpdateAction(FlowControlAction action);
396   int64_t DesiredAnnounceSize() const;
397 };
398 
// Test-only hook that, judging by its single method, supplies the next target
// initial window size computed during a periodic update. An instance is
// installed through the global pointer declared right after this class.
class TestOnlyTransportTargetWindowEstimatesMocker {
 public:
  virtual ~TestOnlyTransportTargetWindowEstimatesMocker() = default;

  // Given the current target, returns the next target initial window size.
  virtual double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
      double current_target) = 0;
};

// Installed mocker, if any. Defined outside this header.
// NOTE(review): presumably left null outside tests — confirm at definition.
extern TestOnlyTransportTargetWindowEstimatesMocker*
    g_test_only_transport_target_window_estimates_mocker;
408 
409 }  // namespace chttp2
410 }  // namespace grpc_core
411 
412 #endif  // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
413