xref: /aosp_15_r20/external/cronet/base/task/thread_pool/thread_group_unittest.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/thread_group.h"
6 
7 #include <memory>
8 #include <tuple>
9 #include <utility>
10 
11 #include "base/barrier_closure.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/location.h"
15 #include "base/memory/ref_counted.h"
16 #include "base/task/task_runner.h"
17 #include "base/task/task_traits.h"
18 #include "base/task/thread_pool/can_run_policy_test.h"
19 #include "base/task/thread_pool/delayed_task_manager.h"
20 #include "base/task/thread_pool/pooled_sequenced_task_runner.h"
21 #include "base/task/thread_pool/task_tracker.h"
22 #include "base/task/thread_pool/test_task_factory.h"
23 #include "base/task/thread_pool/test_utils.h"
24 #include "base/task/thread_pool/thread_group_impl.h"
25 #include "base/test/bind.h"
26 #include "base/test/test_timeouts.h"
27 #include "base/test/test_waitable_event.h"
28 #include "base/threading/platform_thread.h"
29 #include "base/threading/scoped_blocking_call.h"
30 #include "base/threading/scoped_blocking_call_internal.h"
31 #include "base/threading/simple_thread.h"
32 #include "base/threading/thread.h"
33 #include "base/threading/thread_restrictions.h"
34 #include "build/build_config.h"
35 #include "testing/gtest/include/gtest/gtest.h"
36 
37 #if BUILDFLAG(IS_WIN)
38 #include "base/win/com_init_check_hook.h"
39 #include "base/win/com_init_util.h"
40 #endif
41 
42 namespace base {
43 namespace internal {
44 
45 namespace {
46 
47 constexpr size_t kMaxTasks = 4;
48 constexpr size_t kTooManyTasks = 1000;
49 // By default, tests allow half of the thread group to be used by best-effort
50 // tasks.
51 constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
52 constexpr size_t kNumThreadsPostingTasks = 4;
53 constexpr size_t kNumTasksPostedPerThread = 150;
54 
55 using PostNestedTask = test::TestTaskFactory::PostNestedTask;
56 
57 class ThreadPostingTasks : public SimpleThread {
58  public:
59   // Constructs a thread that posts |num_tasks_posted_per_thread| tasks to
60   // |thread_group| through an |execution_mode| task runner. If
61   // |post_nested_task| is YES, each task posted by this thread posts another
62   // task when it runs.
ThreadPostingTasks(test::MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate_,TaskSourceExecutionMode execution_mode,PostNestedTask post_nested_task)63   ThreadPostingTasks(
64       test::MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate_,
65       TaskSourceExecutionMode execution_mode,
66       PostNestedTask post_nested_task)
67       : SimpleThread("ThreadPostingTasks"),
68         post_nested_task_(post_nested_task),
69         factory_(test::CreatePooledTaskRunnerWithExecutionMode(
70                      execution_mode,
71                      mock_pooled_task_runner_delegate_),
72                  execution_mode) {}
73   ThreadPostingTasks(const ThreadPostingTasks&) = delete;
74   ThreadPostingTasks& operator=(const ThreadPostingTasks&) = delete;
75 
factory() const76   const test::TestTaskFactory* factory() const { return &factory_; }
77 
78  private:
Run()79   void Run() override {
80     for (size_t i = 0; i < kNumTasksPostedPerThread; ++i)
81       EXPECT_TRUE(factory_.PostTask(post_nested_task_, OnceClosure()));
82   }
83 
84   const scoped_refptr<TaskRunner> task_runner_;
85   const PostNestedTask post_nested_task_;
86   test::TestTaskFactory factory_;
87 };
88 
89 class ThreadGroupTestBase : public testing::Test, public ThreadGroup::Delegate {
90  public:
91   ThreadGroupTestBase(const ThreadGroupTestBase&) = delete;
92   ThreadGroupTestBase& operator=(const ThreadGroupTestBase&) = delete;
93 
94  protected:
95   ThreadGroupTestBase() = default;
96 
SetUp()97   void SetUp() override {
98     service_thread_.Start();
99     delayed_task_manager_.Start(service_thread_.task_runner());
100     CreateThreadGroup();
101   }
102 
TearDown()103   void TearDown() override {
104     delayed_task_manager_.Shutdown();
105     service_thread_.Stop();
106     DestroyThreadGroup();
107   }
108 
CreateThreadGroup()109   void CreateThreadGroup() {
110     ASSERT_FALSE(thread_group_);
111     thread_group_ = std::make_unique<ThreadGroupImpl>(
112         "TestThreadGroup", "A", ThreadType::kDefault,
113         task_tracker_.GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
114 
115     mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
116   }
117 
StartThreadGroup(ThreadGroup::WorkerEnvironment worker_environment=ThreadGroup::WorkerEnvironment::NONE)118   void StartThreadGroup(ThreadGroup::WorkerEnvironment worker_environment =
119                             ThreadGroup::WorkerEnvironment::NONE) {
120     ASSERT_TRUE(thread_group_);
121     ThreadGroupImpl* thread_group_impl =
122         static_cast<ThreadGroupImpl*>(thread_group_.get());
123     thread_group_impl->Start(kMaxTasks, kMaxBestEffortTasks, TimeDelta::Max(),
124                              service_thread_.task_runner(), nullptr,
125                              worker_environment,
126                              /*synchronous_thread_start_for_testing=*/false,
127                              /*may_block_threshold=*/{});
128   }
129 
DestroyThreadGroup()130   void DestroyThreadGroup() {
131     if (!thread_group_) {
132       return;
133     }
134 
135     thread_group_->JoinForTesting();
136     mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
137     thread_group_.reset();
138   }
139 
140   Thread service_thread_{"ThreadPoolServiceThread"};
141   TaskTracker task_tracker_;
142   DelayedTaskManager delayed_task_manager_;
143   test::MockPooledTaskRunnerDelegate mock_pooled_task_runner_delegate_ = {
144       task_tracker_.GetTrackedRef(), &delayed_task_manager_};
145 
146   std::unique_ptr<ThreadGroup> thread_group_;
147 
148  private:
149   // ThreadGroup::Delegate:
GetThreadGroupForTraits(const TaskTraits & traits)150   ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override {
151     return thread_group_.get();
152   }
153 
154   TrackedRefFactory<ThreadGroup::Delegate> tracked_ref_factory_{this};
155 };
156 
157 using ThreadGroupTest = ThreadGroupTestBase;
158 
159 // TODO(etiennep): Audit tests that don't need TaskSourceExecutionMode
160 // parameter.
161 class ThreadGroupTestAllExecutionModes
162     : public ThreadGroupTestBase,
163       public testing::WithParamInterface<TaskSourceExecutionMode> {
164  public:
165   ThreadGroupTestAllExecutionModes() = default;
166   ThreadGroupTestAllExecutionModes(const ThreadGroupTestAllExecutionModes&) =
167       delete;
168   ThreadGroupTestAllExecutionModes& operator=(
169       const ThreadGroupTestAllExecutionModes&) = delete;
170 
execution_mode() const171   TaskSourceExecutionMode execution_mode() const { return GetParam(); }
172 
CreatePooledTaskRunner(const TaskTraits & traits={})173   scoped_refptr<TaskRunner> CreatePooledTaskRunner(
174       const TaskTraits& traits = {}) {
175     return test::CreatePooledTaskRunnerWithExecutionMode(
176         execution_mode(), &mock_pooled_task_runner_delegate_, traits);
177   }
178 };
179 
ShouldNotRun()180 void ShouldNotRun() {
181   ADD_FAILURE() << "Ran a task that shouldn't run.";
182 }
183 
184 }  // namespace
185 
// Verify that tasks posted concurrently from several threads all run.
TEST_P(ThreadGroupTestAllExecutionModes, PostTasks) {
  StartThreadGroup();

  // Spin up the posting threads; each one begins posting once started.
  std::vector<std::unique_ptr<ThreadPostingTasks>> posters;
  for (size_t n = 0; n < kNumThreadsPostingTasks; ++n) {
    auto poster = std::make_unique<ThreadPostingTasks>(
        &mock_pooled_task_runner_delegate_, execution_mode(),
        PostNestedTask::NO);
    posters.push_back(std::move(poster));
    posters.back()->Start();
  }

  // Wait for every thread to finish posting and for all of its tasks to run.
  for (const auto& poster : posters) {
    poster->Join();
    poster->factory()->WaitForAllTasksToRun();
  }

  // Flush the task tracker to be sure that no task accesses its TestTaskFactory
  // after |posters| is destroyed.
  task_tracker_.FlushForTesting();
}
207 
// Verify that tasks which themselves post another task all run when posted
// concurrently from several threads.
TEST_P(ThreadGroupTestAllExecutionModes, NestedPostTasks) {
  StartThreadGroup();

  // Spin up the posting threads. Each task posted by these threads will post
  // another task when it runs.
  std::vector<std::unique_ptr<ThreadPostingTasks>> posters;
  for (size_t n = 0; n < kNumThreadsPostingTasks; ++n) {
    auto poster = std::make_unique<ThreadPostingTasks>(
        &mock_pooled_task_runner_delegate_, execution_mode(),
        PostNestedTask::YES);
    posters.push_back(std::move(poster));
    posters.back()->Start();
  }

  // Wait for every thread to finish posting and for all of its tasks to run.
  for (const auto& poster : posters) {
    poster->Join();
    poster->factory()->WaitForAllTasksToRun();
  }

  // Flush the task tracker to be sure that no task accesses its TestTaskFactory
  // after |posters| is destroyed.
  task_tracker_.FlushForTesting();
}
230 
231 // Verify that a Task can't be posted after shutdown.
TEST_P(ThreadGroupTestAllExecutionModes,PostTaskAfterShutdown)232 TEST_P(ThreadGroupTestAllExecutionModes, PostTaskAfterShutdown) {
233   StartThreadGroup();
234   auto task_runner = CreatePooledTaskRunner();
235   test::ShutdownTaskTracker(&task_tracker_);
236   EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun)));
237 }
238 
239 // Verify that a Task runs shortly after its delay expires.
TEST_P(ThreadGroupTestAllExecutionModes,PostDelayedTask)240 TEST_P(ThreadGroupTestAllExecutionModes, PostDelayedTask) {
241   StartThreadGroup();
242   // kJob doesn't support delays.
243   if (execution_mode() == TaskSourceExecutionMode::kJob)
244     return;
245 
246   TestWaitableEvent task_ran(WaitableEvent::ResetPolicy::AUTOMATIC);
247   auto task_runner = CreatePooledTaskRunner();
248 
249   // Wait until the task runner is up and running to make sure the test below is
250   // solely timing the delayed task, not bringing up a physical thread.
251   task_runner->PostTask(
252       FROM_HERE, BindOnce(&TestWaitableEvent::Signal, Unretained(&task_ran)));
253   task_ran.Wait();
254   ASSERT_TRUE(!task_ran.IsSignaled());
255 
256   // Post a task with a short delay.
257   const TimeTicks start_time = TimeTicks::Now();
258   EXPECT_TRUE(task_runner->PostDelayedTask(
259       FROM_HERE, BindOnce(&TestWaitableEvent::Signal, Unretained(&task_ran)),
260       TestTimeouts::tiny_timeout()));
261 
262   // Wait until the task runs.
263   task_ran.Wait();
264 
265   // Expect the task to run after its delay expires, but no more than a
266   // reasonable amount of time after that (overloaded bots can be slow sometimes
267   // so give it 10X flexibility).
268   const TimeDelta actual_delay = TimeTicks::Now() - start_time;
269   EXPECT_GE(actual_delay, TestTimeouts::tiny_timeout());
270   EXPECT_LT(actual_delay, 10 * TestTimeouts::tiny_timeout());
271 }
272 
273 // Verify that the RunsTasksInCurrentSequence() method of a SEQUENCED TaskRunner
274 // returns false when called from a task that isn't part of the sequence. Note:
275 // Tests that use TestTaskFactory already verify that
276 // RunsTasksInCurrentSequence() returns true when appropriate so this method
277 // complements it to get full coverage of that method.
TEST_P(ThreadGroupTestAllExecutionModes,SequencedRunsTasksInCurrentSequence)278 TEST_P(ThreadGroupTestAllExecutionModes, SequencedRunsTasksInCurrentSequence) {
279   StartThreadGroup();
280   auto task_runner = CreatePooledTaskRunner();
281   auto sequenced_task_runner = test::CreatePooledSequencedTaskRunner(
282       TaskTraits(), &mock_pooled_task_runner_delegate_);
283 
284   TestWaitableEvent task_ran;
285   task_runner->PostTask(
286       FROM_HERE,
287       BindOnce(
288           [](scoped_refptr<SequencedTaskRunner> sequenced_task_runner,
289              TestWaitableEvent* task_ran) {
290             EXPECT_FALSE(sequenced_task_runner->RunsTasksInCurrentSequence());
291             task_ran->Signal();
292           },
293           sequenced_task_runner, Unretained(&task_ran)));
294   task_ran.Wait();
295 }
296 
297 // Verify that tasks posted before Start run after Start.
TEST_P(ThreadGroupTestAllExecutionModes,PostBeforeStart)298 TEST_P(ThreadGroupTestAllExecutionModes, PostBeforeStart) {
299   TestWaitableEvent task_1_running;
300   TestWaitableEvent task_2_running;
301 
302   auto task_runner = CreatePooledTaskRunner();
303   task_runner->PostTask(FROM_HERE, BindOnce(&TestWaitableEvent::Signal,
304                                             Unretained(&task_1_running)));
305   task_runner->PostTask(FROM_HERE, BindOnce(&TestWaitableEvent::Signal,
306                                             Unretained(&task_2_running)));
307 
308   // Workers should not be created and tasks should not run before the thread
309   // group is started. The sleep is to give time for the tasks to potentially
310   // run.
311   PlatformThread::Sleep(TestTimeouts::tiny_timeout());
312   EXPECT_FALSE(task_1_running.IsSignaled());
313   EXPECT_FALSE(task_2_running.IsSignaled());
314 
315   StartThreadGroup();
316 
317   // Tasks should run shortly after the thread group is started.
318   task_1_running.Wait();
319   task_2_running.Wait();
320 
321   task_tracker_.FlushForTesting();
322 }
323 
324 // Verify that tasks only run when allowed by the CanRunPolicy.
TEST_P(ThreadGroupTestAllExecutionModes,CanRunPolicyBasic)325 TEST_P(ThreadGroupTestAllExecutionModes, CanRunPolicyBasic) {
326   StartThreadGroup();
327   test::TestCanRunPolicyBasic(
328       thread_group_.get(),
329       [this](TaskPriority priority) {
330         return CreatePooledTaskRunner({priority});
331       },
332       &task_tracker_);
333 }
334 
// Runs the shared test::TestCanRunPolicyChangedBeforeRun() scenario against
// this thread group.
TEST_F(ThreadGroupTest, CanRunPolicyUpdatedBeforeRun) {
  StartThreadGroup();

  // This test only works with SequencedTaskRunner because it assumes
  // ordered execution of 2 posted tasks.
  auto make_sequenced_runner = [this](TaskPriority priority) {
    return test::CreatePooledSequencedTaskRunner(
        {priority}, &mock_pooled_task_runner_delegate_);
  };
  test::TestCanRunPolicyChangedBeforeRun(
      thread_group_.get(), make_sequenced_runner, &task_tracker_);
}
347 
// Runs the shared test::TestCanRunPolicyLoad() scenario against this thread
// group for the current execution mode.
TEST_P(ThreadGroupTestAllExecutionModes, CanRunPolicyLoad) {
  StartThreadGroup();

  // Factory handed to the helper so that it can create one runner per priority.
  auto make_runner = [this](TaskPriority priority) {
    return CreatePooledTaskRunner({priority});
  };
  test::TestCanRunPolicyLoad(thread_group_.get(), make_runner, &task_tracker_);
}
357 
358 // Verifies that ShouldYield() returns true for a priority that is not allowed
359 // to run by the CanRunPolicy.
TEST_F(ThreadGroupTest,CanRunPolicyShouldYield)360 TEST_F(ThreadGroupTest, CanRunPolicyShouldYield) {
361   StartThreadGroup();
362 
363   task_tracker_.SetCanRunPolicy(CanRunPolicy::kNone);
364   thread_group_->DidUpdateCanRunPolicy();
365   EXPECT_TRUE(
366       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
367   EXPECT_TRUE(
368       thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
369 
370   task_tracker_.SetCanRunPolicy(CanRunPolicy::kForegroundOnly);
371   thread_group_->DidUpdateCanRunPolicy();
372   EXPECT_TRUE(
373       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
374   EXPECT_FALSE(
375       thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
376 
377   task_tracker_.SetCanRunPolicy(CanRunPolicy::kAll);
378   thread_group_->DidUpdateCanRunPolicy();
379   EXPECT_FALSE(
380       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
381   EXPECT_FALSE(
382       thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
383 }
384 
385 // Verify that the maximum number of BEST_EFFORT tasks that can run concurrently
386 // in a thread group does not affect Sequences with a priority that was
387 // increased from BEST_EFFORT to USER_BLOCKING.
TEST_F(ThreadGroupTest,UpdatePriorityBestEffortToUserBlocking)388 TEST_F(ThreadGroupTest, UpdatePriorityBestEffortToUserBlocking) {
389   StartThreadGroup();
390 
391   CheckedLock num_tasks_running_lock;
392 
393   ConditionVariable num_tasks_running_cv =
394       num_tasks_running_lock.CreateConditionVariable();
395   num_tasks_running_cv.declare_only_used_while_idle();
396 
397   size_t num_tasks_running = 0;
398 
399   // Post |kMaxTasks| BEST_EFFORT tasks that block until they all start running.
400   std::vector<scoped_refptr<PooledSequencedTaskRunner>> task_runners;
401 
402   for (size_t i = 0; i < kMaxTasks; ++i) {
403     task_runners.push_back(MakeRefCounted<PooledSequencedTaskRunner>(
404         TaskTraits(TaskPriority::BEST_EFFORT),
405         &mock_pooled_task_runner_delegate_));
406     task_runners.back()->PostTask(
407         FROM_HERE, BindLambdaForTesting([&]() {
408           // Increment the number of tasks running.
409           {
410             CheckedAutoLock auto_lock(num_tasks_running_lock);
411             ++num_tasks_running;
412           }
413           num_tasks_running_cv.Broadcast();
414 
415           // Wait until all posted tasks are running.
416           CheckedAutoLock auto_lock(num_tasks_running_lock);
417           while (num_tasks_running < kMaxTasks)
418             num_tasks_running_cv.Wait();
419         }));
420   }
421 
422   // Wait until |kMaxBestEffort| tasks start running.
423   {
424     CheckedAutoLock auto_lock(num_tasks_running_lock);
425     while (num_tasks_running < kMaxBestEffortTasks)
426       num_tasks_running_cv.Wait();
427   }
428 
429   // Update the priority of all TaskRunners to USER_BLOCKING.
430   for (size_t i = 0; i < kMaxTasks; ++i)
431     task_runners[i]->UpdatePriority(TaskPriority::USER_BLOCKING);
432 
433   // Wait until all posted tasks start running. This should not block forever,
434   // even in a thread group that enforces a maximum number of concurrent
435   // BEST_EFFORT tasks lower than |kMaxTasks|.
436   static_assert(kMaxBestEffortTasks < kMaxTasks, "");
437   {
438     CheckedAutoLock auto_lock(num_tasks_running_lock);
439     while (num_tasks_running < kMaxTasks)
440       num_tasks_running_cv.Wait();
441   }
442 
443   task_tracker_.FlushForTesting();
444 }
445 
446 // Regression test for crbug.com/955953.
TEST_P(ThreadGroupTestAllExecutionModes,ScopedBlockingCallTwice)447 TEST_P(ThreadGroupTestAllExecutionModes, ScopedBlockingCallTwice) {
448   StartThreadGroup();
449   auto task_runner = test::CreatePooledTaskRunnerWithExecutionMode(
450       execution_mode(), &mock_pooled_task_runner_delegate_, {MayBlock()});
451 
452   TestWaitableEvent task_ran;
453   task_runner->PostTask(FROM_HERE,
454                         BindOnce(
455                             [](TestWaitableEvent* task_ran) {
456                               {
457                                 ScopedBlockingCall scoped_blocking_call(
458                                     FROM_HERE, BlockingType::MAY_BLOCK);
459                               }
460                               {
461                                 ScopedBlockingCall scoped_blocking_call(
462                                     FROM_HERE, BlockingType::MAY_BLOCK);
463                               }
464                               task_ran->Signal();
465                             },
466                             Unretained(&task_ran)));
467   task_ran.Wait();
468 }
469 
470 #if BUILDFLAG(IS_WIN)
// Verify that tasks run in a COM MTA when the thread group is started with
// WorkerEnvironment::COM_MTA.
TEST_P(ThreadGroupTestAllExecutionModes, COMMTAWorkerEnvironment) {
  StartThreadGroup(ThreadGroup::WorkerEnvironment::COM_MTA);
  auto runner = CreatePooledTaskRunner();

  TestWaitableEvent done;
  auto check_apartment = [](TestWaitableEvent* finished) {
    win::AssertComApartmentType(win::ComApartmentType::MTA);
    finished->Signal();
  };
  runner->PostTask(FROM_HERE, BindOnce(check_apartment, Unretained(&done)));
  done.Wait();
}
486 
// Verify that tasks run outside of any COM apartment when the thread group is
// started with WorkerEnvironment::NONE.
TEST_P(ThreadGroupTestAllExecutionModes, NoWorkerEnvironment) {
  StartThreadGroup(ThreadGroup::WorkerEnvironment::NONE);
  auto runner = CreatePooledTaskRunner();

  TestWaitableEvent done;
  auto check_apartment = [](TestWaitableEvent* finished) {
    win::AssertComApartmentType(win::ComApartmentType::NONE);
    finished->Signal();
  };
  runner->PostTask(FROM_HERE, BindOnce(check_apartment, Unretained(&done)));
  done.Wait();
}
502 #endif
503 
504 // Verifies that ShouldYield() returns false when there is no pending task.
TEST_F(ThreadGroupTest,ShouldYieldSingleTask)505 TEST_F(ThreadGroupTest, ShouldYieldSingleTask) {
506   StartThreadGroup();
507 
508   test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
509                                &mock_pooled_task_runner_delegate_)
510       ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
511                    EXPECT_FALSE(thread_group_->ShouldYield(
512                        {TaskPriority::BEST_EFFORT, TimeTicks::Now()}));
513                    EXPECT_FALSE(thread_group_->ShouldYield(
514                        {TaskPriority::USER_VISIBLE, TimeTicks::Now()}));
515                    EXPECT_FALSE(thread_group_->ShouldYield(
516                        {TaskPriority::USER_VISIBLE, TimeTicks::Now()}));
517                  }));
518 
519   task_tracker_.FlushForTesting();
520 }
521 
522 // Verify that tasks from a JobTaskSource run at the intended concurrency.
TEST_F(ThreadGroupTest,ScheduleJobTaskSource)523 TEST_F(ThreadGroupTest, ScheduleJobTaskSource) {
524   StartThreadGroup();
525 
526   TestWaitableEvent threads_running;
527   TestWaitableEvent threads_continue;
528 
529   RepeatingClosure threads_running_barrier = BarrierClosure(
530       kMaxTasks,
531       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
532 
533   auto job_task = base::MakeRefCounted<test::MockJobTask>(
534       BindLambdaForTesting(
535           [&threads_running_barrier, &threads_continue](JobDelegate*) {
536             threads_running_barrier.Run();
537             threads_continue.Wait();
538           }),
539       /* num_tasks_to_run */ kMaxTasks);
540   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
541       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
542 
543   auto registered_task_source =
544       task_tracker_.RegisterTaskSource(std::move(task_source));
545   EXPECT_TRUE(registered_task_source);
546   thread_group_->PushTaskSourceAndWakeUpWorkers(
547       RegisteredTaskSourceAndTransaction::FromTaskSource(
548           std::move(registered_task_source)));
549 
550   threads_running.Wait();
551   threads_continue.Signal();
552 
553   // Flush the task tracker to be sure that no local variables are accessed by
554   // tasks after the end of the scope.
555   task_tracker_.FlushForTesting();
556 }
557 
558 // Verify that tasks from a JobTaskSource run at the intended concurrency.
TEST_F(ThreadGroupTest,ScheduleJobTaskSourceMultipleTime)559 TEST_F(ThreadGroupTest, ScheduleJobTaskSourceMultipleTime) {
560   StartThreadGroup();
561 
562   TestWaitableEvent thread_running;
563   TestWaitableEvent thread_continue;
564   auto job_task = base::MakeRefCounted<test::MockJobTask>(
565       BindLambdaForTesting([&thread_running, &thread_continue](JobDelegate*) {
566         DCHECK(!thread_running.IsSignaled());
567         thread_running.Signal();
568         thread_continue.Wait();
569       }),
570       /* num_tasks_to_run */ 1);
571   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
572       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
573 
574   thread_group_->PushTaskSourceAndWakeUpWorkers(
575       RegisteredTaskSourceAndTransaction::FromTaskSource(
576           task_tracker_.RegisterTaskSource(task_source)));
577 
578   // Enqueuing the task source again shouldn't affect the number of time it's
579   // run.
580   thread_group_->PushTaskSourceAndWakeUpWorkers(
581       RegisteredTaskSourceAndTransaction::FromTaskSource(
582           task_tracker_.RegisterTaskSource(task_source)));
583 
584   thread_running.Wait();
585   thread_continue.Signal();
586 
587   // Once the worker task ran, enqueuing the task source has no effect.
588   thread_group_->PushTaskSourceAndWakeUpWorkers(
589       RegisteredTaskSourceAndTransaction::FromTaskSource(
590           task_tracker_.RegisterTaskSource(task_source)));
591 
592   // Flush the task tracker to be sure that no local variables are accessed by
593   // tasks after the end of the scope.
594   task_tracker_.FlushForTesting();
595 }
596 
597 // Verify that Cancel() on a job stops running the worker task and causes
598 // current workers to yield.
TEST_F(ThreadGroupTest,CancelJobTaskSource)599 TEST_F(ThreadGroupTest, CancelJobTaskSource) {
600   StartThreadGroup();
601 
602   CheckedLock tasks_running_lock;
603   ConditionVariable tasks_running_cv =
604       tasks_running_lock.CreateConditionVariable();
605   bool tasks_running = false;
606 
607   // Schedule a big number of tasks.
608   auto job_task = base::MakeRefCounted<test::MockJobTask>(
609       BindLambdaForTesting([&](JobDelegate* delegate) {
610         {
611           CheckedAutoLock auto_lock(tasks_running_lock);
612           tasks_running = true;
613         }
614         tasks_running_cv.Signal();
615 
616         while (!delegate->ShouldYield()) {
617         }
618       }),
619       /* num_tasks_to_run */ kTooManyTasks);
620   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
621       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
622 
623   mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
624   JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
625 
626   // Wait for at least 1 task to start running.
627   {
628     CheckedAutoLock auto_lock(tasks_running_lock);
629     while (!tasks_running)
630       tasks_running_cv.Wait();
631   }
632 
633   // Cancels pending tasks and unblocks running ones.
634   job_handle.Cancel();
635 
636   // This should not block since the job got cancelled.
637   task_tracker_.FlushForTesting();
638 }
639 
640 // Verify that calling JobTaskSource::NotifyConcurrencyIncrease() (re-)schedule
641 // tasks with the intended concurrency.
TEST_F(ThreadGroupTest,JobTaskSourceConcurrencyIncrease)642 TEST_F(ThreadGroupTest, JobTaskSourceConcurrencyIncrease) {
643   StartThreadGroup();
644 
645   TestWaitableEvent threads_running_a;
646   TestWaitableEvent threads_continue;
647 
648   // Initially schedule half the tasks.
649   RepeatingClosure threads_running_barrier = BarrierClosure(
650       kMaxTasks / 2,
651       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running_a)));
652 
653   auto job_state = base::MakeRefCounted<test::MockJobTask>(
654       BindLambdaForTesting(
655           [&threads_running_barrier, &threads_continue](JobDelegate*) {
656             threads_running_barrier.Run();
657             threads_continue.Wait();
658           }),
659       /* num_tasks_to_run */ kMaxTasks / 2);
660   auto task_source = job_state->GetJobTaskSource(
661       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
662 
663   auto registered_task_source = task_tracker_.RegisterTaskSource(task_source);
664   EXPECT_TRUE(registered_task_source);
665   thread_group_->PushTaskSourceAndWakeUpWorkers(
666       RegisteredTaskSourceAndTransaction::FromTaskSource(
667           std::move(registered_task_source)));
668 
669   threads_running_a.Wait();
670   // Reset |threads_running_barrier| for the remaining tasks.
671   TestWaitableEvent threads_running_b;
672   threads_running_barrier = BarrierClosure(
673       kMaxTasks / 2,
674       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running_b)));
675   job_state->SetNumTasksToRun(kMaxTasks);
676 
677   // Unblocks tasks to let them racily wait for NotifyConcurrencyIncrease() to
678   // be called.
679   threads_continue.Signal();
680   task_source->NotifyConcurrencyIncrease();
681   // Wait for the remaining tasks. This should not block forever.
682   threads_running_b.Wait();
683 
684   // Flush the task tracker to be sure that no local variables are accessed by
685   // tasks after the end of the scope.
686   task_tracker_.FlushForTesting();
687 }
688 
689 // Verify that a JobTaskSource that becomes empty while in the queue eventually
690 // gets discarded.
TEST_F(ThreadGroupTest,ScheduleEmptyJobTaskSource)691 TEST_F(ThreadGroupTest, ScheduleEmptyJobTaskSource) {
692   StartThreadGroup();
693 
694   task_tracker_.SetCanRunPolicy(CanRunPolicy::kNone);
695 
696   auto job_task = base::MakeRefCounted<test::MockJobTask>(
697       BindRepeating([](JobDelegate*) { ShouldNotRun(); }),
698       /* num_tasks_to_run */ 1);
699   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
700       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
701 
702   auto registered_task_source =
703       task_tracker_.RegisterTaskSource(std::move(task_source));
704   EXPECT_TRUE(registered_task_source);
705   thread_group_->PushTaskSourceAndWakeUpWorkers(
706       RegisteredTaskSourceAndTransaction::FromTaskSource(
707           std::move(registered_task_source)));
708 
709   // The worker task will never run.
710   job_task->SetNumTasksToRun(0);
711 
712   task_tracker_.SetCanRunPolicy(CanRunPolicy::kAll);
713   thread_group_->DidUpdateCanRunPolicy();
714 
715   // This should not block since there's no task to run.
716   task_tracker_.FlushForTesting();
717 }
718 
719 // Verify that Join() on a job contributes to max concurrency and waits for all
720 // workers to return.
TEST_F(ThreadGroupTest,JoinJobTaskSource)721 TEST_F(ThreadGroupTest, JoinJobTaskSource) {
722   StartThreadGroup();
723 
724   TestWaitableEvent threads_continue;
725   RepeatingClosure threads_continue_barrier = BarrierClosure(
726       kMaxTasks + 1,
727       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_continue)));
728 
729   auto job_task = base::MakeRefCounted<test::MockJobTask>(
730       BindLambdaForTesting([&](JobDelegate*) {
731         threads_continue_barrier.Run();
732         threads_continue.Wait();
733       }),
734       /* num_tasks_to_run */ kMaxTasks + 1);
735   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
736       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
737 
738   mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
739   JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
740   job_handle.Join();
741   // All worker tasks should complete before Join() returns.
742   EXPECT_EQ(0U, job_task->GetMaxConcurrency(0));
743   thread_group_->JoinForTesting();
744   EXPECT_EQ(1U, task_source->HasOneRef());
745   // Prevent TearDown() from calling JoinForTesting() again.
746   mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
747   thread_group_ = nullptr;
748 }
749 
750 // Verify that finishing work outside of a job unblocks workers with a stale
751 // max concurrency.
TEST_F(ThreadGroupTest,JoinJobTaskSourceStaleConcurrency)752 TEST_F(ThreadGroupTest, JoinJobTaskSourceStaleConcurrency) {
753   StartThreadGroup();
754 
755   TestWaitableEvent thread_running;
756   std::atomic_size_t max_concurrency(1);
757   auto task_source = MakeRefCounted<JobTaskSource>(
758       FROM_HERE, TaskTraits{},
759       BindLambdaForTesting([&](JobDelegate*) { thread_running.Signal(); }),
760       BindLambdaForTesting(
761           [&](size_t /*worker_count*/) -> size_t { return max_concurrency; }),
762       &mock_pooled_task_runner_delegate_);
763 
764   mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
765   JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
766   thread_running.Wait();
767 
768   // Racily update max concurrency to unblock the task that was waiting on
769   // NotifyMaxConcurrency().
770   max_concurrency = 0;
771   job_handle.Join();
772 
773   // This should not block since the job was joined.
774   task_tracker_.FlushForTesting();
775 }
776 
777 // Verify that cancelling a job unblocks workers with a stale max concurrency.
TEST_F(ThreadGroupTest,CancelJobTaskSourceWithStaleConcurrency)778 TEST_F(ThreadGroupTest, CancelJobTaskSourceWithStaleConcurrency) {
779   StartThreadGroup();
780 
781   TestWaitableEvent thread_running;
782   auto task_source = MakeRefCounted<JobTaskSource>(
783       FROM_HERE, TaskTraits{},
784       BindLambdaForTesting([&](JobDelegate*) { thread_running.Signal(); }),
785       BindRepeating([](size_t /*worker_count*/) -> size_t { return 1; }),
786       &mock_pooled_task_runner_delegate_);
787 
788   mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
789   JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
790   thread_running.Wait();
791   job_handle.Cancel();
792 
793   // This should not block since the job got cancelled.
794   task_tracker_.FlushForTesting();
795 }
796 
797 // Verify that the maximum number of BEST_EFFORT tasks that can run concurrently
798 // in a thread group does not affect JobTaskSource with a priority that was
799 // increased from BEST_EFFORT to USER_BLOCKING.
TEST_F(ThreadGroupTest,JobTaskSourceUpdatePriority)800 TEST_F(ThreadGroupTest, JobTaskSourceUpdatePriority) {
801   StartThreadGroup();
802 
803   CheckedLock num_tasks_running_lock;
804 
805   ConditionVariable num_tasks_running_cv =
806       num_tasks_running_lock.CreateConditionVariable();
807   num_tasks_running_cv.declare_only_used_while_idle();
808 
809   size_t num_tasks_running = 0;
810 
811   auto job_task = base::MakeRefCounted<test::MockJobTask>(
812       BindLambdaForTesting([&](JobDelegate*) {
813         // Increment the number of tasks running.
814         {
815           CheckedAutoLock auto_lock(num_tasks_running_lock);
816           ++num_tasks_running;
817         }
818         num_tasks_running_cv.Broadcast();
819 
820         // Wait until all posted tasks are running.
821         CheckedAutoLock auto_lock(num_tasks_running_lock);
822         while (num_tasks_running < kMaxTasks)
823           num_tasks_running_cv.Wait();
824       }),
825       /* num_tasks_to_run */ kMaxTasks);
826   scoped_refptr<JobTaskSource> task_source =
827       job_task->GetJobTaskSource(FROM_HERE, {TaskPriority::BEST_EFFORT},
828                                  &mock_pooled_task_runner_delegate_);
829 
830   auto registered_task_source = task_tracker_.RegisterTaskSource(task_source);
831   EXPECT_TRUE(registered_task_source);
832   thread_group_->PushTaskSourceAndWakeUpWorkers(
833       RegisteredTaskSourceAndTransaction::FromTaskSource(
834           std::move(registered_task_source)));
835 
836   // Wait until |kMaxBestEffort| tasks start running.
837   {
838     CheckedAutoLock auto_lock(num_tasks_running_lock);
839     while (num_tasks_running < kMaxBestEffortTasks)
840       num_tasks_running_cv.Wait();
841   }
842 
843   // Update the priority to USER_BLOCKING.
844   auto transaction = task_source->BeginTransaction();
845   transaction.UpdatePriority(TaskPriority::USER_BLOCKING);
846   thread_group_->UpdateSortKey(std::move(transaction));
847 
848   // Wait until all posted tasks start running. This should not block forever,
849   // even in a thread group that enforces a maximum number of concurrent
850   // BEST_EFFORT tasks lower than |kMaxTasks|.
851   static_assert(kMaxBestEffortTasks < kMaxTasks, "");
852   {
853     CheckedAutoLock auto_lock(num_tasks_running_lock);
854     while (num_tasks_running < kMaxTasks)
855       num_tasks_running_cv.Wait();
856   }
857 
858   // Flush the task tracker to be sure that no local variables are accessed by
859   // tasks after the end of the scope.
860   task_tracker_.FlushForTesting();
861 }
862 
863 INSTANTIATE_TEST_SUITE_P(GenericParallel,
864                          ThreadGroupTestAllExecutionModes,
865                          ::testing::Values(TaskSourceExecutionMode::kParallel));
866 INSTANTIATE_TEST_SUITE_P(
867     GenericSequenced,
868     ThreadGroupTestAllExecutionModes,
869     ::testing::Values(TaskSourceExecutionMode::kSequenced));
870 INSTANTIATE_TEST_SUITE_P(GenericJob,
871                          ThreadGroupTestAllExecutionModes,
872                          ::testing::Values(TaskSourceExecutionMode::kJob));
873 
874 }  // namespace internal
875 }  // namespace base
876