1 // Copyright 2024 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/thread_group_semaphore.h"
6
7 #include <algorithm>
8 #include <string_view>
9
10 #include "base/metrics/histogram_macros.h"
11 #include "base/sequence_token.h"
12 #include "base/strings/stringprintf.h"
13 #include "base/task/common/checked_lock.h"
14 #include "base/task/thread_pool/thread_group.h"
15 #include "base/task/thread_pool/worker_thread_semaphore.h"
16 #include "base/threading/scoped_blocking_call.h"
17 #include "base/threading/scoped_blocking_call_internal.h"
18 #include "base/threading/thread_checker.h"
19 #include "base/time/time_override.h"
20 #include "base/trace_event/base_tracing.h"
21
22 namespace base {
23 namespace internal {
24
namespace {
// Upper bound on `workers_.size()`: CreateAndRegisterWorkerLockRequired()
// returns without creating a worker once this many are registered.
constexpr size_t kMaxNumberOfWorkers = 256;
}  // namespace
28
29 // Upon destruction, executes actions that control the number of active workers.
30 // Useful to satisfy locking requirements of these actions.
31 class ThreadGroupSemaphore::SemaphoreScopedCommandsExecutor
32 : public ThreadGroup::BaseScopedCommandsExecutor {
33 public:
SemaphoreScopedCommandsExecutor(ThreadGroupSemaphore * outer)34 explicit SemaphoreScopedCommandsExecutor(ThreadGroupSemaphore* outer)
35 : BaseScopedCommandsExecutor(outer) {}
36
37 SemaphoreScopedCommandsExecutor(const SemaphoreScopedCommandsExecutor&) =
38 delete;
39 SemaphoreScopedCommandsExecutor& operator=(
40 const SemaphoreScopedCommandsExecutor&) = delete;
~SemaphoreScopedCommandsExecutor()41 ~SemaphoreScopedCommandsExecutor() override {
42 CheckedLock::AssertNoLockHeldOnCurrentThread();
43 for (int i = 0; i < semaphore_signal_count_; ++i) {
44 TRACE_EVENT_INSTANT("wakeup.flow", "WorkerThreadSemaphore::Signal",
45 perfetto::Flow::FromPointer(&outer()->semaphore_));
46 outer()->semaphore_.Signal();
47 }
48 }
49
ScheduleSignal()50 void ScheduleSignal() EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) {
51 ++semaphore_signal_count_;
52 ++outer()->num_active_signals_;
53 }
54
55 private:
56 friend class ThreadGroupSemaphore;
57
outer()58 ThreadGroupSemaphore* outer() {
59 return static_cast<ThreadGroupSemaphore*>(outer_);
60 }
61
62 int semaphore_signal_count_ = 0;
63 };
64
65 class ThreadGroupSemaphore::SemaphoreWorkerDelegate
66 : public ThreadGroup::ThreadGroupWorkerDelegate,
67 public WorkerThreadSemaphore::Delegate {
68 public:
69 // `outer` owns the worker for which this delegate is
70 // constructed. `join_called_for_testing` is shared amongst workers, and
71 // owned by `outer`.
72 SemaphoreWorkerDelegate(TrackedRef<ThreadGroup> outer,
73 bool is_excess,
74 AtomicFlag* join_called_for_testing);
75 SemaphoreWorkerDelegate(const SemaphoreWorkerDelegate&) = delete;
76 SemaphoreWorkerDelegate& operator=(const SemaphoreWorkerDelegate&) = delete;
77
78 // OnMainExit() handles the thread-affine cleanup;
79 // SemaphoreWorkerDelegate can thereafter safely be deleted from any thread.
80 ~SemaphoreWorkerDelegate() override = default;
81
82 // WorkerThread::Delegate:
83 void OnMainEntry(WorkerThread* worker) override;
84 void OnMainExit(WorkerThread* worker) override;
85 RegisteredTaskSource GetWork(WorkerThread* worker) override;
86 RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
87 WorkerThread* worker) override;
88 void RecordUnnecessaryWakeup() override;
89 TimeDelta GetSleepTimeout() override;
90
91 private:
outer() const92 const ThreadGroupSemaphore* outer() const {
93 return static_cast<ThreadGroupSemaphore*>(outer_.get());
94 }
outer()95 ThreadGroupSemaphore* outer() {
96 return static_cast<ThreadGroupSemaphore*>(outer_.get());
97 }
98
99 // ThreadGroup::ThreadGroupWorkerDelegate:
100 bool CanGetWorkLockRequired(BaseScopedCommandsExecutor* executor,
101 WorkerThread* worker)
102 EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
103 void CleanupLockRequired(BaseScopedCommandsExecutor* executor,
104 WorkerThread* worker)
105 EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
106 void OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor* executor,
107 WorkerThread* worker)
108 EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
109
110 // Returns true if `worker` is allowed to cleanup and remove itself from the
111 // thread group. Called from GetWork() when no work is available.
112 bool CanCleanupLockRequired(const WorkerThread* worker)
113 EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
114 };
115
116 std::unique_ptr<ThreadGroup::BaseScopedCommandsExecutor>
GetExecutor()117 ThreadGroupSemaphore::GetExecutor() {
118 return std::make_unique<SemaphoreScopedCommandsExecutor>(this);
119 }
120
// Forwards every argument to the ThreadGroup base constructor (moving
// `task_tracker` and `delegate` into it) and binds `tracked_ref_factory_` to
// this instance. DCHECKs that `thread_group_label_` is non-empty.
ThreadGroupSemaphore::ThreadGroupSemaphore(std::string_view histogram_label,
                                           std::string_view thread_group_label,
                                           ThreadType thread_type_hint,
                                           TrackedRef<TaskTracker> task_tracker,
                                           TrackedRef<Delegate> delegate)
    : ThreadGroup(histogram_label,
                  thread_group_label,
                  thread_type_hint,
                  std::move(task_tracker),
                  std::move(delegate)),
      tracked_ref_factory_(this) {
  DCHECK(!thread_group_label_.empty());
}
134
// Starts the thread group: forwards all parameters to ThreadGroup::StartImpl(),
// then, under `lock_`, checks that no worker exists yet and calls
// EnsureEnoughWorkersLockRequired().
void ThreadGroupSemaphore::Start(
    size_t max_tasks,
    size_t max_best_effort_tasks,
    TimeDelta suggested_reclaim_time,
    scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer,
    WorkerEnvironment worker_environment,
    bool synchronous_thread_start_for_testing,
    std::optional<TimeDelta> may_block_threshold) {
  ThreadGroup::StartImpl(
      max_tasks, max_best_effort_tasks, suggested_reclaim_time,
      service_thread_task_runner, worker_thread_observer, worker_environment,
      synchronous_thread_start_for_testing, may_block_threshold);

  // `executor` is declared before `auto_lock` so that it is destroyed after
  // the lock is released; its destructor asserts that no lock is held.
  SemaphoreScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  DCHECK(workers_.empty());
  EnsureEnoughWorkersLockRequired(&executor);
}
154
// Destructor. Only verifies (DCHECK) that no worker is still registered.
ThreadGroupSemaphore::~ThreadGroupSemaphore() {
  // ThreadGroup should only ever be deleted:
  //  1) In tests, after JoinForTesting().
  //  2) In production, iff initialization failed.
  // In both cases `workers_` should be empty.
  DCHECK(workers_.empty());
}
162
// Forwards `transaction` to UpdateSortKeyImpl() together with a
// SemaphoreScopedCommandsExecutor; signals scheduled on that executor are
// delivered when it is destroyed at the end of this function.
void ThreadGroupSemaphore::UpdateSortKey(TaskSource::Transaction transaction) {
  SemaphoreScopedCommandsExecutor executor(this);
  UpdateSortKeyImpl(&executor, std::move(transaction));
}
167
// Forwards `transaction_with_task_source` to
// PushTaskSourceAndWakeUpWorkersImpl() together with a
// SemaphoreScopedCommandsExecutor; signals scheduled on that executor are
// delivered when it is destroyed at the end of this function.
void ThreadGroupSemaphore::PushTaskSourceAndWakeUpWorkers(
    RegisteredTaskSourceAndTransaction transaction_with_task_source) {
  SemaphoreScopedCommandsExecutor executor(this);
  PushTaskSourceAndWakeUpWorkersImpl(&executor,
                                     std::move(transaction_with_task_source));
}
174
NumberOfIdleWorkersLockRequiredForTesting() const175 size_t ThreadGroupSemaphore::NumberOfIdleWorkersLockRequiredForTesting() const {
176 return ClampSub(workers_.size(), num_active_signals_);
177 }
178
SemaphoreWorkerDelegate(TrackedRef<ThreadGroup> outer,bool is_excess,AtomicFlag * join_called_for_testing)179 ThreadGroupSemaphore::SemaphoreWorkerDelegate::SemaphoreWorkerDelegate(
180 TrackedRef<ThreadGroup> outer,
181 bool is_excess,
182 AtomicFlag* join_called_for_testing)
183 : ThreadGroupWorkerDelegate(std::move(outer), is_excess),
184 WorkerThreadSemaphore::Delegate(
185 &static_cast<ThreadGroupSemaphore*>(outer.get())->semaphore_,
186 join_called_for_testing) {}
187
// WorkerThread::Delegate override; delegates entirely to OnMainEntryImpl().
void ThreadGroupSemaphore::SemaphoreWorkerDelegate::OnMainEntry(
    WorkerThread* worker) {
  OnMainEntryImpl(worker);
}
192
OnMainExit(WorkerThread * worker_base)193 void ThreadGroupSemaphore::SemaphoreWorkerDelegate::OnMainExit(
194 WorkerThread* worker_base) {
195 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
196
197 #if DCHECK_IS_ON()
198 WorkerThreadSemaphore* worker =
199 static_cast<WorkerThreadSemaphore*>(worker_base);
200 {
201 bool shutdown_complete = outer()->task_tracker_->IsShutdownComplete();
202 CheckedAutoLock auto_lock(outer()->lock_);
203
204 // `worker` should already have been removed from `workers_` by the time the
205 // thread is about to exit. (except in the cases where the thread group is
206 // no longer going to be used - in which case, it's fine for there to be
207 // invalid workers in the thread group).
208 if (!shutdown_complete && !outer()->join_called_for_testing_.IsSet()) {
209 DCHECK(!ContainsWorker(outer()->workers_, worker));
210 }
211 }
212 #endif
213
214 #if BUILDFLAG(IS_WIN)
215 worker_only().win_thread_environment.reset();
216 #endif // BUILDFLAG(IS_WIN)
217
218 // Count cleaned up workers for tests. It's important to do this here
219 // instead of at the end of CleanupLockRequired() because some side-effects
220 // of cleaning up happen outside the lock (e.g. recording histograms) and
221 // resuming from tests must happen-after that point or checks on the main
222 // thread will be flaky (crbug.com/1047733).
223 CheckedAutoLock auto_lock(outer()->lock_);
224 ++outer()->num_workers_cleaned_up_for_testing_;
225 #if DCHECK_IS_ON()
226 outer()->some_workers_cleaned_up_for_testing_ = true;
227 #endif
228 if (outer()->num_workers_cleaned_up_for_testing_cv_) {
229 outer()->num_workers_cleaned_up_for_testing_cv_->Signal();
230 }
231 }
232
CanGetWorkLockRequired(BaseScopedCommandsExecutor * executor,WorkerThread * worker_base)233 bool ThreadGroupSemaphore::SemaphoreWorkerDelegate::CanGetWorkLockRequired(
234 BaseScopedCommandsExecutor* executor,
235 WorkerThread* worker_base) {
236 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
237 WorkerThreadSemaphore* worker =
238 static_cast<WorkerThreadSemaphore*>(worker_base);
239
240 AnnotateAcquiredLockAlias annotate(outer()->lock_, lock());
241 // `timed_out_` is set by TimedWait().
242 if (timed_out_) {
243 if (CanCleanupLockRequired(worker)) {
244 CleanupLockRequired(executor, worker);
245 }
246 return false;
247 }
248
249 // If too many workers are currently awake (contrasted with ThreadGroupImpl
250 // where this decision is made by the number of workers which were signaled),
251 // this worker should not get work, until tasks are no longer in excess
252 // (i.e. max tasks increases).
253 if (outer()->num_active_signals_ > outer()->max_tasks_) {
254 OnWorkerBecomesIdleLockRequired(executor, worker);
255 return false;
256 }
257 return true;
258 }
259
// WorkerThread::Delegate override. DCHECKs that the worker has no current
// task priority or shutdown behavior recorded, then takes the group's lock
// and returns the result of GetWorkLockRequired().
RegisteredTaskSource ThreadGroupSemaphore::SemaphoreWorkerDelegate::GetWork(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!read_worker().current_task_priority);
  DCHECK(!read_worker().current_shutdown_behavior);

  // `executor` is declared before `auto_lock` so that it is destroyed after
  // the lock is released; its destructor asserts that no lock is held.
  SemaphoreScopedCommandsExecutor executor(outer());
  CheckedAutoLock auto_lock(outer()->lock_);
  // Both expressions name the same group's lock; the alias is declared for
  // the benefit of the lock annotations.
  AnnotateAcquiredLockAlias alias(
      outer()->lock_, static_cast<ThreadGroupSemaphore*>(outer_.get())->lock_);

  return GetWorkLockRequired(&executor, worker);
}
273
274 RegisteredTaskSource
SwapProcessedTask(RegisteredTaskSource task_source,WorkerThread * worker)275 ThreadGroupSemaphore::SemaphoreWorkerDelegate::SwapProcessedTask(
276 RegisteredTaskSource task_source,
277 WorkerThread* worker) {
278 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
279 DCHECK(read_worker().current_task_priority);
280 DCHECK(read_worker().current_shutdown_behavior);
281
282 // A transaction to the TaskSource to reenqueue, if any. Instantiated here as
283 // `TaskSource::lock_` is a UniversalPredecessor and must always be acquired
284 // prior to acquiring a second lock
285 std::optional<RegisteredTaskSourceAndTransaction>
286 transaction_with_task_source;
287 if (task_source) {
288 transaction_with_task_source.emplace(
289 RegisteredTaskSourceAndTransaction::FromTaskSource(
290 std::move(task_source)));
291 }
292
293 SemaphoreScopedCommandsExecutor workers_executor(outer());
294 ScopedReenqueueExecutor reenqueue_executor;
295 CheckedAutoLock auto_lock(outer()->lock_);
296 AnnotateAcquiredLockAlias annotate(outer()->lock_, lock());
297
298 // During shutdown, max_tasks may have been incremented in
299 // OnShutdownStartedLockRequired().
300 if (incremented_max_tasks_for_shutdown_) {
301 DCHECK(outer()->shutdown_started_);
302 outer()->DecrementMaxTasksLockRequired();
303 if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
304 outer()->DecrementMaxBestEffortTasksLockRequired();
305 }
306 incremented_max_tasks_since_blocked_ = false;
307 incremented_max_best_effort_tasks_since_blocked_ = false;
308 incremented_max_tasks_for_shutdown_ = false;
309 }
310
311 DCHECK(read_worker().blocking_start_time.is_null());
312 DCHECK(!incremented_max_tasks_since_blocked_);
313 DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
314
315 // Running task bookkeeping.
316 outer()->DecrementTasksRunningLockRequired(
317 *read_worker().current_task_priority);
318 write_worker().current_shutdown_behavior = std::nullopt;
319 write_worker().current_task_priority = std::nullopt;
320
321 if (transaction_with_task_source) {
322 // If there is a task to enqueue, we can swap it for another task without
323 // changing DesiredNumAwakeWorkers(), and thus without worrying about
324 // signaling/waiting.
325 outer()->ReEnqueueTaskSourceLockRequired(
326 &workers_executor, &reenqueue_executor,
327 std::move(transaction_with_task_source.value()));
328
329 return GetWorkLockRequired(&workers_executor,
330 static_cast<WorkerThreadSemaphore*>(worker));
331 } else if (outer()->GetDesiredNumAwakeWorkersLockRequired() >=
332 outer()->num_active_signals_) {
333 // When the thread pool wants more work to be run but hasn't signaled
334 // workers for it yet we can take advantage and grab more work without
335 // signal/wait contention.
336 return GetWorkLockRequired(&workers_executor,
337 static_cast<WorkerThreadSemaphore*>(worker));
338 }
339
340 // In the case where the worker does not have a task source to exchange and
341 // the thread group doesn't want more work than the number of workers awake,
342 // it must WaitForWork(), to keep `num_active_signals` synchronized with the
343 // number of desired awake workers.
344 OnWorkerBecomesIdleLockRequired(&workers_executor, worker);
345 return nullptr;
346 }
347
// WorkerThread::Delegate override; returns ThreadPoolSleepTimeout().
TimeDelta ThreadGroupSemaphore::SemaphoreWorkerDelegate::GetSleepTimeout() {
  return ThreadPoolSleepTimeout();
}
351
CanCleanupLockRequired(const WorkerThread * worker)352 bool ThreadGroupSemaphore::SemaphoreWorkerDelegate::CanCleanupLockRequired(
353 const WorkerThread* worker) {
354 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
355 return is_excess_ && LIKELY(!outer()->worker_cleanup_disallowed_for_testing_);
356 }
357
CleanupLockRequired(BaseScopedCommandsExecutor * executor,WorkerThread * worker_base)358 void ThreadGroupSemaphore::SemaphoreWorkerDelegate::CleanupLockRequired(
359 BaseScopedCommandsExecutor* executor,
360 WorkerThread* worker_base) {
361 WorkerThreadSemaphore* worker =
362 static_cast<WorkerThreadSemaphore*>(worker_base);
363 DCHECK(!outer()->join_called_for_testing_.IsSet());
364 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
365
366 worker->Cleanup();
367
368 // Remove the worker from `workers_`.
369 DCHECK(!outer()->after_start().no_worker_reclaim ||
370 outer()->workers_.size() > outer()->after_start().initial_max_tasks);
371 auto num_erased = std::erase(outer()->workers_, worker);
372 CHECK_EQ(num_erased, 1u);
373 }
374
// Accounts for a worker going idle: decrements the group's
// `num_active_signals_` (CHECKing that it is non-zero first) and signals the
// test-only idle-workers condition variable. Neither parameter is used.
void ThreadGroupSemaphore::SemaphoreWorkerDelegate::
    OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor* executor,
                                    WorkerThread* worker_base) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  // Guards the unsigned decrement below against wrapping around.
  CHECK_GT(outer()->num_active_signals_, 0u);
  --outer()->num_active_signals_;
  outer()->idle_workers_set_cv_for_testing_.Signal();
}
383
// WorkerThread::Delegate override; after checking the calling thread,
// delegates to RecordUnnecessaryWakeupImpl().
void ThreadGroupSemaphore::SemaphoreWorkerDelegate::RecordUnnecessaryWakeup() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  RecordUnnecessaryWakeupImpl();
}
388
JoinForTesting()389 void ThreadGroupSemaphore::JoinForTesting() {
390 decltype(workers_) workers_copy;
391 {
392 SemaphoreScopedCommandsExecutor executor(this);
393 CheckedAutoLock auto_lock(lock_);
394 AnnotateAcquiredLockAlias alias(lock_, executor.outer()->lock_);
395 priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();
396
397 DCHECK_GT(workers_.size(), size_t(0))
398 << "Joined an unstarted thread group.";
399
400 join_called_for_testing_.Set();
401
402 // Ensure WorkerThreads in `workers_` do not attempt to cleanup while
403 // being joined.
404 worker_cleanup_disallowed_for_testing_ = true;
405
406 // Make a copy of the WorkerThreads so that we can call
407 // WorkerThread::JoinForTesting() without holding `lock_` since
408 // WorkerThreads may need to access `workers_`.
409 workers_copy = workers_;
410
411 for (size_t i = 0; i < workers_copy.size(); ++i) {
412 executor.ScheduleSignal();
413 }
414 join_called_for_testing_.Set();
415 }
416 for (const auto& worker : workers_copy) {
417 static_cast<WorkerThreadSemaphore*>(worker.get())->JoinForTesting();
418 }
419
420 CheckedAutoLock auto_lock(lock_);
421 DCHECK(workers_ == workers_copy);
422 // Release `workers_` to clear their TrackedRef against `this`.
423 workers_.clear();
424 }
425
CreateAndRegisterWorkerLockRequired(SemaphoreScopedCommandsExecutor * executor)426 void ThreadGroupSemaphore::CreateAndRegisterWorkerLockRequired(
427 SemaphoreScopedCommandsExecutor* executor) {
428 if (workers_.size() == kMaxNumberOfWorkers) {
429 return;
430 }
431 DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
432 if (workers_.size() >= max_tasks_) {
433 return;
434 }
435 DCHECK(!join_called_for_testing_.IsSet());
436
437 // WorkerThread needs `lock_` as a predecessor for its thread lock because in
438 // GetWork(), `lock_` is first acquired and then the thread lock is acquired
439 // when GetLastUsedTime() is called on the worker by CanGetWorkLockRequired().
440 scoped_refptr<WorkerThreadSemaphore> worker =
441 MakeRefCounted<WorkerThreadSemaphore>(
442 thread_type_hint_,
443 std::make_unique<SemaphoreWorkerDelegate>(
444 tracked_ref_factory_.GetTrackedRef(),
445 /*is_excess=*/after_start().no_worker_reclaim
446 ? workers_.size() >= after_start().initial_max_tasks
447 : true,
448 &join_called_for_testing_),
449 task_tracker_, worker_sequence_num_++, &lock_, &semaphore_);
450 DCHECK(worker);
451 workers_.push_back(worker);
452 DCHECK_LE(workers_.size(), max_tasks_);
453 executor->ScheduleStart(worker);
454 }
455
// Re-evaluates the number of workers by calling
// EnsureEnoughWorkersLockRequired() under `lock_`.
void ThreadGroupSemaphore::DidUpdateCanRunPolicy() {
  // `executor` is declared before `auto_lock` so that it is destroyed after
  // the lock is released; its destructor asserts that no lock is held.
  SemaphoreScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  EnsureEnoughWorkersLockRequired(&executor);
}
461
GetWorkerDelegate(WorkerThread * worker)462 ThreadGroup::ThreadGroupWorkerDelegate* ThreadGroupSemaphore::GetWorkerDelegate(
463 WorkerThread* worker) {
464 return static_cast<ThreadGroup::ThreadGroupWorkerDelegate*>(
465 static_cast<SemaphoreWorkerDelegate*>(worker->delegate()));
466 }
467
// Forwards to OnShutDownStartedImpl() with a SemaphoreScopedCommandsExecutor;
// signals scheduled on that executor are delivered when it is destroyed at
// the end of this function.
void ThreadGroupSemaphore::OnShutdownStarted() {
  SemaphoreScopedCommandsExecutor executor(this);
  OnShutDownStartedImpl(&executor);
}
472
EnsureEnoughWorkersLockRequired(BaseScopedCommandsExecutor * base_executor)473 void ThreadGroupSemaphore::EnsureEnoughWorkersLockRequired(
474 BaseScopedCommandsExecutor* base_executor) {
475 // Don't do anything if the thread group isn't started.
476 if (max_tasks_ == 0 || UNLIKELY(join_called_for_testing_.IsSet())) {
477 return;
478 }
479
480 SemaphoreScopedCommandsExecutor* executor =
481 static_cast<SemaphoreScopedCommandsExecutor*>(base_executor);
482
483 const size_t desired_awake_workers = GetDesiredNumAwakeWorkersLockRequired();
484 // The +1 here is due to the fact that we always want there to be one idle
485 // worker.
486 const size_t num_workers_to_create =
487 std::min({static_cast<size_t>(after_start().max_num_workers_created),
488 static_cast<size_t>(
489 ClampSub(desired_awake_workers + 1, workers_.size()))});
490 for (size_t i = 0; i < num_workers_to_create; ++i) {
491 CreateAndRegisterWorkerLockRequired(executor);
492 }
493
494 const size_t new_signals = std::min(
495 // Don't signal more than `workers_.size()` workers.
496 {ClampSub(workers_.size(), num_active_signals_),
497 ClampSub(desired_awake_workers, num_active_signals_)});
498 AnnotateAcquiredLockAlias alias(lock_, executor->outer()->lock_);
499 for (size_t i = 0; i < new_signals; ++i) {
500 executor->ScheduleSignal();
501 }
502
503 // This function is called every time a task source is (re-)enqueued,
504 // hence the minimum priority needs to be updated.
505 UpdateMinAllowedPriorityLockRequired();
506
507 // Ensure that the number of workers is periodically adjusted if needed.
508 MaybeScheduleAdjustMaxTasksLockRequired(executor);
509 }
510
511 } // namespace internal
512 } // namespace base
513