xref: /aosp_15_r20/external/cronet/base/task/thread_pool/sequence.h (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_TASK_THREAD_POOL_SEQUENCE_H_
6 #define BASE_TASK_THREAD_POOL_SEQUENCE_H_
7 
8 #include <stddef.h>
9 
10 #include "base/base_export.h"
11 #include "base/containers/intrusive_heap.h"
12 #include "base/containers/queue.h"
13 #include "base/sequence_token.h"
14 #include "base/task/task_traits.h"
15 #include "base/task/thread_pool/pooled_parallel_task_runner.h"
16 #include "base/task/thread_pool/task.h"
17 #include "base/task/thread_pool/task_source.h"
18 #include "base/task/thread_pool/task_source_sort_key.h"
19 #include "base/thread_annotations.h"
20 #include "base/threading/sequence_local_storage_map.h"
21 
22 namespace base {
23 namespace internal {
24 
25 // A Sequence is intended to hold delayed tasks and immediate tasks.
26 // Delayed tasks are held in a prority_queue until they are ripe and
27 // immediate tasks in a simple fifo queue.
28 // When Sequence::TakeTask is called, we select the next appropriate task
29 // from both queues and return it.
30 // Each queue holds slots each containing up to a single Task that must be
31 // executed in posting/runtime order.
32 //
33 // In comments below, an "empty Sequence" is a Sequence with no slot.
34 //
35 // Note: there is a known refcounted-ownership cycle in the Scheduler
36 // architecture: Sequence -> Task -> TaskRunner -> Sequence -> ...
37 // This is okay so long as the other owners of Sequence (PriorityQueue and
38 // WorkerThread in alternation and
39 // ThreadGroup::WorkerThreadDelegateImpl::GetWork()
40 // temporarily) keep running it (and taking Tasks from it as a result). A
41 // dangling reference cycle would only occur should they release their reference
42 // to it while it's not empty. In other words, it is only correct for them to
43 // release it after PopTask() returns false to indicate it was made empty by
44 // that call (in which case the next PushImmediateTask() will return true to
45 // indicate to the caller that the Sequence should be re-enqueued for
46 // execution). This class is thread-safe.
47 class BASE_EXPORT Sequence : public TaskSource {
48  public:
49   // A Transaction can perform multiple operations atomically on a
50   // Sequence. While a Transaction is alive, it is guaranteed that nothing
51   // else will access the Sequence; the Sequence's lock is held for the
52   // lifetime of the Transaction.
53   class BASE_EXPORT Transaction : public TaskSource::Transaction {
54    public:
55     Transaction(Transaction&& other);
56     Transaction(const Transaction&) = delete;
57     Transaction& operator=(const Transaction&) = delete;
58     ~Transaction();
59 
60     // Returns true if the sequence must be added to the immediate queue after
61     // receiving a new immediate Task in order to be scheduled. If the caller
62     // doesn't want the sequence to be scheduled, it may not add the sequence to
63     // the immediate queue even if this returns true.
64     bool WillPushImmediateTask();
65 
66     // Adds immediate |task| to the end of this sequence.
67     void PushImmediateTask(Task task);
68 
69     // Adds a delayed |task| in this sequence, and returns true if the sequence
70     // needs to be re-enqueued in the delayed queue as a result of this
71     // sequence's delayed sort key changing.
72     bool PushDelayedTask(Task task);
73 
sequence()74     Sequence* sequence() const { return static_cast<Sequence*>(task_source()); }
75 
76    private:
77     friend class Sequence;
78 
79     explicit Transaction(Sequence* sequence);
80   };
81 
82   // |traits| is metadata that applies to all Tasks in the Sequence.
83   // |task_runner| is a reference to the TaskRunner feeding this TaskSource.
84   // |task_runner| can be nullptr only for tasks with no TaskRunner, in which
85   // case |execution_mode| must be kParallel. Otherwise, |execution_mode| is the
86   // execution mode of |task_runner|.
87   Sequence(const TaskTraits& traits,
88            SequencedTaskRunner* task_runner,
89            TaskSourceExecutionMode execution_mode);
90   Sequence(const Sequence&) = delete;
91   Sequence& operator=(const Sequence&) = delete;
92 
93   // Begins a Transaction. This method cannot be called on a thread which has an
94   // active Sequence::Transaction.
95   [[nodiscard]] Transaction BeginTransaction();
96 
97   // TaskSource:
98   ExecutionEnvironment GetExecutionEnvironment() override;
99   size_t GetRemainingConcurrency() const override;
100   TaskSourceSortKey GetSortKey() const override;
101   TimeTicks GetDelayedSortKey() const override;
102 
103   // Returns a token that uniquely identifies this Sequence.
token()104   const SequenceToken& token() const { return token_; }
105 
sequence_local_storage()106   SequenceLocalStorageMap* sequence_local_storage() {
107     return &sequence_local_storage_;
108   }
109 
110   bool OnBecomeReady() override;
111 
has_worker_for_testing()112   bool has_worker_for_testing() const NO_THREAD_SAFETY_ANALYSIS {
113     return has_worker_;
114   }
is_immediate_for_testing()115   bool is_immediate_for_testing() const { return is_immediate_; }
IsEmptyForTesting()116   bool IsEmptyForTesting() const NO_THREAD_SAFETY_ANALYSIS { return IsEmpty(); }
117 
118   // A reference to TaskRunner is only retained between
119   // PushImmediateTask()/PushDelayedTask() and when DidProcessTask() returns
120   // false, guaranteeing it is safe to dereference this pointer. Otherwise, the
121   // caller should guarantee such TaskRunner still exists before dereferencing.
task_runner()122   SequencedTaskRunner* task_runner() const { return task_runner_; }
123 
124  private:
125   ~Sequence() override;
126 
127   struct DelayedTaskGreater {
128     bool operator()(const Task& lhs, const Task& rhs) const;
129   };
130 
131   // TaskSource:
132   RunStatus WillRunTask() override;
133   Task TakeTask(TaskSource::Transaction* transaction) override;
134   std::optional<Task> Clear(TaskSource::Transaction* transaction) override;
135   bool DidProcessTask(TaskSource::Transaction* transaction) override;
136   bool WillReEnqueue(TimeTicks now,
137                      TaskSource::Transaction* transaction) override;
138 
139   // Returns true if the delayed task to be posted will cause the delayed sort
140   // key to change.
141   bool DelayedSortKeyWillChange(const Task& delayed_task) const
142       EXCLUSIVE_LOCKS_REQUIRED(lock_);
143 
144   // Selects the earliest task to run, either from immediate or
145   // delayed queue and return it.
146   // Expects this sequence to have at least one task that can run
147   // immediately.
148   Task TakeEarliestTask() EXCLUSIVE_LOCKS_REQUIRED(lock_);
149 
150   // Get and return next task from immediate queue
151   Task TakeNextImmediateTask() EXCLUSIVE_LOCKS_REQUIRED(lock_);
152 
153   // Update the next earliest/latest ready time.
154   void UpdateReadyTimes() EXCLUSIVE_LOCKS_REQUIRED(lock_);
155 
156   // Returns true if there are immediate tasks
157   bool HasImmediateTasks() const EXCLUSIVE_LOCKS_REQUIRED(lock_);
158 
159   // Returns true if tasks ready to be executed
160   bool HasReadyTasks(TimeTicks now) const override;
161 
162   bool IsEmpty() const EXCLUSIVE_LOCKS_REQUIRED(lock_);
163 
164   // Releases reference to TaskRunner.
165   void ReleaseTaskRunner();
166 
167   const SequenceToken token_ = SequenceToken::Create();
168 
169   // A pointer to the TaskRunner that posts to this TaskSource, if any. The
170   // derived class is responsible for calling AddRef() when a TaskSource from
171   // which no Task is executing becomes non-empty and Release() when
172   // it becomes empty again (e.g. when DidProcessTask() returns false).
173   //
174   // In practise, this pointer is going to become dangling. See task_runner()
175   // comment.
176   raw_ptr<SequencedTaskRunner, DisableDanglingPtrDetection> task_runner_;
177 
178   // Queues of tasks to execute.
179   base::queue<Task> queue_ GUARDED_BY(lock_);
180   base::IntrusiveHeap<Task, DelayedTaskGreater> delayed_queue_
181       GUARDED_BY(lock_);
182 
183   // Caches the latest/earliest ready time for atomic access. Writes are
184   // protected by |lock_|, but allows atomic reads outside of |lock_|. If this
185   // sequence is empty, these are in an unknown state and shouldn't be read.
186 
187   // Minimum of latest_delayed_run_time() of next delayed task if any, and
188   // |queue_time| of next immediate task if any.
GUARDED_BY(lock_)189   std::atomic<TimeTicks> latest_ready_time_ GUARDED_BY(lock_){TimeTicks()};
190   // is_null() if there is an immediate task, or earliest_delayed_run_time() of
191   // next delayed task otherwise.
GUARDED_BY(lock_)192   std::atomic<TimeTicks> earliest_ready_time_ GUARDED_BY(lock_){TimeTicks()};
193 
194   // True if a worker is currently associated with a Task from this Sequence.
195   bool has_worker_ = false;
196 
197   // True if the sequence has ready tasks and requested to be queued as such
198   // through WillPushImmediateTask() or OnBecomeReady(). Reset to false once all
199   // ready tasks are done being processed and either DidProcessTask() or
200   // WillReEnqueue() returned false. Normally, |is_immediate_| is protected by
201   // |lock_|, except in OnBecomeReady() hence the use of atomics.
202   std::atomic_bool is_immediate_{false};
203 
204   // Holds data stored through the SequenceLocalStorageSlot API.
205   SequenceLocalStorageMap sequence_local_storage_;
206 };
207 
208 }  // namespace internal
209 }  // namespace base
210 
211 #endif  // BASE_TASK_THREAD_POOL_SEQUENCE_H_
212