// Copyright 2016 The Chromium Authors // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #ifndef BASE_TASK_THREAD_POOL_SEQUENCE_H_ #define BASE_TASK_THREAD_POOL_SEQUENCE_H_ #include #include "base/base_export.h" #include "base/containers/intrusive_heap.h" #include "base/containers/queue.h" #include "base/sequence_token.h" #include "base/task/task_traits.h" #include "base/task/thread_pool/pooled_parallel_task_runner.h" #include "base/task/thread_pool/task.h" #include "base/task/thread_pool/task_source.h" #include "base/task/thread_pool/task_source_sort_key.h" #include "base/thread_annotations.h" #include "base/threading/sequence_local_storage_map.h" namespace base { namespace internal { // A Sequence is intended to hold delayed tasks and immediate tasks. // Delayed tasks are held in a prority_queue until they are ripe and // immediate tasks in a simple fifo queue. // When Sequence::TakeTask is called, we select the next appropriate task // from both queues and return it. // Each queue holds slots each containing up to a single Task that must be // executed in posting/runtime order. // // In comments below, an "empty Sequence" is a Sequence with no slot. // // Note: there is a known refcounted-ownership cycle in the Scheduler // architecture: Sequence -> Task -> TaskRunner -> Sequence -> ... // This is okay so long as the other owners of Sequence (PriorityQueue and // WorkerThread in alternation and // ThreadGroup::WorkerThreadDelegateImpl::GetWork() // temporarily) keep running it (and taking Tasks from it as a result). A // dangling reference cycle would only occur should they release their reference // to it while it's not empty. In other words, it is only correct for them to // release it after PopTask() returns false to indicate it was made empty by // that call (in which case the next PushImmediateTask() will return true to // indicate to the caller that the Sequence should be re-enqueued for // execution). This class is thread-safe. class BASE_EXPORT Sequence : public TaskSource { public: // A Transaction can perform multiple operations atomically on a // Sequence. While a Transaction is alive, it is guaranteed that nothing // else will access the Sequence; the Sequence's lock is held for the // lifetime of the Transaction. class BASE_EXPORT Transaction : public TaskSource::Transaction { public: Transaction(Transaction&& other); Transaction(const Transaction&) = delete; Transaction& operator=(const Transaction&) = delete; ~Transaction(); // Returns true if the sequence must be added to the immediate queue after // receiving a new immediate Task in order to be scheduled. If the caller // doesn't want the sequence to be scheduled, it may not add the sequence to // the immediate queue even if this returns true. bool WillPushImmediateTask(); // Adds immediate |task| to the end of this sequence. void PushImmediateTask(Task task); // Adds a delayed |task| in this sequence, and returns true if the sequence // needs to be re-enqueued in the delayed queue as a result of this // sequence's delayed sort key changing. bool PushDelayedTask(Task task); Sequence* sequence() const { return static_cast(task_source()); } private: friend class Sequence; explicit Transaction(Sequence* sequence); }; // |traits| is metadata that applies to all Tasks in the Sequence. // |task_runner| is a reference to the TaskRunner feeding this TaskSource. // |task_runner| can be nullptr only for tasks with no TaskRunner, in which // case |execution_mode| must be kParallel. Otherwise, |execution_mode| is the // execution mode of |task_runner|. Sequence(const TaskTraits& traits, SequencedTaskRunner* task_runner, TaskSourceExecutionMode execution_mode); Sequence(const Sequence&) = delete; Sequence& operator=(const Sequence&) = delete; // Begins a Transaction. This method cannot be called on a thread which has an // active Sequence::Transaction. [[nodiscard]] Transaction BeginTransaction(); // TaskSource: ExecutionEnvironment GetExecutionEnvironment() override; size_t GetRemainingConcurrency() const override; TaskSourceSortKey GetSortKey() const override; TimeTicks GetDelayedSortKey() const override; // Returns a token that uniquely identifies this Sequence. const SequenceToken& token() const { return token_; } SequenceLocalStorageMap* sequence_local_storage() { return &sequence_local_storage_; } bool OnBecomeReady() override; bool has_worker_for_testing() const NO_THREAD_SAFETY_ANALYSIS { return has_worker_; } bool is_immediate_for_testing() const { return is_immediate_; } bool IsEmptyForTesting() const NO_THREAD_SAFETY_ANALYSIS { return IsEmpty(); } // A reference to TaskRunner is only retained between // PushImmediateTask()/PushDelayedTask() and when DidProcessTask() returns // false, guaranteeing it is safe to dereference this pointer. Otherwise, the // caller should guarantee such TaskRunner still exists before dereferencing. SequencedTaskRunner* task_runner() const { return task_runner_; } private: ~Sequence() override; struct DelayedTaskGreater { bool operator()(const Task& lhs, const Task& rhs) const; }; // TaskSource: RunStatus WillRunTask() override; Task TakeTask(TaskSource::Transaction* transaction) override; std::optional Clear(TaskSource::Transaction* transaction) override; bool DidProcessTask(TaskSource::Transaction* transaction) override; bool WillReEnqueue(TimeTicks now, TaskSource::Transaction* transaction) override; // Returns true if the delayed task to be posted will cause the delayed sort // key to change. bool DelayedSortKeyWillChange(const Task& delayed_task) const EXCLUSIVE_LOCKS_REQUIRED(lock_); // Selects the earliest task to run, either from immediate or // delayed queue and return it. // Expects this sequence to have at least one task that can run // immediately. Task TakeEarliestTask() EXCLUSIVE_LOCKS_REQUIRED(lock_); // Get and return next task from immediate queue Task TakeNextImmediateTask() EXCLUSIVE_LOCKS_REQUIRED(lock_); // Update the next earliest/latest ready time. void UpdateReadyTimes() EXCLUSIVE_LOCKS_REQUIRED(lock_); // Returns true if there are immediate tasks bool HasImmediateTasks() const EXCLUSIVE_LOCKS_REQUIRED(lock_); // Returns true if tasks ready to be executed bool HasReadyTasks(TimeTicks now) const override; bool IsEmpty() const EXCLUSIVE_LOCKS_REQUIRED(lock_); // Releases reference to TaskRunner. void ReleaseTaskRunner(); const SequenceToken token_ = SequenceToken::Create(); // A pointer to the TaskRunner that posts to this TaskSource, if any. The // derived class is responsible for calling AddRef() when a TaskSource from // which no Task is executing becomes non-empty and Release() when // it becomes empty again (e.g. when DidProcessTask() returns false). // // In practise, this pointer is going to become dangling. See task_runner() // comment. raw_ptr task_runner_; // Queues of tasks to execute. base::queue queue_ GUARDED_BY(lock_); base::IntrusiveHeap delayed_queue_ GUARDED_BY(lock_); // Caches the latest/earliest ready time for atomic access. Writes are // protected by |lock_|, but allows atomic reads outside of |lock_|. If this // sequence is empty, these are in an unknown state and shouldn't be read. // Minimum of latest_delayed_run_time() of next delayed task if any, and // |queue_time| of next immediate task if any. std::atomic latest_ready_time_ GUARDED_BY(lock_){TimeTicks()}; // is_null() if there is an immediate task, or earliest_delayed_run_time() of // next delayed task otherwise. std::atomic earliest_ready_time_ GUARDED_BY(lock_){TimeTicks()}; // True if a worker is currently associated with a Task from this Sequence. bool has_worker_ = false; // True if the sequence has ready tasks and requested to be queued as such // through WillPushImmediateTask() or OnBecomeReady(). Reset to false once all // ready tasks are done being processed and either DidProcessTask() or // WillReEnqueue() returned false. Normally, |is_immediate_| is protected by // |lock_|, except in OnBecomeReady() hence the use of atomics. std::atomic_bool is_immediate_{false}; // Holds data stored through the SequenceLocalStorageSlot API. SequenceLocalStorageMap sequence_local_storage_; }; } // namespace internal } // namespace base #endif // BASE_TASK_THREAD_POOL_SEQUENCE_H_