xref: /aosp_15_r20/external/cronet/base/task/thread_pool/thread_group.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/thread_group.h"
6 
7 #include <string_view>
8 #include <utility>
9 
10 #include "base/check.h"
11 #include "base/feature_list.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/task/task_features.h"
15 #include "base/task/thread_pool/task_tracker.h"
16 #include "base/task/thread_pool/thread_group_worker_delegate.h"
17 #include "build/build_config.h"
18 #include "third_party/abseil-cpp/absl/base/attributes.h"
19 
20 #if BUILDFLAG(IS_WIN)
21 #include "base/win/com_init_check_hook.h"
22 #include "base/win/scoped_winrt_initializer.h"
23 #endif
24 
25 namespace base {
26 namespace internal {
27 
28 namespace {
29 
30 constexpr size_t kMaxNumberOfWorkers = 256;
31 
32 // In a background thread group:
33 // - Blocking calls take more time than in a foreground thread group.
34 // - We want to minimize impact on foreground work, not maximize execution
35 //   throughput.
36 // For these reasons, the timeout to increase the maximum number of concurrent
37 // tasks when there is a MAY_BLOCK ScopedBlockingCall is *long*. It is not
38 // infinite because execution throughput should not be reduced forever if a task
39 // blocks forever.
40 //
41 // TODO(fdoray): On platforms without background thread groups, blocking in a
42 // BEST_EFFORT task should:
43 // 1. Increment the maximum number of concurrent tasks after a *short* timeout,
44 //    to allow scheduling of USER_VISIBLE/USER_BLOCKING tasks.
45 // 2. Increment the maximum number of concurrent BEST_EFFORT tasks after a
46 //    *long* timeout, because we only want to allow more BEST_EFFORT tasks to be
47 //    be scheduled concurrently when we believe that a BEST_EFFORT task is
48 //    blocked forever.
49 // Currently, only 1. is true as the configuration is per thread group.
50 // TODO(https://crbug.com/927755): Fix racy condition when MayBlockThreshold ==
51 // BlockedWorkersPoll.
52 constexpr TimeDelta kForegroundMayBlockThreshold = Milliseconds(1000);
53 constexpr TimeDelta kForegroundBlockedWorkersPoll = Milliseconds(1200);
54 constexpr TimeDelta kBackgroundMayBlockThreshold = Seconds(10);
55 constexpr TimeDelta kBackgroundBlockedWorkersPoll = Seconds(12);
56 
57 // ThreadGroup that owns the current thread, if any.
58 ABSL_CONST_INIT thread_local const ThreadGroup* current_thread_group = nullptr;
59 
60 }  // namespace
61 
62 constexpr ThreadGroup::YieldSortKey ThreadGroup::kMaxYieldSortKey;
63 
ScheduleReleaseTaskSource(RegisteredTaskSource task_source)64 void ThreadGroup::BaseScopedCommandsExecutor::ScheduleReleaseTaskSource(
65     RegisteredTaskSource task_source) {
66   task_sources_to_release_.push_back(std::move(task_source));
67 }
68 
ScheduleAdjustMaxTasks()69 void ThreadGroup::BaseScopedCommandsExecutor::ScheduleAdjustMaxTasks() {
70   DCHECK(!must_schedule_adjust_max_tasks_);
71   must_schedule_adjust_max_tasks_ = true;
72 }
73 
ScheduleStart(scoped_refptr<WorkerThread> worker)74 void ThreadGroup::BaseScopedCommandsExecutor::ScheduleStart(
75     scoped_refptr<WorkerThread> worker) {
76   workers_to_start_.emplace_back(std::move(worker));
77 }
78 
BaseScopedCommandsExecutor(ThreadGroup * outer)79 ThreadGroup::BaseScopedCommandsExecutor::BaseScopedCommandsExecutor(
80     ThreadGroup* outer)
81     : outer_(outer) {}
82 
~BaseScopedCommandsExecutor()83 ThreadGroup::BaseScopedCommandsExecutor::~BaseScopedCommandsExecutor() {
84   CheckedLock::AssertNoLockHeldOnCurrentThread();
85   Flush();
86 }
87 
FlushWorkerCreation(CheckedLock * held_lock)88 void ThreadGroup::BaseScopedCommandsExecutor::FlushWorkerCreation(
89     CheckedLock* held_lock) {
90   // This function crucially only wakes up workers, rather than also signaling
91   // them, and therefore, does not call FlushImpl(). FlushImpl() requires not
92   // holding any locks on the calling thread, while a TaskSource Transaction
93   // lock can be held while calling this function.
94   CheckedAutoUnlock auto_unlock(*held_lock);
95   if (workers_to_start_.empty()) {
96     return;
97   }
98 
99   Flush();
100   workers_to_start_.clear();
101   must_schedule_adjust_max_tasks_ = false;
102 }
103 
Flush()104 void ThreadGroup::BaseScopedCommandsExecutor::Flush() {
105   // Start workers. Happens after wake ups (implemented by children and thus
106   // called on their destructor, i.e. before this) to prevent the case where a
107   // worker enters its main function, is descheduled because it wasn't woken up
108   // yet, and is woken up immediately after.
109   for (auto worker : workers_to_start_) {
110     worker->Start(outer_->after_start().service_thread_task_runner,
111                   outer_->after_start().worker_thread_observer);
112     if (outer_->worker_started_for_testing_) {
113       outer_->worker_started_for_testing_->Wait();
114     }
115   }
116   workers_to_start_.clear();
117 
118   if (must_schedule_adjust_max_tasks_) {
119     outer_->ScheduleAdjustMaxTasks();
120   }
121 }
122 
123 ThreadGroup::ScopedReenqueueExecutor::ScopedReenqueueExecutor() = default;
124 
~ScopedReenqueueExecutor()125 ThreadGroup::ScopedReenqueueExecutor::~ScopedReenqueueExecutor() {
126   if (destination_thread_group_) {
127     destination_thread_group_->PushTaskSourceAndWakeUpWorkers(
128         std::move(transaction_with_task_source_.value()));
129   }
130 }
131 
132 void ThreadGroup::ScopedReenqueueExecutor::
SchedulePushTaskSourceAndWakeUpWorkers(RegisteredTaskSourceAndTransaction transaction_with_task_source,ThreadGroup * destination_thread_group)133     SchedulePushTaskSourceAndWakeUpWorkers(
134         RegisteredTaskSourceAndTransaction transaction_with_task_source,
135         ThreadGroup* destination_thread_group) {
136   DCHECK(destination_thread_group);
137   DCHECK(!destination_thread_group_);
138   DCHECK(!transaction_with_task_source_);
139   transaction_with_task_source_.emplace(
140       std::move(transaction_with_task_source));
141   destination_thread_group_ = destination_thread_group;
142 }
143 
ThreadGroup(std::string_view histogram_label,std::string_view thread_group_label,ThreadType thread_type_hint,TrackedRef<TaskTracker> task_tracker,TrackedRef<Delegate> delegate)144 ThreadGroup::ThreadGroup(std::string_view histogram_label,
145                          std::string_view thread_group_label,
146                          ThreadType thread_type_hint,
147                          TrackedRef<TaskTracker> task_tracker,
148                          TrackedRef<Delegate> delegate)
149     : task_tracker_(std::move(task_tracker)),
150       delegate_(std::move(delegate)),
151       histogram_label_(histogram_label),
152       thread_group_label_(thread_group_label),
153       thread_type_hint_(thread_type_hint),
154       idle_workers_set_cv_for_testing_(lock_.CreateConditionVariable()) {
155   DCHECK(!thread_group_label_.empty());
156 }
157 
StartImpl(size_t max_tasks,size_t max_best_effort_tasks,TimeDelta suggested_reclaim_time,scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,WorkerThreadObserver * worker_thread_observer,WorkerEnvironment worker_environment,bool synchronous_thread_start_for_testing,std::optional<TimeDelta> may_block_threshold)158 void ThreadGroup::StartImpl(
159     size_t max_tasks,
160     size_t max_best_effort_tasks,
161     TimeDelta suggested_reclaim_time,
162     scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
163     WorkerThreadObserver* worker_thread_observer,
164     WorkerEnvironment worker_environment,
165     bool synchronous_thread_start_for_testing,
166     std::optional<TimeDelta> may_block_threshold) {
167   if (synchronous_thread_start_for_testing) {
168     worker_started_for_testing_.emplace(WaitableEvent::ResetPolicy::AUTOMATIC);
169     // Don't emit a ScopedBlockingCallWithBaseSyncPrimitives from this
170     // WaitableEvent or it defeats the purpose of having threads start without
171     // externally visible side-effects.
172     worker_started_for_testing_->declare_only_used_while_idle();
173   }
174 
175   in_start().no_worker_reclaim = FeatureList::IsEnabled(kNoWorkerThreadReclaim);
176   in_start().may_block_threshold =
177       may_block_threshold ? may_block_threshold.value()
178                           : (thread_type_hint_ != ThreadType::kBackground
179                                  ? kForegroundMayBlockThreshold
180                                  : kBackgroundMayBlockThreshold);
181   in_start().blocked_workers_poll_period =
182       thread_type_hint_ != ThreadType::kBackground
183           ? kForegroundBlockedWorkersPoll
184           : kBackgroundBlockedWorkersPoll;
185   in_start().max_num_workers_created = base::kMaxNumWorkersCreated.Get();
186 
187   CheckedAutoLock auto_lock(lock_);
188 
189   max_tasks_ = max_tasks;
190   DCHECK_GE(max_tasks_, 1U);
191   in_start().initial_max_tasks = std::min(max_tasks_, kMaxNumberOfWorkers);
192   max_best_effort_tasks_ = max_best_effort_tasks;
193   in_start().suggested_reclaim_time = suggested_reclaim_time;
194   in_start().worker_environment = worker_environment;
195   in_start().service_thread_task_runner = std::move(service_thread_task_runner);
196   in_start().worker_thread_observer = worker_thread_observer;
197 
198 #if DCHECK_IS_ON()
199   in_start().initialized = true;
200 #endif
201 }
202 
203 ThreadGroup::~ThreadGroup() = default;
204 
BindToCurrentThread()205 void ThreadGroup::BindToCurrentThread() {
206   DCHECK(!CurrentThreadHasGroup());
207   current_thread_group = this;
208 }
209 
UnbindFromCurrentThread()210 void ThreadGroup::UnbindFromCurrentThread() {
211   DCHECK(IsBoundToCurrentThread());
212   current_thread_group = nullptr;
213 }
214 
IsBoundToCurrentThread() const215 bool ThreadGroup::IsBoundToCurrentThread() const {
216   return current_thread_group == this;
217 }
218 
219 size_t
GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() const220 ThreadGroup::GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired()
221     const {
222   // For simplicity, only 1 worker is assigned to each task source regardless of
223   // its max concurrency, with the exception of the top task source.
224   const size_t num_queued =
225       priority_queue_.GetNumTaskSourcesWithPriority(TaskPriority::BEST_EFFORT);
226   if (num_queued == 0 ||
227       !task_tracker_->CanRunPriority(TaskPriority::BEST_EFFORT)) {
228     return 0U;
229   }
230   if (priority_queue_.PeekSortKey().priority() == TaskPriority::BEST_EFFORT) {
231     // Assign the correct number of workers for the top TaskSource (-1 for the
232     // worker that is already accounted for in |num_queued|).
233     return std::max<size_t>(
234         1, num_queued +
235                priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1);
236   }
237   return num_queued;
238 }
239 
240 size_t
GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired() const241 ThreadGroup::GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired()
242     const {
243   // For simplicity, only 1 worker is assigned to each task source regardless of
244   // its max concurrency, with the exception of the top task source.
245   const size_t num_queued = priority_queue_.GetNumTaskSourcesWithPriority(
246                                 TaskPriority::USER_VISIBLE) +
247                             priority_queue_.GetNumTaskSourcesWithPriority(
248                                 TaskPriority::USER_BLOCKING);
249   if (num_queued == 0 ||
250       !task_tracker_->CanRunPriority(TaskPriority::HIGHEST)) {
251     return 0U;
252   }
253   auto priority = priority_queue_.PeekSortKey().priority();
254   if (priority == TaskPriority::USER_VISIBLE ||
255       priority == TaskPriority::USER_BLOCKING) {
256     // Assign the correct number of workers for the top TaskSource (-1 for the
257     // worker that is already accounted for in |num_queued|).
258     return std::max<size_t>(
259         1, num_queued +
260                priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1);
261   }
262   return num_queued;
263 }
264 
RemoveTaskSource(const TaskSource & task_source)265 RegisteredTaskSource ThreadGroup::RemoveTaskSource(
266     const TaskSource& task_source) {
267   CheckedAutoLock auto_lock(lock_);
268   return priority_queue_.RemoveTaskSource(task_source);
269 }
270 
ReEnqueueTaskSourceLockRequired(BaseScopedCommandsExecutor * workers_executor,ScopedReenqueueExecutor * reenqueue_executor,RegisteredTaskSourceAndTransaction transaction_with_task_source)271 void ThreadGroup::ReEnqueueTaskSourceLockRequired(
272     BaseScopedCommandsExecutor* workers_executor,
273     ScopedReenqueueExecutor* reenqueue_executor,
274     RegisteredTaskSourceAndTransaction transaction_with_task_source) {
275   // Decide in which thread group the TaskSource should be reenqueued.
276   ThreadGroup* destination_thread_group = delegate_->GetThreadGroupForTraits(
277       transaction_with_task_source.transaction.traits());
278 
279   bool push_to_immediate_queue =
280       transaction_with_task_source.task_source.WillReEnqueue(
281           TimeTicks::Now(), &transaction_with_task_source.transaction);
282 
283   if (destination_thread_group == this) {
284     // Another worker that was running a task from this task source may have
285     // reenqueued it already, in which case its heap_handle will be valid. It
286     // shouldn't be queued twice so the task source registration is released.
287     if (transaction_with_task_source.task_source->immediate_heap_handle()
288             .IsValid()) {
289       workers_executor->ScheduleReleaseTaskSource(
290           std::move(transaction_with_task_source.task_source));
291     } else {
292       // If the TaskSource should be reenqueued in the current thread group,
293       // reenqueue it inside the scope of the lock.
294       if (push_to_immediate_queue) {
295         auto sort_key = transaction_with_task_source.task_source->GetSortKey();
296         // When moving |task_source| into |priority_queue_|, it may be destroyed
297         // on another thread as soon as |lock_| is released, since we're no
298         // longer holding a reference to it. To prevent UAF, release
299         // |transaction| before moving |task_source|. Ref. crbug.com/1412008
300         transaction_with_task_source.transaction.Release();
301         priority_queue_.Push(
302             std::move(transaction_with_task_source.task_source), sort_key);
303       }
304     }
305     // This is called unconditionally to ensure there are always workers to run
306     // task sources in the queue. Some ThreadGroup implementations only invoke
307     // TakeRegisteredTaskSource() once per wake up and hence this is required to
308     // avoid races that could leave a task source stranded in the queue with no
309     // active workers.
310     EnsureEnoughWorkersLockRequired(workers_executor);
311   } else {
312     // Otherwise, schedule a reenqueue after releasing the lock.
313     reenqueue_executor->SchedulePushTaskSourceAndWakeUpWorkers(
314         std::move(transaction_with_task_source), destination_thread_group);
315   }
316 }
317 
TakeRegisteredTaskSource(BaseScopedCommandsExecutor * executor)318 RegisteredTaskSource ThreadGroup::TakeRegisteredTaskSource(
319     BaseScopedCommandsExecutor* executor) {
320   DCHECK(!priority_queue_.IsEmpty());
321 
322   auto run_status = priority_queue_.PeekTaskSource().WillRunTask();
323 
324   if (run_status == TaskSource::RunStatus::kDisallowed) {
325     executor->ScheduleReleaseTaskSource(priority_queue_.PopTaskSource());
326     return nullptr;
327   }
328 
329   if (run_status == TaskSource::RunStatus::kAllowedSaturated) {
330     return priority_queue_.PopTaskSource();
331   }
332 
333   // If the TaskSource isn't saturated, check whether TaskTracker allows it to
334   // remain in the PriorityQueue.
335   // The canonical way of doing this is to pop the task source to return, call
336   // RegisterTaskSource() to get an additional RegisteredTaskSource, and
337   // reenqueue that task source if valid. Instead, it is cheaper and equivalent
338   // to peek the task source, call RegisterTaskSource() to get an additional
339   // RegisteredTaskSource to replace if valid, and only pop |priority_queue_|
340   // otherwise.
341   RegisteredTaskSource task_source =
342       task_tracker_->RegisterTaskSource(priority_queue_.PeekTaskSource().get());
343   if (!task_source) {
344     return priority_queue_.PopTaskSource();
345   }
346   // Replace the top task_source and then update the queue.
347   std::swap(priority_queue_.PeekTaskSource(), task_source);
348   priority_queue_.UpdateSortKey(*task_source.get(), task_source->GetSortKey());
349   return task_source;
350 }
351 
UpdateSortKeyImpl(BaseScopedCommandsExecutor * executor,TaskSource::Transaction transaction)352 void ThreadGroup::UpdateSortKeyImpl(BaseScopedCommandsExecutor* executor,
353                                     TaskSource::Transaction transaction) {
354   CheckedAutoLock auto_lock(lock_);
355   priority_queue_.UpdateSortKey(*transaction.task_source(),
356                                 transaction.task_source()->GetSortKey());
357   EnsureEnoughWorkersLockRequired(executor);
358 }
359 
PushTaskSourceAndWakeUpWorkersImpl(BaseScopedCommandsExecutor * executor,RegisteredTaskSourceAndTransaction transaction_with_task_source)360 void ThreadGroup::PushTaskSourceAndWakeUpWorkersImpl(
361     BaseScopedCommandsExecutor* executor,
362     RegisteredTaskSourceAndTransaction transaction_with_task_source) {
363   DCHECK_EQ(delegate_->GetThreadGroupForTraits(
364                 transaction_with_task_source.transaction.traits()),
365             this);
366   CheckedAutoLock lock(lock_);
367   if (transaction_with_task_source.task_source->immediate_heap_handle()
368           .IsValid()) {
369     // If the task source changed group, it is possible that multiple concurrent
370     // workers try to enqueue it. Only the first enqueue should succeed.
371     executor->ScheduleReleaseTaskSource(
372         std::move(transaction_with_task_source.task_source));
373     return;
374   }
375   auto sort_key = transaction_with_task_source.task_source->GetSortKey();
376   // When moving |task_source| into |priority_queue_|, it may be destroyed
377   // on another thread as soon as |lock_| is released, since we're no longer
378   // holding a reference to it. To prevent UAF, release |transaction| before
379   // moving |task_source|. Ref. crbug.com/1412008
380   transaction_with_task_source.transaction.Release();
381   priority_queue_.Push(std::move(transaction_with_task_source.task_source),
382                        sort_key);
383   EnsureEnoughWorkersLockRequired(executor);
384 }
385 
EnqueueAllTaskSources(PriorityQueue * new_priority_queue)386 void ThreadGroup::EnqueueAllTaskSources(PriorityQueue* new_priority_queue) {
387   std::unique_ptr<BaseScopedCommandsExecutor> executor = GetExecutor();
388   CheckedAutoLock lock(lock_);
389   while (!new_priority_queue->IsEmpty()) {
390     TaskSourceSortKey top_sort_key = new_priority_queue->PeekSortKey();
391     RegisteredTaskSource task_source = new_priority_queue->PopTaskSource();
392     priority_queue_.Push(std::move(task_source), top_sort_key);
393   }
394 }
395 
HandoffAllTaskSourcesToOtherThreadGroup(ThreadGroup * destination_thread_group)396 void ThreadGroup::HandoffAllTaskSourcesToOtherThreadGroup(
397     ThreadGroup* destination_thread_group) {
398   PriorityQueue new_priority_queue;
399   TaskSourceSortKey top_sort_key;
400   {
401     CheckedAutoLock current_thread_group_lock(lock_);
402     new_priority_queue.swap(priority_queue_);
403   }
404   destination_thread_group->EnqueueAllTaskSources(&new_priority_queue);
405 }
406 
HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(ThreadGroup * destination_thread_group)407 void ThreadGroup::HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(
408     ThreadGroup* destination_thread_group) {
409   PriorityQueue new_priority_queue;
410   TaskSourceSortKey top_sort_key;
411   {
412     // This works because all USER_BLOCKING tasks are at the front of the queue.
413     CheckedAutoLock current_thread_group_lock(lock_);
414     while (!priority_queue_.IsEmpty() &&
415            (top_sort_key = priority_queue_.PeekSortKey()).priority() ==
416                TaskPriority::USER_BLOCKING) {
417       new_priority_queue.Push(priority_queue_.PopTaskSource(), top_sort_key);
418     }
419     new_priority_queue.swap(priority_queue_);
420   }
421   destination_thread_group->EnqueueAllTaskSources(&new_priority_queue);
422 }
423 
ShouldYield(TaskSourceSortKey sort_key)424 bool ThreadGroup::ShouldYield(TaskSourceSortKey sort_key) {
425   DCHECK(TS_UNCHECKED_READ(max_allowed_sort_key_).is_lock_free());
426 
427   if (!task_tracker_->CanRunPriority(sort_key.priority()))
428     return true;
429   // It is safe to read |max_allowed_sort_key_| without a lock since this
430   // variable is atomic, keeping in mind that threads may not immediately see
431   // the new value when it is updated.
432   auto max_allowed_sort_key =
433       TS_UNCHECKED_READ(max_allowed_sort_key_).load(std::memory_order_relaxed);
434 
435   // To reduce unnecessary yielding, a task will never yield to a BEST_EFFORT
436   // task regardless of its worker_count.
437   if (sort_key.priority() > max_allowed_sort_key.priority ||
438       max_allowed_sort_key.priority == TaskPriority::BEST_EFFORT) {
439     return false;
440   }
441   // Otherwise, a task only yields to a task of equal priority if its
442   // worker_count would be greater still after yielding, e.g. a job with 1
443   // worker doesn't yield to a job with 0 workers.
444   if (sort_key.priority() == max_allowed_sort_key.priority &&
445       sort_key.worker_count() <= max_allowed_sort_key.worker_count + 1) {
446     return false;
447   }
448 
449   // Reset |max_allowed_sort_key_| so that only one thread should yield at a
450   // time for a given task.
451   max_allowed_sort_key =
452       TS_UNCHECKED_READ(max_allowed_sort_key_)
453           .exchange(kMaxYieldSortKey, std::memory_order_relaxed);
454   // Another thread might have decided to yield and racily reset
455   // |max_allowed_sort_key_|, in which case this thread doesn't yield.
456   return max_allowed_sort_key.priority != TaskPriority::BEST_EFFORT;
457 }
458 
459 #if BUILDFLAG(IS_WIN)
460 // static
461 std::unique_ptr<win::ScopedWindowsThreadEnvironment>
GetScopedWindowsThreadEnvironment(WorkerEnvironment environment)462 ThreadGroup::GetScopedWindowsThreadEnvironment(WorkerEnvironment environment) {
463   std::unique_ptr<win::ScopedWindowsThreadEnvironment> scoped_environment;
464   if (environment == WorkerEnvironment::COM_MTA) {
465     scoped_environment = std::make_unique<win::ScopedWinrtInitializer>();
466 
467     // TODO(crbug.com/1498668): rollback the change or replace it with a CHECK
468     // before closing the bug.
469     DUMP_WILL_BE_CHECK(scoped_environment->Succeeded());
470   }
471 
472   DCHECK(!scoped_environment || scoped_environment->Succeeded());
473   return scoped_environment;
474 }
475 #endif
476 
477 // static
CurrentThreadHasGroup()478 bool ThreadGroup::CurrentThreadHasGroup() {
479   return current_thread_group != nullptr;
480 }
481 
GetMaxTasksForTesting() const482 size_t ThreadGroup::GetMaxTasksForTesting() const {
483   CheckedAutoLock auto_lock(lock_);
484   return max_tasks_;
485 }
486 
GetMaxBestEffortTasksForTesting() const487 size_t ThreadGroup::GetMaxBestEffortTasksForTesting() const {
488   CheckedAutoLock auto_lock(lock_);
489   return max_best_effort_tasks_;
490 }
491 
WaitForWorkersIdleLockRequiredForTesting(size_t n)492 void ThreadGroup::WaitForWorkersIdleLockRequiredForTesting(size_t n) {
493   // Make sure workers do not cleanup while watching the idle count.
494   AutoReset<bool> ban_cleanups(&worker_cleanup_disallowed_for_testing_, true);
495 
496   while (NumberOfIdleWorkersLockRequiredForTesting() < n) {
497     idle_workers_set_cv_for_testing_.Wait();
498   }
499 }
500 
WaitForWorkersIdleForTesting(size_t n)501 void ThreadGroup::WaitForWorkersIdleForTesting(size_t n) {
502   CheckedAutoLock auto_lock(lock_);
503 
504 #if DCHECK_IS_ON()
505   DCHECK(!some_workers_cleaned_up_for_testing_)
506       << "Workers detached prior to waiting for a specific number of idle "
507          "workers. Doing the wait under such conditions is flaky. Consider "
508          "setting the suggested reclaim time to TimeDelta::Max() in Start().";
509 #endif
510 
511   WaitForWorkersIdleLockRequiredForTesting(n);
512 }
513 
WaitForAllWorkersIdleForTesting()514 void ThreadGroup::WaitForAllWorkersIdleForTesting() {
515   CheckedAutoLock auto_lock(lock_);
516   WaitForWorkersIdleLockRequiredForTesting(workers_.size());
517 }
518 
WaitForWorkersCleanedUpForTesting(size_t n)519 void ThreadGroup::WaitForWorkersCleanedUpForTesting(size_t n) {
520   CheckedAutoLock auto_lock(lock_);
521 
522   if (!num_workers_cleaned_up_for_testing_cv_) {
523     lock_.CreateConditionVariableAndEmplace(
524         num_workers_cleaned_up_for_testing_cv_);
525   }
526 
527   while (num_workers_cleaned_up_for_testing_ < n) {
528     num_workers_cleaned_up_for_testing_cv_->Wait();
529   }
530 
531   num_workers_cleaned_up_for_testing_ = 0;
532 }
533 
GetMaxConcurrentNonBlockedTasksDeprecated() const534 size_t ThreadGroup::GetMaxConcurrentNonBlockedTasksDeprecated() const {
535 #if DCHECK_IS_ON()
536   CheckedAutoLock auto_lock(lock_);
537   DCHECK_NE(after_start().initial_max_tasks, 0U)
538       << "GetMaxConcurrentTasksDeprecated() should only be called after the "
539       << "thread group has started.";
540 #endif
541   return after_start().initial_max_tasks;
542 }
543 
NumberOfWorkersForTesting() const544 size_t ThreadGroup::NumberOfWorkersForTesting() const {
545   CheckedAutoLock auto_lock(lock_);
546   return workers_.size();
547 }
548 
NumberOfIdleWorkersForTesting() const549 size_t ThreadGroup::NumberOfIdleWorkersForTesting() const {
550   CheckedAutoLock auto_lock(lock_);
551   return NumberOfIdleWorkersLockRequiredForTesting();
552 }
553 
GetDesiredNumAwakeWorkersLockRequired() const554 size_t ThreadGroup::GetDesiredNumAwakeWorkersLockRequired() const {
555   // Number of BEST_EFFORT task sources that are running or queued and allowed
556   // to run by the CanRunPolicy.
557   const size_t num_running_or_queued_can_run_best_effort_task_sources =
558       num_running_best_effort_tasks_ +
559       GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();
560 
561   const size_t workers_for_best_effort_task_sources =
562       std::max(std::min(num_running_or_queued_can_run_best_effort_task_sources,
563                         max_best_effort_tasks_),
564                num_running_best_effort_tasks_);
565 
566   // Number of USER_{VISIBLE|BLOCKING} task sources that are running or queued.
567   const size_t num_running_or_queued_foreground_task_sources =
568       (num_running_tasks_ - num_running_best_effort_tasks_) +
569       GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();
570 
571   const size_t workers_for_foreground_task_sources =
572       num_running_or_queued_foreground_task_sources;
573 
574   return std::min({workers_for_best_effort_task_sources +
575                        workers_for_foreground_task_sources,
576                    max_tasks_, kMaxNumberOfWorkers});
577 }
578 
MaybeScheduleAdjustMaxTasksLockRequired(BaseScopedCommandsExecutor * executor)579 void ThreadGroup::MaybeScheduleAdjustMaxTasksLockRequired(
580     BaseScopedCommandsExecutor* executor) {
581   if (!adjust_max_tasks_posted_ &&
582       ShouldPeriodicallyAdjustMaxTasksLockRequired()) {
583     executor->ScheduleAdjustMaxTasks();
584     adjust_max_tasks_posted_ = true;
585   }
586 }
587 
ScheduleAdjustMaxTasks()588 void ThreadGroup::ScheduleAdjustMaxTasks() {
589   // |adjust_max_tasks_posted_| can't change before the task posted below runs.
590   // Skip check on NaCl to avoid unsafe reference acquisition warning.
591 #if !BUILDFLAG(IS_NACL)
592   DCHECK(TS_UNCHECKED_READ(adjust_max_tasks_posted_));
593 #endif
594 
595   after_start().service_thread_task_runner->PostDelayedTask(
596       FROM_HERE, BindOnce(&ThreadGroup::AdjustMaxTasks, Unretained(this)),
597       after_start().blocked_workers_poll_period);
598 }
599 
AdjustMaxTasks()600 void ThreadGroup::AdjustMaxTasks() {
601   DCHECK(
602       after_start().service_thread_task_runner->RunsTasksInCurrentSequence());
603 
604   std::unique_ptr<BaseScopedCommandsExecutor> executor = GetExecutor();
605   CheckedAutoLock auto_lock(lock_);
606   DCHECK(adjust_max_tasks_posted_);
607   adjust_max_tasks_posted_ = false;
608 
609   // Increment max tasks for each worker that has been within a MAY_BLOCK
610   // ScopedBlockingCall for more than may_block_threshold.
611   for (scoped_refptr<WorkerThread> worker : workers_) {
612     // The delegates of workers inside a ThreadGroup should be
613     // WaitableEventWorkerDelegates.
614     ThreadGroupWorkerDelegate* delegate = GetWorkerDelegate(worker.get());
615     AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
616     delegate->MaybeIncrementMaxTasksLockRequired();
617   }
618 
619   // Wake up workers according to the updated |max_tasks_|. This will also
620   // reschedule AdjustMaxTasks() if necessary.
621   EnsureEnoughWorkersLockRequired(executor.get());
622 }
623 
OnShutDownStartedImpl(BaseScopedCommandsExecutor * executor)624 void ThreadGroup::OnShutDownStartedImpl(BaseScopedCommandsExecutor* executor) {
625   CheckedAutoLock auto_lock(lock_);
626 
627   // Don't do anything if the thread group isn't started.
628   if (max_tasks_ == 0 || UNLIKELY(join_for_testing_started_)) {
629     return;
630   }
631 
632   // Start a MAY_BLOCK scope on each worker that is already running a task.
633   for (scoped_refptr<WorkerThread>& worker : workers_) {
634     // The delegates of workers inside a ThreadGroup should be
635     // WorkerThreadDelegateImpls.
636     ThreadGroupWorkerDelegate* delegate = GetWorkerDelegate(worker.get());
637     AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
638     delegate->OnShutdownStartedLockRequired(executor);
639   }
640   EnsureEnoughWorkersLockRequired(executor);
641 
642   shutdown_started_ = true;
643 }
644 
ShouldPeriodicallyAdjustMaxTasksLockRequired()645 bool ThreadGroup::ShouldPeriodicallyAdjustMaxTasksLockRequired() {
646   // AdjustMaxTasks() should be scheduled to periodically adjust |max_tasks_|
647   // and |max_best_effort_tasks_| when (1) the concurrency limits are not large
648   // enough to accommodate all queued and running task sources and an idle
649   // worker and (2) there are unresolved MAY_BLOCK ScopedBlockingCalls.
650   // - When (1) is false: No worker would be created or woken up if the
651   //   concurrency limits were increased, so there is no hurry to increase them.
652   // - When (2) is false: The concurrency limits could not be increased by
653   //   AdjustMaxTasks().
654 
655   const size_t num_running_or_queued_best_effort_task_sources =
656       num_running_best_effort_tasks_ +
657       GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();
658   if (num_running_or_queued_best_effort_task_sources > max_best_effort_tasks_ &&
659       num_unresolved_best_effort_may_block_ > 0) {
660     return true;
661   }
662 
663   const size_t num_running_or_queued_task_sources =
664       num_running_tasks_ +
665       GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() +
666       GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();
667   constexpr size_t kIdleWorker = 1;
668   return num_running_or_queued_task_sources + kIdleWorker > max_tasks_ &&
669          num_unresolved_may_block_ > 0;
670 }
671 
UpdateMinAllowedPriorityLockRequired()672 void ThreadGroup::UpdateMinAllowedPriorityLockRequired() {
673   if (priority_queue_.IsEmpty() || num_running_tasks_ < max_tasks_) {
674     max_allowed_sort_key_.store(kMaxYieldSortKey, std::memory_order_relaxed);
675   } else {
676     max_allowed_sort_key_.store({priority_queue_.PeekSortKey().priority(),
677                                  priority_queue_.PeekSortKey().worker_count()},
678                                 std::memory_order_relaxed);
679   }
680 }
681 
DecrementTasksRunningLockRequired(TaskPriority priority)682 void ThreadGroup::DecrementTasksRunningLockRequired(TaskPriority priority) {
683   DCHECK_GT(num_running_tasks_, 0U);
684   --num_running_tasks_;
685   if (priority == TaskPriority::BEST_EFFORT) {
686     DCHECK_GT(num_running_best_effort_tasks_, 0U);
687     --num_running_best_effort_tasks_;
688   }
689   UpdateMinAllowedPriorityLockRequired();
690 }
691 
IncrementTasksRunningLockRequired(TaskPriority priority)692 void ThreadGroup::IncrementTasksRunningLockRequired(TaskPriority priority) {
693   ++num_running_tasks_;
694   DCHECK_LE(num_running_tasks_, max_tasks_);
695   DCHECK_LE(num_running_tasks_, kMaxNumberOfWorkers);
696   if (priority == TaskPriority::BEST_EFFORT) {
697     ++num_running_best_effort_tasks_;
698     DCHECK_LE(num_running_best_effort_tasks_, num_running_tasks_);
699     DCHECK_LE(num_running_best_effort_tasks_, max_best_effort_tasks_);
700   }
701   UpdateMinAllowedPriorityLockRequired();
702 }
703 
DecrementMaxTasksLockRequired()704 void ThreadGroup::DecrementMaxTasksLockRequired() {
705   DCHECK_GT(num_running_tasks_, 0U);
706   DCHECK_GT(max_tasks_, 0U);
707   --max_tasks_;
708   UpdateMinAllowedPriorityLockRequired();
709 }
710 
IncrementMaxTasksLockRequired()711 void ThreadGroup::IncrementMaxTasksLockRequired() {
712   DCHECK_GT(num_running_tasks_, 0U);
713   ++max_tasks_;
714   UpdateMinAllowedPriorityLockRequired();
715 }
716 
DecrementMaxBestEffortTasksLockRequired()717 void ThreadGroup::DecrementMaxBestEffortTasksLockRequired() {
718   DCHECK_GT(num_running_tasks_, 0U);
719   DCHECK_GT(max_best_effort_tasks_, 0U);
720   --max_best_effort_tasks_;
721   UpdateMinAllowedPriorityLockRequired();
722 }
723 
IncrementMaxBestEffortTasksLockRequired()724 void ThreadGroup::IncrementMaxBestEffortTasksLockRequired() {
725   DCHECK_GT(num_running_tasks_, 0U);
726   ++max_best_effort_tasks_;
727   UpdateMinAllowedPriorityLockRequired();
728 }
729 
730 ThreadGroup::InitializedInStart::InitializedInStart() = default;
731 ThreadGroup::InitializedInStart::~InitializedInStart() = default;
732 
733 }  // namespace internal
734 }  // namespace base
735