xref: /aosp_15_r20/external/cronet/base/task/thread_pool/thread_group_semaphore_unittest.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2024 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/thread_group_semaphore.h"
6 
7 #include <stddef.h>
8 
9 #include <algorithm>
10 #include <atomic>
11 #include <memory>
12 #include <optional>
13 #include <unordered_set>
14 #include <utility>
15 #include <vector>
16 
17 #include "base/atomicops.h"
18 #include "base/barrier_closure.h"
19 #include "base/functional/bind.h"
20 #include "base/functional/callback.h"
21 #include "base/functional/callback_helpers.h"
22 #include "base/memory/ptr_util.h"
23 #include "base/memory/raw_ptr.h"
24 #include "base/memory/ref_counted.h"
25 #include "base/metrics/statistics_recorder.h"
26 #include "base/synchronization/atomic_flag.h"
27 #include "base/synchronization/condition_variable.h"
28 #include "base/synchronization/lock.h"
29 #include "base/task/task_features.h"
30 #include "base/task/task_runner.h"
31 #include "base/task/thread_pool/delayed_task_manager.h"
32 #include "base/task/thread_pool/environment_config.h"
33 #include "base/task/thread_pool/pooled_task_runner_delegate.h"
34 #include "base/task/thread_pool/sequence.h"
35 #include "base/task/thread_pool/task_source_sort_key.h"
36 #include "base/task/thread_pool/task_tracker.h"
37 #include "base/task/thread_pool/test_task_factory.h"
38 #include "base/task/thread_pool/test_utils.h"
39 #include "base/task/thread_pool/worker_thread_observer.h"
40 #include "base/test/bind.h"
41 #include "base/test/gtest_util.h"
42 #include "base/test/scoped_feature_list.h"
43 #include "base/test/test_simple_task_runner.h"
44 #include "base/test/test_timeouts.h"
45 #include "base/test/test_waitable_event.h"
46 #include "base/threading/platform_thread.h"
47 #include "base/threading/scoped_blocking_call.h"
48 #include "base/threading/simple_thread.h"
49 #include "base/threading/thread.h"
50 #include "base/threading/thread_checker_impl.h"
51 #include "base/time/time.h"
52 #include "base/timer/timer.h"
53 #include "build/build_config.h"
54 #include "testing/gtest/include/gtest/gtest.h"
55 
56 namespace base {
57 namespace internal {
58 namespace {
59 
60 constexpr size_t kMaxTasks = 4;
61 constexpr size_t kNumThreadsPostingTasks = 4;
62 constexpr size_t kNumTasksPostedPerThread = 150;
63 // This can't be lower because Windows' TestWaitableEvent wakes up too early
64 // when a small timeout is used. This results in many spurious wake ups before a
65 // worker is allowed to cleanup.
66 constexpr TimeDelta kReclaimTimeForCleanupTests = Milliseconds(500);
67 constexpr size_t kLargeNumber = 512;
68 
69 class ThreadGroupSemaphoreImplTestBase : public ThreadGroup::Delegate {
70  public:
71   ThreadGroupSemaphoreImplTestBase(const ThreadGroupSemaphoreImplTestBase&) =
72       delete;
73   ThreadGroupSemaphoreImplTestBase& operator=(
74       const ThreadGroupSemaphoreImplTestBase&) = delete;
75 
76  protected:
ThreadGroupSemaphoreImplTestBase()77   ThreadGroupSemaphoreImplTestBase()
78       : service_thread_("ThreadPoolServiceThread"),
79         tracked_ref_factory_(this) {}
80 
CommonTearDown()81   void CommonTearDown() {
82     delayed_task_manager_.Shutdown();
83     service_thread_.Stop();
84     task_tracker_.FlushForTesting();
85     if (thread_group_) {
86       thread_group_->JoinForTesting();
87     }
88     mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
89     thread_group_.reset();
90   }
91 
CreateThreadGroup(ThreadType thread_type=ThreadType::kDefault)92   void CreateThreadGroup(ThreadType thread_type = ThreadType::kDefault) {
93     ASSERT_FALSE(thread_group_);
94     service_thread_.Start();
95     delayed_task_manager_.Start(service_thread_.task_runner());
96     thread_group_ = std::make_unique<ThreadGroupSemaphore>(
97         "TestThreadGroup", "A", thread_type, task_tracker_.GetTrackedRef(),
98         tracked_ref_factory_.GetTrackedRef());
99     ASSERT_TRUE(thread_group_);
100 
101     mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
102   }
103 
StartThreadGroup(TimeDelta suggested_reclaim_time,size_t max_tasks,std::optional<int> max_best_effort_tasks=std::nullopt,WorkerThreadObserver * worker_observer=nullptr,std::optional<TimeDelta> may_block_threshold=std::nullopt)104   void StartThreadGroup(
105       TimeDelta suggested_reclaim_time,
106       size_t max_tasks,
107       std::optional<int> max_best_effort_tasks = std::nullopt,
108       WorkerThreadObserver* worker_observer = nullptr,
109       std::optional<TimeDelta> may_block_threshold = std::nullopt) {
110     ASSERT_TRUE(thread_group_);
111     thread_group_->Start(
112         max_tasks,
113         max_best_effort_tasks ? max_best_effort_tasks.value() : max_tasks,
114         suggested_reclaim_time, service_thread_.task_runner(), worker_observer,
115         ThreadGroup::WorkerEnvironment::NONE,
116         /* synchronous_thread_start_for_testing=*/false, may_block_threshold);
117   }
118 
CreateAndStartThreadGroup(TimeDelta suggested_reclaim_time=TimeDelta::Max (),size_t max_tasks=kMaxTasks,std::optional<int> max_best_effort_tasks=std::nullopt,WorkerThreadObserver * worker_observer=nullptr,std::optional<TimeDelta> may_block_threshold=std::nullopt)119   void CreateAndStartThreadGroup(
120       TimeDelta suggested_reclaim_time = TimeDelta::Max(),
121       size_t max_tasks = kMaxTasks,
122       std::optional<int> max_best_effort_tasks = std::nullopt,
123       WorkerThreadObserver* worker_observer = nullptr,
124       std::optional<TimeDelta> may_block_threshold = std::nullopt) {
125     CreateThreadGroup();
126     StartThreadGroup(suggested_reclaim_time, max_tasks, max_best_effort_tasks,
127                      worker_observer, may_block_threshold);
128   }
129 
130   Thread service_thread_;
131   TaskTracker task_tracker_;
132   std::unique_ptr<ThreadGroupSemaphore> thread_group_;
133   DelayedTaskManager delayed_task_manager_;
134   TrackedRefFactory<ThreadGroup::Delegate> tracked_ref_factory_;
135   test::MockPooledTaskRunnerDelegate mock_pooled_task_runner_delegate_ = {
136       task_tracker_.GetTrackedRef(), &delayed_task_manager_};
137 
138  private:
139   // ThreadGroup::Delegate:
GetThreadGroupForTraits(const TaskTraits & traits)140   ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override {
141     return thread_group_.get();
142   }
143 };
144 
145 class ThreadGroupSemaphoreImplTest : public ThreadGroupSemaphoreImplTestBase,
146                                      public testing::Test {
147  public:
148   ThreadGroupSemaphoreImplTest(const ThreadGroupSemaphoreImplTest&) = delete;
149   ThreadGroupSemaphoreImplTest& operator=(const ThreadGroupSemaphoreImplTest&) =
150       delete;
151 
152  protected:
153   ThreadGroupSemaphoreImplTest() = default;
154 
SetUp()155   void SetUp() override { CreateAndStartThreadGroup(); }
156 
TearDown()157   void TearDown() override {
158     ThreadGroupSemaphoreImplTestBase::CommonTearDown();
159   }
160 };
161 
162 class ThreadGroupSemaphoreImplTestParam
163     : public ThreadGroupSemaphoreImplTestBase,
164       public testing::TestWithParam<TaskSourceExecutionMode> {
165  public:
166   ThreadGroupSemaphoreImplTestParam(const ThreadGroupSemaphoreImplTestParam&) =
167       delete;
168   ThreadGroupSemaphoreImplTestParam& operator=(
169       const ThreadGroupSemaphoreImplTestParam&) = delete;
170 
171  protected:
172   ThreadGroupSemaphoreImplTestParam() = default;
173 
SetUp()174   void SetUp() override { CreateAndStartThreadGroup(); }
175 
TearDown()176   void TearDown() override {
177     ThreadGroupSemaphoreImplTestBase::CommonTearDown();
178   }
179 };
180 
181 using PostNestedTask = test::TestTaskFactory::PostNestedTask;
182 
183 class ThreadPostingTasksWaitIdle : public SimpleThread {
184  public:
185   // Constructs a thread that posts tasks to |thread_group| through an
186   // |execution_mode| task runner. The thread waits until all workers in
187   // |thread_group| are idle before posting a new task.
ThreadPostingTasksWaitIdle(ThreadGroupSemaphore * thread_group,test::MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate_,TaskSourceExecutionMode execution_mode)188   ThreadPostingTasksWaitIdle(
189       ThreadGroupSemaphore* thread_group,
190       test::MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate_,
191       TaskSourceExecutionMode execution_mode)
192       : SimpleThread("ThreadPostingTasksWaitIdle"),
193         thread_group_(thread_group),
194         factory_(CreatePooledTaskRunnerWithExecutionMode(
195                      execution_mode,
196                      mock_pooled_task_runner_delegate_),
197                  execution_mode) {
198     DCHECK(thread_group_);
199   }
200   ThreadPostingTasksWaitIdle(const ThreadPostingTasksWaitIdle&) = delete;
201   ThreadPostingTasksWaitIdle& operator=(const ThreadPostingTasksWaitIdle&) =
202       delete;
203 
factory() const204   const test::TestTaskFactory* factory() const { return &factory_; }
205 
206  private:
Run()207   void Run() override {
208     for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
209       thread_group_->WaitForAllWorkersIdleForTesting();
210       EXPECT_TRUE(factory_.PostTask(PostNestedTask::NO, OnceClosure()));
211     }
212   }
213 
214   const raw_ptr<ThreadGroupSemaphore> thread_group_;
215   const scoped_refptr<TaskRunner> task_runner_;
216   test::TestTaskFactory factory_;
217 };
218 
219 }  // namespace
220 
TEST_P(ThreadGroupSemaphoreImplTestParam,PostTasksWaitAllWorkersIdle)221 TEST_P(ThreadGroupSemaphoreImplTestParam, PostTasksWaitAllWorkersIdle) {
222   // Create threads to post tasks. To verify that workers can sleep and be woken
223   // up when new tasks are posted, wait for all workers to become idle before
224   // posting a new task.
225   std::vector<std::unique_ptr<ThreadPostingTasksWaitIdle>>
226       threads_posting_tasks;
227   for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
228     threads_posting_tasks.push_back(
229         std::make_unique<ThreadPostingTasksWaitIdle>(
230             thread_group_.get(), &mock_pooled_task_runner_delegate_,
231             GetParam()));
232     threads_posting_tasks.back()->Start();
233   }
234 
235   // Wait for all tasks to run.
236   for (const auto& thread_posting_tasks : threads_posting_tasks) {
237     thread_posting_tasks->Join();
238     thread_posting_tasks->factory()->WaitForAllTasksToRun();
239   }
240 
241   // Wait until all workers are idle to be sure that no task accesses its
242   // TestTaskFactory after |thread_posting_tasks| is destroyed.
243   thread_group_->WaitForAllWorkersIdleForTesting();
244 }
245 
TEST_P(ThreadGroupSemaphoreImplTestParam,PostTasksWithOneAvailableWorker)246 TEST_P(ThreadGroupSemaphoreImplTestParam, PostTasksWithOneAvailableWorker) {
247   // Post blocking tasks to keep all workers busy except one until |event| is
248   // signaled. Use different factories so that tasks are added to different
249   // sequences and can run simultaneously when the execution mode is SEQUENCED.
250   TestWaitableEvent event;
251   std::vector<std::unique_ptr<test::TestTaskFactory>> blocked_task_factories;
252   for (size_t i = 0; i < (kMaxTasks - 1); ++i) {
253     blocked_task_factories.push_back(std::make_unique<test::TestTaskFactory>(
254         CreatePooledTaskRunnerWithExecutionMode(
255             GetParam(), &mock_pooled_task_runner_delegate_),
256         GetParam()));
257     EXPECT_TRUE(blocked_task_factories.back()->PostTask(
258         PostNestedTask::NO,
259         BindOnce(&TestWaitableEvent::Wait, Unretained(&event))));
260     blocked_task_factories.back()->WaitForAllTasksToRun();
261   }
262 
263   // Post |kNumTasksPostedPerThread| tasks that should all run despite the fact
264   // that only one worker in |thread_group_| isn't busy.
265   test::TestTaskFactory short_task_factory(
266       CreatePooledTaskRunnerWithExecutionMode(
267           GetParam(), &mock_pooled_task_runner_delegate_),
268       GetParam());
269   for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
270     EXPECT_TRUE(short_task_factory.PostTask(PostNestedTask::NO, OnceClosure()));
271   }
272   short_task_factory.WaitForAllTasksToRun();
273 
274   // Release tasks waiting on |event|.
275   event.Signal();
276 
277   // Wait until all workers are idle to be sure that no task accesses
278   // its TestTaskFactory after it is destroyed.
279   thread_group_->WaitForAllWorkersIdleForTesting();
280 }
281 
TEST_P(ThreadGroupSemaphoreImplTestParam,Saturate)282 TEST_P(ThreadGroupSemaphoreImplTestParam, Saturate) {
283   // Verify that it is possible to have |kMaxTasks| tasks/sequences running
284   // simultaneously. Use different factories so that the blocking tasks are
285   // added to different sequences and can run simultaneously when the execution
286   // mode is SEQUENCED.
287   TestWaitableEvent event;
288   std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
289   for (size_t i = 0; i < kMaxTasks; ++i) {
290     factories.push_back(std::make_unique<test::TestTaskFactory>(
291         CreatePooledTaskRunnerWithExecutionMode(
292             GetParam(), &mock_pooled_task_runner_delegate_),
293         GetParam()));
294     EXPECT_TRUE(factories.back()->PostTask(
295         PostNestedTask::NO,
296         BindOnce(&TestWaitableEvent::Wait, Unretained(&event))));
297     factories.back()->WaitForAllTasksToRun();
298   }
299 
300   // Release tasks waiting on |event|.
301   event.Signal();
302 
303   // Wait until all workers are idle to be sure that no task accesses
304   // its TestTaskFactory after it is destroyed.
305   thread_group_->WaitForAllWorkersIdleForTesting();
306 }
307 
308 // Verifies that ShouldYield() returns true for priorities lower than the
309 // highest priority pending while the thread group is flooded with USER_VISIBLE
310 // tasks.
TEST_F(ThreadGroupSemaphoreImplTest,ShouldYieldFloodedUserVisible)311 TEST_F(ThreadGroupSemaphoreImplTest, ShouldYieldFloodedUserVisible) {
312   TestWaitableEvent threads_running;
313   TestWaitableEvent threads_continue;
314 
315   // Saturate workers with USER_VISIBLE tasks to ensure ShouldYield() returns
316   // true when a tasks of higher priority is posted.
317   RepeatingClosure threads_running_barrier = BarrierClosure(
318       kMaxTasks,
319       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
320 
321   auto job_task = base::MakeRefCounted<test::MockJobTask>(
322       BindLambdaForTesting(
323           [&threads_running_barrier, &threads_continue](JobDelegate* delegate) {
324             threads_running_barrier.Run();
325             threads_continue.Wait();
326           }),
327       /* num_tasks_to_run */ kMaxTasks);
328   scoped_refptr<JobTaskSource> task_source =
329       job_task->GetJobTaskSource(FROM_HERE, {TaskPriority::USER_VISIBLE},
330                                  &mock_pooled_task_runner_delegate_);
331   task_source->NotifyConcurrencyIncrease();
332 
333   threads_running.Wait();
334 
335   // Posting a BEST_EFFORT task should not cause any other tasks to yield.
336   // Once this task gets to run, no other task needs to yield.
337   // Note: This is only true because this test is using a single ThreadGroup.
338   //       Under the ThreadPool this wouldn't be racy because BEST_EFFORT tasks
339   //       run in an independent ThreadGroup.
340   test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT},
341                                &mock_pooled_task_runner_delegate_)
342       ->PostTask(
343           FROM_HERE, BindLambdaForTesting([&]() {
344             EXPECT_FALSE(thread_group_->ShouldYield(
345                 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/1}));
346           }));
347   // A BEST_EFFORT task with more workers shouldn't have to yield.
348   EXPECT_FALSE(thread_group_->ShouldYield(
349       {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/2}));
350   EXPECT_FALSE(thread_group_->ShouldYield(
351       {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
352   EXPECT_FALSE(thread_group_->ShouldYield(
353       {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/0}));
354   EXPECT_FALSE(thread_group_->ShouldYield(
355       {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/0}));
356 
357   // Posting a USER_VISIBLE task should cause BEST_EFFORT and USER_VISIBLE with
358   // higher worker_count tasks to yield.
359   auto post_user_visible = [&]() {
360     test::CreatePooledTaskRunner({TaskPriority::USER_VISIBLE},
361                                  &mock_pooled_task_runner_delegate_)
362         ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
363                      EXPECT_FALSE(thread_group_->ShouldYield(
364                          {TaskPriority::USER_VISIBLE, TimeTicks(),
365                           /* worker_count=*/1}));
366                    }));
367   };
368   // A USER_VISIBLE task with too many workers should yield.
369   post_user_visible();
370   EXPECT_TRUE(thread_group_->ShouldYield(
371       {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/2}));
372   post_user_visible();
373   EXPECT_TRUE(thread_group_->ShouldYield(
374       {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
375   post_user_visible();
376   EXPECT_FALSE(thread_group_->ShouldYield(
377       {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/1}));
378   EXPECT_FALSE(thread_group_->ShouldYield(
379       {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/0}));
380 
381   // Posting a USER_BLOCKING task should cause BEST_EFFORT, USER_VISIBLE and
382   // USER_BLOCKING with higher worker_count tasks to yield.
383   auto post_user_blocking = [&]() {
384     test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
385                                  &mock_pooled_task_runner_delegate_)
386         ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
387                      // Once this task got to start, no other task needs to
388                      // yield.
389                      EXPECT_FALSE(thread_group_->ShouldYield(
390                          {TaskPriority::USER_BLOCKING, TimeTicks(),
391                           /* worker_count=*/1}));
392                    }));
393   };
394   // A USER_BLOCKING task with too many workers should have to yield.
395   post_user_blocking();
396   EXPECT_TRUE(thread_group_->ShouldYield(
397       {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/2}));
398   post_user_blocking();
399   EXPECT_TRUE(thread_group_->ShouldYield(
400       {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
401   post_user_blocking();
402   EXPECT_TRUE(thread_group_->ShouldYield(
403       {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/0}));
404   post_user_blocking();
405   EXPECT_FALSE(thread_group_->ShouldYield(
406       {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/1}));
407 
408   threads_continue.Signal();
409   task_tracker_.FlushForTesting();
410 }
411 
412 INSTANTIATE_TEST_SUITE_P(Parallel,
413                          ThreadGroupSemaphoreImplTestParam,
414                          ::testing::Values(TaskSourceExecutionMode::kParallel));
415 INSTANTIATE_TEST_SUITE_P(
416     Sequenced,
417     ThreadGroupSemaphoreImplTestParam,
418     ::testing::Values(TaskSourceExecutionMode::kSequenced));
419 
420 INSTANTIATE_TEST_SUITE_P(Job,
421                          ThreadGroupSemaphoreImplTestParam,
422                          ::testing::Values(TaskSourceExecutionMode::kJob));
423 
424 namespace {
425 
426 class ThreadGroupSemaphoreImplStartInBodyTest
427     : public ThreadGroupSemaphoreImplTest {
428  public:
SetUp()429   void SetUp() override {
430     CreateThreadGroup();
431     // Let the test start the thread group.
432   }
433 };
434 
TaskPostedBeforeStart(PlatformThreadRef * platform_thread_ref,TestWaitableEvent * task_running,TestWaitableEvent * barrier)435 void TaskPostedBeforeStart(PlatformThreadRef* platform_thread_ref,
436                            TestWaitableEvent* task_running,
437                            TestWaitableEvent* barrier) {
438   *platform_thread_ref = PlatformThread::CurrentRef();
439   task_running->Signal();
440   barrier->Wait();
441 }
442 
443 }  // namespace
444 
445 // Verify that 2 tasks posted before Start() to a ThreadGroupSemaphore with
446 // more than 2 workers run on different workers when Start() is called.
TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,PostTasksBeforeStart)447 TEST_F(ThreadGroupSemaphoreImplStartInBodyTest, PostTasksBeforeStart) {
448   PlatformThreadRef task_1_thread_ref;
449   PlatformThreadRef task_2_thread_ref;
450   TestWaitableEvent task_1_running;
451   TestWaitableEvent task_2_running;
452 
453   // This event is used to prevent a task from completing before the other task
454   // starts running. If that happened, both tasks could run on the same worker
455   // and this test couldn't verify that the correct number of workers were woken
456   // up.
457   TestWaitableEvent barrier;
458 
459   test::CreatePooledTaskRunner({WithBaseSyncPrimitives()},
460                                &mock_pooled_task_runner_delegate_)
461       ->PostTask(
462           FROM_HERE,
463           BindOnce(&TaskPostedBeforeStart, Unretained(&task_1_thread_ref),
464                    Unretained(&task_1_running), Unretained(&barrier)));
465   test::CreatePooledTaskRunner({WithBaseSyncPrimitives()},
466                                &mock_pooled_task_runner_delegate_)
467       ->PostTask(
468           FROM_HERE,
469           BindOnce(&TaskPostedBeforeStart, Unretained(&task_2_thread_ref),
470                    Unretained(&task_2_running), Unretained(&barrier)));
471 
472   // Workers should not be created and tasks should not run before the thread
473   // group is started.
474   EXPECT_EQ(0U, thread_group_->NumberOfWorkersForTesting());
475   EXPECT_FALSE(task_1_running.IsSignaled());
476   EXPECT_FALSE(task_2_running.IsSignaled());
477 
478   StartThreadGroup(TimeDelta::Max(), kMaxTasks);
479 
480   // Tasks should run shortly after the thread group is started.
481   task_1_running.Wait();
482   task_2_running.Wait();
483 
484   // Tasks should run on different threads.
485   EXPECT_NE(task_1_thread_ref, task_2_thread_ref);
486 
487   barrier.Signal();
488   task_tracker_.FlushForTesting();
489 }
490 
491 // Verify that posting many tasks before Start will cause the number of workers
492 // to grow to |max_tasks_| after Start.
TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,PostManyTasks)493 TEST_F(ThreadGroupSemaphoreImplStartInBodyTest, PostManyTasks) {
494   scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
495       {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);
496   constexpr size_t kNumTasksPosted = 2 * kMaxTasks;
497 
498   TestWaitableEvent threads_running;
499   TestWaitableEvent threads_continue;
500 
501   RepeatingClosure threads_running_barrier = BarrierClosure(
502       kMaxTasks,
503       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
504   // Posting these tasks should cause new workers to be created.
505   for (size_t i = 0; i < kMaxTasks; ++i) {
506     task_runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
507                             threads_running_barrier.Run();
508                             threads_continue.Wait();
509                           }));
510   }
511   // Post the remaining |kNumTasksPosted - kMaxTasks| tasks, don't wait for them
512   // as they'll be blocked behind the above kMaxtasks.
513   for (size_t i = kMaxTasks; i < kNumTasksPosted; ++i) {
514     task_runner->PostTask(FROM_HERE, DoNothing());
515   }
516 
517   EXPECT_EQ(0U, thread_group_->NumberOfWorkersForTesting());
518 
519   StartThreadGroup(TimeDelta::Max(), kMaxTasks);
520   EXPECT_GT(thread_group_->NumberOfWorkersForTesting(), 0U);
521   EXPECT_EQ(kMaxTasks, thread_group_->GetMaxTasksForTesting());
522 
523   threads_running.Wait();
524   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(),
525             thread_group_->GetMaxTasksForTesting());
526   threads_continue.Signal();
527   task_tracker_.FlushForTesting();
528 }
529 
530 namespace {
531 
532 class BackgroundThreadGroupSemaphoreTest : public ThreadGroupSemaphoreImplTest {
533  public:
CreateAndStartThreadGroup(TimeDelta suggested_reclaim_time=TimeDelta::Max (),size_t max_tasks=kMaxTasks,std::optional<int> max_best_effort_tasks=std::nullopt,WorkerThreadObserver * worker_observer=nullptr,std::optional<TimeDelta> may_block_threshold=std::nullopt)534   void CreateAndStartThreadGroup(
535       TimeDelta suggested_reclaim_time = TimeDelta::Max(),
536       size_t max_tasks = kMaxTasks,
537       std::optional<int> max_best_effort_tasks = std::nullopt,
538       WorkerThreadObserver* worker_observer = nullptr,
539       std::optional<TimeDelta> may_block_threshold = std::nullopt) {
540     if (!CanUseBackgroundThreadTypeForWorkerThread()) {
541       return;
542     }
543     CreateThreadGroup(ThreadType::kBackground);
544     StartThreadGroup(suggested_reclaim_time, max_tasks, max_best_effort_tasks,
545                      worker_observer, may_block_threshold);
546   }
547 
SetUp()548   void SetUp() override { CreateAndStartThreadGroup(); }
549 };
550 
551 }  // namespace
552 
553 // Verify that ScopedBlockingCall updates thread type when necessary per
554 // shutdown state.
TEST_F(BackgroundThreadGroupSemaphoreTest,UpdatePriorityBlockingStarted)555 TEST_F(BackgroundThreadGroupSemaphoreTest, UpdatePriorityBlockingStarted) {
556   if (!CanUseBackgroundThreadTypeForWorkerThread()) {
557     return;
558   }
559 
560   const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
561       {MayBlock(), WithBaseSyncPrimitives(), TaskPriority::BEST_EFFORT},
562       &mock_pooled_task_runner_delegate_);
563 
564   TestWaitableEvent threads_running;
565   RepeatingClosure threads_running_barrier = BarrierClosure(
566       kMaxTasks,
567       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
568 
569   TestWaitableEvent blocking_threads_continue;
570 
571   for (size_t i = 0; i < kMaxTasks; ++i) {
572     task_runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
573                             EXPECT_EQ(ThreadType::kBackground,
574                                       PlatformThread::GetCurrentThreadType());
575                             {
576                               // ScopedBlockingCall before shutdown doesn't
577                               // affect priority.
578                               ScopedBlockingCall scoped_blocking_call(
579                                   FROM_HERE, BlockingType::MAY_BLOCK);
580                               EXPECT_EQ(ThreadType::kBackground,
581                                         PlatformThread::GetCurrentThreadType());
582                             }
583                             threads_running_barrier.Run();
584                             blocking_threads_continue.Wait();
585                             // This is reached after StartShutdown(), at which
586                             // point we expect ScopedBlockingCall to update
587                             // thread priority.
588                             ScopedBlockingCall scoped_blocking_call(
589                                 FROM_HERE, BlockingType::MAY_BLOCK);
590                             EXPECT_EQ(ThreadType::kDefault,
591                                       PlatformThread::GetCurrentThreadType());
592                           }));
593   }
594   threads_running.Wait();
595 
596   task_tracker_.StartShutdown();
597   blocking_threads_continue.Signal();
598   task_tracker_.FlushForTesting();
599 }
600 
601 namespace {
602 
603 class ThreadGroupSemaphoreStandbyPolicyTest
604     : public ThreadGroupSemaphoreImplTestBase,
605       public testing::Test {
606  public:
607   ThreadGroupSemaphoreStandbyPolicyTest() = default;
608   ThreadGroupSemaphoreStandbyPolicyTest(
609       const ThreadGroupSemaphoreStandbyPolicyTest&) = delete;
610   ThreadGroupSemaphoreStandbyPolicyTest& operator=(
611       const ThreadGroupSemaphoreStandbyPolicyTest&) = delete;
612 
SetUp()613   void SetUp() override {
614     CreateAndStartThreadGroup(kReclaimTimeForCleanupTests);
615   }
616 
TearDown()617   void TearDown() override {
618     ThreadGroupSemaphoreImplTestBase::CommonTearDown();
619   }
620 };
621 
622 }  // namespace
623 
TEST_F(ThreadGroupSemaphoreStandbyPolicyTest,InitOne)624 TEST_F(ThreadGroupSemaphoreStandbyPolicyTest, InitOne) {
625   EXPECT_EQ(1U, thread_group_->NumberOfWorkersForTesting());
626 }
627 
628 namespace {
629 
630 enum class OptionalBlockingType {
631   NO_BLOCK,
632   MAY_BLOCK,
633   WILL_BLOCK,
634 };
635 
636 struct NestedBlockingType {
NestedBlockingTypebase::internal::__anon1e2f1e470d11::NestedBlockingType637   NestedBlockingType(BlockingType first_in,
638                      OptionalBlockingType second_in,
639                      BlockingType behaves_as_in)
640       : first(first_in), second(second_in), behaves_as(behaves_as_in) {}
641 
642   BlockingType first;
643   OptionalBlockingType second;
644   BlockingType behaves_as;
645 };
646 
647 class NestedScopedBlockingCall {
648  public:
NestedScopedBlockingCall(const NestedBlockingType & nested_blocking_type)649   explicit NestedScopedBlockingCall(
650       const NestedBlockingType& nested_blocking_type)
651       : first_scoped_blocking_call_(FROM_HERE, nested_blocking_type.first),
652         second_scoped_blocking_call_(
653             nested_blocking_type.second == OptionalBlockingType::WILL_BLOCK
654                 ? std::make_unique<ScopedBlockingCall>(FROM_HERE,
655                                                        BlockingType::WILL_BLOCK)
656                 : (nested_blocking_type.second ==
657                            OptionalBlockingType::MAY_BLOCK
658                        ? std::make_unique<ScopedBlockingCall>(
659                              FROM_HERE,
660                              BlockingType::MAY_BLOCK)
661                        : nullptr)) {}
662   NestedScopedBlockingCall(const NestedScopedBlockingCall&) = delete;
663   NestedScopedBlockingCall& operator=(const NestedScopedBlockingCall&) = delete;
664 
665  private:
666   ScopedBlockingCall first_scoped_blocking_call_;
667   std::unique_ptr<ScopedBlockingCall> second_scoped_blocking_call_;
668 };
669 
670 }  // namespace
671 
672 class ThreadGroupSemaphoreBlockingTest
673     : public ThreadGroupSemaphoreImplTestBase,
674       public testing::TestWithParam<NestedBlockingType> {
675  public:
676   ThreadGroupSemaphoreBlockingTest() = default;
677   ThreadGroupSemaphoreBlockingTest(const ThreadGroupSemaphoreBlockingTest&) =
678       delete;
679   ThreadGroupSemaphoreBlockingTest& operator=(
680       const ThreadGroupSemaphoreBlockingTest&) = delete;
681 
ParamInfoToString(::testing::TestParamInfo<NestedBlockingType> param_info)682   static std::string ParamInfoToString(
683       ::testing::TestParamInfo<NestedBlockingType> param_info) {
684     std::string str = param_info.param.first == BlockingType::MAY_BLOCK
685                           ? "MAY_BLOCK"
686                           : "WILL_BLOCK";
687     if (param_info.param.second == OptionalBlockingType::MAY_BLOCK) {
688       str += "_MAY_BLOCK";
689     } else if (param_info.param.second == OptionalBlockingType::WILL_BLOCK) {
690       str += "_WILL_BLOCK";
691     }
692     return str;
693   }
694 
TearDown()695   void TearDown() override {
696     ThreadGroupSemaphoreImplTestBase::CommonTearDown();
697   }
698 
699  protected:
700   // Saturates the thread group with a task that first blocks, waits to be
701   // unblocked, then exits.
SaturateWithBlockingTasks(const NestedBlockingType & nested_blocking_type,TaskPriority priority=TaskPriority::USER_BLOCKING)702   void SaturateWithBlockingTasks(
703       const NestedBlockingType& nested_blocking_type,
704       TaskPriority priority = TaskPriority::USER_BLOCKING) {
705     TestWaitableEvent threads_running;
706 
707     const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
708         {MayBlock(), WithBaseSyncPrimitives(), priority},
709         &mock_pooled_task_runner_delegate_);
710 
711     RepeatingClosure threads_running_barrier = BarrierClosure(
712         kMaxTasks,
713         BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
714 
715     for (size_t i = 0; i < kMaxTasks; ++i) {
716       task_runner->PostTask(
717           FROM_HERE, BindLambdaForTesting([this, &threads_running_barrier,
718                                            nested_blocking_type]() {
719             NestedScopedBlockingCall nested_scoped_blocking_call(
720                 nested_blocking_type);
721             threads_running_barrier.Run();
722             blocking_threads_continue_.Wait();
723           }));
724     }
725     threads_running.Wait();
726   }
727 
728   // Saturates the thread group with a task that waits for other tasks without
729   // entering a ScopedBlockingCall, then exits.
SaturateWithBusyTasks(TaskPriority priority=TaskPriority::USER_BLOCKING,TaskShutdownBehavior shutdown_behavior=TaskShutdownBehavior::SKIP_ON_SHUTDOWN)730   void SaturateWithBusyTasks(
731       TaskPriority priority = TaskPriority::USER_BLOCKING,
732       TaskShutdownBehavior shutdown_behavior =
733           TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
734     TestWaitableEvent threads_running;
735 
736     const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
737         {MayBlock(), WithBaseSyncPrimitives(), priority, shutdown_behavior},
738         &mock_pooled_task_runner_delegate_);
739 
740     RepeatingClosure threads_running_barrier = BarrierClosure(
741         kMaxTasks,
742         BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
743     // Posting these tasks should cause new workers to be created.
744     for (size_t i = 0; i < kMaxTasks; ++i) {
745       task_runner->PostTask(
746           FROM_HERE, BindLambdaForTesting([this, &threads_running_barrier]() {
747             threads_running_barrier.Run();
748             busy_threads_continue_.Wait();
749           }));
750     }
751     threads_running.Wait();
752   }
753 
754   // Returns how long we can expect a change to |max_tasks_| to occur
755   // after a task has become blocked.
GetMaxTasksChangeSleepTime()756   TimeDelta GetMaxTasksChangeSleepTime() {
757     return std::max(thread_group_->blocked_workers_poll_period_for_testing(),
758                     thread_group_->may_block_threshold_for_testing()) +
759            TestTimeouts::tiny_timeout();
760   }
761 
762   // Waits indefinitely, until |thread_group_|'s max tasks increases to
763   // |expected_max_tasks|.
ExpectMaxTasksIncreasesTo(size_t expected_max_tasks)764   void ExpectMaxTasksIncreasesTo(size_t expected_max_tasks) {
765     size_t max_tasks = thread_group_->GetMaxTasksForTesting();
766     while (max_tasks != expected_max_tasks) {
767       PlatformThread::Sleep(GetMaxTasksChangeSleepTime());
768       size_t new_max_tasks = thread_group_->GetMaxTasksForTesting();
769       ASSERT_GE(new_max_tasks, max_tasks);
770       max_tasks = new_max_tasks;
771     }
772   }
773 
774   // Unblocks tasks posted by SaturateWithBlockingTasks().
UnblockBlockingTasks()775   void UnblockBlockingTasks() { blocking_threads_continue_.Signal(); }
776 
777   // Unblocks tasks posted by SaturateWithBusyTasks().
UnblockBusyTasks()778   void UnblockBusyTasks() { busy_threads_continue_.Signal(); }
779 
780   const scoped_refptr<TaskRunner> task_runner_ =
781       test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
782                                    &mock_pooled_task_runner_delegate_);
783 
784  private:
785   TestWaitableEvent blocking_threads_continue_;
786   TestWaitableEvent busy_threads_continue_;
787 };
788 
789 // Verify that SaturateWithBlockingTasks() causes max tasks to increase and
790 // creates a worker if needed. Also verify that UnblockBlockingTasks() decreases
791 // max tasks after an increase.
TEST_P(ThreadGroupSemaphoreBlockingTest,ThreadBlockedUnblocked)792 TEST_P(ThreadGroupSemaphoreBlockingTest, ThreadBlockedUnblocked) {
793   CreateAndStartThreadGroup();
794 
795   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
796 
797   SaturateWithBlockingTasks(GetParam());
798 
799   // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
800   // should not block forever.
801   SaturateWithBusyTasks();
802 
803   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
804 
805   UnblockBusyTasks();
806   UnblockBlockingTasks();
807   task_tracker_.FlushForTesting();
808   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
809 }
810 
811 // Verify that SaturateWithBlockingTasks() of BEST_EFFORT tasks causes max best
812 // effort tasks to increase and creates a worker if needed. Also verify that
813 // UnblockBlockingTasks() decreases max best effort tasks after an increase.
TEST_P(ThreadGroupSemaphoreBlockingTest,ThreadBlockedUnblockedBestEffort)814 TEST_P(ThreadGroupSemaphoreBlockingTest, ThreadBlockedUnblockedBestEffort) {
815   CreateAndStartThreadGroup();
816 
817   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
818   ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
819 
820   SaturateWithBlockingTasks(GetParam(), TaskPriority::BEST_EFFORT);
821 
822   // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
823   // should not block forever.
824   SaturateWithBusyTasks(TaskPriority::BEST_EFFORT);
825 
826   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
827 
828   UnblockBusyTasks();
829   UnblockBlockingTasks();
830   task_tracker_.FlushForTesting();
831   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
832   EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
833 }
834 
835 // Verify that flooding the thread group with more BEST_EFFORT tasks than
836 // kMaxBestEffortTasks doesn't prevent USER_VISIBLE tasks from running.
TEST_P(ThreadGroupSemaphoreBlockingTest,TooManyBestEffortTasks)837 TEST_P(ThreadGroupSemaphoreBlockingTest, TooManyBestEffortTasks) {
838   constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
839 
840   CreateAndStartThreadGroup(TimeDelta::Max(), kMaxTasks, kMaxBestEffortTasks);
841 
842   TestWaitableEvent threads_continue;
843   {
844     TestWaitableEvent entered_blocking_scope;
845     RepeatingClosure entered_blocking_scope_barrier = BarrierClosure(
846         kMaxBestEffortTasks + 1, BindOnce(&TestWaitableEvent::Signal,
847                                           Unretained(&entered_blocking_scope)));
848     TestWaitableEvent exit_blocking_scope;
849 
850     TestWaitableEvent threads_running;
851     RepeatingClosure threads_running_barrier = BarrierClosure(
852         kMaxBestEffortTasks + 1,
853         BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
854 
855     const auto best_effort_task_runner =
856         test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
857                                      &mock_pooled_task_runner_delegate_);
858     for (size_t i = 0; i < kMaxBestEffortTasks + 1; ++i) {
859       best_effort_task_runner->PostTask(
860           FROM_HERE, BindLambdaForTesting([&]() {
861             {
862               NestedScopedBlockingCall scoped_blocking_call(GetParam());
863               entered_blocking_scope_barrier.Run();
864               exit_blocking_scope.Wait();
865             }
866             threads_running_barrier.Run();
867             threads_continue.Wait();
868           }));
869     }
870     entered_blocking_scope.Wait();
871     exit_blocking_scope.Signal();
872     threads_running.Wait();
873   }
874 
875   // At this point, kMaxBestEffortTasks + 1 threads are running (plus
876   // potentially the idle thread), but max_task and max_best_effort_task are
877   // back to normal.
878   EXPECT_GE(thread_group_->NumberOfWorkersForTesting(),
879             kMaxBestEffortTasks + 1);
880   EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
881             kMaxBestEffortTasks + 2);
882   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
883 
884   TestWaitableEvent threads_running;
885   task_runner_->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
886                            threads_running.Signal();
887                            threads_continue.Wait();
888                          }));
889 
890   // This should not block forever.
891   threads_running.Wait();
892 
893   EXPECT_GE(thread_group_->NumberOfWorkersForTesting(),
894             kMaxBestEffortTasks + 2);
895   EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
896             kMaxBestEffortTasks + 3);
897   threads_continue.Signal();
898 
899   task_tracker_.FlushForTesting();
900 }
901 
902 // Verify that tasks posted in a saturated thread group before a
903 // ScopedBlockingCall will execute after ScopedBlockingCall is instantiated.
TEST_P(ThreadGroupSemaphoreBlockingTest,PostBeforeBlocking)904 TEST_P(ThreadGroupSemaphoreBlockingTest, PostBeforeBlocking) {
905   CreateAndStartThreadGroup();
906 
907   TestWaitableEvent thread_running(WaitableEvent::ResetPolicy::AUTOMATIC);
908   TestWaitableEvent thread_can_block;
909   TestWaitableEvent threads_continue;
910 
911   for (size_t i = 0; i < kMaxTasks; ++i) {
912     task_runner_->PostTask(
913         FROM_HERE,
914         BindOnce(
915             [](const NestedBlockingType& nested_blocking_type,
916                TestWaitableEvent* thread_running,
917                TestWaitableEvent* thread_can_block,
918                TestWaitableEvent* threads_continue) {
919               thread_running->Signal();
920               thread_can_block->Wait();
921 
922               NestedScopedBlockingCall nested_scoped_blocking_call(
923                   nested_blocking_type);
924               threads_continue->Wait();
925             },
926             GetParam(), Unretained(&thread_running),
927             Unretained(&thread_can_block), Unretained(&threads_continue)));
928     thread_running.Wait();
929   }
930 
931   // All workers should be occupied and the thread group should be saturated.
932   // Workers have not entered ScopedBlockingCall yet.
933   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
934   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
935 
936   TestWaitableEvent extra_threads_running;
937   TestWaitableEvent extra_threads_continue;
938   RepeatingClosure extra_threads_running_barrier = BarrierClosure(
939       kMaxTasks,
940       BindOnce(&TestWaitableEvent::Signal, Unretained(&extra_threads_running)));
941   for (size_t i = 0; i < kMaxTasks; ++i) {
942     task_runner_->PostTask(
943         FROM_HERE, BindOnce(
944                        [](RepeatingClosure* extra_threads_running_barrier,
945                           TestWaitableEvent* extra_threads_continue) {
946                          extra_threads_running_barrier->Run();
947                          extra_threads_continue->Wait();
948                        },
949                        Unretained(&extra_threads_running_barrier),
950                        Unretained(&extra_threads_continue)));
951   }
952 
953   // Allow tasks to enter ScopedBlockingCall. Workers should be created for the
954   // tasks we just posted.
955   thread_can_block.Signal();
956 
957   // Should not block forever.
958   extra_threads_running.Wait();
959   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
960   extra_threads_continue.Signal();
961 
962   threads_continue.Signal();
963   task_tracker_.FlushForTesting();
964 }
965 
966 // Verify that workers become idle when the thread group is over-capacity and
967 // that those workers do no work.
TEST_P(ThreadGroupSemaphoreBlockingTest,WorkersIdleWhenOverCapacity)968 TEST_P(ThreadGroupSemaphoreBlockingTest, WorkersIdleWhenOverCapacity) {
969   CreateAndStartThreadGroup();
970 
971   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
972 
973   SaturateWithBlockingTasks(GetParam());
974 
975   // Forces |kMaxTasks| extra workers to be instantiated by posting tasks.
976   SaturateWithBusyTasks();
977 
978   ASSERT_EQ(thread_group_->NumberOfIdleWorkersForTesting(), 0U);
979   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
980 
981   AtomicFlag is_exiting;
982   // These tasks should not get executed until after other tasks become
983   // unblocked.
984   for (size_t i = 0; i < kMaxTasks; ++i) {
985     task_runner_->PostTask(FROM_HERE, BindOnce(
986                                           [](AtomicFlag* is_exiting) {
987                                             EXPECT_TRUE(is_exiting->IsSet());
988                                           },
989                                           Unretained(&is_exiting)));
990   }
991 
992   // The original |kMaxTasks| will finish their tasks after being unblocked.
993   // There will be work in the work queue, but the thread group should now be
994   // over-capacity and workers will become idle.
995   UnblockBlockingTasks();
996   thread_group_->WaitForWorkersIdleForTesting(kMaxTasks);
997   EXPECT_EQ(thread_group_->NumberOfIdleWorkersForTesting(), kMaxTasks);
998 
999   // Posting more tasks should not cause workers idle from the thread group
1000   // being over capacity to begin doing work.
1001   for (size_t i = 0; i < kMaxTasks; ++i) {
1002     task_runner_->PostTask(FROM_HERE, BindOnce(
1003                                           [](AtomicFlag* is_exiting) {
1004                                             EXPECT_TRUE(is_exiting->IsSet());
1005                                           },
1006                                           Unretained(&is_exiting)));
1007   }
1008 
1009   // Give time for those idle workers to possibly do work (which should not
1010   // happen).
1011   PlatformThread::Sleep(TestTimeouts::tiny_timeout());
1012 
1013   is_exiting.Set();
1014   // Unblocks the new workers.
1015   UnblockBusyTasks();
1016   task_tracker_.FlushForTesting();
1017 }
1018 
1019 // Verify that an increase of max tasks with SaturateWithBlockingTasks()
1020 // increases the number of tasks that can run before ShouldYield returns true.
TEST_P(ThreadGroupSemaphoreBlockingTest,ThreadBlockedUnblockedShouldYield)1021 TEST_P(ThreadGroupSemaphoreBlockingTest, ThreadBlockedUnblockedShouldYield) {
1022   CreateAndStartThreadGroup();
1023 
1024   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1025 
1026   EXPECT_FALSE(
1027       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1028   SaturateWithBlockingTasks(GetParam());
1029   EXPECT_FALSE(
1030       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1031 
1032   // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
1033   // should not block forever.
1034   SaturateWithBusyTasks();
1035 
1036   // All tasks can run, hence ShouldYield returns false.
1037   EXPECT_FALSE(
1038       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1039 
1040   // Post a USER_VISIBLE task that can't run since workers are saturated. This
1041   // should cause BEST_EFFORT tasks to yield.
1042   test::CreatePooledTaskRunner({TaskPriority::USER_VISIBLE},
1043                                &mock_pooled_task_runner_delegate_)
1044       ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1045                    EXPECT_FALSE(thread_group_->ShouldYield(
1046                        {TaskPriority::BEST_EFFORT, TimeTicks()}));
1047                  }));
1048   EXPECT_TRUE(
1049       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1050 
1051   // Post a USER_BLOCKING task that can't run since workers are saturated. This
1052   // should cause USER_VISIBLE tasks to yield.
1053   test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
1054                                &mock_pooled_task_runner_delegate_)
1055       ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1056                    EXPECT_FALSE(thread_group_->ShouldYield(
1057                        {TaskPriority::USER_VISIBLE, TimeTicks()}));
1058                  }));
1059   EXPECT_TRUE(
1060       thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
1061 
1062   UnblockBusyTasks();
1063   UnblockBlockingTasks();
1064   task_tracker_.FlushForTesting();
1065   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1066 }
1067 
1068 INSTANTIATE_TEST_SUITE_P(
1069     All,
1070     ThreadGroupSemaphoreBlockingTest,
1071     ::testing::Values(NestedBlockingType(BlockingType::MAY_BLOCK,
1072                                          OptionalBlockingType::NO_BLOCK,
1073                                          BlockingType::MAY_BLOCK),
1074                       NestedBlockingType(BlockingType::WILL_BLOCK,
1075                                          OptionalBlockingType::NO_BLOCK,
1076                                          BlockingType::WILL_BLOCK),
1077                       NestedBlockingType(BlockingType::MAY_BLOCK,
1078                                          OptionalBlockingType::WILL_BLOCK,
1079                                          BlockingType::WILL_BLOCK),
1080                       NestedBlockingType(BlockingType::WILL_BLOCK,
1081                                          OptionalBlockingType::MAY_BLOCK,
1082                                          BlockingType::WILL_BLOCK)),
1083     ThreadGroupSemaphoreBlockingTest::ParamInfoToString);
1084 
1085 // Verify that if a thread enters the scope of a MAY_BLOCK ScopedBlockingCall,
1086 // but exits the scope before the MayBlock threshold is reached, that the max
1087 // tasks does not increase.
TEST_F(ThreadGroupSemaphoreBlockingTest,ThreadBlockUnblockPremature)1088 TEST_F(ThreadGroupSemaphoreBlockingTest, ThreadBlockUnblockPremature) {
1089   // Create a thread group with an infinite MayBlock threshold so that a
1090   // MAY_BLOCK ScopedBlockingCall never increases the max tasks.
1091   CreateAndStartThreadGroup(TimeDelta::Max(),   // |suggested_reclaim_time|
1092                             kMaxTasks,          // |max_tasks|
1093                             std::nullopt,       // |max_best_effort_tasks|
1094                             nullptr,            // |worker_observer|
1095                             TimeDelta::Max());  // |may_block_threshold|
1096 
1097   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1098 
1099   SaturateWithBlockingTasks(NestedBlockingType(BlockingType::MAY_BLOCK,
1100                                                OptionalBlockingType::NO_BLOCK,
1101                                                BlockingType::MAY_BLOCK));
1102   PlatformThread::Sleep(
1103       2 * thread_group_->blocked_workers_poll_period_for_testing());
1104   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
1105   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1106 
1107   UnblockBlockingTasks();
1108   task_tracker_.FlushForTesting();
1109   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1110 }
1111 
1112 // Verify that if a BEST_EFFORT task enters the scope of a WILL_BLOCK
1113 // ScopedBlockingCall, but exits the scope before the MayBlock threshold is
1114 // reached, that the max best effort tasks does not increase.
TEST_F(ThreadGroupSemaphoreBlockingTest,ThreadBlockUnblockPrematureBestEffort)1115 TEST_F(ThreadGroupSemaphoreBlockingTest,
1116        ThreadBlockUnblockPrematureBestEffort) {
1117   // Create a thread group with an infinite MayBlock threshold so that a
1118   // MAY_BLOCK ScopedBlockingCall never increases the max tasks.
1119   CreateAndStartThreadGroup(TimeDelta::Max(),   // |suggested_reclaim_time|
1120                             kMaxTasks,          // |max_tasks|
1121                             kMaxTasks,          // |max_best_effort_tasks|
1122                             nullptr,            // |worker_observer|
1123                             TimeDelta::Max());  // |may_block_threshold|
1124 
1125   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1126   ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
1127 
1128   SaturateWithBlockingTasks(NestedBlockingType(BlockingType::WILL_BLOCK,
1129                                                OptionalBlockingType::NO_BLOCK,
1130                                                BlockingType::WILL_BLOCK),
1131                             TaskPriority::BEST_EFFORT);
1132   PlatformThread::Sleep(
1133       2 * thread_group_->blocked_workers_poll_period_for_testing());
1134   EXPECT_GE(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
1135   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), 2 * kMaxTasks);
1136   EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
1137 
1138   UnblockBlockingTasks();
1139   task_tracker_.FlushForTesting();
1140   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1141   EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
1142 }
1143 
1144 // Verify that if max tasks is incremented because of a MAY_BLOCK
1145 // ScopedBlockingCall, it isn't incremented again when there is a nested
1146 // WILL_BLOCK ScopedBlockingCall.
TEST_F(ThreadGroupSemaphoreBlockingTest,MayBlockIncreaseCapacityNestedWillBlock)1147 TEST_F(ThreadGroupSemaphoreBlockingTest,
1148        MayBlockIncreaseCapacityNestedWillBlock) {
1149   CreateAndStartThreadGroup();
1150 
1151   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1152   auto task_runner =
1153       test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
1154                                    &mock_pooled_task_runner_delegate_);
1155   TestWaitableEvent can_return;
1156 
1157   // Saturate the thread group so that a MAY_BLOCK ScopedBlockingCall would
1158   // increment the max tasks.
1159   for (size_t i = 0; i < kMaxTasks - 1; ++i) {
1160     task_runner->PostTask(
1161         FROM_HERE, BindOnce(&TestWaitableEvent::Wait, Unretained(&can_return)));
1162   }
1163 
1164   TestWaitableEvent can_instantiate_will_block;
1165   TestWaitableEvent did_instantiate_will_block;
1166 
1167   // Post a task that instantiates a MAY_BLOCK ScopedBlockingCall.
1168   task_runner->PostTask(
1169       FROM_HERE,
1170       BindOnce(
1171           [](TestWaitableEvent* can_instantiate_will_block,
1172              TestWaitableEvent* did_instantiate_will_block,
1173              TestWaitableEvent* can_return) {
1174             ScopedBlockingCall may_block(FROM_HERE, BlockingType::MAY_BLOCK);
1175             can_instantiate_will_block->Wait();
1176             ScopedBlockingCall will_block(FROM_HERE, BlockingType::WILL_BLOCK);
1177             did_instantiate_will_block->Signal();
1178             can_return->Wait();
1179           },
1180           Unretained(&can_instantiate_will_block),
1181           Unretained(&did_instantiate_will_block), Unretained(&can_return)));
1182 
1183   // After a short delay, max tasks should be incremented.
1184   ExpectMaxTasksIncreasesTo(kMaxTasks + 1);
1185 
1186   // Wait until the task instantiates a WILL_BLOCK ScopedBlockingCall.
1187   can_instantiate_will_block.Signal();
1188   did_instantiate_will_block.Wait();
1189 
1190   // Max tasks shouldn't be incremented again.
1191   EXPECT_EQ(kMaxTasks + 1, thread_group_->GetMaxTasksForTesting());
1192 
1193   // Tear down.
1194   can_return.Signal();
1195   task_tracker_.FlushForTesting();
1196   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1197 }
1198 
1199 // Verify that OnShutdownStarted() causes max tasks to increase and creates a
1200 // worker if needed. Also verify that UnblockBusyTasks() decreases max tasks
1201 // after an increase.
TEST_F(ThreadGroupSemaphoreBlockingTest,ThreadBusyShutdown)1202 TEST_F(ThreadGroupSemaphoreBlockingTest, ThreadBusyShutdown) {
1203   CreateAndStartThreadGroup();
1204   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1205 
1206   SaturateWithBusyTasks(TaskPriority::BEST_EFFORT,
1207                         TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN);
1208   thread_group_->OnShutdownStarted();
1209 
1210   // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
1211   // should not block forever.
1212   SaturateWithBusyTasks(TaskPriority::BEST_EFFORT,
1213                         TaskShutdownBehavior::BLOCK_SHUTDOWN);
1214 
1215   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
1216 
1217   UnblockBusyTasks();
1218   task_tracker_.FlushForTesting();
1219   thread_group_->JoinForTesting();
1220   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1221   mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
1222   thread_group_.reset();
1223 }
1224 
1225 enum class ReclaimType { DELAYED_RECLAIM, NO_RECLAIM };
1226 
1227 class ThreadGroupSemaphoreOverCapacityTest
1228     : public ThreadGroupSemaphoreImplTestBase,
1229       public testing::TestWithParam<ReclaimType> {
1230  public:
1231   ThreadGroupSemaphoreOverCapacityTest() = default;
1232   ThreadGroupSemaphoreOverCapacityTest(
1233       const ThreadGroupSemaphoreOverCapacityTest&) = delete;
1234   ThreadGroupSemaphoreOverCapacityTest& operator=(
1235       const ThreadGroupSemaphoreOverCapacityTest&) = delete;
1236 
SetUp()1237   void SetUp() override {
1238     if (GetParam() == ReclaimType::NO_RECLAIM) {
1239       feature_list.InitAndEnableFeature(kNoWorkerThreadReclaim);
1240     }
1241     CreateThreadGroup();
1242     task_runner_ =
1243         test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
1244                                      &mock_pooled_task_runner_delegate_);
1245   }
1246 
TearDown()1247   void TearDown() override {
1248     ThreadGroupSemaphoreImplTestBase::CommonTearDown();
1249   }
1250 
1251  protected:
1252   base::test::ScopedFeatureList feature_list;
1253   scoped_refptr<TaskRunner> task_runner_;
1254   static constexpr size_t kLocalMaxTasks = 3;
1255 
CreateThreadGroup()1256   void CreateThreadGroup() {
1257     ASSERT_FALSE(thread_group_);
1258     service_thread_.Start();
1259     delayed_task_manager_.Start(service_thread_.task_runner());
1260     thread_group_ = std::make_unique<ThreadGroupSemaphore>(
1261         "OverCapacityTestThreadGroup", "A", ThreadType::kDefault,
1262         task_tracker_.GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
1263     ASSERT_TRUE(thread_group_);
1264 
1265     mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
1266   }
1267 };
1268 
1269 // Verify that workers that become idle due to the thread group being over
1270 // capacity will eventually cleanup.
TEST_P(ThreadGroupSemaphoreOverCapacityTest,VerifyCleanup)1271 TEST_P(ThreadGroupSemaphoreOverCapacityTest, VerifyCleanup) {
1272   StartThreadGroup(kReclaimTimeForCleanupTests, kLocalMaxTasks);
1273   TestWaitableEvent threads_running;
1274   TestWaitableEvent threads_continue;
1275   RepeatingClosure threads_running_barrier = BarrierClosure(
1276       kLocalMaxTasks,
1277       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
1278 
1279   TestWaitableEvent blocked_call_continue;
1280   RepeatingClosure closure = BindRepeating(
1281       [](RepeatingClosure* threads_running_barrier,
1282          TestWaitableEvent* threads_continue,
1283          TestWaitableEvent* blocked_call_continue) {
1284         threads_running_barrier->Run();
1285         {
1286           ScopedBlockingCall scoped_blocking_call(FROM_HERE,
1287                                                   BlockingType::WILL_BLOCK);
1288           blocked_call_continue->Wait();
1289         }
1290         threads_continue->Wait();
1291       },
1292       Unretained(&threads_running_barrier), Unretained(&threads_continue),
1293       Unretained(&blocked_call_continue));
1294 
1295   for (size_t i = 0; i < kLocalMaxTasks; ++i) {
1296     task_runner_->PostTask(FROM_HERE, closure);
1297   }
1298 
1299   threads_running.Wait();
1300 
1301   TestWaitableEvent extra_threads_running;
1302   TestWaitableEvent extra_threads_continue;
1303 
1304   RepeatingClosure extra_threads_running_barrier = BarrierClosure(
1305       kLocalMaxTasks,
1306       BindOnce(&TestWaitableEvent::Signal, Unretained(&extra_threads_running)));
1307   // These tasks should run on the new threads from increasing max tasks.
1308   for (size_t i = 0; i < kLocalMaxTasks; ++i) {
1309     task_runner_->PostTask(
1310         FROM_HERE, BindOnce(
1311                        [](RepeatingClosure* extra_threads_running_barrier,
1312                           TestWaitableEvent* extra_threads_continue) {
1313                          extra_threads_running_barrier->Run();
1314                          extra_threads_continue->Wait();
1315                        },
1316                        Unretained(&extra_threads_running_barrier),
1317                        Unretained(&extra_threads_continue)));
1318   }
1319   extra_threads_running.Wait();
1320 
1321   ASSERT_EQ(kLocalMaxTasks * 2, thread_group_->NumberOfWorkersForTesting());
1322   EXPECT_EQ(kLocalMaxTasks * 2, thread_group_->GetMaxTasksForTesting());
1323   blocked_call_continue.Signal();
1324   extra_threads_continue.Signal();
1325 
1326   // This test in TGS does not post tasks intermittently because semaphores on
1327   // some platforms implement anti-starvation logic, and so a pool which is
1328   // given tasks from time to time will not always run the task on the same
1329   // worker, leading to many of the workers being "used" despite only one worker
1330   // being used at a time.
1331 
1332   if (GetParam() == ReclaimType::DELAYED_RECLAIM) {
1333     thread_group_->WaitForWorkersCleanedUpForTesting(kLocalMaxTasks - 1);
1334     EXPECT_GE(kLocalMaxTasks + 1, thread_group_->NumberOfWorkersForTesting());
1335     threads_continue.Signal();
1336   } else {
1337     // When workers are't automatically reclaimed after a delay, blocking tasks
1338     // need to return for extra workers to be cleaned up.
1339     threads_continue.Signal();
1340     thread_group_->WaitForWorkersCleanedUpForTesting(kLocalMaxTasks);
1341     EXPECT_EQ(kLocalMaxTasks, thread_group_->NumberOfWorkersForTesting());
1342   }
1343 
1344   threads_continue.Signal();
1345   task_tracker_.FlushForTesting();
1346 }
1347 
1348 INSTANTIATE_TEST_SUITE_P(ReclaimType,
1349                          ThreadGroupSemaphoreOverCapacityTest,
1350                          ::testing::Values(ReclaimType::DELAYED_RECLAIM,
1351                                            ReclaimType::NO_RECLAIM));
1352 
1353 // Verify that the maximum number of workers is 256 and that hitting the max
1354 // leaves the thread group in a valid state with regards to max tasks.
TEST_F(ThreadGroupSemaphoreBlockingTest,MaximumWorkersTest)1355 TEST_F(ThreadGroupSemaphoreBlockingTest, MaximumWorkersTest) {
1356   CreateAndStartThreadGroup();
1357 
1358   constexpr size_t kMaxNumberOfWorkers = 256;
1359   constexpr size_t kNumExtraTasks = 10;
1360 
1361   TestWaitableEvent early_blocking_threads_running;
1362   RepeatingClosure early_threads_barrier_closure =
1363       BarrierClosure(kMaxNumberOfWorkers,
1364                      BindOnce(&TestWaitableEvent::Signal,
1365                               Unretained(&early_blocking_threads_running)));
1366 
1367   TestWaitableEvent early_threads_finished;
1368   RepeatingClosure early_threads_finished_barrier = BarrierClosure(
1369       kMaxNumberOfWorkers, BindOnce(&TestWaitableEvent::Signal,
1370                                     Unretained(&early_threads_finished)));
1371 
1372   TestWaitableEvent early_release_threads_continue;
1373 
1374   // Post ScopedBlockingCall tasks to hit the worker cap.
1375   for (size_t i = 0; i < kMaxNumberOfWorkers; ++i) {
1376     task_runner_->PostTask(
1377         FROM_HERE, BindOnce(
1378                        [](RepeatingClosure* early_threads_barrier_closure,
1379                           TestWaitableEvent* early_release_threads_continue,
1380                           RepeatingClosure* early_threads_finished) {
1381                          {
1382                            ScopedBlockingCall scoped_blocking_call(
1383                                FROM_HERE, BlockingType::WILL_BLOCK);
1384                            early_threads_barrier_closure->Run();
1385                            early_release_threads_continue->Wait();
1386                          }
1387                          early_threads_finished->Run();
1388                        },
1389                        Unretained(&early_threads_barrier_closure),
1390                        Unretained(&early_release_threads_continue),
1391                        Unretained(&early_threads_finished_barrier)));
1392   }
1393 
1394   early_blocking_threads_running.Wait();
1395   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(),
1396             kMaxTasks + kMaxNumberOfWorkers);
1397 
1398   TestWaitableEvent late_release_thread_contine;
1399   TestWaitableEvent late_blocking_threads_running;
1400 
1401   RepeatingClosure late_threads_barrier_closure = BarrierClosure(
1402       kNumExtraTasks, BindOnce(&TestWaitableEvent::Signal,
1403                                Unretained(&late_blocking_threads_running)));
1404 
1405   // Posts additional tasks. Note: we should already have |kMaxNumberOfWorkers|
1406   // tasks running. These tasks should not be able to get executed yet as the
1407   // thread group is already at its max worker cap.
1408   for (size_t i = 0; i < kNumExtraTasks; ++i) {
1409     task_runner_->PostTask(
1410         FROM_HERE, BindOnce(
1411                        [](RepeatingClosure* late_threads_barrier_closure,
1412                           TestWaitableEvent* late_release_thread_contine) {
1413                          ScopedBlockingCall scoped_blocking_call(
1414                              FROM_HERE, BlockingType::WILL_BLOCK);
1415                          late_threads_barrier_closure->Run();
1416                          late_release_thread_contine->Wait();
1417                        },
1418                        Unretained(&late_threads_barrier_closure),
1419                        Unretained(&late_release_thread_contine)));
1420   }
1421 
1422   // Give time to see if we exceed the max number of workers.
1423   PlatformThread::Sleep(TestTimeouts::tiny_timeout());
1424   EXPECT_LE(thread_group_->NumberOfWorkersForTesting(), kMaxNumberOfWorkers);
1425 
1426   early_release_threads_continue.Signal();
1427   early_threads_finished.Wait();
1428   late_blocking_threads_running.Wait();
1429 
1430   TestWaitableEvent final_tasks_running;
1431   TestWaitableEvent final_tasks_continue;
1432   RepeatingClosure final_tasks_running_barrier = BarrierClosure(
1433       kMaxTasks,
1434       BindOnce(&TestWaitableEvent::Signal, Unretained(&final_tasks_running)));
1435 
1436   // Verify that we are still able to saturate the thread group.
1437   for (size_t i = 0; i < kMaxTasks; ++i) {
1438     task_runner_->PostTask(FROM_HERE,
1439                            BindOnce(
1440                                [](RepeatingClosure* closure,
1441                                   TestWaitableEvent* final_tasks_continue) {
1442                                  closure->Run();
1443                                  final_tasks_continue->Wait();
1444                                },
1445                                Unretained(&final_tasks_running_barrier),
1446                                Unretained(&final_tasks_continue)));
1447   }
1448   final_tasks_running.Wait();
1449   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks + kNumExtraTasks);
1450   late_release_thread_contine.Signal();
1451   final_tasks_continue.Signal();
1452   task_tracker_.FlushForTesting();
1453 }
1454 
1455 // Verify that the maximum number of best-effort tasks that can run concurrently
1456 // is honored.
TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,MaxBestEffortTasks)1457 TEST_F(ThreadGroupSemaphoreImplStartInBodyTest, MaxBestEffortTasks) {
1458   constexpr int kMaxBestEffortTasks = kMaxTasks / 2;
1459   StartThreadGroup(TimeDelta::Max(),      // |suggested_reclaim_time|
1460                    kMaxTasks,             // |max_tasks|
1461                    kMaxBestEffortTasks);  // |max_best_effort_tasks|
1462   const scoped_refptr<TaskRunner> foreground_runner =
1463       test::CreatePooledTaskRunner({MayBlock()},
1464                                    &mock_pooled_task_runner_delegate_);
1465   const scoped_refptr<TaskRunner> background_runner =
1466       test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
1467                                    &mock_pooled_task_runner_delegate_);
1468 
1469   // It should be possible to have |kMaxBestEffortTasks|
1470   // TaskPriority::BEST_EFFORT tasks running concurrently.
1471   TestWaitableEvent best_effort_tasks_running;
1472   TestWaitableEvent unblock_best_effort_tasks;
1473   RepeatingClosure best_effort_tasks_running_barrier = BarrierClosure(
1474       kMaxBestEffortTasks, BindOnce(&TestWaitableEvent::Signal,
1475                                     Unretained(&best_effort_tasks_running)));
1476 
1477   for (int i = 0; i < kMaxBestEffortTasks; ++i) {
1478     background_runner->PostTask(FROM_HERE, base::BindLambdaForTesting([&]() {
1479                                   best_effort_tasks_running_barrier.Run();
1480                                   unblock_best_effort_tasks.Wait();
1481                                 }));
1482   }
1483   best_effort_tasks_running.Wait();
1484 
1485   // No more TaskPriority::BEST_EFFORT task should run.
1486   AtomicFlag extra_best_effort_task_can_run;
1487   TestWaitableEvent extra_best_effort_task_running;
1488   background_runner->PostTask(
1489       FROM_HERE, base::BindLambdaForTesting([&]() {
1490         EXPECT_TRUE(extra_best_effort_task_can_run.IsSet());
1491         extra_best_effort_task_running.Signal();
1492       }));
1493 
1494   // An extra foreground task should be able to run.
1495   TestWaitableEvent foreground_task_running;
1496   foreground_runner->PostTask(
1497       FROM_HERE, base::BindOnce(&TestWaitableEvent::Signal,
1498                                 Unretained(&foreground_task_running)));
1499   foreground_task_running.Wait();
1500 
1501   // Completion of the TaskPriority::BEST_EFFORT tasks should allow the extra
1502   // TaskPriority::BEST_EFFORT task to run.
1503   extra_best_effort_task_can_run.Set();
1504   unblock_best_effort_tasks.Signal();
1505   extra_best_effort_task_running.Wait();
1506 
1507   // Wait for all tasks to complete before exiting to avoid invalid accesses.
1508   task_tracker_.FlushForTesting();
1509 }
1510 
1511 // Verify that flooding the thread group with BEST_EFFORT tasks doesn't cause
1512 // the creation of more than |max_best_effort_tasks| + 1 workers.
TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,FloodBestEffortTasksDoesNotCreateTooManyWorkers)1513 TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,
1514        FloodBestEffortTasksDoesNotCreateTooManyWorkers) {
1515   constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
1516   StartThreadGroup(TimeDelta::Max(),      // |suggested_reclaim_time|
1517                    kMaxTasks,             // |max_tasks|
1518                    kMaxBestEffortTasks);  // |max_best_effort_tasks|
1519 
1520   const scoped_refptr<TaskRunner> runner =
1521       test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
1522                                    &mock_pooled_task_runner_delegate_);
1523 
1524   for (size_t i = 0; i < kLargeNumber; ++i) {
1525     runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1526                        EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
1527                                  kMaxBestEffortTasks + 1);
1528                      }));
1529   }
1530 
1531   // Wait for all tasks to complete before exiting to avoid invalid accesses.
1532   task_tracker_.FlushForTesting();
1533 }
1534 
1535 // Previously, a WILL_BLOCK ScopedBlockingCall unconditionally woke up a worker
1536 // if the priority queue was non-empty. Sometimes, that caused multiple workers
1537 // to be woken up for the same sequence. This test verifies that it is no longer
1538 // the case:
1539 // 1. Post and run task A.
1540 // 2. Post task B from task A.
1541 // 3. Task A enters a WILL_BLOCK ScopedBlockingCall. Once the idle thread is
1542 //    created, this should no-op because there are already enough workers
1543 //    (previously, a worker would be woken up because the priority queue isn't
1544 //    empty).
1545 // 5. Wait for all tasks to complete.
TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,RepeatedWillBlockDoesNotCreateTooManyWorkers)1546 TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,
1547        RepeatedWillBlockDoesNotCreateTooManyWorkers) {
1548   constexpr size_t kNumWorkers = 2U;
1549   StartThreadGroup(TimeDelta::Max(),  // |suggested_reclaim_time|
1550                    kNumWorkers,       // |max_tasks|
1551                    std::nullopt);     // |max_best_effort_tasks|
1552   const scoped_refptr<TaskRunner> runner = test::CreatePooledTaskRunner(
1553       {MayBlock()}, &mock_pooled_task_runner_delegate_);
1554 
1555   for (size_t i = 0; i < kLargeNumber; ++i) {
1556     runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1557                        runner->PostTask(
1558                            FROM_HERE, BindLambdaForTesting([&]() {
1559                              EXPECT_LE(
1560                                  thread_group_->NumberOfWorkersForTesting(),
1561                                  kNumWorkers + 1);
1562                            }));
1563                        // Number of workers should not increase when there is
1564                        // enough capacity to accommodate queued and running
1565                        // sequences.
1566                        ScopedBlockingCall scoped_blocking_call(
1567                            FROM_HERE, BlockingType::WILL_BLOCK);
1568                        EXPECT_GE(kNumWorkers + 1,
1569                                  thread_group_->NumberOfWorkersForTesting());
1570                      }));
1571     // Wait for all tasks to complete.
1572     task_tracker_.FlushForTesting();
1573   }
1574 }
1575 
1576 namespace {
1577 
1578 class ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest
1579     : public ThreadGroupSemaphoreImplTestBase,
1580       public testing::TestWithParam<BlockingType> {
1581  public:
1582   static constexpr int kMaxBestEffortTasks = kMaxTasks / 2;
1583 
1584   ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest() = default;
1585   ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest(
1586       const ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest&) =
1587       delete;
1588   ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest& operator=(
1589       const ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest&) =
1590       delete;
1591 
SetUp()1592   void SetUp() override {
1593     CreateThreadGroup();
1594     thread_group_->Start(kMaxTasks, kMaxBestEffortTasks, base::TimeDelta::Max(),
1595                          service_thread_.task_runner(), nullptr,
1596                          ThreadGroup::WorkerEnvironment::NONE,
1597                          /*synchronous_thread_start_for_testing=*/false,
1598                          /*may_block_threshold=*/{});
1599   }
1600 
TearDown()1601   void TearDown() override {
1602     ThreadGroupSemaphoreImplTestBase::CommonTearDown();
1603   }
1604 
1605  private:
1606 };
1607 
1608 }  // namespace
1609 
TEST_P(ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest,BlockingCallAndMaxBestEffortTasksTest)1610 TEST_P(ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest,
1611        BlockingCallAndMaxBestEffortTasksTest) {
1612   const scoped_refptr<TaskRunner> background_runner =
1613       test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
1614                                    &mock_pooled_task_runner_delegate_);
1615 
1616   // Post |kMaxBestEffortTasks| TaskPriority::BEST_EFFORT tasks that block in a
1617   // ScopedBlockingCall.
1618   TestWaitableEvent blocking_best_effort_tasks_running;
1619   TestWaitableEvent unblock_blocking_best_effort_tasks;
1620   RepeatingClosure blocking_best_effort_tasks_running_barrier =
1621       BarrierClosure(kMaxBestEffortTasks,
1622                      BindOnce(&TestWaitableEvent::Signal,
1623                               Unretained(&blocking_best_effort_tasks_running)));
1624   for (int i = 0; i < kMaxBestEffortTasks; ++i) {
1625     background_runner->PostTask(
1626         FROM_HERE, base::BindLambdaForTesting([&]() {
1627           blocking_best_effort_tasks_running_barrier.Run();
1628           ScopedBlockingCall scoped_blocking_call(FROM_HERE, GetParam());
1629           unblock_blocking_best_effort_tasks.Wait();
1630         }));
1631   }
1632   blocking_best_effort_tasks_running.Wait();
1633 
1634   // Post an extra |kMaxBestEffortTasks| TaskPriority::BEST_EFFORT tasks. They
1635   // should be able to run, because the existing TaskPriority::BEST_EFFORT tasks
1636   // are blocked within a ScopedBlockingCall.
1637   //
1638   // Note: We block the tasks until they have all started running to make sure
1639   // that it is possible to run an extra |kMaxBestEffortTasks| concurrently.
1640   TestWaitableEvent best_effort_tasks_running;
1641   TestWaitableEvent unblock_best_effort_tasks;
1642   RepeatingClosure best_effort_tasks_running_barrier = BarrierClosure(
1643       kMaxBestEffortTasks, BindOnce(&TestWaitableEvent::Signal,
1644                                     Unretained(&best_effort_tasks_running)));
1645   for (int i = 0; i < kMaxBestEffortTasks; ++i) {
1646     background_runner->PostTask(FROM_HERE, base::BindLambdaForTesting([&]() {
1647                                   best_effort_tasks_running_barrier.Run();
1648                                   unblock_best_effort_tasks.Wait();
1649                                 }));
1650   }
1651   best_effort_tasks_running.Wait();
1652 
1653   // Unblock all tasks and tear down.
1654   unblock_blocking_best_effort_tasks.Signal();
1655   unblock_best_effort_tasks.Signal();
1656   task_tracker_.FlushForTesting();
1657 }
1658 
1659 INSTANTIATE_TEST_SUITE_P(
1660     MayBlock,
1661     ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest,
1662     ::testing::Values(BlockingType::MAY_BLOCK));
1663 INSTANTIATE_TEST_SUITE_P(
1664     WillBlock,
1665     ThreadGroupSemaphoreBlockingCallAndMaxBestEffortTasksTest,
1666     ::testing::Values(BlockingType::WILL_BLOCK));
1667 
1668 // Verify that worker detachment doesn't race with worker cleanup, regression
1669 // test for https://crbug.com/810464.
TEST_F(ThreadGroupSemaphoreImplStartInBodyTest,RacyCleanup)1670 TEST_F(ThreadGroupSemaphoreImplStartInBodyTest, RacyCleanup) {
1671   constexpr size_t kLocalMaxTasks = 256;
1672   constexpr TimeDelta kReclaimTimeForRacyCleanupTest = Milliseconds(10);
1673 
1674   thread_group_->Start(kLocalMaxTasks, kLocalMaxTasks,
1675                        kReclaimTimeForRacyCleanupTest,
1676                        service_thread_.task_runner(), nullptr,
1677                        ThreadGroup::WorkerEnvironment::NONE,
1678                        /*synchronous_thread_start_for_testing=*/false,
1679                        /*may_block_threshold=*/{});
1680 
1681   scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
1682       {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);
1683 
1684   TestWaitableEvent threads_running;
1685   TestWaitableEvent unblock_threads;
1686   RepeatingClosure threads_running_barrier = BarrierClosure(
1687       kLocalMaxTasks,
1688       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
1689 
1690   for (size_t i = 0; i < kLocalMaxTasks; ++i) {
1691     task_runner->PostTask(
1692         FROM_HERE,
1693         BindOnce(
1694             [](OnceClosure on_running, TestWaitableEvent* unblock_threads) {
1695               std::move(on_running).Run();
1696               unblock_threads->Wait();
1697             },
1698             threads_running_barrier, Unretained(&unblock_threads)));
1699   }
1700 
1701   // Wait for all workers to be ready and release them all at once.
1702   threads_running.Wait();
1703   unblock_threads.Signal();
1704 
1705   // Sleep to wakeup precisely when all workers are going to try to cleanup per
1706   // being idle.
1707   PlatformThread::Sleep(kReclaimTimeForRacyCleanupTest);
1708 
1709   thread_group_->JoinForTesting();
1710 
1711   // Unwinding this test will be racy if worker cleanup can race with
1712   // ThreadGroupSemaphore destruction : https://crbug.com/810464.
1713   mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
1714   thread_group_.reset();
1715 }
1716 
1717 }  // namespace internal
1718 }  // namespace base
1719