1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/thread_pool_impl.h"
6
7 #include <algorithm>
8 #include <optional>
9 #include <string>
10 #include <string_view>
11 #include <utility>
12
13 #include "base/base_switches.h"
14 #include "base/command_line.h"
15 #include "base/compiler_specific.h"
16 #include "base/debug/leak_annotations.h"
17 #include "base/feature_list.h"
18 #include "base/functional/bind.h"
19 #include "base/functional/callback_helpers.h"
20 #include "base/message_loop/message_pump.h"
21 #include "base/message_loop/message_pump_type.h"
22 #include "base/metrics/field_trial_params.h"
23 #include "base/strings/string_util.h"
24 #include "base/system/sys_info.h"
25 #include "base/task/scoped_set_task_priority_for_current_thread.h"
26 #include "base/task/thread_pool/pooled_parallel_task_runner.h"
27 #include "base/task/thread_pool/pooled_sequenced_task_runner.h"
28 #include "base/task/thread_pool/task.h"
29 #include "base/task/thread_pool/task_source.h"
30 #include "base/task/thread_pool/task_source_sort_key.h"
31 #include "base/task/thread_pool/thread_group_impl.h"
32 #include "base/task/thread_pool/thread_group_semaphore.h"
33 #include "base/task/thread_pool/worker_thread.h"
34 #include "base/thread_annotations.h"
35 #include "base/threading/platform_thread.h"
36 #include "base/time/time.h"
37 #include "build/build_config.h"
38
39 namespace base {
40 namespace internal {
41
42 namespace {
43
44 constexpr EnvironmentParams kForegroundPoolEnvironmentParams{
45 "Foreground", base::ThreadType::kDefault};
46
47 constexpr EnvironmentParams kUtilityPoolEnvironmentParams{
48 "Utility", base::ThreadType::kUtility};
49
50 constexpr EnvironmentParams kBackgroundPoolEnvironmentParams{
51 "Background", base::ThreadType::kBackground};
52
53 constexpr size_t kMaxBestEffortTasks = 2;
54
55 // Indicates whether BEST_EFFORT tasks are disabled by a command line switch.
HasDisableBestEffortTasksSwitch()56 bool HasDisableBestEffortTasksSwitch() {
57 // The CommandLine might not be initialized if ThreadPool is initialized in a
58 // dynamic library which doesn't have access to argc/argv.
59 return CommandLine::InitializedForCurrentProcess() &&
60 CommandLine::ForCurrentProcess()->HasSwitch(
61 switches::kDisableBestEffortTasks);
62 }
63
// Test-only toggle, settable from test fixtures while no ThreadPoolInstance is
// active. It is a global rather than a member so that this internal edge case
// does not require a public API on ThreadPoolInstance::InitParams.
bool g_synchronous_thread_start_for_testing{false};
69
70 } // namespace
71
ThreadPoolImpl(std::string_view histogram_label)72 ThreadPoolImpl::ThreadPoolImpl(std::string_view histogram_label)
73 : ThreadPoolImpl(histogram_label, std::make_unique<TaskTrackerImpl>()) {}
74
ThreadPoolImpl(std::string_view histogram_label,std::unique_ptr<TaskTrackerImpl> task_tracker,bool use_background_threads)75 ThreadPoolImpl::ThreadPoolImpl(std::string_view histogram_label,
76 std::unique_ptr<TaskTrackerImpl> task_tracker,
77 bool use_background_threads)
78 : histogram_label_(histogram_label),
79 task_tracker_(std::move(task_tracker)),
80 use_background_threads_(use_background_threads),
81 single_thread_task_runner_manager_(task_tracker_->GetTrackedRef(),
82 &delayed_task_manager_),
83 has_disable_best_effort_switch_(HasDisableBestEffortTasksSwitch()),
84 tracked_ref_factory_(this) {
85 foreground_thread_group_ = std::make_unique<ThreadGroupImpl>(
86 histogram_label.empty()
87 ? std::string()
88 : JoinString(
89 {histogram_label, kForegroundPoolEnvironmentParams.name_suffix},
90 "."),
91 kForegroundPoolEnvironmentParams.name_suffix,
92 kForegroundPoolEnvironmentParams.thread_type_hint,
93 task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
94
95 if (CanUseBackgroundThreadTypeForWorkerThread()) {
96 background_thread_group_ = std::make_unique<ThreadGroupImpl>(
97 histogram_label.empty()
98 ? std::string()
99 : JoinString({histogram_label,
100 kBackgroundPoolEnvironmentParams.name_suffix},
101 "."),
102 kBackgroundPoolEnvironmentParams.name_suffix,
103 use_background_threads
104 ? kBackgroundPoolEnvironmentParams.thread_type_hint
105 : kForegroundPoolEnvironmentParams.thread_type_hint,
106 task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
107 }
108 }
109
~ThreadPoolImpl()110 ThreadPoolImpl::~ThreadPoolImpl() {
111 #if DCHECK_IS_ON()
112 DCHECK(join_for_testing_returned_.IsSet());
113 #endif
114
115 // Reset thread groups to release held TrackedRefs, which block teardown.
116 foreground_thread_group_.reset();
117 utility_thread_group_.reset();
118 background_thread_group_.reset();
119 }
120
Start(const ThreadPoolInstance::InitParams & init_params,WorkerThreadObserver * worker_thread_observer)121 void ThreadPoolImpl::Start(const ThreadPoolInstance::InitParams& init_params,
122 WorkerThreadObserver* worker_thread_observer) {
123 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
124 DCHECK(!started_);
125
126 // The max number of concurrent BEST_EFFORT tasks is |kMaxBestEffortTasks|,
127 // unless the max number of foreground threads is lower.
128 size_t max_best_effort_tasks =
129 std::min(kMaxBestEffortTasks, init_params.max_num_foreground_threads);
130
131 // Start the service thread. On platforms that support it (POSIX except NaCL
132 // SFI), the service thread runs a MessageLoopForIO which is used to support
133 // FileDescriptorWatcher in the scope in which tasks run.
134 ServiceThread::Options service_thread_options;
135 service_thread_options.message_pump_type =
136 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
137 MessagePumpType::IO;
138 #else
139 MessagePumpType::DEFAULT;
140 #endif
141 CHECK(service_thread_.StartWithOptions(std::move(service_thread_options)));
142 if (g_synchronous_thread_start_for_testing)
143 service_thread_.WaitUntilThreadStarted();
144
145 if (FeatureList::IsEnabled(kThreadGroupSemaphore)) {
146 auto old_foreground_group = std::move(foreground_thread_group_);
147
148 foreground_thread_group_ = std::make_unique<ThreadGroupSemaphore>(
149 histogram_label_.empty()
150 ? std::string()
151 : JoinString({histogram_label_,
152 kForegroundPoolEnvironmentParams.name_suffix},
153 "."),
154 kForegroundPoolEnvironmentParams.name_suffix,
155 kForegroundPoolEnvironmentParams.thread_type_hint,
156 task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
157
158 old_foreground_group->HandoffAllTaskSourcesToOtherThreadGroup(
159 foreground_thread_group_.get());
160
161 if (background_thread_group_) {
162 auto old_background_group = std::move(background_thread_group_);
163
164 background_thread_group_ = std::make_unique<ThreadGroupSemaphore>(
165 histogram_label_.empty()
166 ? std::string()
167 : JoinString({histogram_label_,
168 kBackgroundPoolEnvironmentParams.name_suffix},
169 "."),
170 kBackgroundPoolEnvironmentParams.name_suffix,
171 use_background_threads_
172 ? kBackgroundPoolEnvironmentParams.thread_type_hint
173 : kForegroundPoolEnvironmentParams.thread_type_hint,
174 task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
175
176 old_background_group->HandoffAllTaskSourcesToOtherThreadGroup(
177 background_thread_group_.get());
178 }
179
180 if (FeatureList::IsEnabled(kUseUtilityThreadGroup) &&
181 CanUseUtilityThreadTypeForWorkerThread()) {
182 utility_thread_group_ = std::make_unique<ThreadGroupSemaphore>(
183 histogram_label_.empty()
184 ? std::string()
185 : JoinString({histogram_label_,
186 kUtilityPoolEnvironmentParams.name_suffix},
187 "."),
188 kUtilityPoolEnvironmentParams.name_suffix,
189 kUtilityPoolEnvironmentParams.thread_type_hint,
190 task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
191 foreground_thread_group_
192 ->HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(
193 utility_thread_group_.get());
194 }
195 } else {
196 if (FeatureList::IsEnabled(kUseUtilityThreadGroup) &&
197 CanUseUtilityThreadTypeForWorkerThread()) {
198 utility_thread_group_ = std::make_unique<ThreadGroupImpl>(
199 histogram_label_.empty()
200 ? std::string()
201 : JoinString({histogram_label_,
202 kUtilityPoolEnvironmentParams.name_suffix},
203 "."),
204 kUtilityPoolEnvironmentParams.name_suffix,
205 kUtilityPoolEnvironmentParams.thread_type_hint,
206 task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
207 foreground_thread_group_
208 ->HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(
209 utility_thread_group_.get());
210 }
211 }
212
213 // Update the CanRunPolicy based on |has_disable_best_effort_switch_|.
214 UpdateCanRunPolicy();
215
216 // Needs to happen after starting the service thread to get its task_runner().
217 auto service_thread_task_runner = service_thread_.task_runner();
218 delayed_task_manager_.Start(service_thread_task_runner);
219
220 single_thread_task_runner_manager_.Start(service_thread_task_runner,
221 worker_thread_observer);
222
223 ThreadGroup::WorkerEnvironment worker_environment;
224 switch (init_params.common_thread_pool_environment) {
225 case InitParams::CommonThreadPoolEnvironment::DEFAULT:
226 worker_environment = ThreadGroup::WorkerEnvironment::NONE;
227 break;
228 #if BUILDFLAG(IS_WIN)
229 case InitParams::CommonThreadPoolEnvironment::COM_MTA:
230 worker_environment = ThreadGroup::WorkerEnvironment::COM_MTA;
231 break;
232 #endif
233 }
234
235 size_t foreground_threads = init_params.max_num_foreground_threads;
236 size_t utility_threads = init_params.max_num_utility_threads;
237
238 // On platforms that can't use the background thread priority, best-effort
239 // tasks run in foreground pools. A cap is set on the number of best-effort
240 // tasks that can run in foreground pools to ensure that there is always
241 // room for incoming foreground tasks and to minimize the performance impact
242 // of best-effort tasks.
243 foreground_thread_group_.get()->Start(
244 foreground_threads, max_best_effort_tasks,
245 init_params.suggested_reclaim_time, service_thread_task_runner,
246 worker_thread_observer, worker_environment,
247 g_synchronous_thread_start_for_testing,
248 /*may_block_threshold=*/{});
249
250 if (utility_thread_group_) {
251 utility_thread_group_.get()->Start(
252 utility_threads, max_best_effort_tasks,
253 init_params.suggested_reclaim_time, service_thread_task_runner,
254 worker_thread_observer, worker_environment,
255 g_synchronous_thread_start_for_testing,
256 /*may_block_threshold=*/{});
257 }
258
259 if (background_thread_group_) {
260 background_thread_group_.get()->Start(
261 max_best_effort_tasks, max_best_effort_tasks,
262 init_params.suggested_reclaim_time, service_thread_task_runner,
263 worker_thread_observer, worker_environment,
264 g_synchronous_thread_start_for_testing,
265 /*may_block_threshold=*/{});
266 }
267
268 started_ = true;
269 }
270
WasStarted() const271 bool ThreadPoolImpl::WasStarted() const {
272 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
273 return started_;
274 }
275
WasStartedUnsafe() const276 bool ThreadPoolImpl::WasStartedUnsafe() const {
277 return TS_UNCHECKED_READ(started_);
278 }
279
PostDelayedTask(const Location & from_here,const TaskTraits & traits,OnceClosure task,TimeDelta delay)280 bool ThreadPoolImpl::PostDelayedTask(const Location& from_here,
281 const TaskTraits& traits,
282 OnceClosure task,
283 TimeDelta delay) {
284 // Post |task| as part of a one-off single-task Sequence.
285 return PostTaskWithSequence(
286 Task(from_here, std::move(task), TimeTicks::Now(), delay,
287 MessagePump::GetLeewayIgnoringThreadOverride()),
288 MakeRefCounted<Sequence>(traits, nullptr,
289 TaskSourceExecutionMode::kParallel));
290 }
291
CreateTaskRunner(const TaskTraits & traits)292 scoped_refptr<TaskRunner> ThreadPoolImpl::CreateTaskRunner(
293 const TaskTraits& traits) {
294 return MakeRefCounted<PooledParallelTaskRunner>(traits, this);
295 }
296
CreateSequencedTaskRunner(const TaskTraits & traits)297 scoped_refptr<SequencedTaskRunner> ThreadPoolImpl::CreateSequencedTaskRunner(
298 const TaskTraits& traits) {
299 return MakeRefCounted<PooledSequencedTaskRunner>(traits, this);
300 }
301
302 scoped_refptr<SingleThreadTaskRunner>
CreateSingleThreadTaskRunner(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)303 ThreadPoolImpl::CreateSingleThreadTaskRunner(
304 const TaskTraits& traits,
305 SingleThreadTaskRunnerThreadMode thread_mode) {
306 return single_thread_task_runner_manager_.CreateSingleThreadTaskRunner(
307 traits, thread_mode);
308 }
309
310 #if BUILDFLAG(IS_WIN)
CreateCOMSTATaskRunner(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)311 scoped_refptr<SingleThreadTaskRunner> ThreadPoolImpl::CreateCOMSTATaskRunner(
312 const TaskTraits& traits,
313 SingleThreadTaskRunnerThreadMode thread_mode) {
314 return single_thread_task_runner_manager_.CreateCOMSTATaskRunner(traits,
315 thread_mode);
316 }
317 #endif // BUILDFLAG(IS_WIN)
318
319 scoped_refptr<UpdateableSequencedTaskRunner>
CreateUpdateableSequencedTaskRunner(const TaskTraits & traits)320 ThreadPoolImpl::CreateUpdateableSequencedTaskRunner(const TaskTraits& traits) {
321 return MakeRefCounted<PooledSequencedTaskRunner>(traits, this);
322 }
323
NextScheduledRunTimeForTesting() const324 std::optional<TimeTicks> ThreadPoolImpl::NextScheduledRunTimeForTesting()
325 const {
326 if (task_tracker_->HasIncompleteTaskSourcesForTesting())
327 return TimeTicks::Now();
328 return delayed_task_manager_.NextScheduledRunTime();
329 }
330
ProcessRipeDelayedTasksForTesting()331 void ThreadPoolImpl::ProcessRipeDelayedTasksForTesting() {
332 delayed_task_manager_.ProcessRipeTasks();
333 }
334
335 // static
SetSynchronousThreadStartForTesting(bool enabled)336 void ThreadPoolImpl::SetSynchronousThreadStartForTesting(bool enabled) {
337 DCHECK(!ThreadPoolInstance::Get());
338 g_synchronous_thread_start_for_testing = enabled;
339 }
340
GetMaxConcurrentNonBlockedTasksWithTraitsDeprecated(const TaskTraits & traits) const341 size_t ThreadPoolImpl::GetMaxConcurrentNonBlockedTasksWithTraitsDeprecated(
342 const TaskTraits& traits) const {
343 // This method does not support getting the maximum number of BEST_EFFORT
344 // tasks that can run concurrently in a pool.
345 DCHECK_NE(traits.priority(), TaskPriority::BEST_EFFORT);
346 return GetThreadGroupForTraits(traits)
347 ->GetMaxConcurrentNonBlockedTasksDeprecated();
348 }
349
Shutdown()350 void ThreadPoolImpl::Shutdown() {
351 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
352
353 // Cancels an internal service thread task. This must be done before stopping
354 // the service thread.
355 delayed_task_manager_.Shutdown();
356
357 // Stop() the ServiceThread before triggering shutdown. This ensures that no
358 // more delayed tasks or file descriptor watches will trigger during shutdown
359 // (preventing http://crbug.com/698140). None of these asynchronous tasks
360 // being guaranteed to happen anyways, stopping right away is valid behavior
361 // and avoids the more complex alternative of shutting down the service thread
362 // atomically during TaskTracker shutdown.
363 service_thread_.Stop();
364
365 task_tracker_->StartShutdown();
366
367 // Allow all tasks to run. Done after initiating shutdown to ensure that non-
368 // BLOCK_SHUTDOWN tasks don't get a chance to run and that BLOCK_SHUTDOWN
369 // tasks run with a normal thread priority.
370 UpdateCanRunPolicy();
371
372 // Ensures that there are enough background worker to run BLOCK_SHUTDOWN
373 // tasks.
374 foreground_thread_group_->OnShutdownStarted();
375 if (utility_thread_group_)
376 utility_thread_group_->OnShutdownStarted();
377 if (background_thread_group_)
378 background_thread_group_->OnShutdownStarted();
379
380 task_tracker_->CompleteShutdown();
381 }
382
FlushForTesting()383 void ThreadPoolImpl::FlushForTesting() {
384 task_tracker_->FlushForTesting();
385 }
386
FlushAsyncForTesting(OnceClosure flush_callback)387 void ThreadPoolImpl::FlushAsyncForTesting(OnceClosure flush_callback) {
388 task_tracker_->FlushAsyncForTesting(std::move(flush_callback));
389 }
390
JoinForTesting()391 void ThreadPoolImpl::JoinForTesting() {
392 #if DCHECK_IS_ON()
393 DCHECK(!join_for_testing_returned_.IsSet());
394 #endif
395 // Cancels an internal service thread task. This must be done before stopping
396 // the service thread.
397 delayed_task_manager_.Shutdown();
398 // The service thread must be stopped before the workers are joined, otherwise
399 // tasks scheduled by the DelayedTaskManager might be posted between joining
400 // those workers and stopping the service thread which will cause a CHECK. See
401 // https://crbug.com/771701.
402 service_thread_.Stop();
403 single_thread_task_runner_manager_.JoinForTesting();
404 foreground_thread_group_->JoinForTesting();
405 if (utility_thread_group_)
406 utility_thread_group_->JoinForTesting(); // IN-TEST
407 if (background_thread_group_)
408 background_thread_group_->JoinForTesting();
409 #if DCHECK_IS_ON()
410 join_for_testing_returned_.Set();
411 #endif
412 }
413
BeginFence()414 void ThreadPoolImpl::BeginFence() {
415 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
416 ++num_fences_;
417 UpdateCanRunPolicy();
418 }
419
EndFence()420 void ThreadPoolImpl::EndFence() {
421 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
422 DCHECK_GT(num_fences_, 0);
423 --num_fences_;
424 UpdateCanRunPolicy();
425 }
426
BeginBestEffortFence()427 void ThreadPoolImpl::BeginBestEffortFence() {
428 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
429 ++num_best_effort_fences_;
430 UpdateCanRunPolicy();
431 }
432
EndBestEffortFence()433 void ThreadPoolImpl::EndBestEffortFence() {
434 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
435 DCHECK_GT(num_best_effort_fences_, 0);
436 --num_best_effort_fences_;
437 UpdateCanRunPolicy();
438 }
439
BeginFizzlingBlockShutdownTasks()440 void ThreadPoolImpl::BeginFizzlingBlockShutdownTasks() {
441 task_tracker_->BeginFizzlingBlockShutdownTasks();
442 }
443
EndFizzlingBlockShutdownTasks()444 void ThreadPoolImpl::EndFizzlingBlockShutdownTasks() {
445 task_tracker_->EndFizzlingBlockShutdownTasks();
446 }
447
PostTaskWithSequenceNow(Task task,scoped_refptr<Sequence> sequence)448 bool ThreadPoolImpl::PostTaskWithSequenceNow(Task task,
449 scoped_refptr<Sequence> sequence) {
450 auto transaction = sequence->BeginTransaction();
451 const bool sequence_should_be_queued = transaction.WillPushImmediateTask();
452 RegisteredTaskSource task_source;
453 if (sequence_should_be_queued) {
454 task_source = task_tracker_->RegisterTaskSource(sequence);
455 // We shouldn't push |task| if we're not allowed to queue |task_source|.
456 if (!task_source)
457 return false;
458 }
459 if (!task_tracker_->WillPostTaskNow(task, transaction.traits().priority()))
460 return false;
461 transaction.PushImmediateTask(std::move(task));
462 if (task_source) {
463 const TaskTraits traits = transaction.traits();
464 GetThreadGroupForTraits(traits)->PushTaskSourceAndWakeUpWorkers(
465 {std::move(task_source), std::move(transaction)});
466 }
467 return true;
468 }
469
PostTaskWithSequence(Task task,scoped_refptr<Sequence> sequence)470 bool ThreadPoolImpl::PostTaskWithSequence(Task task,
471 scoped_refptr<Sequence> sequence) {
472 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
473 // for details.
474 CHECK(task.task);
475 DCHECK(sequence);
476
477 if (!task_tracker_->WillPostTask(&task, sequence->shutdown_behavior())) {
478 // `task`'s destructor may run sequence-affine code, so it must be leaked
479 // when `WillPostTask` returns false.
480 auto leak = std::make_unique<Task>(std::move(task));
481 ANNOTATE_LEAKING_OBJECT_PTR(leak.get());
482 leak.release();
483 return false;
484 }
485
486 if (task.delayed_run_time.is_null()) {
487 return PostTaskWithSequenceNow(std::move(task), std::move(sequence));
488 } else {
489 // It's safe to take a ref on this pointer since the caller must have a ref
490 // to the TaskRunner in order to post.
491 scoped_refptr<TaskRunner> task_runner = sequence->task_runner();
492 delayed_task_manager_.AddDelayedTask(
493 std::move(task),
494 BindOnce(
495 [](scoped_refptr<Sequence> sequence,
496 ThreadPoolImpl* thread_pool_impl, scoped_refptr<TaskRunner>,
497 Task task) {
498 thread_pool_impl->PostTaskWithSequenceNow(std::move(task),
499 std::move(sequence));
500 },
501 std::move(sequence), Unretained(this), std::move(task_runner)));
502 }
503
504 return true;
505 }
506
ShouldYield(const TaskSource * task_source)507 bool ThreadPoolImpl::ShouldYield(const TaskSource* task_source) {
508 const TaskPriority priority = task_source->priority_racy();
509 auto* const thread_group =
510 GetThreadGroupForTraits({priority, task_source->thread_policy()});
511 // A task whose priority changed and is now running in the wrong thread group
512 // should yield so it's rescheduled in the right one.
513 if (!thread_group->IsBoundToCurrentThread())
514 return true;
515 return GetThreadGroupForTraits({priority, task_source->thread_policy()})
516 ->ShouldYield(task_source->GetSortKey());
517 }
518
EnqueueJobTaskSource(scoped_refptr<JobTaskSource> task_source)519 bool ThreadPoolImpl::EnqueueJobTaskSource(
520 scoped_refptr<JobTaskSource> task_source) {
521 auto registered_task_source =
522 task_tracker_->RegisterTaskSource(std::move(task_source));
523 if (!registered_task_source)
524 return false;
525 task_tracker_->WillEnqueueJob(
526 static_cast<JobTaskSource*>(registered_task_source.get()));
527 auto transaction = registered_task_source->BeginTransaction();
528 const TaskTraits traits = transaction.traits();
529 GetThreadGroupForTraits(traits)->PushTaskSourceAndWakeUpWorkers(
530 {std::move(registered_task_source), std::move(transaction)});
531 return true;
532 }
533
RemoveJobTaskSource(scoped_refptr<JobTaskSource> task_source)534 void ThreadPoolImpl::RemoveJobTaskSource(
535 scoped_refptr<JobTaskSource> task_source) {
536 auto transaction = task_source->BeginTransaction();
537 ThreadGroup* const current_thread_group =
538 GetThreadGroupForTraits(transaction.traits());
539 current_thread_group->RemoveTaskSource(*task_source);
540 }
541
UpdatePriority(scoped_refptr<TaskSource> task_source,TaskPriority priority)542 void ThreadPoolImpl::UpdatePriority(scoped_refptr<TaskSource> task_source,
543 TaskPriority priority) {
544 auto transaction = task_source->BeginTransaction();
545
546 if (transaction.traits().priority() == priority)
547 return;
548
549 if (transaction.traits().priority() == TaskPriority::BEST_EFFORT) {
550 DCHECK(transaction.traits().thread_policy_set_explicitly())
551 << "A ThreadPolicy must be specified in the TaskTraits of an "
552 "UpdateableSequencedTaskRunner whose priority is increased from "
553 "BEST_EFFORT. See ThreadPolicy documentation.";
554 }
555
556 ThreadGroup* const current_thread_group =
557 GetThreadGroupForTraits(transaction.traits());
558 transaction.UpdatePriority(priority);
559 ThreadGroup* const new_thread_group =
560 GetThreadGroupForTraits(transaction.traits());
561
562 if (new_thread_group == current_thread_group) {
563 // |task_source|'s position needs to be updated within its current thread
564 // group.
565 current_thread_group->UpdateSortKey(std::move(transaction));
566 } else {
567 // |task_source| is changing thread groups; remove it from its current
568 // thread group and reenqueue it.
569 auto registered_task_source =
570 current_thread_group->RemoveTaskSource(*task_source);
571 if (registered_task_source) {
572 DCHECK(task_source);
573 new_thread_group->PushTaskSourceAndWakeUpWorkers(
574 {std::move(registered_task_source), std::move(transaction)});
575 }
576 }
577 }
578
UpdateJobPriority(scoped_refptr<TaskSource> task_source,TaskPriority priority)579 void ThreadPoolImpl::UpdateJobPriority(scoped_refptr<TaskSource> task_source,
580 TaskPriority priority) {
581 UpdatePriority(std::move(task_source), priority);
582 }
583
GetThreadGroupForTraits(const TaskTraits & traits) const584 const ThreadGroup* ThreadPoolImpl::GetThreadGroupForTraits(
585 const TaskTraits& traits) const {
586 return const_cast<ThreadPoolImpl*>(this)->GetThreadGroupForTraits(traits);
587 }
588
GetThreadGroupForTraits(const TaskTraits & traits)589 ThreadGroup* ThreadPoolImpl::GetThreadGroupForTraits(const TaskTraits& traits) {
590 if (traits.priority() == TaskPriority::BEST_EFFORT &&
591 traits.thread_policy() == ThreadPolicy::PREFER_BACKGROUND &&
592 background_thread_group_) {
593 return background_thread_group_.get();
594 }
595
596 if (traits.priority() <= TaskPriority::USER_VISIBLE &&
597 traits.thread_policy() == ThreadPolicy::PREFER_BACKGROUND &&
598 utility_thread_group_) {
599 return utility_thread_group_.get();
600 }
601
602 return foreground_thread_group_.get();
603 }
604
UpdateCanRunPolicy()605 void ThreadPoolImpl::UpdateCanRunPolicy() {
606 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
607
608 CanRunPolicy can_run_policy;
609 if ((num_fences_ == 0 && num_best_effort_fences_ == 0 &&
610 !has_disable_best_effort_switch_) ||
611 task_tracker_->HasShutdownStarted()) {
612 can_run_policy = CanRunPolicy::kAll;
613 } else if (num_fences_ != 0) {
614 can_run_policy = CanRunPolicy::kNone;
615 } else {
616 DCHECK(num_best_effort_fences_ > 0 || has_disable_best_effort_switch_);
617 can_run_policy = CanRunPolicy::kForegroundOnly;
618 }
619
620 task_tracker_->SetCanRunPolicy(can_run_policy);
621 foreground_thread_group_->DidUpdateCanRunPolicy();
622 if (utility_thread_group_)
623 utility_thread_group_->DidUpdateCanRunPolicy();
624 if (background_thread_group_)
625 background_thread_group_->DidUpdateCanRunPolicy();
626 single_thread_task_runner_manager_.DidUpdateCanRunPolicy();
627 }
628
629 } // namespace internal
630 } // namespace base
631