1 // Copyright 2024 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_THREAD_POOL_THREAD_GROUP_SEMAPHORE_H_ 6 #define BASE_TASK_THREAD_POOL_THREAD_GROUP_SEMAPHORE_H_ 7 8 #include <optional> 9 #include <string_view> 10 11 #include "base/base_export.h" 12 #include "base/task/thread_pool/task_source.h" 13 #include "base/task/thread_pool/thread_group_impl.h" 14 #include "base/task/thread_pool/worker_thread_semaphore.h" 15 16 namespace base { 17 18 class WorkerThreadObserver; 19 20 namespace internal { 21 22 class TaskTracker; 23 24 // A group of `WorkerThreadSemaphore`s that run `Task`s. 25 class BASE_EXPORT ThreadGroupSemaphore : public ThreadGroup { 26 public: 27 // Constructs a group without workers. 28 // 29 // `histogram_label` is used to label the thread group's histograms as 30 // "ThreadPool." + histogram_name + "." + `histogram_label` + extra suffixes. 31 // It must not be empty. `thread_group_label` is used to label the thread 32 // group's threads, it must not be empty. `thread_type_hint` is the preferred 33 // thread type; the actual thread type depends on shutdown state and platform 34 // capabilities. `task_tracker` keeps track of tasks. 35 ThreadGroupSemaphore(std::string_view histogram_label, 36 std::string_view thread_group_label, 37 ThreadType thread_type_hint, 38 TrackedRef<TaskTracker> task_tracker, 39 TrackedRef<Delegate> delegate); 40 41 ThreadGroupSemaphore(const ThreadGroupSemaphore&) = delete; 42 ThreadGroupSemaphore& operator=(const ThreadGroupSemaphore&) = delete; 43 // Destroying a ThreadGroupSemaphore returned by Create() is not allowed 44 // in production; it is always leaked. In tests, it can only be destroyed 45 // after JoinForTesting() has returned. 46 ~ThreadGroupSemaphore() override; 47 48 // ThreadGroup: 49 void Start(size_t max_tasks, 50 size_t max_best_effort_tasks, 51 TimeDelta suggested_reclaim_time, 52 scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner, 53 WorkerThreadObserver* worker_thread_observer, 54 WorkerEnvironment worker_environment, 55 bool synchronous_thread_start_for_testing = false, 56 std::optional<TimeDelta> may_block_threshold = 57 std::optional<TimeDelta>()) override; 58 void JoinForTesting() override; 59 void DidUpdateCanRunPolicy() override; 60 void OnShutdownStarted() override; 61 std::unique_ptr<BaseScopedCommandsExecutor> GetExecutor() override; 62 size_t NumberOfIdleWorkersLockRequiredForTesting() const 63 EXCLUSIVE_LOCKS_REQUIRED(lock_) override; 64 65 private: 66 class SemaphoreScopedCommandsExecutor; 67 class SemaphoreWorkerDelegate; 68 69 // friend tests so that they can access `blocked_workers_poll_period` and 70 // may_block_threshold(), both in ThreadGroup. 71 friend class ThreadGroupSemaphoreBlockingTest; 72 friend class ThreadGroupSemaphoreMayBlockTest; 73 FRIEND_TEST_ALL_PREFIXES(ThreadGroupSemaphoreBlockingTest, 74 ThreadBlockUnblockPremature); 75 FRIEND_TEST_ALL_PREFIXES(ThreadGroupSemaphoreBlockingTest, 76 ThreadBlockUnblockPrematureBestEffort); 77 78 // ThreadGroup: 79 void UpdateSortKey(TaskSource::Transaction transaction) override; 80 void PushTaskSourceAndWakeUpWorkers( 81 RegisteredTaskSourceAndTransaction transaction_with_task_source) override; 82 void EnsureEnoughWorkersLockRequired(BaseScopedCommandsExecutor* executor) 83 override EXCLUSIVE_LOCKS_REQUIRED(lock_); 84 ThreadGroupWorkerDelegate* GetWorkerDelegate(WorkerThread* worker) override; 85 86 void CreateAndRegisterWorkerLockRequired( 87 SemaphoreScopedCommandsExecutor* executor) 88 EXCLUSIVE_LOCKS_REQUIRED(lock_); 89 90 // Passed to SemaphoreWorkerDelegate. 91 Semaphore semaphore_{0}; 92 AtomicFlag join_called_for_testing_{}; 93 94 // A worker (any worker) becomes 'signaled' when the semaphore is incremented, 95 // and is no longer considered signaled at 96 // OnWorkerBecomesIdleLockRequired(). Will not exceed `workers_.size()` due to 97 // being deliberately capped in`EnsureEnoughWorkersLockRequired(). 98 size_t num_active_signals_ GUARDED_BY(lock_) = 0; 99 100 size_t worker_sequence_num_ GUARDED_BY(lock_) = 0; 101 102 // Ensures recently cleaned up workers (ref. 103 // SemaphoreWorkerDelegate::CleanupLockRequired()) had time to exit as 104 // they have a raw reference to `this` (and to TaskTracker) which can 105 // otherwise result in racy use-after-frees per no longer being part of 106 // `workers_` and hence not being explicitly joined in JoinForTesting(): 107 // https://crbug.com/810464. Uses AtomicRefCount to make its only public 108 // method thread-safe. 109 TrackedRefFactory<ThreadGroup> tracked_ref_factory_; 110 }; 111 112 } // namespace internal 113 } // namespace base 114 115 #endif // BASE_TASK_THREAD_POOL_THREAD_GROUP_SEMAPHORE_H_ 116