1 // 2 // 3 // Copyright 2017 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H 20 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <limits.h> 25 #include <stdint.h> 26 27 #include <iosfwd> 28 #include <string> 29 #include <utility> 30 31 #include "absl/functional/function_ref.h" 32 #include "absl/status/status.h" 33 #include "absl/strings/string_view.h" 34 #include "absl/types/optional.h" 35 36 #include <grpc/support/log.h> 37 38 #include "src/core/ext/transport/chttp2/transport/http2_settings.h" 39 #include "src/core/lib/debug/trace.h" 40 #include "src/core/lib/gprpp/time.h" 41 #include "src/core/lib/resource_quota/memory_quota.h" 42 #include "src/core/lib/transport/bdp_estimator.h" 43 #include "src/core/lib/transport/pid_controller.h" 44 45 extern grpc_core::TraceFlag grpc_flowctl_trace; 46 47 namespace grpc { 48 namespace testing { 49 class TrickledCHTTP2; // to make this a friend 50 } // namespace testing 51 } // namespace grpc 52 53 namespace grpc_core { 54 namespace chttp2 { 55 56 static constexpr uint32_t kDefaultWindow = 65535; 57 static constexpr uint32_t kDefaultFrameSize = 16384; 58 static constexpr int64_t kMaxWindow = static_cast<int64_t>((1u << 31) - 1); 59 // If smaller than this, advertise zero window. 60 static constexpr uint32_t kMinPositiveInitialWindowSize = 1024; 61 static constexpr const uint32_t kMaxInitialWindowSize = (1u << 30); 62 // The maximum per-stream flow control window delta to advertise. 63 static constexpr const int64_t kMaxWindowDelta = (1u << 20); 64 static constexpr const int kDefaultPreferredRxCryptoFrameSize = INT_MAX; 65 66 // TODO(ctiller): clean up when flow_control_fixes is enabled by default 67 static constexpr uint32_t kFrameSize = 1024 * 1024; 68 static constexpr const uint32_t kMinInitialWindowSize = 128; 69 70 class TransportFlowControl; 71 class StreamFlowControl; 72 73 enum class StallEdge { kNoChange, kStalled, kUnstalled }; 74 75 // Encapsulates a collections of actions the transport needs to take with 76 // regard to flow control. Each action comes with urgencies that tell the 77 // transport how quickly the action must take place. 78 class GRPC_MUST_USE_RESULT FlowControlAction { 79 public: 80 enum class Urgency : uint8_t { 81 // Nothing to be done. 82 NO_ACTION_NEEDED = 0, 83 // Initiate a write to update the initial window immediately. 84 UPDATE_IMMEDIATELY, 85 // Push the flow control update into a send buffer, to be sent 86 // out the next time a write is initiated. 87 QUEUE_UPDATE, 88 }; 89 send_stream_update()90 Urgency send_stream_update() const { return send_stream_update_; } send_transport_update()91 Urgency send_transport_update() const { return send_transport_update_; } send_initial_window_update()92 Urgency send_initial_window_update() const { 93 return send_initial_window_update_; 94 } send_max_frame_size_update()95 Urgency send_max_frame_size_update() const { 96 return send_max_frame_size_update_; 97 } preferred_rx_crypto_frame_size_update()98 Urgency preferred_rx_crypto_frame_size_update() const { 99 return preferred_rx_crypto_frame_size_update_; 100 } initial_window_size()101 uint32_t initial_window_size() const { return initial_window_size_; } max_frame_size()102 uint32_t max_frame_size() const { return max_frame_size_; } preferred_rx_crypto_frame_size()103 uint32_t preferred_rx_crypto_frame_size() const { 104 return preferred_rx_crypto_frame_size_; 105 } 106 set_send_stream_update(Urgency u)107 FlowControlAction& set_send_stream_update(Urgency u) { 108 send_stream_update_ = u; 109 return *this; 110 } set_send_transport_update(Urgency u)111 FlowControlAction& set_send_transport_update(Urgency u) { 112 send_transport_update_ = u; 113 return *this; 114 } set_send_initial_window_update(Urgency u,uint32_t update)115 FlowControlAction& set_send_initial_window_update(Urgency u, 116 uint32_t update) { 117 send_initial_window_update_ = u; 118 initial_window_size_ = update; 119 return *this; 120 } set_send_max_frame_size_update(Urgency u,uint32_t update)121 FlowControlAction& set_send_max_frame_size_update(Urgency u, 122 uint32_t update) { 123 send_max_frame_size_update_ = u; 124 max_frame_size_ = update; 125 return *this; 126 } set_preferred_rx_crypto_frame_size_update(Urgency u,uint32_t update)127 FlowControlAction& set_preferred_rx_crypto_frame_size_update( 128 Urgency u, uint32_t update) { 129 preferred_rx_crypto_frame_size_update_ = u; 130 preferred_rx_crypto_frame_size_ = update; 131 return *this; 132 } 133 134 static const char* UrgencyString(Urgency u); 135 std::string DebugString() const; 136 AssertEmpty()137 void AssertEmpty() { GPR_ASSERT(*this == FlowControlAction()); } 138 139 bool operator==(const FlowControlAction& other) const { 140 return send_stream_update_ == other.send_stream_update_ && 141 send_transport_update_ == other.send_transport_update_ && 142 send_initial_window_update_ == other.send_initial_window_update_ && 143 send_max_frame_size_update_ == other.send_max_frame_size_update_ && 144 (send_initial_window_update_ == Urgency::NO_ACTION_NEEDED || 145 initial_window_size_ == other.initial_window_size_) && 146 (send_max_frame_size_update_ == Urgency::NO_ACTION_NEEDED || 147 max_frame_size_ == other.max_frame_size_) && 148 (preferred_rx_crypto_frame_size_update_ == 149 Urgency::NO_ACTION_NEEDED || 150 preferred_rx_crypto_frame_size_ == 151 other.preferred_rx_crypto_frame_size_); 152 } 153 154 private: 155 Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED; 156 Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED; 157 Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED; 158 Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED; 159 Urgency preferred_rx_crypto_frame_size_update_ = Urgency::NO_ACTION_NEEDED; 160 uint32_t initial_window_size_ = 0; 161 uint32_t max_frame_size_ = 0; 162 uint32_t preferred_rx_crypto_frame_size_ = 0; 163 }; 164 165 std::ostream& operator<<(std::ostream& out, FlowControlAction::Urgency urgency); 166 std::ostream& operator<<(std::ostream& out, const FlowControlAction& action); 167 168 // Implementation of flow control that abides to HTTP/2 spec and attempts 169 // to be as performant as possible. 170 class TransportFlowControl final { 171 public: 172 explicit TransportFlowControl(absl::string_view name, bool enable_bdp_probe, 173 MemoryOwner* memory_owner); ~TransportFlowControl()174 ~TransportFlowControl() {} 175 bdp_probe()176 bool bdp_probe() const { return enable_bdp_probe_; } 177 178 // returns an announce if we should send a transport update to our peer, 179 // else returns zero; writing_anyway indicates if a write would happen 180 // regardless of the send - if it is false and this function returns non-zero, 181 // this announce will cause a write to occur 182 uint32_t MaybeSendUpdate(bool writing_anyway); 183 184 // Track an update to the incoming flow control counters - that is how many 185 // tokens we report to our peer that we're willing to accept. 186 // Instantiators *must* call MakeAction before destruction of this value. 187 class IncomingUpdateContext { 188 public: IncomingUpdateContext(TransportFlowControl * tfc)189 explicit IncomingUpdateContext(TransportFlowControl* tfc) : tfc_(tfc) {} ~IncomingUpdateContext()190 ~IncomingUpdateContext() { GPR_ASSERT(tfc_ == nullptr); } 191 192 IncomingUpdateContext(const IncomingUpdateContext&) = delete; 193 IncomingUpdateContext& operator=(const IncomingUpdateContext&) = delete; 194 195 // Reads the flow control data and returns an actionable struct that will 196 // tell chttp2 exactly what it needs to do MakeAction()197 FlowControlAction MakeAction() { 198 return std::exchange(tfc_, nullptr)->UpdateAction(FlowControlAction()); 199 } 200 201 // Notify of data receipt. Returns OkStatus if the data was accepted, 202 // else an error status if the connection should be closed. 203 absl::Status RecvData( 204 int64_t incoming_frame_size, absl::FunctionRef<absl::Status()> stream = 205 []() { return absl::OkStatus(); }); 206 207 // Update a stream announce window delta, keeping track of how much total 208 // positive delta is present on the transport. UpdateAnnouncedWindowDelta(int64_t * delta,int64_t change)209 void UpdateAnnouncedWindowDelta(int64_t* delta, int64_t change) { 210 if (change == 0) return; 211 if (*delta > 0) { 212 tfc_->announced_stream_total_over_incoming_window_ -= *delta; 213 } 214 *delta += change; 215 if (*delta > 0) { 216 tfc_->announced_stream_total_over_incoming_window_ += *delta; 217 } 218 } 219 220 private: 221 TransportFlowControl* tfc_; 222 }; 223 224 // Track an update to the outgoing flow control counters - that is how many 225 // tokens our peer has said we can send. 226 class OutgoingUpdateContext { 227 public: OutgoingUpdateContext(TransportFlowControl * tfc)228 explicit OutgoingUpdateContext(TransportFlowControl* tfc) : tfc_(tfc) {} StreamSentData(int64_t size)229 void StreamSentData(int64_t size) { tfc_->remote_window_ -= size; } 230 231 // we have received a WINDOW_UPDATE frame for a transport RecvUpdate(uint32_t size)232 void RecvUpdate(uint32_t size) { tfc_->remote_window_ += size; } 233 234 // Finish the update and check whether we became stalled or unstalled. Finish()235 StallEdge Finish() { 236 bool is_stalled = tfc_->remote_window_ <= 0; 237 if (is_stalled != was_stalled_) { 238 return is_stalled ? StallEdge::kStalled : StallEdge::kUnstalled; 239 } else { 240 return StallEdge::kNoChange; 241 } 242 } 243 244 private: 245 TransportFlowControl* tfc_; 246 const bool was_stalled_ = tfc_->remote_window_ <= 0; 247 }; 248 249 // Call periodically (at a low-ish rate, 100ms - 10s makes sense) 250 // to perform more complex flow control calculations and return an action 251 // to let chttp2 change its parameters 252 FlowControlAction PeriodicUpdate(); 253 254 int64_t target_window() const; target_frame_size()255 int64_t target_frame_size() const { return target_frame_size_; } target_preferred_rx_crypto_frame_size()256 int64_t target_preferred_rx_crypto_frame_size() const { 257 return target_preferred_rx_crypto_frame_size_; 258 } 259 bdp_estimator()260 BdpEstimator* bdp_estimator() { return &bdp_estimator_; } 261 acked_init_window()262 uint32_t acked_init_window() const { return acked_init_window_; } sent_init_window()263 uint32_t sent_init_window() const { return target_initial_window_size_; } 264 265 FlowControlAction SetAckedInitialWindow(uint32_t value); 266 267 // Getters remote_window()268 int64_t remote_window() const { return remote_window_; } announced_window()269 int64_t announced_window() const { return announced_window_; } 270 announced_stream_total_over_incoming_window()271 int64_t announced_stream_total_over_incoming_window() const { 272 return announced_stream_total_over_incoming_window_; 273 } 274 RemoveAnnouncedWindowDelta(int64_t delta)275 void RemoveAnnouncedWindowDelta(int64_t delta) { 276 if (delta > 0) { 277 announced_stream_total_over_incoming_window_ -= delta; 278 } 279 } 280 281 private: 282 double TargetLogBdp(); 283 double SmoothLogBdp(double value); 284 double TargetInitialWindowSizeBasedOnMemoryPressureAndBdp() const; 285 static void UpdateSetting(grpc_chttp2_setting_id id, int64_t* desired_value, 286 uint32_t new_desired_value, 287 FlowControlAction* action, 288 FlowControlAction& (FlowControlAction::*set)( 289 FlowControlAction::Urgency, uint32_t)); 290 291 FlowControlAction UpdateAction(FlowControlAction action); 292 293 MemoryOwner* const memory_owner_; 294 295 /// calculating what we should give for local window: 296 /// we track the total amount of flow control over initial window size 297 /// across all streams: this is data that we want to receive right now (it 298 /// has an outstanding read) 299 /// and the total amount of flow control under initial window size across all 300 /// streams: this is data we've read early 301 /// we want to adjust incoming_window such that: 302 /// incoming_window = total_over - max(bdp - total_under, 0) 303 int64_t announced_stream_total_over_incoming_window_ = 0; 304 305 /// should we probe bdp? 306 const bool enable_bdp_probe_; 307 308 // bdp estimation 309 BdpEstimator bdp_estimator_; 310 311 // pid controller 312 PidController pid_controller_; 313 Timestamp last_pid_update_; 314 315 int64_t remote_window_ = kDefaultWindow; 316 int64_t target_initial_window_size_ = kDefaultWindow; 317 int64_t target_frame_size_ = kDefaultFrameSize; 318 int64_t target_preferred_rx_crypto_frame_size_ = 319 kDefaultPreferredRxCryptoFrameSize; 320 int64_t announced_window_ = kDefaultWindow; 321 uint32_t acked_init_window_ = kDefaultWindow; 322 }; 323 324 // Implementation of flow control that abides to HTTP/2 spec and attempts 325 // to be as performant as possible. 326 class StreamFlowControl final { 327 public: 328 explicit StreamFlowControl(TransportFlowControl* tfc); ~StreamFlowControl()329 ~StreamFlowControl() { 330 tfc_->RemoveAnnouncedWindowDelta(announced_window_delta_); 331 } 332 333 // Track an update to the incoming flow control counters - that is how many 334 // tokens we report to our peer that we're willing to accept. 335 // Instantiators *must* call MakeAction before destruction of this value. 336 class IncomingUpdateContext { 337 public: IncomingUpdateContext(StreamFlowControl * sfc)338 explicit IncomingUpdateContext(StreamFlowControl* sfc) 339 : tfc_upd_(sfc->tfc_), sfc_(sfc) {} 340 MakeAction()341 FlowControlAction MakeAction() { 342 return sfc_->UpdateAction(tfc_upd_.MakeAction()); 343 } 344 345 // we have received data from the wire 346 absl::Status RecvData(int64_t incoming_frame_size); 347 348 // the application is asking for a certain amount of bytes SetMinProgressSize(int64_t min_progress_size)349 void SetMinProgressSize(int64_t min_progress_size) { 350 sfc_->min_progress_size_ = min_progress_size; 351 } 352 353 void SetPendingSize(int64_t pending_size); 354 355 private: 356 TransportFlowControl::IncomingUpdateContext tfc_upd_; 357 StreamFlowControl* const sfc_; 358 }; 359 360 // Track an update to the outgoing flow control counters - that is how many 361 // tokens our peer has said we can send. 362 class OutgoingUpdateContext { 363 public: OutgoingUpdateContext(StreamFlowControl * sfc)364 explicit OutgoingUpdateContext(StreamFlowControl* sfc) 365 : tfc_upd_(sfc->tfc_), sfc_(sfc) {} 366 // we have received a WINDOW_UPDATE frame for a stream RecvUpdate(uint32_t size)367 void RecvUpdate(uint32_t size) { sfc_->remote_window_delta_ += size; } 368 // we have sent data on the wire, we must track this in our bookkeeping for 369 // the remote peer's flow control. SentData(int64_t outgoing_frame_size)370 void SentData(int64_t outgoing_frame_size) { 371 tfc_upd_.StreamSentData(outgoing_frame_size); 372 sfc_->remote_window_delta_ -= outgoing_frame_size; 373 } 374 375 private: 376 TransportFlowControl::OutgoingUpdateContext tfc_upd_; 377 StreamFlowControl* const sfc_; 378 }; 379 380 // returns an announce if we should send a stream update to our peer, else 381 // returns zero 382 uint32_t MaybeSendUpdate(); 383 remote_window_delta()384 int64_t remote_window_delta() const { return remote_window_delta_; } announced_window_delta()385 int64_t announced_window_delta() const { return announced_window_delta_; } min_progress_size()386 int64_t min_progress_size() const { return min_progress_size_; } 387 388 private: 389 TransportFlowControl* const tfc_; 390 int64_t min_progress_size_ = 0; 391 int64_t remote_window_delta_ = 0; 392 int64_t announced_window_delta_ = 0; 393 absl::optional<int64_t> pending_size_; 394 395 FlowControlAction UpdateAction(FlowControlAction action); 396 int64_t DesiredAnnounceSize() const; 397 }; 398 399 class TestOnlyTransportTargetWindowEstimatesMocker { 400 public: ~TestOnlyTransportTargetWindowEstimatesMocker()401 virtual ~TestOnlyTransportTargetWindowEstimatesMocker() {} 402 virtual double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate( 403 double current_target) = 0; 404 }; 405 406 extern TestOnlyTransportTargetWindowEstimatesMocker* 407 g_test_only_transport_target_window_estimates_mocker; 408 409 } // namespace chttp2 410 } // namespace grpc_core 411 412 #endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H 413