// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/thread_group_impl.h"

#include <optional>
#include <string_view>

#include "base/auto_reset.h"
#include "base/metrics/histogram_macros.h"
#include "base/sequence_token.h"
#include "base/task/common/checked_lock.h"
#include "base/task/thread_pool/thread_group_worker_delegate.h"
#include "base/task/thread_pool/worker_thread_waitable_event.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/scoped_blocking_call_internal.h"
#include "base/threading/thread_checker.h"
#include "base/time/time_override.h"
#include "base/trace_event/base_tracing.h"
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"

namespace base {
namespace internal {

namespace {

constexpr size_t kMaxNumberOfWorkers = 256;

}  // namespace

// Upon destruction, executes actions that control the number of active
// workers. Useful to satisfy locking requirements of these actions.
class ThreadGroupImpl::ScopedCommandsExecutor
    : public ThreadGroup::BaseScopedCommandsExecutor {
 public:
  explicit ScopedCommandsExecutor(ThreadGroupImpl* outer)
      : BaseScopedCommandsExecutor(outer) {}

  ScopedCommandsExecutor(const ScopedCommandsExecutor&) = delete;
  ScopedCommandsExecutor& operator=(const ScopedCommandsExecutor&) = delete;
  ~ScopedCommandsExecutor() override {
    CheckedLock::AssertNoLockHeldOnCurrentThread();

    // Wake up workers.
    for (auto worker : workers_to_wake_up_) {
      worker->WakeUp();
    }
  }

  void ScheduleWakeUp(scoped_refptr<WorkerThreadWaitableEvent> worker) {
    workers_to_wake_up_.emplace_back(std::move(worker));
  }

 private:
  absl::InlinedVector<scoped_refptr<WorkerThreadWaitableEvent>, 2>
      workers_to_wake_up_;
};

class ThreadGroupImpl::WaitableEventWorkerDelegate
    : public ThreadGroup::ThreadGroupWorkerDelegate,
      public WorkerThreadWaitableEvent::Delegate {
 public:
  // |outer| owns the worker for which this delegate is constructed. If
  // |is_excess| is true, this worker will be eligible for reclaim.
  explicit WaitableEventWorkerDelegate(TrackedRef<ThreadGroup> outer,
                                       bool is_excess);
  WaitableEventWorkerDelegate(const WaitableEventWorkerDelegate&) = delete;
  WaitableEventWorkerDelegate& operator=(const WaitableEventWorkerDelegate&) =
      delete;

  // OnMainExit() handles the thread-affine cleanup;
  // WaitableEventWorkerDelegate can thereafter safely be deleted from any
  // thread.
  ~WaitableEventWorkerDelegate() override = default;

  // ThreadGroup::Delegate:
  void OnMainEntry(WorkerThread* worker) override;
  void OnMainExit(WorkerThread* worker) override;
  RegisteredTaskSource GetWork(WorkerThread* worker) override;
  RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
                                         WorkerThread* worker) override;

  // WorkerThreadWaitableEvent::Delegate:
  void RecordUnnecessaryWakeup() override;
  TimeDelta GetSleepTimeout() override;

 private:
  ThreadGroupImpl* outer() const {
    return static_cast<ThreadGroupImpl*>(outer_.get());
  }

  // ThreadGroup::ThreadGroupWorkerDelegate:
  bool CanGetWorkLockRequired(BaseScopedCommandsExecutor* executor,
                              WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
  void CleanupLockRequired(BaseScopedCommandsExecutor* executor,
                           WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
  void OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor* executor,
                                       WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;

  // Returns true if |worker| is allowed to cleanup and remove itself from the
  // thread group. Called from GetWork() when no work is available.
  bool CanCleanupLockRequired(const WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
};

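// Returns a ScopedCommandsExecutor for this thread group, typed as the base
// executor so that shared ThreadGroup code can use it.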
std::unique_ptr<ThreadGroup::BaseScopedCommandsExecutor>
ThreadGroupImpl::GetExecutor() {
  return static_cast<std::unique_ptr<BaseScopedCommandsExecutor>>(
      std::make_unique<ScopedCommandsExecutor>(this));
}

ThreadGroupImpl::ThreadGroupImpl(std::string_view histogram_label,
                                 std::string_view thread_group_label,
                                 ThreadType thread_type_hint,
                                 TrackedRef<TaskTracker> task_tracker,
                                 TrackedRef<Delegate> delegate)
    : ThreadGroup(histogram_label,
                  thread_group_label,
                  thread_type_hint,
                  std::move(task_tracker),
                  std::move(delegate)),
      tracked_ref_factory_(this) {
  DCHECK(!thread_group_label_.empty());
}

void ThreadGroupImpl::Start(
    size_t max_tasks,
    size_t max_best_effort_tasks,
    TimeDelta suggested_reclaim_time,
    scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer,
    WorkerEnvironment worker_environment,
    bool synchronous_thread_start_for_testing,
    std::optional<TimeDelta> may_block_threshold) {
  ThreadGroup::StartImpl(
      max_tasks, max_best_effort_tasks, suggested_reclaim_time,
      service_thread_task_runner, worker_thread_observer, worker_environment,
      synchronous_thread_start_for_testing, may_block_threshold);

  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  DCHECK(workers_.empty());
  EnsureEnoughWorkersLockRequired(&executor);
}

ThreadGroupImpl::~ThreadGroupImpl() {
  // ThreadGroup should only ever be deleted:
  //  1) In tests, after JoinForTesting().
  //  2) In production, iff initialization failed.
  // In both cases |workers_| should be empty.
  DCHECK(workers_.empty());
}

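// Repositions |transaction|'s task source in this thread group's priority
// queue after its sort key changed.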
void ThreadGroupImpl::UpdateSortKey(TaskSource::Transaction transaction) {
  ScopedCommandsExecutor executor(this);
  UpdateSortKeyImpl(&executor, std::move(transaction));
}

void ThreadGroupImpl::PushTaskSourceAndWakeUpWorkers(
    RegisteredTaskSourceAndTransaction transaction_with_task_source) {
  ScopedCommandsExecutor executor(this);
  PushTaskSourceAndWakeUpWorkersImpl(&executor,
                                     std::move(transaction_with_task_source));
}

ThreadGroupImpl::WaitableEventWorkerDelegate::WaitableEventWorkerDelegate(
    TrackedRef<ThreadGroup> outer,
    bool is_excess)
    : ThreadGroupWorkerDelegate(std::move(outer), is_excess) {
  // Bound in OnMainEntry().
  DETACH_FROM_THREAD(worker_thread_checker_);
}

TimeDelta ThreadGroupImpl::WaitableEventWorkerDelegate::GetSleepTimeout() {
  return ThreadPoolSleepTimeout();
}

void ThreadGroupImpl::WaitableEventWorkerDelegate::OnMainEntry(
    WorkerThread* worker) {
  OnMainEntryImpl(worker);
}

void ThreadGroupImpl::WaitableEventWorkerDelegate::OnMainExit(
    WorkerThread* worker_base) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

#if DCHECK_IS_ON()
  WorkerThreadWaitableEvent* worker =
      static_cast<WorkerThreadWaitableEvent*>(worker_base);
  {
    bool shutdown_complete = outer()->task_tracker_->IsShutdownComplete();
    CheckedAutoLock auto_lock(outer()->lock_);

    // |worker| should already have been removed from the idle workers set and
    // |workers_| by the time the thread is about to exit (except when the
    // thread group is no longer going to be used, in which case it's fine for
    // there to be invalid workers in the thread group).
    if (!shutdown_complete && !outer()->join_for_testing_started_) {
      DCHECK(!outer()->idle_workers_set_.Contains(worker));
      DCHECK(!ContainsWorker(outer()->workers_, worker));
    }
  }
#endif

#if BUILDFLAG(IS_WIN)
  worker_only().win_thread_environment.reset();
#endif  // BUILDFLAG(IS_WIN)

  // Count cleaned up workers for tests. It's important to do this here
  // instead of at the end of CleanupLockRequired() because some side-effects
  // of cleaning up happen outside the lock (e.g. recording histograms) and
  // resuming from tests must happen-after that point or checks on the main
  // thread will be flaky (crbug.com/1047733).
  CheckedAutoLock auto_lock(outer()->lock_);
  ++outer()->num_workers_cleaned_up_for_testing_;
#if DCHECK_IS_ON()
  outer()->some_workers_cleaned_up_for_testing_ = true;
#endif
  if (outer()->num_workers_cleaned_up_for_testing_cv_) {
    outer()->num_workers_cleaned_up_for_testing_cv_->Signal();
  }
}

bool ThreadGroupImpl::WaitableEventWorkerDelegate::CanGetWorkLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker_base) {
  WorkerThreadWaitableEvent* worker =
      static_cast<WorkerThreadWaitableEvent*>(worker_base);

  const bool is_on_idle_workers_set = outer()->IsOnIdleSetLockRequired(worker);
  DCHECK_EQ(is_on_idle_workers_set,
            outer()->idle_workers_set_.Contains(worker));

  AnnotateAcquiredLockAlias annotate(outer()->lock_, lock());
  // This occurs when WorkerThread::Delegate::WaitForWork() times out (i.e.
  // when the worker wakes up after GetSleepTimeout()).
  if (is_on_idle_workers_set) {
    if (CanCleanupLockRequired(worker)) {
      CleanupLockRequired(executor, worker);
    }
    return false;
  }

  // If too many workers are running, this worker should not get work until
  // tasks are no longer in excess (i.e. max tasks increases). This ensures
  // that if this worker is in excess, it gets a chance to be cleaned up.
  if (outer()->GetNumAwakeWorkersLockRequired() > outer()->max_tasks_) {
    OnWorkerBecomesIdleLockRequired(executor, worker);
    return false;
  }

  return true;
}

RegisteredTaskSource ThreadGroupImpl::WaitableEventWorkerDelegate::GetWork(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!read_worker().current_task_priority);
  DCHECK(!read_worker().current_shutdown_behavior);

  ScopedCommandsExecutor executor(outer());
  CheckedAutoLock auto_lock(outer()->lock_);
  AnnotateAcquiredLockAlias alias(
      outer()->lock_, static_cast<ThreadGroupImpl*>(outer_.get())->lock_);
  return GetWorkLockRequired(&executor, worker);
}

RegisteredTaskSource
ThreadGroupImpl::WaitableEventWorkerDelegate::SwapProcessedTask(
    RegisteredTaskSource task_source,
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(read_worker().current_task_priority);
  DCHECK(read_worker().current_shutdown_behavior);

  // A transaction to the TaskSource to reenqueue, if any. Instantiated here as
  // |TaskSource::lock_| is a UniversalPredecessor and must always be acquired
  // prior to acquiring a second lock.
  std::optional<RegisteredTaskSourceAndTransaction>
      transaction_with_task_source;
  if (task_source) {
    transaction_with_task_source.emplace(
        RegisteredTaskSourceAndTransaction::FromTaskSource(
            std::move(task_source)));
  }

  // Calling WakeUp() guarantees that this WorkerThread will run Tasks from
  // TaskSources returned by the GetWork() method of |delegate_| until it
  // returns nullptr. Resetting |wake_up_event_| here doesn't break this
  // invariant and avoids a useless loop iteration before going to sleep if
  // WakeUp() is called while this WorkerThread is awake.
  wake_up_event_.Reset();

  ScopedCommandsExecutor workers_executor(outer());
  ScopedReenqueueExecutor reenqueue_executor;
  CheckedAutoLock auto_lock(outer()->lock_);
  AnnotateAcquiredLockAlias annotate(outer()->lock_, lock());

  // During shutdown, max_tasks may have been incremented in
  // OnShutdownStartedLockRequired().
  if (incremented_max_tasks_for_shutdown_) {
    DCHECK(outer()->shutdown_started_);
    outer()->DecrementMaxTasksLockRequired();
    if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
      outer()->DecrementMaxBestEffortTasksLockRequired();
    }
    incremented_max_tasks_since_blocked_ = false;
    incremented_max_best_effort_tasks_since_blocked_ = false;
    incremented_max_tasks_for_shutdown_ = false;
  }

  DCHECK(read_worker().blocking_start_time.is_null());
  DCHECK(!incremented_max_tasks_since_blocked_);
  DCHECK(!incremented_max_best_effort_tasks_since_blocked_);

  // Running task bookkeeping.
  outer()->DecrementTasksRunningLockRequired(
      *read_worker().current_task_priority);
  write_worker().current_shutdown_behavior = std::nullopt;
  write_worker().current_task_priority = std::nullopt;

  if (transaction_with_task_source) {
    outer()->ReEnqueueTaskSourceLockRequired(
        &workers_executor, &reenqueue_executor,
        std::move(transaction_with_task_source.value()));
  }

  return GetWorkLockRequired(&workers_executor,
                             static_cast<WorkerThreadWaitableEvent*>(worker));
}

bool ThreadGroupImpl::WaitableEventWorkerDelegate::CanCleanupLockRequired(
    const WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  if (!is_excess_) {
    return false;
  }

  const TimeTicks last_used_time = worker->GetLastUsedTime();
  return !last_used_time.is_null() &&
         subtle::TimeTicksNowIgnoringOverride() - last_used_time >=
             outer()->after_start().suggested_reclaim_time &&
         LIKELY(!outer()->worker_cleanup_disallowed_for_testing_);
}

void ThreadGroupImpl::WaitableEventWorkerDelegate::CleanupLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker_base) {
  WorkerThreadWaitableEvent* worker =
      static_cast<WorkerThreadWaitableEvent*>(worker_base);
  DCHECK(!outer()->join_for_testing_started_);
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  worker->Cleanup();

  if (outer()->IsOnIdleSetLockRequired(worker)) {
    outer()->idle_workers_set_.Remove(worker);
  }

  // Remove the worker from |workers_|.
  auto worker_iter = ranges::find(outer()->workers_, worker);
  CHECK(worker_iter != outer()->workers_.end(), base::NotFatalUntil::M125);
  outer()->workers_.erase(worker_iter);
}

void ThreadGroupImpl::WaitableEventWorkerDelegate::
    OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor* executor,
                                    WorkerThread* worker_base) {
  WorkerThreadWaitableEvent* worker =
      static_cast<WorkerThreadWaitableEvent*>(worker_base);

  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!outer()->idle_workers_set_.Contains(worker));

  // Add the worker to the idle set.
  outer()->idle_workers_set_.Insert(worker);
  DCHECK_LE(outer()->idle_workers_set_.Size(), outer()->workers_.size());
  outer()->idle_workers_set_cv_for_testing_.Broadcast();
}

void ThreadGroupImpl::WaitableEventWorkerDelegate::RecordUnnecessaryWakeup() {
  RecordUnnecessaryWakeupImpl();
}

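// Joins all worker threads and clears |workers_|; test-only.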
void ThreadGroupImpl::JoinForTesting() {
  decltype(workers_) workers_copy;
  {
    CheckedAutoLock auto_lock(lock_);
    priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();

    DCHECK_GT(workers_.size(), size_t(0))
        << "Joined an unstarted thread group.";

    join_for_testing_started_ = true;

    // Ensure WorkerThreads in |workers_| do not attempt to cleanup while
    // being joined.
    worker_cleanup_disallowed_for_testing_ = true;

    // Make a copy of the WorkerThreads so that we can call
    // WorkerThread::JoinForTesting() without holding |lock_| since
    // WorkerThreads may need to access |workers_|.
    workers_copy = workers_;
  }
  for (const auto& worker : workers_copy) {
    static_cast<WorkerThreadWaitableEvent*>(worker.get())->JoinForTesting();
  }

  CheckedAutoLock auto_lock(lock_);
  DCHECK(workers_ == workers_copy);
  // Release |workers_| to clear their TrackedRef against |this|.
  workers_.clear();
}

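// Returns the number of workers currently in the idle set; test-only.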
size_t ThreadGroupImpl::NumberOfIdleWorkersLockRequiredForTesting() const {
  return idle_workers_set_.Size();
}

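// Creates and registers a new worker and adds it to the idle set if the idle
// set is empty and the worker/task limits allow it.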
void ThreadGroupImpl::MaintainAtLeastOneIdleWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  if (workers_.size() == kMaxNumberOfWorkers) {
    return;
  }
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);

  if (!idle_workers_set_.IsEmpty()) {
    return;
  }

  if (workers_.size() >= max_tasks_) {
    return;
  }

  scoped_refptr<WorkerThreadWaitableEvent> new_worker =
      CreateAndRegisterWorkerLockRequired(executor);
  DCHECK(new_worker);
  idle_workers_set_.Insert(new_worker.get());
}

scoped_refptr<WorkerThreadWaitableEvent>
ThreadGroupImpl::CreateAndRegisterWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  DCHECK(!join_for_testing_started_);
  DCHECK_LT(workers_.size(), max_tasks_);
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
  DCHECK(idle_workers_set_.IsEmpty());

  // WorkerThread needs |lock_| as a predecessor for its thread lock because in
  // GetWork(), |lock_| is first acquired and then the thread lock is acquired
  // when GetLastUsedTime() is called on the worker by CanGetWorkLockRequired().
  scoped_refptr<WorkerThreadWaitableEvent> worker =
      MakeRefCounted<WorkerThreadWaitableEvent>(
          thread_type_hint_,
          std::make_unique<WaitableEventWorkerDelegate>(
              tracked_ref_factory_.GetTrackedRef(),
              /*is_excess=*/after_start().no_worker_reclaim
                  ? workers_.size() >= after_start().initial_max_tasks
                  : true),
          task_tracker_, worker_sequence_num_++, &lock_);

  workers_.push_back(worker);
  executor->ScheduleStart(worker);
  DCHECK_LE(workers_.size(), max_tasks_);

  return worker;
}

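// Returns the number of workers that are not in the idle set.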
size_t ThreadGroupImpl::GetNumAwakeWorkersLockRequired() const {
  DCHECK_GE(workers_.size(), idle_workers_set_.Size());
  size_t num_awake_workers = workers_.size() - idle_workers_set_.Size();
  DCHECK_GE(num_awake_workers, num_running_tasks_);
  return num_awake_workers;
}

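// Re-evaluates the number of workers that should be awake after the
// CanRunPolicy changed.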
void ThreadGroupImpl::DidUpdateCanRunPolicy() {
  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  EnsureEnoughWorkersLockRequired(&executor);
}

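// Returns |worker|'s delegate, viewed through the ThreadGroupWorkerDelegate
// interface shared with the base ThreadGroup.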
ThreadGroup::ThreadGroupWorkerDelegate* ThreadGroupImpl::GetWorkerDelegate(
    WorkerThread* worker) {
  return static_cast<ThreadGroup::ThreadGroupWorkerDelegate*>(
      static_cast<WaitableEventWorkerDelegate*>(worker->delegate()));
}

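// Notifies the shared ThreadGroup implementation that shutdown has started;
// commands scheduled on |executor| are executed when it goes out of scope.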
void ThreadGroupImpl::OnShutdownStarted() {
  ScopedCommandsExecutor executor(this);
  OnShutDownStartedImpl(&executor);
}

void ThreadGroupImpl::EnsureEnoughWorkersLockRequired(
    BaseScopedCommandsExecutor* base_executor) {
  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0 || UNLIKELY(join_for_testing_started_)) {
    return;
  }

  ScopedCommandsExecutor* executor =
      static_cast<ScopedCommandsExecutor*>(base_executor);

  const size_t desired_num_awake_workers =
      GetDesiredNumAwakeWorkersLockRequired();
  const size_t num_awake_workers = GetNumAwakeWorkersLockRequired();

  size_t num_workers_to_wake_up =
      ClampSub(desired_num_awake_workers, num_awake_workers);
  num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(2U));

  // Wake up the appropriate number of workers.
  for (size_t i = 0; i < num_workers_to_wake_up; ++i) {
    MaintainAtLeastOneIdleWorkerLockRequired(executor);
    WorkerThreadWaitableEvent* worker_to_wakeup = idle_workers_set_.Take();
    DCHECK(worker_to_wakeup);
    executor->ScheduleWakeUp(worker_to_wakeup);
  }

  // In the case where the loop above didn't wake up any worker and we don't
  // have excess workers, the idle worker should be maintained. This happens
  // when called from the last worker awake, or a recent increase in
  // |max_tasks| now makes it possible to keep an idle worker.
  if (desired_num_awake_workers == num_awake_workers) {
    MaintainAtLeastOneIdleWorkerLockRequired(executor);
  }

  // This function is called every time a task source is (re-)enqueued,
  // hence the minimum priority needs to be updated.
  UpdateMinAllowedPriorityLockRequired();

  // Ensure that the number of workers is periodically adjusted if needed.
  MaybeScheduleAdjustMaxTasksLockRequired(executor);
}

bool ThreadGroupImpl::IsOnIdleSetLockRequired(
    WorkerThreadWaitableEvent* worker) const {
  // To avoid searching through the idle set: use GetLastUsedTime() not being
  // null (or the worker being directly on top of the idle set) as a proxy for
  // being on the idle set.
  return idle_workers_set_.Peek() == worker ||
         !worker->GetLastUsedTime().is_null();
}

}  // namespace internal
}  // namespace base