1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
16
17 #include <memory>
18 #include <tuple>
19
20 #include "gtest/gtest.h"
21
22 #include <grpc/support/time.h>
23
24 #include "src/core/lib/experiments/experiments.h"
25 #include "src/core/lib/gpr/useful.h"
26 #include "src/core/lib/gprpp/ref_counted_ptr.h"
27 #include "src/core/lib/gprpp/time.h"
28 #include "src/core/lib/iomgr/exec_ctx.h"
29 #include "src/core/lib/resource_quota/resource_quota.h"
30 #include "src/core/lib/transport/bdp_estimator.h"
31
// Function pointer defined outside this file. InitGlobals() below points it at
// now_impl() so that time lookups routed through it observe the fake clock
// kept in g_now.
extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
33
34 namespace grpc_core {
35 namespace chttp2 {
36
37 namespace {
38
// Largest step AdvanceClockMillis() will apply in one call:
// 365 days * 24 hours * 3600 seconds * 1000 milliseconds.
constexpr uint64_t kMaxAdvanceTimeMillis =
    uint64_t{365} * 24 * 3600 * 1000;
40
41 gpr_timespec g_now;
now_impl(gpr_clock_type clock_type)42 gpr_timespec now_impl(gpr_clock_type clock_type) {
43 GPR_ASSERT(clock_type != GPR_TIMESPAN);
44 gpr_timespec ts = g_now;
45 ts.clock_type = clock_type;
46 return ts;
47 }
48
InitGlobals()49 void InitGlobals() {
50 g_now = {1, 0, GPR_CLOCK_MONOTONIC};
51 TestOnlySetProcessEpoch(g_now);
52 gpr_now_impl = now_impl;
53 }
54
AdvanceClockMillis(uint64_t millis)55 void AdvanceClockMillis(uint64_t millis) {
56 ExecCtx exec_ctx;
57 g_now = gpr_time_add(g_now, gpr_time_from_millis(Clamp(millis, uint64_t{1},
58 kMaxAdvanceTimeMillis),
59 GPR_TIMESPAN));
60 exec_ctx.InvalidateNow();
61 }
62
63 class TransportTargetWindowEstimatesMocker
64 : public chttp2::TestOnlyTransportTargetWindowEstimatesMocker {
65 public:
TransportTargetWindowEstimatesMocker()66 explicit TransportTargetWindowEstimatesMocker() {}
67
ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(double current_target)68 double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
69 double current_target) override {
70 const double kSmallWindow = 16384;
71 const double kBigWindow = 1024 * 1024;
72 // Bounce back and forth between small and big initial windows.
73 if (current_target > kSmallWindow) {
74 return kSmallWindow;
75 } else {
76 return kBigWindow;
77 }
78 }
79 };
80
81 } // namespace
82
83 class FlowControlTest : public ::testing::Test {
84 protected:
85 MemoryOwner memory_owner_ = MemoryOwner(
86 ResourceQuota::Default()->memory_quota()->CreateMemoryOwner());
87 };
88
// Freshly constructed transport/stream flow control objects must report their
// default values without any traffic having been exchanged.
TEST_F(FlowControlTest, NoOp) {
  ExecCtx exec_ctx;
  TransportFlowControl transport_fc("test", true, &memory_owner_);
  StreamFlowControl stream_fc(&transport_fc);

  // Transport-level defaults, as laid down by the HTTP/2 spec.
  EXPECT_EQ(transport_fc.acked_init_window(), 65535);
  EXPECT_EQ(transport_fc.remote_window(), 65535);
  EXPECT_EQ(transport_fc.target_frame_size(), 16384);
  EXPECT_EQ(transport_fc.target_preferred_rx_crypto_frame_size(), INT_MAX);

  // Stream-level state starts out with no deltas and no progress requirement.
  EXPECT_EQ(stream_fc.remote_window_delta(), 0);
  EXPECT_EQ(stream_fc.min_progress_size(), 0);
  EXPECT_EQ(stream_fc.announced_window_delta(), 0);
}
102
// Sending 1024 bytes on a stream must shrink both the stream's remote window
// delta and the transport's remote window by 1024, and must leave the
// preferred rx crypto frame size untouched.
TEST_F(FlowControlTest, SendData) {
  ExecCtx exec_ctx;
  TransportFlowControl transport_fc("test", true, &memory_owner_);
  StreamFlowControl stream_fc(&transport_fc);
  const int64_t crypto_frame_size_before =
      transport_fc.target_preferred_rx_crypto_frame_size();

  // The update context is a temporary, so it is destroyed at the end of this
  // statement, before any of the expectations below run.
  StreamFlowControl::OutgoingUpdateContext(&stream_fc).SentData(1024);

  EXPECT_EQ(stream_fc.remote_window_delta(), -1024);
  EXPECT_EQ(transport_fc.remote_window(), 65535 - 1024);
  EXPECT_EQ(transport_fc.target_preferred_rx_crypto_frame_size(),
            crypto_frame_size_before);
}
118
// An incoming update on an untouched transport must yield a default
// (do-nothing) FlowControlAction.
TEST_F(FlowControlTest, InitialTransportUpdate) {
  ExecCtx exec_ctx;
  TransportFlowControl transport_fc("test", true, &memory_owner_);
  TransportFlowControl::IncomingUpdateContext update(&transport_fc);
  const FlowControlAction action = update.MakeAction();
  EXPECT_EQ(action, FlowControlAction());
}
125
// An incoming update on an untouched stream must yield a default
// (do-nothing) FlowControlAction.
TEST_F(FlowControlTest, InitialStreamUpdate) {
  ExecCtx exec_ctx;
  TransportFlowControl transport_fc("test", true, &memory_owner_);
  StreamFlowControl stream_fc(&transport_fc);
  StreamFlowControl::IncomingUpdateContext update(&stream_fc);
  const FlowControlAction action = update.MakeAction();
  EXPECT_EQ(action, FlowControlAction());
}
133
// Runs 100 BDP ping rounds against the fake clock and checks, after each
// PeriodicUpdate(), what the action says about the preferred rx crypto frame
// size. The expectation depends on whether TCP frame size tuning is enabled.
TEST_F(FlowControlTest, PeriodicUpdate) {
  ExecCtx exec_ctx;
  TransportFlowControl transport_fc("test", true, &memory_owner_);
  constexpr int kNumPeriodicUpdates = 100;
  constexpr auto kNoAction = FlowControlAction::Urgency::NO_ACTION_NEEDED;
  Timestamp ping_due = Timestamp::Now() + Duration::Milliseconds(1000);
  uint32_t last_max_frame_size = transport_fc.target_frame_size();
  for (int round = 0; round < kNumPeriodicUpdates; ++round) {
    BdpEstimator* estimator = transport_fc.bdp_estimator();
    // Feed a little more data each round.
    estimator->AddIncomingBytes(1024 + (round * 100));
    // Jump the fake clock to the moment the next ping is due.
    AdvanceClockMillis((ping_due - Timestamp::Now()).millis());
    estimator->SchedulePing();
    estimator->StartPing();
    // The ping takes 10ms of fake time to complete.
    AdvanceClockMillis(10);
    ping_due = estimator->CompletePing();
    FlowControlAction action = transport_fc.PeriodicUpdate();

    if (!IsTcpFrameSizeTuningEnabled()) {
      // Tuning disabled: no preferred crypto frame size is ever reported.
      EXPECT_EQ(action.preferred_rx_crypto_frame_size(), 0);
      EXPECT_TRUE(action.preferred_rx_crypto_frame_size_update() == kNoAction);
      continue;
    }

    // Tuning enabled: remember the most recently announced max frame size;
    // the preferred crypto frame size is expected to be twice that, clamped
    // to [16384, 0x7fffffff], and must be flagged for update every round.
    if (action.send_max_frame_size_update() != kNoAction) {
      last_max_frame_size = action.max_frame_size();
    }
    EXPECT_EQ(action.preferred_rx_crypto_frame_size(),
              Clamp(2 * last_max_frame_size, 16384u, 0x7fffffffu));
    EXPECT_TRUE(action.preferred_rx_crypto_frame_size_update() != kNoAction);
  }
}
166
// Receiving 1024 bytes on a stream must reduce the transport's announced
// window and the stream's announced window delta by 1024, and must leave the
// preferred rx crypto frame size untouched.
TEST_F(FlowControlTest, RecvData) {
  ExecCtx exec_ctx;
  TransportFlowControl transport_fc("test", true, &memory_owner_);
  StreamFlowControl stream_fc(&transport_fc);
  StreamFlowControl::IncomingUpdateContext update(&stream_fc);
  const int64_t crypto_frame_size_before =
      transport_fc.target_preferred_rx_crypto_frame_size();

  EXPECT_EQ(absl::OkStatus(), update.RecvData(1024));
  // The resulting action is irrelevant here; only the counters are checked.
  std::ignore = update.MakeAction();

  EXPECT_EQ(transport_fc.announced_window(), 65535 - 1024);
  EXPECT_EQ(stream_fc.announced_window_delta(), -1024);
  EXPECT_EQ(transport_fc.target_preferred_rx_crypto_frame_size(),
            crypto_frame_size_before);
}
181
// min_progress_size() must follow explicit SetMinProgressSize() calls, shrink
// by the number of bytes received, and never drop below zero.
TEST_F(FlowControlTest, TrackMinProgressSize) {
  ExecCtx exec_ctx;
  TransportFlowControl transport_fc("test", true, &memory_owner_);
  StreamFlowControl stream_fc(&transport_fc);

  // Step 1: set an initial requirement of 5 bytes.
  {
    StreamFlowControl::IncomingUpdateContext update(&stream_fc);
    update.SetMinProgressSize(5);
    std::ignore = update.MakeAction();
  }
  EXPECT_EQ(stream_fc.min_progress_size(), 5);

  // Step 2: raise the requirement to 10 bytes.
  {
    StreamFlowControl::IncomingUpdateContext update(&stream_fc);
    update.SetMinProgressSize(10);
    std::ignore = update.MakeAction();
  }
  EXPECT_EQ(stream_fc.min_progress_size(), 10);

  // Step 3: receiving 5 bytes leaves 5 still required.
  {
    StreamFlowControl::IncomingUpdateContext update(&stream_fc);
    EXPECT_EQ(absl::OkStatus(), update.RecvData(5));
    std::ignore = update.MakeAction();
  }
  EXPECT_EQ(stream_fc.min_progress_size(), 5);

  // Step 4: receiving 5 more bytes satisfies the requirement.
  {
    StreamFlowControl::IncomingUpdateContext update(&stream_fc);
    EXPECT_EQ(absl::OkStatus(), update.RecvData(5));
    std::ignore = update.MakeAction();
  }
  EXPECT_EQ(stream_fc.min_progress_size(), 0);

  // Step 5: further data must not push the requirement below zero.
  {
    StreamFlowControl::IncomingUpdateContext update(&stream_fc);
    EXPECT_EQ(absl::OkStatus(), update.RecvData(5));
    std::ignore = update.MakeAction();
  }
  EXPECT_EQ(stream_fc.min_progress_size(), 0);
}
217
// Receiving data one byte at a time, with no min-progress or pending size
// ever set, must not request any stream window update. Once the stream states
// that it needs one byte to progress, an immediate update is expected.
TEST_F(FlowControlTest, NoUpdateWithoutReader) {
  ExecCtx exec_ctx;
  TransportFlowControl transport_fc("test", true, &memory_owner_);
  StreamFlowControl stream_fc(&transport_fc);

  // Receive 65535 single-byte chunks; none of them may trigger an update.
  for (int received = 0; received < 65535; ++received) {
    StreamFlowControl::IncomingUpdateContext update(&stream_fc);
    EXPECT_EQ(update.RecvData(1), absl::OkStatus());
    EXPECT_EQ(update.MakeAction().send_stream_update(),
              FlowControlAction::Urgency::NO_ACTION_NEEDED);
  }

  // Empty window needing 1 byte to progress should trigger an immediate read.
  {
    StreamFlowControl::IncomingUpdateContext update(&stream_fc);
    update.SetMinProgressSize(1);
    EXPECT_EQ(stream_fc.min_progress_size(), 1);
    EXPECT_EQ(update.MakeAction().send_stream_update(),
              FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
  }

  // Both the transport and the stream now have window to announce.
  EXPECT_GT(transport_fc.MaybeSendUpdate(false), 0);
  EXPECT_GT(stream_fc.MaybeSendUpdate(), 0);
}
239
// Receives 65535 single-byte chunks, calling SetPendingSize(0) after each one,
// and tallies the urgency of the stream update requested by every resulting
// action. Every chunk must request some update, and both the immediate and
// the queued urgency must be observed at least once.
TEST_F(FlowControlTest, GradualReadsUpdate) {
  ExecCtx exec_ctx;
  TransportFlowControl tfc("test", true, &memory_owner_);
  StreamFlowControl sfc(&tfc);
  int immediate_updates = 0;
  int queued_updates = 0;
  for (int i = 0; i < 65535; i++) {
    StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc);
    EXPECT_EQ(sfc_upd.RecvData(1), absl::OkStatus());
    // NOTE(review): presumably a pending size of 0 means the application has
    // already drained what was received - confirm against StreamFlowControl.
    sfc_upd.SetPendingSize(0);
    switch (sfc_upd.MakeAction().send_stream_update()) {
      case FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
        immediate_updates++;
        break;
      case FlowControlAction::Urgency::QUEUE_UPDATE:
        queued_updates++;
        break;
      case FlowControlAction::Urgency::NO_ACTION_NEEDED:
        break;
    }
  }
  // Strictly greater than zero: the counters start at 0 and are only ever
  // incremented, so a ">= 0" check could never fail and would verify nothing.
  EXPECT_GT(immediate_updates, 0);
  EXPECT_GT(queued_updates, 0);
  // No iteration may have ended in NO_ACTION_NEEDED.
  EXPECT_EQ(immediate_updates + queued_updates, 65535);
}
265
266 } // namespace chttp2
267 } // namespace grpc_core
268
main(int argc,char ** argv)269 int main(int argc, char** argv) {
270 ::testing::InitGoogleTest(&argc, argv);
271 grpc_core::chttp2::g_test_only_transport_target_window_estimates_mocker =
272 new grpc_core::chttp2::TransportTargetWindowEstimatesMocker();
273 grpc_core::chttp2::InitGlobals();
274 return RUN_ALL_TESTS();
275 }
276