1 // Copyright 2018 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_ 6 #define BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_ 7 8 #include <deque> 9 #include <map> 10 #include <memory> 11 #include <optional> 12 #include <set> 13 #include <string> 14 #include <utility> 15 16 #include "base/atomic_sequence_num.h" 17 #include "base/base_export.h" 18 #include "base/callback_list.h" 19 #include "base/containers/circular_deque.h" 20 #include "base/debug/crash_logging.h" 21 #include "base/feature_list.h" 22 #include "base/functional/callback_forward.h" 23 #include "base/memory/raw_ptr.h" 24 #include "base/memory/scoped_refptr.h" 25 #include "base/memory/weak_ptr.h" 26 #include "base/message_loop/message_pump_type.h" 27 #include "base/observer_list.h" 28 #include "base/pending_task.h" 29 #include "base/rand_util.h" 30 #include "base/run_loop.h" 31 #include "base/synchronization/lock.h" 32 #include "base/task/current_thread.h" 33 #include "base/task/sequence_manager/associated_thread_id.h" 34 #include "base/task/sequence_manager/enqueue_order.h" 35 #include "base/task/sequence_manager/enqueue_order_generator.h" 36 #include "base/task/sequence_manager/sequence_manager.h" 37 #include "base/task/sequence_manager/task_queue.h" 38 #include "base/task/sequence_manager/task_queue_impl.h" 39 #include "base/task/sequence_manager/task_queue_selector.h" 40 #include "base/task/sequence_manager/thread_controller.h" 41 #include "base/task/sequence_manager/work_tracker.h" 42 #include "base/task/sequenced_task_runner.h" 43 #include "base/task/single_thread_task_runner.h" 44 #include "base/threading/thread_checker.h" 45 #include "base/time/default_tick_clock.h" 46 #include "base/types/pass_key.h" 47 #include "base/values.h" 48 #include "build/build_config.h" 49 50 namespace base { 51 52 namespace internal { 53 class SequenceManagerThreadDelegate; 54 } 55 56 namespace trace_event { 57 class ConvertableToTraceFormat; 58 } // namespace trace_event 59 60 namespace sequence_manager { 61 62 class SequenceManagerForTest; 63 class TaskQueue; 64 class TaskTimeObserver; 65 class TimeDomain; 66 67 namespace internal { 68 69 class TaskQueueImpl; 70 class DefaultWakeUpQueue; 71 class SequenceManagerImpl; 72 class ThreadControllerImpl; 73 74 // A private factory method for SequenceManagerThreadDelegate which is 75 // equivalent to sequence_manager::CreateUnboundSequenceManager() but returns 76 // the underlying impl. 77 std::unique_ptr<SequenceManagerImpl> CreateUnboundSequenceManagerImpl( 78 PassKey<base::internal::SequenceManagerThreadDelegate>, 79 SequenceManager::Settings settings); 80 81 // The task queue manager provides N task queues and a selector interface for 82 // choosing which task queue to service next. Each task queue consists of two 83 // sub queues: 84 // 85 // 1. Incoming task queue. Tasks that are posted get immediately appended here. 86 // When a task is appended into an empty incoming queue, the task manager 87 // work function (DoWork()) is scheduled to run on the main task runner. 88 // 89 // 2. Work queue. If a work queue is empty when DoWork() is entered, tasks from 90 // the incoming task queue (if any) are moved here. The work queues are 91 // registered with the selector as input to the scheduling decision. 92 // 93 class BASE_EXPORT SequenceManagerImpl 94 : public SequenceManager, 95 public internal::SequencedTaskSource, 96 public internal::TaskQueueSelector::Observer, 97 public RunLoop::NestingObserver { 98 public: 99 using Observer = SequenceManager::Observer; 100 101 SequenceManagerImpl(const SequenceManagerImpl&) = delete; 102 SequenceManagerImpl& operator=(const SequenceManagerImpl&) = delete; 103 ~SequenceManagerImpl() override; 104 105 // Initializes features for this class. See `base::features::Init()`. 106 static void InitializeFeatures(); 107 108 // SequenceManager implementation: 109 void BindToCurrentThread() override; 110 scoped_refptr<SequencedTaskRunner> GetTaskRunnerForCurrentTask() override; 111 void BindToMessagePump(std::unique_ptr<MessagePump> message_pump) override; 112 void SetObserver(Observer* observer) override; 113 void AddTaskTimeObserver(TaskTimeObserver* task_time_observer) override; 114 void RemoveTaskTimeObserver(TaskTimeObserver* task_time_observer) override; 115 void SetTimeDomain(TimeDomain* time_domain) override; 116 void ResetTimeDomain() override; 117 const TickClock* GetTickClock() const override; 118 TimeTicks NowTicks() const override; 119 void SetDefaultTaskRunner( 120 scoped_refptr<SingleThreadTaskRunner> task_runner) override; 121 void ReclaimMemory() override; 122 bool GetAndClearSystemIsQuiescentBit() override; 123 void SetWorkBatchSize(int work_batch_size) override; 124 void EnableCrashKeys(const char* async_stack_crash_key) override; 125 const MetricRecordingSettings& GetMetricRecordingSettings() const override; 126 size_t GetPendingTaskCountForTesting() const override; 127 TaskQueue::Handle CreateTaskQueue(const TaskQueue::Spec& spec) override; 128 std::string DescribeAllPendingTasks() const override; 129 void PrioritizeYieldingToNative(base::TimeTicks prioritize_until) override; 130 void AddTaskObserver(TaskObserver* task_observer) override; 131 void RemoveTaskObserver(TaskObserver* task_observer) override; 132 std::optional<WakeUp> GetNextDelayedWakeUp() const override; 133 TaskQueue::QueuePriority GetPriorityCount() const override; 134 135 // SequencedTaskSource implementation: 136 void SetRunTaskSynchronouslyAllowed( 137 bool can_run_tasks_synchronously) override; 138 std::optional<SelectedTask> SelectNextTask( 139 LazyNow& lazy_now, 140 SelectTaskOption option = SelectTaskOption::kDefault) override; 141 void DidRunTask(LazyNow& lazy_now) override; 142 std::optional<WakeUp> GetPendingWakeUp( 143 LazyNow* lazy_now, 144 SelectTaskOption option = SelectTaskOption::kDefault) override; 145 bool HasPendingHighResolutionTasks() override; 146 void OnBeginWork() override; 147 bool OnIdle() override; 148 void MaybeEmitTaskDetails( 149 perfetto::EventContext& ctx, 150 const SequencedTaskSource::SelectedTask& selected_task) const override; 151 152 void AddDestructionObserver( 153 CurrentThread::DestructionObserver* destruction_observer); 154 void RemoveDestructionObserver( 155 CurrentThread::DestructionObserver* destruction_observer); 156 [[nodiscard]] CallbackListSubscription RegisterOnNextIdleCallback( 157 OnceClosure on_next_idle_callback); 158 159 // Sets / returns the default TaskRunner. Thread-safe. 160 void SetTaskRunner(scoped_refptr<SingleThreadTaskRunner> task_runner); 161 scoped_refptr<SingleThreadTaskRunner> GetTaskRunner(); 162 163 bool IsBoundToCurrentThread() const; 164 MessagePump* GetMessagePump() const; 165 bool IsType(MessagePumpType type) const; 166 void SetAddQueueTimeToTasks(bool enable); 167 void SetTaskExecutionAllowedInNativeNestedLoop(bool allowed); 168 bool IsTaskExecutionAllowedInNativeNestedLoop() const; 169 #if BUILDFLAG(IS_IOS) 170 void AttachToMessagePump(); 171 #endif 172 bool IsIdleForTesting() override; 173 void EnableMessagePumpTimeKeeperMetrics(const char* thread_name); 174 175 // Requests that a task to process work is scheduled. 176 void ScheduleWork(); 177 178 // Returns the currently executing TaskQueue if any. Must be called on the 179 // thread this class was created on. 180 internal::TaskQueueImpl* currently_executing_task_queue() const; 181 182 // Unregisters a TaskQueue previously created by |NewTaskQueue()|. 183 // No tasks will run on this queue after this call. 184 void UnregisterTaskQueueImpl( 185 std::unique_ptr<internal::TaskQueueImpl> task_queue); 186 associated_thread()187 scoped_refptr<const AssociatedThreadId> associated_thread() const { 188 return associated_thread_; 189 } 190 settings()191 const Settings& settings() const { return settings_; } 192 193 WeakPtr<SequenceManagerImpl> GetWeakPtr(); 194 195 // How frequently to perform housekeeping tasks (sweeping canceled tasks etc). 196 static constexpr TimeDelta kReclaimMemoryInterval = Seconds(30); 197 198 protected: 199 static std::unique_ptr<ThreadControllerImpl> 200 CreateThreadControllerImplForCurrentThread(const TickClock* clock); 201 202 // Create a task queue manager where |controller| controls the thread 203 // on which the tasks are eventually run. 204 SequenceManagerImpl(std::unique_ptr<internal::ThreadController> controller, 205 SequenceManager::Settings settings = Settings()); 206 207 friend class internal::TaskQueueImpl; 208 friend class internal::DefaultWakeUpQueue; 209 friend class ::base::sequence_manager::SequenceManagerForTest; 210 211 private: 212 // Returns the SequenceManager running the 213 // current thread. It must only be used on the thread it was obtained. 214 // Only to be used by CurrentThread for the moment 215 static SequenceManagerImpl* GetCurrent(); 216 friend class ::base::CurrentThread; 217 218 // Factory friends to call into private creation methods. 219 friend std::unique_ptr<SequenceManager> 220 sequence_manager::CreateSequenceManagerOnCurrentThread( 221 SequenceManager::Settings); 222 friend std::unique_ptr<SequenceManager> 223 sequence_manager::CreateSequenceManagerOnCurrentThreadWithPump( 224 std::unique_ptr<MessagePump> message_pump, 225 SequenceManager::Settings); 226 friend std::unique_ptr<SequenceManager> 227 sequence_manager::CreateUnboundSequenceManager(SequenceManager::Settings); 228 friend std::unique_ptr<SequenceManagerImpl> 229 sequence_manager::internal::CreateUnboundSequenceManagerImpl( 230 PassKey<base::internal::SequenceManagerThreadDelegate>, 231 SequenceManager::Settings); 232 233 // Assume direct control over current thread and create a SequenceManager. 234 // This function should be called only once per thread. 235 // This function assumes that a task execution environment is already 236 // initialized for the current thread. 237 static std::unique_ptr<SequenceManagerImpl> CreateOnCurrentThread( 238 SequenceManager::Settings settings); 239 240 // Create an unbound SequenceManager (typically for a future thread). The 241 // SequenceManager can be initialized on the current thread and then needs to 242 // be bound and initialized on the target thread by calling one of the Bind*() 243 // methods. 244 static std::unique_ptr<SequenceManagerImpl> CreateUnbound( 245 SequenceManager::Settings settings); 246 247 enum class ProcessTaskResult { 248 kDeferred, 249 kExecuted, 250 kSequenceManagerDeleted, 251 }; 252 253 // SequenceManager maintains a queue of non-nestable tasks since they're 254 // uncommon and allocating an extra deque per TaskQueue will waste the memory. 255 using NonNestableTaskDeque = 256 circular_deque<internal::TaskQueueImpl::DeferredNonNestableTask>; 257 258 // We have to track rentrancy because we support nested runloops but the 259 // selector interface is unaware of those. This struct keeps track off all 260 // task related state needed to make pairs of SelectNextTask() / DidRunTask() 261 // work. 262 struct ExecutingTask { ExecutingTaskExecutingTask263 ExecutingTask(Task&& task, 264 internal::TaskQueueImpl* task_queue, 265 TaskQueue::TaskTiming task_timing) 266 : pending_task(std::move(task)), 267 task_queue(task_queue), 268 task_queue_name(task_queue->GetProtoName()), 269 task_timing(task_timing), 270 priority(task_queue->GetQueuePriority()), 271 task_type(pending_task.task_type) {} 272 273 Task pending_task; 274 275 // `task_queue` is not a raw_ptr<...> for performance reasons (based on 276 // analysis of sampling profiler data and tab_search:top100:2020). 277 RAW_PTR_EXCLUSION internal::TaskQueueImpl* task_queue = nullptr; 278 // Save task_queue_name as the task queue can be deleted within the task. 279 QueueName task_queue_name; 280 TaskQueue::TaskTiming task_timing; 281 // Save priority as it might change after running a task. 282 TaskQueue::QueuePriority priority; 283 // Save task metadata to use in after running a task as |pending_task| 284 // won't be available then. 285 int task_type; 286 }; 287 288 struct MainThreadOnly { 289 explicit MainThreadOnly( 290 SequenceManagerImpl* sequence_manager, 291 const scoped_refptr<AssociatedThreadId>& associated_thread, 292 const SequenceManager::Settings& settings, 293 const base::TickClock* clock); 294 ~MainThreadOnly(); 295 296 int nesting_depth = 0; 297 NonNestableTaskDeque non_nestable_task_queue; 298 // TODO(altimin): Switch to instruction pointer crash key when it's 299 // available. 300 raw_ptr<debug::CrashKeyString> file_name_crash_key = nullptr; 301 raw_ptr<debug::CrashKeyString> function_name_crash_key = nullptr; 302 raw_ptr<debug::CrashKeyString> async_stack_crash_key = nullptr; 303 std::array<char, static_cast<size_t>(debug::CrashKeySize::Size64)> 304 async_stack_buffer = {}; 305 306 std::optional<base::MetricsSubSampler> metrics_subsampler; 307 308 internal::TaskQueueSelector selector; 309 ObserverList<TaskObserver>::UncheckedAndDanglingUntriaged task_observers; 310 ObserverList<TaskTimeObserver>::UncheckedAndDanglingUntriaged 311 task_time_observers; 312 const raw_ptr<const base::TickClock> default_clock; 313 raw_ptr<TimeDomain> time_domain = nullptr; 314 315 std::unique_ptr<WakeUpQueue> wake_up_queue; 316 std::unique_ptr<WakeUpQueue> non_waking_wake_up_queue; 317 318 // If true MaybeReclaimMemory will attempt to reclaim memory. 319 bool memory_reclaim_scheduled = false; 320 321 // Used to ensure we don't perform expensive housekeeping too frequently. 322 TimeTicks next_time_to_reclaim_memory; 323 324 // List of task queues managed by this SequenceManager. 325 // - active_queues contains queues that are still running tasks, which are 326 // are owned by relevant TaskQueues. 327 // - queues_to_delete contains soon-to-be-deleted queues, because some 328 // internal scheduling code does not expect queues to be pulled 329 // from underneath. 330 331 std::set<raw_ptr<internal::TaskQueueImpl, SetExperimental>> active_queues; 332 333 std::map<internal::TaskQueueImpl*, std::unique_ptr<internal::TaskQueueImpl>> 334 queues_to_delete; 335 336 bool task_was_run_on_quiescence_monitored_queue = false; 337 bool nesting_observer_registered_ = false; 338 339 // Use std::deque() so that references returned by SelectNextTask() remain 340 // valid until the matching call to DidRunTask(), even when nested RunLoops 341 // cause tasks to be pushed on the stack in-between. This is needed because 342 // references are kept in local variables by calling code between 343 // SelectNextTask()/DidRunTask(). 344 std::deque<ExecutingTask> task_execution_stack; 345 346 raw_ptr<Observer> observer = nullptr; // NOT OWNED 347 348 ObserverList<CurrentThread::DestructionObserver>:: 349 UncheckedAndDanglingUntriaged destruction_observers; 350 351 // Notified the next time `OnIdle()` completes without scheduling additional 352 // work. 353 OnceClosureList on_next_idle_callbacks; 354 }; 355 356 void CompleteInitializationOnBoundThread(); 357 358 // TaskQueueSelector::Observer: 359 void OnTaskQueueEnabled(internal::TaskQueueImpl* queue) override; 360 void OnWorkAvailable() override; 361 362 // RunLoop::NestingObserver: 363 void OnBeginNestedRunLoop() override; 364 void OnExitNestedRunLoop() override; 365 366 // Schedules next wake-up at the given time, canceling any previous requests. 367 // Use std::nullopt to cancel a wake-up. Must be called on the thread this 368 // class was created on. 369 void SetNextWakeUp(LazyNow* lazy_now, std::optional<WakeUp> wake_up); 370 371 // Called before TaskQueue requests to reload its empty immediate work queue. 372 void WillRequestReloadImmediateWorkQueue(); 373 374 // Returns a valid `SyncWorkAuthorization` if a call to `RunOrPostTask` on a 375 // `SequencedTaskRunner` bound to this `SequenceManager` may run its task 376 // synchronously. 377 SyncWorkAuthorization TryAcquireSyncWorkAuthorization(); 378 379 // Called when a task is about to be queued. May add metadata to the task and 380 // emit trace events. 381 void WillQueueTask(Task* pending_task); 382 383 // Enqueues onto delayed WorkQueues all delayed tasks which must run now 384 // (cannot be postponed) and possibly some delayed tasks which can run now but 385 // could be postponed (due to how tasks are stored, it is not possible to 386 // retrieve all such tasks efficiently) and reloads any empty work queues. 387 void MoveReadyDelayedTasksToWorkQueues(LazyNow* lazy_now); 388 389 void NotifyWillProcessTask(ExecutingTask* task, LazyNow* time_before_task); 390 void NotifyDidProcessTask(ExecutingTask* task, LazyNow* time_after_task); 391 392 EnqueueOrder GetNextSequenceNumber(); 393 394 bool GetAddQueueTimeToTasks(); 395 396 std::unique_ptr<trace_event::ConvertableToTraceFormat> 397 AsValueWithSelectorResultForTracing(internal::WorkQueue* selected_work_queue, 398 bool force_verbose) const; 399 Value::Dict AsValueWithSelectorResult( 400 internal::WorkQueue* selected_work_queue, 401 bool force_verbose) const; 402 403 // Used in construction of TaskQueueImpl to obtain an AtomicFlag which it can 404 // use to request reload by ReloadEmptyWorkQueues. The lifetime of 405 // TaskQueueImpl is managed by this class and the handle will be released by 406 // TaskQueueImpl::UnregisterTaskQueue which is always called before the 407 // queue's destruction. 408 AtomicFlagSet::AtomicFlag GetFlagToRequestReloadForEmptyQueue( 409 TaskQueueImpl* task_queue); 410 411 // Calls |TakeImmediateIncomingQueueTasks| on all queues with their reload 412 // flag set in |empty_queues_to_reload_|. 413 void ReloadEmptyWorkQueues(); 414 415 std::unique_ptr<internal::TaskQueueImpl> CreateTaskQueueImpl( 416 const TaskQueue::Spec& spec); 417 418 // Periodically reclaims memory by sweeping away canceled tasks and shrinking 419 // buffers. 420 void MaybeReclaimMemory(); 421 422 // Deletes queues marked for deletion and empty queues marked for shutdown. 423 void CleanUpQueues(); 424 425 // Removes canceled delayed tasks from the front of wake up queue. 426 void RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now); 427 428 TaskQueue::TaskTiming::TimeRecordingPolicy ShouldRecordTaskTiming( 429 const internal::TaskQueueImpl* task_queue); 430 bool ShouldRecordCPUTimeForTask(); 431 432 // Write the async stack trace onto a crash key as whitespace-delimited hex 433 // addresses. 434 void RecordCrashKeys(const PendingTask&); 435 436 // Helper to terminate all scoped trace events to allow starting new ones 437 // in SelectNextTask(). 438 std::optional<SelectedTask> SelectNextTaskImpl(LazyNow& lazy_now, 439 SelectTaskOption option); 440 441 // Returns a wake-up for the next delayed task which is not ripe for 442 // execution, or nullopt if `option` is `kSkipDelayedTask` or there 443 // are no such tasks (immediate tasks don't count). 444 std::optional<WakeUp> GetNextDelayedWakeUpWithOption( 445 SelectTaskOption option) const; 446 447 // Given a `wake_up` describing when the next delayed task should run, returns 448 // a wake up that should be scheduled on the thread. `is_immediate()` if the 449 // wake up should run immediately. `nullopt` if no wake up is required because 450 // `wake_up` is `nullopt` or a `time_domain` is used. 451 std::optional<WakeUp> AdjustWakeUp(std::optional<WakeUp> wake_up, 452 LazyNow* lazy_now) const; 453 454 void MaybeAddLeewayToTask(Task& task) const; 455 456 #if DCHECK_IS_ON() 457 void LogTaskDebugInfo(const internal::WorkQueue* work_queue) const; 458 #endif 459 460 // Determines if wall time or thread time should be recorded for the next 461 // task. 462 TaskQueue::TaskTiming InitializeTaskTiming( 463 internal::TaskQueueImpl* task_queue); 464 465 const scoped_refptr<AssociatedThreadId> associated_thread_; 466 467 EnqueueOrderGenerator enqueue_order_generator_; 468 469 const std::unique_ptr<internal::ThreadController> controller_; 470 const Settings settings_; 471 472 const MetricRecordingSettings metric_recording_settings_; 473 474 WorkTracker work_tracker_; 475 476 // Whether to add the queue time to tasks. 477 base::subtle::Atomic32 add_queue_time_to_tasks_; 478 479 AtomicFlagSet empty_queues_to_reload_; 480 481 MainThreadOnly main_thread_only_; main_thread_only()482 MainThreadOnly& main_thread_only() { 483 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); 484 return main_thread_only_; 485 } main_thread_only()486 const MainThreadOnly& main_thread_only() const { 487 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); 488 return main_thread_only_; 489 } 490 491 // |clock_| either refers to the TickClock representation of |time_domain| 492 // (same object) if any, or to |default_clock| otherwise. It is maintained as 493 // an atomic pointer here for multi-threaded usage. 494 std::atomic<const base::TickClock*> clock_; main_thread_clock()495 const base::TickClock* main_thread_clock() const { 496 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); 497 return clock_.load(std::memory_order_relaxed); 498 } any_thread_clock()499 const base::TickClock* any_thread_clock() const { 500 // |memory_order_acquire| matched by |memory_order_release| in 501 // SetTimeDomain() to ensure all data used by |clock_| is visible when read 502 // from the current thread. A thread might try to access a stale |clock_| 503 // but that's not an issue since |time_domain| contractually outlives 504 // SequenceManagerImpl even if it's reset. 505 return clock_.load(std::memory_order_acquire); 506 } 507 508 WeakPtrFactory<SequenceManagerImpl> weak_factory_{this}; 509 }; 510 511 } // namespace internal 512 } // namespace sequence_manager 513 } // namespace base 514 515 #endif // BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_ 516