1 // Copyright 2024 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/thread_group_semaphore.h"
6
7 #include <stddef.h>
8
9 #include <algorithm>
10 #include <atomic>
11 #include <memory>
12 #include <optional>
13 #include <unordered_set>
14 #include <utility>
15 #include <vector>
16
17 #include "base/atomicops.h"
18 #include "base/barrier_closure.h"
19 #include "base/functional/bind.h"
20 #include "base/functional/callback.h"
21 #include "base/functional/callback_helpers.h"
22 #include "base/memory/ptr_util.h"
23 #include "base/memory/raw_ptr.h"
24 #include "base/memory/ref_counted.h"
25 #include "base/metrics/statistics_recorder.h"
26 #include "base/synchronization/atomic_flag.h"
27 #include "base/synchronization/condition_variable.h"
28 #include "base/synchronization/lock.h"
29 #include "base/task/task_features.h"
30 #include "base/task/task_runner.h"
31 #include "base/task/thread_pool/delayed_task_manager.h"
32 #include "base/task/thread_pool/environment_config.h"
33 #include "base/task/thread_pool/pooled_task_runner_delegate.h"
34 #include "base/task/thread_pool/sequence.h"
35 #include "base/task/thread_pool/task_source_sort_key.h"
36 #include "base/task/thread_pool/task_tracker.h"
37 #include "base/task/thread_pool/test_task_factory.h"
38 #include "base/task/thread_pool/test_utils.h"
39 #include "base/task/thread_pool/worker_thread_observer.h"
40 #include "base/test/bind.h"
41 #include "base/test/gtest_util.h"
42 #include "base/test/scoped_feature_list.h"
43 #include "base/test/test_simple_task_runner.h"
44 #include "base/test/test_timeouts.h"
45 #include "base/test/test_waitable_event.h"
46 #include "base/threading/platform_thread.h"
47 #include "base/threading/scoped_blocking_call.h"
48 #include "base/threading/simple_thread.h"
49 #include "base/threading/thread.h"
50 #include "base/threading/thread_checker_impl.h"
51 #include "base/time/time.h"
52 #include "base/timer/timer.h"
53 #include "build/build_config.h"
54 #include "testing/gtest/include/gtest/gtest.h"
55
56 namespace base {
57 namespace internal {
58 namespace {
59
60 constexpr size_t kMaxTasks = 4;
61 constexpr size_t kNumThreadsPostingTasks = 4;
62 constexpr size_t kNumTasksPostedPerThread = 150;
63 // This can't be lower because Windows' TestWaitableEvent wakes up too early
64 // when a small timeout is used. This results in many spurious wake ups before a
65 // worker is allowed to cleanup.
66 constexpr TimeDelta kReclaimTimeForCleanupTests = Milliseconds(500);
67 constexpr size_t kLargeNumber = 512;
68
69 class ThreadGroupSemaphoreImplTestBase : public ThreadGroup::Delegate {
70 public:
71 ThreadGroupSemaphoreImplTestBase(const ThreadGroupSemaphoreImplTestBase&) =
72 delete;
73 ThreadGroupSemaphoreImplTestBase& operator=(
74 const ThreadGroupSemaphoreImplTestBase&) = delete;
75
76 protected:
ThreadGroupSemaphoreImplTestBase()77 ThreadGroupSemaphoreImplTestBase()
78 : service_thread_("ThreadPoolServiceThread"),
79 tracked_ref_factory_(this) {}
80
CommonTearDown()81 void CommonTearDown() {
82 delayed_task_manager_.Shutdown();
83 service_thread_.Stop();
84 task_tracker_.FlushForTesting();
85 if (thread_group_) {
86 thread_group_->JoinForTesting();
87 }
88 mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
89 thread_group_.reset();
90 }
91
CreateThreadGroup(ThreadType thread_type=ThreadType::kDefault)92 void CreateThreadGroup(ThreadType thread_type = ThreadType::kDefault) {
93 ASSERT_FALSE(thread_group_);
94 service_thread_.Start();
95 delayed_task_manager_.Start(service_thread_.task_runner());
96 thread_group_ = std::make_unique<ThreadGroupSemaphore>(
97 "TestThreadGroup", "A", thread_type, task_tracker_.GetTrackedRef(),
98 tracked_ref_factory_.GetTrackedRef());
99 ASSERT_TRUE(thread_group_);
100
101 mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
102 }
103
StartThreadGroup(TimeDelta suggested_reclaim_time,size_t max_tasks,std::optional<int> max_best_effort_tasks=std::nullopt,WorkerThreadObserver * worker_observer=nullptr,std::optional<TimeDelta> may_block_threshold=std::nullopt)104 void StartThreadGroup(
105 TimeDelta suggested_reclaim_time,
106 size_t max_tasks,
107 std::optional<int> max_best_effort_tasks = std::nullopt,
108 WorkerThreadObserver* worker_observer = nullptr,
109 std::optional<TimeDelta> may_block_threshold = std::nullopt) {
110 ASSERT_TRUE(thread_group_);
111 thread_group_->Start(
112 max_tasks,
113 max_best_effort_tasks ? max_best_effort_tasks.value() : max_tasks,
114 suggested_reclaim_time, service_thread_.task_runner(), worker_observer,
115 ThreadGroup::WorkerEnvironment::NONE,
116 /* synchronous_thread_start_for_testing=*/false, may_block_threshold);
117 }
118
CreateAndStartThreadGroup(TimeDelta suggested_reclaim_time=TimeDelta::Max (),size_t max_tasks=kMaxTasks,std::optional<int> max_best_effort_tasks=std::nullopt,WorkerThreadObserver * worker_observer=nullptr,std::optional<TimeDelta> may_block_threshold=std::nullopt)119 void CreateAndStartThreadGroup(
120 TimeDelta suggested_reclaim_time = TimeDelta::Max(),
121 size_t max_tasks = kMaxTasks,
122 std::optional<int> max_best_effort_tasks = std::nullopt,
123 WorkerThreadObserver* worker_observer = nullptr,
124 std::optional<TimeDelta> may_block_threshold = std::nullopt) {
125 CreateThreadGroup();
126 StartThreadGroup(suggested_reclaim_time, max_tasks, max_best_effort_tasks,
127 worker_observer, may_block_threshold);
128 }
129
130 Thread service_thread_;
131 TaskTracker task_tracker_;
132 std::unique_ptr<ThreadGroupSemaphore> thread_group_;
133 DelayedTaskManager delayed_task_manager_;
134 TrackedRefFactory<ThreadGroup::Delegate> tracked_ref_factory_;
135 test::MockPooledTaskRunnerDelegate mock_pooled_task_runner_delegate_ = {
136 task_tracker_.GetTrackedRef(), &delayed_task_manager_};
137
138 private:
139 // ThreadGroup::Delegate:
GetThreadGroupForTraits(const TaskTraits & traits)140 ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override {
141 return thread_group_.get();
142 }
143 };
144
145 class ThreadGroupSemaphoreImplTest : public ThreadGroupSemaphoreImplTestBase,
146 public testing::Test {
147 public:
148 ThreadGroupSemaphoreImplTest(const ThreadGroupSemaphoreImplTest&) = delete;
149 ThreadGroupSemaphoreImplTest& operator=(const ThreadGroupSemaphoreImplTest&) =
150 delete;
151
152 protected:
153 ThreadGroupSemaphoreImplTest() = default;
154
SetUp()155 void SetUp() override { CreateAndStartThreadGroup(); }
156
TearDown()157 void TearDown() override {
158 ThreadGroupSemaphoreImplTestBase::CommonTearDown();
159 }
160 };
161
162 class ThreadGroupSemaphoreImplTestParam
163 : public ThreadGroupSemaphoreImplTestBase,
164 public testing::TestWithParam<TaskSourceExecutionMode> {
165 public:
166 ThreadGroupSemaphoreImplTestParam(const ThreadGroupSemaphoreImplTestParam&) =
167 delete;
168 ThreadGroupSemaphoreImplTestParam& operator=(
169 const ThreadGroupSemaphoreImplTestParam&) = delete;
170
171 protected:
172 ThreadGroupSemaphoreImplTestParam() = default;
173
SetUp()174 void SetUp() override { CreateAndStartThreadGroup(); }
175
TearDown()176 void TearDown() override {
177 ThreadGroupSemaphoreImplTestBase::CommonTearDown();
178 }
179 };
180
181 using PostNestedTask = test::TestTaskFactory::PostNestedTask;
182
183 class ThreadPostingTasksWaitIdle : public SimpleThread {
184 public:
185 // Constructs a thread that posts tasks to |thread_group| through an
186 // |execution_mode| task runner. The thread waits until all workers in
187 // |thread_group| are idle before posting a new task.
ThreadPostingTasksWaitIdle(ThreadGroupSemaphore * thread_group,test::MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate_,TaskSourceExecutionMode execution_mode)188 ThreadPostingTasksWaitIdle(
189 ThreadGroupSemaphore* thread_group,
190 test::MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate_,
191 TaskSourceExecutionMode execution_mode)
192 : SimpleThread("ThreadPostingTasksWaitIdle"),
193 thread_group_(thread_group),
194 factory_(CreatePooledTaskRunnerWithExecutionMode(
195 execution_mode,
196 mock_pooled_task_runner_delegate_),
197 execution_mode) {
198 DCHECK(thread_group_);
199 }
200 ThreadPostingTasksWaitIdle(const ThreadPostingTasksWaitIdle&) = delete;
201 ThreadPostingTasksWaitIdle& operator=(const ThreadPostingTasksWaitIdle&) =
202 delete;
203
factory() const204 const test::TestTaskFactory* factory() const { return &factory_; }
205
206 private:
Run()207 void Run() override {
208 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
209 thread_group_->WaitForAllWorkersIdleForTesting();
210 EXPECT_TRUE(factory_.PostTask(PostNestedTask::NO, OnceClosure()));
211 }
212 }
213
214 const raw_ptr<ThreadGroupSemaphore> thread_group_;
215 const scoped_refptr<TaskRunner> task_runner_;
216 test::TestTaskFactory factory_;
217 };
218
219 } // namespace
220
TEST_P(ThreadGroupSemaphoreImplTestParam,PostTasksWaitAllWorkersIdle)221 TEST_P(ThreadGroupSemaphoreImplTestParam, PostTasksWaitAllWorkersIdle) {
222 // Create threads to post tasks. To verify that workers can sleep and be woken
223 // up when new tasks are posted, wait for all workers to become idle before
224 // posting a new task.
225 std::vector<std::unique_ptr<ThreadPostingTasksWaitIdle>>
226 threads_posting_tasks;
227 for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
228 threads_posting_tasks.push_back(
229 std::make_unique<ThreadPostingTasksWaitIdle>(
230 thread_group_.get(), &mock_pooled_task_runner_delegate_,
231 GetParam()));
232 threads_posting_tasks.back()->Start();
233 }
234
235 // Wait for all tasks to run.
236 for (const auto& thread_posting_tasks : threads_posting_tasks) {
237 thread_posting_tasks->Join();
238 thread_posting_tasks->factory()->WaitForAllTasksToRun();
239 }
240
241 // Wait until all workers are idle to be sure that no task accesses its
242 // TestTaskFactory after |thread_posting_tasks| is destroyed.
243 thread_group_->WaitForAllWorkersIdleForTesting();
244 }
245
TEST_P(ThreadGroupSemaphoreImplTestParam,PostTasksWithOneAvailableWorker)246 TEST_P(ThreadGroupSemaphoreImplTestParam, PostTasksWithOneAvailableWorker) {
247 // Post blocking tasks to keep all workers busy except one until |event| is
248 // signaled. Use different factories so that tasks are added to different
249 // sequences and can run simultaneously when the execution mode is SEQUENCED.
250 TestWaitableEvent event;
251 std::vector<std::unique_ptr<test::TestTaskFactory>> blocked_task_factories;
252 for (size_t i = 0; i < (kMaxTasks - 1); ++i) {
253 blocked_task_factories.push_back(std::make_unique<test::TestTaskFactory>(
254 CreatePooledTaskRunnerWithExecutionMode(
255 GetParam(), &mock_pooled_task_runner_delegate_),
256 GetParam()));
257 EXPECT_TRUE(blocked_task_factories.back()->PostTask(
258 PostNestedTask::NO,
259 BindOnce(&TestWaitableEvent::Wait, Unretained(&event))));
260 blocked_task_factories.back()->WaitForAllTasksToRun();
261 }
262
263 // Post |kNumTasksPostedPerThread| tasks that should all run despite the fact
264 // that only one worker in |thread_group_| isn't busy.
265 test::TestTaskFactory short_task_factory(
266 CreatePooledTaskRunnerWithExecutionMode(
267 GetParam(), &mock_pooled_task_runner_delegate_),
268 GetParam());
269 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
270 EXPECT_TRUE(short_task_factory.PostTask(PostNestedTask::NO, OnceClosure()));
271 }
272 short_task_factory.WaitForAllTasksToRun();
273
274 // Release tasks waiting on |event|.
275 event.Signal();
276
277 // Wait until all workers are idle to be sure that no task accesses
278 // its TestTaskFactory after it is destroyed.
279 thread_group_->WaitForAllWorkersIdleForTesting();
280 }
281
TEST_P(ThreadGroupSemaphoreImplTestParam,Saturate)282 TEST_P(ThreadGroupSemaphoreImplTestParam, Saturate) {
283 // Verify that it is possible to have |kMaxTasks| tasks/sequences running
284 // simultaneously. Use different factories so that the blocking tasks are
285 // added to different sequences and can run simultaneously when the execution
286 // mode is SEQUENCED.
287 TestWaitableEvent event;
288 std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
289 for (size_t i = 0; i < kMaxTasks; ++i) {
290 factories.push_back(std::make_unique<test::TestTaskFactory>(
291 CreatePooledTaskRunnerWithExecutionMode(
292 GetParam(), &mock_pooled_task_runner_delegate_),
293 GetParam()));
294 EXPECT_TRUE(factories.back()->PostTask(
295 PostNestedTask::NO,
296 BindOnce(&TestWaitableEvent::Wait, Unretained(&event))));
297 factories.back()->WaitForAllTasksToRun();
298 }
299
300 // Release tasks waiting on |event|.
301 event.Signal();
302
303 // Wait until all workers are idle to be sure that no task accesses
304 // its TestTaskFactory after it is destroyed.
305 thread_group_->WaitForAllWorkersIdleForTesting();
306 }
307
308 // Verifies that ShouldYield() returns true for priorities lower than the
309 // highest priority pending while the thread group is flooded with USER_VISIBLE
310 // tasks.
TEST_F(ThreadGroupSemaphoreImplTest,ShouldYieldFloodedUserVisible)311 TEST_F(ThreadGroupSemaphoreImplTest, ShouldYieldFloodedUserVisible) {
312 TestWaitableEvent threads_running;
313 TestWaitableEvent threads_continue;
314
315 // Saturate workers with USER_VISIBLE tasks to ensure ShouldYield() returns
316 // true when a tasks of higher priority is posted.
317 RepeatingClosure threads_running_barrier = BarrierClosure(
318 kMaxTasks,
319 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
320
321 auto job_task = base::MakeRefCounted<test::MockJobTask>(
322 BindLambdaForTesting(
323 [&threads_running_barrier, &threads_continue](JobDelegate* delegate) {
324 threads_running_barrier.Run();
325 threads_continue.Wait();
326 }),
327 /* num_tasks_to_run */ kMaxTasks);
328 scoped_refptr<JobTaskSource> task_source =
329 job_task->GetJobTaskSource(FROM_HERE, {TaskPriority::USER_VISIBLE},
330 &mock_pooled_task_runner_delegate_);
331 task_source->NotifyConcurrencyIncrease();
332
333 threads_running.Wait();
334
335 // Posting a BEST_EFFORT task should not cause any other tasks to yield.
336 // Once this task gets to run, no other task needs to yield.
337 // Note: This is only true because this test is using a single ThreadGroup.
338 // Under the ThreadPool this wouldn't be racy because BEST_EFFORT tasks
339 // run in an independent ThreadGroup.
340 test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT},
341 &mock_pooled_task_runner_delegate_)
342 ->PostTask(
343 FROM_HERE, BindLambdaForTesting([&]() {
344 EXPECT_FALSE(thread_group_->ShouldYield(
345 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/1}));
346 }));
347 // A BEST_EFFORT task with more workers shouldn't have to yield.
348 EXPECT_FALSE(thread_group_->ShouldYield(
349 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/2}));
350 EXPECT_FALSE(thread_group_->ShouldYield(
351 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
352 EXPECT_FALSE(thread_group_->ShouldYield(
353 {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/0}));
354 EXPECT_FALSE(thread_group_->ShouldYield(
355 {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/0}));
356
357 // Posting a USER_VISIBLE task should cause BEST_EFFORT and USER_VISIBLE with
358 // higher worker_count tasks to yield.
359 auto post_user_visible = [&]() {
360 test::CreatePooledTaskRunner({TaskPriority::USER_VISIBLE},
361 &mock_pooled_task_runner_delegate_)
362 ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
363 EXPECT_FALSE(thread_group_->ShouldYield(
364 {TaskPriority::USER_VISIBLE, TimeTicks(),
365 /* worker_count=*/1}));
366 }));
367 };
368 // A USER_VISIBLE task with too many workers should yield.
369 post_user_visible();
370 EXPECT_TRUE(thread_group_->ShouldYield(
371 {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/2}));
372 post_user_visible();
373 EXPECT_TRUE(thread_group_->ShouldYield(
374 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
375 post_user_visible();
376 EXPECT_FALSE(thread_group_->ShouldYield(
377 {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/1}));
378 EXPECT_FALSE(thread_group_->ShouldYield(
379 {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/0}));
380
381 // Posting a USER_BLOCKING task should cause BEST_EFFORT, USER_VISIBLE and
382 // USER_BLOCKING with higher worker_count tasks to yield.
383 auto post_user_blocking = [&]() {
384 test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
385 &mock_pooled_task_runner_delegate_)
386 ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
387 // Once this task got to start, no other task needs to
388 // yield.
389 EXPECT_FALSE(thread_group_->ShouldYield(
390 {TaskPriority::USER_BLOCKING, TimeTicks(),
391 /* worker_count=*/1}));
392 }));
393 };
394 // A USER_BLOCKING task with too many workers should have to yield.
395 post_user_blocking();
396 EXPECT_TRUE(thread_group_->ShouldYield(
397 {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/2}));
398 post_user_blocking();
399 EXPECT_TRUE(thread_group_->ShouldYield(
400 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
401 post_user_blocking();
402 EXPECT_TRUE(thread_group_->ShouldYield(
403 {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/0}));
404 post_user_blocking();
405 EXPECT_FALSE(thread_group_->ShouldYield(
406 {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/1}));
407
408 threads_continue.Signal();
409 task_tracker_.FlushForTesting();
410 }
411
412 INSTANTIATE_TEST_SUITE_P(Parallel,
413 ThreadGroupSemaphoreImplTestParam,
414 ::testing::Values(TaskSourceExecutionMode::kParallel));
415 INSTANTIATE_TEST_SUITE_P(
416 Sequenced,
417 ThreadGroupSemaphoreImplTestParam,
418 ::testing::Values(TaskSourceExecutionMode::kSequenced));
419
420 INSTANTIATE_TEST_SUITE_P(Job,
421 ThreadGroupSemaphoreImplTestParam,
422 ::testing::Values(TaskSourceExecutionMode::kJob));
423
424 namespace {
425
426 class ThreadGroupSemaphoreImplStartInBodyTest
427 : public ThreadGroupSemaphoreImplTest {
428 public:
SetUp()429 void SetUp() override {
430 CreateThreadGroup();
431 // Let the test start the thread group.
432 }
433 };
434
TaskPostedBeforeStart(PlatformThreadRef * platform_thread_ref,TestWaitableEvent * task_running,TestWaitableEvent * barrier)435 void TaskPostedBeforeStart(PlatformThreadRef* platform_thread_ref,
436 TestWaitableEvent* task_running,
437 TestWaitableEvent* barrier) {
438 *platform_thread_ref = PlatformThread::CurrentRef();
439 task_running->Signal();
440 barrier->Wait();
441 }
442
443 } // namespace
444
445 // Verify that 2 tasks posted before Start() to a ThreadGroupSemaphore with
446 // more than 2 workers run on different workers when Start() is called.
TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,PostTasksBeforeStart)447 TEST_F(ThreadGroupSemaphoreImplStartInBodyTest, PostTasksBeforeStart) {
448 PlatformThreadRef task_1_thread_ref;
449 PlatformThreadRef task_2_thread_ref;
450 TestWaitableEvent task_1_running;
451 TestWaitableEvent task_2_running;
452
453 // This event is used to prevent a task from completing before the other task
454 // starts running. If that happened, both tasks could run on the same worker
455 // and this test couldn't verify that the correct number of workers were woken
456 // up.
457 TestWaitableEvent barrier;
458
459 test::CreatePooledTaskRunner({WithBaseSyncPrimitives()},
460 &mock_pooled_task_runner_delegate_)
461 ->PostTask(
462 FROM_HERE,
463 BindOnce(&TaskPostedBeforeStart, Unretained(&task_1_thread_ref),
464 Unretained(&task_1_running), Unretained(&barrier)));
465 test::CreatePooledTaskRunner({WithBaseSyncPrimitives()},
466 &mock_pooled_task_runner_delegate_)
467 ->PostTask(
468 FROM_HERE,
469 BindOnce(&TaskPostedBeforeStart, Unretained(&task_2_thread_ref),
470 Unretained(&task_2_running), Unretained(&barrier)));
471
472 // Workers should not be created and tasks should not run before the thread
473 // group is started.
474 EXPECT_EQ(0U, thread_group_->NumberOfWorkersForTesting());
475 EXPECT_FALSE(task_1_running.IsSignaled());
476 EXPECT_FALSE(task_2_running.IsSignaled());
477
478 StartThreadGroup(TimeDelta::Max(), kMaxTasks);
479
480 // Tasks should run shortly after the thread group is started.
481 task_1_running.Wait();
482 task_2_running.Wait();
483
484 // Tasks should run on different threads.
485 EXPECT_NE(task_1_thread_ref, task_2_thread_ref);
486
487 barrier.Signal();
488 task_tracker_.FlushForTesting();
489 }
490
491 // Verify that posting many tasks before Start will cause the number of workers
492 // to grow to |max_tasks_| after Start.
TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,PostManyTasks)493 TEST_F(ThreadGroupSemaphoreImplStartInBodyTest, PostManyTasks) {
494 scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
495 {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);
496 constexpr size_t kNumTasksPosted = 2 * kMaxTasks;
497
498 TestWaitableEvent threads_running;
499 TestWaitableEvent threads_continue;
500
501 RepeatingClosure threads_running_barrier = BarrierClosure(
502 kMaxTasks,
503 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
504 // Posting these tasks should cause new workers to be created.
505 for (size_t i = 0; i < kMaxTasks; ++i) {
506 task_runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
507 threads_running_barrier.Run();
508 threads_continue.Wait();
509 }));
510 }
511 // Post the remaining |kNumTasksPosted - kMaxTasks| tasks, don't wait for them
512 // as they'll be blocked behind the above kMaxtasks.
513 for (size_t i = kMaxTasks; i < kNumTasksPosted; ++i) {
514 task_runner->PostTask(FROM_HERE, DoNothing());
515 }
516
517 EXPECT_EQ(0U, thread_group_->NumberOfWorkersForTesting());
518
519 StartThreadGroup(TimeDelta::Max(), kMaxTasks);
520 EXPECT_GT(thread_group_->NumberOfWorkersForTesting(), 0U);
521 EXPECT_EQ(kMaxTasks, thread_group_->GetMaxTasksForTesting());
522
523 threads_running.Wait();
524 EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(),
525 thread_group_->GetMaxTasksForTesting());
526 threads_continue.Signal();
527 task_tracker_.FlushForTesting();
528 }
529
530 namespace {
531
532 class BackgroundThreadGroupSemaphoreTest : public ThreadGroupSemaphoreImplTest {
533 public:
CreateAndStartThreadGroup(TimeDelta suggested_reclaim_time=TimeDelta::Max (),size_t max_tasks=kMaxTasks,std::optional<int> max_best_effort_tasks=std::nullopt,WorkerThreadObserver * worker_observer=nullptr,std::optional<TimeDelta> may_block_threshold=std::nullopt)534 void CreateAndStartThreadGroup(
535 TimeDelta suggested_reclaim_time = TimeDelta::Max(),
536 size_t max_tasks = kMaxTasks,
537 std::optional<int> max_best_effort_tasks = std::nullopt,
538 WorkerThreadObserver* worker_observer = nullptr,
539 std::optional<TimeDelta> may_block_threshold = std::nullopt) {
540 if (!CanUseBackgroundThreadTypeForWorkerThread()) {
541 return;
542 }
543 CreateThreadGroup(ThreadType::kBackground);
544 StartThreadGroup(suggested_reclaim_time, max_tasks, max_best_effort_tasks,
545 worker_observer, may_block_threshold);
546 }
547
SetUp()548 void SetUp() override { CreateAndStartThreadGroup(); }
549 };
550
551 } // namespace
552
553 // Verify that ScopedBlockingCall updates thread type when necessary per
554 // shutdown state.
TEST_F(BackgroundThreadGroupSemaphoreTest,UpdatePriorityBlockingStarted)555 TEST_F(BackgroundThreadGroupSemaphoreTest, UpdatePriorityBlockingStarted) {
556 if (!CanUseBackgroundThreadTypeForWorkerThread()) {
557 return;
558 }
559
560 const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
561 {MayBlock(), WithBaseSyncPrimitives(), TaskPriority::BEST_EFFORT},
562 &mock_pooled_task_runner_delegate_);
563
564 TestWaitableEvent threads_running;
565 RepeatingClosure threads_running_barrier = BarrierClosure(
566 kMaxTasks,
567 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
568
569 TestWaitableEvent blocking_threads_continue;
570
571 for (size_t i = 0; i < kMaxTasks; ++i) {
572 task_runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
573 EXPECT_EQ(ThreadType::kBackground,
574 PlatformThread::GetCurrentThreadType());
575 {
576 // ScopedBlockingCall before shutdown doesn't
577 // affect priority.
578 ScopedBlockingCall scoped_blocking_call(
579 FROM_HERE, BlockingType::MAY_BLOCK);
580 EXPECT_EQ(ThreadType::kBackground,
581 PlatformThread::GetCurrentThreadType());
582 }
583 threads_running_barrier.Run();
584 blocking_threads_continue.Wait();
585 // This is reached after StartShutdown(), at which
586 // point we expect ScopedBlockingCall to update
587 // thread priority.
588 ScopedBlockingCall scoped_blocking_call(
589 FROM_HERE, BlockingType::MAY_BLOCK);
590 EXPECT_EQ(ThreadType::kDefault,
591 PlatformThread::GetCurrentThreadType());
592 }));
593 }
594 threads_running.Wait();
595
596 task_tracker_.StartShutdown();
597 blocking_threads_continue.Signal();
598 task_tracker_.FlushForTesting();
599 }
600
601 namespace {
602
603 class ThreadGroupSemaphoreStandbyPolicyTest
604 : public ThreadGroupSemaphoreImplTestBase,
605 public testing::Test {
606 public:
607 ThreadGroupSemaphoreStandbyPolicyTest() = default;
608 ThreadGroupSemaphoreStandbyPolicyTest(
609 const ThreadGroupSemaphoreStandbyPolicyTest&) = delete;
610 ThreadGroupSemaphoreStandbyPolicyTest& operator=(
611 const ThreadGroupSemaphoreStandbyPolicyTest&) = delete;
612
SetUp()613 void SetUp() override {
614 CreateAndStartThreadGroup(kReclaimTimeForCleanupTests);
615 }
616
TearDown()617 void TearDown() override {
618 ThreadGroupSemaphoreImplTestBase::CommonTearDown();
619 }
620 };
621
622 } // namespace
623
TEST_F(ThreadGroupSemaphoreStandbyPolicyTest,InitOne)624 TEST_F(ThreadGroupSemaphoreStandbyPolicyTest, InitOne) {
625 EXPECT_EQ(1U, thread_group_->NumberOfWorkersForTesting());
626 }
627
628 namespace {
629
630 enum class OptionalBlockingType {
631 NO_BLOCK,
632 MAY_BLOCK,
633 WILL_BLOCK,
634 };
635
636 struct NestedBlockingType {
NestedBlockingTypebase::internal::__anon1e2f1e470d11::NestedBlockingType637 NestedBlockingType(BlockingType first_in,
638 OptionalBlockingType second_in,
639 BlockingType behaves_as_in)
640 : first(first_in), second(second_in), behaves_as(behaves_as_in) {}
641
642 BlockingType first;
643 OptionalBlockingType second;
644 BlockingType behaves_as;
645 };
646
647 class NestedScopedBlockingCall {
648 public:
NestedScopedBlockingCall(const NestedBlockingType & nested_blocking_type)649 explicit NestedScopedBlockingCall(
650 const NestedBlockingType& nested_blocking_type)
651 : first_scoped_blocking_call_(FROM_HERE, nested_blocking_type.first),
652 second_scoped_blocking_call_(
653 nested_blocking_type.second == OptionalBlockingType::WILL_BLOCK
654 ? std::make_unique<ScopedBlockingCall>(FROM_HERE,
655 BlockingType::WILL_BLOCK)
656 : (nested_blocking_type.second ==
657 OptionalBlockingType::MAY_BLOCK
658 ? std::make_unique<ScopedBlockingCall>(
659 FROM_HERE,
660 BlockingType::MAY_BLOCK)
661 : nullptr)) {}
662 NestedScopedBlockingCall(const NestedScopedBlockingCall&) = delete;
663 NestedScopedBlockingCall& operator=(const NestedScopedBlockingCall&) = delete;
664
665 private:
666 ScopedBlockingCall first_scoped_blocking_call_;
667 std::unique_ptr<ScopedBlockingCall> second_scoped_blocking_call_;
668 };
669
670 } // namespace
671
672 class ThreadGroupSemaphoreBlockingTest
673 : public ThreadGroupSemaphoreImplTestBase,
674 public testing::TestWithParam<NestedBlockingType> {
675 public:
676 ThreadGroupSemaphoreBlockingTest() = default;
677 ThreadGroupSemaphoreBlockingTest(const ThreadGroupSemaphoreBlockingTest&) =
678 delete;
679 ThreadGroupSemaphoreBlockingTest& operator=(
680 const ThreadGroupSemaphoreBlockingTest&) = delete;
681
ParamInfoToString(::testing::TestParamInfo<NestedBlockingType> param_info)682 static std::string ParamInfoToString(
683 ::testing::TestParamInfo<NestedBlockingType> param_info) {
684 std::string str = param_info.param.first == BlockingType::MAY_BLOCK
685 ? "MAY_BLOCK"
686 : "WILL_BLOCK";
687 if (param_info.param.second == OptionalBlockingType::MAY_BLOCK) {
688 str += "_MAY_BLOCK";
689 } else if (param_info.param.second == OptionalBlockingType::WILL_BLOCK) {
690 str += "_WILL_BLOCK";
691 }
692 return str;
693 }
694
TearDown()695 void TearDown() override {
696 ThreadGroupSemaphoreImplTestBase::CommonTearDown();
697 }
698
699 protected:
700 // Saturates the thread group with a task that first blocks, waits to be
701 // unblocked, then exits.
SaturateWithBlockingTasks(const NestedBlockingType & nested_blocking_type,TaskPriority priority=TaskPriority::USER_BLOCKING)702 void SaturateWithBlockingTasks(
703 const NestedBlockingType& nested_blocking_type,
704 TaskPriority priority = TaskPriority::USER_BLOCKING) {
705 TestWaitableEvent threads_running;
706
707 const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
708 {MayBlock(), WithBaseSyncPrimitives(), priority},
709 &mock_pooled_task_runner_delegate_);
710
711 RepeatingClosure threads_running_barrier = BarrierClosure(
712 kMaxTasks,
713 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
714
715 for (size_t i = 0; i < kMaxTasks; ++i) {
716 task_runner->PostTask(
717 FROM_HERE, BindLambdaForTesting([this, &threads_running_barrier,
718 nested_blocking_type]() {
719 NestedScopedBlockingCall nested_scoped_blocking_call(
720 nested_blocking_type);
721 threads_running_barrier.Run();
722 blocking_threads_continue_.Wait();
723 }));
724 }
725 threads_running.Wait();
726 }
727
728 // Saturates the thread group with a task that waits for other tasks without
729 // entering a ScopedBlockingCall, then exits.
SaturateWithBusyTasks(TaskPriority priority=TaskPriority::USER_BLOCKING,TaskShutdownBehavior shutdown_behavior=TaskShutdownBehavior::SKIP_ON_SHUTDOWN)730 void SaturateWithBusyTasks(
731 TaskPriority priority = TaskPriority::USER_BLOCKING,
732 TaskShutdownBehavior shutdown_behavior =
733 TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
734 TestWaitableEvent threads_running;
735
736 const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
737 {MayBlock(), WithBaseSyncPrimitives(), priority, shutdown_behavior},
738 &mock_pooled_task_runner_delegate_);
739
740 RepeatingClosure threads_running_barrier = BarrierClosure(
741 kMaxTasks,
742 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
743 // Posting these tasks should cause new workers to be created.
744 for (size_t i = 0; i < kMaxTasks; ++i) {
745 task_runner->PostTask(
746 FROM_HERE, BindLambdaForTesting([this, &threads_running_barrier]() {
747 threads_running_barrier.Run();
748 busy_threads_continue_.Wait();
749 }));
750 }
751 threads_running.Wait();
752 }
753
754 // Returns how long we can expect a change to |max_tasks_| to occur
755 // after a task has become blocked.
GetMaxTasksChangeSleepTime()756 TimeDelta GetMaxTasksChangeSleepTime() {
757 return std::max(thread_group_->blocked_workers_poll_period_for_testing(),
758 thread_group_->may_block_threshold_for_testing()) +
759 TestTimeouts::tiny_timeout();
760 }
761
762 // Waits indefinitely, until |thread_group_|'s max tasks increases to
763 // |expected_max_tasks|.
ExpectMaxTasksIncreasesTo(size_t expected_max_tasks)764 void ExpectMaxTasksIncreasesTo(size_t expected_max_tasks) {
765 size_t max_tasks = thread_group_->GetMaxTasksForTesting();
766 while (max_tasks != expected_max_tasks) {
767 PlatformThread::Sleep(GetMaxTasksChangeSleepTime());
768 size_t new_max_tasks = thread_group_->GetMaxTasksForTesting();
769 ASSERT_GE(new_max_tasks, max_tasks);
770 max_tasks = new_max_tasks;
771 }
772 }
773
774 // Unblocks tasks posted by SaturateWithBlockingTasks().
UnblockBlockingTasks()775 void UnblockBlockingTasks() { blocking_threads_continue_.Signal(); }
776
777 // Unblocks tasks posted by SaturateWithBusyTasks().
UnblockBusyTasks()778 void UnblockBusyTasks() { busy_threads_continue_.Signal(); }
779
780 const scoped_refptr<TaskRunner> task_runner_ =
781 test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
782 &mock_pooled_task_runner_delegate_);
783
784 private:
785 TestWaitableEvent blocking_threads_continue_;
786 TestWaitableEvent busy_threads_continue_;
787 };
788
789 // Verify that SaturateWithBlockingTasks() causes max tasks to increase and
790 // creates a worker if needed. Also verify that UnblockBlockingTasks() decreases
791 // max tasks after an increase.
TEST_P(ThreadGroupSemaphoreBlockingTest,ThreadBlockedUnblocked)792 TEST_P(ThreadGroupSemaphoreBlockingTest, ThreadBlockedUnblocked) {
793 CreateAndStartThreadGroup();
794
795 ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
796
797 SaturateWithBlockingTasks(GetParam());
798
799 // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
800 // should not block forever.
801 SaturateWithBusyTasks();
802
803 EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
804
805 UnblockBusyTasks();
806 UnblockBlockingTasks();
807 task_tracker_.FlushForTesting();
808 EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
809 }
810
811 // Verify that SaturateWithBlockingTasks() of BEST_EFFORT tasks causes max best
812 // effort tasks to increase and creates a worker if needed. Also verify that
813 // UnblockBlockingTasks() decreases max best effort tasks after an increase.
TEST_P(ThreadGroupSemaphoreBlockingTest,ThreadBlockedUnblockedBestEffort)814 TEST_P(ThreadGroupSemaphoreBlockingTest, ThreadBlockedUnblockedBestEffort) {
815 CreateAndStartThreadGroup();
816
817 ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
818 ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
819
820 SaturateWithBlockingTasks(GetParam(), TaskPriority::BEST_EFFORT);
821
822 // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
823 // should not block forever.
824 SaturateWithBusyTasks(TaskPriority::BEST_EFFORT);
825
826 EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
827
828 UnblockBusyTasks();
829 UnblockBlockingTasks();
830 task_tracker_.FlushForTesting();
831 EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
832 EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
833 }
834
835 // Verify that flooding the thread group with more BEST_EFFORT tasks than
836 // kMaxBestEffortTasks doesn't prevent USER_VISIBLE tasks from running.
TEST_P(ThreadGroupSemaphoreBlockingTest,TooManyBestEffortTasks)837 TEST_P(ThreadGroupSemaphoreBlockingTest, TooManyBestEffortTasks) {
838 constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
839
840 CreateAndStartThreadGroup(TimeDelta::Max(), kMaxTasks, kMaxBestEffortTasks);
841
842 TestWaitableEvent threads_continue;
843 {
844 TestWaitableEvent entered_blocking_scope;
845 RepeatingClosure entered_blocking_scope_barrier = BarrierClosure(
846 kMaxBestEffortTasks + 1, BindOnce(&TestWaitableEvent::Signal,
847 Unretained(&entered_blocking_scope)));
848 TestWaitableEvent exit_blocking_scope;
849
850 TestWaitableEvent threads_running;
851 RepeatingClosure threads_running_barrier = BarrierClosure(
852 kMaxBestEffortTasks + 1,
853 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
854
855 const auto best_effort_task_runner =
856 test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
857 &mock_pooled_task_runner_delegate_);
858 for (size_t i = 0; i < kMaxBestEffortTasks + 1; ++i) {
859 best_effort_task_runner->PostTask(
860 FROM_HERE, BindLambdaForTesting([&]() {
861 {
862 NestedScopedBlockingCall scoped_blocking_call(GetParam());
863 entered_blocking_scope_barrier.Run();
864 exit_blocking_scope.Wait();
865 }
866 threads_running_barrier.Run();
867 threads_continue.Wait();
868 }));
869 }
870 entered_blocking_scope.Wait();
871 exit_blocking_scope.Signal();
872 threads_running.Wait();
873 }
874
875 // At this point, kMaxBestEffortTasks + 1 threads are running (plus
876 // potentially the idle thread), but max_task and max_best_effort_task are
877 // back to normal.
878 EXPECT_GE(thread_group_->NumberOfWorkersForTesting(),
879 kMaxBestEffortTasks + 1);
880 EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
881 kMaxBestEffortTasks + 2);
882 EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
883
884 TestWaitableEvent threads_running;
885 task_runner_->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
886 threads_running.Signal();
887 threads_continue.Wait();
888 }));
889
890 // This should not block forever.
891 threads_running.Wait();
892
893 EXPECT_GE(thread_group_->NumberOfWorkersForTesting(),
894 kMaxBestEffortTasks + 2);
895 EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
896 kMaxBestEffortTasks + 3);
897 threads_continue.Signal();
898
899 task_tracker_.FlushForTesting();
900 }
901
902 // Verify that tasks posted in a saturated thread group before a
903 // ScopedBlockingCall will execute after ScopedBlockingCall is instantiated.
TEST_P(ThreadGroupSemaphoreBlockingTest,PostBeforeBlocking)904 TEST_P(ThreadGroupSemaphoreBlockingTest, PostBeforeBlocking) {
905 CreateAndStartThreadGroup();
906
907 TestWaitableEvent thread_running(WaitableEvent::ResetPolicy::AUTOMATIC);
908 TestWaitableEvent thread_can_block;
909 TestWaitableEvent threads_continue;
910
911 for (size_t i = 0; i < kMaxTasks; ++i) {
912 task_runner_->PostTask(
913 FROM_HERE,
914 BindOnce(
915 [](const NestedBlockingType& nested_blocking_type,
916 TestWaitableEvent* thread_running,
917 TestWaitableEvent* thread_can_block,
918 TestWaitableEvent* threads_continue) {
919 thread_running->Signal();
920 thread_can_block->Wait();
921
922 NestedScopedBlockingCall nested_scoped_blocking_call(
923 nested_blocking_type);
924 threads_continue->Wait();
925 },
926 GetParam(), Unretained(&thread_running),
927 Unretained(&thread_can_block), Unretained(&threads_continue)));
928 thread_running.Wait();
929 }
930
931 // All workers should be occupied and the thread group should be saturated.
932 // Workers have not entered ScopedBlockingCall yet.
933 EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
934 EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
935
936 TestWaitableEvent extra_threads_running;
937 TestWaitableEvent extra_threads_continue;
938 RepeatingClosure extra_threads_running_barrier = BarrierClosure(
939 kMaxTasks,
940 BindOnce(&TestWaitableEvent::Signal, Unretained(&extra_threads_running)));
941 for (size_t i = 0; i < kMaxTasks; ++i) {
942 task_runner_->PostTask(
943 FROM_HERE, BindOnce(
944 [](RepeatingClosure* extra_threads_running_barrier,
945 TestWaitableEvent* extra_threads_continue) {
946 extra_threads_running_barrier->Run();
947 extra_threads_continue->Wait();
948 },
949 Unretained(&extra_threads_running_barrier),
950 Unretained(&extra_threads_continue)));
951 }
952
953 // Allow tasks to enter ScopedBlockingCall. Workers should be created for the
954 // tasks we just posted.
955 thread_can_block.Signal();
956
957 // Should not block forever.
958 extra_threads_running.Wait();
959 EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
960 extra_threads_continue.Signal();
961
962 threads_continue.Signal();
963 task_tracker_.FlushForTesting();
964 }
965
966 // Verify that workers become idle when the thread group is over-capacity and
967 // that those workers do no work.
TEST_P(ThreadGroupSemaphoreBlockingTest,WorkersIdleWhenOverCapacity)968 TEST_P(ThreadGroupSemaphoreBlockingTest, WorkersIdleWhenOverCapacity) {
969 CreateAndStartThreadGroup();
970
971 ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
972
973 SaturateWithBlockingTasks(GetParam());
974
975 // Forces |kMaxTasks| extra workers to be instantiated by posting tasks.
976 SaturateWithBusyTasks();
977
978 ASSERT_EQ(thread_group_->NumberOfIdleWorkersForTesting(), 0U);
979 EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
980
981 AtomicFlag is_exiting;
982 // These tasks should not get executed until after other tasks become
983 // unblocked.
984 for (size_t i = 0; i < kMaxTasks; ++i) {
985 task_runner_->PostTask(FROM_HERE, BindOnce(
986 [](AtomicFlag* is_exiting) {
987 EXPECT_TRUE(is_exiting->IsSet());
988 },
989 Unretained(&is_exiting)));
990 }
991
992 // The original |kMaxTasks| will finish their tasks after being unblocked.
993 // There will be work in the work queue, but the thread group should now be
994 // over-capacity and workers will become idle.
995 UnblockBlockingTasks();
996 thread_group_->WaitForWorkersIdleForTesting(kMaxTasks);
997 EXPECT_EQ(thread_group_->NumberOfIdleWorkersForTesting(), kMaxTasks);
998
999 // Posting more tasks should not cause workers idle from the thread group
1000 // being over capacity to begin doing work.
1001 for (size_t i = 0; i < kMaxTasks; ++i) {
1002 task_runner_->PostTask(FROM_HERE, BindOnce(
1003 [](AtomicFlag* is_exiting) {
1004 EXPECT_TRUE(is_exiting->IsSet());
1005 },
1006 Unretained(&is_exiting)));
1007 }
1008
1009 // Give time for those idle workers to possibly do work (which should not
1010 // happen).
1011 PlatformThread::Sleep(TestTimeouts::tiny_timeout());
1012
1013 is_exiting.Set();
1014 // Unblocks the new workers.
1015 UnblockBusyTasks();
1016 task_tracker_.FlushForTesting();
1017 }
1018
1019 // Verify that an increase of max tasks with SaturateWithBlockingTasks()
1020 // increases the number of tasks that can run before ShouldYield returns true.
TEST_P(ThreadGroupSemaphoreBlockingTest,ThreadBlockedUnblockedShouldYield)1021 TEST_P(ThreadGroupSemaphoreBlockingTest, ThreadBlockedUnblockedShouldYield) {
1022 CreateAndStartThreadGroup();
1023
1024 ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1025
1026 EXPECT_FALSE(
1027 thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1028 SaturateWithBlockingTasks(GetParam());
1029 EXPECT_FALSE(
1030 thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1031
1032 // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
1033 // should not block forever.
1034 SaturateWithBusyTasks();
1035
1036 // All tasks can run, hence ShouldYield returns false.
1037 EXPECT_FALSE(
1038 thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1039
1040 // Post a USER_VISIBLE task that can't run since workers are saturated. This
1041 // should cause BEST_EFFORT tasks to yield.
1042 test::CreatePooledTaskRunner({TaskPriority::USER_VISIBLE},
1043 &mock_pooled_task_runner_delegate_)
1044 ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1045 EXPECT_FALSE(thread_group_->ShouldYield(
1046 {TaskPriority::BEST_EFFORT, TimeTicks()}));
1047 }));
1048 EXPECT_TRUE(
1049 thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1050
1051 // Post a USER_BLOCKING task that can't run since workers are saturated. This
1052 // should cause USER_VISIBLE tasks to yield.
1053 test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
1054 &mock_pooled_task_runner_delegate_)
1055 ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1056 EXPECT_FALSE(thread_group_->ShouldYield(
1057 {TaskPriority::USER_VISIBLE, TimeTicks()}));
1058 }));
1059 EXPECT_TRUE(
1060 thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
1061
1062 UnblockBusyTasks();
1063 UnblockBlockingTasks();
1064 task_tracker_.FlushForTesting();
1065 EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1066 }
1067
1068 INSTANTIATE_TEST_SUITE_P(
1069 All,
1070 ThreadGroupSemaphoreBlockingTest,
1071 ::testing::Values(NestedBlockingType(BlockingType::MAY_BLOCK,
1072 OptionalBlockingType::NO_BLOCK,
1073 BlockingType::MAY_BLOCK),
1074 NestedBlockingType(BlockingType::WILL_BLOCK,
1075 OptionalBlockingType::NO_BLOCK,
1076 BlockingType::WILL_BLOCK),
1077 NestedBlockingType(BlockingType::MAY_BLOCK,
1078 OptionalBlockingType::WILL_BLOCK,
1079 BlockingType::WILL_BLOCK),
1080 NestedBlockingType(BlockingType::WILL_BLOCK,
1081 OptionalBlockingType::MAY_BLOCK,
1082 BlockingType::WILL_BLOCK)),
1083 ThreadGroupSemaphoreBlockingTest::ParamInfoToString);
1084
1085 // Verify that if a thread enters the scope of a MAY_BLOCK ScopedBlockingCall,
1086 // but exits the scope before the MayBlock threshold is reached, that the max
1087 // tasks does not increase.
TEST_F(ThreadGroupSemaphoreBlockingTest,ThreadBlockUnblockPremature)1088 TEST_F(ThreadGroupSemaphoreBlockingTest, ThreadBlockUnblockPremature) {
1089 // Create a thread group with an infinite MayBlock threshold so that a
1090 // MAY_BLOCK ScopedBlockingCall never increases the max tasks.
1091 CreateAndStartThreadGroup(TimeDelta::Max(), // |suggested_reclaim_time|
1092 kMaxTasks, // |max_tasks|
1093 std::nullopt, // |max_best_effort_tasks|
1094 nullptr, // |worker_observer|
1095 TimeDelta::Max()); // |may_block_threshold|
1096
1097 ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1098
1099 SaturateWithBlockingTasks(NestedBlockingType(BlockingType::MAY_BLOCK,
1100 OptionalBlockingType::NO_BLOCK,
1101 BlockingType::MAY_BLOCK));
1102 PlatformThread::Sleep(
1103 2 * thread_group_->blocked_workers_poll_period_for_testing());
1104 EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
1105 EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1106
1107 UnblockBlockingTasks();
1108 task_tracker_.FlushForTesting();
1109 EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1110 }
1111
1112 // Verify that if a BEST_EFFORT task enters the scope of a WILL_BLOCK
1113 // ScopedBlockingCall, but exits the scope before the MayBlock threshold is
1114 // reached, that the max best effort tasks does not increase.
TEST_F(ThreadGroupSemaphoreBlockingTest,ThreadBlockUnblockPrematureBestEffort)1115 TEST_F(ThreadGroupSemaphoreBlockingTest,
1116 ThreadBlockUnblockPrematureBestEffort) {
1117 // Create a thread group with an infinite MayBlock threshold so that a
1118 // MAY_BLOCK ScopedBlockingCall never increases the max tasks.
1119 CreateAndStartThreadGroup(TimeDelta::Max(), // |suggested_reclaim_time|
1120 kMaxTasks, // |max_tasks|
1121 kMaxTasks, // |max_best_effort_tasks|
1122 nullptr, // |worker_observer|
1123 TimeDelta::Max()); // |may_block_threshold|
1124
1125 ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1126 ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
1127
1128 SaturateWithBlockingTasks(NestedBlockingType(BlockingType::WILL_BLOCK,
1129 OptionalBlockingType::NO_BLOCK,
1130 BlockingType::WILL_BLOCK),
1131 TaskPriority::BEST_EFFORT);
1132 PlatformThread::Sleep(
1133 2 * thread_group_->blocked_workers_poll_period_for_testing());
1134 EXPECT_GE(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
1135 EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), 2 * kMaxTasks);
1136 EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
1137
1138 UnblockBlockingTasks();
1139 task_tracker_.FlushForTesting();
1140 EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1141 EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
1142 }
1143
1144 // Verify that if max tasks is incremented because of a MAY_BLOCK
1145 // ScopedBlockingCall, it isn't incremented again when there is a nested
1146 // WILL_BLOCK ScopedBlockingCall.
TEST_F(ThreadGroupSemaphoreBlockingTest,MayBlockIncreaseCapacityNestedWillBlock)1147 TEST_F(ThreadGroupSemaphoreBlockingTest,
1148 MayBlockIncreaseCapacityNestedWillBlock) {
1149 CreateAndStartThreadGroup();
1150
1151 ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1152 auto task_runner =
1153 test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
1154 &mock_pooled_task_runner_delegate_);
1155 TestWaitableEvent can_return;
1156
1157 // Saturate the thread group so that a MAY_BLOCK ScopedBlockingCall would
1158 // increment the max tasks.
1159 for (size_t i = 0; i < kMaxTasks - 1; ++i) {
1160 task_runner->PostTask(
1161 FROM_HERE, BindOnce(&TestWaitableEvent::Wait, Unretained(&can_return)));
1162 }
1163
1164 TestWaitableEvent can_instantiate_will_block;
1165 TestWaitableEvent did_instantiate_will_block;
1166
1167 // Post a task that instantiates a MAY_BLOCK ScopedBlockingCall.
1168 task_runner->PostTask(
1169 FROM_HERE,
1170 BindOnce(
1171 [](TestWaitableEvent* can_instantiate_will_block,
1172 TestWaitableEvent* did_instantiate_will_block,
1173 TestWaitableEvent* can_return) {
1174 ScopedBlockingCall may_block(FROM_HERE, BlockingType::MAY_BLOCK);
1175 can_instantiate_will_block->Wait();
1176 ScopedBlockingCall will_block(FROM_HERE, BlockingType::WILL_BLOCK);
1177 did_instantiate_will_block->Signal();
1178 can_return->Wait();
1179 },
1180 Unretained(&can_instantiate_will_block),
1181 Unretained(&did_instantiate_will_block), Unretained(&can_return)));
1182
1183 // After a short delay, max tasks should be incremented.
1184 ExpectMaxTasksIncreasesTo(kMaxTasks + 1);
1185
1186 // Wait until the task instantiates a WILL_BLOCK ScopedBlockingCall.
1187 can_instantiate_will_block.Signal();
1188 did_instantiate_will_block.Wait();
1189
1190 // Max tasks shouldn't be incremented again.
1191 EXPECT_EQ(kMaxTasks + 1, thread_group_->GetMaxTasksForTesting());
1192
1193 // Tear down.
1194 can_return.Signal();
1195 task_tracker_.FlushForTesting();
1196 EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1197 }
1198
1199 // Verify that OnShutdownStarted() causes max tasks to increase and creates a
1200 // worker if needed. Also verify that UnblockBusyTasks() decreases max tasks
1201 // after an increase.
TEST_F(ThreadGroupSemaphoreBlockingTest,ThreadBusyShutdown)1202 TEST_F(ThreadGroupSemaphoreBlockingTest, ThreadBusyShutdown) {
1203 CreateAndStartThreadGroup();
1204 ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1205
1206 SaturateWithBusyTasks(TaskPriority::BEST_EFFORT,
1207 TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN);
1208 thread_group_->OnShutdownStarted();
1209
1210 // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
1211 // should not block forever.
1212 SaturateWithBusyTasks(TaskPriority::BEST_EFFORT,
1213 TaskShutdownBehavior::BLOCK_SHUTDOWN);
1214
1215 EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
1216
1217 UnblockBusyTasks();
1218 task_tracker_.FlushForTesting();
1219 thread_group_->JoinForTesting();
1220 EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1221 mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
1222 thread_group_.reset();
1223 }
1224
1225 enum class ReclaimType { DELAYED_RECLAIM, NO_RECLAIM };
1226
1227 class ThreadGroupSemaphoreOverCapacityTest
1228 : public ThreadGroupSemaphoreImplTestBase,
1229 public testing::TestWithParam<ReclaimType> {
1230 public:
1231 ThreadGroupSemaphoreOverCapacityTest() = default;
1232 ThreadGroupSemaphoreOverCapacityTest(
1233 const ThreadGroupSemaphoreOverCapacityTest&) = delete;
1234 ThreadGroupSemaphoreOverCapacityTest& operator=(
1235 const ThreadGroupSemaphoreOverCapacityTest&) = delete;
1236
SetUp()1237 void SetUp() override {
1238 if (GetParam() == ReclaimType::NO_RECLAIM) {
1239 feature_list.InitAndEnableFeature(kNoWorkerThreadReclaim);
1240 }
1241 CreateThreadGroup();
1242 task_runner_ =
1243 test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
1244 &mock_pooled_task_runner_delegate_);
1245 }
1246
TearDown()1247 void TearDown() override {
1248 ThreadGroupSemaphoreImplTestBase::CommonTearDown();
1249 }
1250
1251 protected:
1252 base::test::ScopedFeatureList feature_list;
1253 scoped_refptr<TaskRunner> task_runner_;
1254 static constexpr size_t kLocalMaxTasks = 3;
1255
CreateThreadGroup()1256 void CreateThreadGroup() {
1257 ASSERT_FALSE(thread_group_);
1258 service_thread_.Start();
1259 delayed_task_manager_.Start(service_thread_.task_runner());
1260 thread_group_ = std::make_unique<ThreadGroupSemaphore>(
1261 "OverCapacityTestThreadGroup", "A", ThreadType::kDefault,
1262 task_tracker_.GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
1263 ASSERT_TRUE(thread_group_);
1264
1265 mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
1266 }
1267 };
1268
1269 // Verify that workers that become idle due to the thread group being over
1270 // capacity will eventually cleanup.
TEST_P(ThreadGroupSemaphoreOverCapacityTest,VerifyCleanup)1271 TEST_P(ThreadGroupSemaphoreOverCapacityTest, VerifyCleanup) {
1272 StartThreadGroup(kReclaimTimeForCleanupTests, kLocalMaxTasks);
1273 TestWaitableEvent threads_running;
1274 TestWaitableEvent threads_continue;
1275 RepeatingClosure threads_running_barrier = BarrierClosure(
1276 kLocalMaxTasks,
1277 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
1278
1279 TestWaitableEvent blocked_call_continue;
1280 RepeatingClosure closure = BindRepeating(
1281 [](RepeatingClosure* threads_running_barrier,
1282 TestWaitableEvent* threads_continue,
1283 TestWaitableEvent* blocked_call_continue) {
1284 threads_running_barrier->Run();
1285 {
1286 ScopedBlockingCall scoped_blocking_call(FROM_HERE,
1287 BlockingType::WILL_BLOCK);
1288 blocked_call_continue->Wait();
1289 }
1290 threads_continue->Wait();
1291 },
1292 Unretained(&threads_running_barrier), Unretained(&threads_continue),
1293 Unretained(&blocked_call_continue));
1294
1295 for (size_t i = 0; i < kLocalMaxTasks; ++i) {
1296 task_runner_->PostTask(FROM_HERE, closure);
1297 }
1298
1299 threads_running.Wait();
1300
1301 TestWaitableEvent extra_threads_running;
1302 TestWaitableEvent extra_threads_continue;
1303
1304 RepeatingClosure extra_threads_running_barrier = BarrierClosure(
1305 kLocalMaxTasks,
1306 BindOnce(&TestWaitableEvent::Signal, Unretained(&extra_threads_running)));
1307 // These tasks should run on the new threads from increasing max tasks.
1308 for (size_t i = 0; i < kLocalMaxTasks; ++i) {
1309 task_runner_->PostTask(
1310 FROM_HERE, BindOnce(
1311 [](RepeatingClosure* extra_threads_running_barrier,
1312 TestWaitableEvent* extra_threads_continue) {
1313 extra_threads_running_barrier->Run();
1314 extra_threads_continue->Wait();
1315 },
1316 Unretained(&extra_threads_running_barrier),
1317 Unretained(&extra_threads_continue)));
1318 }
1319 extra_threads_running.Wait();
1320
1321 ASSERT_EQ(kLocalMaxTasks * 2, thread_group_->NumberOfWorkersForTesting());
1322 EXPECT_EQ(kLocalMaxTasks * 2, thread_group_->GetMaxTasksForTesting());
1323 blocked_call_continue.Signal();
1324 extra_threads_continue.Signal();
1325
1326 // This test in TGS does not post tasks intermittently because semaphores on
1327 // some platforms implement anti-starvation logic, and so a pool which is
1328 // given tasks from time to time will not always run the task on the same
1329 // worker, leading to many of the workers being "used" despite only one worker
1330 // being used at a time.
1331
1332 if (GetParam() == ReclaimType::DELAYED_RECLAIM) {
1333 thread_group_->WaitForWorkersCleanedUpForTesting(kLocalMaxTasks - 1);
1334 EXPECT_GE(kLocalMaxTasks + 1, thread_group_->NumberOfWorkersForTesting());
1335 threads_continue.Signal();
1336 } else {
1337 // When workers are't automatically reclaimed after a delay, blocking tasks
1338 // need to return for extra workers to be cleaned up.
1339 threads_continue.Signal();
1340 thread_group_->WaitForWorkersCleanedUpForTesting(kLocalMaxTasks);
1341 EXPECT_EQ(kLocalMaxTasks, thread_group_->NumberOfWorkersForTesting());
1342 }
1343
1344 threads_continue.Signal();
1345 task_tracker_.FlushForTesting();
1346 }
1347
1348 INSTANTIATE_TEST_SUITE_P(ReclaimType,
1349 ThreadGroupSemaphoreOverCapacityTest,
1350 ::testing::Values(ReclaimType::DELAYED_RECLAIM,
1351 ReclaimType::NO_RECLAIM));
1352
1353 // Verify that the maximum number of workers is 256 and that hitting the max
1354 // leaves the thread group in a valid state with regards to max tasks.
TEST_F(ThreadGroupSemaphoreBlockingTest,MaximumWorkersTest)1355 TEST_F(ThreadGroupSemaphoreBlockingTest, MaximumWorkersTest) {
1356 CreateAndStartThreadGroup();
1357
1358 constexpr size_t kMaxNumberOfWorkers = 256;
1359 constexpr size_t kNumExtraTasks = 10;
1360
1361 TestWaitableEvent early_blocking_threads_running;
1362 RepeatingClosure early_threads_barrier_closure =
1363 BarrierClosure(kMaxNumberOfWorkers,
1364 BindOnce(&TestWaitableEvent::Signal,
1365 Unretained(&early_blocking_threads_running)));
1366
1367 TestWaitableEvent early_threads_finished;
1368 RepeatingClosure early_threads_finished_barrier = BarrierClosure(
1369 kMaxNumberOfWorkers, BindOnce(&TestWaitableEvent::Signal,
1370 Unretained(&early_threads_finished)));
1371
1372 TestWaitableEvent early_release_threads_continue;
1373
1374 // Post ScopedBlockingCall tasks to hit the worker cap.
1375 for (size_t i = 0; i < kMaxNumberOfWorkers; ++i) {
1376 task_runner_->PostTask(
1377 FROM_HERE, BindOnce(
1378 [](RepeatingClosure* early_threads_barrier_closure,
1379 TestWaitableEvent* early_release_threads_continue,
1380 RepeatingClosure* early_threads_finished) {
1381 {
1382 ScopedBlockingCall scoped_blocking_call(
1383 FROM_HERE, BlockingType::WILL_BLOCK);
1384 early_threads_barrier_closure->Run();
1385 early_release_threads_continue->Wait();
1386 }
1387 early_threads_finished->Run();
1388 },
1389 Unretained(&early_threads_barrier_closure),
1390 Unretained(&early_release_threads_continue),
1391 Unretained(&early_threads_finished_barrier)));
1392 }
1393
1394 early_blocking_threads_running.Wait();
1395 EXPECT_EQ(thread_group_->GetMaxTasksForTesting(),
1396 kMaxTasks + kMaxNumberOfWorkers);
1397
1398 TestWaitableEvent late_release_thread_contine;
1399 TestWaitableEvent late_blocking_threads_running;
1400
1401 RepeatingClosure late_threads_barrier_closure = BarrierClosure(
1402 kNumExtraTasks, BindOnce(&TestWaitableEvent::Signal,
1403 Unretained(&late_blocking_threads_running)));
1404
1405 // Posts additional tasks. Note: we should already have |kMaxNumberOfWorkers|
1406 // tasks running. These tasks should not be able to get executed yet as the
1407 // thread group is already at its max worker cap.
1408 for (size_t i = 0; i < kNumExtraTasks; ++i) {
1409 task_runner_->PostTask(
1410 FROM_HERE, BindOnce(
1411 [](RepeatingClosure* late_threads_barrier_closure,
1412 TestWaitableEvent* late_release_thread_contine) {
1413 ScopedBlockingCall scoped_blocking_call(
1414 FROM_HERE, BlockingType::WILL_BLOCK);
1415 late_threads_barrier_closure->Run();
1416 late_release_thread_contine->Wait();
1417 },
1418 Unretained(&late_threads_barrier_closure),
1419 Unretained(&late_release_thread_contine)));
1420 }
1421
1422 // Give time to see if we exceed the max number of workers.
1423 PlatformThread::Sleep(TestTimeouts::tiny_timeout());
1424 EXPECT_LE(thread_group_->NumberOfWorkersForTesting(), kMaxNumberOfWorkers);
1425
1426 early_release_threads_continue.Signal();
1427 early_threads_finished.Wait();
1428 late_blocking_threads_running.Wait();
1429
1430 TestWaitableEvent final_tasks_running;
1431 TestWaitableEvent final_tasks_continue;
1432 RepeatingClosure final_tasks_running_barrier = BarrierClosure(
1433 kMaxTasks,
1434 BindOnce(&TestWaitableEvent::Signal, Unretained(&final_tasks_running)));
1435
1436 // Verify that we are still able to saturate the thread group.
1437 for (size_t i = 0; i < kMaxTasks; ++i) {
1438 task_runner_->PostTask(FROM_HERE,
1439 BindOnce(
1440 [](RepeatingClosure* closure,
1441 TestWaitableEvent* final_tasks_continue) {
1442 closure->Run();
1443 final_tasks_continue->Wait();
1444 },
1445 Unretained(&final_tasks_running_barrier),
1446 Unretained(&final_tasks_continue)));
1447 }
1448 final_tasks_running.Wait();
1449 EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks + kNumExtraTasks);
1450 late_release_thread_contine.Signal();
1451 final_tasks_continue.Signal();
1452 task_tracker_.FlushForTesting();
1453 }
1454
1455 // Verify that the maximum number of best-effort tasks that can run concurrently
1456 // is honored.
TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,MaxBestEffortTasks)1457 TEST_F(ThreadGroupSemaphoreImplStartInBodyTest, MaxBestEffortTasks) {
1458 constexpr int kMaxBestEffortTasks = kMaxTasks / 2;
1459 StartThreadGroup(TimeDelta::Max(), // |suggested_reclaim_time|
1460 kMaxTasks, // |max_tasks|
1461 kMaxBestEffortTasks); // |max_best_effort_tasks|
1462 const scoped_refptr<TaskRunner> foreground_runner =
1463 test::CreatePooledTaskRunner({MayBlock()},
1464 &mock_pooled_task_runner_delegate_);
1465 const scoped_refptr<TaskRunner> background_runner =
1466 test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
1467 &mock_pooled_task_runner_delegate_);
1468
1469 // It should be possible to have |kMaxBestEffortTasks|
1470 // TaskPriority::BEST_EFFORT tasks running concurrently.
1471 TestWaitableEvent best_effort_tasks_running;
1472 TestWaitableEvent unblock_best_effort_tasks;
1473 RepeatingClosure best_effort_tasks_running_barrier = BarrierClosure(
1474 kMaxBestEffortTasks, BindOnce(&TestWaitableEvent::Signal,
1475 Unretained(&best_effort_tasks_running)));
1476
1477 for (int i = 0; i < kMaxBestEffortTasks; ++i) {
1478 background_runner->PostTask(FROM_HERE, base::BindLambdaForTesting([&]() {
1479 best_effort_tasks_running_barrier.Run();
1480 unblock_best_effort_tasks.Wait();
1481 }));
1482 }
1483 best_effort_tasks_running.Wait();
1484
1485 // No more TaskPriority::BEST_EFFORT task should run.
1486 AtomicFlag extra_best_effort_task_can_run;
1487 TestWaitableEvent extra_best_effort_task_running;
1488 background_runner->PostTask(
1489 FROM_HERE, base::BindLambdaForTesting([&]() {
1490 EXPECT_TRUE(extra_best_effort_task_can_run.IsSet());
1491 extra_best_effort_task_running.Signal();
1492 }));
1493
1494 // An extra foreground task should be able to run.
1495 TestWaitableEvent foreground_task_running;
1496 foreground_runner->PostTask(
1497 FROM_HERE, base::BindOnce(&TestWaitableEvent::Signal,
1498 Unretained(&foreground_task_running)));
1499 foreground_task_running.Wait();
1500
1501 // Completion of the TaskPriority::BEST_EFFORT tasks should allow the extra
1502 // TaskPriority::BEST_EFFORT task to run.
1503 extra_best_effort_task_can_run.Set();
1504 unblock_best_effort_tasks.Signal();
1505 extra_best_effort_task_running.Wait();
1506
1507 // Wait for all tasks to complete before exiting to avoid invalid accesses.
1508 task_tracker_.FlushForTesting();
1509 }
1510
1511 // Verify that flooding the thread group with BEST_EFFORT tasks doesn't cause
1512 // the creation of more than |max_best_effort_tasks| + 1 workers.
TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,FloodBestEffortTasksDoesNotCreateTooManyWorkers)1513 TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,
1514 FloodBestEffortTasksDoesNotCreateTooManyWorkers) {
1515 constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
1516 StartThreadGroup(TimeDelta::Max(), // |suggested_reclaim_time|
1517 kMaxTasks, // |max_tasks|
1518 kMaxBestEffortTasks); // |max_best_effort_tasks|
1519
1520 const scoped_refptr<TaskRunner> runner =
1521 test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
1522 &mock_pooled_task_runner_delegate_);
1523
1524 for (size_t i = 0; i < kLargeNumber; ++i) {
1525 runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1526 EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
1527 kMaxBestEffortTasks + 1);
1528 }));
1529 }
1530
1531 // Wait for all tasks to complete before exiting to avoid invalid accesses.
1532 task_tracker_.FlushForTesting();
1533 }
1534
1535 // Previously, a WILL_BLOCK ScopedBlockingCall unconditionally woke up a worker
1536 // if the priority queue was non-empty. Sometimes, that caused multiple workers
1537 // to be woken up for the same sequence. This test verifies that it is no longer
1538 // the case:
1539 // 1. Post and run task A.
1540 // 2. Post task B from task A.
1541 // 3. Task A enters a WILL_BLOCK ScopedBlockingCall. Once the idle thread is
1542 // created, this should no-op because there are already enough workers
1543 // (previously, a worker would be woken up because the priority queue isn't
1544 // empty).
1545 // 5. Wait for all tasks to complete.
TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,RepeatedWillBlockDoesNotCreateTooManyWorkers)1546 TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,
1547 RepeatedWillBlockDoesNotCreateTooManyWorkers) {
1548 constexpr size_t kNumWorkers = 2U;
1549 StartThreadGroup(TimeDelta::Max(), // |suggested_reclaim_time|
1550 kNumWorkers, // |max_tasks|
1551 std::nullopt); // |max_best_effort_tasks|
1552 const scoped_refptr<TaskRunner> runner = test::CreatePooledTaskRunner(
1553 {MayBlock()}, &mock_pooled_task_runner_delegate_);
1554
1555 for (size_t i = 0; i < kLargeNumber; ++i) {
1556 runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1557 runner->PostTask(
1558 FROM_HERE, BindLambdaForTesting([&]() {
1559 EXPECT_LE(
1560 thread_group_->NumberOfWorkersForTesting(),
1561 kNumWorkers + 1);
1562 }));
1563 // Number of workers should not increase when there is
1564 // enough capacity to accommodate queued and running
1565 // sequences.
1566 ScopedBlockingCall scoped_blocking_call(
1567 FROM_HERE, BlockingType::WILL_BLOCK);
1568 EXPECT_GE(kNumWorkers + 1,
1569 thread_group_->NumberOfWorkersForTesting());
1570 }));
1571 // Wait for all tasks to complete.
1572 task_tracker_.FlushForTesting();
1573 }
1574 }
1575
1576 namespace {
1577
1578 class ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest
1579 : public ThreadGroupSemaphoreImplTestBase,
1580 public testing::TestWithParam<BlockingType> {
1581 public:
1582 static constexpr int kMaxBestEffortTasks = kMaxTasks / 2;
1583
1584 ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest() = default;
1585 ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest(
1586 const ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest&) =
1587 delete;
1588 ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest& operator=(
1589 const ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest&) =
1590 delete;
1591
SetUp()1592 void SetUp() override {
1593 CreateThreadGroup();
1594 thread_group_->Start(kMaxTasks, kMaxBestEffortTasks, base::TimeDelta::Max(),
1595 service_thread_.task_runner(), nullptr,
1596 ThreadGroup::WorkerEnvironment::NONE,
1597 /*synchronous_thread_start_for_testing=*/false,
1598 /*may_block_threshold=*/{});
1599 }
1600
TearDown()1601 void TearDown() override {
1602 ThreadGroupSemaphoreImplTestBase::CommonTearDown();
1603 }
1604
1605 private:
1606 };
1607
1608 } // namespace
1609
TEST_P(ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest,BlockingCallAndMaxBestEffortTasksTest)1610 TEST_P(ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest,
1611 BlockingCallAndMaxBestEffortTasksTest) {
1612 const scoped_refptr<TaskRunner> background_runner =
1613 test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
1614 &mock_pooled_task_runner_delegate_);
1615
1616 // Post |kMaxBestEffortTasks| TaskPriority::BEST_EFFORT tasks that block in a
1617 // ScopedBlockingCall.
1618 TestWaitableEvent blocking_best_effort_tasks_running;
1619 TestWaitableEvent unblock_blocking_best_effort_tasks;
1620 RepeatingClosure blocking_best_effort_tasks_running_barrier =
1621 BarrierClosure(kMaxBestEffortTasks,
1622 BindOnce(&TestWaitableEvent::Signal,
1623 Unretained(&blocking_best_effort_tasks_running)));
1624 for (int i = 0; i < kMaxBestEffortTasks; ++i) {
1625 background_runner->PostTask(
1626 FROM_HERE, base::BindLambdaForTesting([&]() {
1627 blocking_best_effort_tasks_running_barrier.Run();
1628 ScopedBlockingCall scoped_blocking_call(FROM_HERE, GetParam());
1629 unblock_blocking_best_effort_tasks.Wait();
1630 }));
1631 }
1632 blocking_best_effort_tasks_running.Wait();
1633
1634 // Post an extra |kMaxBestEffortTasks| TaskPriority::BEST_EFFORT tasks. They
1635 // should be able to run, because the existing TaskPriority::BEST_EFFORT tasks
1636 // are blocked within a ScopedBlockingCall.
1637 //
1638 // Note: We block the tasks until they have all started running to make sure
1639 // that it is possible to run an extra |kMaxBestEffortTasks| concurrently.
1640 TestWaitableEvent best_effort_tasks_running;
1641 TestWaitableEvent unblock_best_effort_tasks;
1642 RepeatingClosure best_effort_tasks_running_barrier = BarrierClosure(
1643 kMaxBestEffortTasks, BindOnce(&TestWaitableEvent::Signal,
1644 Unretained(&best_effort_tasks_running)));
1645 for (int i = 0; i < kMaxBestEffortTasks; ++i) {
1646 background_runner->PostTask(FROM_HERE, base::BindLambdaForTesting([&]() {
1647 best_effort_tasks_running_barrier.Run();
1648 unblock_best_effort_tasks.Wait();
1649 }));
1650 }
1651 best_effort_tasks_running.Wait();
1652
1653 // Unblock all tasks and tear down.
1654 unblock_blocking_best_effort_tasks.Signal();
1655 unblock_best_effort_tasks.Signal();
1656 task_tracker_.FlushForTesting();
1657 }
1658
1659 INSTANTIATE_TEST_SUITE_P(
1660 MayBlock,
1661 ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest,
1662 ::testing::Values(BlockingType::MAY_BLOCK));
1663 INSTANTIATE_TEST_SUITE_P(
1664 WillBlock,
1665 ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest,
1666 ::testing::Values(BlockingType::WILL_BLOCK));
1667
1668 // Verify that worker detachment doesn't race with worker cleanup, regression
1669 // test for https://crbug.com/810464.
TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,RacyCleanup)1670 TEST_F(ThreadGroupSemaphoreImplStartInBodyTest, RacyCleanup) {
1671 constexpr size_t kLocalMaxTasks = 256;
1672 constexpr TimeDelta kReclaimTimeForRacyCleanupTest = Milliseconds(10);
1673
1674 thread_group_->Start(kLocalMaxTasks, kLocalMaxTasks,
1675 kReclaimTimeForRacyCleanupTest,
1676 service_thread_.task_runner(), nullptr,
1677 ThreadGroup::WorkerEnvironment::NONE,
1678 /*synchronous_thread_start_for_testing=*/false,
1679 /*may_block_threshold=*/{});
1680
1681 scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
1682 {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);
1683
1684 TestWaitableEvent threads_running;
1685 TestWaitableEvent unblock_threads;
1686 RepeatingClosure threads_running_barrier = BarrierClosure(
1687 kLocalMaxTasks,
1688 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
1689
1690 for (size_t i = 0; i < kLocalMaxTasks; ++i) {
1691 task_runner->PostTask(
1692 FROM_HERE,
1693 BindOnce(
1694 [](OnceClosure on_running, TestWaitableEvent* unblock_threads) {
1695 std::move(on_running).Run();
1696 unblock_threads->Wait();
1697 },
1698 threads_running_barrier, Unretained(&unblock_threads)));
1699 }
1700
1701 // Wait for all workers to be ready and release them all at once.
1702 threads_running.Wait();
1703 unblock_threads.Signal();
1704
1705 // Sleep to wakeup precisely when all workers are going to try to cleanup per
1706 // being idle.
1707 PlatformThread::Sleep(kReclaimTimeForRacyCleanupTest);
1708
1709 thread_group_->JoinForTesting();
1710
1711 // Unwinding this test will be racy if worker cleanup can race with
1712 // ThreadGroupSemaphore destruction : https://crbug.com/810464.
1713 mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
1714 thread_group_.reset();
1715 }
1716
1717 } // namespace internal
1718 } // namespace base
1719