xref: /aosp_15_r20/external/cronet/base/task/thread_pool/thread_group_impl.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/thread_group_impl.h"
6 
7 #include <optional>
8 #include <string_view>
9 
10 #include "base/auto_reset.h"
11 #include "base/metrics/histogram_macros.h"
12 #include "base/sequence_token.h"
13 #include "base/task/common/checked_lock.h"
14 #include "base/task/thread_pool/thread_group_worker_delegate.h"
15 #include "base/task/thread_pool/worker_thread_waitable_event.h"
16 #include "base/threading/scoped_blocking_call.h"
17 #include "base/threading/scoped_blocking_call_internal.h"
18 #include "base/threading/thread_checker.h"
19 #include "base/time/time_override.h"
20 #include "base/trace_event/base_tracing.h"
21 #include "third_party/abseil-cpp/absl/container/inlined_vector.h"
22 
23 namespace base {
24 namespace internal {
25 
26 namespace {
27 
28 constexpr size_t kMaxNumberOfWorkers = 256;
29 
30 }  // namespace
31 
32 // Upon destruction, executes actions that control the number of active workers.
33 // Useful to satisfy locking requirements of these actions.
34 class ThreadGroupImpl::ScopedCommandsExecutor
35     : public ThreadGroup::BaseScopedCommandsExecutor {
36  public:
ScopedCommandsExecutor(ThreadGroupImpl * outer)37   explicit ScopedCommandsExecutor(ThreadGroupImpl* outer)
38       : BaseScopedCommandsExecutor(outer) {}
39 
40   ScopedCommandsExecutor(const ScopedCommandsExecutor&) = delete;
41   ScopedCommandsExecutor& operator=(const ScopedCommandsExecutor&) = delete;
~ScopedCommandsExecutor()42   ~ScopedCommandsExecutor() override {
43     CheckedLock::AssertNoLockHeldOnCurrentThread();
44 
45     // Wake up workers.
46     for (auto worker : workers_to_wake_up_) {
47       worker->WakeUp();
48     }
49   }
50 
ScheduleWakeUp(scoped_refptr<WorkerThreadWaitableEvent> worker)51   void ScheduleWakeUp(scoped_refptr<WorkerThreadWaitableEvent> worker) {
52     workers_to_wake_up_.emplace_back(std::move(worker));
53   }
54 
55  private:
56   absl::InlinedVector<scoped_refptr<WorkerThreadWaitableEvent>, 2>
57       workers_to_wake_up_;
58 };
59 
60 class ThreadGroupImpl::WaitableEventWorkerDelegate
61     : public ThreadGroup::ThreadGroupWorkerDelegate,
62       public WorkerThreadWaitableEvent::Delegate {
63  public:
64   // |outer| owns the worker for which this delegate is constructed. If
65   // |is_excess| is true, this worker will be eligible for reclaim.
66   explicit WaitableEventWorkerDelegate(TrackedRef<ThreadGroup> outer,
67                                        bool is_excess);
68   WaitableEventWorkerDelegate(const WaitableEventWorkerDelegate&) = delete;
69   WaitableEventWorkerDelegate& operator=(const WaitableEventWorkerDelegate&) =
70       delete;
71 
72   // OnMainExit() handles the thread-affine cleanup;
73   // WaitableEventWorkerDelegate can thereafter safely be deleted from any
74   // thread.
75   ~WaitableEventWorkerDelegate() override = default;
76 
77   // ThreadGroup::Delegate:
78   void OnMainEntry(WorkerThread* worker) override;
79   void OnMainExit(WorkerThread* worker) override;
80   RegisteredTaskSource GetWork(WorkerThread* worker) override;
81   RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
82                                          WorkerThread* worker) override;
83 
84   // WorkerThreadWaitableEvent::Delegate:
85   void RecordUnnecessaryWakeup() override;
86   TimeDelta GetSleepTimeout() override;
87 
88  private:
outer() const89   ThreadGroupImpl* outer() const {
90     return static_cast<ThreadGroupImpl*>(outer_.get());
91   }
92 
93   // ThreadGroup::ThreadGroupWorkerDelegate:
94   bool CanGetWorkLockRequired(BaseScopedCommandsExecutor* executor,
95                               WorkerThread* worker)
96       EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
97   void CleanupLockRequired(BaseScopedCommandsExecutor* executor,
98                            WorkerThread* worker)
99       EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
100   void OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor* executor,
101                                        WorkerThread* worker)
102       EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
103 
104   // Returns true if |worker| is allowed to cleanup and remove itself from the
105   // thread group. Called from GetWork() when no work is available.
106   bool CanCleanupLockRequired(const WorkerThread* worker)
107       EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
108 };
109 
110 std::unique_ptr<ThreadGroup::BaseScopedCommandsExecutor>
GetExecutor()111 ThreadGroupImpl::GetExecutor() {
112   return static_cast<std::unique_ptr<BaseScopedCommandsExecutor>>(
113       std::make_unique<ScopedCommandsExecutor>(this));
114 }
115 
ThreadGroupImpl(std::string_view histogram_label,std::string_view thread_group_label,ThreadType thread_type_hint,TrackedRef<TaskTracker> task_tracker,TrackedRef<Delegate> delegate)116 ThreadGroupImpl::ThreadGroupImpl(std::string_view histogram_label,
117                                  std::string_view thread_group_label,
118                                  ThreadType thread_type_hint,
119                                  TrackedRef<TaskTracker> task_tracker,
120                                  TrackedRef<Delegate> delegate)
121     : ThreadGroup(histogram_label,
122                   thread_group_label,
123                   thread_type_hint,
124                   std::move(task_tracker),
125                   std::move(delegate)),
126       tracked_ref_factory_(this) {
127   DCHECK(!thread_group_label_.empty());
128 }
129 
Start(size_t max_tasks,size_t max_best_effort_tasks,TimeDelta suggested_reclaim_time,scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,WorkerThreadObserver * worker_thread_observer,WorkerEnvironment worker_environment,bool synchronous_thread_start_for_testing,std::optional<TimeDelta> may_block_threshold)130 void ThreadGroupImpl::Start(
131     size_t max_tasks,
132     size_t max_best_effort_tasks,
133     TimeDelta suggested_reclaim_time,
134     scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
135     WorkerThreadObserver* worker_thread_observer,
136     WorkerEnvironment worker_environment,
137     bool synchronous_thread_start_for_testing,
138     std::optional<TimeDelta> may_block_threshold) {
139   ThreadGroup::StartImpl(
140       max_tasks, max_best_effort_tasks, suggested_reclaim_time,
141       service_thread_task_runner, worker_thread_observer, worker_environment,
142       synchronous_thread_start_for_testing, may_block_threshold);
143 
144   ScopedCommandsExecutor executor(this);
145   CheckedAutoLock auto_lock(lock_);
146   DCHECK(workers_.empty());
147   EnsureEnoughWorkersLockRequired(&executor);
148 }
149 
~ThreadGroupImpl()150 ThreadGroupImpl::~ThreadGroupImpl() {
151   // ThreadGroup should only ever be deleted:
152   //  1) In tests, after JoinForTesting().
153   //  2) In production, iff initialization failed.
154   // In both cases |workers_| should be empty.
155   DCHECK(workers_.empty());
156 }
157 
UpdateSortKey(TaskSource::Transaction transaction)158 void ThreadGroupImpl::UpdateSortKey(TaskSource::Transaction transaction) {
159   ScopedCommandsExecutor executor(this);
160   UpdateSortKeyImpl(&executor, std::move(transaction));
161 }
162 
PushTaskSourceAndWakeUpWorkers(RegisteredTaskSourceAndTransaction transaction_with_task_source)163 void ThreadGroupImpl::PushTaskSourceAndWakeUpWorkers(
164     RegisteredTaskSourceAndTransaction transaction_with_task_source) {
165   ScopedCommandsExecutor executor(this);
166   PushTaskSourceAndWakeUpWorkersImpl(&executor,
167                                      std::move(transaction_with_task_source));
168 }
169 
WaitableEventWorkerDelegate(TrackedRef<ThreadGroup> outer,bool is_excess)170 ThreadGroupImpl::WaitableEventWorkerDelegate::WaitableEventWorkerDelegate(
171     TrackedRef<ThreadGroup> outer,
172     bool is_excess)
173     : ThreadGroupWorkerDelegate(std::move(outer), is_excess) {
174   // Bound in OnMainEntry().
175   DETACH_FROM_THREAD(worker_thread_checker_);
176 }
177 
GetSleepTimeout()178 TimeDelta ThreadGroupImpl::WaitableEventWorkerDelegate::GetSleepTimeout() {
179   return ThreadPoolSleepTimeout();
180 }
181 
OnMainEntry(WorkerThread * worker)182 void ThreadGroupImpl::WaitableEventWorkerDelegate::OnMainEntry(
183     WorkerThread* worker) {
184   OnMainEntryImpl(worker);
185 }
186 
OnMainExit(WorkerThread * worker_base)187 void ThreadGroupImpl::WaitableEventWorkerDelegate::OnMainExit(
188     WorkerThread* worker_base) {
189   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
190 
191 #if DCHECK_IS_ON()
192   WorkerThreadWaitableEvent* worker =
193       static_cast<WorkerThreadWaitableEvent*>(worker_base);
194   {
195     bool shutdown_complete = outer()->task_tracker_->IsShutdownComplete();
196     CheckedAutoLock auto_lock(outer()->lock_);
197 
198     // |worker| should already have been removed from the idle workers set and
199     // |workers_| by the time the thread is about to exit. (except in the
200     // cases where the thread group is no longer going to be used - in which
201     // case, it's fine for there to be invalid workers in the thread group).
202     if (!shutdown_complete && !outer()->join_for_testing_started_) {
203       DCHECK(!outer()->idle_workers_set_.Contains(worker));
204       DCHECK(!ContainsWorker(outer()->workers_, worker));
205     }
206   }
207 #endif
208 
209 #if BUILDFLAG(IS_WIN)
210   worker_only().win_thread_environment.reset();
211 #endif  // BUILDFLAG(IS_WIN)
212 
213   // Count cleaned up workers for tests. It's important to do this here
214   // instead of at the end of CleanupLockRequired() because some side-effects
215   // of cleaning up happen outside the lock (e.g. recording histograms) and
216   // resuming from tests must happen-after that point or checks on the main
217   // thread will be flaky (crbug.com/1047733).
218   CheckedAutoLock auto_lock(outer()->lock_);
219   ++outer()->num_workers_cleaned_up_for_testing_;
220 #if DCHECK_IS_ON()
221   outer()->some_workers_cleaned_up_for_testing_ = true;
222 #endif
223   if (outer()->num_workers_cleaned_up_for_testing_cv_) {
224     outer()->num_workers_cleaned_up_for_testing_cv_->Signal();
225   }
226 }
227 
CanGetWorkLockRequired(BaseScopedCommandsExecutor * executor,WorkerThread * worker_base)228 bool ThreadGroupImpl::WaitableEventWorkerDelegate::CanGetWorkLockRequired(
229     BaseScopedCommandsExecutor* executor,
230     WorkerThread* worker_base) {
231   WorkerThreadWaitableEvent* worker =
232       static_cast<WorkerThreadWaitableEvent*>(worker_base);
233 
234   const bool is_on_idle_workers_set = outer()->IsOnIdleSetLockRequired(worker);
235   DCHECK_EQ(is_on_idle_workers_set,
236             outer()->idle_workers_set_.Contains(worker));
237 
238   AnnotateAcquiredLockAlias annotate(outer()->lock_, lock());
239   // This occurs when the when WorkerThread::Delegate::WaitForWork() times out
240   // (i.e. when the worker's wakes up after GetSleepTimeout()).
241   if (is_on_idle_workers_set) {
242     if (CanCleanupLockRequired(worker)) {
243       CleanupLockRequired(executor, worker);
244     }
245     return false;
246   }
247 
248   // If too many workers are running, this worker should not get work, until
249   // tasks are no longer in excess (i.e. max tasks increases). This ensures that
250   // if this worker is in excess, it gets a chance to being cleaned up.
251   if (outer()->GetNumAwakeWorkersLockRequired() > outer()->max_tasks_) {
252     OnWorkerBecomesIdleLockRequired(executor, worker);
253     return false;
254   }
255 
256   return true;
257 }
258 
GetWork(WorkerThread * worker)259 RegisteredTaskSource ThreadGroupImpl::WaitableEventWorkerDelegate::GetWork(
260     WorkerThread* worker) {
261   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
262   DCHECK(!read_worker().current_task_priority);
263   DCHECK(!read_worker().current_shutdown_behavior);
264 
265   ScopedCommandsExecutor executor(outer());
266   CheckedAutoLock auto_lock(outer()->lock_);
267   AnnotateAcquiredLockAlias alias(
268       outer()->lock_, static_cast<ThreadGroupImpl*>(outer_.get())->lock_);
269   return GetWorkLockRequired(&executor, worker);
270 }
271 
272 RegisteredTaskSource
SwapProcessedTask(RegisteredTaskSource task_source,WorkerThread * worker)273 ThreadGroupImpl::WaitableEventWorkerDelegate::SwapProcessedTask(
274     RegisteredTaskSource task_source,
275     WorkerThread* worker) {
276   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
277   DCHECK(read_worker().current_task_priority);
278   DCHECK(read_worker().current_shutdown_behavior);
279 
280   // A transaction to the TaskSource to reenqueue, if any. Instantiated here as
281   // |TaskSource::lock_| is a UniversalPredecessor and must always be acquired
282   // prior to acquiring a second lock
283   std::optional<RegisteredTaskSourceAndTransaction>
284       transaction_with_task_source;
285   if (task_source) {
286     transaction_with_task_source.emplace(
287         RegisteredTaskSourceAndTransaction::FromTaskSource(
288             std::move(task_source)));
289   }
290 
291   // Calling WakeUp() guarantees that this WorkerThread will run Tasks from
292   // TaskSources returned by the GetWork() method of |delegate_| until it
293   // returns nullptr. Resetting |wake_up_event_| here doesn't break this
294   // invariant and avoids a useless loop iteration before going to sleep if
295   // WakeUp() is called while this WorkerThread is awake.
296   wake_up_event_.Reset();
297 
298   ScopedCommandsExecutor workers_executor(outer());
299   ScopedReenqueueExecutor reenqueue_executor;
300   CheckedAutoLock auto_lock(outer()->lock_);
301   AnnotateAcquiredLockAlias annotate(outer()->lock_, lock());
302 
303   // During shutdown, max_tasks may have been incremented in
304   // OnShutdownStartedLockRequired().
305   if (incremented_max_tasks_for_shutdown_) {
306     DCHECK(outer()->shutdown_started_);
307     outer()->DecrementMaxTasksLockRequired();
308     if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
309       outer()->DecrementMaxBestEffortTasksLockRequired();
310     }
311     incremented_max_tasks_since_blocked_ = false;
312     incremented_max_best_effort_tasks_since_blocked_ = false;
313     incremented_max_tasks_for_shutdown_ = false;
314   }
315 
316   DCHECK(read_worker().blocking_start_time.is_null());
317   DCHECK(!incremented_max_tasks_since_blocked_);
318   DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
319 
320   // Running task bookkeeping.
321   outer()->DecrementTasksRunningLockRequired(
322       *read_worker().current_task_priority);
323   write_worker().current_shutdown_behavior = std::nullopt;
324   write_worker().current_task_priority = std::nullopt;
325 
326   if (transaction_with_task_source) {
327     outer()->ReEnqueueTaskSourceLockRequired(
328         &workers_executor, &reenqueue_executor,
329         std::move(transaction_with_task_source.value()));
330   }
331 
332   return GetWorkLockRequired(&workers_executor,
333                              static_cast<WorkerThreadWaitableEvent*>(worker));
334 }
335 
CanCleanupLockRequired(const WorkerThread * worker)336 bool ThreadGroupImpl::WaitableEventWorkerDelegate::CanCleanupLockRequired(
337     const WorkerThread* worker) {
338   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
339   if (!is_excess_) {
340     return false;
341   }
342 
343   const TimeTicks last_used_time = worker->GetLastUsedTime();
344   return !last_used_time.is_null() &&
345          subtle::TimeTicksNowIgnoringOverride() - last_used_time >=
346              outer()->after_start().suggested_reclaim_time &&
347          LIKELY(!outer()->worker_cleanup_disallowed_for_testing_);
348 }
349 
CleanupLockRequired(BaseScopedCommandsExecutor * executor,WorkerThread * worker_base)350 void ThreadGroupImpl::WaitableEventWorkerDelegate::CleanupLockRequired(
351     BaseScopedCommandsExecutor* executor,
352     WorkerThread* worker_base) {
353   WorkerThreadWaitableEvent* worker =
354       static_cast<WorkerThreadWaitableEvent*>(worker_base);
355   DCHECK(!outer()->join_for_testing_started_);
356   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
357 
358   worker->Cleanup();
359 
360   if (outer()->IsOnIdleSetLockRequired(worker)) {
361     outer()->idle_workers_set_.Remove(worker);
362   }
363 
364   // Remove the worker from |workers_|.
365   auto worker_iter = ranges::find(outer()->workers_, worker);
366   CHECK(worker_iter != outer()->workers_.end(), base::NotFatalUntil::M125);
367   outer()->workers_.erase(worker_iter);
368 }
369 
370 void ThreadGroupImpl::WaitableEventWorkerDelegate::
OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor * executor,WorkerThread * worker_base)371     OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor* executor,
372                                     WorkerThread* worker_base) {
373   WorkerThreadWaitableEvent* worker =
374       static_cast<WorkerThreadWaitableEvent*>(worker_base);
375 
376   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
377   DCHECK(!outer()->idle_workers_set_.Contains(worker));
378 
379   // Add the worker to the idle set.
380   outer()->idle_workers_set_.Insert(worker);
381   DCHECK_LE(outer()->idle_workers_set_.Size(), outer()->workers_.size());
382   outer()->idle_workers_set_cv_for_testing_.Broadcast();
383 }
384 
RecordUnnecessaryWakeup()385 void ThreadGroupImpl::WaitableEventWorkerDelegate::RecordUnnecessaryWakeup() {
386   RecordUnnecessaryWakeupImpl();
387 }
388 
JoinForTesting()389 void ThreadGroupImpl::JoinForTesting() {
390   decltype(workers_) workers_copy;
391   {
392     CheckedAutoLock auto_lock(lock_);
393     priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();
394 
395     DCHECK_GT(workers_.size(), size_t(0))
396         << "Joined an unstarted thread group.";
397 
398     join_for_testing_started_ = true;
399 
400     // Ensure WorkerThreads in |workers_| do not attempt to cleanup while
401     // being joined.
402     worker_cleanup_disallowed_for_testing_ = true;
403 
404     // Make a copy of the WorkerThreads so that we can call
405     // WorkerThread::JoinForTesting() without holding |lock_| since
406     // WorkerThreads may need to access |workers_|.
407     workers_copy = workers_;
408   }
409   for (const auto& worker : workers_copy) {
410     static_cast<WorkerThreadWaitableEvent*>(worker.get())->JoinForTesting();
411   }
412 
413   CheckedAutoLock auto_lock(lock_);
414   DCHECK(workers_ == workers_copy);
415   // Release |workers_| to clear their TrackedRef against |this|.
416   workers_.clear();
417 }
418 
NumberOfIdleWorkersLockRequiredForTesting() const419 size_t ThreadGroupImpl::NumberOfIdleWorkersLockRequiredForTesting() const {
420   return idle_workers_set_.Size();
421 }
422 
MaintainAtLeastOneIdleWorkerLockRequired(ScopedCommandsExecutor * executor)423 void ThreadGroupImpl::MaintainAtLeastOneIdleWorkerLockRequired(
424     ScopedCommandsExecutor* executor) {
425   if (workers_.size() == kMaxNumberOfWorkers) {
426     return;
427   }
428   DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
429 
430   if (!idle_workers_set_.IsEmpty()) {
431     return;
432   }
433 
434   if (workers_.size() >= max_tasks_) {
435     return;
436   }
437 
438   scoped_refptr<WorkerThreadWaitableEvent> new_worker =
439       CreateAndRegisterWorkerLockRequired(executor);
440   DCHECK(new_worker);
441   idle_workers_set_.Insert(new_worker.get());
442 }
443 
444 scoped_refptr<WorkerThreadWaitableEvent>
CreateAndRegisterWorkerLockRequired(ScopedCommandsExecutor * executor)445 ThreadGroupImpl::CreateAndRegisterWorkerLockRequired(
446     ScopedCommandsExecutor* executor) {
447   DCHECK(!join_for_testing_started_);
448   DCHECK_LT(workers_.size(), max_tasks_);
449   DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
450   DCHECK(idle_workers_set_.IsEmpty());
451 
452   // WorkerThread needs |lock_| as a predecessor for its thread lock because in
453   // GetWork(), |lock_| is first acquired and then the thread lock is acquired
454   // when GetLastUsedTime() is called on the worker by CanGetWorkLockRequired().
455   scoped_refptr<WorkerThreadWaitableEvent> worker =
456       MakeRefCounted<WorkerThreadWaitableEvent>(
457           thread_type_hint_,
458           std::make_unique<WaitableEventWorkerDelegate>(
459               tracked_ref_factory_.GetTrackedRef(),
460               /* is_excess=*/after_start().no_worker_reclaim
461                   ? workers_.size() >= after_start().initial_max_tasks
462                   : true),
463           task_tracker_, worker_sequence_num_++, &lock_);
464 
465   workers_.push_back(worker);
466   executor->ScheduleStart(worker);
467   DCHECK_LE(workers_.size(), max_tasks_);
468 
469   return worker;
470 }
471 
GetNumAwakeWorkersLockRequired() const472 size_t ThreadGroupImpl::GetNumAwakeWorkersLockRequired() const {
473   DCHECK_GE(workers_.size(), idle_workers_set_.Size());
474   size_t num_awake_workers = workers_.size() - idle_workers_set_.Size();
475   DCHECK_GE(num_awake_workers, num_running_tasks_);
476   return num_awake_workers;
477 }
478 
DidUpdateCanRunPolicy()479 void ThreadGroupImpl::DidUpdateCanRunPolicy() {
480   ScopedCommandsExecutor executor(this);
481   CheckedAutoLock auto_lock(lock_);
482   EnsureEnoughWorkersLockRequired(&executor);
483 }
484 
GetWorkerDelegate(WorkerThread * worker)485 ThreadGroup::ThreadGroupWorkerDelegate* ThreadGroupImpl::GetWorkerDelegate(
486     WorkerThread* worker) {
487   return static_cast<ThreadGroup::ThreadGroupWorkerDelegate*>(
488       static_cast<WaitableEventWorkerDelegate*>(worker->delegate()));
489 }
490 
OnShutdownStarted()491 void ThreadGroupImpl::OnShutdownStarted() {
492   ScopedCommandsExecutor executor(this);
493   OnShutDownStartedImpl(&executor);
494 }
495 
EnsureEnoughWorkersLockRequired(BaseScopedCommandsExecutor * base_executor)496 void ThreadGroupImpl::EnsureEnoughWorkersLockRequired(
497     BaseScopedCommandsExecutor* base_executor) {
498   // Don't do anything if the thread group isn't started.
499   if (max_tasks_ == 0 || UNLIKELY(join_for_testing_started_)) {
500     return;
501   }
502 
503   ScopedCommandsExecutor* executor =
504       static_cast<ScopedCommandsExecutor*>(base_executor);
505 
506   const size_t desired_num_awake_workers =
507       GetDesiredNumAwakeWorkersLockRequired();
508   const size_t num_awake_workers = GetNumAwakeWorkersLockRequired();
509 
510   size_t num_workers_to_wake_up =
511       ClampSub(desired_num_awake_workers, num_awake_workers);
512   num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(2U));
513 
514   // Wake up the appropriate number of workers.
515   for (size_t i = 0; i < num_workers_to_wake_up; ++i) {
516     MaintainAtLeastOneIdleWorkerLockRequired(executor);
517     WorkerThreadWaitableEvent* worker_to_wakeup = idle_workers_set_.Take();
518     DCHECK(worker_to_wakeup);
519     executor->ScheduleWakeUp(worker_to_wakeup);
520   }
521 
522   // In the case where the loop above didn't wake up any worker and we don't
523   // have excess workers, the idle worker should be maintained. This happens
524   // when called from the last worker awake, or a recent increase in |max_tasks|
525   // now makes it possible to keep an idle worker.
526   if (desired_num_awake_workers == num_awake_workers) {
527     MaintainAtLeastOneIdleWorkerLockRequired(executor);
528   }
529 
530   // This function is called every time a task source is (re-)enqueued,
531   // hence the minimum priority needs to be updated.
532   UpdateMinAllowedPriorityLockRequired();
533 
534   // Ensure that the number of workers is periodically adjusted if needed.
535   MaybeScheduleAdjustMaxTasksLockRequired(executor);
536 }
537 
IsOnIdleSetLockRequired(WorkerThreadWaitableEvent * worker) const538 bool ThreadGroupImpl::IsOnIdleSetLockRequired(
539     WorkerThreadWaitableEvent* worker) const {
540   // To avoid searching through the idle set : use GetLastUsedTime() not being
541   // null (or being directly on top of the idle set) as a proxy for being on
542   // the idle set.
543   return idle_workers_set_.Peek() == worker ||
544          !worker->GetLastUsedTime().is_null();
545 }
546 
547 }  // namespace internal
548 }  // namespace base
549