xref: /aosp_15_r20/external/grpc-grpc/test/core/gprpp/work_serializer_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/lib/gprpp/work_serializer.h"
20 
21 #include <stddef.h>
22 
23 #include <memory>
24 #include <thread>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/functional/any_invocable.h"
29 #include "absl/synchronization/barrier.h"
30 #include "absl/time/clock.h"
31 #include "absl/time/time.h"
32 #include "gtest/gtest.h"
33 
34 #include <grpc/grpc.h>
35 #include <grpc/support/sync.h>
36 #include <grpc/support/time.h>
37 
38 #include "src/core/lib/debug/histogram_view.h"
39 #include "src/core/lib/debug/stats.h"
40 #include "src/core/lib/debug/stats_data.h"
41 #include "src/core/lib/event_engine/default_event_engine.h"
42 #include "src/core/lib/experiments/experiments.h"
43 #include "src/core/lib/gprpp/notification.h"
44 #include "src/core/lib/gprpp/thd.h"
45 #include "src/core/lib/iomgr/exec_ctx.h"
46 #include "test/core/event_engine/event_engine_test_utils.h"
47 #include "test/core/util/test_config.h"
48 
49 using grpc_event_engine::experimental::GetDefaultEventEngine;
50 using grpc_event_engine::experimental::WaitForSingleOwner;
51 
52 namespace grpc_core {
53 namespace {
TEST(WorkSerializerTest,NoOp)54 TEST(WorkSerializerTest, NoOp) {
55   auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
56   lock.reset();
57   WaitForSingleOwner(GetDefaultEventEngine());
58 }
59 
TEST(WorkSerializerTest,ExecuteOneRun)60 TEST(WorkSerializerTest, ExecuteOneRun) {
61   auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
62   gpr_event done;
63   gpr_event_init(&done);
64   lock->Run([&done]() { gpr_event_set(&done, reinterpret_cast<void*>(1)); },
65             DEBUG_LOCATION);
66   EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
67               nullptr);
68   lock.reset();
69   WaitForSingleOwner(GetDefaultEventEngine());
70 }
71 
TEST(WorkSerializerTest,ExecuteOneScheduleAndDrain)72 TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) {
73   auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
74   gpr_event done;
75   gpr_event_init(&done);
76   lock->Schedule(
77       [&done]() {
78         EXPECT_EQ(gpr_event_get(&done), nullptr);
79         gpr_event_set(&done, reinterpret_cast<void*>(1));
80       },
81       DEBUG_LOCATION);
82   lock->DrainQueue();
83   EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
84               nullptr);
85   lock.reset();
86   WaitForSingleOwner(GetDefaultEventEngine());
87 }
88 
89 class TestThread {
90  public:
TestThread(WorkSerializer * lock)91   explicit TestThread(WorkSerializer* lock)
92       : lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
93     gpr_event_init(&done_);
94     thread_.Start();
95   }
96 
~TestThread()97   ~TestThread() {
98     EXPECT_NE(gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)),
99               nullptr);
100     thread_.Join();
101   }
102 
103  private:
ExecuteManyLoop(void * arg)104   static void ExecuteManyLoop(void* arg) {
105     TestThread* self = static_cast<TestThread*>(arg);
106     size_t n = 1;
107     for (size_t i = 0; i < 10; i++) {
108       for (size_t j = 0; j < 10000; j++) {
109         struct ExecutionArgs {
110           size_t* counter;
111           size_t value;
112         };
113         ExecutionArgs* c = new ExecutionArgs;
114         c->counter = &self->counter_;
115         c->value = n++;
116         self->lock_->Run(
117             [c]() {
118               EXPECT_TRUE(*c->counter == c->value - 1);
119               *c->counter = c->value;
120               delete c;
121             },
122             DEBUG_LOCATION);
123       }
124       // sleep for a little bit, to test other threads picking up the load
125       gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
126     }
127     self->lock_->Run(
128         [self]() { gpr_event_set(&self->done_, reinterpret_cast<void*>(1)); },
129         DEBUG_LOCATION);
130   }
131 
132   WorkSerializer* lock_ = nullptr;
133   Thread thread_;
134   size_t counter_ = 0;
135   gpr_event done_;
136 };
137 
TEST(WorkSerializerTest,ExecuteMany)138 TEST(WorkSerializerTest, ExecuteMany) {
139   auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
140   {
141     std::vector<std::unique_ptr<TestThread>> threads;
142     for (size_t i = 0; i < 10; ++i) {
143       threads.push_back(std::make_unique<TestThread>(lock.get()));
144     }
145   }
146   lock.reset();
147   WaitForSingleOwner(GetDefaultEventEngine());
148 }
149 
150 class TestThreadScheduleAndDrain {
151  public:
TestThreadScheduleAndDrain(WorkSerializer * lock)152   explicit TestThreadScheduleAndDrain(WorkSerializer* lock)
153       : lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
154     gpr_event_init(&done_);
155     thread_.Start();
156   }
157 
~TestThreadScheduleAndDrain()158   ~TestThreadScheduleAndDrain() {
159     EXPECT_NE(gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)),
160               nullptr);
161     thread_.Join();
162   }
163 
164  private:
ExecuteManyLoop(void * arg)165   static void ExecuteManyLoop(void* arg) {
166     TestThreadScheduleAndDrain* self =
167         static_cast<TestThreadScheduleAndDrain*>(arg);
168     size_t n = 1;
169     for (size_t i = 0; i < 10; i++) {
170       for (size_t j = 0; j < 10000; j++) {
171         struct ExecutionArgs {
172           size_t* counter;
173           size_t value;
174         };
175         ExecutionArgs* c = new ExecutionArgs;
176         c->counter = &self->counter_;
177         c->value = n++;
178         self->lock_->Schedule(
179             [c]() {
180               EXPECT_TRUE(*c->counter == c->value - 1);
181               *c->counter = c->value;
182               delete c;
183             },
184             DEBUG_LOCATION);
185       }
186       self->lock_->DrainQueue();
187       // sleep for a little bit, to test other threads picking up the load
188       gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
189     }
190     self->lock_->Run(
191         [self]() { gpr_event_set(&self->done_, reinterpret_cast<void*>(1)); },
192         DEBUG_LOCATION);
193   }
194 
195   WorkSerializer* lock_ = nullptr;
196   Thread thread_;
197   size_t counter_ = 0;
198   gpr_event done_;
199 };
200 
TEST(WorkSerializerTest,ExecuteManyScheduleAndDrain)201 TEST(WorkSerializerTest, ExecuteManyScheduleAndDrain) {
202   auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
203   {
204     std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> threads;
205     for (size_t i = 0; i < 10; ++i) {
206       threads.push_back(
207           std::make_unique<TestThreadScheduleAndDrain>(lock.get()));
208     }
209   }
210   lock.reset();
211   WaitForSingleOwner(GetDefaultEventEngine());
212 }
213 
TEST(WorkSerializerTest,ExecuteManyMixedRunScheduleAndDrain)214 TEST(WorkSerializerTest, ExecuteManyMixedRunScheduleAndDrain) {
215   auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
216   {
217     std::vector<std::unique_ptr<TestThread>> run_threads;
218     std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> schedule_threads;
219     for (size_t i = 0; i < 10; ++i) {
220       run_threads.push_back(std::make_unique<TestThread>(lock.get()));
221       schedule_threads.push_back(
222           std::make_unique<TestThreadScheduleAndDrain>(lock.get()));
223     }
224   }
225   lock.reset();
226   WaitForSingleOwner(GetDefaultEventEngine());
227 }
228 
229 // Tests that work serializers allow destruction from the last callback
TEST(WorkSerializerTest,CallbackDestroysWorkSerializer)230 TEST(WorkSerializerTest, CallbackDestroysWorkSerializer) {
231   auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
232   lock->Run([&]() { lock.reset(); }, DEBUG_LOCATION);
233   WaitForSingleOwner(GetDefaultEventEngine());
234 }
235 
236 // Tests additional racy conditions when the last callback triggers work
237 // serializer destruction.
TEST(WorkSerializerTest,WorkSerializerDestructionRace)238 TEST(WorkSerializerTest, WorkSerializerDestructionRace) {
239   for (int i = 0; i < 1000; ++i) {
240     auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
241     Notification notification;
242     std::thread t1([&]() {
243       notification.WaitForNotification();
244       lock.reset();
245     });
246     lock->Run([&]() { notification.Notify(); }, DEBUG_LOCATION);
247     t1.join();
248   }
249   WaitForSingleOwner(GetDefaultEventEngine());
250 }
251 
252 // Tests racy conditions when the last callback triggers work
253 // serializer destruction.
TEST(WorkSerializerTest,WorkSerializerDestructionRaceMultipleThreads)254 TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) {
255   auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
256   absl::Barrier barrier(11);
257   std::vector<std::thread> threads;
258   threads.reserve(10);
259   for (int i = 0; i < 10; ++i) {
260     threads.emplace_back([lock, &barrier]() mutable {
261       barrier.Block();
262       lock->Run([lock]() mutable { lock.reset(); }, DEBUG_LOCATION);
263     });
264   }
265   barrier.Block();
266   lock.reset();
267   for (auto& thread : threads) {
268     thread.join();
269   }
270   WaitForSingleOwner(GetDefaultEventEngine());
271 }
272 
TEST(WorkSerializerTest,MetricsWork)273 TEST(WorkSerializerTest, MetricsWork) {
274   if (!IsWorkSerializerDispatchEnabled()) {
275     GTEST_SKIP() << "Work serializer dispatch experiment not enabled";
276   }
277 
278   auto serializer = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
279   auto schedule_sleep = [&serializer](absl::Duration how_long) {
280     ExecCtx exec_ctx;
281     auto n = std::make_shared<Notification>();
282     serializer->Run(
283         [how_long, n]() {
284           absl::SleepFor(how_long);
285           n->Notify();
286         },
287         DEBUG_LOCATION);
288     return n;
289   };
290   auto before = global_stats().Collect();
291   auto stats_diff_from = [&before](absl::AnyInvocable<void()> f) {
292     f();
293     // Insert a pause for the work serialier to update the stats. Reading stats
294     // here can still race with the work serializer's update attempt.
295     gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
296     auto after = global_stats().Collect();
297     auto diff = after->Diff(*before);
298     before = std::move(after);
299     return diff;
300   };
301   // Test adding one work item to the queue
302   auto diff = stats_diff_from(
303       [&] { schedule_sleep(absl::Seconds(1))->WaitForNotification(); });
304   EXPECT_EQ(diff->work_serializer_items_enqueued, 1);
305   EXPECT_EQ(diff->work_serializer_items_dequeued, 1);
306   EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
307                 .Percentile(0.5),
308             1.0);
309   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
310                 .Percentile(0.5),
311             2.0);
312   EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
313                 .Percentile(0.5),
314             800.0);
315   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
316                 .Percentile(0.5),
317             1300.0);
318   EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
319                 .Percentile(0.5),
320             800.0);
321   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
322                 .Percentile(0.5),
323             1300.0);
324   EXPECT_GE(
325       diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
326           .Percentile(0.5),
327       800.0);
328   EXPECT_LE(
329       diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
330           .Percentile(0.5),
331       1300.0);
332   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
333                 .Percentile(0.5),
334             diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
335                 .Percentile(0.5));
336   // Now throw a bunch of work in and see that we get good results
337   diff = stats_diff_from([&] {
338     for (int i = 0; i < 10; i++) {
339       schedule_sleep(absl::Milliseconds(1000));
340     }
341     schedule_sleep(absl::Milliseconds(1000))->WaitForNotification();
342   });
343   EXPECT_EQ(diff->work_serializer_items_enqueued, 11);
344   EXPECT_EQ(diff->work_serializer_items_dequeued, 11);
345   EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
346                 .Percentile(0.5),
347             7.0);
348   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
349                 .Percentile(0.5),
350             15.0);
351   EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
352                 .Percentile(0.5),
353             7000.0);
354   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
355                 .Percentile(0.5),
356             15000.0);
357   EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
358                 .Percentile(0.5),
359             7000.0);
360   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
361                 .Percentile(0.5),
362             15000.0);
363   EXPECT_GE(
364       diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
365           .Percentile(0.5),
366       800.0);
367   EXPECT_LE(
368       diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
369           .Percentile(0.5),
370       1300.0);
371   EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
372                 .Percentile(0.5),
373             diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
374                 .Percentile(0.5));
375 
376   serializer.reset();
377   WaitForSingleOwner(GetDefaultEventEngine());
378 }
379 
380 #ifndef NDEBUG
TEST(WorkSerializerTest,RunningInWorkSerializer)381 TEST(WorkSerializerTest, RunningInWorkSerializer) {
382   auto work_serializer1 =
383       std::make_shared<WorkSerializer>(GetDefaultEventEngine());
384   auto work_serializer2 =
385       std::make_shared<WorkSerializer>(GetDefaultEventEngine());
386   EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
387   EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
388   work_serializer1->Run(
389       [=]() {
390         EXPECT_TRUE(work_serializer1->RunningInWorkSerializer());
391         EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
392         work_serializer2->Run(
393             [=]() {
394               EXPECT_EQ(work_serializer1->RunningInWorkSerializer(),
395                         !IsWorkSerializerDispatchEnabled());
396               EXPECT_TRUE(work_serializer2->RunningInWorkSerializer());
397             },
398             DEBUG_LOCATION);
399       },
400       DEBUG_LOCATION);
401   EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
402   EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
403   work_serializer2->Run(
404       [=]() {
405         EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
406         EXPECT_TRUE(work_serializer2->RunningInWorkSerializer());
407         work_serializer1->Run(
408             [=]() {
409               EXPECT_TRUE(work_serializer1->RunningInWorkSerializer());
410               EXPECT_EQ(work_serializer2->RunningInWorkSerializer(),
411                         !IsWorkSerializerDispatchEnabled());
412             },
413             DEBUG_LOCATION);
414       },
415       DEBUG_LOCATION);
416   EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
417   EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
418   Notification done1;
419   Notification done2;
420   work_serializer1->Run([&done1]() { done1.Notify(); }, DEBUG_LOCATION);
421   work_serializer2->Run([&done2]() { done2.Notify(); }, DEBUG_LOCATION);
422   done1.WaitForNotification();
423   done2.WaitForNotification();
424   work_serializer1.reset();
425   work_serializer2.reset();
426   WaitForSingleOwner(GetDefaultEventEngine());
427 }
428 #endif
429 
430 }  // namespace
431 }  // namespace grpc_core
432 
main(int argc,char ** argv)433 int main(int argc, char** argv) {
434   grpc::testing::TestEnvironment env(&argc, argv);
435   ::testing::InitGoogleTest(&argc, argv);
436   grpc_init();
437   int retval = RUN_ALL_TESTS();
438   grpc_shutdown();
439   return retval;
440 }
441