xref: /aosp_15_r20/external/cronet/base/task/thread_pool/thread_group_impl_unittest.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/thread_group_impl.h"
6 
7 #include <stddef.h>
8 
9 #include <algorithm>
10 #include <atomic>
11 #include <memory>
12 #include <optional>
13 #include <unordered_set>
14 #include <utility>
15 #include <vector>
16 
17 #include "base/atomicops.h"
18 #include "base/barrier_closure.h"
19 #include "base/functional/bind.h"
20 #include "base/functional/callback.h"
21 #include "base/functional/callback_helpers.h"
22 #include "base/memory/ptr_util.h"
23 #include "base/memory/raw_ptr.h"
24 #include "base/memory/ref_counted.h"
25 #include "base/metrics/statistics_recorder.h"
26 #include "base/synchronization/atomic_flag.h"
27 #include "base/synchronization/condition_variable.h"
28 #include "base/synchronization/lock.h"
29 #include "base/task/task_features.h"
30 #include "base/task/task_runner.h"
31 #include "base/task/thread_pool/delayed_task_manager.h"
32 #include "base/task/thread_pool/environment_config.h"
33 #include "base/task/thread_pool/pooled_task_runner_delegate.h"
34 #include "base/task/thread_pool/sequence.h"
35 #include "base/task/thread_pool/task_source_sort_key.h"
36 #include "base/task/thread_pool/task_tracker.h"
37 #include "base/task/thread_pool/test_task_factory.h"
38 #include "base/task/thread_pool/test_utils.h"
39 #include "base/task/thread_pool/worker_thread_observer.h"
40 #include "base/test/bind.h"
41 #include "base/test/gtest_util.h"
42 #include "base/test/scoped_feature_list.h"
43 #include "base/test/test_simple_task_runner.h"
44 #include "base/test/test_timeouts.h"
45 #include "base/test/test_waitable_event.h"
46 #include "base/threading/platform_thread.h"
47 #include "base/threading/scoped_blocking_call.h"
48 #include "base/threading/simple_thread.h"
49 #include "base/threading/thread.h"
50 #include "base/threading/thread_checker_impl.h"
51 #include "base/time/time.h"
52 #include "base/timer/timer.h"
53 #include "build/build_config.h"
54 #include "testing/gtest/include/gtest/gtest.h"
55 
56 namespace base {
57 namespace internal {
58 namespace {
59 
60 constexpr size_t kMaxTasks = 4;
61 constexpr size_t kNumThreadsPostingTasks = 4;
62 constexpr size_t kNumTasksPostedPerThread = 150;
63 // This can't be lower because Windows' TestWaitableEvent wakes up too early
64 // when a small timeout is used. This results in many spurious wake ups before a
65 // worker is allowed to cleanup.
66 constexpr TimeDelta kReclaimTimeForCleanupTests = Milliseconds(500);
67 constexpr size_t kLargeNumber = 512;
68 
69 class ThreadGroupImplImplTestBase : public ThreadGroup::Delegate {
70  public:
71   ThreadGroupImplImplTestBase(const ThreadGroupImplImplTestBase&) = delete;
72   ThreadGroupImplImplTestBase& operator=(const ThreadGroupImplImplTestBase&) =
73       delete;
74 
75  protected:
ThreadGroupImplImplTestBase()76   ThreadGroupImplImplTestBase()
77       : service_thread_("ThreadPoolServiceThread"),
78         tracked_ref_factory_(this) {}
79 
CommonTearDown()80   void CommonTearDown() {
81     delayed_task_manager_.Shutdown();
82     service_thread_.Stop();
83     task_tracker_.FlushForTesting();
84     if (thread_group_)
85       thread_group_->JoinForTesting();
86     mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
87     thread_group_.reset();
88   }
89 
CreateThreadGroup(ThreadType thread_type=ThreadType::kDefault)90   void CreateThreadGroup(ThreadType thread_type = ThreadType::kDefault) {
91     ASSERT_FALSE(thread_group_);
92     service_thread_.Start();
93     delayed_task_manager_.Start(service_thread_.task_runner());
94     thread_group_ = std::make_unique<ThreadGroupImpl>(
95         "TestThreadGroup", "A", thread_type, task_tracker_.GetTrackedRef(),
96         tracked_ref_factory_.GetTrackedRef());
97     ASSERT_TRUE(thread_group_);
98 
99     mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
100   }
101 
StartThreadGroup(TimeDelta suggested_reclaim_time,size_t max_tasks,std::optional<int> max_best_effort_tasks=std::nullopt,WorkerThreadObserver * worker_observer=nullptr,std::optional<TimeDelta> may_block_threshold=std::nullopt)102   void StartThreadGroup(
103       TimeDelta suggested_reclaim_time,
104       size_t max_tasks,
105       std::optional<int> max_best_effort_tasks = std::nullopt,
106       WorkerThreadObserver* worker_observer = nullptr,
107       std::optional<TimeDelta> may_block_threshold = std::nullopt) {
108     ASSERT_TRUE(thread_group_);
109     thread_group_->Start(
110         max_tasks,
111         max_best_effort_tasks ? max_best_effort_tasks.value() : max_tasks,
112         suggested_reclaim_time, service_thread_.task_runner(), worker_observer,
113         ThreadGroup::WorkerEnvironment::NONE,
114         /* synchronous_thread_start_for_testing=*/false, may_block_threshold);
115   }
116 
CreateAndStartThreadGroup(TimeDelta suggested_reclaim_time=TimeDelta::Max (),size_t max_tasks=kMaxTasks,std::optional<int> max_best_effort_tasks=std::nullopt,WorkerThreadObserver * worker_observer=nullptr,std::optional<TimeDelta> may_block_threshold=std::nullopt)117   void CreateAndStartThreadGroup(
118       TimeDelta suggested_reclaim_time = TimeDelta::Max(),
119       size_t max_tasks = kMaxTasks,
120       std::optional<int> max_best_effort_tasks = std::nullopt,
121       WorkerThreadObserver* worker_observer = nullptr,
122       std::optional<TimeDelta> may_block_threshold = std::nullopt) {
123     CreateThreadGroup();
124     StartThreadGroup(suggested_reclaim_time, max_tasks, max_best_effort_tasks,
125                      worker_observer, may_block_threshold);
126   }
127 
128   Thread service_thread_;
129   TaskTracker task_tracker_;
130   std::unique_ptr<ThreadGroupImpl> thread_group_;
131   DelayedTaskManager delayed_task_manager_;
132   TrackedRefFactory<ThreadGroup::Delegate> tracked_ref_factory_;
133   test::MockPooledTaskRunnerDelegate mock_pooled_task_runner_delegate_ = {
134       task_tracker_.GetTrackedRef(), &delayed_task_manager_};
135 
136  private:
137   // ThreadGroup::Delegate:
GetThreadGroupForTraits(const TaskTraits & traits)138   ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override {
139     return thread_group_.get();
140   }
141 };
142 
143 class ThreadGroupImplImplTest : public ThreadGroupImplImplTestBase,
144                                 public testing::Test {
145  public:
146   ThreadGroupImplImplTest(const ThreadGroupImplImplTest&) = delete;
147   ThreadGroupImplImplTest& operator=(const ThreadGroupImplImplTest&) = delete;
148 
149  protected:
150   ThreadGroupImplImplTest() = default;
151 
SetUp()152   void SetUp() override { CreateAndStartThreadGroup(); }
153 
TearDown()154   void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
155 };
156 
157 class ThreadGroupImplImplTestParam
158     : public ThreadGroupImplImplTestBase,
159       public testing::TestWithParam<TaskSourceExecutionMode> {
160  public:
161   ThreadGroupImplImplTestParam(const ThreadGroupImplImplTestParam&) = delete;
162   ThreadGroupImplImplTestParam& operator=(const ThreadGroupImplImplTestParam&) =
163       delete;
164 
165  protected:
166   ThreadGroupImplImplTestParam() = default;
167 
SetUp()168   void SetUp() override { CreateAndStartThreadGroup(); }
169 
TearDown()170   void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
171 };
172 
173 using PostNestedTask = test::TestTaskFactory::PostNestedTask;
174 
175 class ThreadPostingTasksWaitIdle : public SimpleThread {
176  public:
177   // Constructs a thread that posts tasks to |thread_group| through an
178   // |execution_mode| task runner. The thread waits until all workers in
179   // |thread_group| are idle before posting a new task.
ThreadPostingTasksWaitIdle(ThreadGroupImpl * thread_group,test::MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate_,TaskSourceExecutionMode execution_mode)180   ThreadPostingTasksWaitIdle(
181       ThreadGroupImpl* thread_group,
182       test::MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate_,
183       TaskSourceExecutionMode execution_mode)
184       : SimpleThread("ThreadPostingTasksWaitIdle"),
185         thread_group_(thread_group),
186         factory_(CreatePooledTaskRunnerWithExecutionMode(
187                      execution_mode,
188                      mock_pooled_task_runner_delegate_),
189                  execution_mode) {
190     DCHECK(thread_group_);
191   }
192   ThreadPostingTasksWaitIdle(const ThreadPostingTasksWaitIdle&) = delete;
193   ThreadPostingTasksWaitIdle& operator=(const ThreadPostingTasksWaitIdle&) =
194       delete;
195 
factory() const196   const test::TestTaskFactory* factory() const { return &factory_; }
197 
198  private:
Run()199   void Run() override {
200     for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
201       thread_group_->WaitForAllWorkersIdleForTesting();
202       EXPECT_TRUE(factory_.PostTask(PostNestedTask::NO, OnceClosure()));
203     }
204   }
205 
206   const raw_ptr<ThreadGroupImpl> thread_group_;
207   const scoped_refptr<TaskRunner> task_runner_;
208   test::TestTaskFactory factory_;
209 };
210 
211 }  // namespace
212 
TEST_P(ThreadGroupImplImplTestParam, PostTasksWaitAllWorkersIdle) {
  // Spin up the posting threads. Each of them waits for all workers to become
  // idle before posting a new task, which verifies that workers can sleep and
  // be woken up when new tasks are posted.
  std::vector<std::unique_ptr<ThreadPostingTasksWaitIdle>> posters;
  for (size_t created = 0; created != kNumThreadsPostingTasks; ++created) {
    auto poster = std::make_unique<ThreadPostingTasksWaitIdle>(
        thread_group_.get(), &mock_pooled_task_runner_delegate_, GetParam());
    poster->Start();
    posters.push_back(std::move(poster));
  }

  // Wait for every posting thread to finish and for all of its tasks to run.
  for (const auto& poster : posters) {
    poster->Join();
    poster->factory()->WaitForAllTasksToRun();
  }

  // Wait until all workers are idle to be sure that no task accesses its
  // TestTaskFactory after |posters| is destroyed.
  thread_group_->WaitForAllWorkersIdleForTesting();
}
237 
TEST_P(ThreadGroupImplImplTestParam, PostTasksWithOneAvailableWorker) {
  // Keep all workers but one busy with tasks blocked on |unblock| until it is
  // signaled. Each blocking task gets its own factory so that the tasks land
  // in different sequences and can run simultaneously when the execution mode
  // is SEQUENCED.
  TestWaitableEvent unblock;
  std::vector<std::unique_ptr<test::TestTaskFactory>> blocking_factories;
  constexpr size_t kNumBlockedWorkers = kMaxTasks - 1;
  for (size_t blocked = 0; blocked != kNumBlockedWorkers; ++blocked) {
    auto factory = std::make_unique<test::TestTaskFactory>(
        CreatePooledTaskRunnerWithExecutionMode(
            GetParam(), &mock_pooled_task_runner_delegate_),
        GetParam());
    EXPECT_TRUE(factory->PostTask(
        PostNestedTask::NO,
        BindOnce(&TestWaitableEvent::Wait, Unretained(&unblock))));
    factory->WaitForAllTasksToRun();
    blocking_factories.push_back(std::move(factory));
  }

  // Post |kNumTasksPostedPerThread| tasks that should all run despite the fact
  // that only one worker in |thread_group_| isn't busy.
  test::TestTaskFactory quick_factory(
      CreatePooledTaskRunnerWithExecutionMode(
          GetParam(), &mock_pooled_task_runner_delegate_),
      GetParam());
  for (size_t posted = 0; posted != kNumTasksPostedPerThread; ++posted) {
    EXPECT_TRUE(quick_factory.PostTask(PostNestedTask::NO, OnceClosure()));
  }
  quick_factory.WaitForAllTasksToRun();

  // Let the blocked tasks finish.
  unblock.Signal();

  // Wait until all workers are idle to be sure that no task accesses
  // its TestTaskFactory after it is destroyed.
  thread_group_->WaitForAllWorkersIdleForTesting();
}
272 
TEST_P(ThreadGroupImplImplTestParam, Saturate) {
  // Verify that it is possible to have |kMaxTasks| tasks/sequences running
  // simultaneously. Each blocking task gets its own factory so that the tasks
  // land in different sequences and can run simultaneously when the execution
  // mode is SEQUENCED.
  TestWaitableEvent unblock;
  std::vector<std::unique_ptr<test::TestTaskFactory>> blocking_factories;
  for (size_t blocked = 0; blocked != kMaxTasks; ++blocked) {
    auto factory = std::make_unique<test::TestTaskFactory>(
        CreatePooledTaskRunnerWithExecutionMode(
            GetParam(), &mock_pooled_task_runner_delegate_),
        GetParam());
    EXPECT_TRUE(factory->PostTask(
        PostNestedTask::NO,
        BindOnce(&TestWaitableEvent::Wait, Unretained(&unblock))));
    factory->WaitForAllTasksToRun();
    blocking_factories.push_back(std::move(factory));
  }

  // Let the blocked tasks finish.
  unblock.Signal();

  // Wait until all workers are idle to be sure that no task accesses
  // its TestTaskFactory after it is destroyed.
  thread_group_->WaitForAllWorkersIdleForTesting();
}
298 
299 // Verifies that ShouldYield() returns true for priorities lower than the
300 // highest priority pending while the thread group is flooded with USER_VISIBLE
301 // tasks.
TEST_F(ThreadGroupImplImplTest,ShouldYieldFloodedUserVisible)302 TEST_F(ThreadGroupImplImplTest, ShouldYieldFloodedUserVisible) {
303   TestWaitableEvent threads_running;
304   TestWaitableEvent threads_continue;
305 
306   // Saturate workers with USER_VISIBLE tasks to ensure ShouldYield() returns
307   // true when a tasks of higher priority is posted.
308   RepeatingClosure threads_running_barrier = BarrierClosure(
309       kMaxTasks,
310       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
311 
312   auto job_task = base::MakeRefCounted<test::MockJobTask>(
313       BindLambdaForTesting(
314           [&threads_running_barrier, &threads_continue](JobDelegate* delegate) {
315             threads_running_barrier.Run();
316             threads_continue.Wait();
317           }),
318       /* num_tasks_to_run */ kMaxTasks);
319   scoped_refptr<JobTaskSource> task_source =
320       job_task->GetJobTaskSource(FROM_HERE, {TaskPriority::USER_VISIBLE},
321                                  &mock_pooled_task_runner_delegate_);
322 
323   auto registered_task_source = task_tracker_.RegisterTaskSource(task_source);
324   ASSERT_TRUE(registered_task_source);
325   static_cast<ThreadGroup*>(thread_group_.get())
326       ->PushTaskSourceAndWakeUpWorkers(
327           RegisteredTaskSourceAndTransaction::FromTaskSource(
328               std::move(registered_task_source)));
329 
330   threads_running.Wait();
331 
332   // Posting a BEST_EFFORT task should not cause any other tasks to yield.
333   // Once this task gets to run, no other task needs to yield.
334   // Note: This is only true because this test is using a single ThreadGroup.
335   //       Under the ThreadPool this wouldn't be racy because BEST_EFFORT tasks
336   //       run in an independent ThreadGroup.
337   test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT},
338                                &mock_pooled_task_runner_delegate_)
339       ->PostTask(
340           FROM_HERE, BindLambdaForTesting([&]() {
341             EXPECT_FALSE(thread_group_->ShouldYield(
342                 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/1}));
343           }));
344   // A BEST_EFFORT task with more workers shouldn't have to yield.
345   EXPECT_FALSE(thread_group_->ShouldYield(
346       {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/2}));
347   EXPECT_FALSE(thread_group_->ShouldYield(
348       {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
349   EXPECT_FALSE(thread_group_->ShouldYield(
350       {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/0}));
351   EXPECT_FALSE(thread_group_->ShouldYield(
352       {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/0}));
353 
354   // Posting a USER_VISIBLE task should cause BEST_EFFORT and USER_VISIBLE with
355   // higher worker_count tasks to yield.
356   auto post_user_visible = [&]() {
357     test::CreatePooledTaskRunner({TaskPriority::USER_VISIBLE},
358                                  &mock_pooled_task_runner_delegate_)
359         ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
360                      EXPECT_FALSE(thread_group_->ShouldYield(
361                          {TaskPriority::USER_VISIBLE, TimeTicks(),
362                           /* worker_count=*/1}));
363                    }));
364   };
365   // A USER_VISIBLE task with too many workers should yield.
366   post_user_visible();
367   EXPECT_TRUE(thread_group_->ShouldYield(
368       {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/2}));
369   post_user_visible();
370   EXPECT_TRUE(thread_group_->ShouldYield(
371       {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
372   post_user_visible();
373   EXPECT_FALSE(thread_group_->ShouldYield(
374       {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/1}));
375   EXPECT_FALSE(thread_group_->ShouldYield(
376       {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/0}));
377 
378   // Posting a USER_BLOCKING task should cause BEST_EFFORT, USER_VISIBLE and
379   // USER_BLOCKING with higher worker_count tasks to yield.
380   auto post_user_blocking = [&]() {
381     test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
382                                  &mock_pooled_task_runner_delegate_)
383         ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
384                      // Once this task got to start, no other task needs to
385                      // yield.
386                      EXPECT_FALSE(thread_group_->ShouldYield(
387                          {TaskPriority::USER_BLOCKING, TimeTicks(),
388                           /* worker_count=*/1}));
389                    }));
390   };
391   // A USER_BLOCKING task with too many workers should have to yield.
392   post_user_blocking();
393   EXPECT_TRUE(thread_group_->ShouldYield(
394       {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/2}));
395   post_user_blocking();
396   EXPECT_TRUE(thread_group_->ShouldYield(
397       {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
398   post_user_blocking();
399   EXPECT_TRUE(thread_group_->ShouldYield(
400       {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/0}));
401   post_user_blocking();
402   EXPECT_FALSE(thread_group_->ShouldYield(
403       {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/1}));
404 
405   threads_continue.Signal();
406   task_tracker_.FlushForTesting();
407 }
408 
409 INSTANTIATE_TEST_SUITE_P(Parallel,
410                          ThreadGroupImplImplTestParam,
411                          ::testing::Values(TaskSourceExecutionMode::kParallel));
412 INSTANTIATE_TEST_SUITE_P(
413     Sequenced,
414     ThreadGroupImplImplTestParam,
415     ::testing::Values(TaskSourceExecutionMode::kSequenced));
416 
417 INSTANTIATE_TEST_SUITE_P(Job,
418                          ThreadGroupImplImplTestParam,
419                          ::testing::Values(TaskSourceExecutionMode::kJob));
420 
421 namespace {
422 
423 class ThreadGroupImplImplStartInBodyTest : public ThreadGroupImplImplTest {
424  public:
SetUp()425   void SetUp() override {
426     CreateThreadGroup();
427     // Let the test start the thread group.
428   }
429 };
430 
TaskPostedBeforeStart(PlatformThreadRef * platform_thread_ref,TestWaitableEvent * task_running,TestWaitableEvent * barrier)431 void TaskPostedBeforeStart(PlatformThreadRef* platform_thread_ref,
432                            TestWaitableEvent* task_running,
433                            TestWaitableEvent* barrier) {
434   *platform_thread_ref = PlatformThread::CurrentRef();
435   task_running->Signal();
436   barrier->Wait();
437 }
438 
439 }  // namespace
440 
441 // Verify that 2 tasks posted before Start() to a ThreadGroupImpl with
442 // more than 2 workers run on different workers when Start() is called.
TEST_F(ThreadGroupImplImplStartInBodyTest,PostTasksBeforeStart)443 TEST_F(ThreadGroupImplImplStartInBodyTest, PostTasksBeforeStart) {
444   PlatformThreadRef task_1_thread_ref;
445   PlatformThreadRef task_2_thread_ref;
446   TestWaitableEvent task_1_running;
447   TestWaitableEvent task_2_running;
448 
449   // This event is used to prevent a task from completing before the other task
450   // starts running. If that happened, both tasks could run on the same worker
451   // and this test couldn't verify that the correct number of workers were woken
452   // up.
453   TestWaitableEvent barrier;
454 
455   test::CreatePooledTaskRunner({WithBaseSyncPrimitives()},
456                                &mock_pooled_task_runner_delegate_)
457       ->PostTask(
458           FROM_HERE,
459           BindOnce(&TaskPostedBeforeStart, Unretained(&task_1_thread_ref),
460                    Unretained(&task_1_running), Unretained(&barrier)));
461   test::CreatePooledTaskRunner({WithBaseSyncPrimitives()},
462                                &mock_pooled_task_runner_delegate_)
463       ->PostTask(
464           FROM_HERE,
465           BindOnce(&TaskPostedBeforeStart, Unretained(&task_2_thread_ref),
466                    Unretained(&task_2_running), Unretained(&barrier)));
467 
468   // Workers should not be created and tasks should not run before the thread
469   // group is started.
470   EXPECT_EQ(0U, thread_group_->NumberOfWorkersForTesting());
471   EXPECT_FALSE(task_1_running.IsSignaled());
472   EXPECT_FALSE(task_2_running.IsSignaled());
473 
474   StartThreadGroup(TimeDelta::Max(), kMaxTasks);
475 
476   // Tasks should run shortly after the thread group is started.
477   task_1_running.Wait();
478   task_2_running.Wait();
479 
480   // Tasks should run on different threads.
481   EXPECT_NE(task_1_thread_ref, task_2_thread_ref);
482 
483   barrier.Signal();
484   task_tracker_.FlushForTesting();
485 }
486 
487 // Verify that posting many tasks before Start will cause the number of workers
488 // to grow to |max_tasks_| after Start.
TEST_F(ThreadGroupImplImplStartInBodyTest,PostManyTasks)489 TEST_F(ThreadGroupImplImplStartInBodyTest, PostManyTasks) {
490   scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
491       {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);
492   constexpr size_t kNumTasksPosted = 2 * kMaxTasks;
493 
494   TestWaitableEvent threads_running;
495   TestWaitableEvent threads_continue;
496 
497   RepeatingClosure threads_running_barrier = BarrierClosure(
498       kMaxTasks,
499       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
500   // Posting these tasks should cause new workers to be created.
501   for (size_t i = 0; i < kMaxTasks; ++i) {
502     task_runner->PostTask(
503         FROM_HERE, BindLambdaForTesting([&]() {
504           threads_running_barrier.Run();
505           threads_continue.Wait();
506         }));
507   }
508   // Post the remaining |kNumTasksPosted - kMaxTasks| tasks, don't wait for them
509   // as they'll be blocked behind the above kMaxtasks.
510   for (size_t i = kMaxTasks; i < kNumTasksPosted; ++i)
511     task_runner->PostTask(FROM_HERE, DoNothing());
512 
513   EXPECT_EQ(0U, thread_group_->NumberOfWorkersForTesting());
514 
515   StartThreadGroup(TimeDelta::Max(), kMaxTasks);
516   EXPECT_GT(thread_group_->NumberOfWorkersForTesting(), 0U);
517   EXPECT_EQ(kMaxTasks, thread_group_->GetMaxTasksForTesting());
518 
519   threads_running.Wait();
520   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(),
521             thread_group_->GetMaxTasksForTesting());
522   threads_continue.Signal();
523   task_tracker_.FlushForTesting();
524 }
525 
526 namespace {
527 
528 class BackgroundThreadGroupImplTest : public ThreadGroupImplImplTest {
529  public:
CreateAndStartThreadGroup(TimeDelta suggested_reclaim_time=TimeDelta::Max (),size_t max_tasks=kMaxTasks,std::optional<int> max_best_effort_tasks=std::nullopt,WorkerThreadObserver * worker_observer=nullptr,std::optional<TimeDelta> may_block_threshold=std::nullopt)530   void CreateAndStartThreadGroup(
531       TimeDelta suggested_reclaim_time = TimeDelta::Max(),
532       size_t max_tasks = kMaxTasks,
533       std::optional<int> max_best_effort_tasks = std::nullopt,
534       WorkerThreadObserver* worker_observer = nullptr,
535       std::optional<TimeDelta> may_block_threshold = std::nullopt) {
536     if (!CanUseBackgroundThreadTypeForWorkerThread())
537       return;
538     CreateThreadGroup(ThreadType::kBackground);
539     StartThreadGroup(suggested_reclaim_time, max_tasks, max_best_effort_tasks,
540                      worker_observer, may_block_threshold);
541   }
542 
SetUp()543   void SetUp() override { CreateAndStartThreadGroup(); }
544 };
545 
546 }  // namespace
547 
548 // Verify that ScopedBlockingCall updates thread type when necessary per
549 // shutdown state.
TEST_F(BackgroundThreadGroupImplTest,UpdatePriorityBlockingStarted)550 TEST_F(BackgroundThreadGroupImplTest, UpdatePriorityBlockingStarted) {
551   if (!CanUseBackgroundThreadTypeForWorkerThread())
552     return;
553 
554   const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
555       {MayBlock(), WithBaseSyncPrimitives(), TaskPriority::BEST_EFFORT},
556       &mock_pooled_task_runner_delegate_);
557 
558   TestWaitableEvent threads_running;
559   RepeatingClosure threads_running_barrier = BarrierClosure(
560       kMaxTasks,
561       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
562 
563   TestWaitableEvent blocking_threads_continue;
564 
565   for (size_t i = 0; i < kMaxTasks; ++i) {
566     task_runner->PostTask(
567         FROM_HERE, BindLambdaForTesting([&]() {
568           EXPECT_EQ(ThreadType::kBackground,
569                     PlatformThread::GetCurrentThreadType());
570           {
571             // ScopedBlockingCall before shutdown doesn't affect priority.
572             ScopedBlockingCall scoped_blocking_call(FROM_HERE,
573                                                     BlockingType::MAY_BLOCK);
574             EXPECT_EQ(ThreadType::kBackground,
575                       PlatformThread::GetCurrentThreadType());
576           }
577           threads_running_barrier.Run();
578           blocking_threads_continue.Wait();
579           // This is reached after StartShutdown(), at which point we expect
580           // ScopedBlockingCall to update thread priority.
581           ScopedBlockingCall scoped_blocking_call(FROM_HERE,
582                                                   BlockingType::MAY_BLOCK);
583           EXPECT_EQ(ThreadType::kDefault,
584                     PlatformThread::GetCurrentThreadType());
585         }));
586   }
587   threads_running.Wait();
588 
589   task_tracker_.StartShutdown();
590   blocking_threads_continue.Signal();
591   task_tracker_.FlushForTesting();
592 }
593 
594 namespace {
595 
596 class ThreadGroupImplStandbyPolicyTest : public ThreadGroupImplImplTestBase,
597                                          public testing::Test {
598  public:
599   ThreadGroupImplStandbyPolicyTest() = default;
600   ThreadGroupImplStandbyPolicyTest(const ThreadGroupImplStandbyPolicyTest&) =
601       delete;
602   ThreadGroupImplStandbyPolicyTest& operator=(
603       const ThreadGroupImplStandbyPolicyTest&) = delete;
604 
SetUp()605   void SetUp() override {
606     CreateAndStartThreadGroup(kReclaimTimeForCleanupTests);
607   }
608 
TearDown()609   void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
610 };
611 
612 }  // namespace
613 
// Checks that exactly one worker exists right after the thread group is
// started.
TEST_F(ThreadGroupImplStandbyPolicyTest, InitOne) {
  const auto worker_count = thread_group_->NumberOfWorkersForTesting();
  EXPECT_EQ(1U, worker_count);
}
617 
618 namespace {
619 
// Blocking type of the optional, nested ScopedBlockingCall of a
// NestedBlockingType.
enum class OptionalBlockingType {
  // No nested ScopedBlockingCall.
  NO_BLOCK = 0,
  // Nested ScopedBlockingCall of type BlockingType::MAY_BLOCK.
  MAY_BLOCK = 1,
  // Nested ScopedBlockingCall of type BlockingType::WILL_BLOCK.
  WILL_BLOCK = 2,
};
625 
626 struct NestedBlockingType {
NestedBlockingTypebase::internal::__anon866189150d11::NestedBlockingType627   NestedBlockingType(BlockingType first_in,
628                      OptionalBlockingType second_in,
629                      BlockingType behaves_as_in)
630       : first(first_in), second(second_in), behaves_as(behaves_as_in) {}
631 
632   BlockingType first;
633   OptionalBlockingType second;
634   BlockingType behaves_as;
635 };
636 
637 class NestedScopedBlockingCall {
638  public:
NestedScopedBlockingCall(const NestedBlockingType & nested_blocking_type)639   explicit NestedScopedBlockingCall(
640       const NestedBlockingType& nested_blocking_type)
641       : first_scoped_blocking_call_(FROM_HERE, nested_blocking_type.first),
642         second_scoped_blocking_call_(
643             nested_blocking_type.second == OptionalBlockingType::WILL_BLOCK
644                 ? std::make_unique<ScopedBlockingCall>(FROM_HERE,
645                                                        BlockingType::WILL_BLOCK)
646                 : (nested_blocking_type.second ==
647                            OptionalBlockingType::MAY_BLOCK
648                        ? std::make_unique<ScopedBlockingCall>(
649                              FROM_HERE,
650                              BlockingType::MAY_BLOCK)
651                        : nullptr)) {}
652   NestedScopedBlockingCall(const NestedScopedBlockingCall&) = delete;
653   NestedScopedBlockingCall& operator=(const NestedScopedBlockingCall&) = delete;
654 
655  private:
656   ScopedBlockingCall first_scoped_blocking_call_;
657   std::unique_ptr<ScopedBlockingCall> second_scoped_blocking_call_;
658 };
659 
660 }  // namespace
661 
662 class ThreadGroupImplBlockingTest
663     : public ThreadGroupImplImplTestBase,
664       public testing::TestWithParam<NestedBlockingType> {
665  public:
666   ThreadGroupImplBlockingTest() = default;
667   ThreadGroupImplBlockingTest(const ThreadGroupImplBlockingTest&) = delete;
668   ThreadGroupImplBlockingTest& operator=(const ThreadGroupImplBlockingTest&) =
669       delete;
670 
ParamInfoToString(::testing::TestParamInfo<NestedBlockingType> param_info)671   static std::string ParamInfoToString(
672       ::testing::TestParamInfo<NestedBlockingType> param_info) {
673     std::string str = param_info.param.first == BlockingType::MAY_BLOCK
674                           ? "MAY_BLOCK"
675                           : "WILL_BLOCK";
676     if (param_info.param.second == OptionalBlockingType::MAY_BLOCK)
677       str += "_MAY_BLOCK";
678     else if (param_info.param.second == OptionalBlockingType::WILL_BLOCK)
679       str += "_WILL_BLOCK";
680     return str;
681   }
682 
TearDown()683   void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
684 
685  protected:
686   // Saturates the thread group with a task that first blocks, waits to be
687   // unblocked, then exits.
SaturateWithBlockingTasks(const NestedBlockingType & nested_blocking_type,TaskPriority priority=TaskPriority::USER_BLOCKING)688   void SaturateWithBlockingTasks(
689       const NestedBlockingType& nested_blocking_type,
690       TaskPriority priority = TaskPriority::USER_BLOCKING) {
691     TestWaitableEvent threads_running;
692 
693     const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
694         {MayBlock(), WithBaseSyncPrimitives(), priority},
695         &mock_pooled_task_runner_delegate_);
696 
697     RepeatingClosure threads_running_barrier = BarrierClosure(
698         kMaxTasks,
699         BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
700 
701     for (size_t i = 0; i < kMaxTasks; ++i) {
702       task_runner->PostTask(
703           FROM_HERE, BindLambdaForTesting([this, &threads_running_barrier,
704                                            nested_blocking_type]() {
705             NestedScopedBlockingCall nested_scoped_blocking_call(
706                 nested_blocking_type);
707             threads_running_barrier.Run();
708             blocking_threads_continue_.Wait();
709           }));
710     }
711     threads_running.Wait();
712   }
713 
714   // Saturates the thread group with a task that waits for other tasks without
715   // entering a ScopedBlockingCall, then exits.
SaturateWithBusyTasks(TaskPriority priority=TaskPriority::USER_BLOCKING,TaskShutdownBehavior shutdown_behavior=TaskShutdownBehavior::SKIP_ON_SHUTDOWN)716   void SaturateWithBusyTasks(
717       TaskPriority priority = TaskPriority::USER_BLOCKING,
718       TaskShutdownBehavior shutdown_behavior =
719           TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
720     TestWaitableEvent threads_running;
721 
722     const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
723         {MayBlock(), WithBaseSyncPrimitives(), priority, shutdown_behavior},
724         &mock_pooled_task_runner_delegate_);
725 
726     RepeatingClosure threads_running_barrier = BarrierClosure(
727         kMaxTasks,
728         BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
729     // Posting these tasks should cause new workers to be created.
730     for (size_t i = 0; i < kMaxTasks; ++i) {
731       task_runner->PostTask(
732           FROM_HERE, BindLambdaForTesting([this, &threads_running_barrier]() {
733             threads_running_barrier.Run();
734             busy_threads_continue_.Wait();
735           }));
736     }
737     threads_running.Wait();
738   }
739 
740   // Returns how long we can expect a change to |max_tasks_| to occur
741   // after a task has become blocked.
GetMaxTasksChangeSleepTime()742   TimeDelta GetMaxTasksChangeSleepTime() {
743     return std::max(thread_group_->blocked_workers_poll_period_for_testing(),
744                     thread_group_->may_block_threshold_for_testing()) +
745            TestTimeouts::tiny_timeout();
746   }
747 
748   // Waits indefinitely, until |thread_group_|'s max tasks increases to
749   // |expected_max_tasks|.
ExpectMaxTasksIncreasesTo(size_t expected_max_tasks)750   void ExpectMaxTasksIncreasesTo(size_t expected_max_tasks) {
751     size_t max_tasks = thread_group_->GetMaxTasksForTesting();
752     while (max_tasks != expected_max_tasks) {
753       PlatformThread::Sleep(GetMaxTasksChangeSleepTime());
754       size_t new_max_tasks = thread_group_->GetMaxTasksForTesting();
755       ASSERT_GE(new_max_tasks, max_tasks);
756       max_tasks = new_max_tasks;
757     }
758   }
759 
760   // Unblocks tasks posted by SaturateWithBlockingTasks().
UnblockBlockingTasks()761   void UnblockBlockingTasks() { blocking_threads_continue_.Signal(); }
762 
763   // Unblocks tasks posted by SaturateWithBusyTasks().
UnblockBusyTasks()764   void UnblockBusyTasks() { busy_threads_continue_.Signal(); }
765 
766   const scoped_refptr<TaskRunner> task_runner_ =
767       test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
768                                    &mock_pooled_task_runner_delegate_);
769 
770  private:
771   TestWaitableEvent blocking_threads_continue_;
772   TestWaitableEvent busy_threads_continue_;
773 };
774 
775 // Verify that SaturateWithBlockingTasks() causes max tasks to increase and
776 // creates a worker if needed. Also verify that UnblockBlockingTasks() decreases
777 // max tasks after an increase.
TEST_P(ThreadGroupImplBlockingTest,ThreadBlockedUnblocked)778 TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblocked) {
779   CreateAndStartThreadGroup();
780 
781   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
782 
783   SaturateWithBlockingTasks(GetParam());
784 
785   // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
786   // should not block forever.
787   SaturateWithBusyTasks();
788 
789   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
790 
791   UnblockBusyTasks();
792   UnblockBlockingTasks();
793   task_tracker_.FlushForTesting();
794   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
795 }
796 
797 // Verify that SaturateWithBlockingTasks() of BEST_EFFORT tasks causes max best
798 // effort tasks to increase and creates a worker if needed. Also verify that
799 // UnblockBlockingTasks() decreases max best effort tasks after an increase.
TEST_P(ThreadGroupImplBlockingTest,ThreadBlockedUnblockedBestEffort)800 TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblockedBestEffort) {
801   CreateAndStartThreadGroup();
802 
803   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
804   ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
805 
806   SaturateWithBlockingTasks(GetParam(), TaskPriority::BEST_EFFORT);
807 
808   // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
809   // should not block forever.
810   SaturateWithBusyTasks(TaskPriority::BEST_EFFORT);
811 
812   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
813 
814   UnblockBusyTasks();
815   UnblockBlockingTasks();
816   task_tracker_.FlushForTesting();
817   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
818   EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
819 }
820 
821 // Verify that flooding the thread group with more BEST_EFFORT tasks than
822 // kMaxBestEffortTasks doesn't prevent USER_VISIBLE tasks from running.
TEST_P(ThreadGroupImplBlockingTest,TooManyBestEffortTasks)823 TEST_P(ThreadGroupImplBlockingTest, TooManyBestEffortTasks) {
824   constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
825 
826   CreateAndStartThreadGroup(TimeDelta::Max(), kMaxTasks, kMaxBestEffortTasks);
827 
828   TestWaitableEvent threads_continue;
829   {
830     TestWaitableEvent entered_blocking_scope;
831     RepeatingClosure entered_blocking_scope_barrier = BarrierClosure(
832         kMaxBestEffortTasks + 1, BindOnce(&TestWaitableEvent::Signal,
833                                           Unretained(&entered_blocking_scope)));
834     TestWaitableEvent exit_blocking_scope;
835 
836     TestWaitableEvent threads_running;
837     RepeatingClosure threads_running_barrier = BarrierClosure(
838         kMaxBestEffortTasks + 1,
839         BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
840 
841     const auto best_effort_task_runner =
842         test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
843                                      &mock_pooled_task_runner_delegate_);
844     for (size_t i = 0; i < kMaxBestEffortTasks + 1; ++i) {
845       best_effort_task_runner->PostTask(
846           FROM_HERE, BindLambdaForTesting([&]() {
847             {
848               NestedScopedBlockingCall scoped_blocking_call(GetParam());
849               entered_blocking_scope_barrier.Run();
850               exit_blocking_scope.Wait();
851             }
852             threads_running_barrier.Run();
853             threads_continue.Wait();
854           }));
855     }
856     entered_blocking_scope.Wait();
857     exit_blocking_scope.Signal();
858     threads_running.Wait();
859   }
860 
861   // At this point, kMaxBestEffortTasks + 1 threads are running (plus
862   // potentially the idle thread), but max_task and max_best_effort_task are
863   // back to normal.
864   EXPECT_GE(thread_group_->NumberOfWorkersForTesting(),
865             kMaxBestEffortTasks + 1);
866   EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
867             kMaxBestEffortTasks + 2);
868   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
869 
870   TestWaitableEvent threads_running;
871   task_runner_->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
872                            threads_running.Signal();
873                            threads_continue.Wait();
874                          }));
875 
876   // This should not block forever.
877   threads_running.Wait();
878 
879   EXPECT_GE(thread_group_->NumberOfWorkersForTesting(),
880             kMaxBestEffortTasks + 2);
881   EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
882             kMaxBestEffortTasks + 3);
883   threads_continue.Signal();
884 
885   task_tracker_.FlushForTesting();
886 }
887 
888 // Verify that tasks posted in a saturated thread group before a
889 // ScopedBlockingCall will execute after ScopedBlockingCall is instantiated.
TEST_P(ThreadGroupImplBlockingTest,PostBeforeBlocking)890 TEST_P(ThreadGroupImplBlockingTest, PostBeforeBlocking) {
891   CreateAndStartThreadGroup();
892 
893   TestWaitableEvent thread_running(WaitableEvent::ResetPolicy::AUTOMATIC);
894   TestWaitableEvent thread_can_block;
895   TestWaitableEvent threads_continue;
896 
897   for (size_t i = 0; i < kMaxTasks; ++i) {
898     task_runner_->PostTask(
899         FROM_HERE,
900         BindOnce(
901             [](const NestedBlockingType& nested_blocking_type,
902                TestWaitableEvent* thread_running,
903                TestWaitableEvent* thread_can_block,
904                TestWaitableEvent* threads_continue) {
905               thread_running->Signal();
906               thread_can_block->Wait();
907 
908               NestedScopedBlockingCall nested_scoped_blocking_call(
909                   nested_blocking_type);
910               threads_continue->Wait();
911             },
912             GetParam(), Unretained(&thread_running),
913             Unretained(&thread_can_block), Unretained(&threads_continue)));
914     thread_running.Wait();
915   }
916 
917   // All workers should be occupied and the thread group should be saturated.
918   // Workers have not entered ScopedBlockingCall yet.
919   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
920   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
921 
922   TestWaitableEvent extra_threads_running;
923   TestWaitableEvent extra_threads_continue;
924   RepeatingClosure extra_threads_running_barrier = BarrierClosure(
925       kMaxTasks,
926       BindOnce(&TestWaitableEvent::Signal, Unretained(&extra_threads_running)));
927   for (size_t i = 0; i < kMaxTasks; ++i) {
928     task_runner_->PostTask(
929         FROM_HERE, BindOnce(
930                        [](RepeatingClosure* extra_threads_running_barrier,
931                           TestWaitableEvent* extra_threads_continue) {
932                          extra_threads_running_barrier->Run();
933                          extra_threads_continue->Wait();
934                        },
935                        Unretained(&extra_threads_running_barrier),
936                        Unretained(&extra_threads_continue)));
937   }
938 
939   // Allow tasks to enter ScopedBlockingCall. Workers should be created for the
940   // tasks we just posted.
941   thread_can_block.Signal();
942 
943   // Should not block forever.
944   extra_threads_running.Wait();
945   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
946   extra_threads_continue.Signal();
947 
948   threads_continue.Signal();
949   task_tracker_.FlushForTesting();
950 }
951 
952 // Verify that workers become idle when the thread group is over-capacity and
953 // that those workers do no work.
TEST_P(ThreadGroupImplBlockingTest,WorkersIdleWhenOverCapacity)954 TEST_P(ThreadGroupImplBlockingTest, WorkersIdleWhenOverCapacity) {
955   CreateAndStartThreadGroup();
956 
957   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
958 
959   SaturateWithBlockingTasks(GetParam());
960 
961   // Forces |kMaxTasks| extra workers to be instantiated by posting tasks.
962   SaturateWithBusyTasks();
963 
964   ASSERT_EQ(thread_group_->NumberOfIdleWorkersForTesting(), 0U);
965   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
966 
967   AtomicFlag is_exiting;
968   // These tasks should not get executed until after other tasks become
969   // unblocked.
970   for (size_t i = 0; i < kMaxTasks; ++i) {
971     task_runner_->PostTask(FROM_HERE, BindOnce(
972                                           [](AtomicFlag* is_exiting) {
973                                             EXPECT_TRUE(is_exiting->IsSet());
974                                           },
975                                           Unretained(&is_exiting)));
976   }
977 
978   // The original |kMaxTasks| will finish their tasks after being unblocked.
979   // There will be work in the work queue, but the thread group should now be
980   // over-capacity and workers will become idle.
981   UnblockBlockingTasks();
982   thread_group_->WaitForWorkersIdleForTesting(kMaxTasks);
983   EXPECT_EQ(thread_group_->NumberOfIdleWorkersForTesting(), kMaxTasks);
984 
985   // Posting more tasks should not cause workers idle from the thread group
986   // being over capacity to begin doing work.
987   for (size_t i = 0; i < kMaxTasks; ++i) {
988     task_runner_->PostTask(FROM_HERE, BindOnce(
989                                           [](AtomicFlag* is_exiting) {
990                                             EXPECT_TRUE(is_exiting->IsSet());
991                                           },
992                                           Unretained(&is_exiting)));
993   }
994 
995   // Give time for those idle workers to possibly do work (which should not
996   // happen).
997   PlatformThread::Sleep(TestTimeouts::tiny_timeout());
998 
999   is_exiting.Set();
1000   // Unblocks the new workers.
1001   UnblockBusyTasks();
1002   task_tracker_.FlushForTesting();
1003 }
1004 
1005 // Verify that an increase of max tasks with SaturateWithBlockingTasks()
1006 // increases the number of tasks that can run before ShouldYield returns true.
TEST_P(ThreadGroupImplBlockingTest,ThreadBlockedUnblockedShouldYield)1007 TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblockedShouldYield) {
1008   CreateAndStartThreadGroup();
1009 
1010   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1011 
1012   EXPECT_FALSE(
1013       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1014   SaturateWithBlockingTasks(GetParam());
1015   EXPECT_FALSE(
1016       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1017 
1018   // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
1019   // should not block forever.
1020   SaturateWithBusyTasks();
1021 
1022   // All tasks can run, hence ShouldYield returns false.
1023   EXPECT_FALSE(
1024       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1025 
1026   // Post a USER_VISIBLE task that can't run since workers are saturated. This
1027   // should cause BEST_EFFORT tasks to yield.
1028   test::CreatePooledTaskRunner({TaskPriority::USER_VISIBLE},
1029                                &mock_pooled_task_runner_delegate_)
1030       ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1031                    EXPECT_FALSE(thread_group_->ShouldYield(
1032                        {TaskPriority::BEST_EFFORT, TimeTicks()}));
1033                  }));
1034   EXPECT_TRUE(
1035       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1036 
1037   // Post a USER_BLOCKING task that can't run since workers are saturated. This
1038   // should cause USER_VISIBLE tasks to yield.
1039   test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
1040                                &mock_pooled_task_runner_delegate_)
1041       ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1042                    EXPECT_FALSE(thread_group_->ShouldYield(
1043                        {TaskPriority::USER_VISIBLE, TimeTicks()}));
1044                  }));
1045   EXPECT_TRUE(
1046       thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
1047 
1048   UnblockBusyTasks();
1049   UnblockBlockingTasks();
1050   task_tracker_.FlushForTesting();
1051   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1052 }
1053 
1054 INSTANTIATE_TEST_SUITE_P(
1055     All,
1056     ThreadGroupImplBlockingTest,
1057     ::testing::Values(NestedBlockingType(BlockingType::MAY_BLOCK,
1058                                          OptionalBlockingType::NO_BLOCK,
1059                                          BlockingType::MAY_BLOCK),
1060                       NestedBlockingType(BlockingType::WILL_BLOCK,
1061                                          OptionalBlockingType::NO_BLOCK,
1062                                          BlockingType::WILL_BLOCK),
1063                       NestedBlockingType(BlockingType::MAY_BLOCK,
1064                                          OptionalBlockingType::WILL_BLOCK,
1065                                          BlockingType::WILL_BLOCK),
1066                       NestedBlockingType(BlockingType::WILL_BLOCK,
1067                                          OptionalBlockingType::MAY_BLOCK,
1068                                          BlockingType::WILL_BLOCK)),
1069     ThreadGroupImplBlockingTest::ParamInfoToString);
1070 
1071 // Verify that if a thread enters the scope of a MAY_BLOCK ScopedBlockingCall,
1072 // but exits the scope before the MayBlock threshold is reached, that the max
1073 // tasks does not increase.
TEST_F(ThreadGroupImplBlockingTest,ThreadBlockUnblockPremature)1074 TEST_F(ThreadGroupImplBlockingTest, ThreadBlockUnblockPremature) {
1075   // Create a thread group with an infinite MayBlock threshold so that a
1076   // MAY_BLOCK ScopedBlockingCall never increases the max tasks.
1077   CreateAndStartThreadGroup(TimeDelta::Max(),   // |suggested_reclaim_time|
1078                             kMaxTasks,          // |max_tasks|
1079                             std::nullopt,       // |max_best_effort_tasks|
1080                             nullptr,            // |worker_observer|
1081                             TimeDelta::Max());  // |may_block_threshold|
1082 
1083   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1084 
1085   SaturateWithBlockingTasks(NestedBlockingType(BlockingType::MAY_BLOCK,
1086                                                OptionalBlockingType::NO_BLOCK,
1087                                                BlockingType::MAY_BLOCK));
1088   PlatformThread::Sleep(
1089       2 * thread_group_->blocked_workers_poll_period_for_testing());
1090   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
1091   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1092 
1093   UnblockBlockingTasks();
1094   task_tracker_.FlushForTesting();
1095   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1096 }
1097 
1098 // Verify that if a BEST_EFFORT task enters the scope of a WILL_BLOCK
1099 // ScopedBlockingCall, but exits the scope before the MayBlock threshold is
1100 // reached, that the max best effort tasks does not increase.
TEST_F(ThreadGroupImplBlockingTest,ThreadBlockUnblockPrematureBestEffort)1101 TEST_F(ThreadGroupImplBlockingTest, ThreadBlockUnblockPrematureBestEffort) {
1102   // Create a thread group with an infinite MayBlock threshold so that a
1103   // MAY_BLOCK ScopedBlockingCall never increases the max tasks.
1104   CreateAndStartThreadGroup(TimeDelta::Max(),   // |suggested_reclaim_time|
1105                             kMaxTasks,          // |max_tasks|
1106                             kMaxTasks,          // |max_best_effort_tasks|
1107                             nullptr,            // |worker_observer|
1108                             TimeDelta::Max());  // |may_block_threshold|
1109 
1110   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1111   ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
1112 
1113   SaturateWithBlockingTasks(NestedBlockingType(BlockingType::WILL_BLOCK,
1114                                                OptionalBlockingType::NO_BLOCK,
1115                                                BlockingType::WILL_BLOCK),
1116                             TaskPriority::BEST_EFFORT);
1117   PlatformThread::Sleep(
1118       2 * thread_group_->blocked_workers_poll_period_for_testing());
1119   EXPECT_GE(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
1120   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), 2 * kMaxTasks);
1121   EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
1122 
1123   UnblockBlockingTasks();
1124   task_tracker_.FlushForTesting();
1125   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1126   EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
1127 }
1128 
1129 // Verify that if max tasks is incremented because of a MAY_BLOCK
1130 // ScopedBlockingCall, it isn't incremented again when there is a nested
1131 // WILL_BLOCK ScopedBlockingCall.
TEST_F(ThreadGroupImplBlockingTest,MayBlockIncreaseCapacityNestedWillBlock)1132 TEST_F(ThreadGroupImplBlockingTest, MayBlockIncreaseCapacityNestedWillBlock) {
1133   CreateAndStartThreadGroup();
1134 
1135   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1136   auto task_runner =
1137       test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
1138                                    &mock_pooled_task_runner_delegate_);
1139   TestWaitableEvent can_return;
1140 
1141   // Saturate the thread group so that a MAY_BLOCK ScopedBlockingCall would
1142   // increment the max tasks.
1143   for (size_t i = 0; i < kMaxTasks - 1; ++i) {
1144     task_runner->PostTask(
1145         FROM_HERE, BindOnce(&TestWaitableEvent::Wait, Unretained(&can_return)));
1146   }
1147 
1148   TestWaitableEvent can_instantiate_will_block;
1149   TestWaitableEvent did_instantiate_will_block;
1150 
1151   // Post a task that instantiates a MAY_BLOCK ScopedBlockingCall.
1152   task_runner->PostTask(
1153       FROM_HERE,
1154       BindOnce(
1155           [](TestWaitableEvent* can_instantiate_will_block,
1156              TestWaitableEvent* did_instantiate_will_block,
1157              TestWaitableEvent* can_return) {
1158             ScopedBlockingCall may_block(FROM_HERE, BlockingType::MAY_BLOCK);
1159             can_instantiate_will_block->Wait();
1160             ScopedBlockingCall will_block(FROM_HERE, BlockingType::WILL_BLOCK);
1161             did_instantiate_will_block->Signal();
1162             can_return->Wait();
1163           },
1164           Unretained(&can_instantiate_will_block),
1165           Unretained(&did_instantiate_will_block), Unretained(&can_return)));
1166 
1167   // After a short delay, max tasks should be incremented.
1168   ExpectMaxTasksIncreasesTo(kMaxTasks + 1);
1169 
1170   // Wait until the task instantiates a WILL_BLOCK ScopedBlockingCall.
1171   can_instantiate_will_block.Signal();
1172   did_instantiate_will_block.Wait();
1173 
1174   // Max tasks shouldn't be incremented again.
1175   EXPECT_EQ(kMaxTasks + 1, thread_group_->GetMaxTasksForTesting());
1176 
1177   // Tear down.
1178   can_return.Signal();
1179   task_tracker_.FlushForTesting();
1180   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1181 }
1182 
1183 // Verify that OnShutdownStarted() causes max tasks to increase and creates a
1184 // worker if needed. Also verify that UnblockBusyTasks() decreases max tasks
1185 // after an increase.
TEST_F(ThreadGroupImplBlockingTest,ThreadBusyShutdown)1186 TEST_F(ThreadGroupImplBlockingTest, ThreadBusyShutdown) {
1187   CreateAndStartThreadGroup();
1188   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1189 
1190   SaturateWithBusyTasks(TaskPriority::BEST_EFFORT,
1191                         TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN);
1192   thread_group_->OnShutdownStarted();
1193 
1194   // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
1195   // should not block forever.
1196   SaturateWithBusyTasks(TaskPriority::BEST_EFFORT,
1197                         TaskShutdownBehavior::BLOCK_SHUTDOWN);
1198 
1199   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
1200 
1201   UnblockBusyTasks();
1202   task_tracker_.FlushForTesting();
1203   thread_group_->JoinForTesting();
1204   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1205   mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
1206   thread_group_.reset();
1207 }
1208 
// Test parameter of ThreadGroupImplOverCapacityTest.
enum class ReclaimType {
  // The fixture leaves the kNoWorkerThreadReclaim feature untouched.
  DELAYED_RECLAIM,
  // The fixture enables the kNoWorkerThreadReclaim feature.
  NO_RECLAIM,
};
1210 
1211 class ThreadGroupImplOverCapacityTest
1212     : public ThreadGroupImplImplTestBase,
1213       public testing::TestWithParam<ReclaimType> {
1214  public:
1215   ThreadGroupImplOverCapacityTest() = default;
1216   ThreadGroupImplOverCapacityTest(const ThreadGroupImplOverCapacityTest&) =
1217       delete;
1218   ThreadGroupImplOverCapacityTest& operator=(
1219       const ThreadGroupImplOverCapacityTest&) = delete;
1220 
SetUp()1221   void SetUp() override {
1222     if (GetParam() == ReclaimType::NO_RECLAIM) {
1223       feature_list.InitAndEnableFeature(kNoWorkerThreadReclaim);
1224     }
1225     CreateThreadGroup();
1226     task_runner_ =
1227         test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
1228                                      &mock_pooled_task_runner_delegate_);
1229   }
1230 
TearDown()1231   void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
1232 
1233  protected:
1234   base::test::ScopedFeatureList feature_list;
1235   scoped_refptr<TaskRunner> task_runner_;
1236   static constexpr size_t kLocalMaxTasks = 3;
1237 
CreateThreadGroup()1238   void CreateThreadGroup() {
1239     ASSERT_FALSE(thread_group_);
1240     service_thread_.Start();
1241     delayed_task_manager_.Start(service_thread_.task_runner());
1242     thread_group_ = std::make_unique<ThreadGroupImpl>(
1243         "OverCapacityTestThreadGroup", "A", ThreadType::kDefault,
1244         task_tracker_.GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
1245     ASSERT_TRUE(thread_group_);
1246 
1247     mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
1248   }
1249 };
1250 
1251 // Verify that workers that become idle due to the thread group being over
1252 // capacity will eventually cleanup.
TEST_P(ThreadGroupImplOverCapacityTest,VerifyCleanup)1253 TEST_P(ThreadGroupImplOverCapacityTest, VerifyCleanup) {
1254   StartThreadGroup(kReclaimTimeForCleanupTests, kLocalMaxTasks);
1255   TestWaitableEvent threads_running;
1256   TestWaitableEvent threads_continue;
1257   RepeatingClosure threads_running_barrier = BarrierClosure(
1258       kLocalMaxTasks,
1259       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
1260 
1261   TestWaitableEvent blocked_call_continue;
1262   RepeatingClosure closure = BindRepeating(
1263       [](RepeatingClosure* threads_running_barrier,
1264          TestWaitableEvent* threads_continue,
1265          TestWaitableEvent* blocked_call_continue) {
1266         threads_running_barrier->Run();
1267         {
1268           ScopedBlockingCall scoped_blocking_call(FROM_HERE,
1269                                                   BlockingType::WILL_BLOCK);
1270           blocked_call_continue->Wait();
1271         }
1272         threads_continue->Wait();
1273       },
1274       Unretained(&threads_running_barrier), Unretained(&threads_continue),
1275       Unretained(&blocked_call_continue));
1276 
1277   for (size_t i = 0; i < kLocalMaxTasks; ++i)
1278     task_runner_->PostTask(FROM_HERE, closure);
1279 
1280   threads_running.Wait();
1281 
1282   TestWaitableEvent extra_threads_running;
1283   TestWaitableEvent extra_threads_continue;
1284 
1285   RepeatingClosure extra_threads_running_barrier = BarrierClosure(
1286       kLocalMaxTasks,
1287       BindOnce(&TestWaitableEvent::Signal, Unretained(&extra_threads_running)));
1288   // These tasks should run on the new threads from increasing max tasks.
1289   for (size_t i = 0; i < kLocalMaxTasks; ++i) {
1290     task_runner_->PostTask(
1291         FROM_HERE, BindOnce(
1292                        [](RepeatingClosure* extra_threads_running_barrier,
1293                           TestWaitableEvent* extra_threads_continue) {
1294                          extra_threads_running_barrier->Run();
1295                          extra_threads_continue->Wait();
1296                        },
1297                        Unretained(&extra_threads_running_barrier),
1298                        Unretained(&extra_threads_continue)));
1299   }
1300   extra_threads_running.Wait();
1301 
1302   ASSERT_EQ(kLocalMaxTasks * 2, thread_group_->NumberOfWorkersForTesting());
1303   EXPECT_EQ(kLocalMaxTasks * 2, thread_group_->GetMaxTasksForTesting());
1304   blocked_call_continue.Signal();
1305   extra_threads_continue.Signal();
1306 
1307   // Periodically post tasks to ensure that posting tasks does not prevent
1308   // workers that are idle due to the thread group being over capacity from
1309   // cleaning up.
1310   for (int i = 0; i < 16; ++i) {
1311     task_runner_->PostDelayedTask(FROM_HERE, DoNothing(),
1312                                   kReclaimTimeForCleanupTests * i * 0.5);
1313   }
1314 
1315   if (GetParam() == ReclaimType::DELAYED_RECLAIM) {
1316     // Note: one worker above capacity will not get cleaned up since it's on the
1317     // front of the idle set.
1318     thread_group_->WaitForWorkersCleanedUpForTesting(kLocalMaxTasks - 1);
1319     EXPECT_EQ(kLocalMaxTasks + 1, thread_group_->NumberOfWorkersForTesting());
1320     threads_continue.Signal();
1321   } else {
1322     // When workers are't automatically reclaimed after a delay, blocking tasks
1323     // need to return for extra workers to be cleaned up.
1324     threads_continue.Signal();
1325     thread_group_->WaitForWorkersCleanedUpForTesting(kLocalMaxTasks);
1326     EXPECT_EQ(kLocalMaxTasks, thread_group_->NumberOfWorkersForTesting());
1327   }
1328 
1329   threads_continue.Signal();
1330   task_tracker_.FlushForTesting();
1331 }
1332 
1333 INSTANTIATE_TEST_SUITE_P(ReclaimType,
1334                          ThreadGroupImplOverCapacityTest,
1335                          ::testing::Values(ReclaimType::DELAYED_RECLAIM,
1336                                            ReclaimType::NO_RECLAIM));
1337 
1338 // Verify that the maximum number of workers is 256 and that hitting the max
1339 // leaves the thread group in a valid state with regards to max tasks.
TEST_F(ThreadGroupImplBlockingTest,MaximumWorkersTest)1340 TEST_F(ThreadGroupImplBlockingTest, MaximumWorkersTest) {
1341   CreateAndStartThreadGroup();
1342 
1343   constexpr size_t kMaxNumberOfWorkers = 256;
1344   constexpr size_t kNumExtraTasks = 10;
1345 
1346   TestWaitableEvent early_blocking_threads_running;
1347   RepeatingClosure early_threads_barrier_closure =
1348       BarrierClosure(kMaxNumberOfWorkers,
1349                      BindOnce(&TestWaitableEvent::Signal,
1350                               Unretained(&early_blocking_threads_running)));
1351 
1352   TestWaitableEvent early_threads_finished;
1353   RepeatingClosure early_threads_finished_barrier = BarrierClosure(
1354       kMaxNumberOfWorkers, BindOnce(&TestWaitableEvent::Signal,
1355                                     Unretained(&early_threads_finished)));
1356 
1357   TestWaitableEvent early_release_threads_continue;
1358 
1359   // Post ScopedBlockingCall tasks to hit the worker cap.
1360   for (size_t i = 0; i < kMaxNumberOfWorkers; ++i) {
1361     task_runner_->PostTask(
1362         FROM_HERE, BindOnce(
1363                        [](RepeatingClosure* early_threads_barrier_closure,
1364                           TestWaitableEvent* early_release_threads_continue,
1365                           RepeatingClosure* early_threads_finished) {
1366                          {
1367                            ScopedBlockingCall scoped_blocking_call(
1368                                FROM_HERE, BlockingType::WILL_BLOCK);
1369                            early_threads_barrier_closure->Run();
1370                            early_release_threads_continue->Wait();
1371                          }
1372                          early_threads_finished->Run();
1373                        },
1374                        Unretained(&early_threads_barrier_closure),
1375                        Unretained(&early_release_threads_continue),
1376                        Unretained(&early_threads_finished_barrier)));
1377   }
1378 
1379   early_blocking_threads_running.Wait();
1380   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(),
1381             kMaxTasks + kMaxNumberOfWorkers);
1382 
1383   TestWaitableEvent late_release_thread_contine;
1384   TestWaitableEvent late_blocking_threads_running;
1385 
1386   RepeatingClosure late_threads_barrier_closure = BarrierClosure(
1387       kNumExtraTasks, BindOnce(&TestWaitableEvent::Signal,
1388                                Unretained(&late_blocking_threads_running)));
1389 
1390   // Posts additional tasks. Note: we should already have |kMaxNumberOfWorkers|
1391   // tasks running. These tasks should not be able to get executed yet as the
1392   // thread group is already at its max worker cap.
1393   for (size_t i = 0; i < kNumExtraTasks; ++i) {
1394     task_runner_->PostTask(
1395         FROM_HERE, BindOnce(
1396                        [](RepeatingClosure* late_threads_barrier_closure,
1397                           TestWaitableEvent* late_release_thread_contine) {
1398                          ScopedBlockingCall scoped_blocking_call(
1399                              FROM_HERE, BlockingType::WILL_BLOCK);
1400                          late_threads_barrier_closure->Run();
1401                          late_release_thread_contine->Wait();
1402                        },
1403                        Unretained(&late_threads_barrier_closure),
1404                        Unretained(&late_release_thread_contine)));
1405   }
1406 
1407   // Give time to see if we exceed the max number of workers.
1408   PlatformThread::Sleep(TestTimeouts::tiny_timeout());
1409   EXPECT_LE(thread_group_->NumberOfWorkersForTesting(), kMaxNumberOfWorkers);
1410 
1411   early_release_threads_continue.Signal();
1412   early_threads_finished.Wait();
1413   late_blocking_threads_running.Wait();
1414 
1415   TestWaitableEvent final_tasks_running;
1416   TestWaitableEvent final_tasks_continue;
1417   RepeatingClosure final_tasks_running_barrier = BarrierClosure(
1418       kMaxTasks,
1419       BindOnce(&TestWaitableEvent::Signal, Unretained(&final_tasks_running)));
1420 
1421   // Verify that we are still able to saturate the thread group.
1422   for (size_t i = 0; i < kMaxTasks; ++i) {
1423     task_runner_->PostTask(FROM_HERE,
1424                            BindOnce(
1425                                [](RepeatingClosure* closure,
1426                                   TestWaitableEvent* final_tasks_continue) {
1427                                  closure->Run();
1428                                  final_tasks_continue->Wait();
1429                                },
1430                                Unretained(&final_tasks_running_barrier),
1431                                Unretained(&final_tasks_continue)));
1432   }
1433   final_tasks_running.Wait();
1434   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks + kNumExtraTasks);
1435   late_release_thread_contine.Signal();
1436   final_tasks_continue.Signal();
1437   task_tracker_.FlushForTesting();
1438 }
1439 
1440 // Verify that the maximum number of best-effort tasks that can run concurrently
1441 // is honored.
TEST_F(ThreadGroupImplImplStartInBodyTest,MaxBestEffortTasks)1442 TEST_F(ThreadGroupImplImplStartInBodyTest, MaxBestEffortTasks) {
1443   constexpr int kMaxBestEffortTasks = kMaxTasks / 2;
1444   StartThreadGroup(TimeDelta::Max(),      // |suggested_reclaim_time|
1445                    kMaxTasks,             // |max_tasks|
1446                    kMaxBestEffortTasks);  // |max_best_effort_tasks|
1447   const scoped_refptr<TaskRunner> foreground_runner =
1448       test::CreatePooledTaskRunner({MayBlock()},
1449                                    &mock_pooled_task_runner_delegate_);
1450   const scoped_refptr<TaskRunner> background_runner =
1451       test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
1452                                    &mock_pooled_task_runner_delegate_);
1453 
1454   // It should be possible to have |kMaxBestEffortTasks|
1455   // TaskPriority::BEST_EFFORT tasks running concurrently.
1456   TestWaitableEvent best_effort_tasks_running;
1457   TestWaitableEvent unblock_best_effort_tasks;
1458   RepeatingClosure best_effort_tasks_running_barrier = BarrierClosure(
1459       kMaxBestEffortTasks, BindOnce(&TestWaitableEvent::Signal,
1460                                     Unretained(&best_effort_tasks_running)));
1461 
1462   for (int i = 0; i < kMaxBestEffortTasks; ++i) {
1463     background_runner->PostTask(
1464         FROM_HERE, base::BindLambdaForTesting([&]() {
1465           best_effort_tasks_running_barrier.Run();
1466           unblock_best_effort_tasks.Wait();
1467         }));
1468   }
1469   best_effort_tasks_running.Wait();
1470 
1471   // No more TaskPriority::BEST_EFFORT task should run.
1472   AtomicFlag extra_best_effort_task_can_run;
1473   TestWaitableEvent extra_best_effort_task_running;
1474   background_runner->PostTask(
1475       FROM_HERE, base::BindLambdaForTesting([&]() {
1476         EXPECT_TRUE(extra_best_effort_task_can_run.IsSet());
1477         extra_best_effort_task_running.Signal();
1478       }));
1479 
1480   // An extra foreground task should be able to run.
1481   TestWaitableEvent foreground_task_running;
1482   foreground_runner->PostTask(
1483       FROM_HERE, base::BindOnce(&TestWaitableEvent::Signal,
1484                                 Unretained(&foreground_task_running)));
1485   foreground_task_running.Wait();
1486 
1487   // Completion of the TaskPriority::BEST_EFFORT tasks should allow the extra
1488   // TaskPriority::BEST_EFFORT task to run.
1489   extra_best_effort_task_can_run.Set();
1490   unblock_best_effort_tasks.Signal();
1491   extra_best_effort_task_running.Wait();
1492 
1493   // Wait for all tasks to complete before exiting to avoid invalid accesses.
1494   task_tracker_.FlushForTesting();
1495 }
1496 
1497 // Verify that flooding the thread group with BEST_EFFORT tasks doesn't cause
1498 // the creation of more than |max_best_effort_tasks| + 1 workers.
TEST_F(ThreadGroupImplImplStartInBodyTest,FloodBestEffortTasksDoesNotCreateTooManyWorkers)1499 TEST_F(ThreadGroupImplImplStartInBodyTest,
1500        FloodBestEffortTasksDoesNotCreateTooManyWorkers) {
1501   constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
1502   StartThreadGroup(TimeDelta::Max(),      // |suggested_reclaim_time|
1503                    kMaxTasks,             // |max_tasks|
1504                    kMaxBestEffortTasks);  // |max_best_effort_tasks|
1505 
1506   const scoped_refptr<TaskRunner> runner =
1507       test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
1508                                    &mock_pooled_task_runner_delegate_);
1509 
1510   for (size_t i = 0; i < kLargeNumber; ++i) {
1511     runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1512                        EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
1513                                  kMaxBestEffortTasks + 1);
1514                      }));
1515   }
1516 
1517   // Wait for all tasks to complete before exiting to avoid invalid accesses.
1518   task_tracker_.FlushForTesting();
1519 }
1520 
1521 // Previously, a WILL_BLOCK ScopedBlockingCall unconditionally woke up a worker
1522 // if the priority queue was non-empty. Sometimes, that caused multiple workers
1523 // to be woken up for the same sequence. This test verifies that it is no longer
1524 // the case:
1525 // 1. Post and run task A.
1526 // 2. Post task B from task A.
1527 // 3. Task A enters a WILL_BLOCK ScopedBlockingCall. Once the idle thread is
1528 //    created, this should no-op because there are already enough workers
1529 //    (previously, a worker would be woken up because the priority queue isn't
1530 //    empty).
1531 // 5. Wait for all tasks to complete.
TEST_F(ThreadGroupImplImplStartInBodyTest,RepeatedWillBlockDoesNotCreateTooManyWorkers)1532 TEST_F(ThreadGroupImplImplStartInBodyTest,
1533        RepeatedWillBlockDoesNotCreateTooManyWorkers) {
1534   constexpr size_t kNumWorkers = 2U;
1535   StartThreadGroup(TimeDelta::Max(),  // |suggested_reclaim_time|
1536                    kNumWorkers,       // |max_tasks|
1537                    std::nullopt);     // |max_best_effort_tasks|
1538   const scoped_refptr<TaskRunner> runner = test::CreatePooledTaskRunner(
1539       {MayBlock()}, &mock_pooled_task_runner_delegate_);
1540 
1541   for (size_t i = 0; i < kLargeNumber; ++i) {
1542     runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1543                        runner->PostTask(
1544                            FROM_HERE, BindLambdaForTesting([&]() {
1545                              EXPECT_LE(
1546                                  thread_group_->NumberOfWorkersForTesting(),
1547                                  kNumWorkers + 1);
1548                            }));
1549                        // Number of workers should not increase when there is
1550                        // enough capacity to accommodate queued and running
1551                        // sequences.
1552                        ScopedBlockingCall scoped_blocking_call(
1553                            FROM_HERE, BlockingType::WILL_BLOCK);
1554                        EXPECT_EQ(kNumWorkers + 1,
1555                                  thread_group_->NumberOfWorkersForTesting());
1556                      }));
1557     // Wait for all tasks to complete.
1558     task_tracker_.FlushForTesting();
1559   }
1560 }
1561 
1562 namespace {
1563 
1564 class ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest
1565     : public ThreadGroupImplImplTestBase,
1566       public testing::TestWithParam<BlockingType> {
1567  public:
1568   static constexpr int kMaxBestEffortTasks = kMaxTasks / 2;
1569 
1570   ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest() = default;
1571   ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest(
1572       const ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest&) = delete;
1573   ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest& operator=(
1574       const ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest&) = delete;
1575 
SetUp()1576   void SetUp() override {
1577     CreateThreadGroup();
1578     thread_group_->Start(kMaxTasks, kMaxBestEffortTasks, base::TimeDelta::Max(),
1579                          service_thread_.task_runner(), nullptr,
1580                          ThreadGroup::WorkerEnvironment::NONE,
1581                          /*synchronous_thread_start_for_testing=*/false,
1582                          /*may_block_threshold=*/{});
1583   }
1584 
TearDown()1585   void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
1586 
1587  private:
1588 };
1589 
1590 }  // namespace
1591 
// Checks that BEST_EFFORT tasks blocked inside a ScopedBlockingCall of the
// parameterized BlockingType do not prevent another |kMaxBestEffortTasks|
// BEST_EFFORT tasks from running concurrently.
TEST_P(ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,
       BlockingCallAndMaxBestEffortTasksTest) {
  const scoped_refptr<TaskRunner> background_runner =
      test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
                                   &mock_pooled_task_runner_delegate_);

  // First batch: |kMaxBestEffortTasks| BEST_EFFORT tasks that report in and
  // then block inside a ScopedBlockingCall.
  TestWaitableEvent blocked_batch_running;
  TestWaitableEvent release_blocked_batch;
  RepeatingClosure on_blocked_batch_task_running = BarrierClosure(
      kMaxBestEffortTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&blocked_batch_running)));
  for (int remaining = kMaxBestEffortTasks; remaining > 0; --remaining) {
    background_runner->PostTask(
        FROM_HERE, BindLambdaForTesting([&]() {
          on_blocked_batch_task_running.Run();
          ScopedBlockingCall blocking_scope(FROM_HERE, GetParam());
          release_blocked_batch.Wait();
        }));
  }
  blocked_batch_running.Wait();

  // Second batch: |kMaxBestEffortTasks| more BEST_EFFORT tasks. They should be
  // able to run because the first batch is blocked within a
  // ScopedBlockingCall.
  //
  // These tasks are held until all of them have started, which ensures that an
  // extra |kMaxBestEffortTasks| can really run concurrently.
  TestWaitableEvent second_batch_running;
  TestWaitableEvent release_second_batch;
  RepeatingClosure on_second_batch_task_running = BarrierClosure(
      kMaxBestEffortTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&second_batch_running)));
  for (int remaining = kMaxBestEffortTasks; remaining > 0; --remaining) {
    background_runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
                                  on_second_batch_task_running.Run();
                                  release_second_batch.Wait();
                                }));
  }
  second_batch_running.Wait();

  // Release both batches and wait for every task to finish.
  release_blocked_batch.Signal();
  release_second_batch.Signal();
  task_tracker_.FlushForTesting();
}
1641 
1642 INSTANTIATE_TEST_SUITE_P(MayBlock,
1643                          ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,
1644                          ::testing::Values(BlockingType::MAY_BLOCK));
1645 INSTANTIATE_TEST_SUITE_P(WillBlock,
1646                          ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,
1647                          ::testing::Values(BlockingType::WILL_BLOCK));
1648 
1649 // Verify that worker detachment doesn't race with worker cleanup, regression
1650 // test for https://crbug.com/810464.
TEST_F(ThreadGroupImplImplStartInBodyTest,RacyCleanup)1651 TEST_F(ThreadGroupImplImplStartInBodyTest, RacyCleanup) {
1652   constexpr size_t kLocalMaxTasks = 256;
1653   constexpr TimeDelta kReclaimTimeForRacyCleanupTest = Milliseconds(10);
1654 
1655   thread_group_->Start(kLocalMaxTasks, kLocalMaxTasks,
1656                        kReclaimTimeForRacyCleanupTest,
1657                        service_thread_.task_runner(), nullptr,
1658                        ThreadGroup::WorkerEnvironment::NONE,
1659                        /*synchronous_thread_start_for_testing=*/false,
1660                        /*may_block_threshold=*/{});
1661 
1662   scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
1663       {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);
1664 
1665   TestWaitableEvent threads_running;
1666   TestWaitableEvent unblock_threads;
1667   RepeatingClosure threads_running_barrier = BarrierClosure(
1668       kLocalMaxTasks,
1669       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
1670 
1671   for (size_t i = 0; i < kLocalMaxTasks; ++i) {
1672     task_runner->PostTask(
1673         FROM_HERE,
1674         BindOnce(
1675             [](OnceClosure on_running, TestWaitableEvent* unblock_threads) {
1676               std::move(on_running).Run();
1677               unblock_threads->Wait();
1678             },
1679             threads_running_barrier, Unretained(&unblock_threads)));
1680   }
1681 
1682   // Wait for all workers to be ready and release them all at once.
1683   threads_running.Wait();
1684   unblock_threads.Signal();
1685 
1686   // Sleep to wakeup precisely when all workers are going to try to cleanup per
1687   // being idle.
1688   PlatformThread::Sleep(kReclaimTimeForRacyCleanupTest);
1689 
1690   thread_group_->JoinForTesting();
1691 
1692   // Unwinding this test will be racy if worker cleanup can race with
1693   // ThreadGroupImpl destruction : https://crbug.com/810464.
1694   mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
1695   thread_group_.reset();
1696 }
1697 
1698 }  // namespace internal
1699 }  // namespace base
1700