xref: /aosp_15_r20/external/grpc-grpc/test/core/iomgr/buffer_list_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/lib/iomgr/buffer_list.h"
20 
21 #include <gtest/gtest.h>
22 
23 #include <grpc/grpc.h>
24 #include <grpc/support/time.h>
25 
26 #include "src/core/lib/gpr/useful.h"
27 #include "src/core/lib/gprpp/time.h"
28 #include "src/core/lib/iomgr/exec_ctx.h"
29 #include "src/core/lib/iomgr/internal_errqueue.h"
30 #include "src/core/lib/iomgr/port.h"
31 
32 #ifdef GRPC_LINUX_ERRQUEUE
33 
34 constexpr uint64_t kMaxAdvanceTimeMillis = 24ull * 365 * 3600 * 1000;
35 
36 extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
37 
38 static gpr_timespec g_now;
now_impl(gpr_clock_type clock_type)39 gpr_timespec now_impl(gpr_clock_type clock_type) {
40   GPR_ASSERT(clock_type != GPR_TIMESPAN);
41   gpr_timespec ts = g_now;
42   ts.clock_type = clock_type;
43   return ts;
44 }
45 
InitGlobals()46 void InitGlobals() {
47   g_now = {1, 0, GPR_CLOCK_MONOTONIC};
48   grpc_core::TestOnlySetProcessEpoch(g_now);
49   gpr_now_impl = now_impl;
50 }
51 
AdvanceClockMillis(uint64_t millis)52 void AdvanceClockMillis(uint64_t millis) {
53   grpc_core::ExecCtx exec_ctx;
54   g_now = gpr_time_add(
55       g_now,
56       gpr_time_from_millis(grpc_core::Clamp(millis, static_cast<uint64_t>(1),
57                                             kMaxAdvanceTimeMillis),
58                            GPR_TIMESPAN));
59   exec_ctx.InvalidateNow();
60 }
61 
62 /// Tests that all TracedBuffer elements in the list are flushed out on
63 /// shutdown.
64 /// Also tests that arg is passed correctly.
65 ///
TEST(BufferListTest,Testshutdownflusheslist)66 TEST(BufferListTest, Testshutdownflusheslist) {
67   grpc_core::grpc_tcp_set_write_timestamps_callback(
68       [](void* arg, grpc_core::Timestamps* /*ts*/, grpc_error_handle error) {
69         ASSERT_TRUE(error.ok());
70         ASSERT_NE(arg, nullptr);
71         gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
72         gpr_atm_rel_store(done, gpr_atm{1});
73       });
74   grpc_core::TracedBufferList tb_list;
75 #define NUM_ELEM 5
76   gpr_atm verifier_called[NUM_ELEM];
77   for (auto i = 0; i < NUM_ELEM; i++) {
78     gpr_atm_rel_store(&verifier_called[i], gpr_atm{0});
79     tb_list.AddNewEntry(i, 0, static_cast<void*>(&verifier_called[i]));
80   }
81   tb_list.Shutdown(nullptr, absl::OkStatus());
82   for (auto i = 0; i < NUM_ELEM; i++) {
83     ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]), 1);
84   }
85 }
86 
TestVerifierCalledOnAckVerifier(void * arg,grpc_core::Timestamps * ts,grpc_error_handle error)87 static void TestVerifierCalledOnAckVerifier(void* arg,
88                                             grpc_core::Timestamps* ts,
89                                             grpc_error_handle error) {
90   ASSERT_TRUE(error.ok());
91   ASSERT_NE(arg, nullptr);
92   ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
93   ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
94   ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
95   ASSERT_GT(ts->info.length, 0);
96   gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
97   gpr_atm_rel_store(done, gpr_atm{1});
98 }
99 
100 /// Tests that the timestamp verifier is called on an ACK timestamp.
101 ///
TEST(BufferListTest,Testverifiercalledonack)102 TEST(BufferListTest, Testverifiercalledonack) {
103   struct sock_extended_err serr;
104   serr.ee_data = 213;
105   serr.ee_info = grpc_core::SCM_TSTAMP_ACK;
106   struct grpc_core::scm_timestamping tss;
107   tss.ts[0].tv_sec = 123;
108   tss.ts[0].tv_nsec = 456;
109   grpc_core::grpc_tcp_set_write_timestamps_callback(
110       TestVerifierCalledOnAckVerifier);
111   grpc_core::TracedBufferList tb_list;
112   gpr_atm verifier_called;
113   gpr_atm_rel_store(&verifier_called, gpr_atm{0});
114   tb_list.AddNewEntry(213, 0, &verifier_called);
115   tb_list.ProcessTimestamp(&serr, nullptr, &tss);
116   ASSERT_EQ(gpr_atm_acq_load(&verifier_called), 1);
117   tb_list.Shutdown(nullptr, absl::OkStatus());
118 }
119 
120 /// Tests that shutdown can be called repeatedly.
121 ///
TEST(BufferListTest,Testrepeatedshutdown)122 TEST(BufferListTest, Testrepeatedshutdown) {
123   struct sock_extended_err serr;
124   serr.ee_data = 213;
125   serr.ee_info = grpc_core::SCM_TSTAMP_ACK;
126   struct grpc_core::scm_timestamping tss;
127   tss.ts[0].tv_sec = 123;
128   tss.ts[0].tv_nsec = 456;
129   grpc_core::grpc_tcp_set_write_timestamps_callback(
130       TestVerifierCalledOnAckVerifier);
131   grpc_core::TracedBufferList tb_list;
132   gpr_atm verifier_called;
133   gpr_atm_rel_store(&verifier_called, gpr_atm{0});
134   tb_list.AddNewEntry(213, 0, &verifier_called);
135   tb_list.ProcessTimestamp(&serr, nullptr, &tss);
136   ASSERT_EQ(gpr_atm_acq_load(&verifier_called), 1);
137   tb_list.Shutdown(nullptr, absl::OkStatus());
138   tb_list.Shutdown(nullptr, absl::OkStatus());
139   tb_list.Shutdown(nullptr, absl::OkStatus());
140 }
141 
TEST(BufferListTest,TestLongPendingAckForOneTracedBuffer)142 TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) {
143   constexpr int kMaxPendingAckMillis = 10000;
144   struct sock_extended_err serr[3];
145   gpr_atm verifier_called[3];
146   struct grpc_core::scm_timestamping tss;
147   grpc_core::TracedBufferList tb_list;
148   serr[0].ee_data = 1;
149   serr[0].ee_info = grpc_core::SCM_TSTAMP_SCHED;
150   serr[1].ee_data = 1;
151   serr[1].ee_info = grpc_core::SCM_TSTAMP_SND;
152   serr[2].ee_data = 1;
153   serr[2].ee_info = grpc_core::SCM_TSTAMP_ACK;
154   gpr_atm_rel_store(&verifier_called[0], static_cast<gpr_atm>(0));
155   gpr_atm_rel_store(&verifier_called[1], static_cast<gpr_atm>(0));
156   gpr_atm_rel_store(&verifier_called[2], static_cast<gpr_atm>(0));
157 
158   //  Add 3 traced buffers
159   tb_list.AddNewEntry(1, 0, &verifier_called[0]);
160   tb_list.AddNewEntry(2, 0, &verifier_called[1]);
161   tb_list.AddNewEntry(3, 0, &verifier_called[2]);
162 
163   AdvanceClockMillis(kMaxPendingAckMillis);
164   tss.ts[0].tv_sec = g_now.tv_sec;
165   tss.ts[0].tv_nsec = g_now.tv_nsec;
166 
167   // Process SCHED Timestamp for 1st traced buffer.
168   // Nothing should be flushed.
169   grpc_core::grpc_tcp_set_write_timestamps_callback(
170       [](void*, grpc_core::Timestamps*, grpc_error_handle) {
171         ASSERT_TRUE(false);
172       });
173   tb_list.ProcessTimestamp(&serr[0], nullptr, &tss);
174   ASSERT_EQ(tb_list.Size(), 3);
175   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
176   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(0));
177   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(0));
178 
179   AdvanceClockMillis(kMaxPendingAckMillis);
180   tss.ts[0].tv_sec = g_now.tv_sec;
181   tss.ts[0].tv_nsec = g_now.tv_nsec;
182 
183   // Process SND Timestamp for 1st traced buffer. The second and third traced
184   // buffers must be flushed because the max pending ack time would have
185   // elapsed for them.
186   grpc_core::grpc_tcp_set_write_timestamps_callback(
187       [](void* arg, grpc_core::Timestamps*, grpc_error_handle error) {
188         ASSERT_EQ(error, absl::DeadlineExceededError("Ack timed out"));
189         ASSERT_NE(arg, nullptr);
190         gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
191         gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
192       });
193   tb_list.ProcessTimestamp(&serr[1], nullptr, &tss);
194   ASSERT_EQ(tb_list.Size(), 1);
195   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
196   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(1));
197   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(1));
198 
199   AdvanceClockMillis(kMaxPendingAckMillis);
200   tss.ts[0].tv_sec = g_now.tv_sec;
201   tss.ts[0].tv_nsec = g_now.tv_nsec;
202 
203   // Process ACK Timestamp for 1st traced buffer.
204   grpc_core::grpc_tcp_set_write_timestamps_callback(
205       [](void* arg, grpc_core::Timestamps* ts, grpc_error_handle error) {
206         ASSERT_TRUE(error.ok());
207         ASSERT_NE(arg, nullptr);
208         ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
209         ASSERT_EQ(ts->acked_time.time.tv_sec, g_now.tv_sec);
210         ASSERT_EQ(ts->acked_time.time.tv_nsec, g_now.tv_nsec);
211         ASSERT_GT(ts->info.length, 0);
212         gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
213         gpr_atm_rel_store(done, static_cast<gpr_atm>(2));
214       });
215   tb_list.ProcessTimestamp(&serr[2], nullptr, &tss);
216   ASSERT_EQ(tb_list.Size(), 0);
217   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(2));
218   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(1));
219   ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(1));
220 
221   tb_list.Shutdown(nullptr, absl::OkStatus());
222 }
223 
TEST(BufferListTest,TestLongPendingAckForSomeTracedBuffers)224 TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) {
225   constexpr int kNumTracedBuffers = 10;
226   constexpr int kMaxPendingAckMillis = 10000;
227   struct sock_extended_err serr[kNumTracedBuffers];
228   gpr_atm verifier_called[kNumTracedBuffers];
229   struct grpc_core::scm_timestamping tss;
230   tss.ts[0].tv_sec = 123;
231   tss.ts[0].tv_nsec = 456;
232   grpc_core::grpc_tcp_set_write_timestamps_callback(
233       [](void* arg, grpc_core::Timestamps* ts, grpc_error_handle error) {
234         ASSERT_NE(arg, nullptr);
235         if (error.ok()) {
236           ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
237           ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
238           ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
239           ASSERT_GT(ts->info.length, 0);
240           *(reinterpret_cast<int*>(arg)) = 1;
241         } else if (error == absl::DeadlineExceededError("Ack timed out")) {
242           *(reinterpret_cast<int*>(arg)) = 2;
243         } else {
244           ASSERT_TRUE(false);
245         }
246       });
247   grpc_core::TracedBufferList tb_list;
248   for (int i = 0; i < kNumTracedBuffers; i++) {
249     serr[i].ee_data = i + 1;
250     serr[i].ee_info = grpc_core::SCM_TSTAMP_ACK;
251     gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
252     tb_list.AddNewEntry(i + 1, 0, &verifier_called[i]);
253   }
254   int elapsed_time_millis = 0;
255   int increment_millis = (2 * kMaxPendingAckMillis) / 10;
256   for (int i = 0; i < kNumTracedBuffers; i++) {
257     AdvanceClockMillis(increment_millis);
258     elapsed_time_millis += increment_millis;
259     tb_list.ProcessTimestamp(&serr[i], nullptr, &tss);
260     if (elapsed_time_millis > kMaxPendingAckMillis) {
261       // MaxPendingAckMillis has elapsed. the rest of tb_list must have been
262       // flushed now.
263       ASSERT_EQ(tb_list.Size(), 0);
264       if (elapsed_time_millis - kMaxPendingAckMillis == increment_millis) {
265         // The first ProcessTimestamp just after kMaxPendingAckMillis would have
266         // still successfully processed the head traced buffer entry and then
267         // discarded all the other remaining traced buffer entries. The first
268         // traced buffer entry would have been processed because the ACK
269         // timestamp was received for it.
270         ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
271                   static_cast<gpr_atm>(1));
272       } else {
273         ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
274                   static_cast<gpr_atm>(2));
275       }
276     } else {
277       ASSERT_EQ(tb_list.Size(), kNumTracedBuffers - (i + 1));
278       ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]), static_cast<gpr_atm>(1));
279     }
280   }
281   tb_list.Shutdown(nullptr, absl::OkStatus());
282 }
283 
main(int argc,char ** argv)284 int main(int argc, char** argv) {
285   ::testing::InitGoogleTest(&argc, argv);
286   InitGlobals();
287   return RUN_ALL_TESTS();
288 }
289 
290 #else  // GRPC_LINUX_ERRQUEUE
291 
main(int,char **)292 int main(int /*argc*/, char** /*argv*/) { return 0; }
293 
294 #endif  // GRPC_LINUX_ERRQUEUE
295