1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/thread_group_impl.h"
6
7 #include <stddef.h>
8
9 #include <algorithm>
10 #include <atomic>
11 #include <memory>
12 #include <optional>
13 #include <unordered_set>
14 #include <utility>
15 #include <vector>
16
17 #include "base/atomicops.h"
18 #include "base/barrier_closure.h"
19 #include "base/functional/bind.h"
20 #include "base/functional/callback.h"
21 #include "base/functional/callback_helpers.h"
22 #include "base/memory/ptr_util.h"
23 #include "base/memory/raw_ptr.h"
24 #include "base/memory/ref_counted.h"
25 #include "base/metrics/statistics_recorder.h"
26 #include "base/synchronization/atomic_flag.h"
27 #include "base/synchronization/condition_variable.h"
28 #include "base/synchronization/lock.h"
29 #include "base/task/task_features.h"
30 #include "base/task/task_runner.h"
31 #include "base/task/thread_pool/delayed_task_manager.h"
32 #include "base/task/thread_pool/environment_config.h"
33 #include "base/task/thread_pool/pooled_task_runner_delegate.h"
34 #include "base/task/thread_pool/sequence.h"
35 #include "base/task/thread_pool/task_source_sort_key.h"
36 #include "base/task/thread_pool/task_tracker.h"
37 #include "base/task/thread_pool/test_task_factory.h"
38 #include "base/task/thread_pool/test_utils.h"
39 #include "base/task/thread_pool/worker_thread_observer.h"
40 #include "base/test/bind.h"
41 #include "base/test/gtest_util.h"
42 #include "base/test/scoped_feature_list.h"
43 #include "base/test/test_simple_task_runner.h"
44 #include "base/test/test_timeouts.h"
45 #include "base/test/test_waitable_event.h"
46 #include "base/threading/platform_thread.h"
47 #include "base/threading/scoped_blocking_call.h"
48 #include "base/threading/simple_thread.h"
49 #include "base/threading/thread.h"
50 #include "base/threading/thread_checker_impl.h"
51 #include "base/time/time.h"
52 #include "base/timer/timer.h"
53 #include "build/build_config.h"
54 #include "testing/gtest/include/gtest/gtest.h"
55
56 namespace base {
57 namespace internal {
58 namespace {
59
60 constexpr size_t kMaxTasks = 4;
61 constexpr size_t kNumThreadsPostingTasks = 4;
62 constexpr size_t kNumTasksPostedPerThread = 150;
63 // This can't be lower because Windows' TestWaitableEvent wakes up too early
64 // when a small timeout is used. This results in many spurious wake ups before a
65 // worker is allowed to cleanup.
66 constexpr TimeDelta kReclaimTimeForCleanupTests = Milliseconds(500);
67 constexpr size_t kLargeNumber = 512;
68
69 class ThreadGroupImplImplTestBase : public ThreadGroup::Delegate {
70 public:
71 ThreadGroupImplImplTestBase(const ThreadGroupImplImplTestBase&) = delete;
72 ThreadGroupImplImplTestBase& operator=(const ThreadGroupImplImplTestBase&) =
73 delete;
74
75 protected:
ThreadGroupImplImplTestBase()76 ThreadGroupImplImplTestBase()
77 : service_thread_("ThreadPoolServiceThread"),
78 tracked_ref_factory_(this) {}
79
CommonTearDown()80 void CommonTearDown() {
81 delayed_task_manager_.Shutdown();
82 service_thread_.Stop();
83 task_tracker_.FlushForTesting();
84 if (thread_group_)
85 thread_group_->JoinForTesting();
86 mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
87 thread_group_.reset();
88 }
89
CreateThreadGroup(ThreadType thread_type=ThreadType::kDefault)90 void CreateThreadGroup(ThreadType thread_type = ThreadType::kDefault) {
91 ASSERT_FALSE(thread_group_);
92 service_thread_.Start();
93 delayed_task_manager_.Start(service_thread_.task_runner());
94 thread_group_ = std::make_unique<ThreadGroupImpl>(
95 "TestThreadGroup", "A", thread_type, task_tracker_.GetTrackedRef(),
96 tracked_ref_factory_.GetTrackedRef());
97 ASSERT_TRUE(thread_group_);
98
99 mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
100 }
101
StartThreadGroup(TimeDelta suggested_reclaim_time,size_t max_tasks,std::optional<int> max_best_effort_tasks=std::nullopt,WorkerThreadObserver * worker_observer=nullptr,std::optional<TimeDelta> may_block_threshold=std::nullopt)102 void StartThreadGroup(
103 TimeDelta suggested_reclaim_time,
104 size_t max_tasks,
105 std::optional<int> max_best_effort_tasks = std::nullopt,
106 WorkerThreadObserver* worker_observer = nullptr,
107 std::optional<TimeDelta> may_block_threshold = std::nullopt) {
108 ASSERT_TRUE(thread_group_);
109 thread_group_->Start(
110 max_tasks,
111 max_best_effort_tasks ? max_best_effort_tasks.value() : max_tasks,
112 suggested_reclaim_time, service_thread_.task_runner(), worker_observer,
113 ThreadGroup::WorkerEnvironment::NONE,
114 /* synchronous_thread_start_for_testing=*/false, may_block_threshold);
115 }
116
CreateAndStartThreadGroup(TimeDelta suggested_reclaim_time=TimeDelta::Max (),size_t max_tasks=kMaxTasks,std::optional<int> max_best_effort_tasks=std::nullopt,WorkerThreadObserver * worker_observer=nullptr,std::optional<TimeDelta> may_block_threshold=std::nullopt)117 void CreateAndStartThreadGroup(
118 TimeDelta suggested_reclaim_time = TimeDelta::Max(),
119 size_t max_tasks = kMaxTasks,
120 std::optional<int> max_best_effort_tasks = std::nullopt,
121 WorkerThreadObserver* worker_observer = nullptr,
122 std::optional<TimeDelta> may_block_threshold = std::nullopt) {
123 CreateThreadGroup();
124 StartThreadGroup(suggested_reclaim_time, max_tasks, max_best_effort_tasks,
125 worker_observer, may_block_threshold);
126 }
127
128 Thread service_thread_;
129 TaskTracker task_tracker_;
130 std::unique_ptr<ThreadGroupImpl> thread_group_;
131 DelayedTaskManager delayed_task_manager_;
132 TrackedRefFactory<ThreadGroup::Delegate> tracked_ref_factory_;
133 test::MockPooledTaskRunnerDelegate mock_pooled_task_runner_delegate_ = {
134 task_tracker_.GetTrackedRef(), &delayed_task_manager_};
135
136 private:
137 // ThreadGroup::Delegate:
GetThreadGroupForTraits(const TaskTraits & traits)138 ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override {
139 return thread_group_.get();
140 }
141 };
142
143 class ThreadGroupImplImplTest : public ThreadGroupImplImplTestBase,
144 public testing::Test {
145 public:
146 ThreadGroupImplImplTest(const ThreadGroupImplImplTest&) = delete;
147 ThreadGroupImplImplTest& operator=(const ThreadGroupImplImplTest&) = delete;
148
149 protected:
150 ThreadGroupImplImplTest() = default;
151
SetUp()152 void SetUp() override { CreateAndStartThreadGroup(); }
153
TearDown()154 void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
155 };
156
157 class ThreadGroupImplImplTestParam
158 : public ThreadGroupImplImplTestBase,
159 public testing::TestWithParam<TaskSourceExecutionMode> {
160 public:
161 ThreadGroupImplImplTestParam(const ThreadGroupImplImplTestParam&) = delete;
162 ThreadGroupImplImplTestParam& operator=(const ThreadGroupImplImplTestParam&) =
163 delete;
164
165 protected:
166 ThreadGroupImplImplTestParam() = default;
167
SetUp()168 void SetUp() override { CreateAndStartThreadGroup(); }
169
TearDown()170 void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
171 };
172
173 using PostNestedTask = test::TestTaskFactory::PostNestedTask;
174
175 class ThreadPostingTasksWaitIdle : public SimpleThread {
176 public:
177 // Constructs a thread that posts tasks to |thread_group| through an
178 // |execution_mode| task runner. The thread waits until all workers in
179 // |thread_group| are idle before posting a new task.
ThreadPostingTasksWaitIdle(ThreadGroupImpl * thread_group,test::MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate_,TaskSourceExecutionMode execution_mode)180 ThreadPostingTasksWaitIdle(
181 ThreadGroupImpl* thread_group,
182 test::MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate_,
183 TaskSourceExecutionMode execution_mode)
184 : SimpleThread("ThreadPostingTasksWaitIdle"),
185 thread_group_(thread_group),
186 factory_(CreatePooledTaskRunnerWithExecutionMode(
187 execution_mode,
188 mock_pooled_task_runner_delegate_),
189 execution_mode) {
190 DCHECK(thread_group_);
191 }
192 ThreadPostingTasksWaitIdle(const ThreadPostingTasksWaitIdle&) = delete;
193 ThreadPostingTasksWaitIdle& operator=(const ThreadPostingTasksWaitIdle&) =
194 delete;
195
factory() const196 const test::TestTaskFactory* factory() const { return &factory_; }
197
198 private:
Run()199 void Run() override {
200 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
201 thread_group_->WaitForAllWorkersIdleForTesting();
202 EXPECT_TRUE(factory_.PostTask(PostNestedTask::NO, OnceClosure()));
203 }
204 }
205
206 const raw_ptr<ThreadGroupImpl> thread_group_;
207 const scoped_refptr<TaskRunner> task_runner_;
208 test::TestTaskFactory factory_;
209 };
210
211 } // namespace
212
TEST_P(ThreadGroupImplImplTestParam, PostTasksWaitAllWorkersIdle) {
  // Spawn the posting threads. Each of them waits for all workers to become
  // idle before posting a new task, which verifies that workers can sleep and
  // be woken up when new tasks are posted.
  std::vector<std::unique_ptr<ThreadPostingTasksWaitIdle>> posters;
  for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
    auto poster = std::make_unique<ThreadPostingTasksWaitIdle>(
        thread_group_.get(), &mock_pooled_task_runner_delegate_, GetParam());
    ThreadPostingTasksWaitIdle* const started_poster = poster.get();
    posters.push_back(std::move(poster));
    started_poster->Start();
  }

  // Wait for every poster to finish and for all of its tasks to run.
  for (const auto& poster : posters) {
    poster->Join();
    poster->factory()->WaitForAllTasksToRun();
  }

  // Make sure all workers are idle so that no task accesses its
  // TestTaskFactory after |posters| is destroyed.
  thread_group_->WaitForAllWorkersIdleForTesting();
}
237
TEST_P(ThreadGroupImplImplTestParam, PostTasksWithOneAvailableWorker) {
  // Keep all workers but one busy with tasks that block until
  // |unblock_workers| is signaled. Each blocking task gets its own factory so
  // that the tasks land in different sequences and can run simultaneously when
  // the execution mode is SEQUENCED.
  TestWaitableEvent unblock_workers;
  constexpr size_t kNumBlockedWorkers = kMaxTasks - 1;
  std::vector<std::unique_ptr<test::TestTaskFactory>> blocking_factories;
  for (size_t i = 0; i < kNumBlockedWorkers; ++i) {
    auto factory = std::make_unique<test::TestTaskFactory>(
        CreatePooledTaskRunnerWithExecutionMode(
            GetParam(), &mock_pooled_task_runner_delegate_),
        GetParam());
    test::TestTaskFactory* const blocking_factory = factory.get();
    blocking_factories.push_back(std::move(factory));

    EXPECT_TRUE(blocking_factory->PostTask(
        PostNestedTask::NO,
        BindOnce(&TestWaitableEvent::Wait, Unretained(&unblock_workers))));
    blocking_factory->WaitForAllTasksToRun();
  }

  // With a single worker left available, |kNumTasksPostedPerThread| short
  // tasks must nonetheless all run.
  test::TestTaskFactory short_task_factory(
      CreatePooledTaskRunnerWithExecutionMode(
          GetParam(), &mock_pooled_task_runner_delegate_),
      GetParam());
  for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
    EXPECT_TRUE(short_task_factory.PostTask(PostNestedTask::NO, OnceClosure()));
  }
  short_task_factory.WaitForAllTasksToRun();

  // Let the blocked tasks finish.
  unblock_workers.Signal();

  // Make sure all workers are idle so that no task accesses its
  // TestTaskFactory after it is destroyed.
  thread_group_->WaitForAllWorkersIdleForTesting();
}
272
TEST_P(ThreadGroupImplImplTestParam, Saturate) {
  // Checks that |kMaxTasks| tasks/sequences can run at the same time. One
  // factory is used per blocking task so that the tasks belong to different
  // sequences and can run simultaneously when the execution mode is SEQUENCED.
  TestWaitableEvent unblock_workers;
  std::vector<std::unique_ptr<test::TestTaskFactory>> blocking_factories;
  for (size_t i = 0; i < kMaxTasks; ++i) {
    auto factory = std::make_unique<test::TestTaskFactory>(
        CreatePooledTaskRunnerWithExecutionMode(
            GetParam(), &mock_pooled_task_runner_delegate_),
        GetParam());
    test::TestTaskFactory* const blocking_factory = factory.get();
    blocking_factories.push_back(std::move(factory));

    EXPECT_TRUE(blocking_factory->PostTask(
        PostNestedTask::NO,
        BindOnce(&TestWaitableEvent::Wait, Unretained(&unblock_workers))));
    blocking_factory->WaitForAllTasksToRun();
  }

  // Let the blocked tasks finish.
  unblock_workers.Signal();

  // Make sure all workers are idle so that no task accesses its
  // TestTaskFactory after it is destroyed.
  thread_group_->WaitForAllWorkersIdleForTesting();
}
298
299 // Verifies that ShouldYield() returns true for priorities lower than the
300 // highest priority pending while the thread group is flooded with USER_VISIBLE
301 // tasks.
TEST_F(ThreadGroupImplImplTest,ShouldYieldFloodedUserVisible)302 TEST_F(ThreadGroupImplImplTest, ShouldYieldFloodedUserVisible) {
303 TestWaitableEvent threads_running;
304 TestWaitableEvent threads_continue;
305
306 // Saturate workers with USER_VISIBLE tasks to ensure ShouldYield() returns
307 // true when a tasks of higher priority is posted.
308 RepeatingClosure threads_running_barrier = BarrierClosure(
309 kMaxTasks,
310 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
311
312 auto job_task = base::MakeRefCounted<test::MockJobTask>(
313 BindLambdaForTesting(
314 [&threads_running_barrier, &threads_continue](JobDelegate* delegate) {
315 threads_running_barrier.Run();
316 threads_continue.Wait();
317 }),
318 /* num_tasks_to_run */ kMaxTasks);
319 scoped_refptr<JobTaskSource> task_source =
320 job_task->GetJobTaskSource(FROM_HERE, {TaskPriority::USER_VISIBLE},
321 &mock_pooled_task_runner_delegate_);
322
323 auto registered_task_source = task_tracker_.RegisterTaskSource(task_source);
324 ASSERT_TRUE(registered_task_source);
325 static_cast<ThreadGroup*>(thread_group_.get())
326 ->PushTaskSourceAndWakeUpWorkers(
327 RegisteredTaskSourceAndTransaction::FromTaskSource(
328 std::move(registered_task_source)));
329
330 threads_running.Wait();
331
332 // Posting a BEST_EFFORT task should not cause any other tasks to yield.
333 // Once this task gets to run, no other task needs to yield.
334 // Note: This is only true because this test is using a single ThreadGroup.
335 // Under the ThreadPool this wouldn't be racy because BEST_EFFORT tasks
336 // run in an independent ThreadGroup.
337 test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT},
338 &mock_pooled_task_runner_delegate_)
339 ->PostTask(
340 FROM_HERE, BindLambdaForTesting([&]() {
341 EXPECT_FALSE(thread_group_->ShouldYield(
342 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/1}));
343 }));
344 // A BEST_EFFORT task with more workers shouldn't have to yield.
345 EXPECT_FALSE(thread_group_->ShouldYield(
346 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/2}));
347 EXPECT_FALSE(thread_group_->ShouldYield(
348 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
349 EXPECT_FALSE(thread_group_->ShouldYield(
350 {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/0}));
351 EXPECT_FALSE(thread_group_->ShouldYield(
352 {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/0}));
353
354 // Posting a USER_VISIBLE task should cause BEST_EFFORT and USER_VISIBLE with
355 // higher worker_count tasks to yield.
356 auto post_user_visible = [&]() {
357 test::CreatePooledTaskRunner({TaskPriority::USER_VISIBLE},
358 &mock_pooled_task_runner_delegate_)
359 ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
360 EXPECT_FALSE(thread_group_->ShouldYield(
361 {TaskPriority::USER_VISIBLE, TimeTicks(),
362 /* worker_count=*/1}));
363 }));
364 };
365 // A USER_VISIBLE task with too many workers should yield.
366 post_user_visible();
367 EXPECT_TRUE(thread_group_->ShouldYield(
368 {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/2}));
369 post_user_visible();
370 EXPECT_TRUE(thread_group_->ShouldYield(
371 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
372 post_user_visible();
373 EXPECT_FALSE(thread_group_->ShouldYield(
374 {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/1}));
375 EXPECT_FALSE(thread_group_->ShouldYield(
376 {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/0}));
377
378 // Posting a USER_BLOCKING task should cause BEST_EFFORT, USER_VISIBLE and
379 // USER_BLOCKING with higher worker_count tasks to yield.
380 auto post_user_blocking = [&]() {
381 test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
382 &mock_pooled_task_runner_delegate_)
383 ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
384 // Once this task got to start, no other task needs to
385 // yield.
386 EXPECT_FALSE(thread_group_->ShouldYield(
387 {TaskPriority::USER_BLOCKING, TimeTicks(),
388 /* worker_count=*/1}));
389 }));
390 };
391 // A USER_BLOCKING task with too many workers should have to yield.
392 post_user_blocking();
393 EXPECT_TRUE(thread_group_->ShouldYield(
394 {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/2}));
395 post_user_blocking();
396 EXPECT_TRUE(thread_group_->ShouldYield(
397 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
398 post_user_blocking();
399 EXPECT_TRUE(thread_group_->ShouldYield(
400 {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/0}));
401 post_user_blocking();
402 EXPECT_FALSE(thread_group_->ShouldYield(
403 {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/1}));
404
405 threads_continue.Signal();
406 task_tracker_.FlushForTesting();
407 }
408
409 INSTANTIATE_TEST_SUITE_P(Parallel,
410 ThreadGroupImplImplTestParam,
411 ::testing::Values(TaskSourceExecutionMode::kParallel));
412 INSTANTIATE_TEST_SUITE_P(
413 Sequenced,
414 ThreadGroupImplImplTestParam,
415 ::testing::Values(TaskSourceExecutionMode::kSequenced));
416
417 INSTANTIATE_TEST_SUITE_P(Job,
418 ThreadGroupImplImplTestParam,
419 ::testing::Values(TaskSourceExecutionMode::kJob));
420
421 namespace {
422
423 class ThreadGroupImplImplStartInBodyTest : public ThreadGroupImplImplTest {
424 public:
SetUp()425 void SetUp() override {
426 CreateThreadGroup();
427 // Let the test start the thread group.
428 }
429 };
430
TaskPostedBeforeStart(PlatformThreadRef * platform_thread_ref,TestWaitableEvent * task_running,TestWaitableEvent * barrier)431 void TaskPostedBeforeStart(PlatformThreadRef* platform_thread_ref,
432 TestWaitableEvent* task_running,
433 TestWaitableEvent* barrier) {
434 *platform_thread_ref = PlatformThread::CurrentRef();
435 task_running->Signal();
436 barrier->Wait();
437 }
438
439 } // namespace
440
441 // Verify that 2 tasks posted before Start() to a ThreadGroupImpl with
442 // more than 2 workers run on different workers when Start() is called.
TEST_F(ThreadGroupImplImplStartInBodyTest,PostTasksBeforeStart)443 TEST_F(ThreadGroupImplImplStartInBodyTest, PostTasksBeforeStart) {
444 PlatformThreadRef task_1_thread_ref;
445 PlatformThreadRef task_2_thread_ref;
446 TestWaitableEvent task_1_running;
447 TestWaitableEvent task_2_running;
448
449 // This event is used to prevent a task from completing before the other task
450 // starts running. If that happened, both tasks could run on the same worker
451 // and this test couldn't verify that the correct number of workers were woken
452 // up.
453 TestWaitableEvent barrier;
454
455 test::CreatePooledTaskRunner({WithBaseSyncPrimitives()},
456 &mock_pooled_task_runner_delegate_)
457 ->PostTask(
458 FROM_HERE,
459 BindOnce(&TaskPostedBeforeStart, Unretained(&task_1_thread_ref),
460 Unretained(&task_1_running), Unretained(&barrier)));
461 test::CreatePooledTaskRunner({WithBaseSyncPrimitives()},
462 &mock_pooled_task_runner_delegate_)
463 ->PostTask(
464 FROM_HERE,
465 BindOnce(&TaskPostedBeforeStart, Unretained(&task_2_thread_ref),
466 Unretained(&task_2_running), Unretained(&barrier)));
467
468 // Workers should not be created and tasks should not run before the thread
469 // group is started.
470 EXPECT_EQ(0U, thread_group_->NumberOfWorkersForTesting());
471 EXPECT_FALSE(task_1_running.IsSignaled());
472 EXPECT_FALSE(task_2_running.IsSignaled());
473
474 StartThreadGroup(TimeDelta::Max(), kMaxTasks);
475
476 // Tasks should run shortly after the thread group is started.
477 task_1_running.Wait();
478 task_2_running.Wait();
479
480 // Tasks should run on different threads.
481 EXPECT_NE(task_1_thread_ref, task_2_thread_ref);
482
483 barrier.Signal();
484 task_tracker_.FlushForTesting();
485 }
486
487 // Verify that posting many tasks before Start will cause the number of workers
488 // to grow to |max_tasks_| after Start.
TEST_F(ThreadGroupImplImplStartInBodyTest,PostManyTasks)489 TEST_F(ThreadGroupImplImplStartInBodyTest, PostManyTasks) {
490 scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
491 {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);
492 constexpr size_t kNumTasksPosted = 2 * kMaxTasks;
493
494 TestWaitableEvent threads_running;
495 TestWaitableEvent threads_continue;
496
497 RepeatingClosure threads_running_barrier = BarrierClosure(
498 kMaxTasks,
499 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
500 // Posting these tasks should cause new workers to be created.
501 for (size_t i = 0; i < kMaxTasks; ++i) {
502 task_runner->PostTask(
503 FROM_HERE, BindLambdaForTesting([&]() {
504 threads_running_barrier.Run();
505 threads_continue.Wait();
506 }));
507 }
508 // Post the remaining |kNumTasksPosted - kMaxTasks| tasks, don't wait for them
509 // as they'll be blocked behind the above kMaxtasks.
510 for (size_t i = kMaxTasks; i < kNumTasksPosted; ++i)
511 task_runner->PostTask(FROM_HERE, DoNothing());
512
513 EXPECT_EQ(0U, thread_group_->NumberOfWorkersForTesting());
514
515 StartThreadGroup(TimeDelta::Max(), kMaxTasks);
516 EXPECT_GT(thread_group_->NumberOfWorkersForTesting(), 0U);
517 EXPECT_EQ(kMaxTasks, thread_group_->GetMaxTasksForTesting());
518
519 threads_running.Wait();
520 EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(),
521 thread_group_->GetMaxTasksForTesting());
522 threads_continue.Signal();
523 task_tracker_.FlushForTesting();
524 }
525
526 namespace {
527
528 class BackgroundThreadGroupImplTest : public ThreadGroupImplImplTest {
529 public:
CreateAndStartThreadGroup(TimeDelta suggested_reclaim_time=TimeDelta::Max (),size_t max_tasks=kMaxTasks,std::optional<int> max_best_effort_tasks=std::nullopt,WorkerThreadObserver * worker_observer=nullptr,std::optional<TimeDelta> may_block_threshold=std::nullopt)530 void CreateAndStartThreadGroup(
531 TimeDelta suggested_reclaim_time = TimeDelta::Max(),
532 size_t max_tasks = kMaxTasks,
533 std::optional<int> max_best_effort_tasks = std::nullopt,
534 WorkerThreadObserver* worker_observer = nullptr,
535 std::optional<TimeDelta> may_block_threshold = std::nullopt) {
536 if (!CanUseBackgroundThreadTypeForWorkerThread())
537 return;
538 CreateThreadGroup(ThreadType::kBackground);
539 StartThreadGroup(suggested_reclaim_time, max_tasks, max_best_effort_tasks,
540 worker_observer, may_block_threshold);
541 }
542
SetUp()543 void SetUp() override { CreateAndStartThreadGroup(); }
544 };
545
546 } // namespace
547
548 // Verify that ScopedBlockingCall updates thread type when necessary per
549 // shutdown state.
TEST_F(BackgroundThreadGroupImplTest,UpdatePriorityBlockingStarted)550 TEST_F(BackgroundThreadGroupImplTest, UpdatePriorityBlockingStarted) {
551 if (!CanUseBackgroundThreadTypeForWorkerThread())
552 return;
553
554 const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
555 {MayBlock(), WithBaseSyncPrimitives(), TaskPriority::BEST_EFFORT},
556 &mock_pooled_task_runner_delegate_);
557
558 TestWaitableEvent threads_running;
559 RepeatingClosure threads_running_barrier = BarrierClosure(
560 kMaxTasks,
561 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
562
563 TestWaitableEvent blocking_threads_continue;
564
565 for (size_t i = 0; i < kMaxTasks; ++i) {
566 task_runner->PostTask(
567 FROM_HERE, BindLambdaForTesting([&]() {
568 EXPECT_EQ(ThreadType::kBackground,
569 PlatformThread::GetCurrentThreadType());
570 {
571 // ScopedBlockingCall before shutdown doesn't affect priority.
572 ScopedBlockingCall scoped_blocking_call(FROM_HERE,
573 BlockingType::MAY_BLOCK);
574 EXPECT_EQ(ThreadType::kBackground,
575 PlatformThread::GetCurrentThreadType());
576 }
577 threads_running_barrier.Run();
578 blocking_threads_continue.Wait();
579 // This is reached after StartShutdown(), at which point we expect
580 // ScopedBlockingCall to update thread priority.
581 ScopedBlockingCall scoped_blocking_call(FROM_HERE,
582 BlockingType::MAY_BLOCK);
583 EXPECT_EQ(ThreadType::kDefault,
584 PlatformThread::GetCurrentThreadType());
585 }));
586 }
587 threads_running.Wait();
588
589 task_tracker_.StartShutdown();
590 blocking_threads_continue.Signal();
591 task_tracker_.FlushForTesting();
592 }
593
594 namespace {
595
596 class ThreadGroupImplStandbyPolicyTest : public ThreadGroupImplImplTestBase,
597 public testing::Test {
598 public:
599 ThreadGroupImplStandbyPolicyTest() = default;
600 ThreadGroupImplStandbyPolicyTest(const ThreadGroupImplStandbyPolicyTest&) =
601 delete;
602 ThreadGroupImplStandbyPolicyTest& operator=(
603 const ThreadGroupImplStandbyPolicyTest&) = delete;
604
SetUp()605 void SetUp() override {
606 CreateAndStartThreadGroup(kReclaimTimeForCleanupTests);
607 }
608
TearDown()609 void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
610 };
611
612 } // namespace
613
// A thread group that was just started is expected to hold exactly one worker.
TEST_F(ThreadGroupImplStandbyPolicyTest, InitOne) {
  const size_t initial_worker_count =
      thread_group_->NumberOfWorkersForTesting();
  EXPECT_EQ(1U, initial_worker_count);
}
617
618 namespace {
619
// Describes the optional second (nested) ScopedBlockingCall of a
// NestedBlockingType.
enum class OptionalBlockingType {
  // No nested ScopedBlockingCall.
  NO_BLOCK,
  // Nested ScopedBlockingCall of type BlockingType::MAY_BLOCK.
  MAY_BLOCK,
  // Nested ScopedBlockingCall of type BlockingType::WILL_BLOCK.
  WILL_BLOCK,
};
625
626 struct NestedBlockingType {
NestedBlockingTypebase::internal::__anon866189150d11::NestedBlockingType627 NestedBlockingType(BlockingType first_in,
628 OptionalBlockingType second_in,
629 BlockingType behaves_as_in)
630 : first(first_in), second(second_in), behaves_as(behaves_as_in) {}
631
632 BlockingType first;
633 OptionalBlockingType second;
634 BlockingType behaves_as;
635 };
636
637 class NestedScopedBlockingCall {
638 public:
NestedScopedBlockingCall(const NestedBlockingType & nested_blocking_type)639 explicit NestedScopedBlockingCall(
640 const NestedBlockingType& nested_blocking_type)
641 : first_scoped_blocking_call_(FROM_HERE, nested_blocking_type.first),
642 second_scoped_blocking_call_(
643 nested_blocking_type.second == OptionalBlockingType::WILL_BLOCK
644 ? std::make_unique<ScopedBlockingCall>(FROM_HERE,
645 BlockingType::WILL_BLOCK)
646 : (nested_blocking_type.second ==
647 OptionalBlockingType::MAY_BLOCK
648 ? std::make_unique<ScopedBlockingCall>(
649 FROM_HERE,
650 BlockingType::MAY_BLOCK)
651 : nullptr)) {}
652 NestedScopedBlockingCall(const NestedScopedBlockingCall&) = delete;
653 NestedScopedBlockingCall& operator=(const NestedScopedBlockingCall&) = delete;
654
655 private:
656 ScopedBlockingCall first_scoped_blocking_call_;
657 std::unique_ptr<ScopedBlockingCall> second_scoped_blocking_call_;
658 };
659
660 } // namespace
661
662 class ThreadGroupImplBlockingTest
663 : public ThreadGroupImplImplTestBase,
664 public testing::TestWithParam<NestedBlockingType> {
665 public:
666 ThreadGroupImplBlockingTest() = default;
667 ThreadGroupImplBlockingTest(const ThreadGroupImplBlockingTest&) = delete;
668 ThreadGroupImplBlockingTest& operator=(const ThreadGroupImplBlockingTest&) =
669 delete;
670
ParamInfoToString(::testing::TestParamInfo<NestedBlockingType> param_info)671 static std::string ParamInfoToString(
672 ::testing::TestParamInfo<NestedBlockingType> param_info) {
673 std::string str = param_info.param.first == BlockingType::MAY_BLOCK
674 ? "MAY_BLOCK"
675 : "WILL_BLOCK";
676 if (param_info.param.second == OptionalBlockingType::MAY_BLOCK)
677 str += "_MAY_BLOCK";
678 else if (param_info.param.second == OptionalBlockingType::WILL_BLOCK)
679 str += "_WILL_BLOCK";
680 return str;
681 }
682
TearDown()683 void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
684
685 protected:
686 // Saturates the thread group with a task that first blocks, waits to be
687 // unblocked, then exits.
SaturateWithBlockingTasks(const NestedBlockingType & nested_blocking_type,TaskPriority priority=TaskPriority::USER_BLOCKING)688 void SaturateWithBlockingTasks(
689 const NestedBlockingType& nested_blocking_type,
690 TaskPriority priority = TaskPriority::USER_BLOCKING) {
691 TestWaitableEvent threads_running;
692
693 const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
694 {MayBlock(), WithBaseSyncPrimitives(), priority},
695 &mock_pooled_task_runner_delegate_);
696
697 RepeatingClosure threads_running_barrier = BarrierClosure(
698 kMaxTasks,
699 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
700
701 for (size_t i = 0; i < kMaxTasks; ++i) {
702 task_runner->PostTask(
703 FROM_HERE, BindLambdaForTesting([this, &threads_running_barrier,
704 nested_blocking_type]() {
705 NestedScopedBlockingCall nested_scoped_blocking_call(
706 nested_blocking_type);
707 threads_running_barrier.Run();
708 blocking_threads_continue_.Wait();
709 }));
710 }
711 threads_running.Wait();
712 }
713
714 // Saturates the thread group with a task that waits for other tasks without
715 // entering a ScopedBlockingCall, then exits.
SaturateWithBusyTasks(TaskPriority priority=TaskPriority::USER_BLOCKING,TaskShutdownBehavior shutdown_behavior=TaskShutdownBehavior::SKIP_ON_SHUTDOWN)716 void SaturateWithBusyTasks(
717 TaskPriority priority = TaskPriority::USER_BLOCKING,
718 TaskShutdownBehavior shutdown_behavior =
719 TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
720 TestWaitableEvent threads_running;
721
722 const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
723 {MayBlock(), WithBaseSyncPrimitives(), priority, shutdown_behavior},
724 &mock_pooled_task_runner_delegate_);
725
726 RepeatingClosure threads_running_barrier = BarrierClosure(
727 kMaxTasks,
728 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
729 // Posting these tasks should cause new workers to be created.
730 for (size_t i = 0; i < kMaxTasks; ++i) {
731 task_runner->PostTask(
732 FROM_HERE, BindLambdaForTesting([this, &threads_running_barrier]() {
733 threads_running_barrier.Run();
734 busy_threads_continue_.Wait();
735 }));
736 }
737 threads_running.Wait();
738 }
739
740 // Returns how long we can expect a change to |max_tasks_| to occur
741 // after a task has become blocked.
GetMaxTasksChangeSleepTime()742 TimeDelta GetMaxTasksChangeSleepTime() {
743 return std::max(thread_group_->blocked_workers_poll_period_for_testing(),
744 thread_group_->may_block_threshold_for_testing()) +
745 TestTimeouts::tiny_timeout();
746 }
747
748 // Waits indefinitely, until |thread_group_|'s max tasks increases to
749 // |expected_max_tasks|.
ExpectMaxTasksIncreasesTo(size_t expected_max_tasks)750 void ExpectMaxTasksIncreasesTo(size_t expected_max_tasks) {
751 size_t max_tasks = thread_group_->GetMaxTasksForTesting();
752 while (max_tasks != expected_max_tasks) {
753 PlatformThread::Sleep(GetMaxTasksChangeSleepTime());
754 size_t new_max_tasks = thread_group_->GetMaxTasksForTesting();
755 ASSERT_GE(new_max_tasks, max_tasks);
756 max_tasks = new_max_tasks;
757 }
758 }
759
760 // Unblocks tasks posted by SaturateWithBlockingTasks().
UnblockBlockingTasks()761 void UnblockBlockingTasks() { blocking_threads_continue_.Signal(); }
762
763 // Unblocks tasks posted by SaturateWithBusyTasks().
UnblockBusyTasks()764 void UnblockBusyTasks() { busy_threads_continue_.Signal(); }
765
766 const scoped_refptr<TaskRunner> task_runner_ =
767 test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
768 &mock_pooled_task_runner_delegate_);
769
770 private:
771 TestWaitableEvent blocking_threads_continue_;
772 TestWaitableEvent busy_threads_continue_;
773 };
774
775 // Verify that SaturateWithBlockingTasks() causes max tasks to increase and
776 // creates a worker if needed. Also verify that UnblockBlockingTasks() decreases
777 // max tasks after an increase.
TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblocked) {
  constexpr auto kExpectedWorkerCount = 2 * kMaxTasks;

  CreateAndStartThreadGroup();

  // Precondition: max tasks starts at |kMaxTasks|.
  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  // Saturate the group with blocking tasks configured by the test parameter.
  SaturateWithBlockingTasks(GetParam());

  // Posting busy tasks on top forces |kMaxTasks| extra workers to be
  // instantiated. This call must return rather than hang.
  SaturateWithBusyTasks();

  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kExpectedWorkerCount);

  // Release both sets of tasks and drain the group; max tasks must then be
  // back at |kMaxTasks|.
  UnblockBusyTasks();
  UnblockBlockingTasks();
  task_tracker_.FlushForTesting();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
}
796
797 // Verify that SaturateWithBlockingTasks() of BEST_EFFORT tasks causes max best
798 // effort tasks to increase and creates a worker if needed. Also verify that
799 // UnblockBlockingTasks() decreases max best effort tasks after an increase.
TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblockedBestEffort) {
  constexpr auto kExpectedWorkerCount = 2 * kMaxTasks;

  CreateAndStartThreadGroup();

  // Precondition: both limits start at |kMaxTasks|.
  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
  ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);

  // Saturate the group with BEST_EFFORT blocking tasks configured by the test
  // parameter.
  SaturateWithBlockingTasks(GetParam(), TaskPriority::BEST_EFFORT);

  // Posting BEST_EFFORT busy tasks on top forces |kMaxTasks| extra workers to
  // be instantiated. This call must return rather than hang.
  SaturateWithBusyTasks(TaskPriority::BEST_EFFORT);

  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kExpectedWorkerCount);

  // Release both sets of tasks and drain the group; both limits must then be
  // back at |kMaxTasks|.
  UnblockBusyTasks();
  UnblockBlockingTasks();
  task_tracker_.FlushForTesting();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
  EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
}
820
821 // Verify that flooding the thread group with more BEST_EFFORT tasks than
822 // kMaxBestEffortTasks doesn't prevent USER_VISIBLE tasks from running.
TEST_P(ThreadGroupImplBlockingTest, TooManyBestEffortTasks) {
  constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
  // One more BEST_EFFORT task than the best-effort limit allows.
  constexpr size_t kNumBestEffortTasks = kMaxBestEffortTasks + 1;

  CreateAndStartThreadGroup(TimeDelta::Max(), kMaxTasks, kMaxBestEffortTasks);

  // Every task posted by this test ends by waiting on this event.
  TestWaitableEvent release_all_tasks;
  {
    // Signaled once every BEST_EFFORT task is inside its blocking scope.
    TestWaitableEvent all_in_blocking_scope;
    RepeatingClosure in_blocking_scope_barrier = BarrierClosure(
        kNumBestEffortTasks, BindOnce(&TestWaitableEvent::Signal,
                                      Unretained(&all_in_blocking_scope)));
    TestWaitableEvent leave_blocking_scope;

    // Signaled once every BEST_EFFORT task has left its blocking scope.
    TestWaitableEvent all_left_blocking_scope;
    RepeatingClosure left_blocking_scope_barrier = BarrierClosure(
        kNumBestEffortTasks,
        BindOnce(&TestWaitableEvent::Signal,
                 Unretained(&all_left_blocking_scope)));

    const auto best_effort_task_runner =
        test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
                                     &mock_pooled_task_runner_delegate_);
    for (size_t posted = 0; posted < kNumBestEffortTasks; ++posted) {
      best_effort_task_runner->PostTask(
          FROM_HERE, BindLambdaForTesting([&]() {
            {
              NestedScopedBlockingCall scoped_blocking_call(GetParam());
              in_blocking_scope_barrier.Run();
              leave_blocking_scope.Wait();
            }
            left_blocking_scope_barrier.Run();
            release_all_tasks.Wait();
          }));
    }
    all_in_blocking_scope.Wait();
    leave_blocking_scope.Signal();
    all_left_blocking_scope.Wait();
  }

  // At this point, |kNumBestEffortTasks| threads are running (plus potentially
  // the idle thread), but max tasks is back to normal.
  EXPECT_GE(thread_group_->NumberOfWorkersForTesting(), kNumBestEffortTasks);
  EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
            kNumBestEffortTasks + 1);
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  // A task posted through |task_runner_| must still get to run.
  TestWaitableEvent extra_task_running;
  task_runner_->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
                           extra_task_running.Signal();
                           release_all_tasks.Wait();
                         }));

  // This should not block forever.
  extra_task_running.Wait();

  EXPECT_GE(thread_group_->NumberOfWorkersForTesting(),
            kNumBestEffortTasks + 1);
  EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
            kNumBestEffortTasks + 2);
  release_all_tasks.Signal();

  task_tracker_.FlushForTesting();
}
887
888 // Verify that tasks posted in a saturated thread group before a
889 // ScopedBlockingCall will execute after ScopedBlockingCall is instantiated.
TEST_P(ThreadGroupImplBlockingTest, PostBeforeBlocking) {
  CreateAndStartThreadGroup();

  // Automatic reset so that the loop below can wait for each posted task
  // individually.
  TestWaitableEvent task_started(WaitableEvent::ResetPolicy::AUTOMATIC);
  TestWaitableEvent may_enter_blocking_scope;
  TestWaitableEvent blocking_tasks_continue;

  for (size_t posted = 0; posted < kMaxTasks; ++posted) {
    task_runner_->PostTask(
        FROM_HERE,
        BindLambdaForTesting([&task_started, &may_enter_blocking_scope,
                              &blocking_tasks_continue,
                              blocking_type = GetParam()]() {
          task_started.Signal();
          may_enter_blocking_scope.Wait();

          NestedScopedBlockingCall nested_scoped_blocking_call(blocking_type);
          blocking_tasks_continue.Wait();
        }));
    task_started.Wait();
  }

  // Every worker is occupied and the thread group is saturated, but no task
  // has entered its ScopedBlockingCall yet.
  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  // Queue |kMaxTasks| more tasks while the group is saturated.
  TestWaitableEvent extra_tasks_running;
  TestWaitableEvent extra_tasks_continue;
  RepeatingClosure extra_tasks_running_barrier = BarrierClosure(
      kMaxTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&extra_tasks_running)));
  for (size_t posted = 0; posted < kMaxTasks; ++posted) {
    task_runner_->PostTask(
        FROM_HERE, BindLambdaForTesting(
                       [&extra_tasks_running_barrier, &extra_tasks_continue]() {
                         extra_tasks_running_barrier.Run();
                         extra_tasks_continue.Wait();
                       }));
  }

  // Let the first batch enter ScopedBlockingCall. Workers should be created
  // for the tasks that were just queued.
  may_enter_blocking_scope.Signal();

  // Should not block forever.
  extra_tasks_running.Wait();
  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
  extra_tasks_continue.Signal();

  blocking_tasks_continue.Signal();
  task_tracker_.FlushForTesting();
}
951
952 // Verify that workers become idle when the thread group is over-capacity and
953 // that those workers do no work.
TEST_P(ThreadGroupImplBlockingTest, WorkersIdleWhenOverCapacity) {
  CreateAndStartThreadGroup();

  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  SaturateWithBlockingTasks(GetParam());

  // Forces |kMaxTasks| extra workers to be instantiated by posting tasks.
  SaturateWithBusyTasks();

  ASSERT_EQ(thread_group_->NumberOfIdleWorkersForTesting(), 0U);
  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);

  AtomicFlag is_exiting;

  // Posts |kMaxTasks| tasks that each expect |is_exiting| to already be set
  // when they run.
  const auto post_tasks_expecting_exit = [this, &is_exiting]() {
    for (size_t posted = 0; posted < kMaxTasks; ++posted) {
      task_runner_->PostTask(FROM_HERE,
                             BindLambdaForTesting([&is_exiting]() {
                               EXPECT_TRUE(is_exiting.IsSet());
                             }));
    }
  };

  // These tasks should not get executed until after other tasks become
  // unblocked.
  post_tasks_expecting_exit();

  // The original |kMaxTasks| will finish their tasks after being unblocked.
  // There will be work in the work queue, but the thread group should now be
  // over-capacity and workers will become idle.
  UnblockBlockingTasks();
  thread_group_->WaitForWorkersIdleForTesting(kMaxTasks);
  EXPECT_EQ(thread_group_->NumberOfIdleWorkersForTesting(), kMaxTasks);

  // Posting more tasks should not cause workers that are idle because the
  // thread group is over capacity to begin doing work.
  post_tasks_expecting_exit();

  // Give the idle workers time to (incorrectly) pick up work.
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());

  is_exiting.Set();
  // Unblocks the new workers.
  UnblockBusyTasks();
  task_tracker_.FlushForTesting();
}
1004
1005 // Verify that an increase of max tasks with SaturateWithBlockingTasks()
1006 // increases the number of tasks that can run before ShouldYield returns true.
TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblockedShouldYield) {
  CreateAndStartThreadGroup();

  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  // Asks the thread group whether a task of |priority| (with a default
  // TimeTicks in its sort key) should yield.
  const auto should_yield = [this](TaskPriority priority) {
    return thread_group_->ShouldYield({priority, TimeTicks()});
  };

  EXPECT_FALSE(should_yield(TaskPriority::BEST_EFFORT));
  SaturateWithBlockingTasks(GetParam());
  EXPECT_FALSE(should_yield(TaskPriority::BEST_EFFORT));

  // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
  // should not block forever.
  SaturateWithBusyTasks();

  // All tasks can run, hence ShouldYield returns false.
  EXPECT_FALSE(should_yield(TaskPriority::BEST_EFFORT));

  // Post a USER_VISIBLE task that can't run since workers are saturated. This
  // should cause BEST_EFFORT tasks to yield.
  const auto user_visible_runner = test::CreatePooledTaskRunner(
      {TaskPriority::USER_VISIBLE}, &mock_pooled_task_runner_delegate_);
  user_visible_runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&]() {
        EXPECT_FALSE(should_yield(TaskPriority::BEST_EFFORT));
      }));
  EXPECT_TRUE(should_yield(TaskPriority::BEST_EFFORT));

  // Post a USER_BLOCKING task that can't run since workers are saturated. This
  // should cause USER_VISIBLE tasks to yield.
  const auto user_blocking_runner = test::CreatePooledTaskRunner(
      {TaskPriority::USER_BLOCKING}, &mock_pooled_task_runner_delegate_);
  user_blocking_runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&]() {
        EXPECT_FALSE(should_yield(TaskPriority::USER_VISIBLE));
      }));
  EXPECT_TRUE(should_yield(TaskPriority::USER_VISIBLE));

  UnblockBusyTasks();
  UnblockBlockingTasks();
  task_tracker_.FlushForTesting();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
}
1053
1054 INSTANTIATE_TEST_SUITE_P(
1055 All,
1056 ThreadGroupImplBlockingTest,
1057 ::testing::Values(NestedBlockingType(BlockingType::MAY_BLOCK,
1058 OptionalBlockingType::NO_BLOCK,
1059 BlockingType::MAY_BLOCK),
1060 NestedBlockingType(BlockingType::WILL_BLOCK,
1061 OptionalBlockingType::NO_BLOCK,
1062 BlockingType::WILL_BLOCK),
1063 NestedBlockingType(BlockingType::MAY_BLOCK,
1064 OptionalBlockingType::WILL_BLOCK,
1065 BlockingType::WILL_BLOCK),
1066 NestedBlockingType(BlockingType::WILL_BLOCK,
1067 OptionalBlockingType::MAY_BLOCK,
1068 BlockingType::WILL_BLOCK)),
1069 ThreadGroupImplBlockingTest::ParamInfoToString);
1070
1071 // Verify that if a thread enters the scope of a MAY_BLOCK ScopedBlockingCall,
1072 // but exits the scope before the MayBlock threshold is reached, that the max
1073 // tasks does not increase.
TEST_F(ThreadGroupImplBlockingTest, ThreadBlockUnblockPremature) {
  // Use an infinite MayBlock threshold so that a MAY_BLOCK ScopedBlockingCall
  // never increases max tasks.
  CreateAndStartThreadGroup(
      /*suggested_reclaim_time=*/TimeDelta::Max(),
      /*max_tasks=*/kMaxTasks,
      /*max_best_effort_tasks=*/std::nullopt,
      /*worker_observer=*/nullptr,
      /*may_block_threshold=*/TimeDelta::Max());

  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  const NestedBlockingType may_block_only(BlockingType::MAY_BLOCK,
                                          OptionalBlockingType::NO_BLOCK,
                                          BlockingType::MAY_BLOCK);
  SaturateWithBlockingTasks(may_block_only);

  // Wait two poll periods, then check that neither the worker count nor max
  // tasks grew.
  PlatformThread::Sleep(
      2 * thread_group_->blocked_workers_poll_period_for_testing());
  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  UnblockBlockingTasks();
  task_tracker_.FlushForTesting();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
}
1097
1098 // Verify that if a BEST_EFFORT task enters the scope of a WILL_BLOCK
1099 // ScopedBlockingCall, but exits the scope before the MayBlock threshold is
1100 // reached, that the max best effort tasks does not increase.
TEST_F(ThreadGroupImplBlockingTest, ThreadBlockUnblockPrematureBestEffort) {
  // Use an infinite MayBlock threshold, so the threshold is never reached
  // during this test.
  CreateAndStartThreadGroup(
      /*suggested_reclaim_time=*/TimeDelta::Max(),
      /*max_tasks=*/kMaxTasks,
      /*max_best_effort_tasks=*/kMaxTasks,
      /*worker_observer=*/nullptr,
      /*may_block_threshold=*/TimeDelta::Max());

  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
  ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);

  const NestedBlockingType will_block_only(BlockingType::WILL_BLOCK,
                                           OptionalBlockingType::NO_BLOCK,
                                           BlockingType::WILL_BLOCK);
  SaturateWithBlockingTasks(will_block_only, TaskPriority::BEST_EFFORT);

  // Wait two poll periods. With WILL_BLOCK, max tasks is expected to have
  // doubled, while max best effort tasks must be unchanged.
  PlatformThread::Sleep(
      2 * thread_group_->blocked_workers_poll_period_for_testing());
  EXPECT_GE(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), 2 * kMaxTasks);
  EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);

  // Once the blocking tasks finish, both limits are back at |kMaxTasks|.
  UnblockBlockingTasks();
  task_tracker_.FlushForTesting();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
  EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
}
1128
1129 // Verify that if max tasks is incremented because of a MAY_BLOCK
1130 // ScopedBlockingCall, it isn't incremented again when there is a nested
1131 // WILL_BLOCK ScopedBlockingCall.
TEST_F(ThreadGroupImplBlockingTest, MayBlockIncreaseCapacityNestedWillBlock) {
  CreateAndStartThreadGroup();

  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
  auto task_runner =
      test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
                                   &mock_pooled_task_runner_delegate_);
  // Every task posted below returns only once this is signaled.
  TestWaitableEvent release_tasks;

  // Occupy all but one slot, so that together with the task posted below the
  // thread group is saturated and a MAY_BLOCK ScopedBlockingCall would
  // increment max tasks.
  for (size_t posted = 0; posted < kMaxTasks - 1; ++posted) {
    task_runner->PostTask(
        FROM_HERE,
        BindOnce(&TestWaitableEvent::Wait, Unretained(&release_tasks)));
  }

  TestWaitableEvent may_enter_will_block;
  TestWaitableEvent entered_will_block;

  // Post a task that enters a MAY_BLOCK ScopedBlockingCall and, once allowed,
  // nests a WILL_BLOCK ScopedBlockingCall inside it.
  task_runner->PostTask(
      FROM_HERE,
      BindLambdaForTesting(
          [&may_enter_will_block, &entered_will_block, &release_tasks]() {
            ScopedBlockingCall may_block(FROM_HERE, BlockingType::MAY_BLOCK);
            may_enter_will_block.Wait();
            ScopedBlockingCall will_block(FROM_HERE, BlockingType::WILL_BLOCK);
            entered_will_block.Signal();
            release_tasks.Wait();
          }));

  // After a short delay, max tasks should be incremented.
  ExpectMaxTasksIncreasesTo(kMaxTasks + 1);

  // Wait until the task instantiates a WILL_BLOCK ScopedBlockingCall.
  may_enter_will_block.Signal();
  entered_will_block.Wait();

  // Max tasks shouldn't be incremented again.
  EXPECT_EQ(kMaxTasks + 1, thread_group_->GetMaxTasksForTesting());

  // Tear down.
  release_tasks.Signal();
  task_tracker_.FlushForTesting();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
}
1182
1183 // Verify that OnShutdownStarted() causes max tasks to increase and creates a
1184 // worker if needed. Also verify that UnblockBusyTasks() decreases max tasks
1185 // after an increase.
TEST_F(ThreadGroupImplBlockingTest, ThreadBusyShutdown) {
  constexpr auto kExpectedWorkerCount = 2 * kMaxTasks;

  CreateAndStartThreadGroup();
  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  // Saturate the group with CONTINUE_ON_SHUTDOWN busy tasks, then start
  // shutdown.
  SaturateWithBusyTasks(TaskPriority::BEST_EFFORT,
                        TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN);
  thread_group_->OnShutdownStarted();

  // Posting BLOCK_SHUTDOWN busy tasks forces |kMaxTasks| extra workers to be
  // instantiated. This should not block forever.
  SaturateWithBusyTasks(TaskPriority::BEST_EFFORT,
                        TaskShutdownBehavior::BLOCK_SHUTDOWN);

  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kExpectedWorkerCount);

  UnblockBusyTasks();
  task_tracker_.FlushForTesting();
  thread_group_->JoinForTesting();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  // Detach the thread group from the delegate before destroying it here.
  mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
  thread_group_.reset();
}
1208
// Test parameter for ThreadGroupImplOverCapacityTest. NO_RECLAIM makes the
// fixture enable the kNoWorkerThreadReclaim feature.
enum class ReclaimType {
  DELAYED_RECLAIM,
  NO_RECLAIM,
};
1210
1211 class ThreadGroupImplOverCapacityTest
1212 : public ThreadGroupImplImplTestBase,
1213 public testing::TestWithParam<ReclaimType> {
1214 public:
1215 ThreadGroupImplOverCapacityTest() = default;
1216 ThreadGroupImplOverCapacityTest(const ThreadGroupImplOverCapacityTest&) =
1217 delete;
1218 ThreadGroupImplOverCapacityTest& operator=(
1219 const ThreadGroupImplOverCapacityTest&) = delete;
1220
SetUp()1221 void SetUp() override {
1222 if (GetParam() == ReclaimType::NO_RECLAIM) {
1223 feature_list.InitAndEnableFeature(kNoWorkerThreadReclaim);
1224 }
1225 CreateThreadGroup();
1226 task_runner_ =
1227 test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
1228 &mock_pooled_task_runner_delegate_);
1229 }
1230
TearDown()1231 void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
1232
1233 protected:
1234 base::test::ScopedFeatureList feature_list;
1235 scoped_refptr<TaskRunner> task_runner_;
1236 static constexpr size_t kLocalMaxTasks = 3;
1237
CreateThreadGroup()1238 void CreateThreadGroup() {
1239 ASSERT_FALSE(thread_group_);
1240 service_thread_.Start();
1241 delayed_task_manager_.Start(service_thread_.task_runner());
1242 thread_group_ = std::make_unique<ThreadGroupImpl>(
1243 "OverCapacityTestThreadGroup", "A", ThreadType::kDefault,
1244 task_tracker_.GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
1245 ASSERT_TRUE(thread_group_);
1246
1247 mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
1248 }
1249 };
1250
1251 // Verify that workers that become idle due to the thread group being over
1252 // capacity will eventually cleanup.
TEST_P(ThreadGroupImplOverCapacityTest, VerifyCleanup) {
  StartThreadGroup(kReclaimTimeForCleanupTests, kLocalMaxTasks);
  TestWaitableEvent threads_running;
  TestWaitableEvent threads_continue;
  RepeatingClosure threads_running_barrier = BarrierClosure(
      kLocalMaxTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));

  // Each of the first |kLocalMaxTasks| tasks reports that it is running,
  // blocks inside a WILL_BLOCK ScopedBlockingCall until
  // |blocked_call_continue| is signaled, then waits on |threads_continue|
  // outside of the blocking scope.
  TestWaitableEvent blocked_call_continue;
  RepeatingClosure closure = BindRepeating(
      [](RepeatingClosure* threads_running_barrier,
         TestWaitableEvent* threads_continue,
         TestWaitableEvent* blocked_call_continue) {
        threads_running_barrier->Run();
        {
          ScopedBlockingCall scoped_blocking_call(FROM_HERE,
                                                  BlockingType::WILL_BLOCK);
          blocked_call_continue->Wait();
        }
        threads_continue->Wait();
      },
      Unretained(&threads_running_barrier), Unretained(&threads_continue),
      Unretained(&blocked_call_continue));

  for (size_t i = 0; i < kLocalMaxTasks; ++i) {
    task_runner_->PostTask(FROM_HERE, closure);
  }

  threads_running.Wait();

  TestWaitableEvent extra_threads_running;
  TestWaitableEvent extra_threads_continue;

  RepeatingClosure extra_threads_running_barrier = BarrierClosure(
      kLocalMaxTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&extra_threads_running)));
  // These tasks should run on the new threads from increasing max tasks.
  for (size_t i = 0; i < kLocalMaxTasks; ++i) {
    task_runner_->PostTask(
        FROM_HERE, BindOnce(
                       [](RepeatingClosure* extra_threads_running_barrier,
                          TestWaitableEvent* extra_threads_continue) {
                         extra_threads_running_barrier->Run();
                         extra_threads_continue->Wait();
                       },
                       Unretained(&extra_threads_running_barrier),
                       Unretained(&extra_threads_continue)));
  }
  extra_threads_running.Wait();

  // Both batches are running: twice |kLocalMaxTasks| workers and max tasks.
  ASSERT_EQ(kLocalMaxTasks * 2, thread_group_->NumberOfWorkersForTesting());
  EXPECT_EQ(kLocalMaxTasks * 2, thread_group_->GetMaxTasksForTesting());
  blocked_call_continue.Signal();
  extra_threads_continue.Signal();

  // Periodically post tasks to ensure that posting tasks does not prevent
  // workers that are idle due to the thread group being over capacity from
  // cleaning up.
  for (int i = 0; i < 16; ++i) {
    task_runner_->PostDelayedTask(FROM_HERE, DoNothing(),
                                  kReclaimTimeForCleanupTests * i * 0.5);
  }

  if (GetParam() == ReclaimType::DELAYED_RECLAIM) {
    // Note: one worker above capacity will not get cleaned up since it's on the
    // front of the idle set.
    thread_group_->WaitForWorkersCleanedUpForTesting(kLocalMaxTasks - 1);
    EXPECT_EQ(kLocalMaxTasks + 1, thread_group_->NumberOfWorkersForTesting());
    threads_continue.Signal();
  } else {
    // When workers aren't automatically reclaimed after a delay, blocking
    // tasks need to return for extra workers to be cleaned up.
    threads_continue.Signal();
    thread_group_->WaitForWorkersCleanedUpForTesting(kLocalMaxTasks);
    EXPECT_EQ(kLocalMaxTasks, thread_group_->NumberOfWorkersForTesting());
  }

  // Both branches above have signaled |threads_continue|, so every task can
  // finish and the flush can complete.
  task_tracker_.FlushForTesting();
}
1332
1333 INSTANTIATE_TEST_SUITE_P(ReclaimType,
1334 ThreadGroupImplOverCapacityTest,
1335 ::testing::Values(ReclaimType::DELAYED_RECLAIM,
1336 ReclaimType::NO_RECLAIM));
1337
1338 // Verify that the maximum number of workers is 256 and that hitting the max
1339 // leaves the thread group in a valid state with regards to max tasks.
TEST_F(ThreadGroupImplBlockingTest, MaximumWorkersTest) {
  CreateAndStartThreadGroup();

  constexpr size_t kMaxNumberOfWorkers = 256;
  constexpr size_t kNumExtraTasks = 10;

  // Signaled once |kMaxNumberOfWorkers| early tasks are inside their
  // ScopedBlockingCall.
  TestWaitableEvent early_tasks_blocked;
  RepeatingClosure early_tasks_blocked_barrier =
      BarrierClosure(kMaxNumberOfWorkers,
                     BindOnce(&TestWaitableEvent::Signal,
                              Unretained(&early_tasks_blocked)));

  // Signaled once |kMaxNumberOfWorkers| early tasks have left their
  // ScopedBlockingCall.
  TestWaitableEvent early_tasks_done;
  RepeatingClosure early_tasks_done_barrier = BarrierClosure(
      kMaxNumberOfWorkers,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&early_tasks_done)));

  TestWaitableEvent release_early_tasks;

  // Post ScopedBlockingCall tasks to hit the worker cap.
  for (size_t posted = 0; posted < kMaxNumberOfWorkers; ++posted) {
    task_runner_->PostTask(
        FROM_HERE,
        BindLambdaForTesting([&early_tasks_blocked_barrier,
                              &release_early_tasks,
                              &early_tasks_done_barrier]() {
          {
            ScopedBlockingCall scoped_blocking_call(FROM_HERE,
                                                    BlockingType::WILL_BLOCK);
            early_tasks_blocked_barrier.Run();
            release_early_tasks.Wait();
          }
          early_tasks_done_barrier.Run();
        }));
  }

  early_tasks_blocked.Wait();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(),
            kMaxTasks + kMaxNumberOfWorkers);

  TestWaitableEvent release_late_tasks;
  TestWaitableEvent late_tasks_blocked;

  RepeatingClosure late_tasks_blocked_barrier = BarrierClosure(
      kNumExtraTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&late_tasks_blocked)));

  // Post additional tasks. |kMaxNumberOfWorkers| tasks are already running, so
  // these should not be able to execute yet: the thread group is at its
  // worker cap.
  for (size_t posted = 0; posted < kNumExtraTasks; ++posted) {
    task_runner_->PostTask(
        FROM_HERE,
        BindLambdaForTesting(
            [&late_tasks_blocked_barrier, &release_late_tasks]() {
              ScopedBlockingCall scoped_blocking_call(FROM_HERE,
                                                      BlockingType::WILL_BLOCK);
              late_tasks_blocked_barrier.Run();
              release_late_tasks.Wait();
            }));
  }

  // Give time to see if we exceed the max number of workers.
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());
  EXPECT_LE(thread_group_->NumberOfWorkersForTesting(), kMaxNumberOfWorkers);

  // Release the early tasks; the late tasks can then run and enter their own
  // ScopedBlockingCall.
  release_early_tasks.Signal();
  early_tasks_done.Wait();
  late_tasks_blocked.Wait();

  TestWaitableEvent final_tasks_running;
  TestWaitableEvent final_tasks_continue;
  RepeatingClosure final_tasks_running_barrier = BarrierClosure(
      kMaxTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&final_tasks_running)));

  // Verify that we are still able to saturate the thread group.
  for (size_t posted = 0; posted < kMaxTasks; ++posted) {
    task_runner_->PostTask(
        FROM_HERE, BindLambdaForTesting(
                       [&final_tasks_running_barrier, &final_tasks_continue]() {
                         final_tasks_running_barrier.Run();
                         final_tasks_continue.Wait();
                       }));
  }
  final_tasks_running.Wait();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks + kNumExtraTasks);
  release_late_tasks.Signal();
  final_tasks_continue.Signal();
  task_tracker_.FlushForTesting();
}
1439
1440 // Verify that the maximum number of best-effort tasks that can run concurrently
1441 // is honored.
TEST_F(ThreadGroupImplImplStartInBodyTest, MaxBestEffortTasks) {
  constexpr int kMaxBestEffortTasks = kMaxTasks / 2;
  StartThreadGroup(/*suggested_reclaim_time=*/TimeDelta::Max(),
                   /*max_tasks=*/kMaxTasks,
                   /*max_best_effort_tasks=*/kMaxBestEffortTasks);
  const scoped_refptr<TaskRunner> foreground_runner =
      test::CreatePooledTaskRunner({MayBlock()},
                                   &mock_pooled_task_runner_delegate_);
  const scoped_refptr<TaskRunner> background_runner =
      test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
                                   &mock_pooled_task_runner_delegate_);

  // |kMaxBestEffortTasks| BEST_EFFORT tasks must be able to run concurrently.
  TestWaitableEvent all_best_effort_running;
  TestWaitableEvent release_best_effort;
  RepeatingClosure best_effort_running_barrier = BarrierClosure(
      kMaxBestEffortTasks, BindOnce(&TestWaitableEvent::Signal,
                                    Unretained(&all_best_effort_running)));

  for (int posted = 0; posted < kMaxBestEffortTasks; ++posted) {
    background_runner->PostTask(
        FROM_HERE, base::BindLambdaForTesting([&]() {
          best_effort_running_barrier.Run();
          release_best_effort.Wait();
        }));
  }
  all_best_effort_running.Wait();

  // One more BEST_EFFORT task must not run until the flag below is set.
  AtomicFlag extra_best_effort_may_run;
  TestWaitableEvent extra_best_effort_running;
  background_runner->PostTask(
      FROM_HERE, base::BindLambdaForTesting([&]() {
        EXPECT_TRUE(extra_best_effort_may_run.IsSet());
        extra_best_effort_running.Signal();
      }));

  // An extra foreground task should be able to run.
  TestWaitableEvent foreground_running;
  foreground_runner->PostTask(
      FROM_HERE, base::BindLambdaForTesting(
                     [&foreground_running]() { foreground_running.Signal(); }));
  foreground_running.Wait();

  // Letting the running BEST_EFFORT tasks complete should allow the extra
  // BEST_EFFORT task to run.
  extra_best_effort_may_run.Set();
  release_best_effort.Signal();
  extra_best_effort_running.Wait();

  // Wait for all tasks to complete before exiting to avoid invalid accesses.
  task_tracker_.FlushForTesting();
}
1496
1497 // Verify that flooding the thread group with BEST_EFFORT tasks doesn't cause
1498 // the creation of more than |max_best_effort_tasks| + 1 workers.
TEST_F(ThreadGroupImplImplStartInBodyTest,
       FloodBestEffortTasksDoesNotCreateTooManyWorkers) {
  constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
  StartThreadGroup(/*suggested_reclaim_time=*/TimeDelta::Max(),
                   /*max_tasks=*/kMaxTasks,
                   /*max_best_effort_tasks=*/kMaxBestEffortTasks);

  const scoped_refptr<TaskRunner> runner =
      test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
                                   &mock_pooled_task_runner_delegate_);

  // Every task checks, while it runs, that the worker count never exceeds
  // |kMaxBestEffortTasks| + 1.
  for (size_t posted = 0; posted < kLargeNumber; ++posted) {
    runner->PostTask(
        FROM_HERE, BindLambdaForTesting([&]() {
          EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
                    kMaxBestEffortTasks + 1);
        }));
  }

  // Wait for all tasks to complete before exiting to avoid invalid accesses.
  task_tracker_.FlushForTesting();
}
1520
1521 // Previously, a WILL_BLOCK ScopedBlockingCall unconditionally woke up a worker
1522 // if the priority queue was non-empty. Sometimes, that caused multiple workers
1523 // to be woken up for the same sequence. This test verifies that it is no longer
1524 // the case:
1525 // 1. Post and run task A.
1526 // 2. Post task B from task A.
1527 // 3. Task A enters a WILL_BLOCK ScopedBlockingCall. Once the idle thread is
1528 // created, this should no-op because there are already enough workers
1529 // (previously, a worker would be woken up because the priority queue isn't
1530 // empty).
// 4. Wait for all tasks to complete.
TEST_F(ThreadGroupImplImplStartInBodyTest,
       RepeatedWillBlockDoesNotCreateTooManyWorkers) {
  constexpr size_t kNumWorkers = 2U;
  StartThreadGroup(/*suggested_reclaim_time=*/TimeDelta::Max(),
                   /*max_tasks=*/kNumWorkers,
                   /*max_best_effort_tasks=*/std::nullopt);
  const scoped_refptr<TaskRunner> runner = test::CreatePooledTaskRunner(
      {MayBlock()}, &mock_pooled_task_runner_delegate_);

  for (size_t iteration = 0; iteration < kLargeNumber; ++iteration) {
    // Task A: posts task B, then enters a WILL_BLOCK ScopedBlockingCall.
    runner->PostTask(
        FROM_HERE, BindLambdaForTesting([&]() {
          // Task B: only checks the upper bound on the worker count.
          runner->PostTask(
              FROM_HERE, BindLambdaForTesting([&]() {
                EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
                          kNumWorkers + 1);
              }));
          // Number of workers should not increase when there is enough
          // capacity to accommodate queued and running sequences.
          ScopedBlockingCall scoped_blocking_call(FROM_HERE,
                                                  BlockingType::WILL_BLOCK);
          EXPECT_EQ(kNumWorkers + 1,
                    thread_group_->NumberOfWorkersForTesting());
        }));
    // Wait for both tasks to complete before the next iteration.
    task_tracker_.FlushForTesting();
  }
}
1561
1562 namespace {
1563
1564 class ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest
1565 : public ThreadGroupImplImplTestBase,
1566 public testing::TestWithParam<BlockingType> {
1567 public:
1568 static constexpr int kMaxBestEffortTasks = kMaxTasks / 2;
1569
1570 ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest() = default;
1571 ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest(
1572 const ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest&) = delete;
1573 ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest& operator=(
1574 const ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest&) = delete;
1575
SetUp()1576 void SetUp() override {
1577 CreateThreadGroup();
1578 thread_group_->Start(kMaxTasks, kMaxBestEffortTasks, base::TimeDelta::Max(),
1579 service_thread_.task_runner(), nullptr,
1580 ThreadGroup::WorkerEnvironment::NONE,
1581 /*synchronous_thread_start_for_testing=*/false,
1582 /*may_block_threshold=*/{});
1583 }
1584
TearDown()1585 void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
1586
1587 private:
1588 };
1589
1590 } // namespace
1591
TEST_P(ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,
       BlockingCallAndMaxBestEffortTasksTest) {
  const scoped_refptr<TaskRunner> best_effort_runner =
      test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
                                   &mock_pooled_task_runner_delegate_);

  // Phase 1: fill the BEST_EFFORT quota with |kMaxBestEffortTasks| tasks that
  // each park inside a ScopedBlockingCall of the parameterized BlockingType.
  TestWaitableEvent blocked_tasks_started;
  TestWaitableEvent release_blocked_tasks;
  RepeatingClosure on_blocked_task_started = BarrierClosure(
      kMaxBestEffortTasks, BindOnce(&TestWaitableEvent::Signal,
                                    Unretained(&blocked_tasks_started)));
  for (int posted = 0; posted < kMaxBestEffortTasks; ++posted) {
    best_effort_runner->PostTask(
        FROM_HERE, base::BindLambdaForTesting([&]() {
          on_blocked_task_started.Run();
          ScopedBlockingCall blocking_scope(FROM_HERE, GetParam());
          release_blocked_tasks.Wait();
        }));
  }
  // The barrier signals once all phase-1 tasks have started running.
  blocked_tasks_started.Wait();

  // Phase 2: post another |kMaxBestEffortTasks| BEST_EFFORT tasks. Since the
  // phase-1 tasks sit inside a ScopedBlockingCall, these are expected to be
  // allowed to run.
  //
  // Each of these tasks stays blocked until all of them have started, which
  // shows that |kMaxBestEffortTasks| additional tasks can run concurrently.
  TestWaitableEvent extra_tasks_started;
  TestWaitableEvent release_extra_tasks;
  RepeatingClosure on_extra_task_started = BarrierClosure(
      kMaxBestEffortTasks, BindOnce(&TestWaitableEvent::Signal,
                                    Unretained(&extra_tasks_started)));
  for (int posted = 0; posted < kMaxBestEffortTasks; ++posted) {
    best_effort_runner->PostTask(FROM_HERE,
                                 base::BindLambdaForTesting([&]() {
                                   on_extra_task_started.Run();
                                   release_extra_tasks.Wait();
                                 }));
  }
  extra_tasks_started.Wait();

  // Let both batches finish, then drain the tracker before locals go away.
  release_blocked_tasks.Signal();
  release_extra_tasks.Signal();
  task_tracker_.FlushForTesting();
}
1641
// Instantiate the parameterized fixture once per BlockingType value. The two
// values are kept as separate instantiations, prefixed "MayBlock" and
// "WillBlock" respectively.
INSTANTIATE_TEST_SUITE_P(MayBlock,
                         ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,
                         ::testing::Values(BlockingType::MAY_BLOCK));
INSTANTIATE_TEST_SUITE_P(WillBlock,
                         ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,
                         ::testing::Values(BlockingType::WILL_BLOCK));
1648
1649 // Verify that worker detachment doesn't race with worker cleanup, regression
1650 // test for https://crbug.com/810464.
TEST_F(ThreadGroupImplImplStartInBodyTest, RacyCleanup) {
  constexpr size_t kWorkerCount = 256;
  constexpr TimeDelta kReclaimTime = Milliseconds(10);

  // Start with a short reclaim time; |kWorkerCount| is used for both the task
  // limit and the best-effort task limit.
  thread_group_->Start(kWorkerCount, kWorkerCount, kReclaimTime,
                       service_thread_.task_runner(), nullptr,
                       ThreadGroup::WorkerEnvironment::NONE,
                       /*synchronous_thread_start_for_testing=*/false,
                       /*may_block_threshold=*/{});

  scoped_refptr<TaskRunner> runner = test::CreatePooledTaskRunner(
      {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);

  TestWaitableEvent all_tasks_running;
  TestWaitableEvent release_tasks;
  RepeatingClosure task_running_barrier = BarrierClosure(
      kWorkerCount,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&all_tasks_running)));

  // Each task reports in through the barrier, then blocks until released.
  for (size_t posted = 0; posted < kWorkerCount; ++posted) {
    runner->PostTask(
        FROM_HERE,
        BindOnce(
            [](OnceClosure notify_running, TestWaitableEvent* release) {
              std::move(notify_running).Run();
              release->Wait();
            },
            task_running_barrier, Unretained(&release_tasks)));
  }

  // Once all |kWorkerCount| tasks are running, let them all go at the same
  // time.
  all_tasks_running.Wait();
  release_tasks.Signal();

  // Sleep for the reclaim time so that this thread resumes around the moment
  // the now-idle workers try to clean themselves up.
  PlatformThread::Sleep(kReclaimTime);

  thread_group_->JoinForTesting();

  // If worker cleanup could race with ThreadGroupImpl destruction, unwinding
  // from here would be racy: https://crbug.com/810464.
  mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
  thread_group_.reset();
}
1697
1698 } // namespace internal
1699 } // namespace base
1700