// Copyright 2018 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_
#define BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_

#include <deque>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <utility>

#include "base/atomic_sequence_num.h"
#include "base/base_export.h"
#include "base/callback_list.h"
#include "base/containers/circular_deque.h"
#include "base/debug/crash_logging.h"
#include "base/feature_list.h"
#include "base/functional/callback_forward.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_pump_type.h"
#include "base/observer_list.h"
#include "base/pending_task.h"
#include "base/rand_util.h"
#include "base/run_loop.h"
#include "base/synchronization/lock.h"
#include "base/task/current_thread.h"
#include "base/task/sequence_manager/associated_thread_id.h"
#include "base/task/sequence_manager/enqueue_order.h"
#include "base/task/sequence_manager/enqueue_order_generator.h"
#include "base/task/sequence_manager/sequence_manager.h"
#include "base/task/sequence_manager/task_queue.h"
#include "base/task/sequence_manager/task_queue_impl.h"
#include "base/task/sequence_manager/task_queue_selector.h"
#include "base/task/sequence_manager/thread_controller.h"
#include "base/task/sequence_manager/work_tracker.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
#include "base/threading/thread_checker.h"
#include "base/time/default_tick_clock.h"
#include "base/types/pass_key.h"
#include "base/values.h"
#include "build/build_config.h"

namespace base {

namespace internal {
class SequenceManagerThreadDelegate;
}

namespace trace_event {
class ConvertableToTraceFormat;
}  // namespace trace_event

namespace sequence_manager {

class SequenceManagerForTest;
class TaskQueue;
class TaskTimeObserver;
class TimeDomain;

namespace internal {

class TaskQueueImpl;
class DefaultWakeUpQueue;
class SequenceManagerImpl;
class ThreadControllerImpl;

// A private factory method for SequenceManagerThreadDelegate which is
// equivalent to sequence_manager::CreateUnboundSequenceManager() but returns
// the underlying impl.
std::unique_ptr<SequenceManagerImpl> CreateUnboundSequenceManagerImpl(
    PassKey<base::internal::SequenceManagerThreadDelegate>,
    SequenceManager::Settings settings);

// The task queue manager provides N task queues and a selector interface for
// choosing which task queue to service next. Each task queue consists of two
// sub queues:
//
// 1. Incoming task queue. Tasks that are posted get immediately appended here.
//    When a task is appended into an empty incoming queue, the task manager
//    work function (DoWork()) is scheduled to run on the main task runner.
//
// 2. Work queue. If a work queue is empty when DoWork() is entered, tasks from
//    the incoming task queue (if any) are moved here. The work queues are
//    registered with the selector as input to the scheduling decision.
//
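// A rough usage sketch (illustrative only; Spec/QueueName/DoSomething below
// are placeholders rather than anything defined in this header): an embedder
// typically creates a SequenceManager through one of the
// sequence_manager::Create*() factories, creates one or more task queues, and
// posts tasks through the queues' task runners:
//
//   auto manager = CreateSequenceManagerOnCurrentThreadWithPump(
//       MessagePump::Create(MessagePumpType::DEFAULT));
//   auto queue = manager->CreateTaskQueue(TaskQueue::Spec(QueueName::TEST_TQ));
//   queue->task_runner()->PostTask(FROM_HERE, BindOnce(&DoSomething));
//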
class BASE_EXPORT SequenceManagerImpl
    : public SequenceManager,
      public internal::SequencedTaskSource,
      public internal::TaskQueueSelector::Observer,
      public RunLoop::NestingObserver {
 public:
  using Observer = SequenceManager::Observer;

  SequenceManagerImpl(const SequenceManagerImpl&) = delete;
  SequenceManagerImpl& operator=(const SequenceManagerImpl&) = delete;
  ~SequenceManagerImpl() override;

  // Initializes features for this class. See `base::features::Init()`.
  static void InitializeFeatures();

  // SequenceManager implementation:
  void BindToCurrentThread() override;
  scoped_refptr<SequencedTaskRunner> GetTaskRunnerForCurrentTask() override;
  void BindToMessagePump(std::unique_ptr<MessagePump> message_pump) override;
  void SetObserver(Observer* observer) override;
  void AddTaskTimeObserver(TaskTimeObserver* task_time_observer) override;
  void RemoveTaskTimeObserver(TaskTimeObserver* task_time_observer) override;
  void SetTimeDomain(TimeDomain* time_domain) override;
  void ResetTimeDomain() override;
  const TickClock* GetTickClock() const override;
  TimeTicks NowTicks() const override;
  void SetDefaultTaskRunner(
      scoped_refptr<SingleThreadTaskRunner> task_runner) override;
  void ReclaimMemory() override;
  bool GetAndClearSystemIsQuiescentBit() override;
  void SetWorkBatchSize(int work_batch_size) override;
  void EnableCrashKeys(const char* async_stack_crash_key) override;
  const MetricRecordingSettings& GetMetricRecordingSettings() const override;
  size_t GetPendingTaskCountForTesting() const override;
  TaskQueue::Handle CreateTaskQueue(const TaskQueue::Spec& spec) override;
  std::string DescribeAllPendingTasks() const override;
  void PrioritizeYieldingToNative(base::TimeTicks prioritize_until) override;
  void AddTaskObserver(TaskObserver* task_observer) override;
  void RemoveTaskObserver(TaskObserver* task_observer) override;
  std::optional<WakeUp> GetNextDelayedWakeUp() const override;
  TaskQueue::QueuePriority GetPriorityCount() const override;

  // SequencedTaskSource implementation:
  void SetRunTaskSynchronouslyAllowed(
      bool can_run_tasks_synchronously) override;
  std::optional<SelectedTask> SelectNextTask(
      LazyNow& lazy_now,
      SelectTaskOption option = SelectTaskOption::kDefault) override;
  void DidRunTask(LazyNow& lazy_now) override;
  std::optional<WakeUp> GetPendingWakeUp(
      LazyNow* lazy_now,
      SelectTaskOption option = SelectTaskOption::kDefault) override;
  bool HasPendingHighResolutionTasks() override;
  void OnBeginWork() override;
  bool OnIdle() override;
  void MaybeEmitTaskDetails(
      perfetto::EventContext& ctx,
      const SequencedTaskSource::SelectedTask& selected_task) const override;

  void AddDestructionObserver(
      CurrentThread::DestructionObserver* destruction_observer);
  void RemoveDestructionObserver(
      CurrentThread::DestructionObserver* destruction_observer);
  [[nodiscard]] CallbackListSubscription RegisterOnNextIdleCallback(
      OnceClosure on_next_idle_callback);

  // Sets / returns the default TaskRunner. Thread-safe.
  void SetTaskRunner(scoped_refptr<SingleThreadTaskRunner> task_runner);
  scoped_refptr<SingleThreadTaskRunner> GetTaskRunner();

  bool IsBoundToCurrentThread() const;
  MessagePump* GetMessagePump() const;
  bool IsType(MessagePumpType type) const;
  void SetAddQueueTimeToTasks(bool enable);
  void SetTaskExecutionAllowedInNativeNestedLoop(bool allowed);
  bool IsTaskExecutionAllowedInNativeNestedLoop() const;
#if BUILDFLAG(IS_IOS)
  void AttachToMessagePump();
#endif
  bool IsIdleForTesting() override;
  void EnableMessagePumpTimeKeeperMetrics(const char* thread_name);

  // Requests that a task to process work be scheduled.
  void ScheduleWork();

  // Returns the currently executing TaskQueue if any. Must be called on the
  // thread this class was created on.
  internal::TaskQueueImpl* currently_executing_task_queue() const;

  // Unregisters a TaskQueue previously created by |CreateTaskQueue()|.
  // No tasks will run on this queue after this call.
  void UnregisterTaskQueueImpl(
      std::unique_ptr<internal::TaskQueueImpl> task_queue);

  scoped_refptr<const AssociatedThreadId> associated_thread() const {
    return associated_thread_;
  }

  const Settings& settings() const { return settings_; }

  WeakPtr<SequenceManagerImpl> GetWeakPtr();

  // How frequently to perform housekeeping tasks (sweeping canceled tasks etc).
  static constexpr TimeDelta kReclaimMemoryInterval = Seconds(30);

 protected:
  static std::unique_ptr<ThreadControllerImpl>
  CreateThreadControllerImplForCurrentThread(const TickClock* clock);

  // Create a task queue manager where |controller| controls the thread
  // on which the tasks are eventually run.
  SequenceManagerImpl(std::unique_ptr<internal::ThreadController> controller,
                      SequenceManager::Settings settings = Settings());

  friend class internal::TaskQueueImpl;
  friend class internal::DefaultWakeUpQueue;
  friend class ::base::sequence_manager::SequenceManagerForTest;

 private:
  // Returns the SequenceManager running the current thread. It must only be
  // used on the thread on which it was obtained. Only to be used by
  // CurrentThread for the moment.
  static SequenceManagerImpl* GetCurrent();
  friend class ::base::CurrentThread;

  // Factory friends to call into private creation methods.
  friend std::unique_ptr<SequenceManager>
      sequence_manager::CreateSequenceManagerOnCurrentThread(
          SequenceManager::Settings);
  friend std::unique_ptr<SequenceManager>
  sequence_manager::CreateSequenceManagerOnCurrentThreadWithPump(
      std::unique_ptr<MessagePump> message_pump,
      SequenceManager::Settings);
  friend std::unique_ptr<SequenceManager>
      sequence_manager::CreateUnboundSequenceManager(SequenceManager::Settings);
  friend std::unique_ptr<SequenceManagerImpl>
      sequence_manager::internal::CreateUnboundSequenceManagerImpl(
          PassKey<base::internal::SequenceManagerThreadDelegate>,
          SequenceManager::Settings);

  // Assume direct control over current thread and create a SequenceManager.
  // This function should be called only once per thread.
  // This function assumes that a task execution environment is already
  // initialized for the current thread.
  static std::unique_ptr<SequenceManagerImpl> CreateOnCurrentThread(
      SequenceManager::Settings settings);

  // Create an unbound SequenceManager (typically for a future thread). The
  // SequenceManager can be initialized on the current thread and then needs to
  // be bound and initialized on the target thread by calling one of the Bind*()
  // methods.
  static std::unique_ptr<SequenceManagerImpl> CreateUnbound(
      SequenceManager::Settings settings);
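
  // As a rough sketch of that flow (illustrative only): the unbound manager
  // is created on thread A, handed off to the future thread B, and thread B
  // then calls BindToCurrentThread() or BindToMessagePump() before any tasks
  // run.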

  enum class ProcessTaskResult {
    kDeferred,
    kExecuted,
    kSequenceManagerDeleted,
  };

  // SequenceManager maintains a queue of non-nestable tasks since they're
  // uncommon and allocating an extra deque per TaskQueue would waste memory.
  using NonNestableTaskDeque =
      circular_deque<internal::TaskQueueImpl::DeferredNonNestableTask>;

  // We have to track reentrancy because we support nested RunLoops but the
  // selector interface is unaware of those. This struct keeps track of all
  // task-related state needed to make pairs of SelectNextTask() / DidRunTask()
  // work.
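  //
  // For illustration only (not an API contract), with a nested RunLoop the
  // calls pair up like a stack:
  //
  //   SelectNextTask()    -> outer task starts running
  //     SelectNextTask()  -> nested task starts (inside a nested RunLoop)
  //     DidRunTask()      -> nested task finishes
  //   DidRunTask()        -> outer task finishes
  //
  // Each level corresponds to one ExecutingTask on |task_execution_stack|.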
  struct ExecutingTask {
    ExecutingTask(Task&& task,
                  internal::TaskQueueImpl* task_queue,
                  TaskQueue::TaskTiming task_timing)
        : pending_task(std::move(task)),
          task_queue(task_queue),
          task_queue_name(task_queue->GetProtoName()),
          task_timing(task_timing),
          priority(task_queue->GetQueuePriority()),
          task_type(pending_task.task_type) {}

    Task pending_task;

    // `task_queue` is not a raw_ptr<...> for performance reasons (based on
    // analysis of sampling profiler data and tab_search:top100:2020).
    RAW_PTR_EXCLUSION internal::TaskQueueImpl* task_queue = nullptr;
    // Save task_queue_name as the task queue can be deleted within the task.
    QueueName task_queue_name;
    TaskQueue::TaskTiming task_timing;
    // Save priority as it might change after running a task.
    TaskQueue::QueuePriority priority;
    // Save task metadata to use after running the task, as |pending_task|
    // won't be available then.
    int task_type;
  };

  struct MainThreadOnly {
    explicit MainThreadOnly(
        SequenceManagerImpl* sequence_manager,
        const scoped_refptr<AssociatedThreadId>& associated_thread,
        const SequenceManager::Settings& settings,
        const base::TickClock* clock);
    ~MainThreadOnly();

    int nesting_depth = 0;
    NonNestableTaskDeque non_nestable_task_queue;
    // TODO(altimin): Switch to instruction pointer crash key when it's
    // available.
    raw_ptr<debug::CrashKeyString> file_name_crash_key = nullptr;
    raw_ptr<debug::CrashKeyString> function_name_crash_key = nullptr;
    raw_ptr<debug::CrashKeyString> async_stack_crash_key = nullptr;
    std::array<char, static_cast<size_t>(debug::CrashKeySize::Size64)>
        async_stack_buffer = {};

    std::optional<base::MetricsSubSampler> metrics_subsampler;

    internal::TaskQueueSelector selector;
    ObserverList<TaskObserver>::UncheckedAndDanglingUntriaged task_observers;
    ObserverList<TaskTimeObserver>::UncheckedAndDanglingUntriaged
        task_time_observers;
    const raw_ptr<const base::TickClock> default_clock;
    raw_ptr<TimeDomain> time_domain = nullptr;

    std::unique_ptr<WakeUpQueue> wake_up_queue;
    std::unique_ptr<WakeUpQueue> non_waking_wake_up_queue;

    // If true, MaybeReclaimMemory() will attempt to reclaim memory.
    bool memory_reclaim_scheduled = false;

    // Used to ensure we don't perform expensive housekeeping too frequently.
    TimeTicks next_time_to_reclaim_memory;

    // List of task queues managed by this SequenceManager.
    // - active_queues contains queues that are still running tasks, which are
    //   owned by relevant TaskQueues.
    // - queues_to_delete contains soon-to-be-deleted queues, because some
    //   internal scheduling code does not expect queues to be pulled
    //   from underneath.

    std::set<raw_ptr<internal::TaskQueueImpl, SetExperimental>> active_queues;

    std::map<internal::TaskQueueImpl*, std::unique_ptr<internal::TaskQueueImpl>>
        queues_to_delete;

    bool task_was_run_on_quiescence_monitored_queue = false;
    bool nesting_observer_registered_ = false;

    // Use std::deque() so that references returned by SelectNextTask() remain
    // valid until the matching call to DidRunTask(), even when nested RunLoops
    // cause tasks to be pushed on the stack in-between. This is needed because
    // references are kept in local variables by calling code between
    // SelectNextTask()/DidRunTask().
    std::deque<ExecutingTask> task_execution_stack;

    raw_ptr<Observer> observer = nullptr;  // NOT OWNED

    ObserverList<CurrentThread::DestructionObserver>::
        UncheckedAndDanglingUntriaged destruction_observers;

    // Notified the next time `OnIdle()` completes without scheduling additional
    // work.
    OnceClosureList on_next_idle_callbacks;
  };

  void CompleteInitializationOnBoundThread();

  // TaskQueueSelector::Observer:
  void OnTaskQueueEnabled(internal::TaskQueueImpl* queue) override;
  void OnWorkAvailable() override;

  // RunLoop::NestingObserver:
  void OnBeginNestedRunLoop() override;
  void OnExitNestedRunLoop() override;

  // Schedules next wake-up at the given time, canceling any previous requests.
  // Use std::nullopt to cancel a wake-up. Must be called on the thread this
  // class was created on.
  void SetNextWakeUp(LazyNow* lazy_now, std::optional<WakeUp> wake_up);

  // Called before TaskQueue requests to reload its empty immediate work queue.
  void WillRequestReloadImmediateWorkQueue();

  // Returns a valid `SyncWorkAuthorization` if a call to `RunOrPostTask` on a
  // `SequencedTaskRunner` bound to this `SequenceManager` may run its task
  // synchronously.
  SyncWorkAuthorization TryAcquireSyncWorkAuthorization();

  // Called when a task is about to be queued. May add metadata to the task and
  // emit trace events.
  void WillQueueTask(Task* pending_task);

  // Enqueues onto delayed WorkQueues all delayed tasks which must run now
  // (cannot be postponed) and possibly some delayed tasks which can run now but
  // could be postponed (due to how tasks are stored, it is not possible to
  // retrieve all such tasks efficiently) and reloads any empty work queues.
  void MoveReadyDelayedTasksToWorkQueues(LazyNow* lazy_now);

  void NotifyWillProcessTask(ExecutingTask* task, LazyNow* time_before_task);
  void NotifyDidProcessTask(ExecutingTask* task, LazyNow* time_after_task);

  EnqueueOrder GetNextSequenceNumber();

  bool GetAddQueueTimeToTasks();

  std::unique_ptr<trace_event::ConvertableToTraceFormat>
  AsValueWithSelectorResultForTracing(internal::WorkQueue* selected_work_queue,
                                      bool force_verbose) const;
  Value::Dict AsValueWithSelectorResult(
      internal::WorkQueue* selected_work_queue,
      bool force_verbose) const;

  // Used in construction of TaskQueueImpl to obtain an AtomicFlag which it can
  // use to request reload by ReloadEmptyWorkQueues. The lifetime of
  // TaskQueueImpl is managed by this class and the handle will be released by
  // TaskQueueImpl::UnregisterTaskQueue which is always called before the
  // queue's destruction.
  AtomicFlagSet::AtomicFlag GetFlagToRequestReloadForEmptyQueue(
      TaskQueueImpl* task_queue);

  // Calls |TakeImmediateIncomingQueueTasks| on all queues with their reload
  // flag set in |empty_queues_to_reload_|.
  void ReloadEmptyWorkQueues();
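
  // Rough sketch of that flow (illustrative only): a TaskQueueImpl that
  // receives an immediate task while its work queue is empty sets the
  // AtomicFlag obtained above; a later SelectNextTask()/DoWork() pass calls
  // ReloadEmptyWorkQueues(), which moves the queued immediate tasks into the
  // work queues before the selector picks the next task.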

  std::unique_ptr<internal::TaskQueueImpl> CreateTaskQueueImpl(
      const TaskQueue::Spec& spec);

  // Periodically reclaims memory by sweeping away canceled tasks and shrinking
  // buffers.
  void MaybeReclaimMemory();

  // Deletes queues marked for deletion and empty queues marked for shutdown.
  void CleanUpQueues();

  // Removes canceled delayed tasks from the front of the wake-up queue.
  void RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now);

  TaskQueue::TaskTiming::TimeRecordingPolicy ShouldRecordTaskTiming(
      const internal::TaskQueueImpl* task_queue);
  bool ShouldRecordCPUTimeForTask();

  // Write the async stack trace onto a crash key as whitespace-delimited hex
  // addresses.
  void RecordCrashKeys(const PendingTask&);

  // Helper to terminate all scoped trace events to allow starting new ones
  // in SelectNextTask().
  std::optional<SelectedTask> SelectNextTaskImpl(LazyNow& lazy_now,
                                                 SelectTaskOption option);

  // Returns a wake-up for the next delayed task which is not ripe for
  // execution, or nullopt if `option` is `kSkipDelayedTask` or there
  // are no such tasks (immediate tasks don't count).
  std::optional<WakeUp> GetNextDelayedWakeUpWithOption(
      SelectTaskOption option) const;

  // Given a `wake_up` describing when the next delayed task should run, returns
  // a wake up that should be scheduled on the thread. `is_immediate()` if the
  // wake up should run immediately. `nullopt` if no wake up is required because
  // `wake_up` is `nullopt` or a `time_domain` is used.
  std::optional<WakeUp> AdjustWakeUp(std::optional<WakeUp> wake_up,
                                     LazyNow* lazy_now) const;

  void MaybeAddLeewayToTask(Task& task) const;

#if DCHECK_IS_ON()
  void LogTaskDebugInfo(const internal::WorkQueue* work_queue) const;
#endif

  // Determines if wall time or thread time should be recorded for the next
  // task.
  TaskQueue::TaskTiming InitializeTaskTiming(
      internal::TaskQueueImpl* task_queue);

  const scoped_refptr<AssociatedThreadId> associated_thread_;

  EnqueueOrderGenerator enqueue_order_generator_;

  const std::unique_ptr<internal::ThreadController> controller_;
  const Settings settings_;

  const MetricRecordingSettings metric_recording_settings_;

  WorkTracker work_tracker_;

  // Whether to add the queue time to tasks.
  base::subtle::Atomic32 add_queue_time_to_tasks_;

  AtomicFlagSet empty_queues_to_reload_;

  MainThreadOnly main_thread_only_;
  MainThreadOnly& main_thread_only() {
    DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
    return main_thread_only_;
  }
  const MainThreadOnly& main_thread_only() const {
    DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
    return main_thread_only_;
  }

  // |clock_| either refers to the TickClock representation of |time_domain|
  // (same object) if any, or to |default_clock| otherwise. It is maintained as
  // an atomic pointer here for multi-threaded usage.
  std::atomic<const base::TickClock*> clock_;
  const base::TickClock* main_thread_clock() const {
    DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
    return clock_.load(std::memory_order_relaxed);
  }
  const base::TickClock* any_thread_clock() const {
    // |memory_order_acquire| matched by |memory_order_release| in
    // SetTimeDomain() to ensure all data used by |clock_| is visible when read
    // from the current thread. A thread might try to access a stale |clock_|
    // but that's not an issue since |time_domain| contractually outlives
    // SequenceManagerImpl even if it's reset.
    return clock_.load(std::memory_order_acquire);
  }

  WeakPtrFactory<SequenceManagerImpl> weak_factory_{this};
};

}  // namespace internal
}  // namespace sequence_manager
}  // namespace base

#endif  // BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_