//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include "test/core/iomgr/endpoint_tests.h"

#include <limits.h>
#include <stdbool.h>
#include <sys/types.h>

#include <algorithm>

#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/util/test_config.h"

//
// General test notes:
//
// All tests which write data into an endpoint write i%256 into byte i, which
// is verified by readers.
//
// In general there are a few interesting things to vary which may lead to
// exercising different codepaths in an implementation:
// 1. Total amount of data written to the endpoint
// 2. Size of slice allocations
// 3. Amount of data we read from or write to the endpoint at once
//
// The tests here tend to parameterize these where applicable.
//

static gpr_mu* g_mu;
static grpc_pollset* g_pollset;

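// Checks that the bytes in |slices| continue the rolling i%256 pattern
// starting at *current_data, advances *current_data past them, and returns
// the number of bytes verified.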
size_t count_slices(grpc_slice* slices, size_t nslices, int* current_data) {
  size_t num_bytes = 0;
  size_t i;
  size_t j;
  unsigned char* buf;
  for (i = 0; i < nslices; ++i) {
    buf = GRPC_SLICE_START_PTR(slices[i]);
    for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
      GPR_ASSERT(buf[j] == *current_data);
      *current_data = (*current_data + 1) % 256;
    }
    num_bytes += GRPC_SLICE_LENGTH(slices[i]);
  }
  return num_bytes;
}

static grpc_endpoint_test_fixture begin_test(grpc_endpoint_test_config config,
                                             const char* test_name,
                                             size_t slice_size) {
  gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
  return config.create_fixture(slice_size);
}

static void end_test(grpc_endpoint_test_config config) { config.clean_up(); }

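// Allocates enough slices of (at most) slice_size bytes to hold num_bytes,
// fills them with the rolling i%256 pattern continuing from *current_data,
// and sets *num_blocks to the slice count. The writer side uses this to
// produce data that count_slices() can later verify.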
static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
                                   size_t* num_blocks, uint8_t* current_data) {
  size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1 : 0);
  grpc_slice* slices =
      static_cast<grpc_slice*>(gpr_malloc(sizeof(grpc_slice) * nslices));
  size_t num_bytes_left = num_bytes;
  size_t i;
  size_t j;
  unsigned char* buf;
  *num_blocks = nslices;

  for (i = 0; i < nslices; ++i) {
    slices[i] = grpc_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
                                                              : slice_size);
    num_bytes_left -= GRPC_SLICE_LENGTH(slices[i]);
    buf = GRPC_SLICE_START_PTR(slices[i]);
    for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
      buf[j] = *current_data;
      (*current_data)++;
    }
  }
  GPR_ASSERT(num_bytes_left == 0);
  return slices;
}

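// Shared state for the full-duplex read_and_write_test below: one endpoint
// reads while its peer writes, and the main test loop polls until both sides
// report completion via read_done/write_done.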
struct read_and_write_test_state {
  grpc_endpoint* read_ep;
  grpc_endpoint* write_ep;
  size_t target_bytes;
  size_t bytes_read;
  size_t current_write_size;
  size_t bytes_written;
  int current_read_data;
  uint8_t current_write_data;
  int read_done;
  int write_done;
  int max_write_frame_size;
  grpc_slice_buffer incoming;
  grpc_slice_buffer outgoing;
  grpc_closure done_read;
  grpc_closure done_write;
  grpc_closure read_scheduler;
  grpc_closure write_scheduler;
};

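// Kicks off the next endpoint read. Run via ExecCtx (rather than called
// directly from the read handler) so that long chains of reads cannot grow
// the stack without bound; see the note in read_and_write_test_read_handler.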
static void read_scheduler(void* data, grpc_error_handle /* error */) {
  struct read_and_write_test_state* state =
      static_cast<struct read_and_write_test_state*>(data);
  grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read,
                     /*urgent=*/false, /*min_progress_size=*/1);
}

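// Records the read-side result: read_done is set to 1 if the read failed and
// to 2 once all target_bytes have been verified, then the pollset is kicked
// so the main test loop can observe the update.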
static void read_and_write_test_read_handler_read_done(
    read_and_write_test_state* state, int read_done_state) {
  gpr_log(GPR_DEBUG, "Read handler done");
  gpr_mu_lock(g_mu);
  state->read_done = read_done_state;
  GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
  gpr_mu_unlock(g_mu);
}

static void read_and_write_test_read_handler(void* data,
                                             grpc_error_handle error) {
  struct read_and_write_test_state* state =
      static_cast<struct read_and_write_test_state*>(data);
  if (!error.ok()) {
    read_and_write_test_read_handler_read_done(state, 1);
    return;
  }
  state->bytes_read += count_slices(
      state->incoming.slices, state->incoming.count, &state->current_read_data);
  if (state->bytes_read == state->target_bytes) {
    read_and_write_test_read_handler_read_done(state, 2);
    return;
  }
  // We perform many reads one after another. If grpc_endpoint_read and the
  // read_handler are both run inline, we might end up growing the stack
  // beyond the limit. Schedule the read on ExecCtx to avoid this.
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->read_scheduler,
                          absl::OkStatus());
}

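// Write-side counterpart of read_scheduler: issues the next endpoint write
// from ExecCtx so repeated inline write completions cannot overflow the stack.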
static void write_scheduler(void* data, grpc_error_handle /* error */) {
  struct read_and_write_test_state* state =
      static_cast<struct read_and_write_test_state*>(data);
  grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write,
                      nullptr, /*max_frame_size=*/state->max_write_frame_size);
}

static void read_and_write_test_write_handler(void* data,
                                              grpc_error_handle error) {
  struct read_and_write_test_state* state =
      static_cast<struct read_and_write_test_state*>(data);
  grpc_slice* slices = nullptr;
  size_t nslices;

  if (error.ok()) {
    state->bytes_written += state->current_write_size;
    if (state->target_bytes - state->bytes_written <
        state->current_write_size) {
      state->current_write_size = state->target_bytes - state->bytes_written;
    }
    if (state->current_write_size != 0) {
      slices = allocate_blocks(state->current_write_size, 8192, &nslices,
                               &state->current_write_data);
      grpc_slice_buffer_reset_and_unref(&state->outgoing);
      grpc_slice_buffer_addn(&state->outgoing, slices, nslices);
      // We perform many writes one after another. If grpc_endpoint_write and
      // the write_handler are both run inline, we might end up growing the
      // stack beyond the limit. Schedule the write on ExecCtx to avoid this.
      grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->write_scheduler,
                              absl::OkStatus());
      gpr_free(slices);
      return;
    }
  }

  gpr_log(GPR_DEBUG, "Write handler done");
  gpr_mu_lock(g_mu);
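  // As with read_done: 1 signals that the write path saw an error, 2 signals
  // that all bytes were written successfully.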
  state->write_done = 1 + (error.ok());
  GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
  gpr_mu_unlock(g_mu);
}

// Do both reading and writing using the grpc_endpoint API.
//
// This also includes a test of the shutdown behavior.
//
static void read_and_write_test(grpc_endpoint_test_config config,
                                size_t num_bytes, size_t write_size,
                                size_t slice_size, int max_write_frame_size,
                                bool shutdown) {
  struct read_and_write_test_state state;
  grpc_endpoint_test_fixture f =
      begin_test(config, "read_and_write_test", slice_size);
  grpc_core::ExecCtx exec_ctx;
  auto deadline = grpc_core::Timestamp::FromTimespecRoundUp(
      grpc_timeout_seconds_to_deadline(300));
  gpr_log(GPR_DEBUG,
          "num_bytes=%" PRIuPTR " write_size=%" PRIuPTR " slice_size=%" PRIuPTR
          " shutdown=%d",
          num_bytes, write_size, slice_size, shutdown);

  if (shutdown) {
    gpr_log(GPR_INFO, "Start read and write shutdown test");
  } else {
    gpr_log(GPR_INFO,
            "Start read and write test with %" PRIuPTR
            " bytes, slice size %" PRIuPTR,
            num_bytes, slice_size);
  }

  state.read_ep = f.client_ep;
  state.write_ep = f.server_ep;
  state.target_bytes = num_bytes;
  state.bytes_read = 0;
  state.current_write_size = write_size;
  state.max_write_frame_size = max_write_frame_size;
  state.bytes_written = 0;
  state.read_done = 0;
  state.write_done = 0;
  state.current_read_data = 0;
  state.current_write_data = 0;
  GRPC_CLOSURE_INIT(&state.read_scheduler, read_scheduler, &state,
                    grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&state.done_read, read_and_write_test_read_handler, &state,
                    grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&state.write_scheduler, write_scheduler, &state,
                    grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&state.done_write, read_and_write_test_write_handler,
                    &state, grpc_schedule_on_exec_ctx);
  grpc_slice_buffer_init(&state.outgoing);
  grpc_slice_buffer_init(&state.incoming);

  // Get started by pretending an initial write completed
  // NOTE: Sets up initial conditions so we can have the same write handler
  // for the first iteration as for later iterations. It does the right thing
  // even when bytes_written is unsigned.
  state.bytes_written -= state.current_write_size;
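  // (The unsigned subtraction wraps bytes_written around; the handler's
  // bytes_written += current_write_size then lands back on exactly 0, so the
  // faked first completion reports zero bytes written so far.)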
  read_and_write_test_write_handler(&state, absl::OkStatus());
  grpc_core::ExecCtx::Get()->Flush();

  grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read,
                     /*urgent=*/false, /*min_progress_size=*/1);
  if (shutdown) {
    gpr_log(GPR_DEBUG, "shutdown read");
    grpc_endpoint_shutdown(state.read_ep, GRPC_ERROR_CREATE("Test Shutdown"));
    gpr_log(GPR_DEBUG, "shutdown write");
    grpc_endpoint_shutdown(state.write_ep, GRPC_ERROR_CREATE("Test Shutdown"));
  }
  grpc_core::ExecCtx::Get()->Flush();

  gpr_mu_lock(g_mu);
  while (!state.read_done || !state.write_done) {
    grpc_pollset_worker* worker = nullptr;
    GPR_ASSERT(grpc_core::Timestamp::Now() < deadline);
    GPR_ASSERT(GRPC_LOG_IF_ERROR(
        "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
  }
  gpr_mu_unlock(g_mu);
  grpc_core::ExecCtx::Get()->Flush();

  end_test(config);
  grpc_slice_buffer_destroy(&state.outgoing);
  grpc_slice_buffer_destroy(&state.incoming);
  grpc_endpoint_destroy(state.read_ep);
  grpc_endpoint_destroy(state.write_ep);
}

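// Closure callback that counts failed (non-OK) completions under the pollset
// mutex; multiple_shutdown_test uses it to observe how many operations fail.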
static void inc_on_failure(void* arg, grpc_error_handle error) {
  gpr_mu_lock(g_mu);
  *static_cast<int*>(arg) += (!error.ok());
  GPR_ASSERT(GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr)));
  gpr_mu_unlock(g_mu);
}

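// Spins the pollset (for at most 10 seconds) until *fail_count reaches
// want_fail_count, flushing the ExecCtx between iterations, then asserts that
// exactly the expected number of failures was observed.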
static void wait_for_fail_count(int* fail_count, int want_fail_count) {
  grpc_core::ExecCtx::Get()->Flush();
  gpr_mu_lock(g_mu);
  grpc_core::Timestamp deadline = grpc_core::Timestamp::FromTimespecRoundUp(
      grpc_timeout_seconds_to_deadline(10));
  while (grpc_core::Timestamp::Now() < deadline &&
         *fail_count < want_fail_count) {
    grpc_pollset_worker* worker = nullptr;
    GPR_ASSERT(GRPC_LOG_IF_ERROR(
        "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
    gpr_mu_unlock(g_mu);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_mu_lock(g_mu);
  }
  GPR_ASSERT(*fail_count == want_fail_count);
  gpr_mu_unlock(g_mu);
}

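// Verifies shutdown semantics: a pending read fails once the endpoint is shut
// down, subsequent reads and writes fail immediately, and shutting down an
// already-shut-down endpoint must not run any callback again (fail_count
// stays at 3).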
static void multiple_shutdown_test(grpc_endpoint_test_config config) {
  grpc_endpoint_test_fixture f =
      begin_test(config, "multiple_shutdown_test", 128);
  int fail_count = 0;
  grpc_slice_buffer slice_buffer;
  grpc_slice_buffer_init(&slice_buffer);

  grpc_core::ExecCtx exec_ctx;
  grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
  grpc_endpoint_read(f.client_ep, &slice_buffer,
                     GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
                                         grpc_schedule_on_exec_ctx),
                     /*urgent=*/false, /*min_progress_size=*/1);
  wait_for_fail_count(&fail_count, 0);
  grpc_endpoint_shutdown(f.client_ep, GRPC_ERROR_CREATE("Test Shutdown"));
  wait_for_fail_count(&fail_count, 1);
  grpc_endpoint_read(f.client_ep, &slice_buffer,
                     GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
                                         grpc_schedule_on_exec_ctx),
                     /*urgent=*/false, /*min_progress_size=*/1);
  wait_for_fail_count(&fail_count, 2);
  grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a"));
  grpc_endpoint_write(f.client_ep, &slice_buffer,
                      GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
                                          grpc_schedule_on_exec_ctx),
                      nullptr, /*max_frame_size=*/INT_MAX);
  wait_for_fail_count(&fail_count, 3);
  grpc_endpoint_shutdown(f.client_ep, GRPC_ERROR_CREATE("Test Shutdown"));
  wait_for_fail_count(&fail_count, 3);

  grpc_slice_buffer_destroy(&slice_buffer);

  grpc_endpoint_destroy(f.client_ep);
  grpc_endpoint_destroy(f.server_ep);
}

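// Suite entry point: installs the caller's pollset/mutex, runs the shutdown
// test, then sweeps max_write_frame_size and the transfer/write/slice sizes
// to exercise the different codepaths described in the notes above.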
void grpc_endpoint_tests(grpc_endpoint_test_config config,
                         grpc_pollset* pollset, gpr_mu* mu) {
  size_t i;
  g_pollset = pollset;
  g_mu = mu;
  multiple_shutdown_test(config);
  for (int i = 1; i <= 10000; i = i * 10) {
    read_and_write_test(config, 10000000, 100000, 8192, i, false);
    read_and_write_test(config, 1000000, 100000, 1, i, false);
    read_and_write_test(config, 100000000, 100000, 1, i, true);
  }
  for (i = 1; i < 1000; i = std::max(i + 1, i * 5 / 4)) {
    read_and_write_test(config, 40320, i, i, i, false);
  }
  g_pollset = nullptr;
  g_mu = nullptr;
}