1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/worker_thread.h"
6
7 #include <stddef.h>
8
9 #include <algorithm>
10 #include <atomic>
11 #include <optional>
12 #include <utility>
13
14 #include "base/check_op.h"
15 #include "base/compiler_specific.h"
16 #include "base/debug/alias.h"
17 #include "base/feature_list.h"
18 #include "base/functional/callback_helpers.h"
19 #include "base/synchronization/waitable_event.h"
20 #include "base/task/task_features.h"
21 #include "base/task/thread_pool/environment_config.h"
22 #include "base/task/thread_pool/worker_thread_observer.h"
23 #include "base/threading/hang_watcher.h"
24 #include "base/time/time.h"
25 #include "base/time/time_override.h"
26 #include "base/trace_event/base_tracing.h"
27 #include "build/build_config.h"
28 #include "partition_alloc/partition_alloc_buildflags.h"
29 #include "partition_alloc/partition_alloc_config.h"
30
31 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
32 #include "base/files/file_descriptor_watcher_posix.h"
33 #endif
34
35 #if BUILDFLAG(IS_APPLE)
36 #include "base/apple/scoped_nsautorelease_pool.h"
37 #endif
38
39 #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
40 PA_CONFIG(THREAD_CACHE_SUPPORTED)
41 #include "partition_alloc/thread_cache.h"
42 #endif
43
44 namespace base::internal {
45
// Out-of-line definition of the static constexpr member so it can be odr-used
// (e.g. bound to a reference such as in std::max below).
constexpr TimeDelta WorkerThread::Delegate::kPurgeThreadCacheIdleDelay;
47
GetThreadLabel() const48 WorkerThread::ThreadLabel WorkerThread::Delegate::GetThreadLabel() const {
49 return WorkerThread::ThreadLabel::POOLED;
50 }
51
// Blocks until work is signaled or the reclaim timeout elapses. When
// PartitionAlloc's thread cache is in use, this may wake up once in the middle
// of the wait to purge the cache (see rationale below), then resume sleeping.
void WorkerThread::Delegate::WaitForWork() {
  const TimeDelta sleep_duration_before_worker_reclaim = GetSleepTimeout();

  // When a thread goes to sleep, the memory retained by its thread cache is
  // trapped there for as long as the thread sleeps. To prevent that, we can
  // either purge the thread cache right before going to sleep, or after some
  // delay.
  //
  // Purging the thread cache incurs a cost on the next task, since its thread
  // cache will be empty and allocation performance initially lower. As a lot of
  // sleeps are very short, do not purge all the time (this would also make
  // sleep / wakeups cycles more costly).
  //
  // Instead, sleep for min(timeout, 1s). If the wait times out then purge at
  // that point, and go to sleep for the remaining of the time. This ensures
  // that we do no work for short sleeps, and that threads do not get awaken
  // many times.
#if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
    PA_CONFIG(THREAD_CACHE_SUPPORTED)
  const TimeDelta sleep_duration_before_purge =
      GetSleepDurationBeforePurge(base::TimeTicks::Now());

  // Wait for whichever comes first: the purge wakeup or worker reclaim.
  const bool was_signaled = TimedWait(std::min(
      sleep_duration_before_purge, sleep_duration_before_worker_reclaim));
  // Timed out.
  if (!was_signaled) {
    partition_alloc::ThreadCache::PurgeCurrentThread();

    // The thread woke up to purge before its standard reclaim time. Sleep for
    // what's remaining until then.
    if (sleep_duration_before_worker_reclaim > sleep_duration_before_purge) {
      // Guard against overflow: max() - finite is not representable, so wait
      // "forever" explicitly when the reclaim timeout is unbounded.
      TimedWait(sleep_duration_before_worker_reclaim.is_max()
                    ? TimeDelta::Max()
                    : sleep_duration_before_worker_reclaim -
                          sleep_duration_before_purge);
    }
  }
#else
  TimedWait(sleep_duration_before_worker_reclaim);
#endif  // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
        // PA_CONFIG(THREAD_CACHE_SUPPORTED)
}
94
IsDelayFirstWorkerSleepEnabled()95 bool WorkerThread::Delegate::IsDelayFirstWorkerSleepEnabled() {
96 static bool state = FeatureList::IsEnabled(kDelayFirstWorkerWake);
97 return state;
98 }
99
#if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
    PA_CONFIG(THREAD_CACHE_SUPPORTED)
// Returns how long, starting from `now`, this worker should sleep before
// waking up to purge its thread cache. See WaitForWork() for why purges are
// delayed rather than performed eagerly before each sleep.
TimeDelta WorkerThread::Delegate::GetSleepDurationBeforePurge(TimeTicks now) {
  base::TimeDelta sleep_duration_before_purge = kPurgeThreadCacheIdleDelay;

  // Without the delayed-first-wake optimization, always use the default idle
  // delay.
  if (!IsDelayFirstWorkerSleepEnabled()) {
    return sleep_duration_before_purge;
  }

  // Use the first time a worker goes to sleep in this process as an
  // approximation of the process creation time.
  static const TimeTicks first_sleep_time = now;
  // A test-injected first-sleep time, when set, takes precedence over the
  // process-wide static above so tests are deterministic.
  const TimeTicks first_sleep_time_to_use =
      !first_sleep_time_for_testing_.is_null() ? first_sleep_time_for_testing_
                                               : first_sleep_time;
  const base::TimeTicks first_wake_time =
      first_sleep_time_to_use + kFirstSleepDurationBeforePurge;

  // A sleep that occurs within `kFirstSleepDurationBeforePurge` of the
  // first sleep lasts at least `kFirstSleepDurationBeforePurge`.
  if (now <= first_wake_time) {
    // Avoid sleeping for less than `sleep_duration_before_purge` since that is
    // the shortest expected duration to wait for a purge.
    sleep_duration_before_purge =
        std::max(kFirstSleepDurationBeforePurge, sleep_duration_before_purge);
  }

  // Align wakeups for purges to reduce the chances of taking the CPU out of
  // sleep multiple times for these operations. This can happen if many workers
  // in the same process scheduled wakeups. This can create a situation where
  // any one worker wakes every `kPurgeThreadCacheIdleDelay` / N where N is the
  // number of workers.
  const TimeTicks snapped_purge_time =
      (now + sleep_duration_before_purge)
          .SnappedToNextTick(TimeTicks(), kPurgeThreadCacheIdleDelay);

  return snapped_purge_time - now;
}

#endif  // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
        // PA_CONFIG(THREAD_CACHE_SUPPORTED)
141
WorkerThread(ThreadType thread_type_hint,TrackedRef<TaskTracker> task_tracker,size_t sequence_num,const CheckedLock * predecessor_lock,void * flow_terminator)142 WorkerThread::WorkerThread(ThreadType thread_type_hint,
143 TrackedRef<TaskTracker> task_tracker,
144 size_t sequence_num,
145 const CheckedLock* predecessor_lock,
146 void* flow_terminator)
147 : thread_lock_(predecessor_lock),
148 task_tracker_(std::move(task_tracker)),
149 thread_type_hint_(thread_type_hint),
150 current_thread_type_(GetDesiredThreadType()),
151 sequence_num_(sequence_num),
152 flow_terminator_(flow_terminator == nullptr
153 ? reinterpret_cast<intptr_t>(this)
154 : reinterpret_cast<intptr_t>(flow_terminator)) {
155 DCHECK(task_tracker_);
156 DCHECK(CanUseBackgroundThreadTypeForWorkerThread() ||
157 thread_type_hint_ != ThreadType::kBackground);
158 DCHECK(CanUseUtilityThreadTypeForWorkerThread() ||
159 thread_type_hint != ThreadType::kUtility);
160 }
161
// Creates and starts the underlying platform thread. Returns false only if
// thread creation failed; returns true (without creating a thread) if this
// worker was already asked to exit or joined.
bool WorkerThread::Start(
    scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer) {
  // Must not hold any CheckedLock: thread creation below may allocate/lock.
  CheckedLock::AssertNoLockHeldOnCurrentThread();

  // Prime kDelayFirstWorkerWake's feature state right away on thread creation
  // instead of looking it up for the first time later on thread as this avoids
  // a data race in tests that may ~FeatureList while the first worker thread
  // is still initializing (the first WorkerThread will be started on the main
  // thread as part of ThreadPoolImpl::Start() so doing it then avoids this
  // race), crbug.com/1344573.
  // Note 1: the feature state is always available at this point as
  // ThreadPoolInstance::Start() contractually happens-after FeatureList
  // initialization.
  // Note 2: This is done on Start instead of in the constructor as construction
  // happens under a ThreadGroup lock which precludes calling into
  // FeatureList (as that can also use a lock).
  delegate()->IsDelayFirstWorkerSleepEnabled();

  CheckedAutoLock auto_lock(thread_lock_);
  DCHECK(thread_handle_.is_null());

#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
  DCHECK(io_thread_task_runner);
  io_thread_task_runner_ = std::move(io_thread_task_runner);
#endif

  // Already cleaned up or joined: report success without spawning a thread.
  if (should_exit_.IsSet() || join_called_for_testing()) {
    return true;
  }

  DCHECK(!worker_thread_observer_);
  worker_thread_observer_ = worker_thread_observer;

  // Self-reference keeps |this| alive for the duration of ThreadMain(); it is
  // released at the end of RunWorker().
  self_ = this;

  // 0 requests the platform's default stack size.
  constexpr size_t kDefaultStackSize = 0;
  PlatformThread::CreateWithType(kDefaultStackSize, this, &thread_handle_,
                                 current_thread_type_);

  if (thread_handle_.is_null()) {
    // Thread creation failed: drop the self-reference taken above.
    self_ = nullptr;
    return false;
  }

  return true;
}
209
Destroy()210 void WorkerThread::Destroy() {
211 CheckedAutoLock auto_lock(thread_lock_);
212
213 // If |thread_handle_| wasn't joined, detach it.
214 if (!thread_handle_.is_null()) {
215 PlatformThread::Detach(thread_handle_);
216 }
217 }
218
ThreadAliveForTesting() const219 bool WorkerThread::ThreadAliveForTesting() const {
220 CheckedAutoLock auto_lock(thread_lock_);
221 return !thread_handle_.is_null();
222 }
223
// Defaulted: members clean themselves up (RAII); thread teardown is handled by
// Destroy() / RunWorker() releasing |self_|.
WorkerThread::~WorkerThread() = default;
225
MaybeUpdateThreadType()226 void WorkerThread::MaybeUpdateThreadType() {
227 UpdateThreadType(GetDesiredThreadType());
228 }
229
BeginUnusedPeriod()230 void WorkerThread::BeginUnusedPeriod() {
231 CheckedAutoLock auto_lock(thread_lock_);
232 DCHECK(last_used_time_.is_null());
233 last_used_time_ = subtle::TimeTicksNowIgnoringOverride();
234 }
235
EndUnusedPeriod()236 void WorkerThread::EndUnusedPeriod() {
237 CheckedAutoLock auto_lock(thread_lock_);
238 DCHECK(!last_used_time_.is_null());
239 last_used_time_ = TimeTicks();
240 }
241
GetLastUsedTime() const242 TimeTicks WorkerThread::GetLastUsedTime() const {
243 CheckedAutoLock auto_lock(thread_lock_);
244 return last_used_time_;
245 }
246
ShouldExit() const247 bool WorkerThread::ShouldExit() const {
248 // The ordering of the checks is important below. This WorkerThread may be
249 // released and outlive |task_tracker_| in unit tests. However, when the
250 // WorkerThread is released, |should_exit_| will be set, so check that
251 // first.
252 return should_exit_.IsSet() || join_called_for_testing() ||
253 task_tracker_->IsShutdownComplete();
254 }
255
GetDesiredThreadType() const256 ThreadType WorkerThread::GetDesiredThreadType() const {
257 // To avoid shutdown hangs, disallow a type below kNormal during shutdown
258 if (task_tracker_->HasShutdownStarted())
259 return ThreadType::kDefault;
260
261 return thread_type_hint_;
262 }
263
UpdateThreadType(ThreadType desired_thread_type)264 void WorkerThread::UpdateThreadType(ThreadType desired_thread_type) {
265 if (desired_thread_type == current_thread_type_)
266 return;
267
268 PlatformThread::SetCurrentThreadType(desired_thread_type);
269 current_thread_type_ = desired_thread_type;
270 }
271
// Platform-thread entry point. Dispatches to a NOINLINE Run*Worker() shim so
// that the worker's flavor (pooled/shared/dedicated, background, COM) is
// identifiable from the stack in crash reports.
void WorkerThread::ThreadMain() {
#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
  DCHECK(io_thread_task_runner_);
  // Scoped support for FileDescriptorWatcher on this thread, backed by the
  // provided IO task runner.
  FileDescriptorWatcher file_descriptor_watcher(io_thread_task_runner_);
#endif

  // Background workers get their own shims so they are also distinguishable
  // from foreground workers in stack traces.
  if (thread_type_hint_ == ThreadType::kBackground) {
    switch (delegate()->GetThreadLabel()) {
      case ThreadLabel::POOLED:
        RunBackgroundPooledWorker();
        return;
      case ThreadLabel::SHARED:
        RunBackgroundSharedWorker();
        return;
      case ThreadLabel::DEDICATED:
        RunBackgroundDedicatedWorker();
        return;
#if BUILDFLAG(IS_WIN)
      case ThreadLabel::SHARED_COM:
        RunBackgroundSharedCOMWorker();
        return;
      case ThreadLabel::DEDICATED_COM:
        RunBackgroundDedicatedCOMWorker();
        return;
#endif  // BUILDFLAG(IS_WIN)
    }
  }

  switch (delegate()->GetThreadLabel()) {
    case ThreadLabel::POOLED:
      RunPooledWorker();
      return;
    case ThreadLabel::SHARED:
      RunSharedWorker();
      return;
    case ThreadLabel::DEDICATED:
      RunDedicatedWorker();
      return;
#if BUILDFLAG(IS_WIN)
    case ThreadLabel::SHARED_COM:
      RunSharedCOMWorker();
      return;
    case ThreadLabel::DEDICATED_COM:
      RunDedicatedCOMWorker();
      return;
#endif  // BUILDFLAG(IS_WIN)
  }
}
320
// NOINLINE + NO_CODE_FOLDING keep a distinct stack frame per worker flavor so
// crash reports can tell which kind of worker was running.
NOINLINE void WorkerThread::RunPooledWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}
325
// NOINLINE + NO_CODE_FOLDING keep a distinct stack frame per worker flavor so
// crash reports can tell which kind of worker was running.
NOINLINE void WorkerThread::RunBackgroundPooledWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}
330
// NOINLINE + NO_CODE_FOLDING keep a distinct stack frame per worker flavor so
// crash reports can tell which kind of worker was running.
NOINLINE void WorkerThread::RunSharedWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}
335
// NOINLINE + NO_CODE_FOLDING keep a distinct stack frame per worker flavor so
// crash reports can tell which kind of worker was running.
NOINLINE void WorkerThread::RunBackgroundSharedWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}
340
// NOINLINE + NO_CODE_FOLDING keep a distinct stack frame per worker flavor so
// crash reports can tell which kind of worker was running.
NOINLINE void WorkerThread::RunDedicatedWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}
345
// NOINLINE + NO_CODE_FOLDING keep a distinct stack frame per worker flavor so
// crash reports can tell which kind of worker was running.
NOINLINE void WorkerThread::RunBackgroundDedicatedWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}
350
351 #if BUILDFLAG(IS_WIN)
// NOINLINE + NO_CODE_FOLDING keep a distinct stack frame per worker flavor so
// crash reports can tell which kind of worker was running.
NOINLINE void WorkerThread::RunSharedCOMWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}
356
// NOINLINE + NO_CODE_FOLDING keep a distinct stack frame per worker flavor so
// crash reports can tell which kind of worker was running.
NOINLINE void WorkerThread::RunBackgroundSharedCOMWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}
361
// NOINLINE + NO_CODE_FOLDING keep a distinct stack frame per worker flavor so
// crash reports can tell which kind of worker was running.
NOINLINE void WorkerThread::RunDedicatedCOMWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}
366
// NOINLINE + NO_CODE_FOLDING keep a distinct stack frame per worker flavor so
// crash reports can tell which kind of worker was running.
NOINLINE void WorkerThread::RunBackgroundDedicatedCOMWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}
371 #endif // BUILDFLAG(IS_WIN)
372
// Main loop of a worker thread: alternates between blocking in
// delegate()->WaitForWork() and draining task sources via the task tracker,
// until ShouldExit() becomes true. Releases the self-reference (possibly
// deleting |this|) on the way out.
void WorkerThread::RunWorker() {
  DCHECK_EQ(self_, this);
  TRACE_EVENT_INSTANT0("base", "WorkerThread born", TRACE_EVENT_SCOPE_THREAD);
  TRACE_EVENT_BEGIN0("base", "WorkerThread active");

  if (worker_thread_observer_) {
    worker_thread_observer_->OnWorkerThreadMainEntry();
  }

  delegate()->OnMainEntry(this);

  // Background threads can take an arbitrary amount of time to complete, do not
  // watch them for hangs. Ignore priority boosting for now.
  const bool watch_for_hangs =
      base::HangWatcher::IsThreadPoolHangWatchingEnabled() &&
      GetDesiredThreadType() != ThreadType::kBackground;

  // If this process has a HangWatcher register this thread for watching.
  base::ScopedClosureRunner unregister_for_hang_watching;
  if (watch_for_hangs) {
    unregister_for_hang_watching = base::HangWatcher::RegisterThread(
        base::HangWatcher::ThreadType::kThreadPoolThread);
  }

  while (!ShouldExit()) {
#if BUILDFLAG(IS_APPLE)
    apple::ScopedNSAutoreleasePool autorelease_pool;
#endif
    std::optional<WatchHangsInScope> hang_watch_scope;

    // The thread is about to go idle: close the "active" trace slice and make
    // sure no hang watch scope is live while blocked waiting for work.
    TRACE_EVENT_END0("base", "WorkerThread active");
    // TODO(crbug.com/1021571): Remove this once fixed.
    PERFETTO_INTERNAL_ADD_EMPTY_EVENT();
    hang_watch_scope.reset();
    delegate()->WaitForWork();
    TRACE_EVENT_BEGIN("base", "WorkerThread active",
                      perfetto::TerminatingFlow::FromPointer(
                          reinterpret_cast<void*>(flow_terminator_)));

    // Don't GetWork() in the case where we woke up for Cleanup().
    if (ShouldExit()) {
      break;
    }

    if (watch_for_hangs) {
      hang_watch_scope.emplace();
    }

    // Thread type needs to be updated before GetWork.
    UpdateThreadType(GetDesiredThreadType());

    // Get the task source containing the first task to execute.
    RegisteredTaskSource task_source = delegate()->GetWork(this);

    // If acquiring work failed and the worker's still alive,
    // record that this is an unnecessary wakeup.
    if (!task_source && !ShouldExit()) {
      delegate()->RecordUnnecessaryWakeup();
    }

    // Inner loop: run tasks back-to-back as long as the delegate keeps
    // handing back a task source.
    while (task_source) {
      // Alias pointer for investigation of memory corruption. crbug.com/1218384
      TaskSource* task_source_before_run = task_source.get();
      base::debug::Alias(&task_source_before_run);

      task_source = task_tracker_->RunAndPopNextTask(std::move(task_source));
      // Alias pointer for investigation of memory corruption. crbug.com/1218384
      TaskSource* task_source_before_move = task_source.get();
      base::debug::Alias(&task_source_before_move);

      // We emplace the hang_watch_scope here so that each hang watch scope
      // covers one GetWork (or SwapProcessedTask) as well as one
      // RunAndPopNextTask.
      if (watch_for_hangs) {
        hang_watch_scope.emplace();
      }

      RegisteredTaskSource new_task_source =
          delegate()->SwapProcessedTask(std::move(task_source), this);

      UpdateThreadType(GetDesiredThreadType());

      // Check that task_source is always cleared, to help investigation of
      // memory corruption where task_source is non-null after being moved.
      // crbug.com/1218384
      CHECK(!task_source);
      task_source = std::move(new_task_source);
    }
  }

  // Important: It is unsafe to access unowned state (e.g. |task_tracker_|)
  // after invoking OnMainExit().

  delegate()->OnMainExit(this);

  if (worker_thread_observer_) {
    worker_thread_observer_->OnWorkerThreadMainExit();
  }

  // Release the self-reference to |this|. This can result in deleting |this|
  // and as such no more member accesses should be made after this point.
  self_ = nullptr;

  TRACE_EVENT_END0("base", "WorkerThread active");
  TRACE_EVENT_INSTANT0("base", "WorkerThread dead", TRACE_EVENT_SCOPE_THREAD);
  // TODO(crbug.com/1021571): Remove this once fixed.
  PERFETTO_INTERNAL_ADD_EMPTY_EVENT();
}
481
482 } // namespace base::internal
483