// Copyright 2017 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/thread_group.h"

#include <string_view>
#include <utility>

#include "base/check.h"
#include "base/feature_list.h"
#include "base/functional/bind.h"
#include "base/functional/callback_helpers.h"
#include "base/task/task_features.h"
#include "base/task/thread_pool/task_tracker.h"
#include "base/task/thread_pool/thread_group_worker_delegate.h"
#include "build/build_config.h"
#include "third_party/abseil-cpp/absl/base/attributes.h"

#if BUILDFLAG(IS_WIN)
#include "base/win/com_init_check_hook.h"
#include "base/win/scoped_winrt_initializer.h"
#endif

namespace base {
namespace internal {

namespace {

constexpr size_t kMaxNumberOfWorkers = 256;

// In a background thread group:
// - Blocking calls take more time than in a foreground thread group.
// - We want to minimize impact on foreground work, not maximize execution
//   throughput.
// For these reasons, the timeout to increase the maximum number of concurrent
// tasks when there is a MAY_BLOCK ScopedBlockingCall is *long*. It is not
// infinite because execution throughput should not be reduced forever if a
// task blocks forever.
//
// TODO(fdoray): On platforms without background thread groups, blocking in a
// BEST_EFFORT task should:
// 1. Increment the maximum number of concurrent tasks after a *short* timeout,
//    to allow scheduling of USER_VISIBLE/USER_BLOCKING tasks.
// 2. Increment the maximum number of concurrent BEST_EFFORT tasks after a
//    *long* timeout, because we only want to allow more BEST_EFFORT tasks to
//    be scheduled concurrently when we believe that a BEST_EFFORT task is
//    blocked forever.
// Currently, only 1. is true as the configuration is per thread group.
// TODO(https://crbug.com/927755): Fix the race condition that arises when
// MayBlockThreshold == BlockedWorkersPoll.
constexpr TimeDelta kForegroundMayBlockThreshold = Milliseconds(1000);
constexpr TimeDelta kForegroundBlockedWorkersPoll = Milliseconds(1200);
constexpr TimeDelta kBackgroundMayBlockThreshold = Seconds(10);
constexpr TimeDelta kBackgroundBlockedWorkersPoll = Seconds(12);
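
// Illustrative timeline (foreground group): a MAY_BLOCK ScopedBlockingCall
// that stays unresolved for more than kForegroundMayBlockThreshold (1 s) lets
// the next AdjustMaxTasks() pass, which runs every
// kForegroundBlockedWorkersPoll (1.2 s), increment |max_tasks_| so an extra
// worker can be woken up to replace the blocked one.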

// ThreadGroup that owns the current thread, if any.
ABSL_CONST_INIT thread_local const ThreadGroup* current_thread_group = nullptr;

}  // namespace

constexpr ThreadGroup::YieldSortKey ThreadGroup::kMaxYieldSortKey;

void ThreadGroup::BaseScopedCommandsExecutor::ScheduleReleaseTaskSource(
    RegisteredTaskSource task_source) {
  task_sources_to_release_.push_back(std::move(task_source));
}

void ThreadGroup::BaseScopedCommandsExecutor::ScheduleAdjustMaxTasks() {
  DCHECK(!must_schedule_adjust_max_tasks_);
  must_schedule_adjust_max_tasks_ = true;
}

void ThreadGroup::BaseScopedCommandsExecutor::ScheduleStart(
    scoped_refptr<WorkerThread> worker) {
  workers_to_start_.emplace_back(std::move(worker));
}

ThreadGroup::BaseScopedCommandsExecutor::BaseScopedCommandsExecutor(
    ThreadGroup* outer)
    : outer_(outer) {}

ThreadGroup::BaseScopedCommandsExecutor::~BaseScopedCommandsExecutor() {
  CheckedLock::AssertNoLockHeldOnCurrentThread();
  Flush();
}

void ThreadGroup::BaseScopedCommandsExecutor::FlushWorkerCreation(
    CheckedLock* held_lock) {
  // This function crucially only wakes up workers, rather than also signaling
  // them, and therefore does not call FlushImpl(). FlushImpl() requires not
  // holding any locks on the calling thread, while a TaskSource Transaction
  // lock can be held while calling this function.
  CheckedAutoUnlock auto_unlock(*held_lock);
  if (workers_to_start_.empty()) {
    return;
  }

  Flush();
  workers_to_start_.clear();
  must_schedule_adjust_max_tasks_ = false;
}

void ThreadGroup::BaseScopedCommandsExecutor::Flush() {
  // Start workers. Happens after wake-ups (implemented by children and thus
  // called in their destructors, i.e. before this) to prevent the case where a
  // worker enters its main function, is descheduled because it wasn't woken up
  // yet, and is woken up immediately after.
  for (auto& worker : workers_to_start_) {
    worker->Start(outer_->after_start().service_thread_task_runner,
                  outer_->after_start().worker_thread_observer);
    if (outer_->worker_started_for_testing_) {
      outer_->worker_started_for_testing_->Wait();
    }
  }
  workers_to_start_.clear();

  if (must_schedule_adjust_max_tasks_) {
    outer_->ScheduleAdjustMaxTasks();
  }
}

ThreadGroup::ScopedReenqueueExecutor::ScopedReenqueueExecutor() = default;

ThreadGroup::ScopedReenqueueExecutor::~ScopedReenqueueExecutor() {
  if (destination_thread_group_) {
    destination_thread_group_->PushTaskSourceAndWakeUpWorkers(
        std::move(transaction_with_task_source_.value()));
  }
}

void ThreadGroup::ScopedReenqueueExecutor::
    SchedulePushTaskSourceAndWakeUpWorkers(
        RegisteredTaskSourceAndTransaction transaction_with_task_source,
        ThreadGroup* destination_thread_group) {
  DCHECK(destination_thread_group);
  DCHECK(!destination_thread_group_);
  DCHECK(!transaction_with_task_source_);
  transaction_with_task_source_.emplace(
      std::move(transaction_with_task_source));
  destination_thread_group_ = destination_thread_group;
}

ThreadGroup::ThreadGroup(std::string_view histogram_label,
                         std::string_view thread_group_label,
                         ThreadType thread_type_hint,
                         TrackedRef<TaskTracker> task_tracker,
                         TrackedRef<Delegate> delegate)
    : task_tracker_(std::move(task_tracker)),
      delegate_(std::move(delegate)),
      histogram_label_(histogram_label),
      thread_group_label_(thread_group_label),
      thread_type_hint_(thread_type_hint),
      idle_workers_set_cv_for_testing_(lock_.CreateConditionVariable()) {
  DCHECK(!thread_group_label_.empty());
}

void ThreadGroup::StartImpl(
    size_t max_tasks,
    size_t max_best_effort_tasks,
    TimeDelta suggested_reclaim_time,
    scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer,
    WorkerEnvironment worker_environment,
    bool synchronous_thread_start_for_testing,
    std::optional<TimeDelta> may_block_threshold) {
  if (synchronous_thread_start_for_testing) {
    worker_started_for_testing_.emplace(WaitableEvent::ResetPolicy::AUTOMATIC);
    // Don't emit a ScopedBlockingCallWithBaseSyncPrimitives from this
    // WaitableEvent or it defeats the purpose of having threads start without
    // externally visible side-effects.
    worker_started_for_testing_->declare_only_used_while_idle();
  }

  in_start().no_worker_reclaim = FeatureList::IsEnabled(kNoWorkerThreadReclaim);
  in_start().may_block_threshold =
      may_block_threshold ? may_block_threshold.value()
                          : (thread_type_hint_ != ThreadType::kBackground
                                 ? kForegroundMayBlockThreshold
                                 : kBackgroundMayBlockThreshold);
  in_start().blocked_workers_poll_period =
      thread_type_hint_ != ThreadType::kBackground
          ? kForegroundBlockedWorkersPoll
          : kBackgroundBlockedWorkersPoll;
  in_start().max_num_workers_created = base::kMaxNumWorkersCreated.Get();

  CheckedAutoLock auto_lock(lock_);

  max_tasks_ = max_tasks;
  DCHECK_GE(max_tasks_, 1U);
  in_start().initial_max_tasks = std::min(max_tasks_, kMaxNumberOfWorkers);
  max_best_effort_tasks_ = max_best_effort_tasks;
  in_start().suggested_reclaim_time = suggested_reclaim_time;
  in_start().worker_environment = worker_environment;
  in_start().service_thread_task_runner =
      std::move(service_thread_task_runner);
  in_start().worker_thread_observer = worker_thread_observer;

#if DCHECK_IS_ON()
  in_start().initialized = true;
#endif
}

ThreadGroup::~ThreadGroup() = default;

void ThreadGroup::BindToCurrentThread() {
  DCHECK(!CurrentThreadHasGroup());
  current_thread_group = this;
}

void ThreadGroup::UnbindFromCurrentThread() {
  DCHECK(IsBoundToCurrentThread());
  current_thread_group = nullptr;
}

bool ThreadGroup::IsBoundToCurrentThread() const {
  return current_thread_group == this;
}

size_t
ThreadGroup::GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired()
    const {
  // For simplicity, only 1 worker is assigned to each task source regardless
  // of its max concurrency, with the exception of the top task source.
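  // Illustrative example (hypothetical numbers): with 3 queued BEST_EFFORT
  // task sources and a top BEST_EFFORT task source whose remaining concurrency
  // is 4, this returns 3 + 4 - 1 = 6 additional workers.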
  const size_t num_queued =
      priority_queue_.GetNumTaskSourcesWithPriority(TaskPriority::BEST_EFFORT);
  if (num_queued == 0 ||
      !task_tracker_->CanRunPriority(TaskPriority::BEST_EFFORT)) {
    return 0U;
  }
  if (priority_queue_.PeekSortKey().priority() == TaskPriority::BEST_EFFORT) {
    // Assign the correct number of workers for the top TaskSource (-1 for the
    // worker that is already accounted for in |num_queued|).
    return std::max<size_t>(
        1, num_queued +
               priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1);
  }
  return num_queued;
}

size_t
ThreadGroup::GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired()
    const {
  // For simplicity, only 1 worker is assigned to each task source regardless
  // of its max concurrency, with the exception of the top task source.
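  // The arithmetic mirrors the BEST_EFFORT variant above; see the worked
  // example there.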
  const size_t num_queued = priority_queue_.GetNumTaskSourcesWithPriority(
                                TaskPriority::USER_VISIBLE) +
                            priority_queue_.GetNumTaskSourcesWithPriority(
                                TaskPriority::USER_BLOCKING);
  if (num_queued == 0 ||
      !task_tracker_->CanRunPriority(TaskPriority::HIGHEST)) {
    return 0U;
  }
  auto priority = priority_queue_.PeekSortKey().priority();
  if (priority == TaskPriority::USER_VISIBLE ||
      priority == TaskPriority::USER_BLOCKING) {
    // Assign the correct number of workers for the top TaskSource (-1 for the
    // worker that is already accounted for in |num_queued|).
    return std::max<size_t>(
        1, num_queued +
               priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1);
  }
  return num_queued;
}

RegisteredTaskSource ThreadGroup::RemoveTaskSource(
    const TaskSource& task_source) {
  CheckedAutoLock auto_lock(lock_);
  return priority_queue_.RemoveTaskSource(task_source);
}

void ThreadGroup::ReEnqueueTaskSourceLockRequired(
    BaseScopedCommandsExecutor* workers_executor,
    ScopedReenqueueExecutor* reenqueue_executor,
    RegisteredTaskSourceAndTransaction transaction_with_task_source) {
  // Decide in which thread group the TaskSource should be reenqueued.
  ThreadGroup* destination_thread_group = delegate_->GetThreadGroupForTraits(
      transaction_with_task_source.transaction.traits());

  bool push_to_immediate_queue =
      transaction_with_task_source.task_source.WillReEnqueue(
          TimeTicks::Now(), &transaction_with_task_source.transaction);

  if (destination_thread_group == this) {
    // Another worker that was running a task from this task source may have
    // reenqueued it already, in which case its heap_handle will be valid. It
    // shouldn't be queued twice, so the task source registration is released.
    if (transaction_with_task_source.task_source->immediate_heap_handle()
            .IsValid()) {
      workers_executor->ScheduleReleaseTaskSource(
          std::move(transaction_with_task_source.task_source));
    } else {
      // If the TaskSource should be reenqueued in the current thread group,
      // reenqueue it inside the scope of the lock.
      if (push_to_immediate_queue) {
        auto sort_key = transaction_with_task_source.task_source->GetSortKey();
        // When moving |task_source| into |priority_queue_|, it may be
        // destroyed on another thread as soon as |lock_| is released, since
        // we're no longer holding a reference to it. To prevent UAF, release
        // |transaction| before moving |task_source|. Ref. crbug.com/1412008.
        transaction_with_task_source.transaction.Release();
        priority_queue_.Push(
            std::move(transaction_with_task_source.task_source), sort_key);
      }
    }
    // This is called unconditionally to ensure there are always workers to run
    // task sources in the queue. Some ThreadGroup implementations only invoke
    // TakeRegisteredTaskSource() once per wake-up and hence this is required
    // to avoid races that could leave a task source stranded in the queue with
    // no active workers.
    EnsureEnoughWorkersLockRequired(workers_executor);
  } else {
    // Otherwise, schedule a reenqueue after releasing the lock.
    reenqueue_executor->SchedulePushTaskSourceAndWakeUpWorkers(
        std::move(transaction_with_task_source), destination_thread_group);
  }
}

RegisteredTaskSource ThreadGroup::TakeRegisteredTaskSource(
    BaseScopedCommandsExecutor* executor) {
  DCHECK(!priority_queue_.IsEmpty());

  auto run_status = priority_queue_.PeekTaskSource().WillRunTask();

  if (run_status == TaskSource::RunStatus::kDisallowed) {
    executor->ScheduleReleaseTaskSource(priority_queue_.PopTaskSource());
    return nullptr;
  }

  if (run_status == TaskSource::RunStatus::kAllowedSaturated) {
    return priority_queue_.PopTaskSource();
  }

  // If the TaskSource isn't saturated, check whether TaskTracker allows it to
  // remain in the PriorityQueue.
  // The canonical way of doing this is to pop the task source to return, call
  // RegisterTaskSource() to get an additional RegisteredTaskSource, and
  // reenqueue that task source if valid. Instead, it is cheaper and equivalent
  // to peek the task source, call RegisterTaskSource() to get an additional
  // RegisteredTaskSource to replace it if valid, and only pop
  // |priority_queue_| otherwise.
  RegisteredTaskSource task_source =
      task_tracker_->RegisterTaskSource(priority_queue_.PeekTaskSource().get());
  if (!task_source) {
    return priority_queue_.PopTaskSource();
  }
  // Replace the top task source and then update the queue.
  std::swap(priority_queue_.PeekTaskSource(), task_source);
  priority_queue_.UpdateSortKey(*task_source.get(), task_source->GetSortKey());
  return task_source;
}

void ThreadGroup::UpdateSortKeyImpl(BaseScopedCommandsExecutor* executor,
                                    TaskSource::Transaction transaction) {
  CheckedAutoLock auto_lock(lock_);
  priority_queue_.UpdateSortKey(*transaction.task_source(),
                                transaction.task_source()->GetSortKey());
  EnsureEnoughWorkersLockRequired(executor);
}

void ThreadGroup::PushTaskSourceAndWakeUpWorkersImpl(
    BaseScopedCommandsExecutor* executor,
    RegisteredTaskSourceAndTransaction transaction_with_task_source) {
  DCHECK_EQ(delegate_->GetThreadGroupForTraits(
                transaction_with_task_source.transaction.traits()),
            this);
  CheckedAutoLock lock(lock_);
  if (transaction_with_task_source.task_source->immediate_heap_handle()
          .IsValid()) {
    // If the task source changed group, it is possible that multiple
    // concurrent workers try to enqueue it. Only the first enqueue should
    // succeed.
    executor->ScheduleReleaseTaskSource(
        std::move(transaction_with_task_source.task_source));
    return;
  }
  auto sort_key = transaction_with_task_source.task_source->GetSortKey();
  // When moving |task_source| into |priority_queue_|, it may be destroyed
  // on another thread as soon as |lock_| is released, since we're no longer
  // holding a reference to it. To prevent UAF, release |transaction| before
  // moving |task_source|. Ref. crbug.com/1412008.
  transaction_with_task_source.transaction.Release();
  priority_queue_.Push(std::move(transaction_with_task_source.task_source),
                       sort_key);
  EnsureEnoughWorkersLockRequired(executor);
}

void ThreadGroup::EnqueueAllTaskSources(PriorityQueue* new_priority_queue) {
  std::unique_ptr<BaseScopedCommandsExecutor> executor = GetExecutor();
  CheckedAutoLock lock(lock_);
  while (!new_priority_queue->IsEmpty()) {
    TaskSourceSortKey top_sort_key = new_priority_queue->PeekSortKey();
    RegisteredTaskSource task_source = new_priority_queue->PopTaskSource();
    priority_queue_.Push(std::move(task_source), top_sort_key);
  }
}

void ThreadGroup::HandoffAllTaskSourcesToOtherThreadGroup(
    ThreadGroup* destination_thread_group) {
  PriorityQueue new_priority_queue;
  {
    CheckedAutoLock current_thread_group_lock(lock_);
    new_priority_queue.swap(priority_queue_);
  }
  destination_thread_group->EnqueueAllTaskSources(&new_priority_queue);
}

void ThreadGroup::HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(
    ThreadGroup* destination_thread_group) {
  PriorityQueue new_priority_queue;
  TaskSourceSortKey top_sort_key;
  {
    // This works because all USER_BLOCKING task sources are at the front of
    // the queue.
    CheckedAutoLock current_thread_group_lock(lock_);
    while (!priority_queue_.IsEmpty() &&
           (top_sort_key = priority_queue_.PeekSortKey()).priority() ==
               TaskPriority::USER_BLOCKING) {
      new_priority_queue.Push(priority_queue_.PopTaskSource(), top_sort_key);
    }
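    // After the swap below, |priority_queue_| holds the USER_BLOCKING task
    // sources popped above, and |new_priority_queue| holds everything else,
    // which is handed off to |destination_thread_group|.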
    new_priority_queue.swap(priority_queue_);
  }
  destination_thread_group->EnqueueAllTaskSources(&new_priority_queue);
}

bool ThreadGroup::ShouldYield(TaskSourceSortKey sort_key) {
  DCHECK(TS_UNCHECKED_READ(max_allowed_sort_key_).is_lock_free());

  if (!task_tracker_->CanRunPriority(sort_key.priority())) {
    return true;
  }
  // It is safe to read |max_allowed_sort_key_| without a lock since this
  // variable is atomic, keeping in mind that threads may not immediately see
  // the new value when it is updated.
  auto max_allowed_sort_key =
      TS_UNCHECKED_READ(max_allowed_sort_key_).load(std::memory_order_relaxed);

  // To reduce unnecessary yielding, a task will never yield to a BEST_EFFORT
  // task regardless of its worker_count.
  if (sort_key.priority() > max_allowed_sort_key.priority ||
      max_allowed_sort_key.priority == TaskPriority::BEST_EFFORT) {
    return false;
  }
  // Otherwise, a task only yields to a task of equal priority if its
  // worker_count would be greater still after yielding, e.g. a job with 1
  // worker doesn't yield to a job with 0 workers.
  if (sort_key.priority() == max_allowed_sort_key.priority &&
      sort_key.worker_count() <= max_allowed_sort_key.worker_count + 1) {
    return false;
  }
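
  // Illustrative example (hypothetical values): a USER_VISIBLE task whose job
  // has 3 workers yields to a queued USER_VISIBLE task source with 1 worker
  // (3 > 1 + 1), but not to one with 2 workers (3 <= 2 + 1).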

  // Reset |max_allowed_sort_key_| so that only one thread should yield at a
  // time for a given task.
  max_allowed_sort_key =
      TS_UNCHECKED_READ(max_allowed_sort_key_)
          .exchange(kMaxYieldSortKey, std::memory_order_relaxed);
  // Another thread might have decided to yield and racily reset
  // |max_allowed_sort_key_|, in which case this thread doesn't yield.
  return max_allowed_sort_key.priority != TaskPriority::BEST_EFFORT;
}

#if BUILDFLAG(IS_WIN)
// static
std::unique_ptr<win::ScopedWindowsThreadEnvironment>
ThreadGroup::GetScopedWindowsThreadEnvironment(WorkerEnvironment environment) {
  std::unique_ptr<win::ScopedWindowsThreadEnvironment> scoped_environment;
  if (environment == WorkerEnvironment::COM_MTA) {
    scoped_environment = std::make_unique<win::ScopedWinrtInitializer>();

    // TODO(crbug.com/1498668): Roll back the change or replace it with a CHECK
    // before closing the bug.
    DUMP_WILL_BE_CHECK(scoped_environment->Succeeded());
  }

  DCHECK(!scoped_environment || scoped_environment->Succeeded());
  return scoped_environment;
}
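
// Illustrative usage (hypothetical; the actual call sites live in the worker
// delegates): a worker's main function would do
//   auto environment = GetScopedWindowsThreadEnvironment(
//       outer_->after_start().worker_environment);
// and keep |environment| alive for the lifetime of the thread so the COM MTA
// stays initialized while tasks run.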
#endif

// static
bool ThreadGroup::CurrentThreadHasGroup() {
  return current_thread_group != nullptr;
}

size_t ThreadGroup::GetMaxTasksForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return max_tasks_;
}

size_t ThreadGroup::GetMaxBestEffortTasksForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return max_best_effort_tasks_;
}

void ThreadGroup::WaitForWorkersIdleLockRequiredForTesting(size_t n) {
  // Make sure workers do not clean up while watching the idle count.
  AutoReset<bool> ban_cleanups(&worker_cleanup_disallowed_for_testing_, true);

  while (NumberOfIdleWorkersLockRequiredForTesting() < n) {
    idle_workers_set_cv_for_testing_.Wait();
  }
}

void ThreadGroup::WaitForWorkersIdleForTesting(size_t n) {
  CheckedAutoLock auto_lock(lock_);

#if DCHECK_IS_ON()
  DCHECK(!some_workers_cleaned_up_for_testing_)
      << "Workers detached prior to waiting for a specific number of idle "
         "workers. Doing the wait under such conditions is flaky. Consider "
         "setting the suggested reclaim time to TimeDelta::Max() in Start().";
#endif

  WaitForWorkersIdleLockRequiredForTesting(n);
}

void ThreadGroup::WaitForAllWorkersIdleForTesting() {
  CheckedAutoLock auto_lock(lock_);
  WaitForWorkersIdleLockRequiredForTesting(workers_.size());
}

void ThreadGroup::WaitForWorkersCleanedUpForTesting(size_t n) {
  CheckedAutoLock auto_lock(lock_);

  if (!num_workers_cleaned_up_for_testing_cv_) {
    lock_.CreateConditionVariableAndEmplace(
        num_workers_cleaned_up_for_testing_cv_);
  }

  while (num_workers_cleaned_up_for_testing_ < n) {
    num_workers_cleaned_up_for_testing_cv_->Wait();
  }

  num_workers_cleaned_up_for_testing_ = 0;
}

size_t ThreadGroup::GetMaxConcurrentNonBlockedTasksDeprecated() const {
#if DCHECK_IS_ON()
  CheckedAutoLock auto_lock(lock_);
  DCHECK_NE(after_start().initial_max_tasks, 0U)
      << "GetMaxConcurrentNonBlockedTasksDeprecated() should only be called "
      << "after the thread group has started.";
#endif
  return after_start().initial_max_tasks;
}

size_t ThreadGroup::NumberOfWorkersForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return workers_.size();
}

size_t ThreadGroup::NumberOfIdleWorkersForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return NumberOfIdleWorkersLockRequiredForTesting();
}

size_t ThreadGroup::GetDesiredNumAwakeWorkersLockRequired() const {
  // Number of BEST_EFFORT task sources that are running or queued and allowed
  // to run by the CanRunPolicy.
  const size_t num_running_or_queued_can_run_best_effort_task_sources =
      num_running_best_effort_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();

  const size_t workers_for_best_effort_task_sources =
      std::max(std::min(num_running_or_queued_can_run_best_effort_task_sources,
                        max_best_effort_tasks_),
               num_running_best_effort_tasks_);
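
  // Illustrative example (hypothetical values): with 2 running BEST_EFFORT
  // tasks, 5 additional BEST_EFFORT workers desired for queued task sources
  // and |max_best_effort_tasks_| == 4, this is max(min(2 + 5, 4), 2) == 4:
  // capped by the limit, but never below the number already running.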

  // Number of USER_{VISIBLE|BLOCKING} task sources that are running or queued.
  const size_t num_running_or_queued_foreground_task_sources =
      (num_running_tasks_ - num_running_best_effort_tasks_) +
      GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();

  const size_t workers_for_foreground_task_sources =
      num_running_or_queued_foreground_task_sources;

  return std::min({workers_for_best_effort_task_sources +
                       workers_for_foreground_task_sources,
                   max_tasks_, kMaxNumberOfWorkers});
}

void ThreadGroup::MaybeScheduleAdjustMaxTasksLockRequired(
    BaseScopedCommandsExecutor* executor) {
  if (!adjust_max_tasks_posted_ &&
      ShouldPeriodicallyAdjustMaxTasksLockRequired()) {
    executor->ScheduleAdjustMaxTasks();
    adjust_max_tasks_posted_ = true;
  }
}

void ThreadGroup::ScheduleAdjustMaxTasks() {
  // |adjust_max_tasks_posted_| can't change before the task posted below runs.
  // Skip the check on NaCl to avoid an unsafe reference acquisition warning.
#if !BUILDFLAG(IS_NACL)
  DCHECK(TS_UNCHECKED_READ(adjust_max_tasks_posted_));
#endif

  after_start().service_thread_task_runner->PostDelayedTask(
      FROM_HERE, BindOnce(&ThreadGroup::AdjustMaxTasks, Unretained(this)),
      after_start().blocked_workers_poll_period);
}

void ThreadGroup::AdjustMaxTasks() {
  DCHECK(
      after_start().service_thread_task_runner->RunsTasksInCurrentSequence());

  std::unique_ptr<BaseScopedCommandsExecutor> executor = GetExecutor();
  CheckedAutoLock auto_lock(lock_);
  DCHECK(adjust_max_tasks_posted_);
  adjust_max_tasks_posted_ = false;

  // Increment max tasks for each worker that has been within a MAY_BLOCK
  // ScopedBlockingCall for more than may_block_threshold.
  for (const scoped_refptr<WorkerThread>& worker : workers_) {
    // The delegates of workers inside a ThreadGroup should be
    // ThreadGroupWorkerDelegates.
    ThreadGroupWorkerDelegate* delegate = GetWorkerDelegate(worker.get());
    AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
    delegate->MaybeIncrementMaxTasksLockRequired();
  }

  // Wake up workers according to the updated |max_tasks_|. This will also
  // reschedule AdjustMaxTasks() if necessary.
  EnsureEnoughWorkersLockRequired(executor.get());
}

void ThreadGroup::OnShutDownStartedImpl(BaseScopedCommandsExecutor* executor) {
  CheckedAutoLock auto_lock(lock_);

  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0 || UNLIKELY(join_for_testing_started_)) {
    return;
  }

  // Start a MAY_BLOCK scope on each worker that is already running a task.
  for (scoped_refptr<WorkerThread>& worker : workers_) {
    // The delegates of workers inside a ThreadGroup should be
    // ThreadGroupWorkerDelegates.
    ThreadGroupWorkerDelegate* delegate = GetWorkerDelegate(worker.get());
    AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
    delegate->OnShutdownStartedLockRequired(executor);
  }
  EnsureEnoughWorkersLockRequired(executor);

  shutdown_started_ = true;
}

bool ThreadGroup::ShouldPeriodicallyAdjustMaxTasksLockRequired() {
  // AdjustMaxTasks() should be scheduled to periodically adjust |max_tasks_|
  // and |max_best_effort_tasks_| when (1) the concurrency limits are not large
  // enough to accommodate all queued and running task sources and an idle
  // worker and (2) there are unresolved MAY_BLOCK ScopedBlockingCalls.
  // - When (1) is false: No worker would be created or woken up if the
  //   concurrency limits were increased, so there is no hurry to increase
  //   them.
  // - When (2) is false: The concurrency limits could not be increased by
  //   AdjustMaxTasks().
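  //
  // Illustrative example (hypothetical values): with |max_tasks_| == 4,
  // 4 running or queued task sources and 1 unresolved MAY_BLOCK
  // ScopedBlockingCall, 4 + 1 (the idle worker) > 4 and the MAY_BLOCK call is
  // unresolved, so AdjustMaxTasks() should be scheduled.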

  const size_t num_running_or_queued_best_effort_task_sources =
      num_running_best_effort_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();
  if (num_running_or_queued_best_effort_task_sources >
          max_best_effort_tasks_ &&
      num_unresolved_best_effort_may_block_ > 0) {
    return true;
  }

  const size_t num_running_or_queued_task_sources =
      num_running_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() +
      GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();
  constexpr size_t kIdleWorker = 1;
  return num_running_or_queued_task_sources + kIdleWorker > max_tasks_ &&
         num_unresolved_may_block_ > 0;
}

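// Publishes the sort key that running tasks compare against in ShouldYield().
// Storing kMaxYieldSortKey when the group isn't saturated (or the queue is
// empty) means no running task needs to yield.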
void ThreadGroup::UpdateMinAllowedPriorityLockRequired() {
  if (priority_queue_.IsEmpty() || num_running_tasks_ < max_tasks_) {
    max_allowed_sort_key_.store(kMaxYieldSortKey, std::memory_order_relaxed);
  } else {
    max_allowed_sort_key_.store({priority_queue_.PeekSortKey().priority(),
                                 priority_queue_.PeekSortKey().worker_count()},
                                std::memory_order_relaxed);
  }
}

void ThreadGroup::DecrementTasksRunningLockRequired(TaskPriority priority) {
  DCHECK_GT(num_running_tasks_, 0U);
  --num_running_tasks_;
  if (priority == TaskPriority::BEST_EFFORT) {
    DCHECK_GT(num_running_best_effort_tasks_, 0U);
    --num_running_best_effort_tasks_;
  }
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroup::IncrementTasksRunningLockRequired(TaskPriority priority) {
  ++num_running_tasks_;
  DCHECK_LE(num_running_tasks_, max_tasks_);
  DCHECK_LE(num_running_tasks_, kMaxNumberOfWorkers);
  if (priority == TaskPriority::BEST_EFFORT) {
    ++num_running_best_effort_tasks_;
    DCHECK_LE(num_running_best_effort_tasks_, num_running_tasks_);
    DCHECK_LE(num_running_best_effort_tasks_, max_best_effort_tasks_);
  }
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroup::DecrementMaxTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  DCHECK_GT(max_tasks_, 0U);
  --max_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroup::IncrementMaxTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  ++max_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroup::DecrementMaxBestEffortTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  DCHECK_GT(max_best_effort_tasks_, 0U);
  --max_best_effort_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroup::IncrementMaxBestEffortTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  ++max_best_effort_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

ThreadGroup::InitializedInStart::InitializedInStart() = default;
ThreadGroup::InitializedInStart::~InitializedInStart() = default;

}  // namespace internal
}  // namespace base