xref: /aosp_15_r20/external/cronet/base/task/thread_pool/worker_thread.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/worker_thread.h"
6 
7 #include <stddef.h>
8 
9 #include <algorithm>
10 #include <atomic>
11 #include <optional>
12 #include <utility>
13 
14 #include "base/check_op.h"
15 #include "base/compiler_specific.h"
16 #include "base/debug/alias.h"
17 #include "base/feature_list.h"
18 #include "base/functional/callback_helpers.h"
19 #include "base/synchronization/waitable_event.h"
20 #include "base/task/task_features.h"
21 #include "base/task/thread_pool/environment_config.h"
22 #include "base/task/thread_pool/worker_thread_observer.h"
23 #include "base/threading/hang_watcher.h"
24 #include "base/time/time.h"
25 #include "base/time/time_override.h"
26 #include "base/trace_event/base_tracing.h"
27 #include "build/build_config.h"
28 #include "partition_alloc/partition_alloc_buildflags.h"
29 #include "partition_alloc/partition_alloc_config.h"
30 
31 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
32 #include "base/files/file_descriptor_watcher_posix.h"
33 #endif
34 
35 #if BUILDFLAG(IS_APPLE)
36 #include "base/apple/scoped_nsautorelease_pool.h"
37 #endif
38 
39 #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
40     PA_CONFIG(THREAD_CACHE_SUPPORTED)
41 #include "partition_alloc/thread_cache.h"
42 #endif
43 
44 namespace base::internal {
45 
46 constexpr TimeDelta WorkerThread::Delegate::kPurgeThreadCacheIdleDelay;
47 
GetThreadLabel() const48 WorkerThread::ThreadLabel WorkerThread::Delegate::GetThreadLabel() const {
49   return WorkerThread::ThreadLabel::POOLED;
50 }
51 
WaitForWork()52 void WorkerThread::Delegate::WaitForWork() {
53   const TimeDelta sleep_duration_before_worker_reclaim = GetSleepTimeout();
54 
55   // When a thread goes to sleep, the memory retained by its thread cache is
56   // trapped there for as long as the thread sleeps. To prevent that, we can
57   // either purge the thread cache right before going to sleep, or after some
58   // delay.
59   //
60   // Purging the thread cache incurs a cost on the next task, since its thread
61   // cache will be empty and allocation performance initially lower. As a lot of
62   // sleeps are very short, do not purge all the time (this would also make
63   // sleep / wakeups cycles more costly).
64   //
65   // Instead, sleep for min(timeout, 1s). If the wait times out then purge at
66   // that point, and go to sleep for the remaining of the time. This ensures
67   // that we do no work for short sleeps, and that threads do not get awaken
68   // many times.
69 #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
70     PA_CONFIG(THREAD_CACHE_SUPPORTED)
71   const TimeDelta sleep_duration_before_purge =
72       GetSleepDurationBeforePurge(base::TimeTicks::Now());
73 
74   const bool was_signaled = TimedWait(std::min(
75       sleep_duration_before_purge, sleep_duration_before_worker_reclaim));
76   // Timed out.
77   if (!was_signaled) {
78     partition_alloc::ThreadCache::PurgeCurrentThread();
79 
80     // The thread woke up to purge before its standard reclaim time. Sleep for
81     // what's remaining until then.
82     if (sleep_duration_before_worker_reclaim > sleep_duration_before_purge) {
83       TimedWait(sleep_duration_before_worker_reclaim.is_max()
84                     ? TimeDelta::Max()
85                     : sleep_duration_before_worker_reclaim -
86                           sleep_duration_before_purge);
87     }
88   }
89 #else
90   TimedWait(sleep_duration_before_worker_reclaim);
91 #endif  // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
92         // PA_CONFIG(THREAD_CACHE_SUPPORTED)
93 }
94 
IsDelayFirstWorkerSleepEnabled()95 bool WorkerThread::Delegate::IsDelayFirstWorkerSleepEnabled() {
96   static bool state = FeatureList::IsEnabled(kDelayFirstWorkerWake);
97   return state;
98 }
99 
100 #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
101     PA_CONFIG(THREAD_CACHE_SUPPORTED)
GetSleepDurationBeforePurge(TimeTicks now)102 TimeDelta WorkerThread::Delegate::GetSleepDurationBeforePurge(TimeTicks now) {
103   base::TimeDelta sleep_duration_before_purge = kPurgeThreadCacheIdleDelay;
104 
105   if (!IsDelayFirstWorkerSleepEnabled()) {
106     return sleep_duration_before_purge;
107   }
108 
109   // Use the first time a worker goes to sleep in this process as an
110   // approximation of the process creation time.
111   static const TimeTicks first_sleep_time = now;
112   const TimeTicks first_sleep_time_to_use =
113       !first_sleep_time_for_testing_.is_null() ? first_sleep_time_for_testing_
114                                                : first_sleep_time;
115   const base::TimeTicks first_wake_time =
116       first_sleep_time_to_use + kFirstSleepDurationBeforePurge;
117 
118   // A sleep that occurs within `kFirstSleepDurationBeforePurge` of the
119   // first sleep lasts at least `kFirstSleepDurationBeforePurge`.
120   if (now <= first_wake_time) {
121     // Avoid sleeping for less than `sleep_duration_before_purge` since that is
122     // the shortest expected duration to wait for a purge.
123     sleep_duration_before_purge =
124         std::max(kFirstSleepDurationBeforePurge, sleep_duration_before_purge);
125   }
126 
127   // Align wakeups for purges to reduce the chances of taking the CPU out of
128   // sleep multiple times for these operations. This can happen if many workers
129   // in the same process scheduled wakeups. This can create a situation where
130   // any one worker wakes every `kPurgeThreadCacheIdleDelay` / N where N is the
131   // number of workers.
132   const TimeTicks snapped_purge_time =
133       (now + sleep_duration_before_purge)
134           .SnappedToNextTick(TimeTicks(), kPurgeThreadCacheIdleDelay);
135 
136   return snapped_purge_time - now;
137 }
138 
139 #endif  // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
140         // PA_CONFIG(THREAD_CACHE_SUPPORTED)
141 
WorkerThread(ThreadType thread_type_hint,TrackedRef<TaskTracker> task_tracker,size_t sequence_num,const CheckedLock * predecessor_lock,void * flow_terminator)142 WorkerThread::WorkerThread(ThreadType thread_type_hint,
143                            TrackedRef<TaskTracker> task_tracker,
144                            size_t sequence_num,
145                            const CheckedLock* predecessor_lock,
146                            void* flow_terminator)
147     : thread_lock_(predecessor_lock),
148       task_tracker_(std::move(task_tracker)),
149       thread_type_hint_(thread_type_hint),
150       current_thread_type_(GetDesiredThreadType()),
151       sequence_num_(sequence_num),
152       flow_terminator_(flow_terminator == nullptr
153                            ? reinterpret_cast<intptr_t>(this)
154                            : reinterpret_cast<intptr_t>(flow_terminator)) {
155   DCHECK(task_tracker_);
156   DCHECK(CanUseBackgroundThreadTypeForWorkerThread() ||
157          thread_type_hint_ != ThreadType::kBackground);
158   DCHECK(CanUseUtilityThreadTypeForWorkerThread() ||
159          thread_type_hint != ThreadType::kUtility);
160 }
161 
Start(scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,WorkerThreadObserver * worker_thread_observer)162 bool WorkerThread::Start(
163     scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,
164     WorkerThreadObserver* worker_thread_observer) {
165   CheckedLock::AssertNoLockHeldOnCurrentThread();
166 
167   // Prime kDelayFirstWorkerWake's feature state right away on thread creation
168   // instead of looking it up for the first time later on thread as this avoids
169   // a data race in tests that may ~FeatureList while the first worker thread
170   // is still initializing (the first WorkerThread will be started on the main
171   // thread as part of ThreadPoolImpl::Start() so doing it then avoids this
172   // race), crbug.com/1344573.
173   // Note 1: the feature state is always available at this point as
174   // ThreadPoolInstance::Start() contractually happens-after FeatureList
175   // initialization.
176   // Note 2: This is done on Start instead of in the constructor as construction
177   // happens under a ThreadGroup lock which precludes calling into
178   // FeatureList (as that can also use a lock).
179   delegate()->IsDelayFirstWorkerSleepEnabled();
180 
181   CheckedAutoLock auto_lock(thread_lock_);
182   DCHECK(thread_handle_.is_null());
183 
184 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
185   DCHECK(io_thread_task_runner);
186   io_thread_task_runner_ = std::move(io_thread_task_runner);
187 #endif
188 
189   if (should_exit_.IsSet() || join_called_for_testing()) {
190     return true;
191   }
192 
193   DCHECK(!worker_thread_observer_);
194   worker_thread_observer_ = worker_thread_observer;
195 
196   self_ = this;
197 
198   constexpr size_t kDefaultStackSize = 0;
199   PlatformThread::CreateWithType(kDefaultStackSize, this, &thread_handle_,
200                                  current_thread_type_);
201 
202   if (thread_handle_.is_null()) {
203     self_ = nullptr;
204     return false;
205   }
206 
207   return true;
208 }
209 
Destroy()210 void WorkerThread::Destroy() {
211   CheckedAutoLock auto_lock(thread_lock_);
212 
213   // If |thread_handle_| wasn't joined, detach it.
214   if (!thread_handle_.is_null()) {
215     PlatformThread::Detach(thread_handle_);
216   }
217 }
218 
ThreadAliveForTesting() const219 bool WorkerThread::ThreadAliveForTesting() const {
220   CheckedAutoLock auto_lock(thread_lock_);
221   return !thread_handle_.is_null();
222 }
223 
224 WorkerThread::~WorkerThread() = default;
225 
MaybeUpdateThreadType()226 void WorkerThread::MaybeUpdateThreadType() {
227   UpdateThreadType(GetDesiredThreadType());
228 }
229 
BeginUnusedPeriod()230 void WorkerThread::BeginUnusedPeriod() {
231   CheckedAutoLock auto_lock(thread_lock_);
232   DCHECK(last_used_time_.is_null());
233   last_used_time_ = subtle::TimeTicksNowIgnoringOverride();
234 }
235 
EndUnusedPeriod()236 void WorkerThread::EndUnusedPeriod() {
237   CheckedAutoLock auto_lock(thread_lock_);
238   DCHECK(!last_used_time_.is_null());
239   last_used_time_ = TimeTicks();
240 }
241 
GetLastUsedTime() const242 TimeTicks WorkerThread::GetLastUsedTime() const {
243   CheckedAutoLock auto_lock(thread_lock_);
244   return last_used_time_;
245 }
246 
ShouldExit() const247 bool WorkerThread::ShouldExit() const {
248   // The ordering of the checks is important below. This WorkerThread may be
249   // released and outlive |task_tracker_| in unit tests. However, when the
250   // WorkerThread is released, |should_exit_| will be set, so check that
251   // first.
252   return should_exit_.IsSet() || join_called_for_testing() ||
253          task_tracker_->IsShutdownComplete();
254 }
255 
GetDesiredThreadType() const256 ThreadType WorkerThread::GetDesiredThreadType() const {
257   // To avoid shutdown hangs, disallow a type below kNormal during shutdown
258   if (task_tracker_->HasShutdownStarted())
259     return ThreadType::kDefault;
260 
261   return thread_type_hint_;
262 }
263 
UpdateThreadType(ThreadType desired_thread_type)264 void WorkerThread::UpdateThreadType(ThreadType desired_thread_type) {
265   if (desired_thread_type == current_thread_type_)
266     return;
267 
268   PlatformThread::SetCurrentThreadType(desired_thread_type);
269   current_thread_type_ = desired_thread_type;
270 }
271 
ThreadMain()272 void WorkerThread::ThreadMain() {
273 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
274   DCHECK(io_thread_task_runner_);
275   FileDescriptorWatcher file_descriptor_watcher(io_thread_task_runner_);
276 #endif
277 
278   if (thread_type_hint_ == ThreadType::kBackground) {
279     switch (delegate()->GetThreadLabel()) {
280       case ThreadLabel::POOLED:
281         RunBackgroundPooledWorker();
282         return;
283       case ThreadLabel::SHARED:
284         RunBackgroundSharedWorker();
285         return;
286       case ThreadLabel::DEDICATED:
287         RunBackgroundDedicatedWorker();
288         return;
289 #if BUILDFLAG(IS_WIN)
290       case ThreadLabel::SHARED_COM:
291         RunBackgroundSharedCOMWorker();
292         return;
293       case ThreadLabel::DEDICATED_COM:
294         RunBackgroundDedicatedCOMWorker();
295         return;
296 #endif  // BUILDFLAG(IS_WIN)
297     }
298   }
299 
300   switch (delegate()->GetThreadLabel()) {
301     case ThreadLabel::POOLED:
302       RunPooledWorker();
303       return;
304     case ThreadLabel::SHARED:
305       RunSharedWorker();
306       return;
307     case ThreadLabel::DEDICATED:
308       RunDedicatedWorker();
309       return;
310 #if BUILDFLAG(IS_WIN)
311     case ThreadLabel::SHARED_COM:
312       RunSharedCOMWorker();
313       return;
314     case ThreadLabel::DEDICATED_COM:
315       RunDedicatedCOMWorker();
316       return;
317 #endif  // BUILDFLAG(IS_WIN)
318   }
319 }
320 
RunPooledWorker()321 NOINLINE void WorkerThread::RunPooledWorker() {
322   RunWorker();
323   NO_CODE_FOLDING();
324 }
325 
RunBackgroundPooledWorker()326 NOINLINE void WorkerThread::RunBackgroundPooledWorker() {
327   RunWorker();
328   NO_CODE_FOLDING();
329 }
330 
RunSharedWorker()331 NOINLINE void WorkerThread::RunSharedWorker() {
332   RunWorker();
333   NO_CODE_FOLDING();
334 }
335 
RunBackgroundSharedWorker()336 NOINLINE void WorkerThread::RunBackgroundSharedWorker() {
337   RunWorker();
338   NO_CODE_FOLDING();
339 }
340 
RunDedicatedWorker()341 NOINLINE void WorkerThread::RunDedicatedWorker() {
342   RunWorker();
343   NO_CODE_FOLDING();
344 }
345 
RunBackgroundDedicatedWorker()346 NOINLINE void WorkerThread::RunBackgroundDedicatedWorker() {
347   RunWorker();
348   NO_CODE_FOLDING();
349 }
350 
351 #if BUILDFLAG(IS_WIN)
RunSharedCOMWorker()352 NOINLINE void WorkerThread::RunSharedCOMWorker() {
353   RunWorker();
354   NO_CODE_FOLDING();
355 }
356 
RunBackgroundSharedCOMWorker()357 NOINLINE void WorkerThread::RunBackgroundSharedCOMWorker() {
358   RunWorker();
359   NO_CODE_FOLDING();
360 }
361 
RunDedicatedCOMWorker()362 NOINLINE void WorkerThread::RunDedicatedCOMWorker() {
363   RunWorker();
364   NO_CODE_FOLDING();
365 }
366 
RunBackgroundDedicatedCOMWorker()367 NOINLINE void WorkerThread::RunBackgroundDedicatedCOMWorker() {
368   RunWorker();
369   NO_CODE_FOLDING();
370 }
371 #endif  // BUILDFLAG(IS_WIN)
372 
RunWorker()373 void WorkerThread::RunWorker() {
374   DCHECK_EQ(self_, this);
375   TRACE_EVENT_INSTANT0("base", "WorkerThread born", TRACE_EVENT_SCOPE_THREAD);
376   TRACE_EVENT_BEGIN0("base", "WorkerThread active");
377 
378   if (worker_thread_observer_) {
379     worker_thread_observer_->OnWorkerThreadMainEntry();
380   }
381 
382   delegate()->OnMainEntry(this);
383 
384   // Background threads can take an arbitrary amount of time to complete, do not
385   // watch them for hangs. Ignore priority boosting for now.
386   const bool watch_for_hangs =
387       base::HangWatcher::IsThreadPoolHangWatchingEnabled() &&
388       GetDesiredThreadType() != ThreadType::kBackground;
389 
390   // If this process has a HangWatcher register this thread for watching.
391   base::ScopedClosureRunner unregister_for_hang_watching;
392   if (watch_for_hangs) {
393     unregister_for_hang_watching = base::HangWatcher::RegisterThread(
394         base::HangWatcher::ThreadType::kThreadPoolThread);
395   }
396 
397   while (!ShouldExit()) {
398 #if BUILDFLAG(IS_APPLE)
399     apple::ScopedNSAutoreleasePool autorelease_pool;
400 #endif
401     std::optional<WatchHangsInScope> hang_watch_scope;
402 
403     TRACE_EVENT_END0("base", "WorkerThread active");
404     // TODO(crbug.com/1021571): Remove this once fixed.
405     PERFETTO_INTERNAL_ADD_EMPTY_EVENT();
406     hang_watch_scope.reset();
407     delegate()->WaitForWork();
408     TRACE_EVENT_BEGIN("base", "WorkerThread active",
409                       perfetto::TerminatingFlow::FromPointer(
410                           reinterpret_cast<void*>(flow_terminator_)));
411 
412     // Don't GetWork() in the case where we woke up for Cleanup().
413     if (ShouldExit()) {
414       break;
415     }
416 
417     if (watch_for_hangs) {
418       hang_watch_scope.emplace();
419     }
420 
421     // Thread type needs to be updated before GetWork.
422     UpdateThreadType(GetDesiredThreadType());
423 
424     // Get the task source containing the first task to execute.
425     RegisteredTaskSource task_source = delegate()->GetWork(this);
426 
427     // If acquiring work failed and the worker's still alive,
428     // record that this is an unnecessary wakeup.
429     if (!task_source && !ShouldExit()) {
430       delegate()->RecordUnnecessaryWakeup();
431     }
432 
433     while (task_source) {
434       // Alias pointer for investigation of memory corruption. crbug.com/1218384
435       TaskSource* task_source_before_run = task_source.get();
436       base::debug::Alias(&task_source_before_run);
437 
438       task_source = task_tracker_->RunAndPopNextTask(std::move(task_source));
439       // Alias pointer for investigation of memory corruption. crbug.com/1218384
440       TaskSource* task_source_before_move = task_source.get();
441       base::debug::Alias(&task_source_before_move);
442 
443       // We emplace the hang_watch_scope here so that each hang watch scope
444       // covers one GetWork (or SwapProcessedTask) as well as one
445       // RunAndPopNextTask.
446       if (watch_for_hangs) {
447         hang_watch_scope.emplace();
448       }
449 
450       RegisteredTaskSource new_task_source =
451           delegate()->SwapProcessedTask(std::move(task_source), this);
452 
453       UpdateThreadType(GetDesiredThreadType());
454 
455       // Check that task_source is always cleared, to help investigation of
456       // memory corruption where task_source is non-null after being moved.
457       // crbug.com/1218384
458       CHECK(!task_source);
459       task_source = std::move(new_task_source);
460     }
461   }
462 
463   // Important: It is unsafe to access unowned state (e.g. |task_tracker_|)
464   // after invoking OnMainExit().
465 
466   delegate()->OnMainExit(this);
467 
468   if (worker_thread_observer_) {
469     worker_thread_observer_->OnWorkerThreadMainExit();
470   }
471 
472   // Release the self-reference to |this|. This can result in deleting |this|
473   // and as such no more member accesses should be made after this point.
474   self_ = nullptr;
475 
476   TRACE_EVENT_END0("base", "WorkerThread active");
477   TRACE_EVENT_INSTANT0("base", "WorkerThread dead", TRACE_EVENT_SCOPE_THREAD);
478   // TODO(crbug.com/1021571): Remove this once fixed.
479   PERFETTO_INTERNAL_ADD_EMPTY_EVENT();
480 }
481 
482 }  // namespace base::internal
483