xref: /aosp_15_r20/external/cronet/base/task/thread_pool/test_utils.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/test_utils.h"
6 
7 #include <utility>
8 
9 #include "base/check.h"
10 #include "base/debug/leak_annotations.h"
11 #include "base/functional/bind.h"
12 #include "base/functional/overloaded.h"
13 #include "base/memory/raw_ptr.h"
14 #include "base/synchronization/condition_variable.h"
15 #include "base/task/thread_pool/pooled_parallel_task_runner.h"
16 #include "base/task/thread_pool/pooled_sequenced_task_runner.h"
17 #include "base/test/bind.h"
18 #include "base/threading/scoped_blocking_call_internal.h"
19 #include "base/threading/thread_restrictions.h"
20 #include "testing/gtest/include/gtest/gtest.h"
21 #include "third_party/abseil-cpp/absl/types/variant.h"
22 
23 namespace base {
24 namespace internal {
25 namespace test {
26 
27 namespace {
28 
29 // A task runner that posts each task as a MockJobTaskSource that runs a single
30 // task. This is used to run ThreadGroupTests which require a TaskRunner with
31 // kJob execution mode. Delayed tasks are not supported.
32 class MockJobTaskRunner : public TaskRunner {
33  public:
MockJobTaskRunner(const TaskTraits & traits,PooledTaskRunnerDelegate * pooled_task_runner_delegate)34   MockJobTaskRunner(const TaskTraits& traits,
35                     PooledTaskRunnerDelegate* pooled_task_runner_delegate)
36       : traits_(traits),
37         pooled_task_runner_delegate_(pooled_task_runner_delegate) {}
38 
39   MockJobTaskRunner(const MockJobTaskRunner&) = delete;
40   MockJobTaskRunner& operator=(const MockJobTaskRunner&) = delete;
41 
42   // TaskRunner:
43   bool PostDelayedTask(const Location& from_here,
44                        OnceClosure closure,
45                        TimeDelta delay) override;
46 
47  private:
48   ~MockJobTaskRunner() override = default;
49 
50   const TaskTraits traits_;
51   const raw_ptr<PooledTaskRunnerDelegate> pooled_task_runner_delegate_;
52 };
53 
PostDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)54 bool MockJobTaskRunner::PostDelayedTask(const Location& from_here,
55                                         OnceClosure closure,
56                                         TimeDelta delay) {
57   DCHECK_EQ(delay, TimeDelta());  // Jobs doesn't support delayed tasks.
58 
59   if (!PooledTaskRunnerDelegate::MatchesCurrentDelegate(
60           pooled_task_runner_delegate_)) {
61     return false;
62   }
63 
64   auto job_task = base::MakeRefCounted<MockJobTask>(std::move(closure));
65   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
66       from_here, traits_, pooled_task_runner_delegate_);
67   return pooled_task_runner_delegate_->EnqueueJobTaskSource(
68       std::move(task_source));
69 }
70 
CreateJobTaskRunner(const TaskTraits & traits,MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate)71 scoped_refptr<TaskRunner> CreateJobTaskRunner(
72     const TaskTraits& traits,
73     MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate) {
74   return MakeRefCounted<MockJobTaskRunner>(traits,
75                                            mock_pooled_task_runner_delegate);
76 }
77 
78 }  // namespace
79 
MockWorkerThreadObserver()80 MockWorkerThreadObserver::MockWorkerThreadObserver()
81     : on_main_exit_cv_(lock_.CreateConditionVariable()) {}
82 
~MockWorkerThreadObserver()83 MockWorkerThreadObserver::~MockWorkerThreadObserver() {
84   WaitCallsOnMainExit();
85 }
86 
AllowCallsOnMainExit(int num_calls)87 void MockWorkerThreadObserver::AllowCallsOnMainExit(int num_calls) {
88   CheckedAutoLock auto_lock(lock_);
89   EXPECT_EQ(0, allowed_calls_on_main_exit_);
90   allowed_calls_on_main_exit_ = num_calls;
91 }
92 
WaitCallsOnMainExit()93 void MockWorkerThreadObserver::WaitCallsOnMainExit() {
94   CheckedAutoLock auto_lock(lock_);
95   while (allowed_calls_on_main_exit_ != 0)
96     on_main_exit_cv_.Wait();
97 }
98 
OnWorkerThreadMainExit()99 void MockWorkerThreadObserver::OnWorkerThreadMainExit() {
100   CheckedAutoLock auto_lock(lock_);
101   EXPECT_GE(allowed_calls_on_main_exit_, 0);
102   --allowed_calls_on_main_exit_;
103   if (allowed_calls_on_main_exit_ == 0)
104     on_main_exit_cv_.Signal();
105 }
106 
CreateSequenceWithTask(Task task,const TaskTraits & traits,scoped_refptr<SequencedTaskRunner> task_runner,TaskSourceExecutionMode execution_mode)107 scoped_refptr<Sequence> CreateSequenceWithTask(
108     Task task,
109     const TaskTraits& traits,
110     scoped_refptr<SequencedTaskRunner> task_runner,
111     TaskSourceExecutionMode execution_mode) {
112   scoped_refptr<Sequence> sequence =
113       MakeRefCounted<Sequence>(traits, task_runner.get(), execution_mode);
114   auto transaction = sequence->BeginTransaction();
115   transaction.WillPushImmediateTask();
116   transaction.PushImmediateTask(std::move(task));
117   return sequence;
118 }
119 
CreatePooledTaskRunnerWithExecutionMode(TaskSourceExecutionMode execution_mode,MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate,const TaskTraits & traits)120 scoped_refptr<TaskRunner> CreatePooledTaskRunnerWithExecutionMode(
121     TaskSourceExecutionMode execution_mode,
122     MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate,
123     const TaskTraits& traits) {
124   switch (execution_mode) {
125     case TaskSourceExecutionMode::kParallel:
126       return CreatePooledTaskRunner(traits, mock_pooled_task_runner_delegate);
127     case TaskSourceExecutionMode::kSequenced:
128       return CreatePooledSequencedTaskRunner(traits,
129                                              mock_pooled_task_runner_delegate);
130     case TaskSourceExecutionMode::kJob:
131       return CreateJobTaskRunner(traits, mock_pooled_task_runner_delegate);
132     default:
133       // Fall through.
134       break;
135   }
136   ADD_FAILURE() << "Unexpected ExecutionMode";
137   return nullptr;
138 }
139 
CreatePooledTaskRunner(const TaskTraits & traits,MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate)140 scoped_refptr<TaskRunner> CreatePooledTaskRunner(
141     const TaskTraits& traits,
142     MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate) {
143   return MakeRefCounted<PooledParallelTaskRunner>(
144       traits, mock_pooled_task_runner_delegate);
145 }
146 
CreatePooledSequencedTaskRunner(const TaskTraits & traits,MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate)147 scoped_refptr<SequencedTaskRunner> CreatePooledSequencedTaskRunner(
148     const TaskTraits& traits,
149     MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate) {
150   return MakeRefCounted<PooledSequencedTaskRunner>(
151       traits, mock_pooled_task_runner_delegate);
152 }
153 
MockPooledTaskRunnerDelegate(TrackedRef<TaskTracker> task_tracker,DelayedTaskManager * delayed_task_manager)154 MockPooledTaskRunnerDelegate::MockPooledTaskRunnerDelegate(
155     TrackedRef<TaskTracker> task_tracker,
156     DelayedTaskManager* delayed_task_manager)
157     : task_tracker_(task_tracker),
158       delayed_task_manager_(delayed_task_manager) {}
159 
160 MockPooledTaskRunnerDelegate::~MockPooledTaskRunnerDelegate() = default;
161 
PostTaskWithSequence(Task task,scoped_refptr<Sequence> sequence)162 bool MockPooledTaskRunnerDelegate::PostTaskWithSequence(
163     Task task,
164     scoped_refptr<Sequence> sequence) {
165   // |thread_group_| must be initialized with SetThreadGroup() before
166   // proceeding.
167   DCHECK(thread_group_);
168   DCHECK(task.task);
169   DCHECK(sequence);
170 
171   if (!task_tracker_->WillPostTask(&task, sequence->shutdown_behavior())) {
172     // `task`'s destructor may run sequence-affine code, so it must be leaked
173     // when `WillPostTask` returns false.
174     auto leak = std::make_unique<Task>(std::move(task));
175     ANNOTATE_LEAKING_OBJECT_PTR(leak.get());
176     leak.release();
177     return false;
178   }
179 
180   if (task.delayed_run_time.is_null()) {
181     PostTaskWithSequenceNow(std::move(task), std::move(sequence));
182   } else {
183     // It's safe to take a ref on this pointer since the caller must have a ref
184     // to the TaskRunner in order to post.
185     scoped_refptr<TaskRunner> task_runner = sequence->task_runner();
186     delayed_task_manager_->AddDelayedTask(
187         std::move(task),
188         BindOnce(
189             [](scoped_refptr<Sequence> sequence,
190                MockPooledTaskRunnerDelegate* self,
191                scoped_refptr<TaskRunner> task_runner, Task task) {
192               self->PostTaskWithSequenceNow(std::move(task),
193                                             std::move(sequence));
194             },
195             std::move(sequence), Unretained(this), std::move(task_runner)));
196   }
197 
198   return true;
199 }
200 
PostTaskWithSequenceNow(Task task,scoped_refptr<Sequence> sequence)201 void MockPooledTaskRunnerDelegate::PostTaskWithSequenceNow(
202     Task task,
203     scoped_refptr<Sequence> sequence) {
204   auto transaction = sequence->BeginTransaction();
205   const bool sequence_should_be_queued = transaction.WillPushImmediateTask();
206   RegisteredTaskSource task_source;
207   if (sequence_should_be_queued) {
208     task_source = task_tracker_->RegisterTaskSource(std::move(sequence));
209     // We shouldn't push |task| if we're not allowed to queue |task_source|.
210     if (!task_source)
211       return;
212   }
213   transaction.PushImmediateTask(std::move(task));
214   if (task_source) {
215     thread_group_->PushTaskSourceAndWakeUpWorkers(
216         {std::move(task_source), std::move(transaction)});
217   }
218 }
219 
ShouldYield(const TaskSource * task_source)220 bool MockPooledTaskRunnerDelegate::ShouldYield(const TaskSource* task_source) {
221   return thread_group_->ShouldYield(task_source->GetSortKey());
222 }
223 
EnqueueJobTaskSource(scoped_refptr<JobTaskSource> task_source)224 bool MockPooledTaskRunnerDelegate::EnqueueJobTaskSource(
225     scoped_refptr<JobTaskSource> task_source) {
226   // |thread_group_| must be initialized with SetThreadGroup() before
227   // proceeding.
228   DCHECK(thread_group_);
229   DCHECK(task_source);
230 
231   auto registered_task_source =
232       task_tracker_->RegisterTaskSource(std::move(task_source));
233   if (!registered_task_source)
234     return false;
235   auto transaction = registered_task_source->BeginTransaction();
236   thread_group_->PushTaskSourceAndWakeUpWorkers(
237       {std::move(registered_task_source), std::move(transaction)});
238   return true;
239 }
240 
RemoveJobTaskSource(scoped_refptr<JobTaskSource> task_source)241 void MockPooledTaskRunnerDelegate::RemoveJobTaskSource(
242     scoped_refptr<JobTaskSource> task_source) {
243   thread_group_->RemoveTaskSource(*task_source);
244 }
245 
UpdatePriority(scoped_refptr<TaskSource> task_source,TaskPriority priority)246 void MockPooledTaskRunnerDelegate::UpdatePriority(
247     scoped_refptr<TaskSource> task_source,
248     TaskPriority priority) {
249   auto transaction = task_source->BeginTransaction();
250   transaction.UpdatePriority(priority);
251   thread_group_->UpdateSortKey(std::move(transaction));
252 }
253 
UpdateJobPriority(scoped_refptr<TaskSource> task_source,TaskPriority priority)254 void MockPooledTaskRunnerDelegate::UpdateJobPriority(
255     scoped_refptr<TaskSource> task_source,
256     TaskPriority priority) {
257   UpdatePriority(std::move(task_source), priority);
258 }
259 
SetThreadGroup(ThreadGroup * thread_group)260 void MockPooledTaskRunnerDelegate::SetThreadGroup(ThreadGroup* thread_group) {
261   thread_group_ = thread_group;
262 }
263 
264 MockJobTask::~MockJobTask() = default;
265 
MockJobTask(base::RepeatingCallback<void (JobDelegate *)> worker_task,size_t num_tasks_to_run)266 MockJobTask::MockJobTask(
267     base::RepeatingCallback<void(JobDelegate*)> worker_task,
268     size_t num_tasks_to_run)
269     : task_(std::move(worker_task)),
270       remaining_num_tasks_to_run_(num_tasks_to_run) {
271   CHECK(!absl::get<decltype(worker_task)>(task_).is_null());
272 }
273 
MockJobTask(base::OnceClosure worker_task)274 MockJobTask::MockJobTask(base::OnceClosure worker_task)
275     : task_(std::move(worker_task)), remaining_num_tasks_to_run_(1) {
276   CHECK(!absl::get<decltype(worker_task)>(task_).is_null());
277 }
278 
SetNumTasksToRun(size_t num_tasks_to_run)279 void MockJobTask::SetNumTasksToRun(size_t num_tasks_to_run) {
280   if (num_tasks_to_run == 0) {
281     remaining_num_tasks_to_run_ = 0;
282     return;
283   }
284   if (auto* closure = absl::get_if<base::OnceClosure>(&task_); closure) {
285     // 0 is already handled above, so this can only be an attempt to set to
286     // a non-zero value for a OnceClosure. In that case, the only permissible
287     // value is 1, and the closure must not be null.
288     //
289     // Note that there is no need to check `!is_null()` for repeating callbacks,
290     // since `Run(JobDelegate*)` never consumes the repeating callback variant.
291     CHECK(!closure->is_null());
292     CHECK_EQ(1u, num_tasks_to_run);
293   }
294   remaining_num_tasks_to_run_ = num_tasks_to_run;
295 }
296 
GetMaxConcurrency(size_t) const297 size_t MockJobTask::GetMaxConcurrency(size_t /* worker_count */) const {
298   return remaining_num_tasks_to_run_.load();
299 }
300 
Run(JobDelegate * delegate)301 void MockJobTask::Run(JobDelegate* delegate) {
302   absl::visit(
303       base::Overloaded{
304           [](OnceClosure& closure) { std::move(closure).Run(); },
305           [delegate](const RepeatingCallback<void(JobDelegate*)>& callback) {
306             callback.Run(delegate);
307           }},
308       task_);
309   CHECK_GT(remaining_num_tasks_to_run_.fetch_sub(1), 0u);
310 }
311 
GetJobTaskSource(const Location & from_here,const TaskTraits & traits,PooledTaskRunnerDelegate * delegate)312 scoped_refptr<JobTaskSource> MockJobTask::GetJobTaskSource(
313     const Location& from_here,
314     const TaskTraits& traits,
315     PooledTaskRunnerDelegate* delegate) {
316   return MakeRefCounted<JobTaskSource>(
317       from_here, traits, base::BindRepeating(&test::MockJobTask::Run, this),
318       base::BindRepeating(&test::MockJobTask::GetMaxConcurrency, this),
319       delegate);
320 }
321 
QueueAndRunTaskSource(TaskTracker * task_tracker,scoped_refptr<TaskSource> task_source)322 RegisteredTaskSource QueueAndRunTaskSource(
323     TaskTracker* task_tracker,
324     scoped_refptr<TaskSource> task_source) {
325   auto registered_task_source =
326       task_tracker->RegisterTaskSource(std::move(task_source));
327   EXPECT_TRUE(registered_task_source);
328   EXPECT_NE(registered_task_source.WillRunTask(),
329             TaskSource::RunStatus::kDisallowed);
330   return task_tracker->RunAndPopNextTask(std::move(registered_task_source));
331 }
332 
ShutdownTaskTracker(TaskTracker * task_tracker)333 void ShutdownTaskTracker(TaskTracker* task_tracker) {
334   task_tracker->StartShutdown();
335   task_tracker->CompleteShutdown();
336 }
337 
338 }  // namespace test
339 }  // namespace internal
340 }  // namespace base
341