xref: /aosp_15_r20/external/cronet/base/task/thread_pool/task_tracker.h (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_TASK_THREAD_POOL_TASK_TRACKER_H_
6 #define BASE_TASK_THREAD_POOL_TASK_TRACKER_H_
7 
8 #include <atomic>
9 #include <functional>
10 #include <limits>
11 #include <memory>
12 #include <queue>
13 #include <string>
14 
15 #include "base/atomic_sequence_num.h"
16 #include "base/atomicops.h"
17 #include "base/base_export.h"
18 #include "base/containers/circular_deque.h"
19 #include "base/functional/callback_forward.h"
20 #include "base/sequence_checker.h"
21 #include "base/strings/string_piece.h"
22 #include "base/synchronization/waitable_event.h"
23 #include "base/task/common/checked_lock.h"
24 #include "base/task/common/task_annotator.h"
25 #include "base/task/task_traits.h"
26 #include "base/task/thread_pool/task.h"
27 #include "base/task/thread_pool/task_source.h"
28 #include "base/task/thread_pool/tracked_ref.h"
29 #include "base/thread_annotations.h"
30 #include "base/threading/thread_local.h"
31 
32 namespace base {
33 
34 class ConditionVariable;
35 
36 namespace internal {
37 
38 class JobTaskSource;
39 
40 // Determines which tasks are allowed to run. Installed with
   // TaskTracker::SetCanRunPolicy(); it determines the result of
   // TaskTracker::CanRunPriority().
41 enum class CanRunPolicy {
42   // Tasks of every priority are allowed to run.
43   kAll,
44   // Only USER_VISIBLE and USER_BLOCKING tasks are allowed to run.
45   kForegroundOnly,
46   // No task is allowed to run, whatever its priority.
47   kNone,
48 };
49 
50 // TaskTracker enforces policies that determines whether:
51 // - A task can be pushed to a task source (WillPostTask).
52 // - A task source can be queued (WillQueueTaskSource).
53 // - Tasks for a given priority can run (CanRunPriority).
54 // - The next task in a queued task source can run (RunAndPopNextTask).
55 // TaskTracker also sets up the environment to run a task (RunAndPopNextTask)
56 // and records metrics and trace events. This class is thread-safe.
57 class BASE_EXPORT TaskTracker {
58  public:
59   TaskTracker();
60   TaskTracker(const TaskTracker&) = delete;
61   TaskTracker& operator=(const TaskTracker&) = delete;
62   virtual ~TaskTracker();
63 
64   // Initiates shutdown. Once this is called, only BLOCK_SHUTDOWN tasks will
65   // start running (doesn't affect tasks that are already running). This can
66   // only be called once.
67   void StartShutdown();
68 
69   // Synchronously completes shutdown. StartShutdown() must be called first.
70   // Returns when:
71   // - All SKIP_ON_SHUTDOWN tasks that were already running have completed their
72   //   execution.
73   // - All posted BLOCK_SHUTDOWN tasks have completed their execution.
74   // CONTINUE_ON_SHUTDOWN tasks still may be running after Shutdown returns.
75   // This can only be called once.
76   void CompleteShutdown();
77 
78   // Waits until there are no incomplete task sources. May be called in tests
79   // to validate that a condition is met after all task sources have run.
80   //
81   // Does not wait for delayed tasks. Waits for task sources posted from
82   // other threads during the call. Returns immediately when shutdown completes.
83   void FlushForTesting();
84 
85   // Returns and calls |flush_callback| when there are no incomplete undelayed
86   // tasks. |flush_callback| may be called back on any thread and should not
87   // perform a lot of work. May be used when additional work on the current
88   // thread needs to be performed during a flush. Only one
89   // FlushAsyncForTesting() may be pending at any given time.
90   void FlushAsyncForTesting(OnceClosure flush_callback);
91 
92   // Sets the new CanRunPolicy policy, possibly affecting result of
93   // CanRunPriority(). The caller must wake up worker as appropriate so that
94   // tasks that are allowed to run by the new policy can be scheduled.
95   void SetCanRunPolicy(CanRunPolicy can_run_policy);
96 
97   // Informs this TaskTracker that |task| with |shutdown_behavior| is about to
98   // be pushed to a task source (if non-delayed) or be added to the
99   // DelayedTaskManager (if delayed). Returns true if this operation is allowed
100   // (the operation should be performed if-and-only-if it is). This method may
101   // also modify metadata on |task| if desired.
102   // If this returns false, `task` must be leaked by the caller if deleting it
103   // on the current sequence may invoke sequence-affine code that belongs to
104   // another sequence.
105   bool WillPostTask(Task* task, TaskShutdownBehavior shutdown_behavior);
106 
107   // Informs this TaskTracker that |task| that is about to be pushed to a task
108   // source with |priority|. Returns true if this operation is allowed (the
109   // operation should be performed if-and-only-if it is).
110   [[nodiscard]] bool WillPostTaskNow(const Task& task,
111                                      TaskPriority priority) const;
112 
113   // Informs this TaskTracker that |task_source| is about to be queued. Returns
114   // a RegisteredTaskSource that should be queued if-and-only-if it evaluates to
115   // true.
116   RegisteredTaskSource RegisterTaskSource(
117       scoped_refptr<TaskSource> task_source);
118 
119   // Informs this TaskTracker that |task_source| is about to be queued.
120   void WillEnqueueJob(JobTaskSource* task_source);
121 
122   // Returns true if a task with |priority| can run under to the current policy.
123   bool CanRunPriority(TaskPriority priority) const;
124 
125   // Runs the next task in |task_source| unless the current shutdown state
126   // prevents that. Then, pops the task from |task_source| (even if it didn't
127   // run). Returns |task_source| if non-empty after popping a task from it
128   // (which indicates that it should be reenqueued). WillPostTask() must have
129   // allowed the task in front of |task_source| to be posted before this is
130   // called.
131   RegisteredTaskSource RunAndPopNextTask(RegisteredTaskSource task_source);
132 
133   // Returns true once shutdown has started (StartShutdown() was called).
134   // Note: sequential consistency with the thread calling StartShutdown() isn't
135   // guaranteed by this call.
136   bool HasShutdownStarted() const;
137 
138   // Returns true if shutdown has completed (StartShutdown() was called and
139   // no tasks are blocking shutdown).
140   bool IsShutdownComplete() const;
141 
GetTrackedRef()142   TrackedRef<TaskTracker> GetTrackedRef() {
143     return tracked_ref_factory_.GetTrackedRef();
144   }
145 
146   void BeginFizzlingBlockShutdownTasks();
147   void EndFizzlingBlockShutdownTasks();
148 
149   // Returns true if there are task sources that haven't completed their
150   // execution (still queued or in progress). If it returns false: the side-
151   // effects of all completed tasks are guaranteed to be visible to the caller.
152   bool HasIncompleteTaskSourcesForTesting() const;
153 
154  protected:
155   // Runs and deletes |task|. |task| is deleted in the environment where it
156   // runs. |task_source| is the task source from which |task| was extracted.
157   // |traits| are the traits of |task_source|. An override is expected to call
158   // its parent's implementation but is free to perform extra work before and
159   // after doing so.
160   virtual void RunTask(Task task,
161                        TaskSource* task_source,
162                        const TaskTraits& traits);
163 
164   // Allow a subclass to wait more interactively for any running shutdown tasks
165   // before blocking the thread.
166   virtual void BeginCompleteShutdown(base::WaitableEvent& shutdown_event);
167 
168   // Asserts that FlushForTesting() is allowed to be called. Overridden in tests
169   // in situations where it is not.
AssertFlushForTestingAllowed()170   virtual void AssertFlushForTestingAllowed() {}
171 
172  private:
173   friend class RegisteredTaskSource;
174   class State;
175 
176   void PerformShutdown();
177 
178   // Called before WillPostTask() informs the tracing system that a task has
179   // been posted. Updates |num_items_blocking_shutdown_| if necessary and
180   // returns true if the current shutdown state allows the task to be posted.
181   bool BeforeQueueTaskSource(TaskShutdownBehavior shutdown_behavior);
182 
183   // Called before a task with |effective_shutdown_behavior| is run by
184   // RunTask(). Updates |num_items_blocking_shutdown_| if necessary and returns
185   // true if the current shutdown state allows the task to be run.
186   bool BeforeRunTask(TaskShutdownBehavior shutdown_behavior);
187 
188   // Called after a task with |effective_shutdown_behavior| has been run by
189   // RunTask(). Updates |num_items_blocking_shutdown_| if necessary.
190   void AfterRunTask(TaskShutdownBehavior shutdown_behavior);
191 
192   // Informs this TaskTracker that |task_source| won't be reenqueued and returns
193   // the underlying TaskSource. This is called before destroying a valid
194   // RegisteredTaskSource. Updates |num_items_blocking_shutdown_| if necessary.
195   scoped_refptr<TaskSource> UnregisterTaskSource(
196       scoped_refptr<TaskSource> task_source);
197 
198   // Called when an item blocking shutdown finishes after shutdown has started.
199   void DecrementNumItemsBlockingShutdown();
200 
201   // Decrements the number of incomplete task sources and signals |flush_cv_|
202   // if it reaches zero.
203   void DecrementNumIncompleteTaskSources();
204 
205   // Invokes all |flush_callbacks_for_testing_| if any in a lock-safe manner.
206   void InvokeFlushCallbacksForTesting();
207 
208   // Dummy frames to allow identification of shutdown behavior in a stack trace.
209   void RunContinueOnShutdown(Task& task,
210                              const TaskTraits& traits,
211                              TaskSource* task_source,
212                              const SequenceToken& token);
213   void RunSkipOnShutdown(Task& task,
214                          const TaskTraits& traits,
215                          TaskSource* task_source,
216                          const SequenceToken& token);
217   void RunBlockShutdown(Task& task,
218                         const TaskTraits& traits,
219                         TaskSource* task_source,
220                         const SequenceToken& token);
221   void RunTaskWithShutdownBehavior(Task& task,
222                                    const TaskTraits& traits,
223                                    TaskSource* task_source,
224                                    const SequenceToken& token);
225 
226   NOT_TAIL_CALLED void RunTaskImpl(Task& task,
227                                    const TaskTraits& traits,
228                                    TaskSource* task_source,
229                                    const SequenceToken& token);
230 
231   TaskAnnotator task_annotator_;
232 
233   // Indicates whether logging information about TaskPriority::BEST_EFFORT tasks
234   // was enabled with a command line switch.
235   const bool has_log_best_effort_tasks_switch_;
236 
237   // Number of tasks blocking shutdown and boolean indicating whether shutdown
238   // has started. |shutdown_lock_| should be held to access |shutdown_event_|
239   // when this indicates that shutdown has started because State doesn't provide
240   // memory barriers. It intentionally trades having to use a Lock on shutdown
241   // with not needing memory barriers at runtime.
242   const std::unique_ptr<State> state_;
243 
244   // Number of task sources that haven't completed their execution. Is
245   // decremented with a memory barrier after the last task of a task source
246   // runs. Is accessed with an acquire memory barrier in FlushForTesting(). The
247   // memory barriers ensure that the memory written by flushed task sources is
248   // visible when FlushForTesting() returns.
249   std::atomic_int num_incomplete_task_sources_{0};
250 
251   // Global policy the determines result of CanRunPriority().
252   std::atomic<CanRunPolicy> can_run_policy_;
253 
254   // Lock associated with |flush_cv_|. Partially synchronizes access to
255   // |num_incomplete_task_sources_|. Full synchronization isn't needed
256   // because it's atomic, but synchronization is needed to coordinate waking and
257   // sleeping at the right time. Fully synchronizes access to
258   // |flush_callbacks_for_testing_|.
259   mutable CheckedLock flush_lock_;
260 
261   // Signaled when |num_incomplete_task_sources_| is or reaches zero or when
262   // shutdown completes.
263   ConditionVariable flush_cv_;
264 
265   // All invoked, if any, when |num_incomplete_task_sources_| is zero or when
266   // shutdown completes.
267   base::circular_deque<OnceClosure> flush_callbacks_for_testing_
268       GUARDED_BY(flush_lock_);
269 
270   // Synchronizes access to shutdown related members below.
271   mutable CheckedLock shutdown_lock_;
272 
273   // Event instantiated when shutdown starts and signaled when shutdown
274   // completes.
275   std::unique_ptr<WaitableEvent> shutdown_event_ GUARDED_BY(shutdown_lock_);
276 
277   // Used to generate unique |PendingTask::sequence_num| when posting tasks.
278   AtomicSequenceNumber sequence_nums_;
279 
280   // Ensures all state (e.g. dangling cleaned up workers) is coalesced before
281   // destroying the TaskTracker (e.g. in test environments).
282   // Ref. https://crbug.com/827615.
283   TrackedRefFactory<TaskTracker> tracked_ref_factory_;
284 };
285 
286 }  // namespace internal
287 }  // namespace base
288 
289 #endif  // BASE_TASK_THREAD_POOL_TASK_TRACKER_H_
290