xref: /aosp_15_r20/external/grpc-grpc/test/core/transport/chttp2/flow_control_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
16 
17 #include <memory>
18 #include <tuple>
19 
20 #include "gtest/gtest.h"
21 
22 #include <grpc/support/time.h>
23 
24 #include "src/core/lib/experiments/experiments.h"
25 #include "src/core/lib/gpr/useful.h"
26 #include "src/core/lib/gprpp/ref_counted_ptr.h"
27 #include "src/core/lib/gprpp/time.h"
28 #include "src/core/lib/iomgr/exec_ctx.h"
29 #include "src/core/lib/resource_quota/resource_quota.h"
30 #include "src/core/lib/transport/bdp_estimator.h"
31 
32 extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
33 
34 namespace grpc_core {
35 namespace chttp2 {
36 
37 namespace {
38 
39 constexpr uint64_t kMaxAdvanceTimeMillis = 24ull * 365 * 3600 * 1000;
40 
41 gpr_timespec g_now;
now_impl(gpr_clock_type clock_type)42 gpr_timespec now_impl(gpr_clock_type clock_type) {
43   GPR_ASSERT(clock_type != GPR_TIMESPAN);
44   gpr_timespec ts = g_now;
45   ts.clock_type = clock_type;
46   return ts;
47 }
48 
InitGlobals()49 void InitGlobals() {
50   g_now = {1, 0, GPR_CLOCK_MONOTONIC};
51   TestOnlySetProcessEpoch(g_now);
52   gpr_now_impl = now_impl;
53 }
54 
AdvanceClockMillis(uint64_t millis)55 void AdvanceClockMillis(uint64_t millis) {
56   ExecCtx exec_ctx;
57   g_now = gpr_time_add(g_now, gpr_time_from_millis(Clamp(millis, uint64_t{1},
58                                                          kMaxAdvanceTimeMillis),
59                                                    GPR_TIMESPAN));
60   exec_ctx.InvalidateNow();
61 }
62 
63 class TransportTargetWindowEstimatesMocker
64     : public chttp2::TestOnlyTransportTargetWindowEstimatesMocker {
65  public:
TransportTargetWindowEstimatesMocker()66   explicit TransportTargetWindowEstimatesMocker() {}
67 
ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(double current_target)68   double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
69       double current_target) override {
70     const double kSmallWindow = 16384;
71     const double kBigWindow = 1024 * 1024;
72     // Bounce back and forth between small and big initial windows.
73     if (current_target > kSmallWindow) {
74       return kSmallWindow;
75     } else {
76       return kBigWindow;
77     }
78   }
79 };
80 
81 }  // namespace
82 
83 class FlowControlTest : public ::testing::Test {
84  protected:
85   MemoryOwner memory_owner_ = MemoryOwner(
86       ResourceQuota::Default()->memory_quota()->CreateMemoryOwner());
87 };
88 
TEST_F(FlowControlTest,NoOp)89 TEST_F(FlowControlTest, NoOp) {
90   ExecCtx exec_ctx;
91   TransportFlowControl tfc("test", true, &memory_owner_);
92   StreamFlowControl sfc(&tfc);
93   // Check initial values are per http2 spec
94   EXPECT_EQ(tfc.acked_init_window(), 65535);
95   EXPECT_EQ(tfc.remote_window(), 65535);
96   EXPECT_EQ(tfc.target_frame_size(), 16384);
97   EXPECT_EQ(tfc.target_preferred_rx_crypto_frame_size(), INT_MAX);
98   EXPECT_EQ(sfc.remote_window_delta(), 0);
99   EXPECT_EQ(sfc.min_progress_size(), 0);
100   EXPECT_EQ(sfc.announced_window_delta(), 0);
101 }
102 
TEST_F(FlowControlTest,SendData)103 TEST_F(FlowControlTest, SendData) {
104   ExecCtx exec_ctx;
105   TransportFlowControl tfc("test", true, &memory_owner_);
106   StreamFlowControl sfc(&tfc);
107   int64_t prev_preferred_rx_frame_size =
108       tfc.target_preferred_rx_crypto_frame_size();
109   {
110     StreamFlowControl::OutgoingUpdateContext sfc_upd(&sfc);
111     sfc_upd.SentData(1024);
112   }
113   EXPECT_EQ(sfc.remote_window_delta(), -1024);
114   EXPECT_EQ(tfc.remote_window(), 65535 - 1024);
115   EXPECT_EQ(tfc.target_preferred_rx_crypto_frame_size(),
116             prev_preferred_rx_frame_size);
117 }
118 
TEST_F(FlowControlTest,InitialTransportUpdate)119 TEST_F(FlowControlTest, InitialTransportUpdate) {
120   ExecCtx exec_ctx;
121   TransportFlowControl tfc("test", true, &memory_owner_);
122   EXPECT_EQ(TransportFlowControl::IncomingUpdateContext(&tfc).MakeAction(),
123             FlowControlAction());
124 }
125 
TEST_F(FlowControlTest,InitialStreamUpdate)126 TEST_F(FlowControlTest, InitialStreamUpdate) {
127   ExecCtx exec_ctx;
128   TransportFlowControl tfc("test", true, &memory_owner_);
129   StreamFlowControl sfc(&tfc);
130   EXPECT_EQ(StreamFlowControl::IncomingUpdateContext(&sfc).MakeAction(),
131             FlowControlAction());
132 }
133 
TEST_F(FlowControlTest,PeriodicUpdate)134 TEST_F(FlowControlTest, PeriodicUpdate) {
135   ExecCtx exec_ctx;
136   TransportFlowControl tfc("test", true, &memory_owner_);
137   constexpr int kNumPeriodicUpdates = 100;
138   Timestamp next_ping = Timestamp::Now() + Duration::Milliseconds(1000);
139   uint32_t prev_max_frame_size = tfc.target_frame_size();
140   for (int i = 0; i < kNumPeriodicUpdates; i++) {
141     BdpEstimator* bdp = tfc.bdp_estimator();
142     bdp->AddIncomingBytes(1024 + (i * 100));
143     // Advance clock till the timestamp of the next ping.
144     AdvanceClockMillis((next_ping - Timestamp::Now()).millis());
145     bdp->SchedulePing();
146     bdp->StartPing();
147     AdvanceClockMillis(10);
148     next_ping = bdp->CompletePing();
149     FlowControlAction action = tfc.PeriodicUpdate();
150     if (IsTcpFrameSizeTuningEnabled()) {
151       if (action.send_max_frame_size_update() !=
152           FlowControlAction::Urgency::NO_ACTION_NEEDED) {
153         prev_max_frame_size = action.max_frame_size();
154       }
155       EXPECT_EQ(action.preferred_rx_crypto_frame_size(),
156                 Clamp(2 * prev_max_frame_size, 16384u, 0x7fffffffu));
157       EXPECT_TRUE(action.preferred_rx_crypto_frame_size_update() !=
158                   FlowControlAction::Urgency::NO_ACTION_NEEDED);
159     } else {
160       EXPECT_EQ(action.preferred_rx_crypto_frame_size(), 0);
161       EXPECT_TRUE(action.preferred_rx_crypto_frame_size_update() ==
162                   FlowControlAction::Urgency::NO_ACTION_NEEDED);
163     }
164   }
165 }
166 
TEST_F(FlowControlTest,RecvData)167 TEST_F(FlowControlTest, RecvData) {
168   ExecCtx exec_ctx;
169   TransportFlowControl tfc("test", true, &memory_owner_);
170   StreamFlowControl sfc(&tfc);
171   StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc);
172   int64_t prev_preferred_rx_frame_size =
173       tfc.target_preferred_rx_crypto_frame_size();
174   EXPECT_EQ(absl::OkStatus(), sfc_upd.RecvData(1024));
175   std::ignore = sfc_upd.MakeAction();
176   EXPECT_EQ(tfc.announced_window(), 65535 - 1024);
177   EXPECT_EQ(sfc.announced_window_delta(), -1024);
178   EXPECT_EQ(tfc.target_preferred_rx_crypto_frame_size(),
179             prev_preferred_rx_frame_size);
180 }
181 
TEST_F(FlowControlTest,TrackMinProgressSize)182 TEST_F(FlowControlTest, TrackMinProgressSize) {
183   ExecCtx exec_ctx;
184   TransportFlowControl tfc("test", true, &memory_owner_);
185   StreamFlowControl sfc(&tfc);
186   {
187     StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc);
188     sfc_upd.SetMinProgressSize(5);
189     std::ignore = sfc_upd.MakeAction();
190   }
191   EXPECT_EQ(sfc.min_progress_size(), 5);
192   {
193     StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc);
194     sfc_upd.SetMinProgressSize(10);
195     std::ignore = sfc_upd.MakeAction();
196   }
197   EXPECT_EQ(sfc.min_progress_size(), 10);
198   {
199     StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc);
200     EXPECT_EQ(absl::OkStatus(), sfc_upd.RecvData(5));
201     std::ignore = sfc_upd.MakeAction();
202   }
203   EXPECT_EQ(sfc.min_progress_size(), 5);
204   {
205     StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc);
206     EXPECT_EQ(absl::OkStatus(), sfc_upd.RecvData(5));
207     std::ignore = sfc_upd.MakeAction();
208   }
209   EXPECT_EQ(sfc.min_progress_size(), 0);
210   {
211     StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc);
212     EXPECT_EQ(absl::OkStatus(), sfc_upd.RecvData(5));
213     std::ignore = sfc_upd.MakeAction();
214   }
215   EXPECT_EQ(sfc.min_progress_size(), 0);
216 }
217 
TEST_F(FlowControlTest,NoUpdateWithoutReader)218 TEST_F(FlowControlTest, NoUpdateWithoutReader) {
219   ExecCtx exec_ctx;
220   TransportFlowControl tfc("test", true, &memory_owner_);
221   StreamFlowControl sfc(&tfc);
222   for (int i = 0; i < 65535; i++) {
223     StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc);
224     EXPECT_EQ(sfc_upd.RecvData(1), absl::OkStatus());
225     EXPECT_EQ(sfc_upd.MakeAction().send_stream_update(),
226               FlowControlAction::Urgency::NO_ACTION_NEEDED);
227   }
228   // Empty window needing 1 byte to progress should trigger an immediate read.
229   {
230     StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc);
231     sfc_upd.SetMinProgressSize(1);
232     EXPECT_EQ(sfc.min_progress_size(), 1);
233     EXPECT_EQ(sfc_upd.MakeAction().send_stream_update(),
234               FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
235   }
236   EXPECT_GT(tfc.MaybeSendUpdate(false), 0);
237   EXPECT_GT(sfc.MaybeSendUpdate(), 0);
238 }
239 
TEST_F(FlowControlTest,GradualReadsUpdate)240 TEST_F(FlowControlTest, GradualReadsUpdate) {
241   ExecCtx exec_ctx;
242   TransportFlowControl tfc("test", true, &memory_owner_);
243   StreamFlowControl sfc(&tfc);
244   int immediate_updates = 0;
245   int queued_updates = 0;
246   for (int i = 0; i < 65535; i++) {
247     StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc);
248     EXPECT_EQ(sfc_upd.RecvData(1), absl::OkStatus());
249     sfc_upd.SetPendingSize(0);
250     switch (sfc_upd.MakeAction().send_stream_update()) {
251       case FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
252         immediate_updates++;
253         break;
254       case FlowControlAction::Urgency::QUEUE_UPDATE:
255         queued_updates++;
256         break;
257       case FlowControlAction::Urgency::NO_ACTION_NEEDED:
258         break;
259     }
260   }
261   EXPECT_GE(immediate_updates, 0);
262   EXPECT_GT(queued_updates, 0);
263   EXPECT_EQ(immediate_updates + queued_updates, 65535);
264 }
265 
266 }  // namespace chttp2
267 }  // namespace grpc_core
268 
main(int argc,char ** argv)269 int main(int argc, char** argv) {
270   ::testing::InitGoogleTest(&argc, argv);
271   grpc_core::chttp2::g_test_only_transport_target_window_estimates_mocker =
272       new grpc_core::chttp2::TransportTargetWindowEstimatesMocker();
273   grpc_core::chttp2::InitGlobals();
274   return RUN_ALL_TESTS();
275 }
276