xref: /aosp_15_r20/external/cronet/base/task/thread_pool/task_source.h (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 #ifndef BASE_TASK_THREAD_POOL_TASK_SOURCE_H_
6 #define BASE_TASK_THREAD_POOL_TASK_SOURCE_H_
7 
#include <stddef.h>

#include <atomic>
#include <optional>

#include "base/base_export.h"
#include "base/containers/intrusive_heap.h"
#include "base/dcheck_is_on.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/ref_counted.h"
#include "base/memory/stack_allocated.h"
#include "base/sequence_token.h"
#include "base/task/common/checked_lock.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source_sort_key.h"
#include "base/threading/sequence_local_storage_map.h"
#include "base/time/time.h"
23 
24 namespace base {
25 namespace internal {
26 
27 class TaskTracker;
28 
// Identifies the execution mode a TaskSource was created with. The value is
// supplied to the TaskSource constructor and read back through
// TaskSource::execution_mode().
// NOTE(review): the scheduling semantics of each mode are not visible in this
// header; confirm them against the concrete TaskSource implementations.
enum class TaskSourceExecutionMode {
  kParallel,
  kSequenced,
  kSingleThread,
  kJob,
  // Alias of the highest-valued enumerator (currently kJob).
  kMax = kJob,
};
36 
37 struct BASE_EXPORT ExecutionEnvironment {
38   STACK_ALLOCATED();
39 
40  public:
ExecutionEnvironmentExecutionEnvironment41   ExecutionEnvironment(SequenceToken token) : token(token) {}
42 
ExecutionEnvironmentExecutionEnvironment43   ExecutionEnvironment(SequenceToken token,
44                        SequenceLocalStorageMap* sequence_local_storage,
45                        SingleThreadTaskRunner* single_thread_task_runner)
46       : token(token),
47         sequence_local_storage(sequence_local_storage),
48         single_thread_task_runner(single_thread_task_runner) {}
49 
ExecutionEnvironmentExecutionEnvironment50   ExecutionEnvironment(SequenceToken token,
51                        SequenceLocalStorageMap* sequence_local_storage,
52                        SequencedTaskRunner* sequenced_task_runner)
53       : token(token),
54         sequence_local_storage(sequence_local_storage),
55         sequenced_task_runner(sequenced_task_runner) {}
56   ~ExecutionEnvironment();
57 
58   const SequenceToken token;
59   SequenceLocalStorageMap* const sequence_local_storage = nullptr;
60   SingleThreadTaskRunner* const single_thread_task_runner = nullptr;
61   SequencedTaskRunner* const sequenced_task_runner = nullptr;
62 };
63 
64 // A TaskSource is a virtual class that provides a series of Tasks that must be
65 // executed immediately or in the future.
66 //
67 // When a task source has delayed tasks but no immediate tasks, the scheduler
68 // must call OnBecomeReady() after HasReadyTasks(now) == true, which is
69 // guaranteed once now >= GetDelayedSortKey().
70 //
71 // A task source is registered when it's ready to be added to the immediate
72 // queue. A task source is ready to be queued when either:
73 // 1- It has new tasks that can run concurrently as a result of external
74 //    operations, e.g. posting a new immediate task to an empty Sequence or
75 //    increasing max concurrency of a JobTaskSource;
76 // 2- A worker finished running a task from it and both DidProcessTask() and
77 //    WillReEnqueue() returned true; or
78 // 3- A worker is about to run a task from it and WillRunTask() returned
79 //    kAllowedNotSaturated.
80 // 4- A delayed task became ready and OnBecomeReady() returns true.
81 //
82 // A worker may perform the following sequence of operations on a
83 // RegisteredTaskSource after obtaining it from the queue:
84 // 1- Check whether a task can run with WillRunTask() (and register/enqueue the
85 //    task source again if not saturated).
86 // 2- (optional) Iff (1) determined that a task can run, access the next task
87 //    with TakeTask().
88 // 3- (optional) Execute the task.
89 // 4- Inform the task source that a task was processed with DidProcessTask(),
90 //    and re-enqueue the task source iff requested. The task source is ready to
91 //    run immediately iff WillReEnqueue() returns true.
92 // When a task source is registered multiple times, many overlapping chains of
93 // operations may run concurrently, as permitted by WillRunTask(). This allows
94 // tasks from the same task source to run in parallel.
95 // However, the following invariants are kept:
96 // - The number of workers concurrently running tasks never goes over the
97 //   intended concurrency.
98 // - If the task source has more tasks that can run concurrently, it must be
99 //   queued.
100 //
101 // Note: there is a known refcounted-ownership cycle in the ThreadPool
102 // architecture: TaskSource -> TaskRunner -> TaskSource -> ... This is okay so
103 // long as the other owners of TaskSource (PriorityQueue and WorkerThread in
104 // alternation and ThreadGroup::WorkerThreadDelegateImpl::GetWork()
105 // temporarily) keep running it (and taking Tasks from it as a result). A
106 // dangling reference cycle would only occur should they release their reference
107 // to it while it's not empty. In other words, it is only correct for them to
108 // release it when DidProcessTask() returns false.
109 //
110 // This class is thread-safe.
111 class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> {
112  public:
113   // Indicates whether WillRunTask() allows TakeTask() to be called on a
114   // RegisteredTaskSource.
115   enum class RunStatus {
116     // TakeTask() cannot be called.
117     kDisallowed,
118     // TakeTask() may called, and the TaskSource has not reached its maximum
119     // concurrency (i.e. the TaskSource still needs to be queued).
120     kAllowedNotSaturated,
121     // TakeTask() may called, and the TaskSource has reached its maximum
122     // concurrency (i.e. the TaskSource no longer needs to be queued).
123     kAllowedSaturated,
124   };
125 
126   // A Transaction can perform multiple operations atomically on a
127   // TaskSource. While a Transaction is alive, it is guaranteed that nothing
128   // else will access the TaskSource; the TaskSource's lock is held for the
129   // lifetime of the Transaction. No Transaction must be held when ~TaskSource()
130   // is called.
131   class BASE_EXPORT Transaction {
132     STACK_ALLOCATED();
133 
134    public:
135     Transaction(Transaction&& other);
136     Transaction(const Transaction&) = delete;
137     Transaction& operator=(const Transaction&) = delete;
138     ~Transaction();
139 
140     operator bool() const { return !!task_source_; }
141 
142     // Sets TaskSource priority to |priority|.
143     void UpdatePriority(TaskPriority priority);
144 
145     // Returns the traits of all Tasks in the TaskSource.
traits()146     TaskTraits traits() const { return task_source_->traits_; }
147 
task_source()148     TaskSource* task_source() const { return task_source_; }
149 
150     void Release();
151 
152    protected:
153     explicit Transaction(TaskSource* task_source);
154 
155    private:
156     friend class TaskSource;
157 
158     TaskSource* task_source_ = nullptr;
159   };
160 
161   // |traits| is metadata that applies to all Tasks in the TaskSource.
162   TaskSource(const TaskTraits& traits,
163              TaskSourceExecutionMode execution_mode);
164   TaskSource(const TaskSource&) = delete;
165   TaskSource& operator=(const TaskSource&) = delete;
166 
167   // Begins a Transaction. This method cannot be called on a thread which has an
168   // active TaskSource::Transaction.
169   [[nodiscard]] Transaction BeginTransaction();
170 
171   virtual ExecutionEnvironment GetExecutionEnvironment() = 0;
172 
173   // Thread-safe but the returned value may immediately be obsolete. As such
174   // this should only be used as a best-effort guess of how many more workers
175   // are needed. This may be called on an empty task source.
176   virtual size_t GetRemainingConcurrency() const = 0;
177 
178   // Returns a TaskSourceSortKey representing the priority of the TaskSource.
179   virtual TaskSourceSortKey GetSortKey() const = 0;
180   // Returns a Timeticks object representing the next delayed runtime of the
181   // TaskSource.
182   virtual TimeTicks GetDelayedSortKey() const = 0;
183   // Returns true if there are tasks ready to be executed. Thread-safe but the
184   // returned value may immediately be obsolete.
185   virtual bool HasReadyTasks(TimeTicks now) const = 0;
186   // Returns true if the TaskSource should be moved to the immediate queue
187   // due to ready delayed tasks. Note: Returns false if the TaskSource contains
188   // ready delayed tasks, but expects to already be in the immediate queue.
189   virtual bool OnBecomeReady() = 0;
190 
191   // Support for IntrusiveHeap in ThreadGroup::PriorityQueue.
192   void SetImmediateHeapHandle(const HeapHandle& handle);
193   void ClearImmediateHeapHandle();
GetImmediateHeapHandle()194   HeapHandle GetImmediateHeapHandle() const {
195     return immediate_pq_heap_handle_;
196   }
197 
immediate_heap_handle()198   HeapHandle immediate_heap_handle() const { return immediate_pq_heap_handle_; }
199 
200   // Support for IntrusiveHeap in ThreadGroup::DelayedPriorityQueue.
201   void SetDelayedHeapHandle(const HeapHandle& handle);
202   void ClearDelayedHeapHandle();
GetDelayedHeapHandle()203   HeapHandle GetDelayedHeapHandle() const { return delayed_pq_heap_handle_; }
204 
delayed_heap_handle()205   HeapHandle delayed_heap_handle() const { return delayed_pq_heap_handle_; }
206 
207   // Returns the shutdown behavior of all Tasks in the TaskSource. Can be
208   // accessed without a Transaction because it is never mutated.
shutdown_behavior()209   TaskShutdownBehavior shutdown_behavior() const {
210     return traits_.shutdown_behavior();
211   }
212   // Returns a racy priority of the TaskSource. Can be accessed without a
213   // Transaction but may return an outdated result.
priority_racy()214   TaskPriority priority_racy() const {
215     return priority_racy_.load(std::memory_order_relaxed);
216   }
217   // Returns the thread policy of the TaskSource. Can be accessed without a
218   // Transaction because it is never mutated.
thread_policy()219   ThreadPolicy thread_policy() const { return traits_.thread_policy(); }
220 
execution_mode()221   TaskSourceExecutionMode execution_mode() const { return execution_mode_; }
222 
223   void ClearForTesting();
224 
225  protected:
226   virtual ~TaskSource();
227 
228   virtual RunStatus WillRunTask() = 0;
229 
230   // Implementations of TakeTask(), DidProcessTask(), WillReEnqueue(), and
231   // Clear() must ensure proper synchronization iff |transaction| is nullptr.
232   virtual Task TakeTask(TaskSource::Transaction* transaction) = 0;
233   virtual bool DidProcessTask(TaskSource::Transaction* transaction) = 0;
234   virtual bool WillReEnqueue(TimeTicks now,
235                              TaskSource::Transaction* transaction) = 0;
236 
237   // This may be called for each outstanding RegisteredTaskSource that's ready.
238   // The implementation needs to support this being called multiple times;
239   // unless it guarantees never to hand-out multiple RegisteredTaskSources that
240   // are concurrently ready.
241   virtual std::optional<Task> Clear(TaskSource::Transaction* transaction) = 0;
242 
243   // Sets TaskSource priority to |priority|.
244   void UpdatePriority(TaskPriority priority);
245 
246   // The TaskTraits of all Tasks in the TaskSource.
247   TaskTraits traits_;
248 
249   // The cached priority for atomic access.
250   std::atomic<TaskPriority> priority_racy_;
251 
252   // Synchronizes access to all members.
253   mutable CheckedLock lock_{UniversalPredecessor()};
254 
255  private:
256   friend class RefCountedThreadSafe<TaskSource>;
257   friend class RegisteredTaskSource;
258 
259   // The TaskSource's position in its current PriorityQueue. Access is protected
260   // by the PriorityQueue's lock.
261   HeapHandle immediate_pq_heap_handle_;
262 
263   // The TaskSource's position in its current DelayedPriorityQueue. Access is
264   // protected by the DelayedPriorityQueue's lock.
265   HeapHandle delayed_pq_heap_handle_;
266 
267   TaskSourceExecutionMode execution_mode_;
268 };
269 
270 // Wrapper around TaskSource to signify the intent to queue and run it.
271 // RegisteredTaskSource can only be created with TaskTracker and may only be
272 // used by a single worker at a time. However, the same task source may be
273 // registered several times, spawning multiple RegisteredTaskSources. A
274 // RegisteredTaskSource resets to its initial state when WillRunTask() fails
275 // or after DidProcessTask() and WillReEnqueue(), so it can be used again.
276 class BASE_EXPORT RegisteredTaskSource {
277  public:
278   RegisteredTaskSource();
279   RegisteredTaskSource(std::nullptr_t);
280   RegisteredTaskSource(RegisteredTaskSource&& other) noexcept;
281   RegisteredTaskSource(const RegisteredTaskSource&) = delete;
282   RegisteredTaskSource& operator=(const RegisteredTaskSource&) = delete;
283   ~RegisteredTaskSource();
284 
285   RegisteredTaskSource& operator=(RegisteredTaskSource&& other);
286 
287   operator bool() const { return task_source_ != nullptr; }
288   TaskSource* operator->() const { return task_source_.get(); }
get()289   TaskSource* get() const { return task_source_.get(); }
290 
291   static RegisteredTaskSource CreateForTesting(
292       scoped_refptr<TaskSource> task_source,
293       TaskTracker* task_tracker = nullptr);
294 
295   // Can only be called if this RegisteredTaskSource is in its initial state.
296   // Returns the underlying task source. An Optional is used in preparation for
297   // the merge between ThreadPool and TaskQueueManager (in Blink).
298   // https://crbug.com/783309
299   scoped_refptr<TaskSource> Unregister();
300 
301   // Informs this TaskSource that the current worker would like to run a Task
302   // from it. Can only be called if in its initial state. Returns a RunStatus
303   // that indicates if the operation is allowed (TakeTask() can be called).
304   TaskSource::RunStatus WillRunTask();
305 
306   // Returns the next task to run from this TaskSource. This should be called
307   // only after WillRunTask() returned RunStatus::kAllowed*. |transaction| is
308   // optional and should only be provided if this operation is already part of
309   // a transaction.
310   [[nodiscard]] Task TakeTask(TaskSource::Transaction* transaction = nullptr);
311 
312   // Must be called after WillRunTask() or once the task was run if TakeTask()
313   // was called. This resets this RegisteredTaskSource to its initial state so
314   // that WillRunTask() may be called again. |transaction| is optional and
315   // should only be provided if this operation is already part of a transaction.
316   // Returns true if the TaskSource should be queued after this operation.
317   bool DidProcessTask(TaskSource::Transaction* transaction = nullptr);
318 
319   // Must be called iff DidProcessTask() previously returns true .
320   // |transaction| is optional and should only be provided if this
321   // operation is already part of a transaction. Returns true if the
322   // TaskSource is ready to run immediately.
323   bool WillReEnqueue(TimeTicks now,
324                      TaskSource::Transaction* transaction = nullptr);
325 
326   // Returns a task that clears this TaskSource to make it empty. |transaction|
327   // is optional and should only be provided if this operation is already part
328   // of a transaction.
329   [[nodiscard]] std::optional<Task> Clear(
330       TaskSource::Transaction* transaction = nullptr);
331 
332  private:
333   friend class TaskTracker;
334   RegisteredTaskSource(scoped_refptr<TaskSource> task_source,
335                        TaskTracker* task_tracker);
336 
337 #if DCHECK_IS_ON()
338   // Indicates the step of a task execution chain.
339   enum class State {
340     kInitial,       // WillRunTask() may be called.
341     kReady,         // After WillRunTask() returned a valid RunStatus.
342   };
343 
344   State run_step_ = State::kInitial;
345 #endif  // DCHECK_IS_ON()
346 
347   scoped_refptr<TaskSource> task_source_;
348   raw_ptr<TaskTracker, LeakedDanglingUntriaged> task_tracker_ = nullptr;
349 };
350 
351 // A pair of Transaction and RegisteredTaskSource. Useful to carry a
352 // RegisteredTaskSource with an associated Transaction.
353 struct BASE_EXPORT RegisteredTaskSourceAndTransaction {
354   STACK_ALLOCATED();
355 
356  public:
357   RegisteredTaskSourceAndTransaction(RegisteredTaskSource task_source_in,
358                                      TaskSource::Transaction transaction_in);
359 
360   RegisteredTaskSourceAndTransaction(
361       RegisteredTaskSourceAndTransaction&& other) = default;
362   RegisteredTaskSourceAndTransaction(
363       const RegisteredTaskSourceAndTransaction&) = delete;
364   RegisteredTaskSourceAndTransaction& operator=(
365       const RegisteredTaskSourceAndTransaction&) = delete;
366   ~RegisteredTaskSourceAndTransaction() = default;
367 
368   static RegisteredTaskSourceAndTransaction FromTaskSource(
369       RegisteredTaskSource task_source_in);
370 
371   RegisteredTaskSource task_source;
372   TaskSource::Transaction transaction;
373 };
374 
375 struct BASE_EXPORT TaskSourceAndTransaction {
376   STACK_ALLOCATED();
377 
378  public:
379   TaskSourceAndTransaction(scoped_refptr<TaskSource> task_source_in,
380                            TaskSource::Transaction transaction_in);
381 
382   TaskSourceAndTransaction(TaskSourceAndTransaction&& other);
383   TaskSourceAndTransaction(const TaskSourceAndTransaction&) = delete;
384   TaskSourceAndTransaction& operator=(const TaskSourceAndTransaction&) = delete;
385   ~TaskSourceAndTransaction();
386 
387   static TaskSourceAndTransaction FromTaskSource(
388       scoped_refptr<TaskSource> task_source_in);
389 
390   scoped_refptr<TaskSource> task_source;
391   TaskSource::Transaction transaction;
392 };
393 
394 }  // namespace internal
395 }  // namespace base
396 
397 #endif  // BASE_TASK_THREAD_POOL_TASK_SOURCE_H_
398