xref: /aosp_15_r20/external/grpc-grpc/test/core/iomgr/endpoint_tests.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/core/iomgr/endpoint_tests.h"
20 
21 #include <limits.h>
22 #include <stdbool.h>
23 #include <sys/types.h>
24 
25 #include <grpc/slice.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/time.h>
29 
30 #include "src/core/lib/gpr/useful.h"
31 #include "src/core/lib/gprpp/crash.h"
32 #include "src/core/lib/gprpp/time.h"
33 #include "src/core/lib/iomgr/error.h"
34 #include "src/core/lib/slice/slice_internal.h"
35 #include "test/core/util/test_config.h"
36 
37 //
38 // General test notes:
39 
40 // All tests which write data into an endpoint write i%256 into byte i, which
41 // is verified by readers.
42 
43 // In general there are a few interesting things to vary which may lead to
44 // exercising different codepaths in an implementation:
45 // 1. Total amount of data written to the endpoint
46 // 2. Size of slice allocations
47 // 3. Amount of data we read from or write to the endpoint at once
48 
49 // The tests here tend to parameterize these where applicable.
50 
51 //
52 
53 static gpr_mu* g_mu;
54 static grpc_pollset* g_pollset;
55 
count_slices(grpc_slice * slices,size_t nslices,int * current_data)56 size_t count_slices(grpc_slice* slices, size_t nslices, int* current_data) {
57   size_t num_bytes = 0;
58   size_t i;
59   size_t j;
60   unsigned char* buf;
61   for (i = 0; i < nslices; ++i) {
62     buf = GRPC_SLICE_START_PTR(slices[i]);
63     for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
64       GPR_ASSERT(buf[j] == *current_data);
65       *current_data = (*current_data + 1) % 256;
66     }
67     num_bytes += GRPC_SLICE_LENGTH(slices[i]);
68   }
69   return num_bytes;
70 }
71 
begin_test(grpc_endpoint_test_config config,const char * test_name,size_t slice_size)72 static grpc_endpoint_test_fixture begin_test(grpc_endpoint_test_config config,
73                                              const char* test_name,
74                                              size_t slice_size) {
75   gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
76   return config.create_fixture(slice_size);
77 }
78 
end_test(grpc_endpoint_test_config config)79 static void end_test(grpc_endpoint_test_config config) { config.clean_up(); }
80 
allocate_blocks(size_t num_bytes,size_t slice_size,size_t * num_blocks,uint8_t * current_data)81 static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
82                                    size_t* num_blocks, uint8_t* current_data) {
83   size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1 : 0);
84   grpc_slice* slices =
85       static_cast<grpc_slice*>(gpr_malloc(sizeof(grpc_slice) * nslices));
86   size_t num_bytes_left = num_bytes;
87   size_t i;
88   size_t j;
89   unsigned char* buf;
90   *num_blocks = nslices;
91 
92   for (i = 0; i < nslices; ++i) {
93     slices[i] = grpc_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
94                                                               : slice_size);
95     num_bytes_left -= GRPC_SLICE_LENGTH(slices[i]);
96     buf = GRPC_SLICE_START_PTR(slices[i]);
97     for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
98       buf[j] = *current_data;
99       (*current_data)++;
100     }
101   }
102   GPR_ASSERT(num_bytes_left == 0);
103   return slices;
104 }
105 
106 struct read_and_write_test_state {
107   grpc_endpoint* read_ep;
108   grpc_endpoint* write_ep;
109   size_t target_bytes;
110   size_t bytes_read;
111   size_t current_write_size;
112   size_t bytes_written;
113   int current_read_data;
114   uint8_t current_write_data;
115   int read_done;
116   int write_done;
117   int max_write_frame_size;
118   grpc_slice_buffer incoming;
119   grpc_slice_buffer outgoing;
120   grpc_closure done_read;
121   grpc_closure done_write;
122   grpc_closure read_scheduler;
123   grpc_closure write_scheduler;
124 };
125 
read_scheduler(void * data,grpc_error_handle)126 static void read_scheduler(void* data, grpc_error_handle /* error */) {
127   struct read_and_write_test_state* state =
128       static_cast<struct read_and_write_test_state*>(data);
129   grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read,
130                      /*urgent=*/false, /*min_progress_size=*/1);
131 }
132 
read_and_write_test_read_handler_read_done(read_and_write_test_state * state,int read_done_state)133 static void read_and_write_test_read_handler_read_done(
134     read_and_write_test_state* state, int read_done_state) {
135   gpr_log(GPR_DEBUG, "Read handler done");
136   gpr_mu_lock(g_mu);
137   state->read_done = read_done_state;
138   GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
139   gpr_mu_unlock(g_mu);
140 }
141 
read_and_write_test_read_handler(void * data,grpc_error_handle error)142 static void read_and_write_test_read_handler(void* data,
143                                              grpc_error_handle error) {
144   struct read_and_write_test_state* state =
145       static_cast<struct read_and_write_test_state*>(data);
146   if (!error.ok()) {
147     read_and_write_test_read_handler_read_done(state, 1);
148     return;
149   }
150   state->bytes_read += count_slices(
151       state->incoming.slices, state->incoming.count, &state->current_read_data);
152   if (state->bytes_read == state->target_bytes) {
153     read_and_write_test_read_handler_read_done(state, 2);
154     return;
155   }
156   // We perform many reads one after another. If grpc_endpoint_read and the
157   // read_handler are both run inline, we might end up growing the stack
158   // beyond the limit. Schedule the read on ExecCtx to avoid this.
159   grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->read_scheduler,
160                           absl::OkStatus());
161 }
162 
write_scheduler(void * data,grpc_error_handle)163 static void write_scheduler(void* data, grpc_error_handle /* error */) {
164   struct read_and_write_test_state* state =
165       static_cast<struct read_and_write_test_state*>(data);
166   grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write,
167                       nullptr, /*max_frame_size=*/state->max_write_frame_size);
168 }
169 
read_and_write_test_write_handler(void * data,grpc_error_handle error)170 static void read_and_write_test_write_handler(void* data,
171                                               grpc_error_handle error) {
172   struct read_and_write_test_state* state =
173       static_cast<struct read_and_write_test_state*>(data);
174   grpc_slice* slices = nullptr;
175   size_t nslices;
176 
177   if (error.ok()) {
178     state->bytes_written += state->current_write_size;
179     if (state->target_bytes - state->bytes_written <
180         state->current_write_size) {
181       state->current_write_size = state->target_bytes - state->bytes_written;
182     }
183     if (state->current_write_size != 0) {
184       slices = allocate_blocks(state->current_write_size, 8192, &nslices,
185                                &state->current_write_data);
186       grpc_slice_buffer_reset_and_unref(&state->outgoing);
187       grpc_slice_buffer_addn(&state->outgoing, slices, nslices);
188       // We perform many writes one after another. If grpc_endpoint_write and
189       // the write_handler are both run inline, we might end up growing the
190       // stack beyond the limit. Schedule the write on ExecCtx to avoid this.
191       grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->write_scheduler,
192                               absl::OkStatus());
193       gpr_free(slices);
194       return;
195     }
196   }
197 
198   gpr_log(GPR_DEBUG, "Write handler done");
199   gpr_mu_lock(g_mu);
200   state->write_done = 1 + (error.ok());
201   GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
202   gpr_mu_unlock(g_mu);
203 }
204 
205 // Do both reading and writing using the grpc_endpoint API.
206 
207 // This also includes a test of the shutdown behavior.
208 //
read_and_write_test(grpc_endpoint_test_config config,size_t num_bytes,size_t write_size,size_t slice_size,int max_write_frame_size,bool shutdown)209 static void read_and_write_test(grpc_endpoint_test_config config,
210                                 size_t num_bytes, size_t write_size,
211                                 size_t slice_size, int max_write_frame_size,
212                                 bool shutdown) {
213   struct read_and_write_test_state state;
214   grpc_endpoint_test_fixture f =
215       begin_test(config, "read_and_write_test", slice_size);
216   grpc_core::ExecCtx exec_ctx;
217   auto deadline = grpc_core::Timestamp::FromTimespecRoundUp(
218       grpc_timeout_seconds_to_deadline(300));
219   gpr_log(GPR_DEBUG,
220           "num_bytes=%" PRIuPTR " write_size=%" PRIuPTR " slice_size=%" PRIuPTR
221           " shutdown=%d",
222           num_bytes, write_size, slice_size, shutdown);
223 
224   if (shutdown) {
225     gpr_log(GPR_INFO, "Start read and write shutdown test");
226   } else {
227     gpr_log(GPR_INFO,
228             "Start read and write test with %" PRIuPTR
229             " bytes, slice size %" PRIuPTR,
230             num_bytes, slice_size);
231   }
232 
233   state.read_ep = f.client_ep;
234   state.write_ep = f.server_ep;
235   state.target_bytes = num_bytes;
236   state.bytes_read = 0;
237   state.current_write_size = write_size;
238   state.max_write_frame_size = max_write_frame_size;
239   state.bytes_written = 0;
240   state.read_done = 0;
241   state.write_done = 0;
242   state.current_read_data = 0;
243   state.current_write_data = 0;
244   GRPC_CLOSURE_INIT(&state.read_scheduler, read_scheduler, &state,
245                     grpc_schedule_on_exec_ctx);
246   GRPC_CLOSURE_INIT(&state.done_read, read_and_write_test_read_handler, &state,
247                     grpc_schedule_on_exec_ctx);
248   GRPC_CLOSURE_INIT(&state.write_scheduler, write_scheduler, &state,
249                     grpc_schedule_on_exec_ctx);
250   GRPC_CLOSURE_INIT(&state.done_write, read_and_write_test_write_handler,
251                     &state, grpc_schedule_on_exec_ctx);
252   grpc_slice_buffer_init(&state.outgoing);
253   grpc_slice_buffer_init(&state.incoming);
254 
255   // Get started by pretending an initial write completed
256   // NOTE: Sets up initial conditions so we can have the same write handler
257   // for the first iteration as for later iterations. It does the right thing
258   // even when bytes_written is unsigned.
259   state.bytes_written -= state.current_write_size;
260   read_and_write_test_write_handler(&state, absl::OkStatus());
261   grpc_core::ExecCtx::Get()->Flush();
262 
263   grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read,
264                      /*urgent=*/false, /*min_progress_size=*/1);
265   if (shutdown) {
266     gpr_log(GPR_DEBUG, "shutdown read");
267     grpc_endpoint_shutdown(state.read_ep, GRPC_ERROR_CREATE("Test Shutdown"));
268     gpr_log(GPR_DEBUG, "shutdown write");
269     grpc_endpoint_shutdown(state.write_ep, GRPC_ERROR_CREATE("Test Shutdown"));
270   }
271   grpc_core::ExecCtx::Get()->Flush();
272 
273   gpr_mu_lock(g_mu);
274   while (!state.read_done || !state.write_done) {
275     grpc_pollset_worker* worker = nullptr;
276     GPR_ASSERT(grpc_core::Timestamp::Now() < deadline);
277     GPR_ASSERT(GRPC_LOG_IF_ERROR(
278         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
279   }
280   gpr_mu_unlock(g_mu);
281   grpc_core::ExecCtx::Get()->Flush();
282 
283   end_test(config);
284   grpc_slice_buffer_destroy(&state.outgoing);
285   grpc_slice_buffer_destroy(&state.incoming);
286   grpc_endpoint_destroy(state.read_ep);
287   grpc_endpoint_destroy(state.write_ep);
288 }
289 
inc_on_failure(void * arg,grpc_error_handle error)290 static void inc_on_failure(void* arg, grpc_error_handle error) {
291   gpr_mu_lock(g_mu);
292   *static_cast<int*>(arg) += (!error.ok());
293   GPR_ASSERT(GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr)));
294   gpr_mu_unlock(g_mu);
295 }
296 
wait_for_fail_count(int * fail_count,int want_fail_count)297 static void wait_for_fail_count(int* fail_count, int want_fail_count) {
298   grpc_core::ExecCtx::Get()->Flush();
299   gpr_mu_lock(g_mu);
300   grpc_core::Timestamp deadline = grpc_core::Timestamp::FromTimespecRoundUp(
301       grpc_timeout_seconds_to_deadline(10));
302   while (grpc_core::Timestamp::Now() < deadline &&
303          *fail_count < want_fail_count) {
304     grpc_pollset_worker* worker = nullptr;
305     GPR_ASSERT(GRPC_LOG_IF_ERROR(
306         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
307     gpr_mu_unlock(g_mu);
308     grpc_core::ExecCtx::Get()->Flush();
309     gpr_mu_lock(g_mu);
310   }
311   GPR_ASSERT(*fail_count == want_fail_count);
312   gpr_mu_unlock(g_mu);
313 }
314 
multiple_shutdown_test(grpc_endpoint_test_config config)315 static void multiple_shutdown_test(grpc_endpoint_test_config config) {
316   grpc_endpoint_test_fixture f =
317       begin_test(config, "multiple_shutdown_test", 128);
318   int fail_count = 0;
319   grpc_slice_buffer slice_buffer;
320   grpc_slice_buffer_init(&slice_buffer);
321 
322   grpc_core::ExecCtx exec_ctx;
323   grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
324   grpc_endpoint_read(f.client_ep, &slice_buffer,
325                      GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
326                                          grpc_schedule_on_exec_ctx),
327                      /*urgent=*/false, /*min_progress_size=*/1);
328   wait_for_fail_count(&fail_count, 0);
329   grpc_endpoint_shutdown(f.client_ep, GRPC_ERROR_CREATE("Test Shutdown"));
330   wait_for_fail_count(&fail_count, 1);
331   grpc_endpoint_read(f.client_ep, &slice_buffer,
332                      GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
333                                          grpc_schedule_on_exec_ctx),
334                      /*urgent=*/false, /*min_progress_size=*/1);
335   wait_for_fail_count(&fail_count, 2);
336   grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a"));
337   grpc_endpoint_write(f.client_ep, &slice_buffer,
338                       GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
339                                           grpc_schedule_on_exec_ctx),
340                       nullptr, /*max_frame_size=*/INT_MAX);
341   wait_for_fail_count(&fail_count, 3);
342   grpc_endpoint_shutdown(f.client_ep, GRPC_ERROR_CREATE("Test Shutdown"));
343   wait_for_fail_count(&fail_count, 3);
344 
345   grpc_slice_buffer_destroy(&slice_buffer);
346 
347   grpc_endpoint_destroy(f.client_ep);
348   grpc_endpoint_destroy(f.server_ep);
349 }
350 
grpc_endpoint_tests(grpc_endpoint_test_config config,grpc_pollset * pollset,gpr_mu * mu)351 void grpc_endpoint_tests(grpc_endpoint_test_config config,
352                          grpc_pollset* pollset, gpr_mu* mu) {
353   size_t i;
354   g_pollset = pollset;
355   g_mu = mu;
356   multiple_shutdown_test(config);
357   for (int i = 1; i <= 10000; i = i * 10) {
358     read_and_write_test(config, 10000000, 100000, 8192, i, false);
359     read_and_write_test(config, 1000000, 100000, 1, i, false);
360     read_and_write_test(config, 100000000, 100000, 1, i, true);
361   }
362   for (i = 1; i < 1000; i = std::max(i + 1, i * 5 / 4)) {
363     read_and_write_test(config, 40320, i, i, i, false);
364   }
365   g_pollset = nullptr;
366   g_mu = nullptr;
367 }
368