xref: /aosp_15_r20/external/grpc-grpc/test/core/surface/completion_queue_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/lib/surface/completion_queue.h"
20 
21 #include <stddef.h>
22 
23 #include <memory>
24 
25 #include "absl/status/status.h"
26 #include "gtest/gtest.h"
27 
28 #include <grpc/grpc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/sync.h>
31 #include <grpc/support/time.h>
32 
33 #include "src/core/lib/gpr/useful.h"
34 #include "src/core/lib/iomgr/exec_ctx.h"
35 #include "test/core/util/test_config.h"
36 
37 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
38 
create_test_tag(void)39 static void* create_test_tag(void) {
40   static intptr_t i = 0;
41   return reinterpret_cast<void*>(++i);
42 }
43 
44 // helper for tests to shutdown correctly and tersely
shutdown_and_destroy(grpc_completion_queue * cc)45 static void shutdown_and_destroy(grpc_completion_queue* cc) {
46   grpc_event ev;
47   grpc_completion_queue_shutdown(cc);
48 
49   switch (grpc_get_cq_completion_type(cc)) {
50     case GRPC_CQ_NEXT: {
51       ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
52                                       nullptr);
53       ASSERT_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
54       break;
55     }
56     case GRPC_CQ_PLUCK: {
57       ev = grpc_completion_queue_pluck(
58           cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
59       ASSERT_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
60       break;
61     }
62     case GRPC_CQ_CALLBACK: {
63       // Nothing to do here. The shutdown callback will be invoked when
64       // possible.
65       break;
66     }
67     default: {
68       gpr_log(GPR_ERROR, "Unknown completion type");
69       break;
70     }
71   }
72 
73   grpc_completion_queue_destroy(cc);
74 }
75 
76 // ensure we can create and destroy a completion channel
TEST(GrpcCompletionQueueTest,TestNoOp)77 TEST(GrpcCompletionQueueTest, TestNoOp) {
78   grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
79   grpc_cq_polling_type polling_types[] = {
80       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
81   grpc_completion_queue_attributes attr;
82   LOG_TEST("test_no_op");
83 
84   attr.version = 1;
85   for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
86     for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
87       attr.cq_completion_type = completion_types[i];
88       attr.cq_polling_type = polling_types[j];
89       shutdown_and_destroy(grpc_completion_queue_create(
90           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr));
91     }
92   }
93 }
94 
TEST(GrpcCompletionQueueTest,TestPollsetConversion)95 TEST(GrpcCompletionQueueTest, TestPollsetConversion) {
96   grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
97   grpc_cq_polling_type polling_types[] = {GRPC_CQ_DEFAULT_POLLING,
98                                           GRPC_CQ_NON_LISTENING};
99   grpc_completion_queue* cq;
100   grpc_completion_queue_attributes attr;
101 
102   LOG_TEST("test_pollset_conversion");
103 
104   attr.version = 1;
105   for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
106     for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
107       attr.cq_completion_type = completion_types[i];
108       attr.cq_polling_type = polling_types[j];
109       cq = grpc_completion_queue_create(
110           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
111       ASSERT_NE(grpc_cq_pollset(cq), nullptr);
112       shutdown_and_destroy(cq);
113     }
114   }
115 }
116 
TEST(GrpcCompletionQueueTest,TestWaitEmpty)117 TEST(GrpcCompletionQueueTest, TestWaitEmpty) {
118   grpc_cq_polling_type polling_types[] = {
119       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
120   grpc_completion_queue* cc;
121   grpc_completion_queue_attributes attr;
122   grpc_event event;
123 
124   LOG_TEST("test_wait_empty");
125 
126   attr.version = 1;
127   attr.cq_completion_type = GRPC_CQ_NEXT;
128   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
129     attr.cq_polling_type = polling_types[i];
130     cc = grpc_completion_queue_create(
131         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
132     event =
133         grpc_completion_queue_next(cc, gpr_now(GPR_CLOCK_REALTIME), nullptr);
134     ASSERT_EQ(event.type, GRPC_QUEUE_TIMEOUT);
135     shutdown_and_destroy(cc);
136   }
137 }
138 
do_nothing_end_completion(void *,grpc_cq_completion *)139 static void do_nothing_end_completion(void* /*arg*/,
140                                       grpc_cq_completion* /*c*/) {}
141 
TEST(GrpcCompletionQueueTest,TestCqEndOp)142 TEST(GrpcCompletionQueueTest, TestCqEndOp) {
143   grpc_event ev;
144   grpc_completion_queue* cc;
145   grpc_cq_completion completion;
146   grpc_cq_polling_type polling_types[] = {
147       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
148   grpc_completion_queue_attributes attr;
149   void* tag = create_test_tag();
150 
151   LOG_TEST("test_cq_end_op");
152 
153   attr.version = 1;
154   attr.cq_completion_type = GRPC_CQ_NEXT;
155   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
156     grpc_core::ExecCtx exec_ctx;
157     attr.cq_polling_type = polling_types[i];
158     cc = grpc_completion_queue_create(
159         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
160 
161     ASSERT_TRUE(grpc_cq_begin_op(cc, tag));
162     grpc_cq_end_op(cc, tag, absl::OkStatus(), do_nothing_end_completion,
163                    nullptr, &completion);
164 
165     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
166                                     nullptr);
167     ASSERT_EQ(ev.type, GRPC_OP_COMPLETE);
168     ASSERT_EQ(ev.tag, tag);
169     ASSERT_TRUE(ev.success);
170 
171     shutdown_and_destroy(cc);
172   }
173 }
174 
TEST(GrpcCompletionQueueTest,TestCqTlsCacheFull)175 TEST(GrpcCompletionQueueTest, TestCqTlsCacheFull) {
176   grpc_event ev;
177   grpc_completion_queue* cc;
178   grpc_cq_completion completion;
179   grpc_cq_polling_type polling_types[] = {
180       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
181   grpc_completion_queue_attributes attr;
182   void* tag = create_test_tag();
183   void* res_tag;
184   int ok;
185 
186   LOG_TEST("test_cq_tls_cache_full");
187 
188   attr.version = 1;
189   attr.cq_completion_type = GRPC_CQ_NEXT;
190   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
191     grpc_core::ExecCtx exec_ctx;  // Reset exec_ctx
192     attr.cq_polling_type = polling_types[i];
193     cc = grpc_completion_queue_create(
194         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
195 
196     grpc_completion_queue_thread_local_cache_init(cc);
197     ASSERT_TRUE(grpc_cq_begin_op(cc, tag));
198     grpc_cq_end_op(cc, tag, absl::OkStatus(), do_nothing_end_completion,
199                    nullptr, &completion);
200 
201     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
202                                     nullptr);
203     ASSERT_EQ(ev.type, GRPC_QUEUE_TIMEOUT);
204 
205     ASSERT_EQ(grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok),
206               1);
207     ASSERT_EQ(res_tag, tag);
208     ASSERT_TRUE(ok);
209 
210     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
211                                     nullptr);
212     ASSERT_EQ(ev.type, GRPC_QUEUE_TIMEOUT);
213 
214     shutdown_and_destroy(cc);
215   }
216 }
217 
TEST(GrpcCompletionQueueTest,TestCqTlsCacheEmpty)218 TEST(GrpcCompletionQueueTest, TestCqTlsCacheEmpty) {
219   grpc_completion_queue* cc;
220   grpc_cq_polling_type polling_types[] = {
221       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
222   grpc_completion_queue_attributes attr;
223   void* res_tag;
224   int ok;
225 
226   LOG_TEST("test_cq_tls_cache_empty");
227 
228   attr.version = 1;
229   attr.cq_completion_type = GRPC_CQ_NEXT;
230   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
231     grpc_core::ExecCtx exec_ctx;  // Reset exec_ctx
232     attr.cq_polling_type = polling_types[i];
233     cc = grpc_completion_queue_create(
234         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
235 
236     ASSERT_EQ(grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok),
237               0);
238     grpc_completion_queue_thread_local_cache_init(cc);
239     ASSERT_EQ(grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok),
240               0);
241     shutdown_and_destroy(cc);
242   }
243 }
244 
TEST(GrpcCompletionQueueTest,TestShutdownThenNextPolling)245 TEST(GrpcCompletionQueueTest, TestShutdownThenNextPolling) {
246   grpc_cq_polling_type polling_types[] = {
247       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
248   grpc_completion_queue* cc;
249   grpc_completion_queue_attributes attr;
250   grpc_event event;
251   LOG_TEST("test_shutdown_then_next_polling");
252 
253   attr.version = 1;
254   attr.cq_completion_type = GRPC_CQ_NEXT;
255   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
256     attr.cq_polling_type = polling_types[i];
257     cc = grpc_completion_queue_create(
258         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
259     grpc_completion_queue_shutdown(cc);
260     event = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
261                                        nullptr);
262     ASSERT_EQ(event.type, GRPC_QUEUE_SHUTDOWN);
263     grpc_completion_queue_destroy(cc);
264   }
265 }
266 
TEST(GrpcCompletionQueueTest,TestShutdownThenNextWithTimeout)267 TEST(GrpcCompletionQueueTest, TestShutdownThenNextWithTimeout) {
268   grpc_cq_polling_type polling_types[] = {
269       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
270   grpc_completion_queue* cc;
271   grpc_completion_queue_attributes attr;
272   grpc_event event;
273   LOG_TEST("test_shutdown_then_next_with_timeout");
274 
275   attr.version = 1;
276   attr.cq_completion_type = GRPC_CQ_NEXT;
277   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
278     attr.cq_polling_type = polling_types[i];
279     cc = grpc_completion_queue_create(
280         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
281 
282     grpc_completion_queue_shutdown(cc);
283     event = grpc_completion_queue_next(cc, gpr_inf_future(GPR_CLOCK_REALTIME),
284                                        nullptr);
285     ASSERT_EQ(event.type, GRPC_QUEUE_SHUTDOWN);
286     grpc_completion_queue_destroy(cc);
287   }
288 }
289 
TEST(GrpcCompletionQueueTest,TestPluck)290 TEST(GrpcCompletionQueueTest, TestPluck) {
291   grpc_event ev;
292   grpc_completion_queue* cc;
293   void* tags[128];
294   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
295   grpc_cq_polling_type polling_types[] = {
296       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
297   grpc_completion_queue_attributes attr;
298   unsigned i, j;
299 
300   LOG_TEST("test_pluck");
301 
302   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
303     tags[i] = create_test_tag();
304     for (j = 0; j < i; j++) {
305       ASSERT_NE(tags[i], tags[j]);
306     }
307   }
308 
309   attr.version = 1;
310   attr.cq_completion_type = GRPC_CQ_PLUCK;
311   for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
312     grpc_core::ExecCtx exec_ctx;  // reset exec_ctx
313     attr.cq_polling_type = polling_types[pidx];
314     cc = grpc_completion_queue_create(
315         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
316 
317     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
318       ASSERT_TRUE(grpc_cq_begin_op(cc, tags[i]));
319       grpc_cq_end_op(cc, tags[i], absl::OkStatus(), do_nothing_end_completion,
320                      nullptr, &completions[i]);
321     }
322 
323     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
324       ev = grpc_completion_queue_pluck(
325           cc, tags[i], gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
326       ASSERT_EQ(ev.tag, tags[i]);
327     }
328 
329     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
330       ASSERT_TRUE(grpc_cq_begin_op(cc, tags[i]));
331       grpc_cq_end_op(cc, tags[i], absl::OkStatus(), do_nothing_end_completion,
332                      nullptr, &completions[i]);
333     }
334 
335     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
336       ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1],
337                                        gpr_inf_past(GPR_CLOCK_REALTIME),
338                                        nullptr);
339       ASSERT_EQ(ev.tag, tags[GPR_ARRAY_SIZE(tags) - i - 1]);
340     }
341 
342     shutdown_and_destroy(cc);
343   }
344 }
345 
TEST(GrpcCompletionQueueTest,TestPluckAfterShutdown)346 TEST(GrpcCompletionQueueTest, TestPluckAfterShutdown) {
347   grpc_cq_polling_type polling_types[] = {
348       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
349   grpc_event ev;
350   grpc_completion_queue* cc;
351   grpc_completion_queue_attributes attr;
352 
353   LOG_TEST("test_pluck_after_shutdown");
354 
355   attr.version = 1;
356   attr.cq_completion_type = GRPC_CQ_PLUCK;
357   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
358     attr.cq_polling_type = polling_types[i];
359     cc = grpc_completion_queue_create(
360         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
361     grpc_completion_queue_shutdown(cc);
362     ev = grpc_completion_queue_pluck(
363         cc, nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
364     ASSERT_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
365     grpc_completion_queue_destroy(cc);
366   }
367 }
368 
TEST(GrpcCompletionQueueTest,TestCallback)369 TEST(GrpcCompletionQueueTest, TestCallback) {
370   grpc_completion_queue* cc;
371   static void* tags[128];
372   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
373   grpc_cq_polling_type polling_types[] = {
374       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
375   grpc_completion_queue_attributes attr;
376   unsigned i;
377   static gpr_mu mu, shutdown_mu;
378   static gpr_cv cv, shutdown_cv;
379   static int cb_counter;
380   gpr_mu_init(&mu);
381   gpr_mu_init(&shutdown_mu);
382   gpr_cv_init(&cv);
383   gpr_cv_init(&shutdown_cv);
384 
385   LOG_TEST("test_callback");
386 
387   bool got_shutdown = false;
388   class ShutdownCallback : public grpc_completion_queue_functor {
389    public:
390     explicit ShutdownCallback(bool* done) : done_(done) {
391       functor_run = &ShutdownCallback::Run;
392       inlineable = false;
393     }
394     ~ShutdownCallback() {}
395     static void Run(grpc_completion_queue_functor* cb, int ok) {
396       gpr_mu_lock(&shutdown_mu);
397       *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
398       // Signal when the shutdown callback is completed.
399       gpr_cv_signal(&shutdown_cv);
400       gpr_mu_unlock(&shutdown_mu);
401     }
402 
403    private:
404     bool* done_;
405   };
406   ShutdownCallback shutdown_cb(&got_shutdown);
407 
408   attr.version = 2;
409   attr.cq_completion_type = GRPC_CQ_CALLBACK;
410   attr.cq_shutdown_cb = &shutdown_cb;
411 
412   for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
413     int sumtags = 0;
414     int counter = 0;
415     cb_counter = 0;
416     {
417       // reset exec_ctx types
418       grpc_core::ExecCtx exec_ctx;
419       attr.cq_polling_type = polling_types[pidx];
420       cc = grpc_completion_queue_create(
421           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
422 
423       class TagCallback : public grpc_completion_queue_functor {
424        public:
425         TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
426           functor_run = &TagCallback::Run;
427           // Inlineable should be false since this callback takes locks.
428           inlineable = false;
429         }
430         ~TagCallback() {}
431         static void Run(grpc_completion_queue_functor* cb, int ok) {
432           ASSERT_TRUE(static_cast<bool>(ok));
433           auto* callback = static_cast<TagCallback*>(cb);
434           gpr_mu_lock(&mu);
435           cb_counter++;
436           *callback->counter_ += callback->tag_;
437           if (cb_counter == GPR_ARRAY_SIZE(tags)) {
438             gpr_cv_signal(&cv);
439           }
440           gpr_mu_unlock(&mu);
441           delete callback;
442         };
443 
444        private:
445         int* counter_;
446         int tag_;
447       };
448 
449       for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
450         tags[i] = static_cast<void*>(new TagCallback(&counter, i));
451         sumtags += i;
452       }
453 
454       for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
455         ASSERT_TRUE(grpc_cq_begin_op(cc, tags[i]));
456         grpc_cq_end_op(cc, tags[i], absl::OkStatus(), do_nothing_end_completion,
457                        nullptr, &completions[i]);
458       }
459 
460       gpr_mu_lock(&mu);
461       while (cb_counter != GPR_ARRAY_SIZE(tags)) {
462         // Wait for all the callbacks to complete.
463         gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
464       }
465       gpr_mu_unlock(&mu);
466 
467       shutdown_and_destroy(cc);
468 
469       gpr_mu_lock(&shutdown_mu);
470       while (!got_shutdown) {
471         // Wait for the shutdown callback to complete.
472         gpr_cv_wait(&shutdown_cv, &shutdown_mu,
473                     gpr_inf_future(GPR_CLOCK_REALTIME));
474       }
475       gpr_mu_unlock(&shutdown_mu);
476     }
477 
478     // Run the assertions to check if the test ran successfully.
479     ASSERT_EQ(sumtags, counter);
480     ASSERT_TRUE(got_shutdown);
481     got_shutdown = false;
482   }
483 
484   gpr_cv_destroy(&cv);
485   gpr_cv_destroy(&shutdown_cv);
486   gpr_mu_destroy(&mu);
487   gpr_mu_destroy(&shutdown_mu);
488 }
489 
490 struct thread_state {
491   grpc_completion_queue* cc;
492   void* tag;
493 };
494 
main(int argc,char ** argv)495 int main(int argc, char** argv) {
496   grpc::testing::TestEnvironment env(&argc, argv);
497   ::testing::InitGoogleTest(&argc, argv);
498   grpc::testing::TestGrpcScope grpc_scope;
499   return RUN_ALL_TESTS();
500 }
501