1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/lib/iomgr/buffer_list.h"
20
21 #include <gtest/gtest.h>
22
23 #include <grpc/grpc.h>
24 #include <grpc/support/time.h>
25
26 #include "src/core/lib/gpr/useful.h"
27 #include "src/core/lib/gprpp/time.h"
28 #include "src/core/lib/iomgr/exec_ctx.h"
29 #include "src/core/lib/iomgr/internal_errqueue.h"
30 #include "src/core/lib/iomgr/port.h"
31
32 #ifdef GRPC_LINUX_ERRQUEUE
33
34 constexpr uint64_t kMaxAdvanceTimeMillis = 24ull * 365 * 3600 * 1000;
35
36 extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
37
38 static gpr_timespec g_now;
now_impl(gpr_clock_type clock_type)39 gpr_timespec now_impl(gpr_clock_type clock_type) {
40 GPR_ASSERT(clock_type != GPR_TIMESPAN);
41 gpr_timespec ts = g_now;
42 ts.clock_type = clock_type;
43 return ts;
44 }
45
InitGlobals()46 void InitGlobals() {
47 g_now = {1, 0, GPR_CLOCK_MONOTONIC};
48 grpc_core::TestOnlySetProcessEpoch(g_now);
49 gpr_now_impl = now_impl;
50 }
51
AdvanceClockMillis(uint64_t millis)52 void AdvanceClockMillis(uint64_t millis) {
53 grpc_core::ExecCtx exec_ctx;
54 g_now = gpr_time_add(
55 g_now,
56 gpr_time_from_millis(grpc_core::Clamp(millis, static_cast<uint64_t>(1),
57 kMaxAdvanceTimeMillis),
58 GPR_TIMESPAN));
59 exec_ctx.InvalidateNow();
60 }
61
62 /// Tests that all TracedBuffer elements in the list are flushed out on
63 /// shutdown.
64 /// Also tests that arg is passed correctly.
65 ///
TEST(BufferListTest,Testshutdownflusheslist)66 TEST(BufferListTest, Testshutdownflusheslist) {
67 grpc_core::grpc_tcp_set_write_timestamps_callback(
68 [](void* arg, grpc_core::Timestamps* /*ts*/, grpc_error_handle error) {
69 ASSERT_TRUE(error.ok());
70 ASSERT_NE(arg, nullptr);
71 gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
72 gpr_atm_rel_store(done, gpr_atm{1});
73 });
74 grpc_core::TracedBufferList tb_list;
75 #define NUM_ELEM 5
76 gpr_atm verifier_called[NUM_ELEM];
77 for (auto i = 0; i < NUM_ELEM; i++) {
78 gpr_atm_rel_store(&verifier_called[i], gpr_atm{0});
79 tb_list.AddNewEntry(i, 0, static_cast<void*>(&verifier_called[i]));
80 }
81 tb_list.Shutdown(nullptr, absl::OkStatus());
82 for (auto i = 0; i < NUM_ELEM; i++) {
83 ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]), 1);
84 }
85 }
86
TestVerifierCalledOnAckVerifier(void * arg,grpc_core::Timestamps * ts,grpc_error_handle error)87 static void TestVerifierCalledOnAckVerifier(void* arg,
88 grpc_core::Timestamps* ts,
89 grpc_error_handle error) {
90 ASSERT_TRUE(error.ok());
91 ASSERT_NE(arg, nullptr);
92 ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
93 ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
94 ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
95 ASSERT_GT(ts->info.length, 0);
96 gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
97 gpr_atm_rel_store(done, gpr_atm{1});
98 }
99
100 /// Tests that the timestamp verifier is called on an ACK timestamp.
101 ///
TEST(BufferListTest,Testverifiercalledonack)102 TEST(BufferListTest, Testverifiercalledonack) {
103 struct sock_extended_err serr;
104 serr.ee_data = 213;
105 serr.ee_info = grpc_core::SCM_TSTAMP_ACK;
106 struct grpc_core::scm_timestamping tss;
107 tss.ts[0].tv_sec = 123;
108 tss.ts[0].tv_nsec = 456;
109 grpc_core::grpc_tcp_set_write_timestamps_callback(
110 TestVerifierCalledOnAckVerifier);
111 grpc_core::TracedBufferList tb_list;
112 gpr_atm verifier_called;
113 gpr_atm_rel_store(&verifier_called, gpr_atm{0});
114 tb_list.AddNewEntry(213, 0, &verifier_called);
115 tb_list.ProcessTimestamp(&serr, nullptr, &tss);
116 ASSERT_EQ(gpr_atm_acq_load(&verifier_called), 1);
117 tb_list.Shutdown(nullptr, absl::OkStatus());
118 }
119
120 /// Tests that shutdown can be called repeatedly.
121 ///
TEST(BufferListTest,Testrepeatedshutdown)122 TEST(BufferListTest, Testrepeatedshutdown) {
123 struct sock_extended_err serr;
124 serr.ee_data = 213;
125 serr.ee_info = grpc_core::SCM_TSTAMP_ACK;
126 struct grpc_core::scm_timestamping tss;
127 tss.ts[0].tv_sec = 123;
128 tss.ts[0].tv_nsec = 456;
129 grpc_core::grpc_tcp_set_write_timestamps_callback(
130 TestVerifierCalledOnAckVerifier);
131 grpc_core::TracedBufferList tb_list;
132 gpr_atm verifier_called;
133 gpr_atm_rel_store(&verifier_called, gpr_atm{0});
134 tb_list.AddNewEntry(213, 0, &verifier_called);
135 tb_list.ProcessTimestamp(&serr, nullptr, &tss);
136 ASSERT_EQ(gpr_atm_acq_load(&verifier_called), 1);
137 tb_list.Shutdown(nullptr, absl::OkStatus());
138 tb_list.Shutdown(nullptr, absl::OkStatus());
139 tb_list.Shutdown(nullptr, absl::OkStatus());
140 }
141
/// Drives SCHED, SND and ACK timestamps for the first of three traced
/// buffers, advancing the fake clock by kMaxPendingAckMillis before each
/// one, and checks which entries get flushed at every step:
///   - SCHED: nothing is flushed (the callback must not run).
///   - SND:   buffers 2 and 3 are flushed with "Ack timed out".
///   - ACK:   buffer 1 completes successfully and the list is empty.
/// Each flag records the outcome for its buffer: 0 = callback not yet
/// called, 1 = timed out, 2 = acked.
TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) {
  constexpr int kMaxPendingAckMillis = 10000;
  constexpr int kNumBuffers = 3;

  // All three events carry ee_data == 1; only the timestamp kind differs.
  struct sock_extended_err sched_err;
  sched_err.ee_data = 1;
  sched_err.ee_info = grpc_core::SCM_TSTAMP_SCHED;
  struct sock_extended_err snd_err;
  snd_err.ee_data = 1;
  snd_err.ee_info = grpc_core::SCM_TSTAMP_SND;
  struct sock_extended_err ack_err;
  ack_err.ee_data = 1;
  ack_err.ee_info = grpc_core::SCM_TSTAMP_ACK;

  gpr_atm outcome[kNumBuffers];
  for (gpr_atm& flag : outcome) {
    gpr_atm_rel_store(&flag, static_cast<gpr_atm>(0));
  }

  struct grpc_core::scm_timestamping stamps;
  grpc_core::TracedBufferList traced_buffers;

  // Add 3 traced buffers, numbered 1..3.
  for (int idx = 0; idx < kNumBuffers; ++idx) {
    traced_buffers.AddNewEntry(idx + 1, 0, &outcome[idx]);
  }

  // Moves the fake clock forward by kMaxPendingAckMillis and copies the new
  // time into the timestamp that the next ProcessTimestamp() call receives.
  auto advance_clock_and_stamp = [&]() {
    AdvanceClockMillis(kMaxPendingAckMillis);
    stamps.ts[0].tv_sec = g_now.tv_sec;
    stamps.ts[0].tv_nsec = g_now.tv_nsec;
  };

  // --- SCHED timestamp for the 1st traced buffer: nothing may be flushed.
  advance_clock_and_stamp();
  grpc_core::grpc_tcp_set_write_timestamps_callback(
      [](void*, grpc_core::Timestamps*, grpc_error_handle) {
        ASSERT_TRUE(false);
      });
  traced_buffers.ProcessTimestamp(&sched_err, nullptr, &stamps);
  ASSERT_EQ(traced_buffers.Size(), 3);
  ASSERT_EQ(gpr_atm_acq_load(&outcome[0]), static_cast<gpr_atm>(0));
  ASSERT_EQ(gpr_atm_acq_load(&outcome[1]), static_cast<gpr_atm>(0));
  ASSERT_EQ(gpr_atm_acq_load(&outcome[2]), static_cast<gpr_atm>(0));

  // --- SND timestamp for the 1st traced buffer: the 2nd and 3rd buffers are
  // expected to be flushed with a deadline-exceeded error because their
  // maximum pending-ack time has elapsed.
  advance_clock_and_stamp();
  grpc_core::grpc_tcp_set_write_timestamps_callback(
      [](void* arg, grpc_core::Timestamps*, grpc_error_handle error) {
        ASSERT_EQ(error, absl::DeadlineExceededError("Ack timed out"));
        ASSERT_NE(arg, nullptr);
        gpr_atm_rel_store(reinterpret_cast<gpr_atm*>(arg),
                          static_cast<gpr_atm>(1));
      });
  traced_buffers.ProcessTimestamp(&snd_err, nullptr, &stamps);
  ASSERT_EQ(traced_buffers.Size(), 1);
  ASSERT_EQ(gpr_atm_acq_load(&outcome[0]), static_cast<gpr_atm>(0));
  ASSERT_EQ(gpr_atm_acq_load(&outcome[1]), static_cast<gpr_atm>(1));
  ASSERT_EQ(gpr_atm_acq_load(&outcome[2]), static_cast<gpr_atm>(1));

  // --- ACK timestamp for the 1st traced buffer: it completes successfully
  // and reports the current fake time as its ACK time.
  advance_clock_and_stamp();
  grpc_core::grpc_tcp_set_write_timestamps_callback(
      [](void* arg, grpc_core::Timestamps* ts, grpc_error_handle error) {
        ASSERT_TRUE(error.ok());
        ASSERT_NE(arg, nullptr);
        const auto& acked = ts->acked_time.time;
        ASSERT_EQ(acked.clock_type, GPR_CLOCK_REALTIME);
        ASSERT_EQ(acked.tv_sec, g_now.tv_sec);
        ASSERT_EQ(acked.tv_nsec, g_now.tv_nsec);
        ASSERT_GT(ts->info.length, 0);
        gpr_atm_rel_store(reinterpret_cast<gpr_atm*>(arg),
                          static_cast<gpr_atm>(2));
      });
  traced_buffers.ProcessTimestamp(&ack_err, nullptr, &stamps);
  ASSERT_EQ(traced_buffers.Size(), 0);
  ASSERT_EQ(gpr_atm_acq_load(&outcome[0]), static_cast<gpr_atm>(2));
  ASSERT_EQ(gpr_atm_acq_load(&outcome[1]), static_cast<gpr_atm>(1));
  ASSERT_EQ(gpr_atm_acq_load(&outcome[2]), static_cast<gpr_atm>(1));

  traced_buffers.Shutdown(nullptr, absl::OkStatus());
}
223
/// Adds kNumTracedBuffers entries, then delivers one ACK per entry while
/// advancing the fake clock by a fixed increment before each ACK. Once the
/// accumulated time exceeds kMaxPendingAckMillis, the remaining entries are
/// expected to be flushed with an "Ack timed out" error.
///
/// Each entry's flag records how its callback was invoked:
///   0 = not called yet, 1 = acked successfully, 2 = timed out.
TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) {
  constexpr int kNumTracedBuffers = 10;
  // NOTE(review): presumably mirrors the pending-ack limit used by
  // TracedBufferList itself; confirm against the implementation.
  constexpr int kMaxPendingAckMillis = 10000;
  struct sock_extended_err serr[kNumTracedBuffers];
  gpr_atm verifier_called[kNumTracedBuffers];
  struct grpc_core::scm_timestamping tss;
  tss.ts[0].tv_sec = 123;
  tss.ts[0].tv_nsec = 456;
  grpc_core::grpc_tcp_set_write_timestamps_callback(
      [](void* arg, grpc_core::Timestamps* ts, grpc_error_handle error) {
        ASSERT_NE(arg, nullptr);
        // arg points at a gpr_atm flag (see AddNewEntry below). Store through
        // the gpr_atm API rather than through an int*, so the write has the
        // same type and width as the gpr_atm_acq_load() reads in the test.
        gpr_atm* outcome = static_cast<gpr_atm*>(arg);
        if (error.ok()) {
          ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
          ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
          ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
          ASSERT_GT(ts->info.length, 0);
          gpr_atm_rel_store(outcome, static_cast<gpr_atm>(1));
        } else if (error == absl::DeadlineExceededError("Ack timed out")) {
          gpr_atm_rel_store(outcome, static_cast<gpr_atm>(2));
        } else {
          // Any other error is unexpected.
          ASSERT_TRUE(false);
        }
      });
  grpc_core::TracedBufferList tb_list;
  for (int i = 0; i < kNumTracedBuffers; i++) {
    serr[i].ee_data = i + 1;
    serr[i].ee_info = grpc_core::SCM_TSTAMP_ACK;
    gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
    tb_list.AddNewEntry(i + 1, 0, &verifier_called[i]);
  }
  int elapsed_time_millis = 0;
  // Ten increments add up to twice kMaxPendingAckMillis, so the limit is
  // crossed partway through the loop.
  int increment_millis = (2 * kMaxPendingAckMillis) / 10;
  for (int i = 0; i < kNumTracedBuffers; i++) {
    AdvanceClockMillis(increment_millis);
    elapsed_time_millis += increment_millis;
    tb_list.ProcessTimestamp(&serr[i], nullptr, &tss);
    if (elapsed_time_millis > kMaxPendingAckMillis) {
      // MaxPendingAckMillis has elapsed. The rest of tb_list must have been
      // flushed now.
      ASSERT_EQ(tb_list.Size(), 0);
      if (elapsed_time_millis - kMaxPendingAckMillis == increment_millis) {
        // The first ProcessTimestamp just after kMaxPendingAckMillis would have
        // still successfully processed the head traced buffer entry and then
        // discarded all the other remaining traced buffer entries. The first
        // traced buffer entry would have been processed because the ACK
        // timestamp was received for it.
        ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
                  static_cast<gpr_atm>(1));
      } else {
        // Entries past that point were already flushed as timed out.
        ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
                  static_cast<gpr_atm>(2));
      }
    } else {
      // Still within the limit: exactly the acked entry has been removed.
      ASSERT_EQ(tb_list.Size(), kNumTracedBuffers - (i + 1));
      ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]), static_cast<gpr_atm>(1));
    }
  }
  tb_list.Shutdown(nullptr, absl::OkStatus());
}
283
main(int argc,char ** argv)284 int main(int argc, char** argv) {
285 ::testing::InitGoogleTest(&argc, argv);
286 InitGlobals();
287 return RUN_ALL_TESTS();
288 }
289
290 #else // GRPC_LINUX_ERRQUEUE
291
// Without GRPC_LINUX_ERRQUEUE none of the tests above are compiled, so the
// binary simply reports success.
int main(int /*argc*/, char** /*argv*/) {
  return 0;
}
293
294 #endif // GRPC_LINUX_ERRQUEUE
295