// Copyright 2024 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/thread_group_semaphore.h"

#include <algorithm>
#include <string_view>

#include "base/metrics/histogram_macros.h"
#include "base/sequence_token.h"
#include "base/strings/stringprintf.h"
#include "base/task/common/checked_lock.h"
#include "base/task/thread_pool/thread_group.h"
#include "base/task/thread_pool/worker_thread_semaphore.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/scoped_blocking_call_internal.h"
#include "base/threading/thread_checker.h"
#include "base/time/time_override.h"
#include "base/trace_event/base_tracing.h"

namespace base {
namespace internal {

namespace {
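// Hard upper bound on the number of workers a thread group will ever create.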
constexpr size_t kMaxNumberOfWorkers = 256;
}  // namespace

// Upon destruction, executes actions that control the number of active
// workers. Useful to satisfy locking requirements of these actions.
class ThreadGroupSemaphore::SemaphoreScopedCommandsExecutor
    : public ThreadGroup::BaseScopedCommandsExecutor {
 public:
  explicit SemaphoreScopedCommandsExecutor(ThreadGroupSemaphore* outer)
      : BaseScopedCommandsExecutor(outer) {}

  SemaphoreScopedCommandsExecutor(const SemaphoreScopedCommandsExecutor&) =
      delete;
  SemaphoreScopedCommandsExecutor& operator=(
      const SemaphoreScopedCommandsExecutor&) = delete;

  ~SemaphoreScopedCommandsExecutor() override {
    CheckedLock::AssertNoLockHeldOnCurrentThread();
    for (int i = 0; i < semaphore_signal_count_; ++i) {
      TRACE_EVENT_INSTANT("wakeup.flow", "WorkerThreadSemaphore::Signal",
                          perfetto::Flow::FromPointer(&outer()->semaphore_));
      outer()->semaphore_.Signal();
    }
  }

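  // Schedules a semaphore signal to be sent when this executor is destroyed,
  // i.e. once `lock_` has been released, so that a worker is not woken only
  // to immediately block on the lock.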
  void ScheduleSignal() EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) {
    ++semaphore_signal_count_;
    ++outer()->num_active_signals_;
  }

 private:
  friend class ThreadGroupSemaphore;

  ThreadGroupSemaphore* outer() {
    return static_cast<ThreadGroupSemaphore*>(outer_);
  }

  int semaphore_signal_count_ = 0;
};

class ThreadGroupSemaphore::SemaphoreWorkerDelegate
    : public ThreadGroup::ThreadGroupWorkerDelegate,
      public WorkerThreadSemaphore::Delegate {
 public:
  // `outer` owns the worker for which this delegate is
  // constructed. `join_called_for_testing` is shared amongst workers, and
  // owned by `outer`.
  SemaphoreWorkerDelegate(TrackedRef<ThreadGroup> outer,
                          bool is_excess,
                          AtomicFlag* join_called_for_testing);
  SemaphoreWorkerDelegate(const SemaphoreWorkerDelegate&) = delete;
  SemaphoreWorkerDelegate& operator=(const SemaphoreWorkerDelegate&) = delete;

  // OnMainExit() handles the thread-affine cleanup;
  // SemaphoreWorkerDelegate can thereafter safely be deleted from any thread.
  ~SemaphoreWorkerDelegate() override = default;

  // WorkerThread::Delegate:
  void OnMainEntry(WorkerThread* worker) override;
  void OnMainExit(WorkerThread* worker) override;
  RegisteredTaskSource GetWork(WorkerThread* worker) override;
  RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
                                         WorkerThread* worker) override;
  void RecordUnnecessaryWakeup() override;
  TimeDelta GetSleepTimeout() override;

 private:
  const ThreadGroupSemaphore* outer() const {
    return static_cast<ThreadGroupSemaphore*>(outer_.get());
  }
  ThreadGroupSemaphore* outer() {
    return static_cast<ThreadGroupSemaphore*>(outer_.get());
  }

  // ThreadGroup::ThreadGroupWorkerDelegate:
  bool CanGetWorkLockRequired(BaseScopedCommandsExecutor* executor,
                              WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
  void CleanupLockRequired(BaseScopedCommandsExecutor* executor,
                           WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
  void OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor* executor,
                                       WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;

  // Returns true if `worker` is allowed to cleanup and remove itself from the
  // thread group. Called from GetWork() when no work is available.
  bool CanCleanupLockRequired(const WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
};

std::unique_ptr<ThreadGroup::BaseScopedCommandsExecutor>
ThreadGroupSemaphore::GetExecutor() {
  return std::make_unique<SemaphoreScopedCommandsExecutor>(this);
}

ThreadGroupSemaphore::ThreadGroupSemaphore(std::string_view histogram_label,
                                           std::string_view thread_group_label,
                                           ThreadType thread_type_hint,
                                           TrackedRef<TaskTracker> task_tracker,
                                           TrackedRef<Delegate> delegate)
    : ThreadGroup(histogram_label,
                  thread_group_label,
                  thread_type_hint,
                  std::move(task_tracker),
                  std::move(delegate)),
      tracked_ref_factory_(this) {
  DCHECK(!thread_group_label_.empty());
}

void ThreadGroupSemaphore::Start(
    size_t max_tasks,
    size_t max_best_effort_tasks,
    TimeDelta suggested_reclaim_time,
    scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer,
    WorkerEnvironment worker_environment,
    bool synchronous_thread_start_for_testing,
    std::optional<TimeDelta> may_block_threshold) {
  ThreadGroup::StartImpl(
      max_tasks, max_best_effort_tasks, suggested_reclaim_time,
      service_thread_task_runner, worker_thread_observer, worker_environment,
      synchronous_thread_start_for_testing, may_block_threshold);

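  // The executor is created before `lock_` is acquired so that its destructor,
  // which signals the semaphore, runs only after the lock has been released.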
  SemaphoreScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  DCHECK(workers_.empty());
  EnsureEnoughWorkersLockRequired(&executor);
}

ThreadGroupSemaphore::~ThreadGroupSemaphore() {
  // ThreadGroup should only ever be deleted:
  //  1) In tests, after JoinForTesting().
  //  2) In production, iff initialization failed.
  // In both cases `workers_` should be empty.
  DCHECK(workers_.empty());
}

void ThreadGroupSemaphore::UpdateSortKey(TaskSource::Transaction transaction) {
  SemaphoreScopedCommandsExecutor executor(this);
  UpdateSortKeyImpl(&executor, std::move(transaction));
}

void ThreadGroupSemaphore::PushTaskSourceAndWakeUpWorkers(
    RegisteredTaskSourceAndTransaction transaction_with_task_source) {
  SemaphoreScopedCommandsExecutor executor(this);
  PushTaskSourceAndWakeUpWorkersImpl(&executor,
                                     std::move(transaction_with_task_source));
}

size_t ThreadGroupSemaphore::NumberOfIdleWorkersLockRequiredForTesting() const {
  return ClampSub(workers_.size(), num_active_signals_);
}

ThreadGroupSemaphore::SemaphoreWorkerDelegate::SemaphoreWorkerDelegate(
    TrackedRef<ThreadGroup> outer,
    bool is_excess,
    AtomicFlag* join_called_for_testing)
    : ThreadGroupWorkerDelegate(std::move(outer), is_excess),
      WorkerThreadSemaphore::Delegate(
          &static_cast<ThreadGroupSemaphore*>(outer.get())->semaphore_,
          join_called_for_testing) {}

void ThreadGroupSemaphore::SemaphoreWorkerDelegate::OnMainEntry(
    WorkerThread* worker) {
  OnMainEntryImpl(worker);
}

void ThreadGroupSemaphore::SemaphoreWorkerDelegate::OnMainExit(
    WorkerThread* worker_base) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

#if DCHECK_IS_ON()
  WorkerThreadSemaphore* worker =
      static_cast<WorkerThreadSemaphore*>(worker_base);
  {
    bool shutdown_complete = outer()->task_tracker_->IsShutdownComplete();
    CheckedAutoLock auto_lock(outer()->lock_);

    // `worker` should already have been removed from `workers_` by the time
    // the thread is about to exit (except in the cases where the thread group
    // is no longer going to be used, in which case it's fine for there to be
    // invalid workers in the thread group).
    if (!shutdown_complete && !outer()->join_called_for_testing_.IsSet()) {
      DCHECK(!ContainsWorker(outer()->workers_, worker));
    }
  }
#endif

#if BUILDFLAG(IS_WIN)
  worker_only().win_thread_environment.reset();
#endif  // BUILDFLAG(IS_WIN)

  // Count cleaned up workers for tests. It's important to do this here
  // instead of at the end of CleanupLockRequired() because some side-effects
  // of cleaning up happen outside the lock (e.g. recording histograms) and
  // resuming from tests must happen-after that point or checks on the main
  // thread will be flaky (crbug.com/1047733).
  CheckedAutoLock auto_lock(outer()->lock_);
  ++outer()->num_workers_cleaned_up_for_testing_;
#if DCHECK_IS_ON()
  outer()->some_workers_cleaned_up_for_testing_ = true;
#endif
  if (outer()->num_workers_cleaned_up_for_testing_cv_) {
    outer()->num_workers_cleaned_up_for_testing_cv_->Signal();
  }
}

bool ThreadGroupSemaphore::SemaphoreWorkerDelegate::CanGetWorkLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker_base) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  WorkerThreadSemaphore* worker =
      static_cast<WorkerThreadSemaphore*>(worker_base);

  AnnotateAcquiredLockAlias annotate(outer()->lock_, lock());
  // `timed_out_` is set by TimedWait().
  if (timed_out_) {
    if (CanCleanupLockRequired(worker)) {
      CleanupLockRequired(executor, worker);
    }
    return false;
  }

  // If too many workers are currently awake (in contrast with ThreadGroupImpl,
  // where this decision is based on the number of workers that were signaled),
  // this worker should not get work until workers are no longer in excess
  // (i.e. until max tasks increases).
  if (outer()->num_active_signals_ > outer()->max_tasks_) {
    OnWorkerBecomesIdleLockRequired(executor, worker);
    return false;
  }
  return true;
}

RegisteredTaskSource ThreadGroupSemaphore::SemaphoreWorkerDelegate::GetWork(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!read_worker().current_task_priority);
  DCHECK(!read_worker().current_shutdown_behavior);
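  // The executor is declared before the lock below so that its destructor
  // (which may signal the semaphore) runs after `lock_` has been released.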
  SemaphoreScopedCommandsExecutor executor(outer());
  CheckedAutoLock auto_lock(outer()->lock_);
  AnnotateAcquiredLockAlias alias(
      outer()->lock_, static_cast<ThreadGroupSemaphore*>(outer_.get())->lock_);

  return GetWorkLockRequired(&executor, worker);
}

RegisteredTaskSource
ThreadGroupSemaphore::SemaphoreWorkerDelegate::SwapProcessedTask(
    RegisteredTaskSource task_source,
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(read_worker().current_task_priority);
  DCHECK(read_worker().current_shutdown_behavior);

  // A transaction to the TaskSource to reenqueue, if any. Instantiated here
  // because `TaskSource::lock_` is a UniversalPredecessor and must always be
  // acquired prior to acquiring a second lock.
  std::optional<RegisteredTaskSourceAndTransaction>
      transaction_with_task_source;
  if (task_source) {
    transaction_with_task_source.emplace(
        RegisteredTaskSourceAndTransaction::FromTaskSource(
            std::move(task_source)));
  }

  SemaphoreScopedCommandsExecutor workers_executor(outer());
  ScopedReenqueueExecutor reenqueue_executor;
  CheckedAutoLock auto_lock(outer()->lock_);
  AnnotateAcquiredLockAlias annotate(outer()->lock_, lock());

  // During shutdown, max_tasks may have been incremented in
  // OnShutdownStartedLockRequired().
  if (incremented_max_tasks_for_shutdown_) {
    DCHECK(outer()->shutdown_started_);
    outer()->DecrementMaxTasksLockRequired();
    if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
      outer()->DecrementMaxBestEffortTasksLockRequired();
    }
    incremented_max_tasks_since_blocked_ = false;
    incremented_max_best_effort_tasks_since_blocked_ = false;
    incremented_max_tasks_for_shutdown_ = false;
  }

  DCHECK(read_worker().blocking_start_time.is_null());
  DCHECK(!incremented_max_tasks_since_blocked_);
  DCHECK(!incremented_max_best_effort_tasks_since_blocked_);

  // Running task bookkeeping.
  outer()->DecrementTasksRunningLockRequired(
      *read_worker().current_task_priority);
  write_worker().current_shutdown_behavior = std::nullopt;
  write_worker().current_task_priority = std::nullopt;

  if (transaction_with_task_source) {
    // If there is a task source to reenqueue, we can swap it for another task
    // without changing DesiredNumAwakeWorkers(), and thus without worrying
    // about signaling/waiting.
    outer()->ReEnqueueTaskSourceLockRequired(
        &workers_executor, &reenqueue_executor,
        std::move(transaction_with_task_source.value()));

    return GetWorkLockRequired(&workers_executor,
                               static_cast<WorkerThreadSemaphore*>(worker));
  } else if (outer()->GetDesiredNumAwakeWorkersLockRequired() >=
             outer()->num_active_signals_) {
    // When the thread pool wants more work to be run but hasn't yet signaled
    // workers for it, we can take advantage and grab more work without
    // signal/wait contention.
    return GetWorkLockRequired(&workers_executor,
                               static_cast<WorkerThreadSemaphore*>(worker));
  }

  // In the case where the worker does not have a task source to exchange and
  // the thread group doesn't want more work than the number of workers awake,
  // it must WaitForWork() to keep `num_active_signals_` synchronized with the
  // number of desired awake workers.
  OnWorkerBecomesIdleLockRequired(&workers_executor, worker);
  return nullptr;
}

TimeDelta ThreadGroupSemaphore::SemaphoreWorkerDelegate::GetSleepTimeout() {
  return ThreadPoolSleepTimeout();
}

bool ThreadGroupSemaphore::SemaphoreWorkerDelegate::CanCleanupLockRequired(
    const WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  return is_excess_ && LIKELY(!outer()->worker_cleanup_disallowed_for_testing_);
}

void ThreadGroupSemaphore::SemaphoreWorkerDelegate::CleanupLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker_base) {
  WorkerThreadSemaphore* worker =
      static_cast<WorkerThreadSemaphore*>(worker_base);
  DCHECK(!outer()->join_called_for_testing_.IsSet());
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  worker->Cleanup();

  // Remove the worker from `workers_`.
  DCHECK(!outer()->after_start().no_worker_reclaim ||
         outer()->workers_.size() > outer()->after_start().initial_max_tasks);
  auto num_erased = std::erase(outer()->workers_, worker);
  CHECK_EQ(num_erased, 1u);
}

void ThreadGroupSemaphore::SemaphoreWorkerDelegate::
    OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor* executor,
                                    WorkerThread* worker_base) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
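  // An idle worker gives back its signal so that `num_active_signals_` keeps
  // tracking the number of workers that are awake or have a pending wakeup.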
  CHECK_GT(outer()->num_active_signals_, 0u);
  --outer()->num_active_signals_;
  outer()->idle_workers_set_cv_for_testing_.Signal();
}

void ThreadGroupSemaphore::SemaphoreWorkerDelegate::RecordUnnecessaryWakeup() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  RecordUnnecessaryWakeupImpl();
}

void ThreadGroupSemaphore::JoinForTesting() {
  decltype(workers_) workers_copy;
  {
    SemaphoreScopedCommandsExecutor executor(this);
    CheckedAutoLock auto_lock(lock_);
    AnnotateAcquiredLockAlias alias(lock_, executor.outer()->lock_);
    priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();

    DCHECK_GT(workers_.size(), size_t(0))
        << "Joined an unstarted thread group.";

    join_called_for_testing_.Set();

    // Ensure WorkerThreads in `workers_` do not attempt to cleanup while
    // being joined.
    worker_cleanup_disallowed_for_testing_ = true;

    // Make a copy of the WorkerThreads so that we can call
    // WorkerThread::JoinForTesting() without holding `lock_` since
    // WorkerThreads may need to access `workers_`.
    workers_copy = workers_;

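    // Schedule one signal per worker so that every worker wakes up, observes
    // `join_called_for_testing_` and exits, allowing the JoinForTesting()
    // calls below to return.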
    for (size_t i = 0; i < workers_copy.size(); ++i) {
      executor.ScheduleSignal();
    }
    join_called_for_testing_.Set();
  }
  for (const auto& worker : workers_copy) {
    static_cast<WorkerThreadSemaphore*>(worker.get())->JoinForTesting();
  }

  CheckedAutoLock auto_lock(lock_);
  DCHECK(workers_ == workers_copy);
  // Release `workers_` to clear their TrackedRef against `this`.
  workers_.clear();
}

void ThreadGroupSemaphore::CreateAndRegisterWorkerLockRequired(
    SemaphoreScopedCommandsExecutor* executor) {
  if (workers_.size() == kMaxNumberOfWorkers) {
    return;
  }
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
  if (workers_.size() >= max_tasks_) {
    return;
  }
  DCHECK(!join_called_for_testing_.IsSet());

  // WorkerThread needs `lock_` as a predecessor for its thread lock because in
  // GetWork(), `lock_` is first acquired and then the thread lock is acquired
  // when GetLastUsedTime() is called on the worker by CanGetWorkLockRequired().
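  // When worker reclaim is disabled (`no_worker_reclaim`), only workers
  // created beyond `initial_max_tasks` are marked excess (and therefore
  // reclaimable); otherwise every worker is considered excess.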
  scoped_refptr<WorkerThreadSemaphore> worker =
      MakeRefCounted<WorkerThreadSemaphore>(
          thread_type_hint_,
          std::make_unique<SemaphoreWorkerDelegate>(
              tracked_ref_factory_.GetTrackedRef(),
              /*is_excess=*/after_start().no_worker_reclaim
                  ? workers_.size() >= after_start().initial_max_tasks
                  : true,
              &join_called_for_testing_),
          task_tracker_, worker_sequence_num_++, &lock_, &semaphore_);
  DCHECK(worker);
  workers_.push_back(worker);
  DCHECK_LE(workers_.size(), max_tasks_);
  executor->ScheduleStart(worker);
}

void ThreadGroupSemaphore::DidUpdateCanRunPolicy() {
  SemaphoreScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  EnsureEnoughWorkersLockRequired(&executor);
}

ThreadGroup::ThreadGroupWorkerDelegate* ThreadGroupSemaphore::GetWorkerDelegate(
    WorkerThread* worker) {
  return static_cast<ThreadGroup::ThreadGroupWorkerDelegate*>(
      static_cast<SemaphoreWorkerDelegate*>(worker->delegate()));
}

void ThreadGroupSemaphore::OnShutdownStarted() {
  SemaphoreScopedCommandsExecutor executor(this);
  OnShutDownStartedImpl(&executor);
}

void ThreadGroupSemaphore::EnsureEnoughWorkersLockRequired(
    BaseScopedCommandsExecutor* base_executor) {
  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0 || UNLIKELY(join_called_for_testing_.IsSet())) {
    return;
  }

  SemaphoreScopedCommandsExecutor* executor =
      static_cast<SemaphoreScopedCommandsExecutor*>(base_executor);

  const size_t desired_awake_workers = GetDesiredNumAwakeWorkersLockRequired();
  // The +1 is because we always want to keep one idle worker available.
  const size_t num_workers_to_create =
      std::min({static_cast<size_t>(after_start().max_num_workers_created),
                static_cast<size_t>(
                    ClampSub(desired_awake_workers + 1, workers_.size()))});
  for (size_t i = 0; i < num_workers_to_create; ++i) {
    CreateAndRegisterWorkerLockRequired(executor);
  }

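  // Wake only as many additional workers as are needed to reach the desired
  // number of awake workers.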
  const size_t new_signals = std::min(
      // Don't signal more than `workers_.size()` workers.
      {ClampSub(workers_.size(), num_active_signals_),
       ClampSub(desired_awake_workers, num_active_signals_)});
  AnnotateAcquiredLockAlias alias(lock_, executor->outer()->lock_);
  for (size_t i = 0; i < new_signals; ++i) {
    executor->ScheduleSignal();
  }

  // This function is called every time a task source is (re-)enqueued,
  // hence the minimum priority needs to be updated.
  UpdateMinAllowedPriorityLockRequired();

  // Ensure that the number of workers is periodically adjusted if needed.
  MaybeScheduleAdjustMaxTasksLockRequired(executor);
}

}  // namespace internal
}  // namespace base