1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/lib/gprpp/work_serializer.h"
20
21 #include <stddef.h>
22
23 #include <memory>
24 #include <thread>
25 #include <utility>
26 #include <vector>
27
28 #include "absl/functional/any_invocable.h"
29 #include "absl/synchronization/barrier.h"
30 #include "absl/time/clock.h"
31 #include "absl/time/time.h"
32 #include "gtest/gtest.h"
33
34 #include <grpc/grpc.h>
35 #include <grpc/support/sync.h>
36 #include <grpc/support/time.h>
37
38 #include "src/core/lib/debug/histogram_view.h"
39 #include "src/core/lib/debug/stats.h"
40 #include "src/core/lib/debug/stats_data.h"
41 #include "src/core/lib/event_engine/default_event_engine.h"
42 #include "src/core/lib/experiments/experiments.h"
43 #include "src/core/lib/gprpp/notification.h"
44 #include "src/core/lib/gprpp/thd.h"
45 #include "src/core/lib/iomgr/exec_ctx.h"
46 #include "test/core/event_engine/event_engine_test_utils.h"
47 #include "test/core/util/test_config.h"
48
49 using grpc_event_engine::experimental::GetDefaultEventEngine;
50 using grpc_event_engine::experimental::WaitForSingleOwner;
51
52 namespace grpc_core {
53 namespace {
TEST(WorkSerializerTest,NoOp)54 TEST(WorkSerializerTest, NoOp) {
55 auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
56 lock.reset();
57 WaitForSingleOwner(GetDefaultEventEngine());
58 }
59
TEST(WorkSerializerTest,ExecuteOneRun)60 TEST(WorkSerializerTest, ExecuteOneRun) {
61 auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
62 gpr_event done;
63 gpr_event_init(&done);
64 lock->Run([&done]() { gpr_event_set(&done, reinterpret_cast<void*>(1)); },
65 DEBUG_LOCATION);
66 EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
67 nullptr);
68 lock.reset();
69 WaitForSingleOwner(GetDefaultEventEngine());
70 }
71
TEST(WorkSerializerTest,ExecuteOneScheduleAndDrain)72 TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) {
73 auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
74 gpr_event done;
75 gpr_event_init(&done);
76 lock->Schedule(
77 [&done]() {
78 EXPECT_EQ(gpr_event_get(&done), nullptr);
79 gpr_event_set(&done, reinterpret_cast<void*>(1));
80 },
81 DEBUG_LOCATION);
82 lock->DrainQueue();
83 EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
84 nullptr);
85 lock.reset();
86 WaitForSingleOwner(GetDefaultEventEngine());
87 }
88
89 class TestThread {
90 public:
TestThread(WorkSerializer * lock)91 explicit TestThread(WorkSerializer* lock)
92 : lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
93 gpr_event_init(&done_);
94 thread_.Start();
95 }
96
~TestThread()97 ~TestThread() {
98 EXPECT_NE(gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)),
99 nullptr);
100 thread_.Join();
101 }
102
103 private:
ExecuteManyLoop(void * arg)104 static void ExecuteManyLoop(void* arg) {
105 TestThread* self = static_cast<TestThread*>(arg);
106 size_t n = 1;
107 for (size_t i = 0; i < 10; i++) {
108 for (size_t j = 0; j < 10000; j++) {
109 struct ExecutionArgs {
110 size_t* counter;
111 size_t value;
112 };
113 ExecutionArgs* c = new ExecutionArgs;
114 c->counter = &self->counter_;
115 c->value = n++;
116 self->lock_->Run(
117 [c]() {
118 EXPECT_TRUE(*c->counter == c->value - 1);
119 *c->counter = c->value;
120 delete c;
121 },
122 DEBUG_LOCATION);
123 }
124 // sleep for a little bit, to test other threads picking up the load
125 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
126 }
127 self->lock_->Run(
128 [self]() { gpr_event_set(&self->done_, reinterpret_cast<void*>(1)); },
129 DEBUG_LOCATION);
130 }
131
132 WorkSerializer* lock_ = nullptr;
133 Thread thread_;
134 size_t counter_ = 0;
135 gpr_event done_;
136 };
137
TEST(WorkSerializerTest,ExecuteMany)138 TEST(WorkSerializerTest, ExecuteMany) {
139 auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
140 {
141 std::vector<std::unique_ptr<TestThread>> threads;
142 for (size_t i = 0; i < 10; ++i) {
143 threads.push_back(std::make_unique<TestThread>(lock.get()));
144 }
145 }
146 lock.reset();
147 WaitForSingleOwner(GetDefaultEventEngine());
148 }
149
150 class TestThreadScheduleAndDrain {
151 public:
TestThreadScheduleAndDrain(WorkSerializer * lock)152 explicit TestThreadScheduleAndDrain(WorkSerializer* lock)
153 : lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
154 gpr_event_init(&done_);
155 thread_.Start();
156 }
157
~TestThreadScheduleAndDrain()158 ~TestThreadScheduleAndDrain() {
159 EXPECT_NE(gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)),
160 nullptr);
161 thread_.Join();
162 }
163
164 private:
ExecuteManyLoop(void * arg)165 static void ExecuteManyLoop(void* arg) {
166 TestThreadScheduleAndDrain* self =
167 static_cast<TestThreadScheduleAndDrain*>(arg);
168 size_t n = 1;
169 for (size_t i = 0; i < 10; i++) {
170 for (size_t j = 0; j < 10000; j++) {
171 struct ExecutionArgs {
172 size_t* counter;
173 size_t value;
174 };
175 ExecutionArgs* c = new ExecutionArgs;
176 c->counter = &self->counter_;
177 c->value = n++;
178 self->lock_->Schedule(
179 [c]() {
180 EXPECT_TRUE(*c->counter == c->value - 1);
181 *c->counter = c->value;
182 delete c;
183 },
184 DEBUG_LOCATION);
185 }
186 self->lock_->DrainQueue();
187 // sleep for a little bit, to test other threads picking up the load
188 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
189 }
190 self->lock_->Run(
191 [self]() { gpr_event_set(&self->done_, reinterpret_cast<void*>(1)); },
192 DEBUG_LOCATION);
193 }
194
195 WorkSerializer* lock_ = nullptr;
196 Thread thread_;
197 size_t counter_ = 0;
198 gpr_event done_;
199 };
200
TEST(WorkSerializerTest,ExecuteManyScheduleAndDrain)201 TEST(WorkSerializerTest, ExecuteManyScheduleAndDrain) {
202 auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
203 {
204 std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> threads;
205 for (size_t i = 0; i < 10; ++i) {
206 threads.push_back(
207 std::make_unique<TestThreadScheduleAndDrain>(lock.get()));
208 }
209 }
210 lock.reset();
211 WaitForSingleOwner(GetDefaultEventEngine());
212 }
213
TEST(WorkSerializerTest,ExecuteManyMixedRunScheduleAndDrain)214 TEST(WorkSerializerTest, ExecuteManyMixedRunScheduleAndDrain) {
215 auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
216 {
217 std::vector<std::unique_ptr<TestThread>> run_threads;
218 std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> schedule_threads;
219 for (size_t i = 0; i < 10; ++i) {
220 run_threads.push_back(std::make_unique<TestThread>(lock.get()));
221 schedule_threads.push_back(
222 std::make_unique<TestThreadScheduleAndDrain>(lock.get()));
223 }
224 }
225 lock.reset();
226 WaitForSingleOwner(GetDefaultEventEngine());
227 }
228
229 // Tests that work serializers allow destruction from the last callback
TEST(WorkSerializerTest,CallbackDestroysWorkSerializer)230 TEST(WorkSerializerTest, CallbackDestroysWorkSerializer) {
231 auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
232 lock->Run([&]() { lock.reset(); }, DEBUG_LOCATION);
233 WaitForSingleOwner(GetDefaultEventEngine());
234 }
235
236 // Tests additional racy conditions when the last callback triggers work
237 // serializer destruction.
TEST(WorkSerializerTest,WorkSerializerDestructionRace)238 TEST(WorkSerializerTest, WorkSerializerDestructionRace) {
239 for (int i = 0; i < 1000; ++i) {
240 auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
241 Notification notification;
242 std::thread t1([&]() {
243 notification.WaitForNotification();
244 lock.reset();
245 });
246 lock->Run([&]() { notification.Notify(); }, DEBUG_LOCATION);
247 t1.join();
248 }
249 WaitForSingleOwner(GetDefaultEventEngine());
250 }
251
252 // Tests racy conditions when the last callback triggers work
253 // serializer destruction.
TEST(WorkSerializerTest,WorkSerializerDestructionRaceMultipleThreads)254 TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) {
255 auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
256 absl::Barrier barrier(11);
257 std::vector<std::thread> threads;
258 threads.reserve(10);
259 for (int i = 0; i < 10; ++i) {
260 threads.emplace_back([lock, &barrier]() mutable {
261 barrier.Block();
262 lock->Run([lock]() mutable { lock.reset(); }, DEBUG_LOCATION);
263 });
264 }
265 barrier.Block();
266 lock.reset();
267 for (auto& thread : threads) {
268 thread.join();
269 }
270 WaitForSingleOwner(GetDefaultEventEngine());
271 }
272
TEST(WorkSerializerTest,MetricsWork)273 TEST(WorkSerializerTest, MetricsWork) {
274 if (!IsWorkSerializerDispatchEnabled()) {
275 GTEST_SKIP() << "Work serializer dispatch experiment not enabled";
276 }
277
278 auto serializer = std::make_unique<WorkSerializer>(GetDefaultEventEngine());
279 auto schedule_sleep = [&serializer](absl::Duration how_long) {
280 ExecCtx exec_ctx;
281 auto n = std::make_shared<Notification>();
282 serializer->Run(
283 [how_long, n]() {
284 absl::SleepFor(how_long);
285 n->Notify();
286 },
287 DEBUG_LOCATION);
288 return n;
289 };
290 auto before = global_stats().Collect();
291 auto stats_diff_from = [&before](absl::AnyInvocable<void()> f) {
292 f();
293 // Insert a pause for the work serialier to update the stats. Reading stats
294 // here can still race with the work serializer's update attempt.
295 gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
296 auto after = global_stats().Collect();
297 auto diff = after->Diff(*before);
298 before = std::move(after);
299 return diff;
300 };
301 // Test adding one work item to the queue
302 auto diff = stats_diff_from(
303 [&] { schedule_sleep(absl::Seconds(1))->WaitForNotification(); });
304 EXPECT_EQ(diff->work_serializer_items_enqueued, 1);
305 EXPECT_EQ(diff->work_serializer_items_dequeued, 1);
306 EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
307 .Percentile(0.5),
308 1.0);
309 EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
310 .Percentile(0.5),
311 2.0);
312 EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
313 .Percentile(0.5),
314 800.0);
315 EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
316 .Percentile(0.5),
317 1300.0);
318 EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
319 .Percentile(0.5),
320 800.0);
321 EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
322 .Percentile(0.5),
323 1300.0);
324 EXPECT_GE(
325 diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
326 .Percentile(0.5),
327 800.0);
328 EXPECT_LE(
329 diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
330 .Percentile(0.5),
331 1300.0);
332 EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
333 .Percentile(0.5),
334 diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
335 .Percentile(0.5));
336 // Now throw a bunch of work in and see that we get good results
337 diff = stats_diff_from([&] {
338 for (int i = 0; i < 10; i++) {
339 schedule_sleep(absl::Milliseconds(1000));
340 }
341 schedule_sleep(absl::Milliseconds(1000))->WaitForNotification();
342 });
343 EXPECT_EQ(diff->work_serializer_items_enqueued, 11);
344 EXPECT_EQ(diff->work_serializer_items_dequeued, 11);
345 EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
346 .Percentile(0.5),
347 7.0);
348 EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
349 .Percentile(0.5),
350 15.0);
351 EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
352 .Percentile(0.5),
353 7000.0);
354 EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
355 .Percentile(0.5),
356 15000.0);
357 EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
358 .Percentile(0.5),
359 7000.0);
360 EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
361 .Percentile(0.5),
362 15000.0);
363 EXPECT_GE(
364 diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
365 .Percentile(0.5),
366 800.0);
367 EXPECT_LE(
368 diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
369 .Percentile(0.5),
370 1300.0);
371 EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
372 .Percentile(0.5),
373 diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
374 .Percentile(0.5));
375
376 serializer.reset();
377 WaitForSingleOwner(GetDefaultEventEngine());
378 }
379
380 #ifndef NDEBUG
TEST(WorkSerializerTest,RunningInWorkSerializer)381 TEST(WorkSerializerTest, RunningInWorkSerializer) {
382 auto work_serializer1 =
383 std::make_shared<WorkSerializer>(GetDefaultEventEngine());
384 auto work_serializer2 =
385 std::make_shared<WorkSerializer>(GetDefaultEventEngine());
386 EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
387 EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
388 work_serializer1->Run(
389 [=]() {
390 EXPECT_TRUE(work_serializer1->RunningInWorkSerializer());
391 EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
392 work_serializer2->Run(
393 [=]() {
394 EXPECT_EQ(work_serializer1->RunningInWorkSerializer(),
395 !IsWorkSerializerDispatchEnabled());
396 EXPECT_TRUE(work_serializer2->RunningInWorkSerializer());
397 },
398 DEBUG_LOCATION);
399 },
400 DEBUG_LOCATION);
401 EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
402 EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
403 work_serializer2->Run(
404 [=]() {
405 EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
406 EXPECT_TRUE(work_serializer2->RunningInWorkSerializer());
407 work_serializer1->Run(
408 [=]() {
409 EXPECT_TRUE(work_serializer1->RunningInWorkSerializer());
410 EXPECT_EQ(work_serializer2->RunningInWorkSerializer(),
411 !IsWorkSerializerDispatchEnabled());
412 },
413 DEBUG_LOCATION);
414 },
415 DEBUG_LOCATION);
416 EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
417 EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
418 Notification done1;
419 Notification done2;
420 work_serializer1->Run([&done1]() { done1.Notify(); }, DEBUG_LOCATION);
421 work_serializer2->Run([&done2]() { done2.Notify(); }, DEBUG_LOCATION);
422 done1.WaitForNotification();
423 done2.WaitForNotification();
424 work_serializer1.reset();
425 work_serializer2.reset();
426 WaitForSingleOwner(GetDefaultEventEngine());
427 }
428 #endif
429
430 } // namespace
431 } // namespace grpc_core
432
main(int argc,char ** argv)433 int main(int argc, char** argv) {
434 grpc::testing::TestEnvironment env(&argc, argv);
435 ::testing::InitGoogleTest(&argc, argv);
436 grpc_init();
437 int retval = RUN_ALL_TESTS();
438 grpc_shutdown();
439 return retval;
440 }
441