xref: /aosp_15_r20/external/cronet/base/task/thread_pool/thread_pool_impl.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/thread_pool_impl.h"
6 
7 #include <algorithm>
8 #include <optional>
9 #include <string>
10 #include <string_view>
11 #include <utility>
12 
13 #include "base/base_switches.h"
14 #include "base/command_line.h"
15 #include "base/compiler_specific.h"
16 #include "base/debug/leak_annotations.h"
17 #include "base/feature_list.h"
18 #include "base/functional/bind.h"
19 #include "base/functional/callback_helpers.h"
20 #include "base/message_loop/message_pump.h"
21 #include "base/message_loop/message_pump_type.h"
22 #include "base/metrics/field_trial_params.h"
23 #include "base/strings/string_util.h"
24 #include "base/system/sys_info.h"
25 #include "base/task/scoped_set_task_priority_for_current_thread.h"
26 #include "base/task/thread_pool/pooled_parallel_task_runner.h"
27 #include "base/task/thread_pool/pooled_sequenced_task_runner.h"
28 #include "base/task/thread_pool/task.h"
29 #include "base/task/thread_pool/task_source.h"
30 #include "base/task/thread_pool/task_source_sort_key.h"
31 #include "base/task/thread_pool/thread_group_impl.h"
32 #include "base/task/thread_pool/thread_group_semaphore.h"
33 #include "base/task/thread_pool/worker_thread.h"
34 #include "base/thread_annotations.h"
35 #include "base/threading/platform_thread.h"
36 #include "base/time/time.h"
37 #include "build/build_config.h"
38 
39 namespace base {
40 namespace internal {
41 
42 namespace {
43 
44 constexpr EnvironmentParams kForegroundPoolEnvironmentParams{
45     "Foreground", base::ThreadType::kDefault};
46 
47 constexpr EnvironmentParams kUtilityPoolEnvironmentParams{
48     "Utility", base::ThreadType::kUtility};
49 
50 constexpr EnvironmentParams kBackgroundPoolEnvironmentParams{
51     "Background", base::ThreadType::kBackground};
52 
53 constexpr size_t kMaxBestEffortTasks = 2;
54 
55 // Indicates whether BEST_EFFORT tasks are disabled by a command line switch.
HasDisableBestEffortTasksSwitch()56 bool HasDisableBestEffortTasksSwitch() {
57   // The CommandLine might not be initialized if ThreadPool is initialized in a
58   // dynamic library which doesn't have access to argc/argv.
59   return CommandLine::InitializedForCurrentProcess() &&
60          CommandLine::ForCurrentProcess()->HasSwitch(
61              switches::kDisableBestEffortTasks);
62 }
63 
// Test-only toggle read by ThreadPoolImpl::Start(): when true, Start() waits
// for the service thread to be running and forwards the flag to every thread
// group it starts. Test fixtures may only flip it while no ThreadPoolInstance
// is active. Kept as a file-level global rather than a member so that no
// public knob has to be added to ThreadPoolInstance::InitParams for this
// internal edge case.
bool g_synchronous_thread_start_for_testing = false;
69 
70 }  // namespace
71 
ThreadPoolImpl(std::string_view histogram_label)72 ThreadPoolImpl::ThreadPoolImpl(std::string_view histogram_label)
73     : ThreadPoolImpl(histogram_label, std::make_unique<TaskTrackerImpl>()) {}
74 
ThreadPoolImpl(std::string_view histogram_label,std::unique_ptr<TaskTrackerImpl> task_tracker,bool use_background_threads)75 ThreadPoolImpl::ThreadPoolImpl(std::string_view histogram_label,
76                                std::unique_ptr<TaskTrackerImpl> task_tracker,
77                                bool use_background_threads)
78     : histogram_label_(histogram_label),
79       task_tracker_(std::move(task_tracker)),
80       use_background_threads_(use_background_threads),
81       single_thread_task_runner_manager_(task_tracker_->GetTrackedRef(),
82                                          &delayed_task_manager_),
83       has_disable_best_effort_switch_(HasDisableBestEffortTasksSwitch()),
84       tracked_ref_factory_(this) {
85   foreground_thread_group_ = std::make_unique<ThreadGroupImpl>(
86       histogram_label.empty()
87           ? std::string()
88           : JoinString(
89                 {histogram_label, kForegroundPoolEnvironmentParams.name_suffix},
90                 "."),
91       kForegroundPoolEnvironmentParams.name_suffix,
92       kForegroundPoolEnvironmentParams.thread_type_hint,
93       task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
94 
95   if (CanUseBackgroundThreadTypeForWorkerThread()) {
96     background_thread_group_ = std::make_unique<ThreadGroupImpl>(
97         histogram_label.empty()
98             ? std::string()
99             : JoinString({histogram_label,
100                           kBackgroundPoolEnvironmentParams.name_suffix},
101                          "."),
102         kBackgroundPoolEnvironmentParams.name_suffix,
103         use_background_threads
104             ? kBackgroundPoolEnvironmentParams.thread_type_hint
105             : kForegroundPoolEnvironmentParams.thread_type_hint,
106         task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
107   }
108 }
109 
~ThreadPoolImpl()110 ThreadPoolImpl::~ThreadPoolImpl() {
111 #if DCHECK_IS_ON()
112   DCHECK(join_for_testing_returned_.IsSet());
113 #endif
114 
115   // Reset thread groups to release held TrackedRefs, which block teardown.
116   foreground_thread_group_.reset();
117   utility_thread_group_.reset();
118   background_thread_group_.reset();
119 }
120 
Start(const ThreadPoolInstance::InitParams & init_params,WorkerThreadObserver * worker_thread_observer)121 void ThreadPoolImpl::Start(const ThreadPoolInstance::InitParams& init_params,
122                            WorkerThreadObserver* worker_thread_observer) {
123   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
124   DCHECK(!started_);
125 
126   // The max number of concurrent BEST_EFFORT tasks is |kMaxBestEffortTasks|,
127   // unless the max number of foreground threads is lower.
128   size_t max_best_effort_tasks =
129       std::min(kMaxBestEffortTasks, init_params.max_num_foreground_threads);
130 
131   // Start the service thread. On platforms that support it (POSIX except NaCL
132   // SFI), the service thread runs a MessageLoopForIO which is used to support
133   // FileDescriptorWatcher in the scope in which tasks run.
134   ServiceThread::Options service_thread_options;
135   service_thread_options.message_pump_type =
136 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
137       MessagePumpType::IO;
138 #else
139       MessagePumpType::DEFAULT;
140 #endif
141   CHECK(service_thread_.StartWithOptions(std::move(service_thread_options)));
142   if (g_synchronous_thread_start_for_testing)
143     service_thread_.WaitUntilThreadStarted();
144 
145   if (FeatureList::IsEnabled(kThreadGroupSemaphore)) {
146     auto old_foreground_group = std::move(foreground_thread_group_);
147 
148     foreground_thread_group_ = std::make_unique<ThreadGroupSemaphore>(
149         histogram_label_.empty()
150             ? std::string()
151             : JoinString({histogram_label_,
152                           kForegroundPoolEnvironmentParams.name_suffix},
153                          "."),
154         kForegroundPoolEnvironmentParams.name_suffix,
155         kForegroundPoolEnvironmentParams.thread_type_hint,
156         task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
157 
158     old_foreground_group->HandoffAllTaskSourcesToOtherThreadGroup(
159         foreground_thread_group_.get());
160 
161     if (background_thread_group_) {
162       auto old_background_group = std::move(background_thread_group_);
163 
164       background_thread_group_ = std::make_unique<ThreadGroupSemaphore>(
165           histogram_label_.empty()
166               ? std::string()
167               : JoinString({histogram_label_,
168                             kBackgroundPoolEnvironmentParams.name_suffix},
169                            "."),
170           kBackgroundPoolEnvironmentParams.name_suffix,
171           use_background_threads_
172               ? kBackgroundPoolEnvironmentParams.thread_type_hint
173               : kForegroundPoolEnvironmentParams.thread_type_hint,
174           task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
175 
176       old_background_group->HandoffAllTaskSourcesToOtherThreadGroup(
177           background_thread_group_.get());
178     }
179 
180     if (FeatureList::IsEnabled(kUseUtilityThreadGroup) &&
181         CanUseUtilityThreadTypeForWorkerThread()) {
182       utility_thread_group_ = std::make_unique<ThreadGroupSemaphore>(
183           histogram_label_.empty()
184               ? std::string()
185               : JoinString({histogram_label_,
186                             kUtilityPoolEnvironmentParams.name_suffix},
187                            "."),
188           kUtilityPoolEnvironmentParams.name_suffix,
189           kUtilityPoolEnvironmentParams.thread_type_hint,
190           task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
191       foreground_thread_group_
192           ->HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(
193               utility_thread_group_.get());
194     }
195   } else {
196     if (FeatureList::IsEnabled(kUseUtilityThreadGroup) &&
197         CanUseUtilityThreadTypeForWorkerThread()) {
198       utility_thread_group_ = std::make_unique<ThreadGroupImpl>(
199           histogram_label_.empty()
200               ? std::string()
201               : JoinString({histogram_label_,
202                             kUtilityPoolEnvironmentParams.name_suffix},
203                            "."),
204           kUtilityPoolEnvironmentParams.name_suffix,
205           kUtilityPoolEnvironmentParams.thread_type_hint,
206           task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
207       foreground_thread_group_
208           ->HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(
209               utility_thread_group_.get());
210     }
211   }
212 
213   // Update the CanRunPolicy based on |has_disable_best_effort_switch_|.
214   UpdateCanRunPolicy();
215 
216   // Needs to happen after starting the service thread to get its task_runner().
217   auto service_thread_task_runner = service_thread_.task_runner();
218   delayed_task_manager_.Start(service_thread_task_runner);
219 
220   single_thread_task_runner_manager_.Start(service_thread_task_runner,
221                                            worker_thread_observer);
222 
223   ThreadGroup::WorkerEnvironment worker_environment;
224   switch (init_params.common_thread_pool_environment) {
225     case InitParams::CommonThreadPoolEnvironment::DEFAULT:
226       worker_environment = ThreadGroup::WorkerEnvironment::NONE;
227       break;
228 #if BUILDFLAG(IS_WIN)
229     case InitParams::CommonThreadPoolEnvironment::COM_MTA:
230       worker_environment = ThreadGroup::WorkerEnvironment::COM_MTA;
231       break;
232 #endif
233   }
234 
235   size_t foreground_threads = init_params.max_num_foreground_threads;
236   size_t utility_threads = init_params.max_num_utility_threads;
237 
238   // On platforms that can't use the background thread priority, best-effort
239   // tasks run in foreground pools. A cap is set on the number of best-effort
240   // tasks that can run in foreground pools to ensure that there is always
241   // room for incoming foreground tasks and to minimize the performance impact
242   // of best-effort tasks.
243   foreground_thread_group_.get()->Start(
244       foreground_threads, max_best_effort_tasks,
245       init_params.suggested_reclaim_time, service_thread_task_runner,
246       worker_thread_observer, worker_environment,
247       g_synchronous_thread_start_for_testing,
248       /*may_block_threshold=*/{});
249 
250   if (utility_thread_group_) {
251     utility_thread_group_.get()->Start(
252         utility_threads, max_best_effort_tasks,
253         init_params.suggested_reclaim_time, service_thread_task_runner,
254         worker_thread_observer, worker_environment,
255         g_synchronous_thread_start_for_testing,
256         /*may_block_threshold=*/{});
257   }
258 
259   if (background_thread_group_) {
260     background_thread_group_.get()->Start(
261         max_best_effort_tasks, max_best_effort_tasks,
262         init_params.suggested_reclaim_time, service_thread_task_runner,
263         worker_thread_observer, worker_environment,
264         g_synchronous_thread_start_for_testing,
265         /*may_block_threshold=*/{});
266   }
267 
268   started_ = true;
269 }
270 
WasStarted() const271 bool ThreadPoolImpl::WasStarted() const {
272   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
273   return started_;
274 }
275 
WasStartedUnsafe() const276 bool ThreadPoolImpl::WasStartedUnsafe() const {
277   return TS_UNCHECKED_READ(started_);
278 }
279 
PostDelayedTask(const Location & from_here,const TaskTraits & traits,OnceClosure task,TimeDelta delay)280 bool ThreadPoolImpl::PostDelayedTask(const Location& from_here,
281                                      const TaskTraits& traits,
282                                      OnceClosure task,
283                                      TimeDelta delay) {
284   // Post |task| as part of a one-off single-task Sequence.
285   return PostTaskWithSequence(
286       Task(from_here, std::move(task), TimeTicks::Now(), delay,
287            MessagePump::GetLeewayIgnoringThreadOverride()),
288       MakeRefCounted<Sequence>(traits, nullptr,
289                                TaskSourceExecutionMode::kParallel));
290 }
291 
CreateTaskRunner(const TaskTraits & traits)292 scoped_refptr<TaskRunner> ThreadPoolImpl::CreateTaskRunner(
293     const TaskTraits& traits) {
294   return MakeRefCounted<PooledParallelTaskRunner>(traits, this);
295 }
296 
CreateSequencedTaskRunner(const TaskTraits & traits)297 scoped_refptr<SequencedTaskRunner> ThreadPoolImpl::CreateSequencedTaskRunner(
298     const TaskTraits& traits) {
299   return MakeRefCounted<PooledSequencedTaskRunner>(traits, this);
300 }
301 
302 scoped_refptr<SingleThreadTaskRunner>
CreateSingleThreadTaskRunner(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)303 ThreadPoolImpl::CreateSingleThreadTaskRunner(
304     const TaskTraits& traits,
305     SingleThreadTaskRunnerThreadMode thread_mode) {
306   return single_thread_task_runner_manager_.CreateSingleThreadTaskRunner(
307       traits, thread_mode);
308 }
309 
310 #if BUILDFLAG(IS_WIN)
CreateCOMSTATaskRunner(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)311 scoped_refptr<SingleThreadTaskRunner> ThreadPoolImpl::CreateCOMSTATaskRunner(
312     const TaskTraits& traits,
313     SingleThreadTaskRunnerThreadMode thread_mode) {
314   return single_thread_task_runner_manager_.CreateCOMSTATaskRunner(traits,
315                                                                    thread_mode);
316 }
317 #endif  // BUILDFLAG(IS_WIN)
318 
319 scoped_refptr<UpdateableSequencedTaskRunner>
CreateUpdateableSequencedTaskRunner(const TaskTraits & traits)320 ThreadPoolImpl::CreateUpdateableSequencedTaskRunner(const TaskTraits& traits) {
321   return MakeRefCounted<PooledSequencedTaskRunner>(traits, this);
322 }
323 
NextScheduledRunTimeForTesting() const324 std::optional<TimeTicks> ThreadPoolImpl::NextScheduledRunTimeForTesting()
325     const {
326   if (task_tracker_->HasIncompleteTaskSourcesForTesting())
327     return TimeTicks::Now();
328   return delayed_task_manager_.NextScheduledRunTime();
329 }
330 
ProcessRipeDelayedTasksForTesting()331 void ThreadPoolImpl::ProcessRipeDelayedTasksForTesting() {
332   delayed_task_manager_.ProcessRipeTasks();
333 }
334 
335 // static
SetSynchronousThreadStartForTesting(bool enabled)336 void ThreadPoolImpl::SetSynchronousThreadStartForTesting(bool enabled) {
337   DCHECK(!ThreadPoolInstance::Get());
338   g_synchronous_thread_start_for_testing = enabled;
339 }
340 
GetMaxConcurrentNonBlockedTasksWithTraitsDeprecated(const TaskTraits & traits) const341 size_t ThreadPoolImpl::GetMaxConcurrentNonBlockedTasksWithTraitsDeprecated(
342     const TaskTraits& traits) const {
343   // This method does not support getting the maximum number of BEST_EFFORT
344   // tasks that can run concurrently in a pool.
345   DCHECK_NE(traits.priority(), TaskPriority::BEST_EFFORT);
346   return GetThreadGroupForTraits(traits)
347       ->GetMaxConcurrentNonBlockedTasksDeprecated();
348 }
349 
Shutdown()350 void ThreadPoolImpl::Shutdown() {
351   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
352 
353   // Cancels an internal service thread task. This must be done before stopping
354   // the service thread.
355   delayed_task_manager_.Shutdown();
356 
357   // Stop() the ServiceThread before triggering shutdown. This ensures that no
358   // more delayed tasks or file descriptor watches will trigger during shutdown
359   // (preventing http://crbug.com/698140). None of these asynchronous tasks
360   // being guaranteed to happen anyways, stopping right away is valid behavior
361   // and avoids the more complex alternative of shutting down the service thread
362   // atomically during TaskTracker shutdown.
363   service_thread_.Stop();
364 
365   task_tracker_->StartShutdown();
366 
367   // Allow all tasks to run. Done after initiating shutdown to ensure that non-
368   // BLOCK_SHUTDOWN tasks don't get a chance to run and that BLOCK_SHUTDOWN
369   // tasks run with a normal thread priority.
370   UpdateCanRunPolicy();
371 
372   // Ensures that there are enough background worker to run BLOCK_SHUTDOWN
373   // tasks.
374   foreground_thread_group_->OnShutdownStarted();
375   if (utility_thread_group_)
376     utility_thread_group_->OnShutdownStarted();
377   if (background_thread_group_)
378     background_thread_group_->OnShutdownStarted();
379 
380   task_tracker_->CompleteShutdown();
381 }
382 
FlushForTesting()383 void ThreadPoolImpl::FlushForTesting() {
384   task_tracker_->FlushForTesting();
385 }
386 
FlushAsyncForTesting(OnceClosure flush_callback)387 void ThreadPoolImpl::FlushAsyncForTesting(OnceClosure flush_callback) {
388   task_tracker_->FlushAsyncForTesting(std::move(flush_callback));
389 }
390 
JoinForTesting()391 void ThreadPoolImpl::JoinForTesting() {
392 #if DCHECK_IS_ON()
393   DCHECK(!join_for_testing_returned_.IsSet());
394 #endif
395   // Cancels an internal service thread task. This must be done before stopping
396   // the service thread.
397   delayed_task_manager_.Shutdown();
398   // The service thread must be stopped before the workers are joined, otherwise
399   // tasks scheduled by the DelayedTaskManager might be posted between joining
400   // those workers and stopping the service thread which will cause a CHECK. See
401   // https://crbug.com/771701.
402   service_thread_.Stop();
403   single_thread_task_runner_manager_.JoinForTesting();
404   foreground_thread_group_->JoinForTesting();
405   if (utility_thread_group_)
406     utility_thread_group_->JoinForTesting();  // IN-TEST
407   if (background_thread_group_)
408     background_thread_group_->JoinForTesting();
409 #if DCHECK_IS_ON()
410   join_for_testing_returned_.Set();
411 #endif
412 }
413 
BeginFence()414 void ThreadPoolImpl::BeginFence() {
415   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
416   ++num_fences_;
417   UpdateCanRunPolicy();
418 }
419 
EndFence()420 void ThreadPoolImpl::EndFence() {
421   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
422   DCHECK_GT(num_fences_, 0);
423   --num_fences_;
424   UpdateCanRunPolicy();
425 }
426 
BeginBestEffortFence()427 void ThreadPoolImpl::BeginBestEffortFence() {
428   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
429   ++num_best_effort_fences_;
430   UpdateCanRunPolicy();
431 }
432 
EndBestEffortFence()433 void ThreadPoolImpl::EndBestEffortFence() {
434   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
435   DCHECK_GT(num_best_effort_fences_, 0);
436   --num_best_effort_fences_;
437   UpdateCanRunPolicy();
438 }
439 
BeginFizzlingBlockShutdownTasks()440 void ThreadPoolImpl::BeginFizzlingBlockShutdownTasks() {
441   task_tracker_->BeginFizzlingBlockShutdownTasks();
442 }
443 
EndFizzlingBlockShutdownTasks()444 void ThreadPoolImpl::EndFizzlingBlockShutdownTasks() {
445   task_tracker_->EndFizzlingBlockShutdownTasks();
446 }
447 
PostTaskWithSequenceNow(Task task,scoped_refptr<Sequence> sequence)448 bool ThreadPoolImpl::PostTaskWithSequenceNow(Task task,
449                                              scoped_refptr<Sequence> sequence) {
450   auto transaction = sequence->BeginTransaction();
451   const bool sequence_should_be_queued = transaction.WillPushImmediateTask();
452   RegisteredTaskSource task_source;
453   if (sequence_should_be_queued) {
454     task_source = task_tracker_->RegisterTaskSource(sequence);
455     // We shouldn't push |task| if we're not allowed to queue |task_source|.
456     if (!task_source)
457       return false;
458   }
459   if (!task_tracker_->WillPostTaskNow(task, transaction.traits().priority()))
460     return false;
461   transaction.PushImmediateTask(std::move(task));
462   if (task_source) {
463     const TaskTraits traits = transaction.traits();
464     GetThreadGroupForTraits(traits)->PushTaskSourceAndWakeUpWorkers(
465         {std::move(task_source), std::move(transaction)});
466   }
467   return true;
468 }
469 
PostTaskWithSequence(Task task,scoped_refptr<Sequence> sequence)470 bool ThreadPoolImpl::PostTaskWithSequence(Task task,
471                                           scoped_refptr<Sequence> sequence) {
472   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
473   // for details.
474   CHECK(task.task);
475   DCHECK(sequence);
476 
477   if (!task_tracker_->WillPostTask(&task, sequence->shutdown_behavior())) {
478     // `task`'s destructor may run sequence-affine code, so it must be leaked
479     // when `WillPostTask` returns false.
480     auto leak = std::make_unique<Task>(std::move(task));
481     ANNOTATE_LEAKING_OBJECT_PTR(leak.get());
482     leak.release();
483     return false;
484   }
485 
486   if (task.delayed_run_time.is_null()) {
487     return PostTaskWithSequenceNow(std::move(task), std::move(sequence));
488   } else {
489     // It's safe to take a ref on this pointer since the caller must have a ref
490     // to the TaskRunner in order to post.
491     scoped_refptr<TaskRunner> task_runner = sequence->task_runner();
492     delayed_task_manager_.AddDelayedTask(
493         std::move(task),
494         BindOnce(
495             [](scoped_refptr<Sequence> sequence,
496                ThreadPoolImpl* thread_pool_impl, scoped_refptr<TaskRunner>,
497                Task task) {
498               thread_pool_impl->PostTaskWithSequenceNow(std::move(task),
499                                                         std::move(sequence));
500             },
501             std::move(sequence), Unretained(this), std::move(task_runner)));
502   }
503 
504   return true;
505 }
506 
ShouldYield(const TaskSource * task_source)507 bool ThreadPoolImpl::ShouldYield(const TaskSource* task_source) {
508   const TaskPriority priority = task_source->priority_racy();
509   auto* const thread_group =
510       GetThreadGroupForTraits({priority, task_source->thread_policy()});
511   // A task whose priority changed and is now running in the wrong thread group
512   // should yield so it's rescheduled in the right one.
513   if (!thread_group->IsBoundToCurrentThread())
514     return true;
515   return GetThreadGroupForTraits({priority, task_source->thread_policy()})
516       ->ShouldYield(task_source->GetSortKey());
517 }
518 
EnqueueJobTaskSource(scoped_refptr<JobTaskSource> task_source)519 bool ThreadPoolImpl::EnqueueJobTaskSource(
520     scoped_refptr<JobTaskSource> task_source) {
521   auto registered_task_source =
522       task_tracker_->RegisterTaskSource(std::move(task_source));
523   if (!registered_task_source)
524     return false;
525   task_tracker_->WillEnqueueJob(
526       static_cast<JobTaskSource*>(registered_task_source.get()));
527   auto transaction = registered_task_source->BeginTransaction();
528   const TaskTraits traits = transaction.traits();
529   GetThreadGroupForTraits(traits)->PushTaskSourceAndWakeUpWorkers(
530       {std::move(registered_task_source), std::move(transaction)});
531   return true;
532 }
533 
RemoveJobTaskSource(scoped_refptr<JobTaskSource> task_source)534 void ThreadPoolImpl::RemoveJobTaskSource(
535     scoped_refptr<JobTaskSource> task_source) {
536   auto transaction = task_source->BeginTransaction();
537   ThreadGroup* const current_thread_group =
538       GetThreadGroupForTraits(transaction.traits());
539   current_thread_group->RemoveTaskSource(*task_source);
540 }
541 
UpdatePriority(scoped_refptr<TaskSource> task_source,TaskPriority priority)542 void ThreadPoolImpl::UpdatePriority(scoped_refptr<TaskSource> task_source,
543                                     TaskPriority priority) {
544   auto transaction = task_source->BeginTransaction();
545 
546   if (transaction.traits().priority() == priority)
547     return;
548 
549   if (transaction.traits().priority() == TaskPriority::BEST_EFFORT) {
550     DCHECK(transaction.traits().thread_policy_set_explicitly())
551         << "A ThreadPolicy must be specified in the TaskTraits of an "
552            "UpdateableSequencedTaskRunner whose priority is increased from "
553            "BEST_EFFORT. See ThreadPolicy documentation.";
554   }
555 
556   ThreadGroup* const current_thread_group =
557       GetThreadGroupForTraits(transaction.traits());
558   transaction.UpdatePriority(priority);
559   ThreadGroup* const new_thread_group =
560       GetThreadGroupForTraits(transaction.traits());
561 
562   if (new_thread_group == current_thread_group) {
563     // |task_source|'s position needs to be updated within its current thread
564     // group.
565     current_thread_group->UpdateSortKey(std::move(transaction));
566   } else {
567     // |task_source| is changing thread groups; remove it from its current
568     // thread group and reenqueue it.
569     auto registered_task_source =
570         current_thread_group->RemoveTaskSource(*task_source);
571     if (registered_task_source) {
572       DCHECK(task_source);
573       new_thread_group->PushTaskSourceAndWakeUpWorkers(
574           {std::move(registered_task_source), std::move(transaction)});
575     }
576   }
577 }
578 
UpdateJobPriority(scoped_refptr<TaskSource> task_source,TaskPriority priority)579 void ThreadPoolImpl::UpdateJobPriority(scoped_refptr<TaskSource> task_source,
580                                        TaskPriority priority) {
581   UpdatePriority(std::move(task_source), priority);
582 }
583 
GetThreadGroupForTraits(const TaskTraits & traits) const584 const ThreadGroup* ThreadPoolImpl::GetThreadGroupForTraits(
585     const TaskTraits& traits) const {
586   return const_cast<ThreadPoolImpl*>(this)->GetThreadGroupForTraits(traits);
587 }
588 
GetThreadGroupForTraits(const TaskTraits & traits)589 ThreadGroup* ThreadPoolImpl::GetThreadGroupForTraits(const TaskTraits& traits) {
590   if (traits.priority() == TaskPriority::BEST_EFFORT &&
591       traits.thread_policy() == ThreadPolicy::PREFER_BACKGROUND &&
592       background_thread_group_) {
593     return background_thread_group_.get();
594   }
595 
596   if (traits.priority() <= TaskPriority::USER_VISIBLE &&
597       traits.thread_policy() == ThreadPolicy::PREFER_BACKGROUND &&
598       utility_thread_group_) {
599     return utility_thread_group_.get();
600   }
601 
602   return foreground_thread_group_.get();
603 }
604 
UpdateCanRunPolicy()605 void ThreadPoolImpl::UpdateCanRunPolicy() {
606   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
607 
608   CanRunPolicy can_run_policy;
609   if ((num_fences_ == 0 && num_best_effort_fences_ == 0 &&
610        !has_disable_best_effort_switch_) ||
611       task_tracker_->HasShutdownStarted()) {
612     can_run_policy = CanRunPolicy::kAll;
613   } else if (num_fences_ != 0) {
614     can_run_policy = CanRunPolicy::kNone;
615   } else {
616     DCHECK(num_best_effort_fences_ > 0 || has_disable_best_effort_switch_);
617     can_run_policy = CanRunPolicy::kForegroundOnly;
618   }
619 
620   task_tracker_->SetCanRunPolicy(can_run_policy);
621   foreground_thread_group_->DidUpdateCanRunPolicy();
622   if (utility_thread_group_)
623     utility_thread_group_->DidUpdateCanRunPolicy();
624   if (background_thread_group_)
625     background_thread_group_->DidUpdateCanRunPolicy();
626   single_thread_task_runner_manager_.DidUpdateCanRunPolicy();
627 }
628 
629 }  // namespace internal
630 }  // namespace base
631