xref: /aosp_15_r20/external/cronet/base/task/thread_pool/thread_group_semaphore.h (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2024 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_TASK_THREAD_POOL_THREAD_GROUP_SEMAPHORE_H_
6 #define BASE_TASK_THREAD_POOL_THREAD_GROUP_SEMAPHORE_H_
7 
8 #include <optional>
9 #include <string_view>
10 
11 #include "base/base_export.h"
12 #include "base/task/thread_pool/task_source.h"
13 #include "base/task/thread_pool/thread_group_impl.h"
14 #include "base/task/thread_pool/worker_thread_semaphore.h"
15 
16 namespace base {
17 
18 class WorkerThreadObserver;
19 
20 namespace internal {
21 
22 class TaskTracker;
23 
24 // A `ThreadGroup` made of `WorkerThreadSemaphore`s that run `Task`s.
   //
   // The class is neither copy-constructible nor copy-assignable. It owns one
   // `Semaphore` (`semaphore_`) that is passed to SemaphoreWorkerDelegate.
25 class BASE_EXPORT ThreadGroupSemaphore : public ThreadGroup {
26  public:
27   // Constructs a group without workers.
28   //
29   // `histogram_label` is used to label the thread group's histograms as
30   // "ThreadPool." + histogram_name + "." + `histogram_label` + extra suffixes.
31   // It must not be empty. `thread_group_label` is used to label the thread
32   // group's threads, it must not be empty. `thread_type_hint` is the preferred
33   // thread type; the actual thread type depends on shutdown state and platform
34   // capabilities. `task_tracker` keeps track of tasks.
     // NOTE(review): `delegate` is not described here; presumably it is the
     // ThreadGroup delegate handed to the base class. Confirm against the .cc.
35   ThreadGroupSemaphore(std::string_view histogram_label,
36                        std::string_view thread_group_label,
37                        ThreadType thread_type_hint,
38                        TrackedRef<TaskTracker> task_tracker,
39                        TrackedRef<Delegate> delegate);
40 
41   ThreadGroupSemaphore(const ThreadGroupSemaphore&) = delete;
42   ThreadGroupSemaphore& operator=(const ThreadGroupSemaphore&) = delete;
43   // Destroying a ThreadGroupSemaphore is not allowed in production; it is
44   // always leaked. In tests, it can only be destroyed after JoinForTesting()
45   // has returned.
46   ~ThreadGroupSemaphore() override;
47 
48   // ThreadGroup:
     // Note on Start(): `synchronous_thread_start_for_testing` defaults to false
     // and `may_block_threshold` defaults to an empty optional. Because Start()
     // is virtual, these defaults are picked from the static type the call is
     // made through, not from the dynamic type of the object.
49   void Start(size_t max_tasks,
50              size_t max_best_effort_tasks,
51              TimeDelta suggested_reclaim_time,
52              scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
53              WorkerThreadObserver* worker_thread_observer,
54              WorkerEnvironment worker_environment,
55              bool synchronous_thread_start_for_testing = false,
56              std::optional<TimeDelta> may_block_threshold =
57                  std::optional<TimeDelta>()) override;
58   void JoinForTesting() override;
59   void DidUpdateCanRunPolicy() override;
60   void OnShutdownStarted() override;
61   std::unique_ptr<BaseScopedCommandsExecutor> GetExecutor() override;
     // Test-only accessor; the caller must hold `lock_`.
62   size_t NumberOfIdleWorkersLockRequiredForTesting() const
63       EXCLUSIVE_LOCKS_REQUIRED(lock_) override;
64 
65  private:
     // Nested types that are only declared in this header.
66   class SemaphoreScopedCommandsExecutor;
67   class SemaphoreWorkerDelegate;
68 
69   // Friend tests so that they can access `blocked_workers_poll_period` and
70   // may_block_threshold(), both in ThreadGroup.
71   friend class ThreadGroupSemaphoreBlockingTest;
72   friend class ThreadGroupSemaphoreMayBlockTest;
73   FRIEND_TEST_ALL_PREFIXES(ThreadGroupSemaphoreBlockingTest,
74                            ThreadBlockUnblockPremature);
75   FRIEND_TEST_ALL_PREFIXES(ThreadGroupSemaphoreBlockingTest,
76                            ThreadBlockUnblockPrematureBestEffort);
77 
78   // ThreadGroup:
79   void UpdateSortKey(TaskSource::Transaction transaction) override;
80   void PushTaskSourceAndWakeUpWorkers(
81       RegisteredTaskSourceAndTransaction transaction_with_task_source) override;
     // The caller must hold `lock_`.
82   void EnsureEnoughWorkersLockRequired(BaseScopedCommandsExecutor* executor)
83       override EXCLUSIVE_LOCKS_REQUIRED(lock_);
84   ThreadGroupWorkerDelegate* GetWorkerDelegate(WorkerThread* worker) override;
85 
     // The caller must hold `lock_`.
     // NOTE(review): judging by the name, this creates a worker and registers
     // it through `executor`; confirm against the .cc.
86   void CreateAndRegisterWorkerLockRequired(
87       SemaphoreScopedCommandsExecutor* executor)
88       EXCLUSIVE_LOCKS_REQUIRED(lock_);
89 
90   // Passed to SemaphoreWorkerDelegate. Constructed with the argument 0
     // (presumably the initial count; confirm against Semaphore's constructor).
91   Semaphore semaphore_{0};
     // NOTE(review): undocumented; presumably set once JoinForTesting() has been
     // called. Confirm against the .cc.
92   AtomicFlag join_called_for_testing_{};
93 
94   // A worker (any worker) becomes 'signaled' when the semaphore is incremented,
95   // and is no longer considered signaled at
96   // OnWorkerBecomesIdleLockRequired(). Will not exceed `workers_.size()` due to
97   // being deliberately capped in `EnsureEnoughWorkersLockRequired()`.
98   size_t num_active_signals_ GUARDED_BY(lock_) = 0;
99 
     // NOTE(review): undocumented; presumably a running counter used to number
     // workers as they are created. Confirm against the .cc.
100   size_t worker_sequence_num_ GUARDED_BY(lock_) = 0;
101 
102   // Ensures recently cleaned up workers (ref.
103   // SemaphoreWorkerDelegate::CleanupLockRequired()) had time to exit as
104   // they have a raw reference to `this` (and to TaskTracker) which can
105   // otherwise result in racy use-after-frees per no longer being part of
106   // `workers_` and hence not being explicitly joined in JoinForTesting():
107   // https://crbug.com/810464. Uses AtomicRefCount to make its only public
108   // method thread-safe.
      // NOTE(review): this is the last data member, so it is destroyed before
      // the others; presumably that ordering is intentional. Keep it last.
109   TrackedRefFactory<ThreadGroup> tracked_ref_factory_;
110 };
111 
112 }  // namespace internal
113 }  // namespace base
114 
115 #endif  // BASE_TASK_THREAD_POOL_THREAD_GROUP_SEMAPHORE_H_
116