1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/thread_group.h"
6
7 #include <memory>
8 #include <tuple>
9 #include <utility>
10
11 #include "base/barrier_closure.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/location.h"
15 #include "base/memory/ref_counted.h"
16 #include "base/task/task_runner.h"
17 #include "base/task/task_traits.h"
18 #include "base/task/thread_pool/can_run_policy_test.h"
19 #include "base/task/thread_pool/delayed_task_manager.h"
20 #include "base/task/thread_pool/pooled_sequenced_task_runner.h"
21 #include "base/task/thread_pool/task_tracker.h"
22 #include "base/task/thread_pool/test_task_factory.h"
23 #include "base/task/thread_pool/test_utils.h"
24 #include "base/task/thread_pool/thread_group_impl.h"
25 #include "base/test/bind.h"
26 #include "base/test/test_timeouts.h"
27 #include "base/test/test_waitable_event.h"
28 #include "base/threading/platform_thread.h"
29 #include "base/threading/scoped_blocking_call.h"
30 #include "base/threading/scoped_blocking_call_internal.h"
31 #include "base/threading/simple_thread.h"
32 #include "base/threading/thread.h"
33 #include "base/threading/thread_restrictions.h"
34 #include "build/build_config.h"
35 #include "testing/gtest/include/gtest/gtest.h"
36
37 #if BUILDFLAG(IS_WIN)
38 #include "base/win/com_init_check_hook.h"
39 #include "base/win/com_init_util.h"
40 #endif
41
42 namespace base {
43 namespace internal {
44
45 namespace {
46
// Capacity handed to ThreadGroupImpl::Start() by the fixture; tests treat it
// as the number of tasks that can run at the same time.
constexpr size_t kMaxTasks = 4;
// By default, tests allow half of the thread group to be used by best-effort
// tasks.
constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;

// Number of worker tasks requested by a job that is expected to be cancelled
// long before all of them could run.
constexpr size_t kTooManyTasks = 1000;

// Sizing of the multi-threaded posting tests: how many ThreadPostingTasks are
// created, and how many tasks each one of them posts.
constexpr size_t kNumThreadsPostingTasks = 4;
constexpr size_t kNumTasksPostedPerThread = 150;
54
55 using PostNestedTask = test::TestTaskFactory::PostNestedTask;
56
57 class ThreadPostingTasks : public SimpleThread {
58 public:
59 // Constructs a thread that posts |num_tasks_posted_per_thread| tasks to
60 // |thread_group| through an |execution_mode| task runner. If
61 // |post_nested_task| is YES, each task posted by this thread posts another
62 // task when it runs.
ThreadPostingTasks(test::MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate_,TaskSourceExecutionMode execution_mode,PostNestedTask post_nested_task)63 ThreadPostingTasks(
64 test::MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate_,
65 TaskSourceExecutionMode execution_mode,
66 PostNestedTask post_nested_task)
67 : SimpleThread("ThreadPostingTasks"),
68 post_nested_task_(post_nested_task),
69 factory_(test::CreatePooledTaskRunnerWithExecutionMode(
70 execution_mode,
71 mock_pooled_task_runner_delegate_),
72 execution_mode) {}
73 ThreadPostingTasks(const ThreadPostingTasks&) = delete;
74 ThreadPostingTasks& operator=(const ThreadPostingTasks&) = delete;
75
factory() const76 const test::TestTaskFactory* factory() const { return &factory_; }
77
78 private:
Run()79 void Run() override {
80 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i)
81 EXPECT_TRUE(factory_.PostTask(post_nested_task_, OnceClosure()));
82 }
83
84 const scoped_refptr<TaskRunner> task_runner_;
85 const PostNestedTask post_nested_task_;
86 test::TestTaskFactory factory_;
87 };
88
89 class ThreadGroupTestBase : public testing::Test, public ThreadGroup::Delegate {
90 public:
91 ThreadGroupTestBase(const ThreadGroupTestBase&) = delete;
92 ThreadGroupTestBase& operator=(const ThreadGroupTestBase&) = delete;
93
94 protected:
95 ThreadGroupTestBase() = default;
96
SetUp()97 void SetUp() override {
98 service_thread_.Start();
99 delayed_task_manager_.Start(service_thread_.task_runner());
100 CreateThreadGroup();
101 }
102
TearDown()103 void TearDown() override {
104 delayed_task_manager_.Shutdown();
105 service_thread_.Stop();
106 DestroyThreadGroup();
107 }
108
CreateThreadGroup()109 void CreateThreadGroup() {
110 ASSERT_FALSE(thread_group_);
111 thread_group_ = std::make_unique<ThreadGroupImpl>(
112 "TestThreadGroup", "A", ThreadType::kDefault,
113 task_tracker_.GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
114
115 mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
116 }
117
StartThreadGroup(ThreadGroup::WorkerEnvironment worker_environment=ThreadGroup::WorkerEnvironment::NONE)118 void StartThreadGroup(ThreadGroup::WorkerEnvironment worker_environment =
119 ThreadGroup::WorkerEnvironment::NONE) {
120 ASSERT_TRUE(thread_group_);
121 ThreadGroupImpl* thread_group_impl =
122 static_cast<ThreadGroupImpl*>(thread_group_.get());
123 thread_group_impl->Start(kMaxTasks, kMaxBestEffortTasks, TimeDelta::Max(),
124 service_thread_.task_runner(), nullptr,
125 worker_environment,
126 /*synchronous_thread_start_for_testing=*/false,
127 /*may_block_threshold=*/{});
128 }
129
DestroyThreadGroup()130 void DestroyThreadGroup() {
131 if (!thread_group_) {
132 return;
133 }
134
135 thread_group_->JoinForTesting();
136 mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
137 thread_group_.reset();
138 }
139
140 Thread service_thread_{"ThreadPoolServiceThread"};
141 TaskTracker task_tracker_;
142 DelayedTaskManager delayed_task_manager_;
143 test::MockPooledTaskRunnerDelegate mock_pooled_task_runner_delegate_ = {
144 task_tracker_.GetTrackedRef(), &delayed_task_manager_};
145
146 std::unique_ptr<ThreadGroup> thread_group_;
147
148 private:
149 // ThreadGroup::Delegate:
GetThreadGroupForTraits(const TaskTraits & traits)150 ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override {
151 return thread_group_.get();
152 }
153
154 TrackedRefFactory<ThreadGroup::Delegate> tracked_ref_factory_{this};
155 };
156
157 using ThreadGroupTest = ThreadGroupTestBase;
158
159 // TODO(etiennep): Audit tests that don't need TaskSourceExecutionMode
160 // parameter.
161 class ThreadGroupTestAllExecutionModes
162 : public ThreadGroupTestBase,
163 public testing::WithParamInterface<TaskSourceExecutionMode> {
164 public:
165 ThreadGroupTestAllExecutionModes() = default;
166 ThreadGroupTestAllExecutionModes(const ThreadGroupTestAllExecutionModes&) =
167 delete;
168 ThreadGroupTestAllExecutionModes& operator=(
169 const ThreadGroupTestAllExecutionModes&) = delete;
170
execution_mode() const171 TaskSourceExecutionMode execution_mode() const { return GetParam(); }
172
CreatePooledTaskRunner(const TaskTraits & traits={})173 scoped_refptr<TaskRunner> CreatePooledTaskRunner(
174 const TaskTraits& traits = {}) {
175 return test::CreatePooledTaskRunnerWithExecutionMode(
176 execution_mode(), &mock_pooled_task_runner_delegate_, traits);
177 }
178 };
179
ShouldNotRun()180 void ShouldNotRun() {
181 ADD_FAILURE() << "Ran a task that shouldn't run.";
182 }
183
184 } // namespace
185
// Verify that tasks posted concurrently from several threads all run.
TEST_P(ThreadGroupTestAllExecutionModes, PostTasks) {
  StartThreadGroup();

  // Launch the posting threads; each one begins posting as soon as it starts.
  std::vector<std::unique_ptr<ThreadPostingTasks>> posters;
  for (size_t n = 0; n < kNumThreadsPostingTasks; ++n) {
    auto poster = std::make_unique<ThreadPostingTasks>(
        &mock_pooled_task_runner_delegate_, execution_mode(),
        PostNestedTask::NO);
    poster->Start();
    posters.push_back(std::move(poster));
  }

  // Each thread must finish posting, then everything it posted must run.
  for (const auto& poster : posters) {
    poster->Join();
    poster->factory()->WaitForAllTasksToRun();
  }

  // Flush the task tracker to be sure that no task accesses its
  // TestTaskFactory after |posters| is destroyed.
  task_tracker_.FlushForTesting();
}
207
// Verify that tasks posted concurrently from several threads all run when
// every one of them posts another task while running.
TEST_P(ThreadGroupTestAllExecutionModes, NestedPostTasks) {
  StartThreadGroup();

  // Launch the posting threads. Each task posted by these threads will post
  // another task when it runs.
  std::vector<std::unique_ptr<ThreadPostingTasks>> posters;
  for (size_t n = 0; n < kNumThreadsPostingTasks; ++n) {
    auto poster = std::make_unique<ThreadPostingTasks>(
        &mock_pooled_task_runner_delegate_, execution_mode(),
        PostNestedTask::YES);
    poster->Start();
    posters.push_back(std::move(poster));
  }

  // Each thread must finish posting, then everything it posted must run.
  for (const auto& poster : posters) {
    poster->Join();
    poster->factory()->WaitForAllTasksToRun();
  }

  // Flush the task tracker to be sure that no task accesses its
  // TestTaskFactory after |posters| is destroyed.
  task_tracker_.FlushForTesting();
}
230
231 // Verify that a Task can't be posted after shutdown.
TEST_P(ThreadGroupTestAllExecutionModes,PostTaskAfterShutdown)232 TEST_P(ThreadGroupTestAllExecutionModes, PostTaskAfterShutdown) {
233 StartThreadGroup();
234 auto task_runner = CreatePooledTaskRunner();
235 test::ShutdownTaskTracker(&task_tracker_);
236 EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun)));
237 }
238
239 // Verify that a Task runs shortly after its delay expires.
TEST_P(ThreadGroupTestAllExecutionModes,PostDelayedTask)240 TEST_P(ThreadGroupTestAllExecutionModes, PostDelayedTask) {
241 StartThreadGroup();
242 // kJob doesn't support delays.
243 if (execution_mode() == TaskSourceExecutionMode::kJob)
244 return;
245
246 TestWaitableEvent task_ran(WaitableEvent::ResetPolicy::AUTOMATIC);
247 auto task_runner = CreatePooledTaskRunner();
248
249 // Wait until the task runner is up and running to make sure the test below is
250 // solely timing the delayed task, not bringing up a physical thread.
251 task_runner->PostTask(
252 FROM_HERE, BindOnce(&TestWaitableEvent::Signal, Unretained(&task_ran)));
253 task_ran.Wait();
254 ASSERT_TRUE(!task_ran.IsSignaled());
255
256 // Post a task with a short delay.
257 const TimeTicks start_time = TimeTicks::Now();
258 EXPECT_TRUE(task_runner->PostDelayedTask(
259 FROM_HERE, BindOnce(&TestWaitableEvent::Signal, Unretained(&task_ran)),
260 TestTimeouts::tiny_timeout()));
261
262 // Wait until the task runs.
263 task_ran.Wait();
264
265 // Expect the task to run after its delay expires, but no more than a
266 // reasonable amount of time after that (overloaded bots can be slow sometimes
267 // so give it 10X flexibility).
268 const TimeDelta actual_delay = TimeTicks::Now() - start_time;
269 EXPECT_GE(actual_delay, TestTimeouts::tiny_timeout());
270 EXPECT_LT(actual_delay, 10 * TestTimeouts::tiny_timeout());
271 }
272
273 // Verify that the RunsTasksInCurrentSequence() method of a SEQUENCED TaskRunner
274 // returns false when called from a task that isn't part of the sequence. Note:
275 // Tests that use TestTaskFactory already verify that
276 // RunsTasksInCurrentSequence() returns true when appropriate so this method
277 // complements it to get full coverage of that method.
TEST_P(ThreadGroupTestAllExecutionModes,SequencedRunsTasksInCurrentSequence)278 TEST_P(ThreadGroupTestAllExecutionModes, SequencedRunsTasksInCurrentSequence) {
279 StartThreadGroup();
280 auto task_runner = CreatePooledTaskRunner();
281 auto sequenced_task_runner = test::CreatePooledSequencedTaskRunner(
282 TaskTraits(), &mock_pooled_task_runner_delegate_);
283
284 TestWaitableEvent task_ran;
285 task_runner->PostTask(
286 FROM_HERE,
287 BindOnce(
288 [](scoped_refptr<SequencedTaskRunner> sequenced_task_runner,
289 TestWaitableEvent* task_ran) {
290 EXPECT_FALSE(sequenced_task_runner->RunsTasksInCurrentSequence());
291 task_ran->Signal();
292 },
293 sequenced_task_runner, Unretained(&task_ran)));
294 task_ran.Wait();
295 }
296
297 // Verify that tasks posted before Start run after Start.
TEST_P(ThreadGroupTestAllExecutionModes,PostBeforeStart)298 TEST_P(ThreadGroupTestAllExecutionModes, PostBeforeStart) {
299 TestWaitableEvent task_1_running;
300 TestWaitableEvent task_2_running;
301
302 auto task_runner = CreatePooledTaskRunner();
303 task_runner->PostTask(FROM_HERE, BindOnce(&TestWaitableEvent::Signal,
304 Unretained(&task_1_running)));
305 task_runner->PostTask(FROM_HERE, BindOnce(&TestWaitableEvent::Signal,
306 Unretained(&task_2_running)));
307
308 // Workers should not be created and tasks should not run before the thread
309 // group is started. The sleep is to give time for the tasks to potentially
310 // run.
311 PlatformThread::Sleep(TestTimeouts::tiny_timeout());
312 EXPECT_FALSE(task_1_running.IsSignaled());
313 EXPECT_FALSE(task_2_running.IsSignaled());
314
315 StartThreadGroup();
316
317 // Tasks should run shortly after the thread group is started.
318 task_1_running.Wait();
319 task_2_running.Wait();
320
321 task_tracker_.FlushForTesting();
322 }
323
324 // Verify that tasks only run when allowed by the CanRunPolicy.
TEST_P(ThreadGroupTestAllExecutionModes,CanRunPolicyBasic)325 TEST_P(ThreadGroupTestAllExecutionModes, CanRunPolicyBasic) {
326 StartThreadGroup();
327 test::TestCanRunPolicyBasic(
328 thread_group_.get(),
329 [this](TaskPriority priority) {
330 return CreatePooledTaskRunner({priority});
331 },
332 &task_tracker_);
333 }
334
// Runs the shared "CanRunPolicy changed before run" scenario against the
// thread group.
TEST_F(ThreadGroupTest, CanRunPolicyUpdatedBeforeRun) {
  StartThreadGroup();

  // This test only works with SequencedTaskRunner because it assumes
  // ordered execution of 2 posted tasks.
  const auto make_sequenced_runner = [this](TaskPriority priority) {
    return test::CreatePooledSequencedTaskRunner(
        {priority}, &mock_pooled_task_runner_delegate_);
  };
  test::TestCanRunPolicyChangedBeforeRun(thread_group_.get(),
                                         make_sequenced_runner, &task_tracker_);
}
347
// Runs the shared CanRunPolicy load scenario against the thread group.
TEST_P(ThreadGroupTestAllExecutionModes, CanRunPolicyLoad) {
  StartThreadGroup();

  // Factory used by the shared test body to obtain a runner per priority.
  const auto make_runner = [this](TaskPriority priority) {
    return CreatePooledTaskRunner({priority});
  };
  test::TestCanRunPolicyLoad(thread_group_.get(), make_runner, &task_tracker_);
}
357
358 // Verifies that ShouldYield() returns true for a priority that is not allowed
359 // to run by the CanRunPolicy.
TEST_F(ThreadGroupTest,CanRunPolicyShouldYield)360 TEST_F(ThreadGroupTest, CanRunPolicyShouldYield) {
361 StartThreadGroup();
362
363 task_tracker_.SetCanRunPolicy(CanRunPolicy::kNone);
364 thread_group_->DidUpdateCanRunPolicy();
365 EXPECT_TRUE(
366 thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
367 EXPECT_TRUE(
368 thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
369
370 task_tracker_.SetCanRunPolicy(CanRunPolicy::kForegroundOnly);
371 thread_group_->DidUpdateCanRunPolicy();
372 EXPECT_TRUE(
373 thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
374 EXPECT_FALSE(
375 thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
376
377 task_tracker_.SetCanRunPolicy(CanRunPolicy::kAll);
378 thread_group_->DidUpdateCanRunPolicy();
379 EXPECT_FALSE(
380 thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
381 EXPECT_FALSE(
382 thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
383 }
384
385 // Verify that the maximum number of BEST_EFFORT tasks that can run concurrently
386 // in a thread group does not affect Sequences with a priority that was
387 // increased from BEST_EFFORT to USER_BLOCKING.
TEST_F(ThreadGroupTest,UpdatePriorityBestEffortToUserBlocking)388 TEST_F(ThreadGroupTest, UpdatePriorityBestEffortToUserBlocking) {
389 StartThreadGroup();
390
391 CheckedLock num_tasks_running_lock;
392
393 ConditionVariable num_tasks_running_cv =
394 num_tasks_running_lock.CreateConditionVariable();
395 num_tasks_running_cv.declare_only_used_while_idle();
396
397 size_t num_tasks_running = 0;
398
399 // Post |kMaxTasks| BEST_EFFORT tasks that block until they all start running.
400 std::vector<scoped_refptr<PooledSequencedTaskRunner>> task_runners;
401
402 for (size_t i = 0; i < kMaxTasks; ++i) {
403 task_runners.push_back(MakeRefCounted<PooledSequencedTaskRunner>(
404 TaskTraits(TaskPriority::BEST_EFFORT),
405 &mock_pooled_task_runner_delegate_));
406 task_runners.back()->PostTask(
407 FROM_HERE, BindLambdaForTesting([&]() {
408 // Increment the number of tasks running.
409 {
410 CheckedAutoLock auto_lock(num_tasks_running_lock);
411 ++num_tasks_running;
412 }
413 num_tasks_running_cv.Broadcast();
414
415 // Wait until all posted tasks are running.
416 CheckedAutoLock auto_lock(num_tasks_running_lock);
417 while (num_tasks_running < kMaxTasks)
418 num_tasks_running_cv.Wait();
419 }));
420 }
421
422 // Wait until |kMaxBestEffort| tasks start running.
423 {
424 CheckedAutoLock auto_lock(num_tasks_running_lock);
425 while (num_tasks_running < kMaxBestEffortTasks)
426 num_tasks_running_cv.Wait();
427 }
428
429 // Update the priority of all TaskRunners to USER_BLOCKING.
430 for (size_t i = 0; i < kMaxTasks; ++i)
431 task_runners[i]->UpdatePriority(TaskPriority::USER_BLOCKING);
432
433 // Wait until all posted tasks start running. This should not block forever,
434 // even in a thread group that enforces a maximum number of concurrent
435 // BEST_EFFORT tasks lower than |kMaxTasks|.
436 static_assert(kMaxBestEffortTasks < kMaxTasks, "");
437 {
438 CheckedAutoLock auto_lock(num_tasks_running_lock);
439 while (num_tasks_running < kMaxTasks)
440 num_tasks_running_cv.Wait();
441 }
442
443 task_tracker_.FlushForTesting();
444 }
445
446 // Regression test for crbug.com/955953.
TEST_P(ThreadGroupTestAllExecutionModes,ScopedBlockingCallTwice)447 TEST_P(ThreadGroupTestAllExecutionModes, ScopedBlockingCallTwice) {
448 StartThreadGroup();
449 auto task_runner = test::CreatePooledTaskRunnerWithExecutionMode(
450 execution_mode(), &mock_pooled_task_runner_delegate_, {MayBlock()});
451
452 TestWaitableEvent task_ran;
453 task_runner->PostTask(FROM_HERE,
454 BindOnce(
455 [](TestWaitableEvent* task_ran) {
456 {
457 ScopedBlockingCall scoped_blocking_call(
458 FROM_HERE, BlockingType::MAY_BLOCK);
459 }
460 {
461 ScopedBlockingCall scoped_blocking_call(
462 FROM_HERE, BlockingType::MAY_BLOCK);
463 }
464 task_ran->Signal();
465 },
466 Unretained(&task_ran)));
467 task_ran.Wait();
468 }
469
470 #if BUILDFLAG(IS_WIN)
// Verify that tasks run in the COM MTA when the thread group is started with
// the COM_MTA worker environment.
TEST_P(ThreadGroupTestAllExecutionModes, COMMTAWorkerEnvironment) {
  StartThreadGroup(ThreadGroup::WorkerEnvironment::COM_MTA);
  auto runner = CreatePooledTaskRunner();

  TestWaitableEvent done;
  runner->PostTask(
      FROM_HERE, BindOnce(
                     [](TestWaitableEvent* finished) {
                       win::AssertComApartmentType(win::ComApartmentType::MTA);
                       finished->Signal();
                     },
                     Unretained(&done)));
  done.Wait();
}
486
// Verify that tasks run outside of any COM apartment when the thread group is
// started with the NONE worker environment.
TEST_P(ThreadGroupTestAllExecutionModes, NoWorkerEnvironment) {
  StartThreadGroup(ThreadGroup::WorkerEnvironment::NONE);
  auto runner = CreatePooledTaskRunner();

  TestWaitableEvent done;
  runner->PostTask(
      FROM_HERE, BindOnce(
                     [](TestWaitableEvent* finished) {
                       win::AssertComApartmentType(win::ComApartmentType::NONE);
                       finished->Signal();
                     },
                     Unretained(&done)));
  done.Wait();
}
502 #endif
503
504 // Verifies that ShouldYield() returns false when there is no pending task.
TEST_F(ThreadGroupTest,ShouldYieldSingleTask)505 TEST_F(ThreadGroupTest, ShouldYieldSingleTask) {
506 StartThreadGroup();
507
508 test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
509 &mock_pooled_task_runner_delegate_)
510 ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
511 EXPECT_FALSE(thread_group_->ShouldYield(
512 {TaskPriority::BEST_EFFORT, TimeTicks::Now()}));
513 EXPECT_FALSE(thread_group_->ShouldYield(
514 {TaskPriority::USER_VISIBLE, TimeTicks::Now()}));
515 EXPECT_FALSE(thread_group_->ShouldYield(
516 {TaskPriority::USER_VISIBLE, TimeTicks::Now()}));
517 }));
518
519 task_tracker_.FlushForTesting();
520 }
521
522 // Verify that tasks from a JobTaskSource run at the intended concurrency.
TEST_F(ThreadGroupTest,ScheduleJobTaskSource)523 TEST_F(ThreadGroupTest, ScheduleJobTaskSource) {
524 StartThreadGroup();
525
526 TestWaitableEvent threads_running;
527 TestWaitableEvent threads_continue;
528
529 RepeatingClosure threads_running_barrier = BarrierClosure(
530 kMaxTasks,
531 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
532
533 auto job_task = base::MakeRefCounted<test::MockJobTask>(
534 BindLambdaForTesting(
535 [&threads_running_barrier, &threads_continue](JobDelegate*) {
536 threads_running_barrier.Run();
537 threads_continue.Wait();
538 }),
539 /* num_tasks_to_run */ kMaxTasks);
540 scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
541 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
542
543 auto registered_task_source =
544 task_tracker_.RegisterTaskSource(std::move(task_source));
545 EXPECT_TRUE(registered_task_source);
546 thread_group_->PushTaskSourceAndWakeUpWorkers(
547 RegisteredTaskSourceAndTransaction::FromTaskSource(
548 std::move(registered_task_source)));
549
550 threads_running.Wait();
551 threads_continue.Signal();
552
553 // Flush the task tracker to be sure that no local variables are accessed by
554 // tasks after the end of the scope.
555 task_tracker_.FlushForTesting();
556 }
557
558 // Verify that tasks from a JobTaskSource run at the intended concurrency.
TEST_F(ThreadGroupTest,ScheduleJobTaskSourceMultipleTime)559 TEST_F(ThreadGroupTest, ScheduleJobTaskSourceMultipleTime) {
560 StartThreadGroup();
561
562 TestWaitableEvent thread_running;
563 TestWaitableEvent thread_continue;
564 auto job_task = base::MakeRefCounted<test::MockJobTask>(
565 BindLambdaForTesting([&thread_running, &thread_continue](JobDelegate*) {
566 DCHECK(!thread_running.IsSignaled());
567 thread_running.Signal();
568 thread_continue.Wait();
569 }),
570 /* num_tasks_to_run */ 1);
571 scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
572 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
573
574 thread_group_->PushTaskSourceAndWakeUpWorkers(
575 RegisteredTaskSourceAndTransaction::FromTaskSource(
576 task_tracker_.RegisterTaskSource(task_source)));
577
578 // Enqueuing the task source again shouldn't affect the number of time it's
579 // run.
580 thread_group_->PushTaskSourceAndWakeUpWorkers(
581 RegisteredTaskSourceAndTransaction::FromTaskSource(
582 task_tracker_.RegisterTaskSource(task_source)));
583
584 thread_running.Wait();
585 thread_continue.Signal();
586
587 // Once the worker task ran, enqueuing the task source has no effect.
588 thread_group_->PushTaskSourceAndWakeUpWorkers(
589 RegisteredTaskSourceAndTransaction::FromTaskSource(
590 task_tracker_.RegisterTaskSource(task_source)));
591
592 // Flush the task tracker to be sure that no local variables are accessed by
593 // tasks after the end of the scope.
594 task_tracker_.FlushForTesting();
595 }
596
597 // Verify that Cancel() on a job stops running the worker task and causes
598 // current workers to yield.
TEST_F(ThreadGroupTest,CancelJobTaskSource)599 TEST_F(ThreadGroupTest, CancelJobTaskSource) {
600 StartThreadGroup();
601
602 CheckedLock tasks_running_lock;
603 ConditionVariable tasks_running_cv =
604 tasks_running_lock.CreateConditionVariable();
605 bool tasks_running = false;
606
607 // Schedule a big number of tasks.
608 auto job_task = base::MakeRefCounted<test::MockJobTask>(
609 BindLambdaForTesting([&](JobDelegate* delegate) {
610 {
611 CheckedAutoLock auto_lock(tasks_running_lock);
612 tasks_running = true;
613 }
614 tasks_running_cv.Signal();
615
616 while (!delegate->ShouldYield()) {
617 }
618 }),
619 /* num_tasks_to_run */ kTooManyTasks);
620 scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
621 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
622
623 mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
624 JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
625
626 // Wait for at least 1 task to start running.
627 {
628 CheckedAutoLock auto_lock(tasks_running_lock);
629 while (!tasks_running)
630 tasks_running_cv.Wait();
631 }
632
633 // Cancels pending tasks and unblocks running ones.
634 job_handle.Cancel();
635
636 // This should not block since the job got cancelled.
637 task_tracker_.FlushForTesting();
638 }
639
640 // Verify that calling JobTaskSource::NotifyConcurrencyIncrease() (re-)schedule
641 // tasks with the intended concurrency.
TEST_F(ThreadGroupTest,JobTaskSourceConcurrencyIncrease)642 TEST_F(ThreadGroupTest, JobTaskSourceConcurrencyIncrease) {
643 StartThreadGroup();
644
645 TestWaitableEvent threads_running_a;
646 TestWaitableEvent threads_continue;
647
648 // Initially schedule half the tasks.
649 RepeatingClosure threads_running_barrier = BarrierClosure(
650 kMaxTasks / 2,
651 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running_a)));
652
653 auto job_state = base::MakeRefCounted<test::MockJobTask>(
654 BindLambdaForTesting(
655 [&threads_running_barrier, &threads_continue](JobDelegate*) {
656 threads_running_barrier.Run();
657 threads_continue.Wait();
658 }),
659 /* num_tasks_to_run */ kMaxTasks / 2);
660 auto task_source = job_state->GetJobTaskSource(
661 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
662
663 auto registered_task_source = task_tracker_.RegisterTaskSource(task_source);
664 EXPECT_TRUE(registered_task_source);
665 thread_group_->PushTaskSourceAndWakeUpWorkers(
666 RegisteredTaskSourceAndTransaction::FromTaskSource(
667 std::move(registered_task_source)));
668
669 threads_running_a.Wait();
670 // Reset |threads_running_barrier| for the remaining tasks.
671 TestWaitableEvent threads_running_b;
672 threads_running_barrier = BarrierClosure(
673 kMaxTasks / 2,
674 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running_b)));
675 job_state->SetNumTasksToRun(kMaxTasks);
676
677 // Unblocks tasks to let them racily wait for NotifyConcurrencyIncrease() to
678 // be called.
679 threads_continue.Signal();
680 task_source->NotifyConcurrencyIncrease();
681 // Wait for the remaining tasks. This should not block forever.
682 threads_running_b.Wait();
683
684 // Flush the task tracker to be sure that no local variables are accessed by
685 // tasks after the end of the scope.
686 task_tracker_.FlushForTesting();
687 }
688
689 // Verify that a JobTaskSource that becomes empty while in the queue eventually
690 // gets discarded.
TEST_F(ThreadGroupTest,ScheduleEmptyJobTaskSource)691 TEST_F(ThreadGroupTest, ScheduleEmptyJobTaskSource) {
692 StartThreadGroup();
693
694 task_tracker_.SetCanRunPolicy(CanRunPolicy::kNone);
695
696 auto job_task = base::MakeRefCounted<test::MockJobTask>(
697 BindRepeating([](JobDelegate*) { ShouldNotRun(); }),
698 /* num_tasks_to_run */ 1);
699 scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
700 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
701
702 auto registered_task_source =
703 task_tracker_.RegisterTaskSource(std::move(task_source));
704 EXPECT_TRUE(registered_task_source);
705 thread_group_->PushTaskSourceAndWakeUpWorkers(
706 RegisteredTaskSourceAndTransaction::FromTaskSource(
707 std::move(registered_task_source)));
708
709 // The worker task will never run.
710 job_task->SetNumTasksToRun(0);
711
712 task_tracker_.SetCanRunPolicy(CanRunPolicy::kAll);
713 thread_group_->DidUpdateCanRunPolicy();
714
715 // This should not block since there's no task to run.
716 task_tracker_.FlushForTesting();
717 }
718
719 // Verify that Join() on a job contributes to max concurrency and waits for all
720 // workers to return.
TEST_F(ThreadGroupTest,JoinJobTaskSource)721 TEST_F(ThreadGroupTest, JoinJobTaskSource) {
722 StartThreadGroup();
723
724 TestWaitableEvent threads_continue;
725 RepeatingClosure threads_continue_barrier = BarrierClosure(
726 kMaxTasks + 1,
727 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_continue)));
728
729 auto job_task = base::MakeRefCounted<test::MockJobTask>(
730 BindLambdaForTesting([&](JobDelegate*) {
731 threads_continue_barrier.Run();
732 threads_continue.Wait();
733 }),
734 /* num_tasks_to_run */ kMaxTasks + 1);
735 scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
736 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
737
738 mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
739 JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
740 job_handle.Join();
741 // All worker tasks should complete before Join() returns.
742 EXPECT_EQ(0U, job_task->GetMaxConcurrency(0));
743 thread_group_->JoinForTesting();
744 EXPECT_EQ(1U, task_source->HasOneRef());
745 // Prevent TearDown() from calling JoinForTesting() again.
746 mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
747 thread_group_ = nullptr;
748 }
749
750 // Verify that finishing work outside of a job unblocks workers with a stale
751 // max concurrency.
TEST_F(ThreadGroupTest,JoinJobTaskSourceStaleConcurrency)752 TEST_F(ThreadGroupTest, JoinJobTaskSourceStaleConcurrency) {
753 StartThreadGroup();
754
755 TestWaitableEvent thread_running;
756 std::atomic_size_t max_concurrency(1);
757 auto task_source = MakeRefCounted<JobTaskSource>(
758 FROM_HERE, TaskTraits{},
759 BindLambdaForTesting([&](JobDelegate*) { thread_running.Signal(); }),
760 BindLambdaForTesting(
761 [&](size_t /*worker_count*/) -> size_t { return max_concurrency; }),
762 &mock_pooled_task_runner_delegate_);
763
764 mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
765 JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
766 thread_running.Wait();
767
768 // Racily update max concurrency to unblock the task that was waiting on
769 // NotifyMaxConcurrency().
770 max_concurrency = 0;
771 job_handle.Join();
772
773 // This should not block since the job was joined.
774 task_tracker_.FlushForTesting();
775 }
776
777 // Verify that cancelling a job unblocks workers with a stale max concurrency.
TEST_F(ThreadGroupTest,CancelJobTaskSourceWithStaleConcurrency)778 TEST_F(ThreadGroupTest, CancelJobTaskSourceWithStaleConcurrency) {
779 StartThreadGroup();
780
781 TestWaitableEvent thread_running;
782 auto task_source = MakeRefCounted<JobTaskSource>(
783 FROM_HERE, TaskTraits{},
784 BindLambdaForTesting([&](JobDelegate*) { thread_running.Signal(); }),
785 BindRepeating([](size_t /*worker_count*/) -> size_t { return 1; }),
786 &mock_pooled_task_runner_delegate_);
787
788 mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
789 JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
790 thread_running.Wait();
791 job_handle.Cancel();
792
793 // This should not block since the job got cancelled.
794 task_tracker_.FlushForTesting();
795 }
796
797 // Verify that the maximum number of BEST_EFFORT tasks that can run concurrently
798 // in a thread group does not affect JobTaskSource with a priority that was
799 // increased from BEST_EFFORT to USER_BLOCKING.
TEST_F(ThreadGroupTest,JobTaskSourceUpdatePriority)800 TEST_F(ThreadGroupTest, JobTaskSourceUpdatePriority) {
801 StartThreadGroup();
802
803 CheckedLock num_tasks_running_lock;
804
805 ConditionVariable num_tasks_running_cv =
806 num_tasks_running_lock.CreateConditionVariable();
807 num_tasks_running_cv.declare_only_used_while_idle();
808
809 size_t num_tasks_running = 0;
810
811 auto job_task = base::MakeRefCounted<test::MockJobTask>(
812 BindLambdaForTesting([&](JobDelegate*) {
813 // Increment the number of tasks running.
814 {
815 CheckedAutoLock auto_lock(num_tasks_running_lock);
816 ++num_tasks_running;
817 }
818 num_tasks_running_cv.Broadcast();
819
820 // Wait until all posted tasks are running.
821 CheckedAutoLock auto_lock(num_tasks_running_lock);
822 while (num_tasks_running < kMaxTasks)
823 num_tasks_running_cv.Wait();
824 }),
825 /* num_tasks_to_run */ kMaxTasks);
826 scoped_refptr<JobTaskSource> task_source =
827 job_task->GetJobTaskSource(FROM_HERE, {TaskPriority::BEST_EFFORT},
828 &mock_pooled_task_runner_delegate_);
829
830 auto registered_task_source = task_tracker_.RegisterTaskSource(task_source);
831 EXPECT_TRUE(registered_task_source);
832 thread_group_->PushTaskSourceAndWakeUpWorkers(
833 RegisteredTaskSourceAndTransaction::FromTaskSource(
834 std::move(registered_task_source)));
835
836 // Wait until |kMaxBestEffort| tasks start running.
837 {
838 CheckedAutoLock auto_lock(num_tasks_running_lock);
839 while (num_tasks_running < kMaxBestEffortTasks)
840 num_tasks_running_cv.Wait();
841 }
842
843 // Update the priority to USER_BLOCKING.
844 auto transaction = task_source->BeginTransaction();
845 transaction.UpdatePriority(TaskPriority::USER_BLOCKING);
846 thread_group_->UpdateSortKey(std::move(transaction));
847
848 // Wait until all posted tasks start running. This should not block forever,
849 // even in a thread group that enforces a maximum number of concurrent
850 // BEST_EFFORT tasks lower than |kMaxTasks|.
851 static_assert(kMaxBestEffortTasks < kMaxTasks, "");
852 {
853 CheckedAutoLock auto_lock(num_tasks_running_lock);
854 while (num_tasks_running < kMaxTasks)
855 num_tasks_running_cv.Wait();
856 }
857
858 // Flush the task tracker to be sure that no local variables are accessed by
859 // tasks after the end of the scope.
860 task_tracker_.FlushForTesting();
861 }
862
863 INSTANTIATE_TEST_SUITE_P(GenericParallel,
864 ThreadGroupTestAllExecutionModes,
865 ::testing::Values(TaskSourceExecutionMode::kParallel));
866 INSTANTIATE_TEST_SUITE_P(
867 GenericSequenced,
868 ThreadGroupTestAllExecutionModes,
869 ::testing::Values(TaskSourceExecutionMode::kSequenced));
870 INSTANTIATE_TEST_SUITE_P(GenericJob,
871 ThreadGroupTestAllExecutionModes,
872 ::testing::Values(TaskSourceExecutionMode::kJob));
873
874 } // namespace internal
875 } // namespace base
876