1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/lib/surface/completion_queue.h"
20
21 #include <stddef.h>
22
23 #include <memory>
24
25 #include "absl/status/status.h"
26 #include "gtest/gtest.h"
27
28 #include <grpc/grpc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/sync.h>
31 #include <grpc/support/time.h>
32
33 #include "src/core/lib/gpr/useful.h"
34 #include "src/core/lib/iomgr/exec_ctx.h"
35 #include "test/core/util/test_config.h"
36
37 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
38
// Returns a new opaque tag on every call. Tags come from a monotonically
// increasing counter that starts at 1, so they are distinct from each other
// and never null.
static void* create_test_tag(void) {
  static intptr_t next_tag = 0;
  next_tag += 1;
  return reinterpret_cast<void*>(next_tag);
}
43
44 // helper for tests to shutdown correctly and tersely
shutdown_and_destroy(grpc_completion_queue * cc)45 static void shutdown_and_destroy(grpc_completion_queue* cc) {
46 grpc_event ev;
47 grpc_completion_queue_shutdown(cc);
48
49 switch (grpc_get_cq_completion_type(cc)) {
50 case GRPC_CQ_NEXT: {
51 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
52 nullptr);
53 ASSERT_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
54 break;
55 }
56 case GRPC_CQ_PLUCK: {
57 ev = grpc_completion_queue_pluck(
58 cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
59 ASSERT_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
60 break;
61 }
62 case GRPC_CQ_CALLBACK: {
63 // Nothing to do here. The shutdown callback will be invoked when
64 // possible.
65 break;
66 }
67 default: {
68 gpr_log(GPR_ERROR, "Unknown completion type");
69 break;
70 }
71 }
72
73 grpc_completion_queue_destroy(cc);
74 }
75
76 // ensure we can create and destroy a completion channel
TEST(GrpcCompletionQueueTest,TestNoOp)77 TEST(GrpcCompletionQueueTest, TestNoOp) {
78 grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
79 grpc_cq_polling_type polling_types[] = {
80 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
81 grpc_completion_queue_attributes attr;
82 LOG_TEST("test_no_op");
83
84 attr.version = 1;
85 for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
86 for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
87 attr.cq_completion_type = completion_types[i];
88 attr.cq_polling_type = polling_types[j];
89 shutdown_and_destroy(grpc_completion_queue_create(
90 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr));
91 }
92 }
93 }
94
// Checks that grpc_cq_pollset() yields a non-null pollset for NEXT and PLUCK
// queues created with the default and non-listening polling types.
TEST(GrpcCompletionQueueTest, TestPollsetConversion) {
  const grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT,
                                                      GRPC_CQ_PLUCK};
  const grpc_cq_polling_type polling_types[] = {GRPC_CQ_DEFAULT_POLLING,
                                                GRPC_CQ_NON_LISTENING};
  grpc_completion_queue_attributes attr;

  LOG_TEST("test_pollset_conversion");

  attr.version = 1;
  for (const grpc_cq_completion_type completion_type : completion_types) {
    for (const grpc_cq_polling_type polling_type : polling_types) {
      attr.cq_completion_type = completion_type;
      attr.cq_polling_type = polling_type;
      grpc_completion_queue* queue = grpc_completion_queue_create(
          grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
      ASSERT_NE(grpc_cq_pollset(queue), nullptr);
      shutdown_and_destroy(queue);
    }
  }
}
116
// Calling next() on an empty NEXT queue with a deadline of "now" must report
// GRPC_QUEUE_TIMEOUT, whatever the polling type.
TEST(GrpcCompletionQueueTest, TestWaitEmpty) {
  const grpc_cq_polling_type polling_types[] = {
      GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
  grpc_completion_queue_attributes attr;

  LOG_TEST("test_wait_empty");

  attr.version = 1;
  attr.cq_completion_type = GRPC_CQ_NEXT;
  for (const grpc_cq_polling_type polling_type : polling_types) {
    attr.cq_polling_type = polling_type;
    grpc_completion_queue* queue = grpc_completion_queue_create(
        grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
    const gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
    const grpc_event result =
        grpc_completion_queue_next(queue, deadline, nullptr);
    ASSERT_EQ(result.type, GRPC_QUEUE_TIMEOUT);
    shutdown_and_destroy(queue);
  }
}
138
// No-op "done" callback that the tests in this file pass to grpc_cq_end_op().
// Both arguments are ignored.
static void do_nothing_end_completion(void* /*arg*/,
                                      grpc_cq_completion* /*c*/) {
  // Intentionally empty.
}
141
// For each polling type: begin and end a single op on a NEXT queue, then check
// that next() delivers a successful GRPC_OP_COMPLETE event carrying the tag.
TEST(GrpcCompletionQueueTest, TestCqEndOp) {
  grpc_cq_completion completion;
  const grpc_cq_polling_type polling_types[] = {
      GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
  grpc_completion_queue_attributes attr;
  void* const tag = create_test_tag();

  LOG_TEST("test_cq_end_op");

  attr.version = 1;
  attr.cq_completion_type = GRPC_CQ_NEXT;
  for (const grpc_cq_polling_type polling_type : polling_types) {
    // A fresh ExecCtx for every iteration.
    grpc_core::ExecCtx exec_ctx;
    attr.cq_polling_type = polling_type;
    grpc_completion_queue* queue = grpc_completion_queue_create(
        grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);

    ASSERT_TRUE(grpc_cq_begin_op(queue, tag));
    grpc_cq_end_op(queue, tag, absl::OkStatus(), do_nothing_end_completion,
                   nullptr, &completion);

    const grpc_event delivered = grpc_completion_queue_next(
        queue, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
    ASSERT_EQ(delivered.type, GRPC_OP_COMPLETE);
    ASSERT_EQ(delivered.tag, tag);
    ASSERT_TRUE(delivered.success);

    shutdown_and_destroy(queue);
  }
}
174
// With the thread-local cache initialized, checks that a completed op is not
// handed out by next() (which must time out both before and after the flush)
// but is returned, with its tag and a truthy `ok`, by the thread-local cache
// flush.
TEST(GrpcCompletionQueueTest, TestCqTlsCacheFull) {
  grpc_cq_completion completion;
  const grpc_cq_polling_type polling_types[] = {
      GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
  grpc_completion_queue_attributes attr;
  void* const tag = create_test_tag();
  void* flushed_tag;
  int flushed_ok;

  LOG_TEST("test_cq_tls_cache_full");

  attr.version = 1;
  attr.cq_completion_type = GRPC_CQ_NEXT;
  for (const grpc_cq_polling_type polling_type : polling_types) {
    grpc_core::ExecCtx exec_ctx;  // Reset exec_ctx
    attr.cq_polling_type = polling_type;
    grpc_completion_queue* queue = grpc_completion_queue_create(
        grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);

    grpc_completion_queue_thread_local_cache_init(queue);
    ASSERT_TRUE(grpc_cq_begin_op(queue, tag));
    grpc_cq_end_op(queue, tag, absl::OkStatus(), do_nothing_end_completion,
                   nullptr, &completion);

    // The completion must not be visible through next() ...
    const grpc_event before_flush = grpc_completion_queue_next(
        queue, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
    ASSERT_EQ(before_flush.type, GRPC_QUEUE_TIMEOUT);

    // ... it is expected to come back from the cache flush instead.
    const int flush_result = grpc_completion_queue_thread_local_cache_flush(
        queue, &flushed_tag, &flushed_ok);
    ASSERT_EQ(flush_result, 1);
    ASSERT_EQ(flushed_tag, tag);
    ASSERT_TRUE(flushed_ok);

    const grpc_event after_flush = grpc_completion_queue_next(
        queue, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
    ASSERT_EQ(after_flush.type, GRPC_QUEUE_TIMEOUT);

    shutdown_and_destroy(queue);
  }
}
217
// Flushing the thread-local cache must return 0 when nothing was completed,
// both before and after the cache has been initialized.
TEST(GrpcCompletionQueueTest, TestCqTlsCacheEmpty) {
  const grpc_cq_polling_type polling_types[] = {
      GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
  grpc_completion_queue_attributes attr;
  void* flushed_tag;
  int flushed_ok;

  LOG_TEST("test_cq_tls_cache_empty");

  attr.version = 1;
  attr.cq_completion_type = GRPC_CQ_NEXT;
  for (const grpc_cq_polling_type polling_type : polling_types) {
    grpc_core::ExecCtx exec_ctx;  // Reset exec_ctx
    attr.cq_polling_type = polling_type;
    grpc_completion_queue* queue = grpc_completion_queue_create(
        grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);

    // Flush without an initialized cache.
    const int uninitialized_flush =
        grpc_completion_queue_thread_local_cache_flush(queue, &flushed_tag,
                                                       &flushed_ok);
    ASSERT_EQ(uninitialized_flush, 0);

    // Flush an initialized cache that holds nothing.
    grpc_completion_queue_thread_local_cache_init(queue);
    const int empty_flush = grpc_completion_queue_thread_local_cache_flush(
        queue, &flushed_tag, &flushed_ok);
    ASSERT_EQ(empty_flush, 0);

    shutdown_and_destroy(queue);
  }
}
244
// After shutdown, next() with an already-expired deadline must report
// GRPC_QUEUE_SHUTDOWN for every polling type.
TEST(GrpcCompletionQueueTest, TestShutdownThenNextPolling) {
  const grpc_cq_polling_type polling_types[] = {
      GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
  grpc_completion_queue_attributes attr;
  LOG_TEST("test_shutdown_then_next_polling");

  attr.version = 1;
  attr.cq_completion_type = GRPC_CQ_NEXT;
  for (const grpc_cq_polling_type polling_type : polling_types) {
    attr.cq_polling_type = polling_type;
    grpc_completion_queue* queue = grpc_completion_queue_create(
        grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
    grpc_completion_queue_shutdown(queue);
    const grpc_event result = grpc_completion_queue_next(
        queue, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
    ASSERT_EQ(result.type, GRPC_QUEUE_SHUTDOWN);
    grpc_completion_queue_destroy(queue);
  }
}
266
// After shutdown, next() with an infinite-future deadline must still come back
// with GRPC_QUEUE_SHUTDOWN for every polling type.
TEST(GrpcCompletionQueueTest, TestShutdownThenNextWithTimeout) {
  const grpc_cq_polling_type polling_types[] = {
      GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
  grpc_completion_queue_attributes attr;
  LOG_TEST("test_shutdown_then_next_with_timeout");

  attr.version = 1;
  attr.cq_completion_type = GRPC_CQ_NEXT;
  for (const grpc_cq_polling_type polling_type : polling_types) {
    attr.cq_polling_type = polling_type;
    grpc_completion_queue* queue = grpc_completion_queue_create(
        grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);

    grpc_completion_queue_shutdown(queue);
    const grpc_event result = grpc_completion_queue_next(
        queue, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
    ASSERT_EQ(result.type, GRPC_QUEUE_SHUTDOWN);
    grpc_completion_queue_destroy(queue);
  }
}
289
// Completes a batch of ops on a PLUCK queue and plucks each one by tag, first
// in submission order and then (after completing the batch again) in reverse
// order, checking that every pluck returns the requested tag.
TEST(GrpcCompletionQueueTest, TestPluck) {
  void* tags[128];
  grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
  const size_t tag_count = GPR_ARRAY_SIZE(tags);
  const grpc_cq_polling_type polling_types[] = {
      GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
  grpc_completion_queue_attributes attr;

  LOG_TEST("test_pluck");

  // Generate the tags, making sure each new one differs from all earlier ones.
  for (size_t cur = 0; cur < tag_count; ++cur) {
    tags[cur] = create_test_tag();
    for (size_t prev = 0; prev < cur; ++prev) {
      ASSERT_NE(tags[cur], tags[prev]);
    }
  }

  attr.version = 1;
  attr.cq_completion_type = GRPC_CQ_PLUCK;
  for (const grpc_cq_polling_type polling_type : polling_types) {
    grpc_core::ExecCtx exec_ctx;  // reset exec_ctx
    attr.cq_polling_type = polling_type;
    grpc_completion_queue* queue = grpc_completion_queue_create(
        grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);

    // Round one: complete everything, then pluck in forward order.
    for (size_t k = 0; k < tag_count; ++k) {
      ASSERT_TRUE(grpc_cq_begin_op(queue, tags[k]));
      grpc_cq_end_op(queue, tags[k], absl::OkStatus(),
                     do_nothing_end_completion, nullptr, &completions[k]);
    }
    for (size_t k = 0; k < tag_count; ++k) {
      const grpc_event plucked = grpc_completion_queue_pluck(
          queue, tags[k], gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
      ASSERT_EQ(plucked.tag, tags[k]);
    }

    // Round two: complete everything again, then pluck from last to first.
    for (size_t k = 0; k < tag_count; ++k) {
      ASSERT_TRUE(grpc_cq_begin_op(queue, tags[k]));
      grpc_cq_end_op(queue, tags[k], absl::OkStatus(),
                     do_nothing_end_completion, nullptr, &completions[k]);
    }
    for (size_t k = tag_count; k-- > 0;) {
      const grpc_event plucked = grpc_completion_queue_pluck(
          queue, tags[k], gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
      ASSERT_EQ(plucked.tag, tags[k]);
    }

    shutdown_and_destroy(queue);
  }
}
345
// Plucking (null tag, infinite-future deadline) from a PLUCK queue that has
// already been shut down must report GRPC_QUEUE_SHUTDOWN for every polling
// type.
TEST(GrpcCompletionQueueTest, TestPluckAfterShutdown) {
  const grpc_cq_polling_type polling_types[] = {
      GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
  grpc_completion_queue_attributes attr;

  LOG_TEST("test_pluck_after_shutdown");

  attr.version = 1;
  attr.cq_completion_type = GRPC_CQ_PLUCK;
  for (const grpc_cq_polling_type polling_type : polling_types) {
    attr.cq_polling_type = polling_type;
    grpc_completion_queue* queue = grpc_completion_queue_create(
        grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
    grpc_completion_queue_shutdown(queue);
    const grpc_event result = grpc_completion_queue_pluck(
        queue, nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
    ASSERT_EQ(result.type, GRPC_QUEUE_SHUTDOWN);
    grpc_completion_queue_destroy(queue);
  }
}
368
// Exercises a GRPC_CQ_CALLBACK completion queue under every polling type.
//
// For each polling type the test:
//   1. creates a callback queue whose shutdown functor records its `ok` value,
//   2. completes one op per tag, where every tag is a heap-allocated
//      TagCallback functor that adds its index to `counter`,
//   3. waits until all tag callbacks have run,
//   4. shuts the queue down and waits for the shutdown callback,
//   5. checks that the accumulated `counter` equals the sum of the indices.
TEST(GrpcCompletionQueueTest, TestCallback) {
  grpc_completion_queue* cc;
  static void* tags[128];
  grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
  grpc_cq_polling_type polling_types[] = {
      GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
  grpc_completion_queue_attributes attr;
  unsigned i;
  // The synchronization state is static because the function-local functor
  // classes below may not refer to automatic variables of this function.
  static gpr_mu mu, shutdown_mu;
  static gpr_cv cv, shutdown_cv;
  static int cb_counter;
  gpr_mu_init(&mu);
  gpr_mu_init(&shutdown_mu);
  gpr_cv_init(&cv);
  gpr_cv_init(&shutdown_cv);

  LOG_TEST("test_callback");

  bool got_shutdown = false;
  // Shutdown functor: stores the `ok` flag into `*done` and wakes the waiter.
  class ShutdownCallback : public grpc_completion_queue_functor {
   public:
    explicit ShutdownCallback(bool* done) : done_(done) {
      functor_run = &ShutdownCallback::Run;
      inlineable = false;
    }
    static void Run(grpc_completion_queue_functor* cb, int ok) {
      gpr_mu_lock(&shutdown_mu);
      *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
      // Signal when the shutdown callback is completed.
      gpr_cv_signal(&shutdown_cv);
      gpr_mu_unlock(&shutdown_mu);
    }

   private:
    bool* done_;
  };
  ShutdownCallback shutdown_cb(&got_shutdown);

  attr.version = 2;
  attr.cq_completion_type = GRPC_CQ_CALLBACK;
  attr.cq_shutdown_cb = &shutdown_cb;

  for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
    int sumtags = 0;
    int counter = 0;
    cb_counter = 0;
    {
      // reset exec_ctx types
      grpc_core::ExecCtx exec_ctx;
      attr.cq_polling_type = polling_types[pidx];
      cc = grpc_completion_queue_create(
          grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);

      // Per-tag functor: adds its tag value to `*counter`, bumps the shared
      // callback count and deletes itself.
      class TagCallback : public grpc_completion_queue_functor {
       public:
        TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
          functor_run = &TagCallback::Run;
          // Inlineable should be false since this callback takes locks.
          inlineable = false;
        }
        static void Run(grpc_completion_queue_functor* cb, int ok) {
          // Non-fatal on purpose: a fatal ASSERT_* would return right here,
          // before cb_counter is advanced and before the functor is deleted,
          // which would leak the functor and leave the test body waiting on
          // `cv` forever.
          EXPECT_TRUE(static_cast<bool>(ok));
          auto* callback = static_cast<TagCallback*>(cb);
          gpr_mu_lock(&mu);
          cb_counter++;
          *callback->counter_ += callback->tag_;
          if (cb_counter == static_cast<int>(GPR_ARRAY_SIZE(tags))) {
            gpr_cv_signal(&cv);
          }
          gpr_mu_unlock(&mu);
          delete callback;
        }

       private:
        int* counter_;
        int tag_;
      };

      for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
        tags[i] =
            static_cast<void*>(new TagCallback(&counter, static_cast<int>(i)));
        sumtags += static_cast<int>(i);
      }

      for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
        ASSERT_TRUE(grpc_cq_begin_op(cc, tags[i]));
        grpc_cq_end_op(cc, tags[i], absl::OkStatus(), do_nothing_end_completion,
                       nullptr, &completions[i]);
      }

      gpr_mu_lock(&mu);
      while (cb_counter != static_cast<int>(GPR_ARRAY_SIZE(tags))) {
        // Wait for all the callbacks to complete.
        gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
      }
      gpr_mu_unlock(&mu);

      shutdown_and_destroy(cc);

      gpr_mu_lock(&shutdown_mu);
      while (!got_shutdown) {
        // Wait for the shutdown callback to complete.
        gpr_cv_wait(&shutdown_cv, &shutdown_mu,
                    gpr_inf_future(GPR_CLOCK_REALTIME));
      }
      gpr_mu_unlock(&shutdown_mu);
    }

    // Run the assertions to check if the test ran successfully.
    ASSERT_EQ(sumtags, counter);
    ASSERT_TRUE(got_shutdown);
    // Re-arm the flag for the next polling type.
    got_shutdown = false;
  }

  gpr_cv_destroy(&cv);
  gpr_cv_destroy(&shutdown_cv);
  gpr_mu_destroy(&mu);
  gpr_mu_destroy(&shutdown_mu);
}
489
490 struct thread_state {
491 grpc_completion_queue* cc;
492 void* tag;
493 };
494
main(int argc,char ** argv)495 int main(int argc, char** argv) {
496 grpc::testing::TestEnvironment env(&argc, argv);
497 ::testing::InitGoogleTest(&argc, argv);
498 grpc::testing::TestGrpcScope grpc_scope;
499 return RUN_ALL_TESTS();
500 }
501